
Recently I started investigating the Pipeline pattern, also known as Pipes and Filters. I thought it would be a good way to structure code in applications that simply process data. I used this article as a base for my pipeline and steps implementation (but this is not so important). As usual, the blog covers a simple scenario, but in my case I need (or maybe don't) to work with IDisposable objects which might travel through the process.

For instance, Streams.

Let's consider a simple pipeline which should load a CSV file and insert its rows into some database. As a simple abstraction, we could implement functions like these:

Stream Step1(string filePath)
IEnumerable<RowType> Step2(Stream stream)
bool Step3(IEnumerable<RowType> data)

Now my question is whether this is a good approach. If we implement it as step-after-step processing, the Stream object leaves the first step, and it is easy to run into a memory leak. I know some might say that Step1 should load and deserialize the data in one go, but we are considering a simple process; we might have more complex ones where passing a Stream makes more sense.

I am wondering how I can implement such pipelines to avoid memory leaks while also avoiding loading the whole file into a MemoryStream (which would be safer). Should I somehow wrap each step in try..catch blocks to call Dispose() if something goes wrong? Or should I pass all IDisposable resources into a Pipeline object which is wrapped with using, so that all resources produced during processing are disposed correctly?
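To make the second idea concrete, here is a rough sketch of what I mean (DisposablePipeline and its Track method are hypothetical names, just to illustrate the question):

using System;
using System.Collections.Generic;

// Hypothetical: the pipeline tracks every IDisposable produced by a step
// and disposes all of them when processing ends, even on failure.
class DisposablePipeline : IDisposable
{
    private readonly List<IDisposable> _resources = new List<IDisposable>();

    // Steps register anything disposable they create and get it back.
    public T Track<T>(T resource) where T : IDisposable
    {
        _resources.Add(resource);
        return resource;
    }

    public void Dispose()
    {
        // Dispose in reverse order of creation.
        for (int i = _resources.Count - 1; i >= 0; i--)
            _resources[i].Dispose();
    }
}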

jgasiorowski
  • It might be worth reconsidering whether the pattern is useful in this scenario. You can write this to deserialize the data into a collection, and then insert the items into a database. Those are two classes. You invoke them both, one after the other. Even if I were using some sort of pipeline I'd still isolate them and call them from the pipeline steps. So that raises the question - what does arranging them in a pipeline add in this case? It seems like we could arrange any sequential steps in pipelines, but should we? – Scott Hannen Jul 30 '19 at 16:15
  • @ScottHannen Basically, if I use the pattern it will be easier to communicate with the rest of the team - each member has different experience. So it is a big advantage to prepare a few interfaces and tell the team 'we should do this with the pipeline pattern'. Then it is easy to design the steps and share the work between developers. I totally agree with you. If I were doing this in a side project then I wouldn't mind. – jgasiorowski Jul 30 '19 at 21:33
  • Sharing work through interfaces is awesome, but you don't need a pipeline to do that. It might even make the intent less clear. You could define two abstractions as interfaces: one deserializes, and the other writes to the database (see the sketch after these comments). The application depends on both of those abstractions. And then different developers can work on their implementations. – Scott Hannen Jul 30 '19 at 21:47
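For illustration, the two abstractions described in the last comment might look like this (the interface and method names are hypothetical, not from the discussion):

interface ICsvDeserializer
{
    IEnumerable<RowType> Deserialize(string filePath);
}

interface IDatabaseWriter
{
    void Write(IEnumerable<RowType> rows);
}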

2 Answers


If it's planned to be used like Step3( Step2( Step1(filePath) ) ), then Step2 should dispose the stream. It can use C#'s yield return feature, which generates an implementation of IEnumerator<> underneath; that iterator implements IDisposable, which effectively lets you "subscribe" to the end of enumeration and call Stream.Dispose at that point. E.g.:

IEnumerable<RowType> Step2(Stream stream)
{
    using(stream)
    using(StreamReader sr = new StreamReader(stream))
    {
        while(!sr.EndOfStream)
        {
            yield return Parse(sr.ReadLine()); // yield return compiles into an IEnumerator<> implementation
        }
    } // the finally block of the using is executed from IEnumerator<>.Dispose()
}

Then if Step3 either uses LINQ

bool Step3(IEnumerable<RowType> data) => data.Any(item => SomeDecisionLogic(item));

or foreach

bool Step3(IEnumerable<RowType> data)
{
    foreach(var item in data)
        if(SomeDecisionLogic(item))
            return true;
    return false;
}

for enumerating, both of them are guaranteed to call IEnumerator<>.Dispose() (ref1, ECMA-334 C# Spec, ch. 13.9.5), which in turn calls Stream.Dispose
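For completeness, the composed call could then look like this (Step1 is not spelled out in the question; here it is assumed to simply open the file):

Stream Step1(string filePath) => File.OpenRead(filePath);

// The stream opened in Step1 is disposed by Step2's iterator as soon as
// Step3 finishes (or abandons) the enumeration.
bool result = Step3(Step2(Step1("data.csv")));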

Renat
  • Thanks, it looks like it is not a problem to pass a stream outside of a step - we just need to make sure we don't forget to dispose it in any scenario – jgasiorowski Jul 30 '19 at 21:37
  • Yes, and `foreach` / `LINQ` methods automatically call `Dispose` on `IEnumerator<>` – Renat Jul 30 '19 at 22:04

IMO a pipeline is worth having if the interaction involves at least two different systems and the work can be executed in parallel. Otherwise it just adds overhead.

In this case there are two systems: the file system where the CSV file lives and the database. I think the pipeline should have at least two steps that run in parallel:

IEnumerable<Row> ReadFromCsv(string csvFilePath)
void UpdateDatabase(IEnumerable<Row> rows)

In this case it should be clear that the Stream is bound to ReadFromCsv.

IEnumerable<Row> ReadFromCsv(string path)
{
    using(var stream = File.OpenRead(path))
    {
        var lines = GetLines(stream); // yields one line at a time, not all at once
        foreach (var line in lines)
            yield return GetRow(line);
    }
}
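GetLines isn't shown above; a minimal sketch that yields one line at a time could look like this (passing leaveOpen: true is my assumption, so that ReadFromCsv's using block stays in charge of disposing the stream):

IEnumerable<string> GetLines(Stream stream)
{
    // leaveOpen: true - the caller (ReadFromCsv) owns and disposes the stream
    using (var reader = new StreamReader(stream, Encoding.UTF8, true, 1024, leaveOpen: true))
    {
        string line;
        while ((line = reader.ReadLine()) != null)
            yield return line; // one line at a time, never the whole file
    }
}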

I guess the scope depends on the steps - which in turn depend on the way you design the pipeline based on your needs.
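As a rough sketch of how the two steps could actually run in parallel, here is one producer/consumer arrangement (the BlockingCollection approach and the Run wrapper are just one option, not part of the pattern itself):

using System.Collections.Concurrent;
using System.Threading.Tasks;

void Run(string csvFilePath)
{
    using (var rows = new BlockingCollection<Row>(boundedCapacity: 1000))
    {
        // Producer: read rows from the CSV and hand them over.
        var reader = Task.Run(() =>
        {
            try
            {
                foreach (var row in ReadFromCsv(csvFilePath))
                    rows.Add(row);
            }
            finally
            {
                rows.CompleteAdding(); // unblock the consumer even if reading fails
            }
        });

        // Consumer: write rows to the database as they arrive.
        UpdateDatabase(rows.GetConsumingEnumerable());

        reader.Wait(); // propagate any exception from the reader
    }
}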

Suiden
  • Also have a look at [TPL Dataflow](https://learn.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library). I have not used it, so my answer may be totally off when compared to this. – Suiden Jul 30 '19 at 19:53
  • Thanks. As I said earlier, I feel that using a pattern for such processes helps the team communicate better and share the work. Your point of view is very helpful – jgasiorowski Jul 30 '19 at 21:42