I have been working on a personal project involving real-time data using Kefir (or Bacon.js, pick your favorite) and have reached a point where I need to save each object to a database so that it gets an id, and then pass the object, with its id, down the chain. Inserting the data into the database (NeDB) is not the issue. The issue is that the insert is callback-based: execution continues while the record is still being inserted, and I don't know how to work around that.
Overly simplified example:
Suppose we have several devices dumping parsed data into a bus/pool:
    function Position(data) {
        this.id = null;
        this.longitude = data.longitude;
        this.latitude = data.latitude;
    }
    self.positionDataPool.map(function(position) { // is this even what really needs to be done?
        // unsure what to do here
        self.db.insert({
            longitude: position.longitude,
            latitude: position.latitude
        }, function(e, newRecord) {
            if (e) { ... }
            else {
                position.id = newRecord._id;
                return position;
            }
        });
    })
    .filter(function(position) {
        // the position without an id is passed here
        ...
    });
I suspect this is an incorrect or inappropriate use of the map function, but I am out of ideas after having tried several things. Any thoughts, suggestions, or help would be greatly appreciated.
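To see why the id never arrives downstream, it can help to take the stream library out of the picture. The sketch below is plain Node.js; `fakeInsert` and `addId` are hypothetical stand-ins (not NeDB or Kefir APIs) for `db.insert` and for the function passed to `map`. The `return position` inside the callback returns to whatever invoked the callback, so the outer function has already returned `undefined` by the time the record exists.

```javascript
// Hypothetical stand-in for an asynchronous, callback-based insert such as
// NeDB's db.insert: it calls back later with (error, newRecord).
function fakeInsert(doc, callback) {
  setImmediate(function () {
    callback(null, Object.assign({ _id: 'abc123' }, doc));
  });
}

// Hypothetical stand-in for the function passed to map() in the question.
function addId(position) {
  fakeInsert(
    { longitude: position.longitude, latitude: position.latitude },
    function (e, newRecord) {
      if (e) { return; }
      position.id = newRecord._id;
      return position; // returned to the code that invoked the callback, then discarded
    }
  );
  // There is no return statement here, so addId returns undefined right away.
}

const position = { id: null, longitude: -122.4, latitude: 37.8 };
const mapped = addId(position);

console.log(mapped);      // undefined
console.log(position.id); // null at this point: the insert has not called back yet
```

Whatever runs next in the chain is handed the value the mapping function returned synchronously, which is why the work that depends on the id has to be started from inside the callback.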
My Solution
After a lot more reading and experimenting, and thinking back to my days of working with stream processing daily, I came up with the following solution. It may not be the most efficient, but it works. Input from multiple sources is plugged into a data pool (not shown). A completely new stream is then created to perform a single operation on each object: insert it into the database and emit it once it has an id. Extensibility was not the objective, but this also lets multiple consumers watch the saved stream instead of the data being dumped straight into the filter. Finally, the saved stream is filtered to show only the results we want.
    self.savedPositionDataStream = Kefir.stream(function(emitter) {
        self.positionDataPool.onValue(function(val) {
            // Insert the record, then emit the object once it has an id.
            self.db.insert({
                longitude: val.longitude,
                latitude: val.latitude
            }, function(e, newRecord) {
                if (e) {
                    // Pass the failure downstream instead of reading _id from undefined.
                    emitter.error(e);
                    return;
                }
                val.id = newRecord._id;
                emitter.emit(val);
            });
        });
    });
    self.filteredPositionData = self.savedPositionDataStream.filter(...);
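One possible variation, sketched here under assumptions, is to wrap the callback-style insert in a Promise so that "wait for the id" becomes a value a stream can consume. `insertPosition` and `fakeDb` are hypothetical names used only for illustration; `fakeDb` stands in for a NeDB datastore and only mimics the `insert(doc, callback)` shape used above.

```javascript
// Wrap a Node-style insert(doc, callback) in a Promise that resolves with the
// original position once it carries the id assigned by the database.
function insertPosition(db, position) {
  return new Promise(function (resolve, reject) {
    db.insert(
      { longitude: position.longitude, latitude: position.latitude },
      function (e, newRecord) {
        if (e) { reject(e); return; }
        position.id = newRecord._id;
        resolve(position);
      }
    );
  });
}

// Hypothetical in-memory stand-in for the datastore (an assumption, not NeDB itself).
const fakeDb = {
  nextId: 1,
  insert: function (doc, callback) {
    const record = Object.assign({ _id: 'id-' + this.nextId++ }, doc);
    setImmediate(function () { callback(null, record); });
  }
};

const saved = insertPosition(fakeDb, { id: null, longitude: -122.4, latitude: 37.8 });
saved.then(function (position) {
  console.log(position.id); // id-1
});
```

With Kefir, a promise like this can be turned into a stream with `Kefir.fromPromise` and flattened back into the chain with `flatMap`, for example `self.positionDataPool.flatMap(function(p) { return Kefir.fromPromise(insertPosition(self.db, p)); })`. That line is untested against the setup above, so treat it as a starting point rather than a drop-in replacement.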