Invoking Dart Callbacks from Zenoh C Threads - Isolate and Callback Error


I'm currently facing a challenging issue while working on a Dart application that reads messages over the Zenoh protocol. Here's a breakdown of my problem:

  1. My Dart code executes native C code via dart:ffi (without issue).
  2. The C code then calls back into the Dart code with data (without issue for a direct callback).

The problem arises when I introduce the Zenoh library in my C code, which creates a "subscriber." This subscriber spawns threads whenever messages are received. These threads are responsible for handling the incoming messages and should callback the contents to my Dart code.

However, when I try to invoke a Dart callback function from these threads, I encounter the following error: error: Cannot invoke native callback outside an isolate.

In an attempt to solve this issue, I decided to use dart:isolate and a SendPort/ReceivePort pair to send the data from the C threads. Unfortunately, this approach resulted in a different error: error: Dart_NewSendPort expects there to be a current isolate. Did you forget to call Dart_CreateIsolateGroup or Dart_EnterIsolate?

I'm unsure if I'm approaching the problem correctly, and I'm not entirely sure if I fully understand it. I've dedicated nearly four days straight to resolving this issue, and I'm feeling quite stuck and frustrated.

I would greatly appreciate any insights, suggestions, or guidance on how to overcome this obstacle and successfully invoke Dart callbacks from the Zenoh C threads. Thank you in advance for your assistance!


Dart Code:

import 'dart:ffi';
import 'package:ffi/ffi.dart';
import 'dart:isolate';

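// Signature of the native request_callback function.
// Dart_Port is an int64_t on the C side, so the native type must be Int64.
typedef RequestCallbackC = Void Function(Int64 port);
typedef RequestCallbackDart = void Function(int port);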

class CWrapperZenoh{
  late final DynamicLibrary _zenohLib;
  late final RequestCallbackDart _requestCallbackDart;
  // for native c callback
  late final ReceivePort _receivePort;
  late final SendPort _sendPort;

  CWrapperZenoh(){
    _zenohLib = DynamicLibrary.open("bin/include/lib_c_lib.so");
    _requestCallbackDart = _zenohLib.lookup<NativeFunction<RequestCallbackC>> 
      ('request_callback').asFunction();

    _receivePort = ReceivePort();
    _sendPort = _receivePort.sendPort;

    // listen
    _receivePort.listen((message) {
      print("listen message\n");
      if (message is String) {
        dartCallbackString(message);
      } else if (message is int){
        print(message);
      }
    });
  }

  // application calls this to initiate
  callFunctions(){
    print("sendport: ${_sendPort.nativePort}");
    _requestCallbackDart(_sendPort.nativePort);
  }

  static void dartCallbackString(String s) {
    print("Received string from C: $s\n");
  }

}


C library:

    #include <string.h>
    #include <stdio.h>
    #include <stdlib.h>   // malloc
    #include <unistd.h>
    #include "zenoh.h"
    #include "include/dart_api.h"
    #include "include/dart_api_dl.h"
    #include <pthread.h>

    // define Dart callback function
    typedef void (*DartFunction)(const char*);

    z_owned_config_t* config_ptr;
    z_owned_session_t* sessionPtr;
    z_owned_subscriber_t* sub;
    z_owned_closure_sample_t callback;
    Dart_Port dart_port;

    DartFunction dartFunction;

    
    void callbackFuncToDart(const z_sample_t *sample, void *arg);

    void request_callback(Dart_Port port){
        dart_port = port;

        // config
        config_ptr = malloc(sizeof(z_owned_config_t));
        if (config_ptr != NULL) {
            *config_ptr = zc_config_from_file("bin/include/DEFAULT_CONFIG_CONNECT.json5");
        }
        printf("config created\n");
        fflush(stdout);


        // open session
        sessionPtr = (z_owned_session_t*)malloc(sizeof(z_owned_session_t));
        *sessionPtr = z_open(config_ptr);
        
        // subscribing
        z_owned_closure_sample_t callback = z_closure(callbackFuncToDart);
        
        sub = malloc(sizeof(z_owned_subscriber_t));
        *sub = z_declare_subscriber(z_loan(*sessionPtr), z_keyexpr("geometry_msgs/msg/actualSpeed"), z_move(callback), NULL);
    }

    // callback to Dart
    void callbackFuncToDart(const z_sample_t *sample, void *arg) {
        
        // Create a SendPort to communicate with Dart isolate
        Dart_Handle sendPort = Dart_NewSendPort(dart_port);

        // Check if creating the SendPort was successful
        if (!Dart_IsError(sendPort)) {
            Dart_CObject message;
            message.type = Dart_CObject_kInt32;
            message.value.as_int32 = 1;

            // Send the message using the SendPort
            if (Dart_PostCObject(dart_port, &message)){
                printf("Message sent\n");
                fflush(stdout);
            } else {
                printf("Failed to post message\n");
                fflush(stdout);
            }   
        }

        // Old code removed as no longer relevant. The comment below refers to my first attempt at a threaded callback.
        //dartFunction(str);    // Runs on a different thread than Dart's main isolate, so I get: "error: Cannot invoke native callback outside an isolate."
                                            
    }

Solution

  • A lot has happened since I posted the question, and I'm not sure what caused this exact issue. Nonetheless, I made it work with the following changes.

    In my C library, I expose this function:

    // Initialize `dart_api_dl.h`
    DART_EXPORT intptr_t InitDartApiDL(void* data) {
      return Dart_InitializeApiDL(data);
    }
    

    And then I call it in my Dart constructor:

    ZenohWrapper() {
      _zenohLib = DynamicLibrary.open('dirToLib/lib.so');
      _initializeZenohFunctions();
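      final initializeApi = _zenohLib.lookupFunction<IntPtr Function(Pointer<Void>), int Function(Pointer<Void>)>("InitDartApiDL");
      // Pass the VM's API table to the C side; Dart_InitializeApiDL returns 0 on success.
      initializeApi(NativeApi.initializeApiDLData);
    }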
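
A likely reason this helps: once the dynamically linked API is initialised, the subscriber callback can post to the stored native port id without a current isolate, so it needs neither `Dart_NewSendPort` nor a direct call into Dart. The sketch below shows only that shape. It is an illustration under assumptions, not the code from my library: `port_id_t`, `post_fn`, `register_port`, `on_sample` and `recording_post` are hypothetical stand-ins chosen so the snippet compiles without the Dart SDK headers.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins so the sketch compiles without the Dart SDK headers.
 * In the real library, port_id_t is Dart_Port (an int64_t) and post_fn plays
 * the role of Dart_PostCObject_DL from dart_api_dl.h. */
typedef int64_t port_id_t;
typedef bool (*post_fn)(port_id_t port, int32_t value);

static port_id_t g_port = 0;   /* native port id received from Dart */
static post_fn g_post = NULL;  /* posting function, available after init */

/* Called once from the Dart isolate: stores state only. */
void register_port(port_id_t port, post_fn post) {
    g_port = port;
    g_post = post;
}

/* Shaped like a subscriber callback that Zenoh would run on its own thread.
 * It never creates handles or calls into Dart directly; it only posts to the
 * stored port id. Returns true if the message was queued. */
bool on_sample(int32_t value) {
    if (g_post == NULL) {
        return false;  /* API not initialised yet: drop the message */
    }
    return g_post(g_port, value);
}

/* Test double that records what would have been sent to the isolate. */
static port_id_t last_port = 0;
static int32_t last_value = 0;

bool recording_post(port_id_t port, int32_t value) {
    last_port = port;
    last_value = value;
    return true;
}

port_id_t recorded_port(void) { return last_port; }
int32_t recorded_value(void) { return last_value; }
```

In the real callback, the body of `on_sample` would fill a `Dart_CObject` and call `Dart_PostCObject_DL(dart_port, &message)`, which takes the port id directly. The message then arrives in the `_receivePort.listen` handler on the Dart side.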