I want to maintain an object at the thread level or the coroutine level, depending on the kind of work the application is doing across different threads and coroutines. Is there a way to achieve this?

For simplicity's sake, say I write a Spring Boot application in which most of the work is thread-based and only certain parts of the code use coroutines to leverage their benefits. How do I maintain state based on the current execution context, whether that is a thread or a coroutine?

MozenRath
  • Jumping back and forth between threads and coroutines is relatively tricky, but are you aware of [`ThreadContextElement`](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-thread-context-element/)? – Louis Wasserman May 11 '21 at 00:09
  • One thing I'd try is to spread the coroutine domain into your thread domain: you can run normal blocking code inside a coroutine, on the same thread that you'd usually run non-coroutine code on. The entry point would be a `runBlocking` on that thread. – Marko Topolnik May 11 '21 at 06:56
  • `ThreadContextElement` is specific to coroutines only, right? Will I be able to access it from a non-coroutine scenario? – MozenRath May 12 '21 at 17:48
  • The idea is not to make coroutine code blocking but to have interoperability. – MozenRath May 12 '21 at 17:49
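
To make the `ThreadContextElement` suggestion from the comments concrete, here is a minimal sketch, assuming the state lives in an ordinary `ThreadLocal` and is bridged into coroutines with `asContextElement` from kotlinx.coroutines. The names `requestId`, `currentRequestId` and `observeRequestIds` are hypothetical and only illustrate the idea: thread-based code keeps using the `ThreadLocal` directly, while coroutine code sees the value installed by the context element.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlinx.coroutines.yield

// Hypothetical state holder: ordinary thread-based code uses it as a normal ThreadLocal.
val requestId: ThreadLocal<String?> = ThreadLocal()

// Works the same from a plain thread and from inside a coroutine.
fun currentRequestId(): String? = requestId.get()

fun observeRequestIds(): List<String?> {
    val seen = mutableListOf<String?>()
    requestId.set("thread-value") // set by non-coroutine code on the calling thread
    runBlocking {
        // The element installs "coroutine-value" on each thread that runs the coroutine
        // and restores that thread's previous value when the coroutine suspends.
        withContext(Dispatchers.Default + requestId.asContextElement("coroutine-value")) {
            seen += currentRequestId()
            yield() // the coroutine may resume on another worker thread
            seen += currentRequestId()
        }
    }
    seen += currentRequestId() // the calling thread's own value is untouched
    return seen
}

fun main() {
    println(observeRequestIds())
}
```

One limitation to keep in mind: the element created by `asContextElement` does not track writes made to the `ThreadLocal` inside the coroutine, so a value set there may be lost at the next suspension point.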

1 Answer

My answer may be a little late, but you can do this with either of the following:

  1. `ContinuationInterceptor`
  2. `CopyableThreadContextElement`

A more elaborate answer with examples is provided here.

Below is an example using `ContinuationInterceptor`, copied from those answers.

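import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor

// Wraps a dispatcher so that the coroutine's counter follows it from thread to thread.
class WrappedDispatcher(
    private val dispatcher: ContinuationInterceptor,
    // Captured once, on the thread that constructs the wrapper.
    private var savedCounter: Int = counterThreadLocal.get() ?: 0
) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        dispatcher.interceptContinuation(ContinuationWrapper(continuation))

    private inner class ContinuationWrapper<T>(val base: Continuation<T>) : Continuation<T> by base {

        override fun resumeWith(result: Result<T>) {
            // Install the saved value on whichever thread resumes the coroutine.
            counterThreadLocal.set(savedCounter)
            try {
                base.resumeWith(result)
            } finally {
                // Remember writes made before the next suspension. The thread-local itself
                // is not cleared here, so the value stays visible on the pooled thread.
                savedCounter = counterThreadLocal.get()
            }
        }
    }
}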

Usage:

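// If both snippets share one file, these imports belong at the top of it.
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

val counterThreadLocal: ThreadLocal<Int> = ThreadLocal()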

fun showCounter() {
    println("-------------------------------------------------")
    println("Thread: ${Thread.currentThread().name}\n Counter value: ${counterThreadLocal.get()}")
}

fun main() {
    runBlocking(WrappedDispatcher(Dispatchers.IO)) {
        showCounter()
        counterThreadLocal.set(2)
        delay(100)
        showCounter()
        counterThreadLocal.set(3)
        withContext(WrappedDispatcher(Dispatchers.Default)) {
            println("__________NESTED START___________")
            counterThreadLocal.set(4)
            showCounter()
            println("__________NESTED END_____________")
        }
        delay(100)
        showCounter()
    }
}

The output will look like this (worker thread names may differ between runs):

-------------------------------------------------
Thread: DefaultDispatcher-worker-1
 Counter value: 0
-------------------------------------------------
Thread: DefaultDispatcher-worker-1
 Counter value: 2
__________NESTED START___________
-------------------------------------------------
Thread: DefaultDispatcher-worker-3
 Counter value: 4
__________NESTED END_____________
-------------------------------------------------
Thread: DefaultDispatcher-worker-3
 Counter value: 3
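
For the second option, `CopyableThreadContextElement`, here is a minimal sketch rather than a definitive implementation. It assumes a recent kotlinx.coroutines release in which the interface exposes `copyForChild` and `mergeForChild`; the API is marked experimental and delicate, and its method names have changed between releases. `counterLocal`, `CounterElement` and `runCounterDemo` are hypothetical names chosen for this illustration.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CopyableThreadContextElement
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical thread-local, named differently from the one above so this sketch stands alone.
val counterLocal: ThreadLocal<Int> = ThreadLocal()

@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
class CounterElement(private var counter: Int) : CopyableThreadContextElement<Int?> {
    companion object Key : CoroutineContext.Key<CounterElement>

    override val key: CoroutineContext.Key<CounterElement> get() = Key

    // Called when the coroutine starts or resumes on a thread: install its value.
    override fun updateThreadContext(context: CoroutineContext): Int? {
        val previous: Int? = counterLocal.get()
        counterLocal.set(counter)
        return previous
    }

    // Called when the coroutine suspends or completes: remember its writes, then restore the thread.
    override fun restoreThreadContext(context: CoroutineContext, oldState: Int?) {
        counter = counterLocal.get() ?: counter
        if (oldState == null) counterLocal.remove() else counterLocal.set(oldState)
    }

    // A child coroutine starts from the value visible at launch time but gets its own copy.
    override fun copyForChild(): CounterElement = CounterElement(counterLocal.get() ?: counter)

    // If the child's context supplies its own element for this key, that one is used.
    override fun mergeForChild(overwritingElement: CoroutineContext.Element): CoroutineContext =
        overwritingElement
}

fun runCounterDemo(): List<Int?> = runBlocking(Dispatchers.Default + CounterElement(0)) {
    val seen = mutableListOf<Int?>()
    seen += counterLocal.get()      // initial value supplied by the element
    counterLocal.set(2)
    delay(10)                       // the coroutine may resume on a different worker thread
    seen += counterLocal.get()      // the write made before the suspension
    val child = launch {
        seen += counterLocal.get()  // the child starts from the parent's value at launch
        counterLocal.set(4)         // changes only the child's copy
    }
    child.join()
    seen += counterLocal.get()      // the parent keeps its own value
    seen
}

fun main() {
    println(runCounterDemo())
}
```

Unlike the `WrappedDispatcher` above, this element also restores the worker thread's previous value on suspension, so nothing is left behind on pooled threads.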
Alex