
I am trying to concoct a C++ data structure for modeling a simple N stage process where each stage can be replaced with a different function. One way is to use the OO approach and have an abstract base class with a virtual method for each stage; e.g.:

class Pipeline {
protected:
  virtual void processA(const In& in, BType& B) = 0;
  virtual void processB(const BType& B, CType& C) = 0;
  virtual void processC(const CType& C, DType& D) = 0;
  virtual void processD(const DType& D, Out& out) = 0;
public:
  void process(const In& in, Out& out) {
    BType B;
    processA(in, B);
    CType C;
    processB(B, C);
    DType D;
    processC(C, D);
    processD(D, out);
  }
};

The problem with this approach is that if each of the N stages can be interchanged with M processes, you have N*M possible subclasses.

Another idea is to store function objects:

class Pipeline {
public:
  std::function<void(const In& in, BType& B)>   processA;
  std::function<void(const BType& B, CType& C)> processB;
  std::function<void(const CType& C, DType& D)> processC;
  std::function<void(const DType& D, Out& out)> processD;
  void process(const In& in, Out& out) {
    BType B;
    processA(in, B);
    CType C;
    processB(B, C);
    DType D;
    processC(C, D);
    processD(D, out);
  }
};

The problem I am having with this approach is that the stages are not really independent, and in some cases I would like a single object to store information concerning multiple stages.

Has anyone found a good data structure for a pipeline with replaceable parts? A bonus would be the ability to run each stage concurrently.

