I'm currently facing a challenging issue while working on a Dart application that reads messages over the Zenoh protocol. Here's a breakdown of my problem:
The problem arises when I introduce the Zenoh library in my C code, which creates a "subscriber." This subscriber spawns threads whenever messages are received. These threads are responsible for handling the incoming messages and should pass their contents back to my Dart code through a callback.
However, when I try to invoke a Dart callback function from these threads, I encounter the following error:
error: Cannot invoke native callback outside an isolate.
In an attempt to solve this issue, I decided to use dart:isolate and a SendPort/ReceivePort pair to send the data from the C threads. Unfortunately, this approach resulted in a different error:
error: Dart_NewSendPort expects there to be a current isolate. Did you forget to call Dart_CreateIsolateGroup or Dart_EnterIsolate?
I'm unsure if I'm approaching the problem correctly, and I'm not entirely sure if I fully understand it. I've dedicated nearly four days straight to resolving this issue, and I'm feeling quite stuck and frustrated.
I would greatly appreciate any insights, suggestions, or guidance on how to overcome this obstacle and successfully invoke Dart callbacks from the Zenoh C threads. Thank you in advance for your assistance!
Dart Code:
import 'dart:ffi';
import 'package:ffi/ffi.dart';
import 'dart:isolate';
// Signatures of the native `request_callback(Dart_Port port)` entry point.
// `Dart_Port` is an int64_t on the C side and `SendPort.nativePort` is a
// 64-bit value, so the native parameter must be `Int64`; declaring it as
// `Int32` would truncate the port id before it reaches C.
typedef RequestCallbackC = Void Function(Int64 port);
typedef RequestCallbackDart = void Function(int port);
/// FFI wrapper around the native Zenoh bridge library.
///
/// The native subscriber delivers samples on threads that Zenoh creates, so
/// they cannot call into Dart directly. Instead the native side posts
/// messages to the native port of [_receivePort], and the listener installed
/// in the constructor handles them on the Dart isolate.
class CWrapperZenoh {
  late final DynamicLibrary _zenohLib;
  late final RequestCallbackDart _requestCallbackDart;

  // Port pair used by the native code to deliver messages to this isolate.
  late final ReceivePort _receivePort;
  late final SendPort _sendPort;

  /// Opens the native library, initialises the Dart dynamic-linked API inside
  /// it and starts listening for messages posted by native code.
  ///
  /// Throws a [StateError] when the native API initialisation fails.
  CWrapperZenoh() {
    _zenohLib = DynamicLibrary.open("bin/include/lib_c_lib.so");

    // The `*_DL` functions of dart_api_dl.h (e.g. Dart_PostCObject_DL) are
    // only usable after Dart_InitializeApiDL has run inside the library, so
    // do this before any native code gets a chance to post.
    final initDartApiDL = _zenohLib.lookupFunction<
        IntPtr Function(Pointer<Void>),
        int Function(Pointer<Void>)>('InitDartApiDL');
    if (initDartApiDL(NativeApi.initializeApiDLData) != 0) {
      throw StateError('InitDartApiDL failed: Dart API DL version mismatch.');
    }

    _requestCallbackDart = _zenohLib
        .lookupFunction<RequestCallbackC, RequestCallbackDart>(
            'request_callback');

    _receivePort = ReceivePort();
    _sendPort = _receivePort.sendPort;
    _receivePort.listen(_onNativeMessage);
  }

  /// Dispatches one message received from native code.
  void _onNativeMessage(dynamic message) {
    print("listen message\n");
    if (message is String) {
      dartCallbackString(message);
    } else if (message is int) {
      print(message);
    }
  }

  /// Hands the native port id to the C library; the application calls this
  /// to initiate the native side.
  void callFunctions() {
    print("sendport: ${_sendPort.nativePort}");
    _requestCallbackDart(_sendPort.nativePort);
  }

  /// Closes the receive port. An open [ReceivePort] keeps the isolate alive,
  /// so call this once native messages are no longer needed.
  void dispose() {
    _receivePort.close();
  }

