
I have a stream like this:

---ab---ab---a---ba---bab---ab---ab---ab--->

And I want this:

---ab---ab------ab----ab-ab-ab---ab---ab--->

The point is that my data has a beginning and an end (it is JSON), and sometimes a message is cut in half in the stream. I want to join the halves back together. How can I do that?

Gergely Fehérvári
  • How do you distinguish `ab` and `a` though? I was thinking about [bufferCount](http://rxmarbles.com/#bufferCount) but that is not the right thing, I'm sure. – Igor Soloydenko Oct 15 '17 at 23:14

3 Answers


This looks like a job for the scan operator, which carries a buffer from one emission to the next:

// substitute appropriate real-world logic
const isProperlyFormed = (x) => x === 'ab'  
const isIncomplete = (x) => x[0] === 'a' && x.length === 1
const startsWithEnding = (x) => x[0] === 'b'
const getCorrected = (buffer, x) => buffer.prev + x[0]
const getTail = (buffer, x) => x.slice(1)

const initialBuffer = {
  emit: [],
  prev: null
}

const result = source
  .scan((buffer, x) => {
    if (isProperlyFormed(x)) {
      buffer = {emit: [x], prev:null}
    }
    if (isIncomplete(x)) {
      buffer = {emit: [], prev:x}
    }
    if (startsWithEnding(x)) {
      const corrected = getCorrected(buffer, x)
      const tail = getTail(buffer, x)
      if (isProperlyFormed(tail)) {
        buffer = {emit: [corrected, tail], prev: null}
      } else {
        buffer = {emit: [corrected], prev: tail}
      }
    }
    return buffer
  }, initialBuffer)
  .flatMap(x => x.emit)

Working CodePen

Edit

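Looking at the test input stream, I think a case is missing that will break the code above: a chunk such as aba, which contains a complete item followed by the start of the next one, matches none of the three conditions.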

I changed the test from

---ab---ab---a---ba---bab---ab---ab---ab--->

to

---ab---ab---a---ba---bab---aba---b---ab--->

and also slimmed down the algorithm:

const getNextBuffer = (x) => {
  const items = x.split(/(ab)/g).filter(y => y)  // get valid items plus tail
  return {
    emit: items.filter(x => x === 'ab'),    // emit valid items
    save: items.filter(x => x !== 'ab')[0]  // save tail
  }
}

const initialBuffer = {
  emit: [],
  save: null
}

const result = source
  .scan((buffer, item) => {
    const bufferAndItem = (buffer.save ? buffer.save : '') + item
    return getNextBuffer(bufferAndItem)
  }, initialBuffer)
  .flatMap(x => x.emit)

Working example CodePen
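
To make the buffering easier to follow without RxJS, here is a minimal sketch that runs the same getNextBuffer logic over a plain array. The reassemble helper is hypothetical (it is not part of the answer's code); it only imitates what scan followed by flatMap does, under the assumption that every complete item is exactly 'ab'.

```javascript
// Same logic as getNextBuffer above: complete 'ab' items plus any leftover tail.
const getNextBuffer = (x) => {
  const items = x.split(/(ab)/g).filter(y => y)
  return {
    emit: items.filter(y => y === 'ab'),
    save: items.filter(y => y !== 'ab')[0]
  }
}

// Hypothetical helper: imitates scan + flatMap over an array of chunks.
const reassemble = (chunks) => {
  const out = []
  let buffer = { emit: [], save: null }
  for (const chunk of chunks) {
    // Prepend the saved tail (if any) to the incoming chunk, then re-split.
    buffer = getNextBuffer((buffer.save ? buffer.save : '') + chunk)
    out.push(...buffer.emit)
  }
  return out
}

// The modified test input from above, as an array of chunks.
const chunks = ['ab', 'ab', 'a', 'ba', 'bab', 'aba', 'b', 'ab']
const result = reassemble(chunks)
console.log(result)
// → ['ab', 'ab', 'ab', 'ab', 'ab', 'ab', 'ab', 'ab']
```

Note that only the first leftover piece is saved, so this sketch (like the code above) assumes a chunk never contains more than one incomplete fragment.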

Richard Matsen

First, split the stream into full and partial responses by checking whether each response is complete. Full responses can pass through unchanged. Partial responses need to be paired up, so we split their stream into first and second halves and zip those two streams together.

The strange-looking Rx.Observable.of(g.partition(x => x[0] === 'a')) is needed because the partition operator returns a pair of observables rather than a single observable, so it cannot be chained directly.

const testStream = Rx.Observable.of('a1', 'a2', '_ab', 'b1', 'a3', 'b2', '_ab', 'a4', 'b3', '_ab', 'b4', 'a5', 'b5', '_ab')

testStream
  .groupBy(x => (x[0] === '_' && 'full') || 'partial')
  .mergeMap(g =>
    Rx.Observable.if(
      () => g.key === 'full',
      g,
      Rx.Observable.of(g.partition(x => x[0] === 'a'))
        .mergeMap(([as, bs]) => Rx.Observable.zip(as, bs))
    )
  )
  .do(x => console.log(x))
  .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.min.js"></script>
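
The partition-then-zip step can be illustrated on plain arrays. This is only a sketch: the partition and zip functions below are hypothetical array analogues written for this example, not the RxJS operators, and the input is just the partial items from testStream with the '_ab' entries left out.

```javascript
// Hypothetical array analogue of partition: returns the pair [matching, rest].
const partition = (items, predicate) => [
  items.filter(predicate),
  items.filter(x => !predicate(x))
]

// Hypothetical array analogue of zip: pairs items by index,
// stopping at the end of the shorter list.
const zip = (as, bs) =>
  as.slice(0, Math.min(as.length, bs.length)).map((a, i) => [a, bs[i]])

// The partial responses from testStream, in arrival order.
const partials = ['a1', 'a2', 'b1', 'a3', 'b2', 'a4', 'b3', 'b4', 'a5', 'b5']

const [as, bs] = partition(partials, x => x[0] === 'a')
const pairs = zip(as, bs)
console.log(pairs)
// → [['a1','b1'], ['a2','b2'], ['a3','b3'], ['a4','b4'], ['a5','b5']]
```

The pairing relies on the first and second halves arriving in the same relative order; the n-th first half is always matched with the n-th second half.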
artur grzesiak

This is how I solved it:

import Rx from 'rxjs/Rx';
import {last} from 'lodash';

const data$ = Rx.Observable.of('ab','ab','a','ba','bab','aba','b','ab');
const line$ = data$.flatMap(data => {
    const lines = data.match(/[^b]+b?|b/g); // https://stackoverflow.com/a/36465144/598280 https://stackoverflow.com/a/25221523/598280
    return Rx.Observable.from(lines);
});

const isComplete$ = line$.scan((acc, value) => {
    const isLineEndingLast = last(acc.value) === 'b';
    const id = isLineEndingLast ? acc.id + 1 : acc.id;
    const complete = last(value) === 'b';
    return {value, id, complete};
}, {value: 'b', id: 0, complete: true});

const grouped$ = isComplete$
    .groupBy(data => data.id, data => data, group => group.first(data => data.complete))
    .flatMap(group => group.reduce((acc, data) => acc + data.value, ''));

grouped$.subscribe(console.log);
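
As a small illustration of the first step, this sketch shows how the regular expression used for lines splits each chunk into pieces that end at a 'b' or are still waiting for one. The splitLines name is hypothetical and only wraps the same match call as above.

```javascript
// Hypothetical wrapper around the same regular expression used above:
// each piece is either a run of non-'b' characters with an optional closing 'b',
// or a lone 'b' that closes a piece started in an earlier chunk.
const splitLines = (data) => data.match(/[^b]+b?|b/g)

const chunks = ['ab', 'a', 'ba', 'bab', 'aba', 'b']
const pieces = chunks.map(splitLines)
console.log(JSON.stringify(pieces))
// → [["ab"],["a"],["b","a"],["b","ab"],["ab","a"],["b"]]
```

One caveat: match returns null for an empty string, so an empty chunk would have to be filtered out before it reaches Rx.Observable.from.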
Gergely Fehérvári