
I have code that should turn SharedPreferences into observable storage using Flow, so I've written code like this:

internal val onKeyValueChange: Flow<String> = channelFlow {
        val callback = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            coroutineScope.launch {
                //send(key)
                offer(key)
            }
        }

        sharedPreferences.registerOnSharedPreferenceChangeListener(callback)

        awaitClose {
            sharedPreferences.unregisterOnSharedPreferenceChangeListener(callback)
        }
    }

or this

internal val onKeyValueChange: Flow<String> = callbackFlow {
        val callback = SharedPreferences.OnSharedPreferenceChangeListener { _, key ->
            coroutineScope.launch {
                send(key)
                //offer(key)
            }
        }

        sharedPreferences.registerOnSharedPreferenceChangeListener(callback)

        awaitClose {
            sharedPreferences.unregisterOnSharedPreferenceChangeListener(callback)
        }
    }

Then I observe these preferences for token, userId and companyId, and then log in. But there is an odd thing: I need to build the app three times. The first time, changing token does not cause tokenFlow to emit anything. The second time, the new userId does not cause userIdFlow to emit anything. After the 3rd login I can log out and log in again and it works. On logout I clear all 3 properties stored in prefs: token, userId and companyId.

Michał Ziobro

2 Answers


For callbackFlow:

You cannot call emit() from inside a callback in a plain flow builder, because emit() is a suspend function and the callback is not a suspending context. callbackFlow therefore gives you a synchronous, non-suspending way to do it: trySend().

Example:

fun observeData() = flow {
 myAwesomeInterface.addListener{ result ->
   emit(result) // NOT ALLOWED
 }
}

So, coroutines offer you the option of callbackFlow:

fun observeData() = callbackFlow {
 myAwesomeInterface.addListener{ result ->
   trySend(result) // ALLOWED
 }
 awaitClose{ myAwesomeInterface.removeListener() }
}

For channelFlow:

The main difference between it and the basic Flow is described in the documentation:

A channel with the default buffer size is used. Use the buffer operator on the resulting flow to specify a user-defined value and to control what happens when data is produced faster than consumed, i.e. to control the back-pressure behavior.
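
To make the quoted sentence more concrete, here is a minimal sketch (my own, not from the documentation) of how an operator applied after the builder changes the channel behind it. `burst` and `accepted` are names made up for illustration, and the loop stands in for a callback that fires five times in a row.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical source: fires five "callbacks" as soon as it is collected
// and records whether the channel accepted each trySend.
fun burst(accepted: MutableList<Boolean>): Flow<Int> = callbackFlow {
    for (i in 1..5) accepted += trySend(i).isSuccess
    close()        // no more values, so the collector can finish
    awaitClose { } // nothing to unregister in this toy example
}

fun main() = runBlocking {
    val accepted = mutableListOf<Boolean>()

    // Default buffer: there is room for all five values, so they arrive in order.
    println(burst(accepted).toList())

    // conflate(): the channel keeps only the newest undelivered value, so a slow
    // collector may skip intermediate values but still sees the last one.
    println(burst(accepted).conflate().onEach { delay(10) }.toList())
}
```

The same idea applies to `buffer(...)` with an explicit capacity: the operator is fused into the builder, so it decides how many pending values the channel holds before `trySend` starts failing.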

trySend() means the same thing here: it is simply a synchronous (non-suspending) counterpart to emit() or send().
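
Because trySend() never suspends, it reports what happened through its return value instead of waiting. A small sketch on a plain Channel, assuming a buffer of one element (`trySendOutcomes` and the labels are made up for illustration):

```kotlin
import kotlinx.coroutines.channels.Channel

// Returns a label for each trySend attempt on a channel that can buffer one element.
fun trySendOutcomes(): List<String> {
    val channel = Channel<String>(capacity = 1)

    fun label(value: String): String {
        val result = channel.trySend(value) // returns immediately, never suspends
        return when {
            result.isSuccess -> "sent"
            result.isClosed -> "closed"
            else -> "dropped"
        }
    }

    val outcomes = mutableListOf<String>()
    outcomes += label("token")     // the buffer has room
    outcomes += label("userId")    // the buffer is full and nobody is receiving
    channel.close()
    outcomes += label("companyId") // the channel is closed
    return outcomes
}

fun main() {
    println(trySendOutcomes()) // prints [sent, dropped, closed]
}
```

So if a value must not be lost, it is worth checking the result of trySend() rather than ignoring it.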

I suggest checking Roman Elizarov's blog for more detailed information, especially this post.

Regarding your code: with callbackFlow you won't need to launch a coroutine:

coroutineScope.launch {
                send(key)
                //trySend(key)
            }

Just use trySend()
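
Put together, the listener-based flow could look roughly like the sketch below. It is written against a made-up `FakePrefs` class and `KeyListener` interface so that it runs without Android. With real SharedPreferences the shape would be the same, but the registration calls and listener type would be the ones from your question.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical stand-ins for SharedPreferences and its change listener.
fun interface KeyListener { fun onChanged(key: String) }

class FakePrefs {
    private val listeners = mutableSetOf<KeyListener>()
    val listenerCount: Int get() = listeners.size
    fun register(listener: KeyListener) { listeners += listener }
    fun unregister(listener: KeyListener) { listeners -= listener }
    fun put(key: String) = listeners.toList().forEach { it.onChanged(key) }
}

fun keyChanges(prefs: FakePrefs): Flow<String> = callbackFlow {
    // No coroutine launch: trySend is a regular function, so the callback can call it directly.
    val listener = KeyListener { key -> trySend(key) }
    prefs.register(listener)
    // Suspends until the collector goes away, then removes the listener.
    awaitClose { prefs.unregister(listener) }
}

fun main() = runBlocking {
    val prefs = FakePrefs()
    val firstTwo = async { keyChanges(prefs).take(2).toList() }
    while (prefs.listenerCount == 0) yield() // wait until the flow has registered
    prefs.put("token")
    prefs.put("userId")
    println(firstTwo.await())
    println(prefs.listenerCount) // the listener is removed once collection ends
}
```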

Jakub Licznerski
coroutineDispatcher

Another example, perhaps more concrete:

private fun test() {
        lifecycleScope.launch {
            someFlow().collectLatest {
                Log.d("TAG", "Finally we received the result: $it")
                // Calling cancel() here cancels the collecting coroutine, so it is no longer subscribed to the callbackFlow and awaitClose() is triggered.
                // cancel()
            }
        }
    }

    /**
     * Define a callbackFlow.
     */
    private fun someFlow() = callbackFlow {

        // A dummy class that runs some business logic and sends results back to listeners through the ApiCallback methods.
        val service = ServiceTest() // a REST API class for example

        // A simple callback interface which will be called from ServiceTest
        val callback = object : ApiCallback {
            override fun someApiMethod(data: String) {
                // trySend() is the non-suspending way to send from a callbackFlow; a plain flow uses emit(...), and the suspending channel counterpart is send(...).
                trySend(data)
            }

            override fun anotherApiMethod(data: String) {
                // Same as above: forward the callback result into the flow without suspending.
                trySend(data)
            }
        }

        // Register the ApiCallback for later usage by ServiceTest
        service.register(callback)

        // Dummy sample usage of callback flow.
        service.execute(1)
        service.execute(2)
        service.execute(3)
        service.execute(4)

        // When the coroutine collecting through .collectLatest {} is cancelled, the awaitClose block is executed.
        awaitClose {
            service.unregister()
        }
    }

    interface ApiCallback {
        fun someApiMethod(data: String)
        fun anotherApiMethod(data: String)
    }

    class ServiceTest {

        private var callback: ApiCallback? = null

        fun unregister() {
            callback = null
            Log.d("TAG", "Unregister the callback in the service class")
        }

        fun register(callback: ApiCallback) {
            Log.d("TAG", "Register the callback in the service class")
            this.callback = callback
        }

        fun execute(value: Int) {
            CoroutineScope(Dispatchers.IO).launch {
                if (value < 2) {
                    callback?.someApiMethod("message sent through someApiMethod: $value.")
                } else {
                    callback?.anotherApiMethod("message sent through anotherApiMethod: $value.")
                }
            }
        }
    }
Hagau Ioan