I'm currently trying to implement a simple offline/online sync mechanism using observables.

Basically, I have two observables:

  1. Connection observable: The first observable tells me whether or not there is an internet connection. It emits when the network state changes
  2. Data observable: The second observable has the data that needs to be synced. It emits when there is new data to be synced

What I want to achieve is to combine the above observables so that:

  • As long as the connection state is false, the combined observable shouldn't emit. In this case, the data observable should retain its state
  • As long as the connection state is true, the combined observable should emit every time there is data in the data observable
  • If the connection state switches from false to true, it should emit for every value on the data observable
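The three rules above can be modeled without any library. The following is only a sketch of the intended semantics, not RxJS code: `createOfflineQueue`, `push`, and `setOnline` are hypothetical names introduced for illustration, `emit` stands in for whatever performs the sync, and the initial state is assumed to be offline.

```javascript
// Hypothetical, library-free model of the desired behavior (not an RxJS API).
function createOfflineQueue(emit) {
  let online = false; // assumption: we start offline
  const pending = []; // data retained while offline, in arrival order

  return {
    // Call this when there is new data to be synced.
    push(value) {
      if (online) {
        emit(value); // online: emit every value as it arrives
      } else {
        pending.push(value); // offline: retain the value, emit nothing
      }
    },
    // Call this whenever the network state changes.
    setOnline(next) {
      online = next;
      if (online) {
        // false -> true: emit every retained value, oldest first
        while (pending.length > 0) {
          emit(pending.shift());
        }
      }
    },
  };
}

const synced = [];
const queue = createOfflineQueue((value) => synced.push(value));

queue.push(1); // offline: retained
queue.push(2); // offline: retained
queue.setOnline(true); // emits 1, then 2
queue.push(3); // online: emitted right away
queue.setOnline(false);
queue.push(4); // offline again: retained until the next setOnline(true)

console.log(synced); // contains 1, 2, 3 at this point
```

Any operator-based solution should reproduce this ordering: nothing is dropped, and values held back while offline come out in the order they arrived.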

A small example that currently uses filter and combineLatest can be found here: https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js

Unfortunately, this doesn't behave as intended at all.

Is there an operator that achieves the required behavior? As an alternative, I could poll the connection status and emit every X seconds. Ideally, though, I'd like a clean combination of observables, and I'm a bit lost as to which operator makes the most sense.

To clarify: I need to sync all data, not just the latest. So the data observable should buffer the data.

Gernot R. Bauer

3 Answers

It looks like you were on the right path and could do just the following:

combineLatest([
  offlineOnlineSubject,
  dataSubject
])
  .pipe(
    filter(([online, counter]) => online),
  )
  .subscribe(([online, counter]) => {
    syncedIndicator.textContent = counter;
  });
martin
  • Using this approach, the latest entry will be synced when going online. But if I emitted 4 elements in the data observable, only the last element will be synced, and the previous elements will be filtered out. I need to sync all the data however. – Gernot R. Bauer Jun 27 '22 at 13:06
  • So you want to buffer all data emissions while the other Observable is offline? – martin Jun 27 '22 at 13:11
  • Yes, that would be the basic idea. – Gernot R. Bauer Jun 27 '22 at 13:26

So when we are in offline mode, I store all the emitted data in a buffer, and once we change from offline to online, I simply emit everything in the buffer:

let dataBuffer = [];
let isPreviouslyOnline = false;

combineLatest([dataSubject, offlineOnlineSubject])
  .pipe(
    filter(([data, isOnline]) => {
      if (!isOnline) {
        if (!isPreviouslyOnline) {
          dataBuffer.push(data);
        }

        isPreviouslyOnline = false;
        return false;
      }

      return true;
    }),
    switchMap(([data]) => {
      isPreviouslyOnline = true;
      if (dataBuffer.length > 0) {
        const tempData = [...dataBuffer];
        dataBuffer = [];

        return from(tempData);
      } else {
        return of(data);
      }
    })
  )
  .subscribe((data) => {
    console.log("Data is: ", data);
  });

Code sandbox: https://codesandbox.io/s/offline-sync-forked-k38kc3?file=/src/index.js

I think it works, but it doesn't feel great. I tried to use the native RxJS buffer operator to achieve it but couldn't figure out how. I would love to see whether anyone has a better or cleaner solution.

Nam
  • Thanks. I agree that this doesn't feel great. We now have a solution using bufferWhen and two additional Observables, but this doesn't feel great either. So I'm currently searching for the term "gate" to see what comes up. – Gernot R. Bauer Jun 28 '22 at 07:32

After searching for the term "gate", I found the following Stack Overflow question: Conditional emission delays with rxjs

Basically, the answer there uses delayWhen to achieve the desired result.

I've updated an example here: https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

The crucial part is:

const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();

const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));

dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
  console.log("Syncing data", {
    counter
  });

  syncedIndicator.innerHTML += `<li>${counter}</li>`;
});

Wrapped in a custom TypeScript operator:

import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';

export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
  const gateTriggerFn = () => gateTrigger.pipe(
    filter((v) => v)
  );

  return (source: Observable<T>) => source.pipe(
    delayWhen(gateTriggerFn)
  );
}

It seems so far that this solution is doing what I intend it to do.

Gernot R. Bauer