
I'm going a little crazy here. I'm trying to create an Observable<BigDecimal> extension function (against RxJava 2.x) to emit the average of the emissions, but I'm getting a compile error with the Single.zip() function. Does anybody have any ideas what I'm doing wrong? I tried to be explicit with all my types too and that didn't work...

import io.reactivex.Observable
import io.reactivex.Single
import java.math.BigDecimal


fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

//compile error
fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
    Single.zip(it.sum().toSingle(), it.count()) {
        sum, count -> sum / BigDecimal.valueOf(count)
    }
}


tmn

3 Answers


Type inference often fails with RxJava 2, although strictly speaking this is not a type inference problem. Kotlin usually generates adapter methods that replace SAM parameters with Kotlin functional types, but this technique does not work for overloaded methods for some reason.

More details here: https://youtrack.jetbrains.com/issue/KT-13609

As an option, you could wrap the lambda in a BiFunction and specify the types of its arguments:

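import io.reactivex.functions.BiFunction

fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
    // The SAM constructor plus typed lambda parameters pins down the zip overload
    Single.zip(it.sum().toSingle(), it.count(), BiFunction {
        sum: BigDecimal, count: Long ->
        sum / BigDecimal.valueOf(count)
    })
}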
Stepango

Type inference is failing for some reason; presumably more than one combination of types could be inferred in this context.

You can specify the types explicitly with a more traditional (and unfortunately more verbose) syntax, like this:

fun Observable<BigDecimal>.average() = publish().autoConnect(2).let {
    Single.zip(it.sum().toSingle(), it.count(), BiFunction<BigDecimal, Long, BigDecimal> {
        sum, count ->
        sum / BigDecimal.valueOf(count)
    })
}

Update:

I've just found out while working on a similar problem that the actual problem here is that Kotlin isn't able to infer which Single.zip overload you're trying to call. From the official documentation:

If the Java class has multiple methods taking functional interfaces, you can choose the one you need to call by using an adapter function that converts a lambda to a specific SAM type. Those adapter functions are also generated by the compiler when needed.

So it turns out that using the more explicit SAM constructor solves this in itself, and gives you type inference back (basically, my previous answer was using a longer syntax than was actually required):

fun Observable<BigDecimal>.average(): Single<BigDecimal> = publish().autoConnect(2).let {
    Single.zip(it.sum().toSingle(), it.count(), BiFunction {
        sum, count ->
        sum / BigDecimal.valueOf(count)
    })
}
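
For reference, here is a minimal self-contained sketch of the SAM-constructor version with the imports it needs and a small driver. It assumes RxJava 2.x is on the classpath; the sample values and the `main` function are illustrative only and not part of the original question.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import java.math.BigDecimal

// reduce() without a seed returns a Maybe, because the source may be empty
fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

fun Observable<BigDecimal>.average(): Single<BigDecimal> =
    // publish().autoConnect(2) shares one upstream subscription between sum() and count()
    publish().autoConnect(2).let { shared ->
        Single.zip(
            // toSingle() signals NoSuchElementException if the source emitted nothing
            shared.sum().toSingle(),
            shared.count(),
            // The SAM constructor tells the compiler which zip overload is meant
            BiFunction { sum, count -> sum / BigDecimal.valueOf(count) }
        )
    }

fun main() {
    // Illustrative input values (an assumption for this sketch)
    val values = Observable.just(BigDecimal("2.0"), BigDecimal("4.0"), BigDecimal("6.0"))
    println(values.average().blockingGet())
}
```

If a particular compiler version still cannot infer the lambda parameter types, spelling out the type arguments as `BiFunction<BigDecimal, Long, BigDecimal>` remains available as a fallback.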
zsmb13

If type inference is the problem, one thing you can do is use RxKotlin:

implementation "io.reactivex.rxjava2:rxkotlin:$rxKotlinVersion"

RxKotlin provides SAM helpers specifically to mitigate these type inference issues.

In that case,

Singles.zip(..., ...)

works without any ambiguity. Notice that I am using Singles and not Single.
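
Applied to the question's `average()`, this might look like the sketch below. It assumes both RxJava 2.x and RxKotlin 2.x are on the classpath; the `main` function and its sample values are illustrative only.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.rxkotlin.Singles
import java.math.BigDecimal

fun Observable<BigDecimal>.sum() = reduce { total, next -> total + next }

fun Observable<BigDecimal>.average(): Single<BigDecimal> =
    publish().autoConnect(2).let { shared ->
        // Singles.zip takes a plain Kotlin lambda, so no SAM constructor is needed
        Singles.zip(shared.sum().toSingle(), shared.count()) { sum, count ->
            sum / BigDecimal.valueOf(count)
        }
    }

fun main() {
    // Illustrative input values (an assumption for this sketch)
    val values = Observable.just(BigDecimal("1.0"), BigDecimal("2.0"), BigDecimal("3.0"))
    println(values.average().blockingGet())
}
```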

William Reed
  • Is it possible to use `Singles.zip(..., ...)` when I don't know the number of Singles (having them in a List)? – Micer Sep 26 '19 at 11:37
  • @Micer I think I am going to need some more context; it sounds like it would warrant its own question – William Reed Sep 27 '19 at 02:28
  • I have N URLs, I need to make N network calls and merge the results. I've created Singles and put them in a list. I couldn't use `Singles.zip(..., ...)`, because it accepts an exact number of Single objects. I finally solved it by using `Single.zip(Iterable<? extends SingleSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)` from the RxJava2 library. Thanks anyway. ;-) – Micer Sep 27 '19 at 14:07
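
The Iterable overload mentioned in the last comment could be used roughly as follows. This is only a sketch assuming RxJava 2.x: `fetch` and `fetchAll` are hypothetical names, and `fetch` is a stand-in for a real network call.

```kotlin
import io.reactivex.Single

// Hypothetical stand-in for a network call; a real one would perform the request
fun fetch(url: String): Single<String> = Single.just("response from $url")

fun fetchAll(urls: List<String>): Single<List<String>> {
    val calls: List<Single<String>> = urls.map { fetch(it) }
    // The zipper receives an untyped array with one result per source, in source order,
    // so each element has to be cast back to the expected type
    return Single.zip(calls) { results ->
        results.map { it as String }
    }
}

fun main() {
    println(fetchAll(listOf("a", "b")).blockingGet())
}
```

Note that this overload signals NoSuchElementException when the list of sources is empty, so an empty input may need to be handled separately.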