1

I am currently implementing a protocol for a Bluetooth device and i am using the RxAndroidBle Library (version 1.4.3).

I have to request data from the device by writing to characteristic and then listening to the response via a characteristic notification.

To combine the 2 operations (writing and listening) I am using the code from: https://stackoverflow.com/a/41140523/734385

connectionObservable
        .flatMap( // when the connection is available...
                rxBleConnection -> rxBleConnection.setupNotification(AP_SCAN_DATA), // ... setup the notification...
                (rxBleConnection, apScanDataNotificationObservable) -> Observable.combineLatest( // ... when the notification is setup...
                        rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue), // ... write the characteristic...
                        apScanDataNotificationObservable.first(), // ... and observe for the first notification on the AP_SCAN_DATA
                        (writtenBytes, responseBytes) -> responseBytes // ... when both will appear return just the response bytes...
                )
        )
        .flatMap(observable -> observable) 

This approach works for me, the only problem is that the code gives me only the first 20 bytes (due to the apScanDataNotificationObservable.first()).

Unfortunately, I don't know the size of the package I am receiving. I can only extract the information from the header of the first 20 bytes. It seems like the RxJava buffer function all require to know the size beforehand.

Is there a way to make this work cleanly with the code above as part of the Rx chain?

In other words, can I control the number of emission based on the very first emission of an Rx chain?

Or do I have a completely wrong approach?

tiqz
  • 147
  • 1
  • 2
  • 8

1 Answers1

0

It is possible to achieve what you want.

The easiest way would be to exchange the Observable.combineLatest(...) to:

Observable.merge(
        rxBleConnection.writeCharacteristic(AP_SCAN_DATA, writeValue).ignoreElements(), // send the request but ignore the returned value
        apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()) // take the response notifications until the response end watcher says so
);

Where newResponseEndWatcher() would need to contain the logic for determining if the received values are all that is expected. It could look like this:

private Func1<byte[], Boolean> newResponseEndWatcher() {
    return new Func1<byte[], Boolean>() {

        private static final int NOT_INITIALIZED = -1;

        private int totalLength = NOT_INITIALIZED;
        private int receivedLength = NOT_INITIALIZED;

        @Override
        public Boolean call(byte[] bytes) {
            if (isNotInitialized(totalLength)) { // if it is the first received value
                // parse totalLength from the header
            }
            // update receivedLength
            return receivedLength >= totalLength;
        }

        private boolean isNotInitialized(int value) {
            return value == NOT_INITIALIZED;
        }
    };
}

Just have in mind that Func1 which is the result newResponseEndWatcher() is stateful. If one would store into a variable the observable that is result of apScanDataNotificationObservable.takeUntil(newResponseEndWatcher()) the next subscriptions could end prematurely.

To mitigate this problem one may use Observable.using() function that would call newResponseEndWatcher() each time it is subscribed and then create a new apScanDataNotificationObservable.takeUntil(newResponseEndWatcher):

Observable.using(
        () -> newResponseEndWatcher(), // create a new response end watcher on each subscription
        responseEndWatcher -> apScanDataNotificationObservable.takeUntil(responseEndWatcher), // create the response observable that will complete properly
        responseEndWatcher -> { /* ignored, responseEndWatcher will get GCed eventually */ }
);
Dariusz Seweryn
  • 3,212
  • 2
  • 14
  • 21
  • thanks for the quick reply, i tried the code and i am getting the entire package, but I tried to use toList() to collect all the emission but I am never getting the list, the reason for that i think is that the takeUntil is not inclusive and it skips the last emission (and therefore also the toList() as it would be the only emission) i solved it for now by using the original code and using a flatmap to collect the emission and always returning Observable.empty() until the last emission – tiqz Nov 30 '17 at 14:53
  • From the `Observable.takeUntil(Func1)` Javadoc: `The difference between this operator and {@link #takeWhile(Func1)} is that here, the condition is evaluated after the item is emitted.` so I do not think it is the case? Maybe you have an off-by-one mistake in your `newResponseEndWatcher()` implementation? – Dariusz Seweryn Nov 30 '17 at 17:35
  • @tiqz any more info? `.toList()` should work just fine with any `Observable` that completes and if the implementation of `newResponseEndWatcher()` is correct the `apScanDataNotificationObservable.takeUntil(newResponseEndWatcher())` should complete without problems. – Dariusz Seweryn Dec 06 '17 at 10:15