
I am new to developing in node.js (though relatively experienced at client-side javascript) and I'm running into lots of questions about good practices when dealing with asynchronous operations in node.js.

My specific issue (though I imagine this is a fairly general purpose topic) is that I have a node.js app (running on a Raspberry Pi) that is recording the readings from several temperature probes every 10 seconds to an in memory data structure. This works just fine. The data accumulates over time in memory and, as it accumulates and reaches a particular size threshold, the data is regularly aged (keeping only the last N days of data) to keep it from growing beyond a certain size. This temperature data is used to control some other appliances.

Then, I have a separate interval timer that writes this data out to disk every so often (to persist it if the process crashes). I'm using async node.js (fs.open(), fs.write() and fs.close()) disk IO to write the data out to disk.

And, because of the async nature of the disk IO, it occurs to me that the very data structure I'm trying to write to disk may get modified right in the middle of me writing it out to disk. That would potentially be a bad thing. If data is only appended to the data structure while writing out to disk, that won't actually cause a problem with the way I'm writing the data, but there are some circumstances where earlier data can be modified as new data is being recorded and that would really mess with the integrity of what I'm in the middle of writing to disk.

I can think of all sorts of somewhat ugly safeguards I could put in my code such as:

  1. Switch to synchronous IO to write the data to disk (don't really want to do that for server responsiveness reasons).
  2. Set a flag when I started writing data and don't record any new data while that flag is set (causes me to lose the recording of data during the write).
  3. More complicated versions of option 2 where I set the flag and when the flag is set, new data goes in a separate, temporary data structure that when the file IO is done is then merged with the real data (doable, but seems ugly).
  4. Take a snapshot copy of the original data and take your time to write that copy to disk knowing that nobody else will be modifying the copy. I don't want to do this because the data set is relatively large and I'm in a limited memory environment (Raspberry PI).

So, my question is what are design patterns for writing a large data set with async IO when other operations may want to modify that data during the async IO? Are there more general purpose ways of handling my issue than the specific work-arounds listed above?

jfriend00
  • 1
    Maybe this helps to some degree? http://stackoverflow.com/q/14795145/218196 – Felix Kling Sep 06 '14 at 00:27
  • 2
    @FelixKling - that article looks like a good description of how the async operations in node.js work. I understand that already. In fact, it's my understanding of the async architecture that informs me that I have a concurrency issue to deal with and that's why I'm looking for good practice design patterns for solving that concurrency issue. – jfriend00 Sep 06 '14 at 00:31
  • As far as I understood node is still single threaded, and concurrent execution doesn't necessarily mean *parallel* execution, so there should not be an issue with reading and writing data at the same time (because it can't happen, just like in the browser). But these are all just guesses, I actually don't know for sure, so don't listen to me (too much) :) – Felix Kling Sep 06 '14 at 00:34
  • @FelixKling - no, that's not the case when using multiple async IO operations. I've verified it DOES happen in my case. My interval timer to collect temperatures and add them to my data structure does fire in the middle of a series of async IO operations. I've seen it happen in my logging (not every time, but sometimes, which means I have a problem). It's just like doing multiple consecutive async Ajax operations in the browser: many other events/timers could fire in between those Ajax operations. Same thing here. – jfriend00 Sep 06 '14 at 00:41
  • I think I misunderstood the whole problem :D Moving on :) – Felix Kling Sep 06 '14 at 00:43
  • I find option three the most viable; buffering the data in between is not really that "dirty". Otherwise I may suggest constantly streaming the data to disk using a writable stream. So my advice would be to avoid writing large datasets to disk at once. – LJᛃ Sep 06 '14 at 02:06
  • @LJ_1102 - I considered writing the data as it comes in. One issue with that is that the "disk" in the Raspberry Pi is an SD card which has only a finite number of writes (a characteristic of all flash memory). I'm trying to make a system that will run unattended for 10 years so I shouldn't be writing that often - certainly not every time some new data is collected. – jfriend00 Sep 06 '14 at 02:18
  • 5
    Why are people voting to close this question? I'm asking how to write live data using async IO. – jfriend00 Sep 06 '14 at 04:00
  • Are you really serializing and writing the data in chunks? If not, your synchronous serialization process is effectively taking a snapshot without being distracted, and asynchronously writing that buffer is not a problem. Or is your actual problem that saving the data occurs in the middle of the multiple, asynchronous sensor fetches? – Bergi Sep 06 '14 at 12:50
  • 1
    @Bergi - yes I am writing the data in chunks. For memory usage reasons(this is a small Raspberry Pi without a lot of RAM), I don't want to serialize it all into memory at once. There are opportunities for additional sensor reads to occur between the async writes and I've actually seen that occur in my logs. – jfriend00 Sep 06 '14 at 15:43

2 Answers


Your problem is data synchronization. Traditionally this is solved with locks/mutexes, but javascript/node doesn't really have anything like that built-in.

So, how do we solve this in node? We use queues. Personally, I use the queue function from the async module.

Queues work by keeping a list of tasks that need to be executed and only execute those tasks, in the order they're added to the queue, once the previous task has completed (similar to your option 3).

[image: queue animation]

Note: The async module's queue method can actually run multiple tasks concurrently (like the animation above shows) but, since we're talking data synchronization here, we don't want that. Luckily we can tell it to just run one at a time.
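
To make the mechanism concrete, here is a minimal hand-rolled sketch of a one-at-a-time queue. It is not the async module's implementation; `createSerialQueue` is a hypothetical helper written only for illustration, and it assumes every task calls `done` exactly once.

```javascript
// Minimal serial task queue: runs one task at a time, in FIFO order.
// createSerialQueue is a hypothetical helper, not part of the async module.
function createSerialQueue() {
    var tasks = [];      // pending tasks, oldest first
    var running = false; // true while a task is in progress

    function runNext() {
        if (running || tasks.length === 0) return;
        running = true;
        var task = tasks.shift();
        task(function done() {
            running = false;
            runNext(); // start the next queued task, if any
        });
    }

    return {
        push: function (task) {
            tasks.push(task);
            runNext();
        }
    };
}

// Usage: the second task only starts after the first one calls done().
var order = [];
var q = createSerialQueue();
q.push(function (done) { // simulated slow disk write
    setTimeout(function () { order.push("write finished"); done(); }, 20);
});
q.push(function (done) { // data modification queued behind the write
    order.push("modify data");
    done();
});
```

Even though the modification is pushed while the simulated write is still pending, it does not run until the write has called `done`. A library queue adds error handling and other conveniences on top of this idea.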

In your particular situation, what you'll want to do is set up a queue which can do two types of tasks:

  1. Modify your data structure
  2. Write your data structure to disk

Whenever you get new data from your temperature probes, add the task to your queue to modify your data structure with that new data. Then, whenever your interval timer fires, add the task to your queue that writes your data structure to disk.

Since the queue will only run one task at a time, in the order they're added to the queue, it guarantees that you'll never be modifying your in-memory data structure while you're writing data to disk.

A very simple implementation of that might look like:

var async = require("async");
var fs = require("fs");

var dataQueue = async.queue(function(task, callback) {
    if (task.type === "newData") {
        memoryStore.add(task.data); // modify your data structure however you do it now
        callback(); // let the queue know the task is done; you can pass an error here as usual if needed
    } else if (task.type === "writeData") {
        fs.writeFile(task.filename, JSON.stringify(memoryStore), function(err) {
            // error handling
            callback(err); // let the queue know the task is done
        });
    } else {
        callback(new Error("Unknown Task")); // just in case we get a task we don't know about
    }
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

// call when you get new probe data
function addNewData(data) {
    // the key must be "type" to match the task.type checks in the worker above
    dataQueue.push({type: "newData", data: data}, function(err) {
        // called when the task is complete; optional
    });
}

// write to disk every 5 minutes
setInterval(function() {
    dataQueue.push({type: "writeData", filename: "somefile.dat"}, function(err) {
        // called when the task is complete; optional
    });
}, 5 * 60 * 1000); // 5 minutes in milliseconds

Also note that you can now add data to your data structure asynchronously. Say you add a new probe that fires off an event whenever its value changes. You can just call addNewData(data) as you do with your existing probes and not worry about it conflicting with in-progress modifications or disk writes (this really comes into play if you start writing to a database instead of an in-memory data store).


Update: A more elegant implementation using bind()

The idea is that you use bind() to bind arguments to a function and then push the new bound function that bind() returns on to the queue. That way you don't need to push some custom object on to the queue that it has to interpret; you can just give it a function to call, already set up with the correct arguments. The only caveat is that the function has to take a callback as its last argument.

That should allow you to use all the existing functions you have (possibly with slight modifications) and just push them on to the queue when you need to make sure they don't run concurrently.

I threw this together to test the concept:

var async = require('async');

var dataQueue = async.queue(function(task, callback) {
    // task is just a function that takes a callback; call it
    task(callback); 
}, 1); // The 1 here is setting the concurrency of the queue so that it will only run one task at a time

function storeData(data, callback) {
    setTimeout(function() { // simulate async op
        console.log('store', data);
        callback(); // let the queue know the task is done
    }, 50);
}

function writeToDisk(filename, callback) {
    setTimeout(function() { // simulate async op
        console.log('write', filename);
        callback(); // let the queue know the task is done
    }, 250);
}

// store data every second
setInterval(function() {
    var data = {date: Date.now()}
    var boundStoreData = storeData.bind(null, data);
    dataQueue.push(boundStoreData, function(err) {
        console.log('store complete', data.date);
    })
}, 1000)

// write to disk every 2 seconds
setInterval(function() {
    var filename = Date.now() + ".dat"
    var boundWriteToDisk = writeToDisk.bind(null, filename);
    dataQueue.push(boundWriteToDisk, function(err) {
        console.log('write complete', filename);
    });
}, 2000);
Mike S
  • This solution is no different from option 3. We still have to find a way to stitch the incoming data from the queue into the data structure, which can get messy (we aren't just processing inserts... we are also processing deletes). – John Kurlak Sep 06 '14 at 05:43
  • 1
    @JohnKurlak - I think this could work. When I have new data to add to the data structure, I put an item in the data queue (operation ID and data). If there's nothing else going on at the time, the queue manager just runs the operation (e.g. calls a function) and the data is added to the local store right away. If there is a save operation going on at the time, the queued item sits in the queue. When the save operation completes, the queue manager can then pull the next item out of the queue and literally run the exact same code that would have run if the save operation was not running. – jfriend00 Sep 06 '14 at 05:47
  • Using `.bind()` seems like a nice simplifying improvement. – jfriend00 Sep 06 '14 at 16:29
  • 1
    I implemented the queue. Pretty simple actually and appears to work just great. Each of the methods that can modify the data has its implementation in a local function (that automatically has access to the function arguments via closure), so when one of them is called I execute that local function if the data isn't locked, or I queue it if it is. The hardest part about implementing it is working out a test that causes the (relatively low frequency) concurrency clash to occur often enough for you to test the code you wrote. – jfriend00 Sep 08 '14 at 03:14

First - let's show a practical solution and then let's dive into how and why it works:

var Promise = require("bluebird"); // promisifyAll is a Bluebird API, not part of native promises
var fs = Promise.promisifyAll(require("fs"));
var chain = Promise.resolve(); // Create a resolved promise

chain = chain.then(function(){
    return fs.writeAsync(...); // A
});

// some time in the future
chain = chain.then(function(){
    return fs.writeAsync(...); // This will always execute after A is done
})

Since you've tagged your question with promises - it's worth mentioning that promises solve this (quite complicated) problem very well on their own and do so quite easily.

Your data synchronization problem is called the producer-consumer problem. There are a lot of ways to tackle synchronization in JavaScript - this recent piece by Q's Kris Kowal is a good read on the subject.

Enter: Promises

The simplest way to solve it with promises is to chain everything through a single promise. I know you're experienced with promises yourself but for newer readers let's recap:

Promises are an abstraction over the notion of sequencing itself. A promise is a single (read discrete) unit of action. Chaining promises, much like ; in some languages, notes the end of one operation and the start of the next. Promises in JavaScript abstract two main things - the notion of actions taking time and exceptional conditions.

There is a 'higher' abstraction at play here called a monad. While A+ promises do not abide by the monad laws strictly (for convenience), there are implementations of promises that do. Promises abstract a certain kind of processing, whereas monads abstract the notion of processing itself; you can say that a promise is a monad, or at the very least that promises are monadic.

Promises start off as pending meaning they represent an action that has already started but has not completed yet. At some point they might go through resolution during which they settle at one of two states:

  • Fulfilled - indicating that the action has completed successfully.
  • Rejected - indicating that the action has not completed successfully.

Once a promise is settled it can no longer change its state. Just like you can continue a ; on the next line - you can continue a promise with the .then method, which chains the previous action to the next.
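
As a small illustration of these states, here is a sketch using native promises. The `delay` helper and the labels are made up for the example; the point is only that each step starts after the previous one settles, and that a rejection skips later `.then` steps until something handles it.

```javascript
// Each .then() returns a new promise; a step starts only after the previous one settles.
var log = [];

function delay(ms, label) { // hypothetical helper simulating an async action
    return new Promise(function (resolve) {
        setTimeout(function () {
            log.push(label);
            resolve(label);
        }, ms);
    });
}

var sequence = delay(30, "first")
    .then(function () { return delay(10, "second"); })      // starts after "first" is fulfilled
    .then(function () { throw new Error("third failed"); }) // rejects the chain
    .then(function () { log.push("skipped"); })             // not run: the chain is rejected
    .catch(function (err) { log.push("handled: " + err.message); });
```

Note that "second" is logged after "first" even though its delay is shorter, because it is not started until the first promise has been fulfilled.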

Solving producer - consumer.

A traditional solution to the producer/consumer problem can be built with traditional concurrency constructs like Dijkstra's semaphores. Such a solution is possible with promises or plain callbacks, but I believe we can do something simpler.

Instead, we'll keep a program running, and append new actions to it every time.

var fsQueue = Promise.resolve(); // start a new chain

// one place
fsQueue = fsQueue.then(function(){ // assuming promisified fs here
    return fs.writeAsync(...); 
});

// some other place
fsQueue = fsQueue.then(function(){
    return fs.writeAsync(...);
});

Adding actions to the queue assures we have ordered synchronization and actions will only execute after earlier ones have finished. This is the simplest synchronization solution to this problem and requires wrapping fs.asyncFunction calls by .thening them to your queue.
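
Applied to the situation in the question, the same idea might look like the sketch below. Everything here is an assumption for illustration: `store` stands in for the in-memory data structure, `recordReading` and `saveSnapshot` are hypothetical names, and `fakeWriteChunk` simulates a chunked async disk write with a timer instead of touching fs.

```javascript
var store = [];                    // stand-in for the in-memory data structure
var dataChain = Promise.resolve(); // every access to `store` is chained here

// Queue a modification of the store behind whatever is already pending.
function recordReading(reading) {
    dataChain = dataChain.then(function () {
        store.push(reading);
    });
    return dataChain;
}

// Queue a chunk-by-chunk write; writeChunk must return a promise.
function saveSnapshot(writeChunk) {
    dataChain = dataChain.then(function () {
        var i = 0;
        function next() {
            if (i >= store.length) return; // all chunks written
            return writeChunk(store[i++]).then(next);
        }
        return next();
    });
    return dataChain;
}

// Demo with a simulated slow write.
var written = [];
function fakeWriteChunk(chunk) {
    return new Promise(function (resolve) {
        setTimeout(function () { written.push(chunk); resolve(); }, 5);
    });
}

recordReading(20.5);
recordReading(21.0);
var saved = saveSnapshot(fakeWriteChunk);
recordReading(21.5); // arrives mid-save; applied only after the save completes
```

The third reading is requested while the save is still pending, but it is not applied to `store` until every chunk has been written, so the save sees a consistent view without a snapshot copy.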

An alternative solution would be using something akin to a "monitor" - we can ensure the access is consistent from within by wrapping fs:

var B = require("bluebird");
var fs = B.promisifyAll(require("fs")); // bluebird promisified fs
var syncFs = { // sync stands for synchronized, not synchronous
    queue: B.resolve(),
    writeAsync: function(){
        var args = arguments; // capture now; `arguments` differs inside the callback
        // chain onto the shared queue so this write only executes after earlier ones
        return (syncFs.queue = syncFs.queue.then(function(){
            return fs.writeAsync.apply(fs, args);
        }));
    } // wrap other used functions similarly
};

Which would produce synchronized versions of fs actions. It is also possible to automate this (haven't tested) using something similar:

// assumes module is promisified and ignores nested functions
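function synchronize(module){
    var ret = {}, queue = B.resolve(); // B is bluebird, as in the snippet above
    Object.keys(module).forEach(function(fn){ // forEach gives each wrapper its own fn
        if (typeof module[fn] !== "function") return; // skip non-function exports
        ret[fn] = function(){
            var args = arguments;
            queue = queue.then(function(){
                return module[fn].apply(module, args);
            });
            return queue; // let the caller wait on this action and see its errors
        };
    });
    // expose the current tail of the queue for handling errors
    Object.defineProperty(ret, "queue", { get: function(){ return queue; } });
    return ret;
}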

Which should produce a version of a module that synchronizes all its actions. Note that we get the added benefit that errors don't get suppressed and the file system will not be left in an inconsistent state, because once an action fails, later actions won't get executed until that error gets handled.

Isn't that kind of similar to a queue?

Yes! Queues do something very similar (which you can see in the other answer) by providing a first-in, first-out structure for actions, much like program code, which executes in that order to begin with. Promises are simply a stronger side of the same coin in my opinion.

The other answer also provides a viable option through queues.

About your suggested approaches

Switch to synchronous IO to write the data to disk (don't really want to do that for server responsiveness reasons).

While I agree this is the simplest - the 'monitor' approach of chaining all actions you need synchronized on the same queue is very similar.

Set a flag when I started writing data and don't record any new data while that flag is set (causes me to lose the recording of data during the write).

That flag is effectively a mutex. If you block (or yield and put the action in a queue) when someone else tries to take it, you've got a real mutex that upholds the "mutex guarantees".

Retrying with that flag, and keeping a list of next actions to hold the flag, is actually very common in implementations of a semaphore - one example is in the Linux kernel.
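
A flag plus a list of waiting actions can be sketched in a few lines. `createLock` is a hypothetical helper, not an existing library API, and it assumes each action calls `release` exactly once when it is finished.

```javascript
// createLock is a hypothetical helper: a flag plus a list of waiting actions.
function createLock() {
    var locked = false;
    var waiting = [];

    function release() {
        var next = waiting.shift();
        if (next) {
            next(release); // hand the lock straight to the next waiter
        } else {
            locked = false;
        }
    }

    return {
        // Runs action(release) now if the lock is free, otherwise queues it.
        acquire: function (action) {
            if (locked) {
                waiting.push(action);
            } else {
                locked = true;
                action(release);
            }
        }
    };
}

// Demo: a reading that arrives mid-write is deferred, not lost.
var events = [];
var lock = createLock();
lock.acquire(function (release) { // the simulated disk write holds the lock
    events.push("write start");
    setTimeout(function () { events.push("write end"); release(); }, 20);
});
lock.acquire(function (release) { // a sensor reading arrives during the write
    events.push("record reading");
    release();
});
```

Unlike option 2 as stated in the question, nothing is dropped here: the reading is simply recorded once the write releases the lock.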

More complicated versions of option 2 where I set the flag and when the flag is set, new data goes in a separate, temporary data structure that when the file IO is done is then merged with the real data (doable, but seems ugly).

Take a snapshot copy of the original data and take your time to write that copy to disk knowing that nobody else will be modifying the copy. I don't want to do this because the data set is relatively large and I'm in a limited memory environment (Raspberry PI).

These approaches are usually called transactional RCU updates. They're actually very modern and very fast in some cases, for example for the "readers-writers problem" (which is very similar to what you have). Native support for these landed in the Linux kernel quite recently. Doing this is both viable and performant in certain cases, although in your case it overcomplicates things a bit, as you suggest.

So, to sum it up

  • It's not an easy problem, but an interesting one.
  • Luckily, promises solve it pretty well, they were built exactly to solve this sort of problem by abstracting the notion of a sequence.

Happy coding, Pi NodeJS project sounds awesome. Let me know if I could clarify this any further.

Benjamin Gruenbaum
  • I appreciate the effort you went to explain how promises could be used in this regard, but they didn't seem as good a fit as a simple queue so I went with the queue. I am using promises all over the place to manage async IO (which is quite a pain to write even with promises). At least promises for my IO uses make the error handling more manageable. – jfriend00 Sep 08 '14 at 03:18
  • You're free to use it (or not) as you wish, but in my opinion if you're already using promises using `async`'s queue in addition is redundant since you already get this functionality from promises for free. – Benjamin Gruenbaum Sep 08 '14 at 09:38