wcochran
    A quick glance at this and it seems super broken by design. Can't you make a standardized container for pipeline components and pack them into something like a `std::vector`? How performance sensitive is this code? Iterating over that might be good enough, but if you need absolute speed you may need to turn to a code generator or some really intense template work. – tadman Oct 10 '17 at 00:00
  • You should avoid output parameters. They are annoying to use, as you demonstrated in your example, and lead to bad code. – nwp Oct 10 '17 at 00:08
  • Can you give an example of how you would use the code assuming you had a perfect implementation? Would `Pipeline p{a2b, b2c, c2d}; bunch_of_ds = p.run(bunch_of_as);` be what it should look like in the end? – nwp Oct 10 '17 at 00:11
  • You want a pipeline whose stages are *independent?* What does that mean? You want the stages to "run concurrently"? Do you mean in sim time or real time? – Beta Oct 10 '17 at 00:16
  • @tadman The real application I am working on involves a fairly complex image processing pipeline. I want to be able to swap out stages and try a variety of different algorithms at each stage. Some of the stages will, in some cases, run on a GPU and can involve copying memory between the host and device. I wouldn't want to use a vector, since each element is of a different type (i.e., different functions with different parameters) -- wrapping them in a struct / class makes the most sense. – wcochran Oct 10 '17 at 00:21
  • @nwp Each stage can have some rather large specialized outputs (e.g., image pyramids) and the data structure will want to be reused. Are you implying I should wrap the input and output into one struct? That doesn't seem to appreciably change the problem. – wcochran Oct 10 '17 at 00:24
  • @Beta Eventually I want pipeline parallelism (think graphics pipeline). Each stage could be run on its own thread and each pipeline could be on its own stream. – wcochran Oct 10 '17 at 00:27
  • No, I'm implying that instead of `void (const In& in, BType& B)` you should be using `BType (const In& in)`. – nwp Oct 10 '17 at 00:27
  • @nwp I see. I actually will want to reuse the output data structures in my case since they are large and I want to avoid re-allocations when I can. – wcochran Oct 10 '17 at 00:30
  • I'm saying there's a few models to work with here in terms of how you design this. You could go with a chained function approach, like `x(y).y(z).z(a)...` and so on where the return value of one exposes a wrapper object that allows you to perform additional stage functions, or like the [Node.js streams model](https://nodejs.org/api/stream.html) where you explicitly pipe output from one stage into the next. – tadman Oct 10 '17 at 00:33
  • I could also see a solution with a long chain of inheritance, where the inheritance chain establishes how the input data is transformed into the final object. – tadman Oct 10 '17 at 00:34
  • @tadman Wouldn't that also result in the N*M processes too? – wcochran Oct 10 '17 at 00:39
  • @wcochran Why would it? Any linear chain shouldn't multiply the work unless you do something really, really odd. – tadman Oct 10 '17 at 00:40
  • @tadman you wouldn't have a linear chain, you would have a tree if you included all possibilities. – wcochran Oct 10 '17 at 00:42
  • One idea to think of: implement something like a "makefile" (set of targets, set of pattern rules). Construct all lazy objects up front, then force evaluation of whatever your main goals are. – o11c Oct 10 '17 at 01:01
  • Also, you want to think in terms of a [DAG](https://en.wikipedia.org/wiki/Directed_acyclic_graph), not a tree or sequence. – o11c Oct 10 '17 at 01:06
  • You should try posting this on Code Review. – BartoszKP Oct 10 '17 at 10:42

2 Answers


Pointers to `std::function` objects are a bad idea; a `std::function` can already store a pointer if needed.

I propose graphs.

A sink is a consumer:

template<class...Ts>
struct sink : std::function<void(Ts...)> {
  using std::function<void(Ts...)>::function;
};

A source is something that takes a consumer, and satisfies it:

template<class...Ts>
using source = sink<sink<Ts...>>;

A process is something that connects a producer to a consumer, possibly changing types:

template<class In, class Out>
using process = sink< source<In>, sink<Out> >;

Then we can define a pipeline operation:

template<class In, class Out>
sink<In> operator|( process< In, Out > a, sink< Out > b ){
  return [a,b]( In in ){
    a( [&in]( sink<In> s )mutable{ s(std::forward<In>(in)); }, b );
  };
}
template<class In, class Out>
source<Out> operator|( source< In > a, process< In, Out > b ){
  return [a,b]( sink<Out> out ){
    b( a, out );
  };
}

template<class In, class Mid, class Out>
process<In, Out> operator|( process<In, Mid> a, process<Mid, Out> b ){
  return [a,b]( source<In> in, sink<Out> out ){
    a( in, b|out ); // or b( in|a, out )
  };
}
template<class...Ts>
sink<> operator|( source<Ts...> a, sink<Ts...> b ){
  return[a,b]{ a(b); };
}

That should do it.

I assume that the state of the component pipeline elements is cheap to copy, so use shared pointers or raw pointers or some such.

If you want concurrency, simply spin up processes that provide queues of values and pass futures through the pipeline. But I think usually it is best to attach elements together and make the pipeline async, instead of stages.

Having the pipeline elements pass things like gsl::spans is also useful, allowing stages to have fixed buffers and pass results of computation in chunks without allocating.

A toy process to get you started:

#include <cctype>   // std::toupper
#include <iostream> // std::cout, used below

process<char, char> to_upper = []( source<char> in, sink<char> out ){
  in( [&out]( char c ) { out( std::toupper(c) ); } );
};

and a source:

source<char> hello_world = [ptr="hello world"]( sink<char> s ){
  for (auto it = ptr; *it; ++it ){ s(*it); }
};
sink<char> print = [](char c){std::cout<<c;};

int main(){
  auto prog = hello_world|to_upper|print;
  prog();
}

outputs "HELLO WORLD".

live demo: https://ideone.com/MC4fDV

Note that this is a push based pipeline. A pull based pipeline is an alternative. Push pipelines allow easier job batching; pull pipelines can avoid making data that nobody wants. Push makes data spreading natural; pull makes data gathering natural.

Coroutines can also make this more natural. In a sense, the source is a coroutine that suspends when it calls the sink in a push pipeline, and in a pull pipeline it is the other way around. Coroutines may let push and pull both work with the same processing code.

Yakk - Adam Nevraumont

To make your first approach more interchangeable, you could split the abstract base class into multiple base classes, one per process. Then each base class can be implemented by one or many objects. The pipeline holds a reference, pointer, or smart pointer to each base class:

struct ProcessA {
  virtual void processA(const In& in, BType& B) = 0;
  virtual ~ProcessA() = default;
};
struct ProcessB {
  virtual void processB(const BType& B, CType& C) = 0;
  virtual ~ProcessB() = default;
};
// ...

struct Pipeline {
  ProcessA* processA;
  ProcessB* processB;
  ProcessC* processC;
  ProcessD* processD;

  void process(const In& in, Out& out) {
    BType B;
    processA->processA(in, B);
    CType C;
    processB->processB(B, C);
    DType D;
    processC->processC(C, D);
    processD->processD(D, out);
  }
};

struct SimpleProcessor : ProcessA, ProcessB, ProcessC, ProcessD {
  void processA(const In& in, BType& B) override;
  void processB(const BType& B, CType& C) override;
  void processC(const CType& C, DType& D) override;
  void processD(const DType& D, Out& out) override;
};

int main() {
  SimpleProcessor processor;
  Pipeline pipeline;
  pipeline.processA = &processor;
  pipeline.processB = &processor; 
  pipeline.processC = &processor; 
  pipeline.processD = &processor; 
  In in;
  Out out;
  pipeline.process(in, out);
}

Live demo.

Your second approach can work too. You can use something like a lambda to adapt a single object to fit each std::function:

struct Pipeline {
  std::function<void(const In& in, BType& B)>   processA;
  std::function<void(const BType& B, CType& C)> processB;
  std::function<void(const CType& C, DType& D)> processC;
  std::function<void(const DType& D, Out& out)> processD;

  void process(const In& in, Out& out) {
    BType B;
    processA(in, B);
    CType C;
    processB(B, C);
    DType D;
    processC(C, D);
    processD(D, out);
  }
};

int main() {
  SimpleProcessor proc;
  Pipeline pipeline;
  pipeline.processA = [&proc](const In& in, BType& B){ return proc.processA(in, B); };
  pipeline.processB = [&proc](const BType& B, CType& C){ return proc.processB(B, C); }; 
  pipeline.processC = [&proc](const CType& C, DType& D){ return proc.processC(C, D); }; 
  pipeline.processD = [&proc](const DType& D, Out& out){ return proc.processD(D, out); }; 
  In in;
  Out out;
  pipeline.process(in, out);
}

Live demo.

And yes, either approach would allow you to run each process concurrently, but your BType, CType and DType would have to support concurrent access so they can be written to and read from at the same time. Concurrent queues, for example.

Chris Drew
  • Interesting use of multiple inheritance with class SimpleProcessor. Also, I didn't know you could use `std::function`s as value types -- that is better than using pointers, even though I don't understand how the compiler can allocate space for a value type for which it doesn't have specific information. – wcochran Oct 10 '17 at 04:54
  • @wcochran Multiple inheritance of pure abstract base classes (what would be called interfaces in other languages) is fairly standard, nothing to be scared of. A typical implementation of `std::function` will use type erasure and will contain only a pointer to a base class, so the size is known. See, for example, [this answer](https://stackoverflow.com/a/18453324/3422652). – Chris Drew Oct 11 '17 at 15:06