  /// Handles a string message received from the native side.
  static void dartCallbackString(String s) {
    print("Received string from C: $s\n");
  }
}
C library:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "zenoh.h"
#include "include/dart_api.h"
#include "include/dart_api_dl.h"
// Pointer type for a Dart callback that receives a C string.
typedef void (*DartFunction)(const char*);
// Heap-allocated Zenoh handles. request_callback() allocates the storage and
// fills in the config, the session and the subscriber, in that order.
z_owned_config_t* config_ptr;
z_owned_session_t* sessionPtr;
z_owned_subscriber_t* sub;
// NOTE(review): no visible code in this file assigns or reads this global.
z_owned_closure_sample_t callback;
// Native port id of the Dart ReceivePort; stored by request_callback() and
// used by callbackFuncToDart() as the destination for posted messages.
Dart_Port dart_port;
// NOTE(review): not invoked by any active code in this file; it belongs to an
// earlier attempt at calling Dart directly from the Zenoh thread.
DartFunction dartFunction;
// Forward declaration so request_callback() can register the handler that is
// defined further down.
void callbackFuncToDart(const z_sample_t *sample, void *arg);
void request_callback(Dart_Port port){
dart_port = port;
// config
config_ptr = malloc(sizeof(z_owned_config_t));
if (config_ptr != NULL) {
*config_ptr = zc_config_from_file("bin/include/DEFAULT_CONFIG_CONNECT.json5");
}
printf("config created\n");
fflush(stdout);
// open session
sessionPtr = (z_owned_session_t*)malloc(sizeof(z_owned_session_t));
*sessionPtr = z_open(config_ptr);
// subscribing
z_owned_closure_sample_t callback = z_closure(callbackFuncToDart);
sub = malloc(sizeof(z_owned_subscriber_t));
*sub = z_declare_subscriber(z_loan(*sessionPtr), z_keyexpr("geometry_msgs/msg/actualSpeed"), z_move(callback), NULL);
}
// callback to Dart
void callbackFuncToDart(const z_sample_t *sample, void *arg) {
// Create a SendPort to communicate with Dart isolate
Dart_Handle sendPort = Dart_NewSendPort(dart_port);
// Check if creating the SendPort was successful
if (!Dart_IsError(sendPort)) {
Dart_CObject message;
message.type = Dart_CObject_kInt32;
message.value.as_int32 = 1;
// Send the message using the SendPort
if (Dart_PostCObject(dart_port, &message)){
printf("Message sent\n");
fflush(stdout);
} else {
printf("Failed to create SendPort\n");
fflush(stdout);
}
}
//--Removed old irelevant code. comment relevant for my first threaded callback attempt--
//dartFunction(str); // different thread id than Darts "Main isolate". therefore i get the following: "error: Cannot invoke native callback outside an isolate."
}
A lot has happened since I posted the question, and I'm not sure what caused this exact issue. Nonetheless, I made it work with the following changes.
In my C library, I expose this function:
// Exported entry point that sets up the dynamically linked Dart API
// (`dart_api_dl.h`) for this library.
//
// data: opaque initialization data supplied by the Dart side.
// Returns the status reported by Dart_InitializeApiDL unchanged.
DART_EXPORT intptr_t InitDartApiDL(void* data) {
    const intptr_t status = Dart_InitializeApiDL(data);
    return status;
}
I then call it in my Dart constructor:
/// Opens the native library and initialises the Dart dynamic-linked API in it.
///
/// Throws a [StateError] when `InitDartApiDL` reports a failure.
ZenohWrapper() {
  _zenohLib = DynamicLibrary.open('dirToLib/lib.so');

  // Looking the function up is not enough: it has to be invoked with
  // NativeApi.initializeApiDLData so that the `*_DL` functions (such as
  // Dart_PostCObject_DL) become usable from native threads. This is done
  // before the remaining setup so the native side is ready before anything
  // else can trigger a callback.
  final initializeApi = _zenohLib.lookupFunction<IntPtr Function(Pointer<Void>), int Function(Pointer<Void>)>("InitDartApiDL");
  if (initializeApi(NativeApi.initializeApiDLData) != 0) {
    throw StateError('InitDartApiDL failed: Dart API DL version mismatch.');
  }

  _initializeZenohFunctions();
}