1

I have a c++ function that I want to call from my Kotlin code. That c++ function gets a callback function as an argument, doing some work and calls the callback when completes.

I already done it a few times before and everything was OK. However, I want to wrap it in a way so instead of passing a callback, it will return an Observable that will emit a value when the callback is called.

I created an example with a simpler code. What I did so far:

Kotlin code:

fun someFunc(str: String): Observable<String> {
    val subject = PublishSubject.create<String>()
    nativeFunc(object: TestCallback {
        override fun invoke(event: String) {
            println("Callback invoked. subject = $subject")
            subject.onNext("$event - $str")
        }
    })
    return subject
}

private external fun nativeFunc(callback: TestCallback)

Interface in Kotlin for the callback function:


interface TestCallback {
    fun invoke(event: String)
}

Native JNI code:

extern "C"
JNIEXPORT void JNICALL
Java_com_myProject_TestClass_nativeFunc(JNIEnv *env, jobject thiz, jobject callback) {
    env->GetJavaVM(&g_vm);
    auto g_callback = env->NewGlobalRef(callback);

    std::function<void()> * pCompletion = new std::function<void()>([g_callback]() {
        JNIEnv *newEnv = GetJniEnv();
        jclass callbackClazz = newEnv->FindClass("com/myproject/TestCallback");
        jmethodID invokeMethod = newEnv->GetMethodID(callbackClazz, "invoke", "(Ljava/lang/String;)V");
        string callbackStr = "Callback called";
        newEnv->CallVoidMethod(g_callback, invokeMethod, newEnv->NewStringUTF(callbackStr.c_str()));
        newEnv->DeleteGlobalRef(g_callback);
    });
    pCompletion->operator()(); // <--Similar function is passed to the c++ function. Lets skip that
}

A test function to run it all together

@Test
fun testSubject() {
    val testClass = TestClass()
    val someList = listOf("a", "b", "c")
    var done = false
    Observable.concat(someList.map { testClass.someFunc(it) })
        .take(3)
        .doOnNext { println("got next: $it") }
        .doOnComplete { done = true }
        .subscribe()
    while (!done);
}

The test function runs 3 times the someFunc function (which return an Observable instance, emitting a String on completion) and concat all Observables together.

What I would expect to be printed:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
got next: Callback called - a
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
got next: Callback called - b
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486
got next: Callback called - c

However the actual result is:

Callback invoked. subject = io.reactivex.subjects.PublishSubject@1f7acc8
Callback invoked. subject = io.reactivex.subjects.PublishSubject@7c9b161
Callback invoked. subject = io.reactivex.subjects.PublishSubject@6f24486

It seems like everything work as expected, however, although the line println("Callback invoked. subject = $subject") is printed (with the correct subject addresses), the onNext is not working and not emitting anything for some reason. I checked the same functionality without the native callback stuff and everything works fine.

Any suggestions???

lior_13
  • 577
  • 7
  • 18
  • 1
    Suggestion: get rid of Kotlin and work with Java. – Hack06 Jan 18 '21 at 16:15
  • 2
    How is that suppose to help with the problem? – lior_13 Jan 18 '21 at 16:17
  • Is there a possibility that you're blocking execution of that print? What happens if you remove that `while` loop? – Michael Jan 18 '21 at 17:21
  • @Michael The test will end. Similar functionality without native code work as expected. Also, the callback is being called and the log is printed, but I don't see the emission of the onNext – lior_13 Jan 19 '21 at 06:52
  • Try printing `$event` check if any error occurs and is not caught. And btw I'd highly suggest to use (flows from) `kotlinx.coroutines` instead of (observables from) rx, its more lightweight and has better API. – Animesh Sahu Jan 19 '21 at 08:46

1 Answers1

0

So after some research I found:

  1. When I call a C/C++ function from Java, the JNI does not create any new thread behind the scene. [see here]. Hence,
  2. The code runs synchronously, meaning - the subject emits an item and then the function returns the subject and being subscribed. So after the subscription, it missed the emitted item and lost it.
  3. I was wrong saying "I checked the same functionality without the native callback stuff and everything works fine.". I probably made a mistake there that made the non-native code asynchronous which gave me the returned subject "on-time" and printed the logs as expected.

The solution was to change the PublishSubject into a BehaviorSubject or ReplaySubject to cache the emitted item and get it once subscribed. Another solution could be switching the call to the native function to run in another thread, so while the function run, the subject is already returned and being subscribed.

lior_13
  • 577
  • 7
  • 18