    function fnToDeferredObservable(fn) {
      return Rx.Observable.defer(() => {
        const result = fn();
        let observable;
        if (result instanceof Rx.Observable) {
          observable = result;
        } else if (result instanceof Promise) {
          observable = Rx.Observable.fromPromise(result);
        } else {
          observable = Rx.Observable.return(result);
        }
        return observable;
      });
    }
    function transactionEmmiter() {
      const eventStream = RxReact.FuncSubject.create(() => () => action(...attr));
      this.transactionStream = this.transactionStream.merge(eventStream);
    }
    function componentDidMount() {
      this.transactionStream
        .map(fnToDeferredObservable)
        .concatAll()
        .subscribe(
          (result) => console.log(result),
          (err) => console.log(err),
          () => console.log('complete')
        );
    }

    <button className="btn btn-default" onClick={this.transactionEmmiter(this.props.increment, 1)}>
      Increment
    </button>
    {' '}
    <button className="btn btn-default" onClick={this.transactionEmmiter(this.props.double)}>
      Double (Async)
    </button>
    <hr />
    <button className="btn btn-default" onClick={this.transactionEmmiter(this.props.random)}>
      Random Increment (Async)
    </button>

The eventStream wraps DOM events and is merged into the transaction stream. The transaction stream maps each handler to a deferred observable (created from the handler's return value), and `.concatAll()` concatenates these new observables. The effect is that every async function is queued and waits for the preceding one to resolve. This works so far, but `.concatAll()` somehow drops all events that arrive after the first batch: DOM events are handled by the transaction subscription only for a short period after I first emit a DOM event (until the first observable completes).
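To make the intended queueing concrete, here is a minimal sketch of the same idea using plain promises instead of RxJS. It is only an analogue of `map(fnToDeferredObservable).concatAll()`, not how RxJS implements it, and it does not reproduce the dropped-events problem. The names `createSerialQueue`, `enqueue` and `delayed` are hypothetical.

```javascript
// Hypothetical helper (not part of RxJS): runs queued functions strictly in order.
function createSerialQueue(onResult) {
  let tail = Promise.resolve();
  return function enqueue(fn) {
    // Deferred call: fn runs only after everything queued before it has settled.
    tail = tail
      .then(() => fn())            // fn may return a promise or a plain value
      .then(onResult)
      .catch((err) => console.log('error:', err)); // log and keep the queue alive
    return tail;
  };
}

const results = [];
const enqueue = createSerialQueue((value) => results.push(value));

// Hypothetical async handler factory: resolves with `value` after `ms` milliseconds.
const delayed = (value, ms) => () =>
  new Promise((resolve) => setTimeout(() => resolve(value), ms));

enqueue(delayed('result1', 30));               // slow async handler
enqueue(() => 'result2');                      // synchronous handler
const done = enqueue(delayed('result3', 10));  // faster, but still waits its turn

done.then(() => console.log(results)); // [ 'result1', 'result2', 'result3' ]
```

Even though the third handler resolves faster than the first, results arrive in the order the handlers were queued, which is the behaviour the marble diagrams below describe.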

Click Events
———2——>
—1————>
————3—>
Map Value to function expression
———y2——>
—y1————>
————y3—>
Merge
—y1—y2—y3—>
Map to deferred Observable
—y1—y2—y3—>
—x1—x2—x3—>
Concat all
—>xy1—>xy2—>xy3
expected output is:
result = y()
—>result1—>result2—>result3———>

example:
with input:
———2——>
—1————>
————3—>
——————————4—>
expected:
—>result1—>result2—>result3—>result4
actual:
—>result1—>result2—>result3
rgruenke
  • I have a hard time understanding your comment. Can you make a marble diagram which represent your expected behaviour (input, and expected output) and then present the actual output? – user3743222 Jan 26 '16 at 18:39
  • I gave it a try, see above, thanks! – rgruenke Jan 26 '16 at 20:34
  • Thanks for the extra info. I think you need help from some SO guy who experimented with RxReact. I still have no clue of the data flow here. `this.transactionEmmiter` seems to be a handler factory which takes arguments, but then the `function transactionEmmiter()` does not take any argument. Worse, you modify `this.transactionStream` merging subjects one after the other. That makes the whole thing quite hard to reason about. – user3743222 Jan 27 '16 at 00:15
  • When you do `this.transactionStream.map(fnToDeferredObservable).concatAll()`, each operator sets a series of callbacks (inner observers) to be executed when a value is pushed. But it does this only once, i.e. when this chain is executed. So when you change `this.transactionStream` after executing that operator chain, the callbacks remain attached to the first `this.transactionStream` you had. I can't tell if your problem comes from the fact that the next value of `transactionStream` is pushing values but there is no observer subscribed to it, but that's my bet. – user3743222 Jan 27 '16 at 00:19
  • For an illustrated example of data flows including the callback registering mechanism, have a look here http://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444. Good practice is programming with RxJS as functionally as possible : avoid side effects, isolate them (with `do`) when they are unavoidable, have streams whose output depend solely on their inputs. A stream is a constant and should not change. It already embodies a sequence of values. If you have a sequence of streams, then create a stream whose values will be streams. – user3743222 Jan 27 '16 at 00:28
  • `this.transactionStream(fn, ...arr)`. Sorry, I forgot the params. transactionStream is only called once for all DOM nodes, and after that the subscriber is applied. Then all subject events get merged into the transactionStream, and this is already working, so don't worry about that – rgruenke Jan 27 '16 at 07:17
  • Additionally, if I remove `.map(fnToDeferredObservable) .concatAll()` everything works fine, except that my functions aren't executed and so on, but the streaming works as expected – rgruenke Jan 27 '16 at 08:51
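The point raised in the comments about reassigning `this.transactionStream` can be illustrated without RxJS. The sketch below uses a tiny hand-rolled stand-in for a subject (`createSubject` is hypothetical and is not the RxJS or RxReact API) to show that an observer stays attached to the stream object it subscribed to, and that rebinding the variable to a merged stream afterwards does not move it. Whether this is the actual cause of the problem in the question is not established. The second half shows the alternative suggested above: one long-lived stream that all handlers push into.

```javascript
// Minimal stand-in for a subject; hypothetical, not the RxJS API.
function createSubject() {
  const observers = [];
  return {
    subscribe(observer) { observers.push(observer); },
    next(value) { observers.forEach((observer) => observer(value)); },
    // Returns a NEW stream; the receiver itself is left untouched.
    merge(other) {
      const merged = createSubject();
      this.subscribe((value) => merged.next(value));
      other.subscribe((value) => merged.next(value));
      return merged;
    },
  };
}

const first = createSubject();
let transactionStream = first;

const received = [];
transactionStream.subscribe((value) => received.push(value)); // attaches to `first`

const clicks = createSubject();
transactionStream = transactionStream.merge(clicks); // rebinds the variable only

clicks.next('click');  // reaches the merged stream, which has no observer
first.next('direct');  // still reaches the original observer

console.log(received); // [ 'direct' ]

// Alternative: a single stream that never changes; handlers push functions into it.
const actions = createSubject();
const seen = [];
actions.subscribe((fn) => seen.push(fn()));

// Hypothetical handler factory, analogous in spirit to transactionEmmiter(action, ...attr).
const makeHandler = (action, ...args) => () => actions.next(() => action(...args));

makeHandler((n) => n + 1, 1)(); // simulate a click on "Increment"
console.log(seen); // [ 2 ]
```

With the single-stream variant, the subscription is set up once and every handler created later still feeds the same observer, so the order of merging and subscribing no longer matters.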
