I'm taking a look at the async library but I can't seem to find a control flow for handling pipelines. I'm just wondering if I'm missing something here.

I want to implement a pipeline. Example:

let pipeline = [];
pipeline.push((input, next) => { next(null, input); });
pipeline.push((input, next) => { next(null, input); });
var pipelineResult = pipelineRunner.run(pipeline, 'sample input', (error, result) => {});

Explanation: A series of functions is called. Each function receives an input and a next function. Each function processes the input and passes it as a parameter to the next function. As a result of the pipeline execution, I get the processed input, or, if any function calls next with an error, the pipeline stops and the callback is called.

I guess this is a pretty common use case, so I think async can do it, but I can't find it. If you know of any other library that can achieve such a result, that would be acceptable too.

Andre Pena

2 Answers

You are looking for the async.waterfall function.

Alternatively, you can apply async.seq or async.compose to multiple functions if you need a single function that you can pass an initial input to.

Bergi
  • Haha.. I just spent the last 30 minutes implementing my own (I just posted it as an answer) because I didn't see that `waterfall` could be used with async functions.. I thought the functions needed to return what would be passed to the next one. I'm going to accept your answer. – Andre Pena Dec 08 '15 at 01:20

I ended up implementing it myself even though, as @Bergi just showed, async does have support for it.

/**
 * Runs asynchronous pipelines
 */
class PipelineRunner {
    /**
     * Runs the given pipeline
     * @param pipeline - The array of functions that should be executed (middleware)
     * @param middlewareArgs - The array of arguments that should be passed in to the middleware
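     * @param input - The initial value passed to the first function in the pipeline
     * @param next - The callback invoked with (error, result) when the pipeline finishes or fails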
     */
    run(pipeline, middlewareArgs, input, next) {
        if (!pipeline) throw Error('\'pipeline\' should be truthy');
        if (!middlewareArgs) throw Error('\'middlewareArgs\' should be truthy');
        if (!input) throw Error('\'input\' should be truthy');
        if (!next) throw Error('\'next\' should be truthy');
        if (!pipeline.length) throw Error('\'pipeline.length\' should be truthy');

        let index = 0;

        // the link function "binds" every function in the pipeline array together
        let link = (error, result) => {
            if (error) {
                next(error);
                return;
            }
            let nextIndex = index++;
            if (nextIndex < pipeline.length) {
                let args = [result].concat(middlewareArgs).concat(link);
                pipeline[nextIndex].apply(null, args);
            }
            else {
                next(null, result);
            }
        };

        let args = [input].concat(middlewareArgs).concat(link);
        pipeline[index++].apply(null, args);
    }
}

export default new PipelineRunner();

Unit tests:

import chai from 'chai';
import pipelineRunner from '../src/server/lib/pipelines/pipelineRunner';
let assert = chai.assert;


describe('PipelineRunner', () => {
    describe('run', function() {
        it('Happy path', () => {
            let pipeline = [];
            pipeline.push((input, next) => { next(null, input); });
            pipeline.push((input, next) => { next(null, input); });

            pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
                assert.strictEqual(result, "happy");
            });
        });

        it('Happy path - with arguments', () => {
            let pipeline = [];
            pipeline.push((input, someArgument, next) => {
                assert.strictEqual(someArgument, 'something that should be passed in');
                next(null, input);
            });
            pipeline.push((input, someArgument, next) => { next(null, input); });

            pipelineRunner.run(pipeline, ['something that should be passed in'], 'happy', (error, result) => {
                assert.strictEqual(result, "happy");
            });
        });

        it('When something goes wrong', () => {
            let pipeline = [];
            pipeline.push((input, next) => { next(null, input); });
            pipeline.push((input, next) => { next('something went wrong'); });

            pipelineRunner.run(pipeline, [], 'happy', (error, result) => {
                assert.strictEqual(error, 'something went wrong');
            });
        });
    });
});
Andre Pena
  • WTH do you use a `class` with a method and instantiate it like a "singleton", instead of simply exporting that `run` function? – Bergi Dec 08 '15 at 01:19
  • Yes.. I do this by default, but in this case there shouldn't be any other method on that class, so I might as well export a function, as you said. – Andre Pena Dec 08 '15 at 01:22
  • Even if you have multiple methods, there is no reason to use a `class`. All you need is a ["static" object](http://stackoverflow.com/q/29893591/1048572) with those methods, or just multiple named exports. You should only use the `class` syntax if you actually need multiple instances - so unless you `export` the class (constructor) itself you're probably abusing it. – Bergi Dec 08 '15 at 01:26
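
Following the suggestion in the comments, the runner could be written as a plain function instead of a class instantiated once. The sketch below is one possible shape, not the answer author's code: the name `runPipeline` is hypothetical, and the validation is loosened on the assumption that falsy inputs such as `0` or `''` should be allowed.

```javascript
// Hypothetical plain-function variant of PipelineRunner.run (the name is an assumption).
function runPipeline(pipeline, middlewareArgs, input, next) {
    if (!Array.isArray(pipeline) || pipeline.length === 0) {
        throw new Error("'pipeline' should be a non-empty array");
    }
    if (typeof next !== 'function') {
        throw new Error("'next' should be a function");
    }

    let index = 0;

    // link is handed to every step as its `next`; it either reports the error,
    // calls the following step, or finishes the pipeline
    const link = (error, result) => {
        if (error) {
            next(error);
            return;
        }
        if (index >= pipeline.length) {
            next(null, result);
            return;
        }
        const step = pipeline[index++];
        step(result, ...middlewareArgs, link);
    };

    // start the chain with the initial input
    link(null, input);
}

// In an ES module this could be exposed with `export default runPipeline;`.

runPipeline(
    [
        (input, next) => next(null, input + ' world'),
        (input, next) => next(null, input.toUpperCase()),
    ],
    [],
    'hello',
    (error, result) => console.log(error || result) // HELLO WORLD
);
```

Callers would then import the function directly, with no instance to create.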