androidc++kotlinjava-native-interfacejniwrapper

JNI - Weird behavior using RxJava in native cpp callback function


I have a c++ function that I want to call from my Kotlin code. That c++ function gets a callback function as an argument, doing some work and calls the callback when completes.

I already done it a few times before and everything was OK. However, I want to wrap it in a way so instead of passing a callback, it will return an Observable that will emit a value when the callback is called.

I created an example with a simpler code. What I did so far:

Kotlin code:

fun someFunc(str: String): Observable<String> {
    val subject = PublishSubject.create<String>()
    nativeFunc(object: TestCallback {
        override fun invoke(event: String) {
            println("Callback invoked. subject = $subject")
            subject.onNext("$event - $str")
        }
    })
    return subject
}

private external fun nativeFunc(callback: TestCallback)

Interface in Kotlin for the callback function:


interface TestCallback {
    fun invoke(event: String)
}

Native JNI code:

extern "C"
JNIEXPORT void JNICALL
Java_com_myProject_TestClass_nativeFunc(JNIEnv *env, jobject thiz, jobject callback) {
    env->GetJavaVM(&g_vm);
    auto g_callback = env->NewGlobalRef(callback);

    std::function<void()> * pCompletion = new std::function<void()>([g_callback]() {
        JNIEnv *newEnv = GetJniEnv();
        jclass callbackClazz = newEnv->FindClass("com/myproject/TestCallback");
        jmethodID invokeMethod = newEnv->GetMethodID(callbackClazz, "invoke", "(Ljava/lang/String;)V");
        string callbackStr = "Callback called";
        newEnv->CallVoidMethod(g_callback, invokeMethod, newEnv->NewStringUTF(callbackStr.c_str()));
        newEnv->DeleteGlobalRef(g_callback);
    });
    pCompletion->operator()(); // <--Similar function is passed to the c++ function. Lets skip that
}

A test function to run it all together

@Test
fun testSubject() {
    val testClass = TestClass()
    val someList = listOf("a", "b", "c")
    var done = false
    Observable.concat(someList.map { testClass.someFunc(it) })
        .take(3)
        .doOnNext { println("got next: $it") }
        .doOnComplete { done = true }
        .subscribe()
    while (!done);
}

The test function runs 3 times the someFunc function (which return an Observable instance, emitting a String on completion) and concat all Observables together.

What I would expect to be printed:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
got next: Callback called - a
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
got next: Callback called - b
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486
got next: Callback called - c

However the actual result is:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486

It seems like everything work as expected, however, although the line println("Callback invoked. subject = $subject") is printed (with the correct subject addresses), the onNext is not working and not emitting anything for some reason. I checked the same functionality without the native callback stuff and everything works fine.

Any suggestions???


Solution

  • So after some research I found:

    1. When I call a C/C++ function from Java, the JNI does not create any new thread behind the scene. [see here]. Hence,
    2. The code runs synchronously, meaning - the subject emits an item and then the function returns the subject and being subscribed. So after the subscription, it missed the emitted item and lost it.
    3. I was wrong saying "I checked the same functionality without the native callback stuff and everything works fine.". I probably made a mistake there that made the non-native code asynchronous which gave me the returned subject "on-time" and printed the logs as expected.

    The solution was to change the PublishSubject into a BehaviorSubject or ReplaySubject to cache the emitted item and get it once subscribed. Another solution could be switching the call to the native function to run in another thread, so while the function run, the subject is already returned and being subscribed.