0

I have an android app that uses CouchBase lite, I'm trying to save a document and get the acknowledgement using coroutin channel, the reason why I use a channel is to make sure every operation is done on the same scope

here is my try based on the selected answer here How to properly have a queue of pending operations using Kotlin Coroutines?

object DatabaseQueue {
    private val scope = CoroutineScope(IOCoroutineScope)
    private val queue = Channel<Job>(Channel.UNLIMITED)

    init {
        scope.launch(Dispatchers.Default) {
            for (job in queue) job.join()
        }
    }

    fun submit(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Unit
    ) {
        val job = scope.launch(context, CoroutineStart.LAZY, block)
        queue.trySendBlocking(job)
    }

    fun submitAsync(
        context: CoroutineContext = EmptyCoroutineContext,
        id: String,
        database: Database
    ): Deferred<Document?> {
        val job = scope.async(context, CoroutineStart.LAZY) {
            database.getDocument(id)
        }
        queue.trySendBlocking(job)
        return job
    }

    fun cancel() {
        queue.cancel()
        scope.cancel()
    }
}
fun Database.saveDocument(document: MutableDocument) {
    DatabaseQueue.submit {
        Timber.tag("quechk").d("saving :: ${document.id}")
        this@saveDocument.save(document)
    }
}

fun Database.getDocumentQ(id: String): Document? {
    return runBlocking {
        DatabaseQueue.submitAsync(id = id, database = this@getDocumentQ).also {
            Timber.tag("quechk").d("getting :: $id")
        }.await()
    }
}

my issue here is that when I have many db operations to write and read the reads are performing faster than the writes which gives me a null results, so,what I need to know is

  • is this the best way to do it or there is another optimal solution
  • how can I proccess the job and return the result from the channel in order to avoid the null result
  • Do you need a feature of executing one operation at a time and in the guaranteed FIFO order? This was a requirement in the linked question, but this is not clear in your own question. – broot Jan 29 '23 at 11:10
  • @broot yes, FIFO is what I'm looking for, but I need it to be tied to a single scope and channel to ensure no multiple operations are happening on the db at the same time, in our case, it could happen that we are saving a bulk of documents and we get another bulk from a websocket to save at the same time, we've encountered a loss of documents and sometimes documents are not deleting bec the db is locked and busy doing other operations – Mohammedsaif Kordia Jan 29 '23 at 13:10

1 Answers1

0

By modifying the original solution you actually made it work improperly. The whole idea was to create an inactive coroutine for each submitted block of code and then start executing these coroutines one by one. In your case you exposed a Deferred to a caller, so the caller is able to start executing a coroutine and as a result, coroutines no longer run sequentially, but concurrently.

The easiest way to fix this while keeping almost the same code would be to introduce another Deferred, which is not directly tight to the queued coroutine:

fun submitAsync(
    context: CoroutineContext = EmptyCoroutineContext,
    id: String,
    database: Database
): Deferred<Document?> {
    val ret = CompletableDeferred<Document?>()
    val job = scope.launch(context, CoroutineStart.LAZY) {
        ret.completeWith(runCatching { database.getDocument(id) })
    }
    queue.trySendBlocking(job)
    return ret
}

However, depending on your case it may be an overkill. For example, if you don't need to guarantee a strict FIFO ordering, a simple Mutex would be enough. Also, please note that classic approach of returning futures/deferreds only to await on them is an anti-pattern in coroutines. We should simply use a suspend function and call it directly.

broot
  • 21,588
  • 3
  • 30
  • 35