After cogitating on your problem a little more I believe I found a more general solution. Let's start with the EventStream
constructor (which is more general than your Stream
constructor):
/**
 * A minimal push-based event stream.
 *
 * Every instance owns its own `listeners` array, which starts out empty.
 * Callbacks stored in it are invoked by `dispatch`.
 *
 * @constructor
 */
function EventStream() {
    // A fresh array per instance so streams never share subscribers.
    this.listeners = [];
}
Then we create a dispatch
method to add events to the stream:
/**
 * Pushes `event` to every listener registered on this stream.
 *
 * @param {*} event - The value handed to each listener.
 * @returns {Array} The listeners' return values, in registration order.
 */
EventStream.prototype.dispatch = function (event) {
    return this.listeners.reduce(function (results, listener) {
        results.push(listener(event));
        return results;
    }, []);
};
Next we'll create a map
method which is again more general than your foreach
method:
/**
 * Derives a new stream whose events are the results of applying `f` to
 * each event of this stream.
 *
 * @param {function(*): *} f - Transformation applied to every event.
 * @returns {EventStream} The derived stream.
 */
EventStream.prototype.map = function (f) {
    var mapped = new EventStream();

    // Forward every upstream event, transformed, to the derived stream.
    function forward(value) {
        return mapped.dispatch(f(value));
    }

    this.listeners.push(forward);
    return mapped;
};
Now when you map
a function over an event stream you get an entirely new event stream. For example if your stream is [0,1,3,5..]
and you map (+2)
over it then the new stream would be [2,3,5,7..]
.
We'll also create a few more beneficial utility methods like filter
, scan
and merge
as follows:
/**
 * Derives a new stream that only carries the events of this stream for
 * which the predicate `f` returns a truthy value.
 *
 * @param {function(*): *} f - Predicate evaluated for every event.
 * @returns {EventStream} The derived stream.
 */
EventStream.prototype.filter = function (f) {
    var filtered = new EventStream();

    function forwardIfAccepted(value) {
        // Rejected events are dropped; the listener then yields undefined.
        return f(value) ? filtered.dispatch(value) : undefined;
    }

    this.listeners.push(forwardIfAccepted);
    return filtered;
};
The filter
method filters out certain events in an event stream to create an entirely new event stream. For example given [2,3,5,7..]
and the function odd
the filtered event stream would be [3,5,7..]
.
/**
 * Derives a new stream of running accumulations.
 *
 * The derived stream first emits the initial value `a`, then, for every
 * event `x` of this stream, emits `f(accumulator, x)` and keeps that
 * result as the new accumulator.
 *
 * The initial value is emitted exactly once: from a timer if no event has
 * arrived by then, otherwise right before the first accumulated value.
 *
 * @param {*} a - Initial accumulator value.
 * @param {function(*, *): *} f - Combines the accumulator with an event.
 * @returns {EventStream} The derived stream.
 */
EventStream.prototype.scan = function (a, f) {
    var stream = new EventStream;
    var seedSent = false;

    // `a` is reassigned on every event, so the seed must go out before the
    // first reassignment; otherwise the timer would re-emit the latest
    // accumulated value instead of the initial one.
    function sendSeed() {
        if (seedSent) return;
        seedSent = true;
        stream.dispatch(a);
    }

    // Deferred so callers can attach listeners to the returned stream first.
    setTimeout(sendSeed);

    this.listeners.push(function (x) {
        sendSeed();
        return stream.dispatch(a = f(a, x));
    });
    return stream;
};
The scan
method is used to cumulatively create a new event stream. For example given the stream [3,5,7..]
, the initial value 0
and the scanning function (+)
the new event stream would be [0,3,8,15..]
.
/**
 * Combines this stream and `that` into a single new stream.
 *
 * Events from this stream are wrapped in `Left`, events from `that` are
 * wrapped in `Right`, so consumers can tell which source produced them.
 *
 * @param {EventStream} that - The second stream to merge in.
 * @returns {EventStream} A stream of `Left` / `Right` tagged events.
 */
EventStream.prototype.merge = function (that) {
    var stream = new EventStream;
    this.listeners.push(function (x) {
        return stream.dispatch(new Left(x));
    });
    // Subscribe to the OTHER stream and forward its own event value.
    that.listeners.push(function (y) {
        return stream.dispatch(new Right(y));
    });
    return stream;
};
/**
 * Tag wrapper whose `left` property holds the wrapped value.
 *
 * @constructor
 * @param {*} x - The wrapped value.
 */
function Left(x) {
    this.left = x;
}
/**
 * Tag wrapper whose `right` property holds the wrapped value.
 *
 * @constructor
 * @param {*} x - The wrapped value.
 */
function Right(x) {
    this.right = x;
}
The merge
method combines two separate event streams into one. To differentiate which stream generated each event we tag all the events as either left or right.
Alright, now onto bigger problems. Let's create a zip
method. The really cool thing is that we can create zip
using the map
, filter
, scan
and merge
methods as follows:
/**
 * Pairs events of this stream with events of `that`, in arrival order.
 *
 * The merged, tagged events are folded with `scan` into an accumulator
 * `[leftQueue, rightQueue, result]`:
 *   - an event whose counterpart queue is empty is appended to its own
 *     queue and `result` is `null`;
 *   - otherwise the oldest queued counterpart is removed and `result` is a
 *     `Just` holding the pair `[leftValue, rightValue]`.
 * Accumulators without a `Just` are filtered out and the rest are unwrapped.
 *
 * @param {EventStream} that - The stream to zip with.
 * @returns {EventStream} A stream of `[leftValue, rightValue]` pairs.
 */
EventStream.prototype.zip = function (that) {
    return this.merge(that).scan([[], [], null], function (acc, event) {
        var left = acc[0];
        var right = acc[1];
        var value;

        if (event instanceof Left) {
            value = event.left;
            return right.length ?
                [left, right.slice(1), new Just([value, right[0]])] :
                // Wrapped in an array so concat() appends array-valued
                // events as one element instead of flattening them.
                [left.concat([value]), right, null];
        }

        value = event.right;
        return left.length ?
            [left.slice(1), right, new Just([left[0], value])] :
            [left, right.concat([value]), null];
    })
    .filter(function (a) {
        return a[2] instanceof Just;
    })
    .map(function (a) {
        return a[2].just;
    });
};
/**
 * Wrapper whose `just` property holds the wrapped value; `zip` uses it to
 * mark accumulators that carry a completed pair.
 *
 * @constructor
 * @param {*} x - The wrapped value.
 */
function Just(x) {
    this.just = x;
}
Now you can use it as follows:
// Log every value emitted by zipping stream1 with stream2.
stream1.zip(stream2).map(function logPair(pair) {
    console.log(pair);
});
You can define stream1
and stream2
as follows:
// Two independently created sample streams; each call to getRandomStream()
// returns its own EventStream.
var stream1 = getRandomStream();
var stream2 = getRandomStream();
/**
 * Creates a stream that keeps emitting `Math.random()` values on a timer.
 *
 * The interval is chosen once per stream: `Math.random() * 100 + 500`
 * truncated to an integer. The timer is never cleared.
 *
 * @returns {EventStream} The stream receiving the random values.
 */
function getRandomStream() {
    var randomStream = new EventStream();

    function emitRandom() {
        randomStream.dispatch(Math.random());
    }

    var periodMs = (Math.random() * 100 + 500) | 0;
    setInterval(emitRandom, periodMs);
    return randomStream;
}
That's all there is to it. No need for promises.