
I have two unsorted observables of different types. Both of these types share a common key. I'd like to join them into a new observable emitting pairs of corresponding elements and I can't figure out how to do it.

Note that some keys might be missing from one side or the other. It would be OK if incomplete pairs were dropped, but having null in place of the missing piece would be even better.

Input 1:

Entity(id = 2),
Entity(id = 1),
Entity(id = 4)

Input 2:

Dto(id = 3),
Dto(id = 2),
Dto(id = 1)

Expected output (in any order):

Pair(Entity(id = 1), Dto(id = 1)),
Pair(Entity(id = 2), Dto(id = 2)),
Pair(null, Dto(id = 3)),
Pair(Entity(id = 4), null)
  • How long would you want the Observables to wait for their corresponding element? – Luka Jacobowitz Aug 14 '17 at 09:30
  • Indefinitely, I guess. The entities come from the DB and the Dtos from an HTTP request, so I should have the full data set in memory. – Kacper Lamparski Aug 14 '17 at 09:33
  • If that's so, why stream at all? It might be a lot easier to convert them to in-memory collections and do the transformations there. – Luka Jacobowitz Aug 14 '17 at 09:37
  • https://stackoverflow.com/questions/29220050/rxjava-merge-observables-of-different-type - Sort them first, zip them after, call an action of both which combines them. – Emanuel Aug 14 '17 at 09:37
  • @EmanuelSeibold This wouldn't work because some keys are missing, unless I'm missing something ofc. – Kacper Lamparski Aug 14 '17 at 09:42
  • @LukaJacobowitz To be honest, I expected this answer :D I think I just needed to hear it from someone more experienced in Rx – Kacper Lamparski Aug 14 '17 at 09:42
  • Kacper, it does. Sort by key, and the data with a missing key goes to the end. That means if you combine them you get "entity 1, entity 2, dto 3, entity 4, dto 5, entity null, dto null, entity null, ...". But I think even if you can solve this, it doesn't seem like a legit design pattern. – Emanuel Aug 14 '17 at 09:43
  • Protip: If you have to wait indefinitely for something, streaming can't help you. :) – Luka Jacobowitz Aug 14 '17 at 09:50
  • I'll keep that in mind :) Thanks a lot for your help! – Kacper Lamparski Aug 14 '17 at 11:19
  • Maybe a duplicate of https://stackoverflow.com/questions/45220837/full-outer-join-of-two-ordered-observables ? – akarnokd Aug 14 '17 at 11:31
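On the "sort first" suggestion in the comments: zipping two sorted sequences goes out of step as soon as a key is missing on one side. A sort-merge join avoids that by advancing whichever cursor holds the smaller key and pairing that element with null. Below is a minimal in-memory sketch, in the spirit of the suggestion to use collections instead of streams. It assumes ids are unique within each list. `fullOuterJoinSorted` is a hypothetical helper, not part of RxJava or the Kotlin standard library.

```kotlin
// Full outer join of two lists by key: sort both, then walk them with two cursors.
// Assumes keys are unique within each list (hypothetical helper, for illustration).
fun <A, B, K : Comparable<K>> fullOuterJoinSorted(
    left: List<A>, keyA: (A) -> K,
    right: List<B>, keyB: (B) -> K
): List<Pair<A?, B?>> {
    val a = left.sortedBy(keyA)
    val b = right.sortedBy(keyB)
    val out = mutableListOf<Pair<A?, B?>>()
    var i = 0
    var j = 0
    while (i < a.size && j < b.size) {
        val c = keyA(a[i]).compareTo(keyB(b[j]))
        when {
            c == 0 -> { out += a[i] to b[j]; i++; j++ } // same key on both sides
            c < 0 -> { out += a[i] to null; i++ }       // key absent on the right
            else -> { out += null to b[j]; j++ }        // key absent on the left
        }
    }
    // Whatever remains on either side has no partner.
    while (i < a.size) out += a[i++] to null
    while (j < b.size) out += null to b[j++]
    return out
}
```

With the question's inputs, `fullOuterJoinSorted(listOf(Entity(2), Entity(1), Entity(4)), Entity::id, listOf(Dto(3), Dto(2), Dto(1)), Dto::id)` yields the four expected pairs in key order: ids 1 and 2 matched, `(null, Dto(id=3))`, then `(Entity(id=4), null)`. The cost is that both inputs must be complete and sorted before anything is emitted.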

1 Answer


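First, merge the streams together (Observable.merge): this gives you a single stream of all the items. (The code below uses Observable.mergeDelayError, which withholds an error until both sources have terminated, and a custom Either class to tag which stream each item came from.)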

Then, for each item in the stream, try to match it with a previously observed item of the other type that has the same key, and if there is one, output the pair. If not, save the item to be matched later.

Finally, once the stream is done, the remaining unmatched elements won't be matched with anything, so they can be emitted unpaired.
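The three steps above do not depend on RxJava itself. Here is a rough, synchronous sketch of the same logic over a plain list of already-merged, tagged items. It assumes ids are unique within each side. `Tagged`, `FromA`, `FromB` and `joinTagged` are hypothetical names; the answer's code uses `Either`/`Left`/`Right` and emits each pair as soon as both halves have arrived.

```kotlin
// Each item remembers which input it came from (hypothetical names).
sealed class Tagged<out A, out B>
data class FromA<A>(val value: A) : Tagged<A, Nothing>()
data class FromB<B>(val value: B) : Tagged<Nothing, B>()

fun <A : Any, B : Any, K> joinTagged(
    items: List<Tagged<A, B>>, idA: (A) -> K, idB: (B) -> K
): List<Pair<A?, B?>> {
    val unmatchedA = mutableMapOf<K, A>()
    val unmatchedB = mutableMapOf<K, B>()
    val out = mutableListOf<Pair<A?, B?>>()
    for (item in items) {
        when (item) {
            is FromA -> {
                val id = idA(item.value)
                // Pair with a waiting B if there is one; otherwise wait for it.
                val partner = unmatchedB.remove(id)
                if (partner != null) {
                    out += item.value to partner
                } else {
                    unmatchedA[id] = item.value
                }
            }
            is FromB -> {
                val id = idB(item.value)
                val partner = unmatchedA.remove(id)
                if (partner != null) {
                    out += partner to item.value
                } else {
                    unmatchedB[id] = item.value
                }
            }
        }
    }
    // Input exhausted: leftovers can never be matched, so emit them unpaired.
    unmatchedA.values.forEach { out += it to null }
    unmatchedB.values.forEach { out += null to it }
    return out
}
```

Feeding it the entities followed by the Dtos in the order used below reproduces the same four lines of output as the Rx version, in the same order.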

import io.reactivex.Observable

data class Entity(val id: Int)
data class Dto(val id: Int)

// Tags each item with the stream it came from: Left for the first input, Right for the second.
sealed class Either<out A, out B>
data class Left<A>(val value: A) : Either<A, Nothing>()
data class Right<B>(val value: B) : Either<Nothing, B>()

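fun <A : Any, B : Any, C> joinById(
    a: Observable<A>, idA: (A) -> C,
    b: Observable<B>, idB: (B) -> C
): Observable<Pair<A?, B?>> = Observable.defer {
    // Created inside defer so every subscription gets its own state.
    // Items that are still waiting for a partner, keyed by id.
    val unmatchedA = mutableMapOf<C, A>()
    val unmatchedB = mutableMapOf<C, B>()
    // merge delivers items one at a time, so the maps are never accessed concurrently.
    val merged = Observable.mergeDelayError(a.map(::Left), b.map(::Right)).flatMap { latest ->
        when (latest) {
            is Left -> {
                val id = idA(latest.value)
                // A partner already arrived from the other stream: emit the pair now.
                unmatchedB.remove(id)?.let { return@flatMap Observable.just(latest.value to it) }
                // Otherwise remember this item until its partner shows up.
                unmatchedA[id] = latest.value
            }
            is Right -> {
                val id = idB(latest.value)
                unmatchedA.remove(id)?.let { return@flatMap Observable.just(it to latest.value) }
                unmatchedB[id] = latest.value
            }
        }
        Observable.empty<Nothing>()
    }
    // concat subscribes to the tail only after both inputs complete,
    // so anything still unmatched at that point never got a partner.
    Observable.concat(merged, Observable.create<Pair<A?, B?>> { emitter ->
        unmatchedA.values.forEach { emitter.onNext(it to null) }
        unmatchedB.values.forEach { emitter.onNext(null to it) }
        emitter.onComplete()
    })
}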

fun main(args: Array<String>) {
    val entities = Observable.just(Entity(2), Entity(1), Entity(4))
    val dtos = Observable.just(Dto(3), Dto(2), Dto(1))
    joinById(entities, Entity::id, dtos, Dto::id).blockingForEach(::println)
}

Output:
(Entity(id=2), Dto(id=2))
(Entity(id=1), Dto(id=1))
(Entity(id=4), null)
(null, Dto(id=3))

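Note that this may behave oddly if an id repeats within one stream: an unmatched item is silently overwritten by a later item with the same id. Also, depending on the structure of the streams, this may end up buffering a lot of elements in memory; in the worst case (one stream emitting everything before the other starts, as in the example above), every element of the first stream is held until its partner arrives.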
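If ids can repeat, one possible adjustment is to keep a queue of waiting items per id instead of a single slot. This assumes the desired semantics are FIFO: the k-th occurrence of an id on one side pairs with the k-th on the other. Below is a minimal synchronous sketch of just that bookkeeping. `PendingMatches`, `offerA`, `offerB` and `leftovers` are hypothetical names, and it uses `kotlin.collections.ArrayDeque` (Kotlin 1.4+). In the Rx version, the `flatMap` could presumably call `offerA`/`offerB` instead of touching the maps directly, and the tail could emit `leftovers()`.

```kotlin
// FIFO matching for ids that may repeat: an item is paired with the oldest
// waiting item that has the same id on the other side (hypothetical helper).
class PendingMatches<K, A : Any, B : Any> {
    private val waitingA = mutableMapOf<K, ArrayDeque<A>>()
    private val waitingB = mutableMapOf<K, ArrayDeque<B>>()

    // Returns a completed pair, or null if [a] now waits for a partner.
    fun offerA(id: K, a: A): Pair<A, B>? {
        val partner = waitingB.pollFirst(id)
        if (partner == null) waitingA.getOrPut(id) { ArrayDeque() }.addLast(a)
        return partner?.let { a to it }
    }

    // Returns a completed pair, or null if [b] now waits for a partner.
    fun offerB(id: K, b: B): Pair<A, B>? {
        val partner = waitingA.pollFirst(id)
        if (partner == null) waitingB.getOrPut(id) { ArrayDeque() }.addLast(b)
        return partner?.let { it to b }
    }

    // Everything still waiting, for use once both inputs have completed.
    fun leftovers(): List<Pair<A?, B?>> {
        val out = mutableListOf<Pair<A?, B?>>()
        waitingA.values.forEach { queue -> queue.forEach { out += it to null } }
        waitingB.values.forEach { queue -> queue.forEach { out += null to it } }
        return out
    }

    // Removes and returns the oldest waiting item for [id], dropping queues that become empty.
    private fun <T> MutableMap<K, ArrayDeque<T>>.pollFirst(id: K): T? {
        val queue = this[id] ?: return null
        val first = queue.removeFirst()
        if (queue.isEmpty()) remove(id)
        return first
    }
}
```

This keeps duplicates from overwriting each other, but it does nothing for the memory concern: unmatched items are still buffered until a partner or the end of input arrives.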

ephemient