Kotlin suspend functions should be nonblocking by convention (1). Often we have legacy Java code that relies on the Java thread interruption mechanism and that we cannot (or don't want to) modify (2):
public void doSomething(String arg) {
    for (int i = 0; i < 100_000; i++) {
        heavyCrunch(arg, i);
        if (Thread.interrupted()) {
            // We've been interrupted: no more crunching.
            return;
        }
    }
}
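To make the interruption contract concrete, here is a minimal sketch of how such a loop reacts when its thread is interrupted. The class `InterruptDemo`, the busy-waiting `heavyCrunch` stand-in, and the method `interruptAfterFirstIteration` are hypothetical names for illustration, not part of the legacy code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class InterruptDemo {
    // Hypothetical stand-in for heavyCrunch(): busy-waits for roughly 1 ms.
    static void heavyCrunch(String arg, int i) {
        long end = System.nanoTime() + 1_000_000L;
        while (System.nanoTime() < end) {
            // spin
        }
    }

    /** Runs the loop on a worker thread, interrupts it, and returns the iterations completed. */
    static int interruptAfterFirstIteration() {
        AtomicInteger iterations = new AtomicInteger();
        CountDownLatch firstDone = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) {
                heavyCrunch("demo", i);
                iterations.incrementAndGet();
                firstDone.countDown();
                if (Thread.interrupted()) {
                    // Same exit path as the legacy loop.
                    return;
                }
            }
        });
        worker.start();
        try {
            firstDone.await();   // wait until at least one iteration has run
            worker.interrupt();  // set the interrupt flag on the worker
            worker.join();       // the loop should notice the flag and return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return iterations.get();
    }
}
```

The loop only stops if something calls `interrupt()` on the exact thread that runs it, which is the property any coroutine adapter has to preserve.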
What is the best way to adapt this code for usage in coroutines?
Version A is unacceptable because it runs the code on the caller's thread, which violates the "suspending functions do not block the caller thread" convention:
suspend fun doSomething(param: String) = delegate.performBlockingCode(param)
Version B is better because it runs the blocking function on a background thread, so it doesn't block the caller's thread (unless the caller happens to be running on the same thread from the Dispatchers.Default pool). However, cancelling the coroutine's job wouldn't interrupt performBlockingCode(), which relies on thread interruption:
suspend fun doSomething(param: String) = withContext(Dispatchers.Default) {
    delegate.performBlockingCode(param)
}
Version C is currently the only way I can see to make it work. The idea is to convert the blocking function into a nonblocking one with Java mechanisms and then use suspendCancellableCoroutine (3) to convert the asynchronous method into a suspend function:
private ExecutorService executor = Executors.newSingleThreadExecutor();

public Future doSomethingAsync(String arg) {
    return executor.submit(() -> {
        doSomething(arg);
    });
}
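suspend fun doSomething(param: String) = suspendCancellableCoroutine<Any> { cont ->
    try {
        val future = delegate.doSomethingAsync(param)
        // Cancelling the coroutine cancels the Future and interrupts its thread.
        // Registered inside the try block so that `future` is in scope.
        cont.invokeOnCancellation { future.cancel(true) }
    } catch (e: InterruptedException) {
        throw CancellationException()
    }
}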
As commented below, the above code won't work properly because continuation.resumeWith() is never called.
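The cancellation half of Version C does work on the Java side: cancelling a Future returned by ExecutorService.submit() with cancel(true) interrupts the thread running the task. The following is a minimal sketch of that behaviour. `FutureCancelDemo` and `cancelInterruptsTask` are hypothetical names, and the task is a simplified stand-in for the legacy loop.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class FutureCancelDemo {
    /** Returns true if cancelling the Future interrupted the running task. */
    static boolean cancelInterruptsTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        try {
            Future<?> future = executor.submit(() -> {
                started.countDown();
                long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
                // Same shape as the legacy loop: poll the interrupt flag.
                while (System.nanoTime() < deadline) {
                    if (Thread.interrupted()) {
                        sawInterrupt.set(true);
                        break;
                    }
                }
                finished.countDown();
            });
            started.await();      // make sure the task is actually running
            future.cancel(true);  // mayInterruptIfRunning = true
            finished.await(10, TimeUnit.SECONDS);
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

What a plain Future lacks is a completion callback, which is why the coroutine in Version C has nothing that resumes it.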
Version D uses CompletableFuture, which provides a way to register a callback for when the future completes (thenAccept):
private ExecutorService executor = Executors.newSingleThreadExecutor();

public CompletableFuture doSomethingAsync(String arg) {
    return CompletableFuture.runAsync(() -> doSomething(arg), executor);
}
suspend fun doSomething(param: String) = suspendCancellableCoroutine<Any> { cont ->
    try {
        val completableFuture = delegate.doSomethingAsync(param)
        // Note: thenAccept only fires on normal completion; a failure in the
        // Java code would leave the coroutine suspended.
        completableFuture.thenAccept { cont.resumeWith(Result.success(it)) }
        cont.invokeOnCancellation { completableFuture.cancel(true) }
    } catch (e: InterruptedException) {
        throw CancellationException()
    }
}
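One caveat with Version D: according to the CompletableFuture Javadoc, the mayInterruptIfRunning argument of cancel() has no effect, so cancel(true) on a future created by runAsync() does not interrupt the worker thread. A possible workaround, sketched below under that assumption, is to submit the task to the executor directly and link the resulting Future to a hand-made CompletableFuture. `InterruptibleFutures`, `runInterruptibly`, and `cancelInterrupts` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptibleFutures {
    /** Hypothetical helper: a CompletableFuture whose cancel() interrupts the worker thread. */
    static CompletableFuture<Void> runInterruptibly(ExecutorService executor, Runnable task) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        Future<?> running = executor.submit(() -> {
            try {
                task.run();
                result.complete(null);            // no-op if already cancelled
            } catch (Throwable t) {
                result.completeExceptionally(t);  // propagate failures to callbacks
            }
        });
        // Forward cancellation of the CompletableFuture to the executor's Future.
        result.whenComplete((value, error) -> {
            if (result.isCancelled()) {
                running.cancel(true);
            }
        });
        return result;
    }

    /** Cancels a flag-polling task and reports whether the task observed an interrupt. */
    static boolean cancelInterrupts(boolean useBridge) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Runnable task = () -> {
            started.countDown();
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(300);
            while (System.nanoTime() < deadline) {
                if (Thread.interrupted()) {
                    sawInterrupt.set(true);
                    break;
                }
            }
            finished.countDown();
        };
        try {
            CompletableFuture<Void> future = useBridge
                    ? runInterruptibly(executor, task)
                    : CompletableFuture.runAsync(task, executor);
            started.await();
            future.cancel(true);
            finished.await();  // wait for the task before shutting the executor down
            return sawInterrupt.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

If doSomethingAsync() were built on a helper like this, the cancel(true) call in the Kotlin wrapper would reach the thread running the legacy loop, and the returned future would also complete exceptionally when the Java code throws.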
Do you know a better way to do this?