I am trying to implement the write/notification handling example from the question "Using RxAndroidBle, how do I subscribe to responses from writing to a characteristic?":

connectionObservable
        .flatMap(
                (Function<RxBleConnection, Observable<Observable<byte[]>>>)
                        (RxBleConnection rxBleConnection) -> {
                            return rxBleConnection.setupNotification(TX_CHAR_UUID);
                        },
                (BiFunction<RxBleConnection, Observable<byte[]>, Observable<byte[]>>)
                        (rxBleConnection, apScanDataNotificationObservable) -> {
                            return Observable.combineLatest(
                                    rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()),
                                    apScanDataNotificationObservable.first(),
                                    new BiFunction<byte[], byte[], byte[]>() {
                                        @Override
                                        public byte[] apply(byte[] writtenBytes, byte[] responseBytes) throws Exception {
                                            return responseBytes;
                                        }
                                    }
                            );
                        }
        )
        .flatMap(new Function<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> apply(Observable<byte[]> observable) throws Exception {
                return observable;
            }
        })
        .first()
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<byte[]>() {
            @Override
            public void accept(byte[] bytes) throws Exception {
                Log.i("Ivan1", "notification response...." + bytes.toString());
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.i("Ivan", "notification response...." + throwable.toString());
            }
        });

I tried writing this with both RxJava1 and RxJava2, but in both cases I got a compile-time error for apScanDataNotificationObservable.first(). It says "first(byte[]) in Observable cannot be applied to ()". So I don't know what argument I should pass to the first() method.

Vadim Kotov
kosancicivan

1 Answer

You get this error because the original answer was written for the RxJava1 version of RxAndroidBle, while you are using RxJava2, at least in the example above.

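Between RxJava1 and RxJava2, the method Observable.first() changed its signature and implementation: in RxJava2 it requires a default item and returns a Single instead of an Observable. The RxJava2 equivalent for this use case is Observable.take(1), which still returns an Observable.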

You should change this line:

apScanDataNotificationObservable.first(),

To this:

apScanDataNotificationObservable.take(1),
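
As a rough illustration of why take(1) is the fitting replacement, the sketch below lists the RxJava2 operators that pick the first element, together with the type each one returns. The class and method names (FirstVariants, demo) are hypothetical and exist only for this illustration.

```java
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

// Hypothetical holder class, used only to show the return types side by side.
final class FirstVariants {
    static void demo(Observable<byte[]> notifications) {
        // first(defaultItem) needs a fallback value and returns a Single.
        Single<byte[]> withDefault = notifications.first(new byte[0]);

        // firstElement() returns a Maybe, which completes empty if nothing was emitted.
        Maybe<byte[]> maybeFirst = notifications.firstElement();

        // firstOrError() returns a Single that signals NoSuchElementException when empty.
        Single<byte[]> firstOrError = notifications.firstOrError();

        // take(1) stays an Observable, so it can be passed to Observable.combineLatest().
        Observable<byte[]> firstAsObservable = notifications.take(1);
    }
}
```

This also explains the compiler message in the question: the only first overload on an RxJava2 Observable<byte[]> is first(byte[]), so calling it with no arguments cannot be resolved.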

Also, Observable.combineLatest() expects Observable parameters, whereas rxBleConnection.writeCharacteristic() returns a Single in the RxJava2 version of the library. You should change this line:

rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()),

To this:

rxBleConnection.writeCharacteristic(RX_CHAR_UUID, getInputBytes()).toObservable(),
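
Putting both changes together, the whole chain could look roughly like the sketch below. It assumes the RxJava2 flavour of the library (package com.polidea.rxandroidble2). The helper name writeAndAwaitFirstNotification and its parameters are hypothetical, not part of RxAndroidBle. Note that the trailing .first() in the question has the same problem as the inner one, so the sketch uses firstOrError() there.

```java
import com.polidea.rxandroidble2.RxBleConnection;
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.UUID;

// Hypothetical helper class; UUIDs and payload are supplied by the caller.
final class WriteThenAwaitNotification {
    static Single<byte[]> writeAndAwaitFirstNotification(
            Observable<RxBleConnection> connectionObservable,
            UUID notifyCharUuid,
            UUID writeCharUuid,
            byte[] payload) {
        return connectionObservable
                .flatMap(
                        // Set up the notification before writing, so the response is not missed.
                        connection -> connection.setupNotification(notifyCharUuid),
                        (connection, notifications) -> Observable.combineLatest(
                                // writeCharacteristic() returns a Single; convert it for combineLatest().
                                connection.writeCharacteristic(writeCharUuid, payload).toObservable(),
                                // take(1) replaces the RxJava1 no-argument first().
                                notifications.take(1),
                                (writtenBytes, responseBytes) -> responseBytes))
                // Flatten the Observable<Observable<byte[]>> produced above.
                .flatMap(observable -> observable)
                // Replaces the outer RxJava1 first(); fails if no response arrives.
                .firstOrError();
    }
}
```

The returned Single can then be observed on the main thread and subscribed to with the same two Consumer callbacks as in the question.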
Dariusz Seweryn