47

This is a design question regarding the implementation of a Pipeline. The following is my naive implementation.

Interface for individual steps/stages in the pipeline:

public interface Step<T, U> {
    public U execute(T input);
}

Concrete implementations of steps/stages in pipeline:

public class StepOne implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 100;
    }
}

public class StepTwo implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 500;
    }
}

public class StepThree implements Step<Integer, String> {
    @Override
    public String execute(Integer input) {
        return "The final amount is " + input;
    }
}

The pipeline class will hold/register the steps in the pipeline and execute them one after the other:

public class Pipeline {
    private List<Step> pipelineSteps = new ArrayList<>();
    private Object firstStepInput = 100;

    public void addStep(Step step) {
        pipelineSteps.add(step);
    }

    public void execute() {
        for (Step step : pipelineSteps) {
            Object out = step.execute(firstStepInput);
            firstStepInput = out;
        }
   }
}

Driver program to execute the pipeline:

public class Main {
    public static void main(String[] args) {
        Pipeline pipeline = new Pipeline();
        pipeline.addStep(new StepOne());
        pipeline.addStep(new StepTwo());
        pipeline.addStep(new StepThree());

        pipeline.execute();
    } 
}

However, as you can see the naive implementation has many limitations.

One of the major ones is that since the requirement is that the output of each step could be of any type, the naive implementation is not type-safe (the execute method in the Pipeline class). If I happen to wire the steps in the pipeline incorrectly, the app will fail.
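To make the failure concrete, here is a minimal sketch of what happens when the steps are registered in the wrong order. The `MiswiredDemo` class and its `run` helper are hypothetical names added for illustration; `run` mirrors the loop in `Pipeline.execute()` above. The compiler accepts the wiring because the list holds raw `Step` objects, and the mistake only surfaces as a `ClassCastException` at runtime.

```java
import java.util.ArrayList;
import java.util.List;

interface Step<T, U> {
    U execute(T input);
}

class StepOne implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 100;
    }
}

class StepThree implements Step<Integer, String> {
    @Override
    public String execute(Integer input) {
        return "The final amount is " + input;
    }
}

// Hypothetical demo class, not part of the original code.
class MiswiredDemo {
    // Runs raw-typed steps one after the other, like the naive Pipeline.execute().
    @SuppressWarnings({"rawtypes", "unchecked"})
    static Object run(List<Step> steps, Object input) {
        Object current = input;
        for (Step step : steps) {
            current = step.execute(current);
        }
        return current;
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) {
        List<Step> steps = new ArrayList<>();
        steps.add(new StepThree()); // produces a String...
        steps.add(new StepOne());   // ...but StepOne expects an Integer
        try {
            run(steps, 100);
        } catch (ClassCastException e) {
            System.out.println("Mis-wired pipeline failed at runtime: " + e.getMessage());
        }
    }
}
```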

Can anyone help me design the solution by adding to what I have coded, or point me towards an already existing pattern to solve this?

Veltzer Doron
Prashant Chauhan
  • [this question](http://stackoverflow.com/questions/5686332/pipeline-pattern-implementation-in-java) links to [this document](http://parlab.eecs.berkeley.edu/wiki/_media/patterns/pipeline-v1.pdf) which references patterns. – Nick Bell Oct 09 '16 at 18:48
  • Thank you @NickBell for pointing to the paper. However, through the paper, I am not able to understand how the pipeline can be designed so that it could handle the stages/steps with different output types. – Prashant Chauhan Oct 09 '16 at 19:14
  • I would consider looking into [Java 1.8+ streams](https://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html) as they provide [functionality](https://docs.oracle.com/javase/tutorial/collections/streams/) / [docs](http://www.oracle.com/technetwork/articles/java/ma14-java-se-8-streams-2177646.html) for the example you've stated. See here for [duplicate](http://stackoverflow.com/questions/8680610/java-generics-chaining-together-generic-function-object) – Nick Bell Oct 09 '16 at 20:57
  • Thanks Nick for putting the link of the [duplicate question](http://stackoverflow.com/questions/8680610/java-generics-chaining-together-generic-function-object). The example provided there helped a lot. – Prashant Chauhan Oct 09 '16 at 23:00
  • @NickBell link is down – yolob 21 Apr 16 '20 at 18:33

6 Answers

31

Why do you need an additional Pipeline class? I think you can remove the middleman, which will make your API simpler. For example:

Step<Integer, String> source = Step.of(Object::toString);
Step<Integer, Integer> toHex = source.pipe(it -> Integer.parseInt(it, 16));

toHex.execute(11); // "11" is parsed as hexadecimal 0x11, so this returns 17

You can implement the pipeline pattern simply, as below:

interface Step<I, O> {

    O execute(I value);

    default <R> Step<I, R> pipe(Step<O, R> source) {
        return value -> source.execute(execute(value));
    }

    static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}

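In Java versions before 8, which lack default and static interface methods, you can use an abstract class instead:

// Declared as a nested class here; drop `static` for a top-level class.
abstract static class Step<I, O> {

    public abstract O execute(I value);

    // `source` must be final so the anonymous class can capture it before Java 8.
    public <R> Step<I, R> pipe(final Step<O, R> source) {
        return new Step<I, R>() {
            @Override
            public R execute(I value) {
                return source.execute(Step.this.execute(value));
            }
        };
    }

    public static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}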
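As a sketch of how this applies to the question, the three steps can be written as lambdas and wired with `pipe`. The `PipeDemo` class and its `build` method are hypothetical names used only for this illustration; the `Step` interface is the Java 8 version shown above, with the `pipe` parameter renamed to `next` for readability.

```java
interface Step<I, O> {

    O execute(I value);

    // Returns a new step that runs this step and feeds its output into `next`.
    default <R> Step<I, R> pipe(Step<O, R> next) {
        return value -> next.execute(execute(value));
    }

    static <I, O> Step<I, O> of(Step<I, O> source) {
        return source;
    }
}

// Hypothetical demo class, not part of the original answer.
class PipeDemo {
    static Step<Integer, String> build() {
        Step<Integer, Integer> stepOne = input -> input + 100;
        Step<Integer, Integer> stepTwo = input -> input + 500;
        Step<Integer, String> stepThree = input -> "The final amount is " + input;
        // The compiler checks that each output type matches the next input type.
        return Step.of(stepOne).pipe(stepTwo).pipe(stepThree);
    }

    public static void main(String[] args) {
        System.out.println(build().execute(100)); // prints "The final amount is 700"
    }
}
```

Wiring the steps in the wrong order, for example `stepThree.pipe(stepOne)`, is rejected at compile time, because `stepOne` does not accept a `String`.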
edward_wong
holi-java
  • By decents, underrated answer! – Grim Aug 29 '17 at 21:11
  • This is amazing. Can you explain how `Step.of(Object::toString)` works? How is `toString` being interpreted as `Step`? – z0r May 24 '19 at 04:20
  • Just implemented this in an application, so little code yet so effective. Very nice. – RedShift Nov 01 '19 at 13:32
  • @z0r: it's the same as `Step.of(input -> input.toString());`. The `::` syntax is a method reference, a shorthand for a lambda that only calls that method. – RedShift Nov 01 '19 at 13:35
  • How do you handle conditionals within piping? Let's say pipe1 -> pipe2 -> pipe3 or pipe4, depending on a condition. – beatrice Jan 09 '23 at 16:23
  • @beatrice It is easy: you just need to write a composite pipe and use the strategy pattern. Let's say a `CompoundPipe` consists of `pipe3`, `pipe4` and a selection strategy. – holi-java Jan 22 '23 at 10:29
22

I would focus on

If I happen to wire the steps in the pipeline incorrectly, the app will fail.

Yes, this is a problem. StepThree is the odd one out here, because it changes the type from Integer to String. I do not think a single simple pattern will help; I think it has to be a combination of the strategy and builder patterns. For example:

Pipeline<Integer,Integer> intPipe = new Pipeline<>();
intPipe = intPipe.add(new StepOne()); // increment 100
intPipe = intPipe.add(new StepTwo()); // increment 500
Pipeline<Integer, String> strPipe = intPipe.add(new StepThree()); // convert

where Pipeline looks like this:

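public static class Pipeline<IN, OUT> {
   //...
   // Accepts a step that consumes the pipeline's current output type (OUT)
   // and returns a pipeline whose output type is the step's result type (A).
   @SuppressWarnings("unchecked")
   public <A> Pipeline<IN, A> add(Step<OUT, A> step) {
     pipelineSteps.add(step);
     return (Pipeline<IN, A>) this;
   }
}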

Using fluent builder syntax, this might work:

Pipeline<Integer, String> pipe = new Pipeline<Integer, Integer>()
    .add(new StepOne()).add(new StepTwo()).add(new StepThree());

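The unchecked cast should work at runtime because of type erasure: generic type arguments are not enforced in the bytecode, so they serve only to let the compiler verify the wiring.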
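Here is a minimal, self-contained sketch of this idea, assuming that `add` takes a step whose input type is the pipeline's current output type. The `start` factory, the `execute` method and the `TypedPipelineDemo` class are hypothetical additions for illustration, not part of the code above. The steps are written as lambdas to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;

interface Step<T, U> {
    U execute(T input);
}

class Pipeline<IN, OUT> {
    private final List<Step<?, ?>> steps = new ArrayList<>();

    private Pipeline() {
    }

    // Hypothetical factory: an empty pipeline maps a value of type T to itself.
    static <T> Pipeline<T, T> start() {
        return new Pipeline<>();
    }

    // Only accepts a step whose input type matches the current output type.
    @SuppressWarnings("unchecked")
    <A> Pipeline<IN, A> add(Step<OUT, A> step) {
        steps.add(step);
        return (Pipeline<IN, A>) this; // safe at runtime because of type erasure
    }

    // The casts are unchecked, but add() has already verified the wiring at compile time.
    @SuppressWarnings("unchecked")
    OUT execute(IN input) {
        Object current = input;
        for (Step<?, ?> step : steps) {
            current = ((Step<Object, Object>) step).execute(current);
        }
        return (OUT) current;
    }
}

// Hypothetical demo class.
class TypedPipelineDemo {
    static Pipeline<Integer, String> build() {
        return Pipeline.<Integer>start()
                .add(input -> input + 100)
                .add(input -> input + 500)
                .add(input -> "The final amount is " + input);
    }

    public static void main(String[] args) {
        System.out.println(build().execute(100)); // prints "The final amount is 700"
    }
}
```

One caveat of this design: `add` mutates and returns the same object, so an older reference with the previous type parameters still points at the extended pipeline. Using only the reference returned by the last `add` call avoids that trap.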

RubioRic
Grim
14

You don't need to create a new interface for this.

Java 8 already has a functional interface called Function, whose andThen method lets you chain functions together (in other words, build your pipeline).

Function<Integer, Integer> addOne = it -> {
    System.out.println(it + 1);
    return it + 1;
};

Function<Integer, Integer> addTwo = it -> {
    System.out.println(it + 2);
    return it + 2;
};

Function<Integer, Integer> timesTwo = input -> {
    System.out.println(input * 2);
    return input * 2;
};

final Function<Integer, Integer> pipe = addOne
        .andThen(timesTwo)
        .andThen(addTwo);

pipe.apply(10);

If you want to read more about Functional Interfaces: https://medium.com/@julio.falbo/java-recent-history-java-8-part-2-functional-interface-predefined-functional-interface-2494f25610d5
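The same approach also covers steps that change the type, which is the case the question asks about. The following sketch rewrites the question's three steps as `Function` lambdas; `FunctionPipelineDemo` and `build` are hypothetical names used only here.

```java
import java.util.function.Function;

// Hypothetical demo class.
class FunctionPipelineDemo {
    static Function<Integer, String> build() {
        Function<Integer, Integer> stepOne = input -> input + 100;
        Function<Integer, Integer> stepTwo = input -> input + 500;
        Function<Integer, String> stepThree = input -> "The final amount is " + input;
        // andThen only compiles if each output type fits the next input type.
        return stepOne.andThen(stepTwo).andThen(stepThree);
    }

    public static void main(String[] args) {
        System.out.println(build().apply(100)); // prints "The final amount is 700"
    }
}
```

Swapping the order, for example `stepThree.andThen(stepOne)`, does not compile, so incorrect wiring is caught before the program runs.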

Júlio Falbo
  • Can we also have an abstract method inside the algorithm, and force the user to implement that method. Something like: abstract Function userDefined; final Function pipe = sourceInt.andThen(userDefined).andThen()... – mayank bisht Mar 29 '20 at 12:38
  • Upvote for using built-in interfaces. This is the simplest solution. – Mr Jedi May 07 '20 at 17:52
6

Your approach is pretty good. However, I'd code the Pipeline class like this:

public class Pipeline {
    private List<Step> pipelineSteps = new ArrayList<>();
    private Object firstStepInput = 100;

    public Pipeline() {
        pipelineSteps.add(new StepOne());
        pipelineSteps.add(new StepTwo());
        pipelineSteps.add(new StepThree());
    }

    public void execute() {
        for (Step step : pipelineSteps) {
            Object out = step.execute(firstStepInput);
            firstStepInput = out;
        }
    }

    public String getResult() {
        return (String) firstStepInput;
    }
}

This way, all of the specific step knowledge is encapsulated in the Pipeline class.

In this case, the execute method can perform a loop. However, the execute method can also perform the steps one by one, if necessary.
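As a sketch of the one-by-one variant, the steps can be held in typed fields and called explicitly. This assumes the set of steps is fixed and known to the pipeline class; `FixedPipeline` is a hypothetical name. Because every intermediate value has a declared type, no cast is needed and a wiring mistake becomes a compile error.

```java
interface Step<T, U> {
    U execute(T input);
}

class StepOne implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 100;
    }
}

class StepTwo implements Step<Integer, Integer> {
    @Override
    public Integer execute(Integer input) {
        return input + 500;
    }
}

class StepThree implements Step<Integer, String> {
    @Override
    public String execute(Integer input) {
        return "The final amount is " + input;
    }
}

// Hypothetical class: runs a fixed set of steps explicitly instead of looping.
class FixedPipeline {
    private final Step<Integer, Integer> stepOne = new StepOne();
    private final Step<Integer, Integer> stepTwo = new StepTwo();
    private final Step<Integer, String> stepThree = new StepThree();

    String execute(Integer input) {
        Integer afterOne = stepOne.execute(input);
        Integer afterTwo = stepTwo.execute(afterOne);
        return stepThree.execute(afterTwo);
    }

    public static void main(String[] args) {
        System.out.println(new FixedPipeline().execute(100)); // prints "The final amount is 700"
    }
}
```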

Gilbert Le Blanc
1

You can basically use the chain of responsibility design pattern.
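A rough sketch of what that could look like follows, assuming every handler works on the same type. All names here (`Handler`, `linkWith`, `AddHandler`, `ChainDemo`) are hypothetical. In this variant each handler always processes the value and forwards the result to its successor, which is closer to a pipeline than the classic form where a handler may stop the chain.

```java
// Hypothetical base class: each handler knows its successor.
abstract class Handler<T> {
    private Handler<T> next;

    // Links the next handler and returns it so calls can be chained.
    Handler<T> linkWith(Handler<T> next) {
        this.next = next;
        return next;
    }

    // Processes the request, then forwards the result if there is a successor.
    final T handle(T request) {
        T result = process(request);
        return next == null ? result : next.handle(result);
    }

    protected abstract T process(T request);
}

class AddHandler extends Handler<Integer> {
    private final int amount;

    AddHandler(int amount) {
        this.amount = amount;
    }

    @Override
    protected Integer process(Integer request) {
        return request + amount;
    }
}

class ChainDemo {
    static Integer run(int input) {
        Handler<Integer> first = new AddHandler(100);
        first.linkWith(new AddHandler(500));
        return first.handle(input);
    }

    public static void main(String[] args) {
        System.out.println(run(100)); // prints 700
    }
}
```

Since all handlers share one type parameter, a type-changing step such as StepThree does not fit this sketch directly, so it does not by itself solve the type-safety problem from the question.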

Vladimir Stazhilov
0
public class Pipeline {

    private List<Step> pipelineSteps = new ArrayList<>();
    private Object firstStepInput = 100;

    public Pipeline() {
        pipelineSteps.add(new StepOne());
        pipelineSteps.add(new StepTwo());
        pipelineSteps.add(new StepThree());
    }
}
Greg
murali
  • Please format (there's a preview window when you write your answer) & **explain** your answer. You do want people to understand what you did, don't you ? – Rafalon Oct 06 '17 at 09:17