I am trying to implement a data processing pipeline using TPL Dataflow. However, I am relatively new to dataflow and not completely sure how to use it properly for the problem I am trying to solve.

Problem:

I am trying to iterate through a list of files and process each file to read some data and then further process that data. Each file is roughly 700MB to 1GB in size and contains JSON data. In order to process these files in parallel and not run out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.

Once I get the list of files, I want to process a maximum of 4-5 files at a time in parallel. My confusion comes from:

  • How to use IEnumerable<> and yield return with async/await and dataflow. I came across this answer by svick, but I am still not sure how to convert an IEnumerable<> to an ISourceBlock, link all the blocks together, and track completion.
  • In my case, the producer will be really fast (going through the list of files), but the consumer will be very slow (processing each file: reading data and deserializing JSON). In this case, how do I track completion?
  • Should I use the LinkTo feature of dataflow blocks to connect the various blocks, or use methods such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another?

Code:

private const int ProcessingSize = 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
    PrepareDataflow(token);
    var bufferTask = ListFilesAsync(_fileBufferBlock, token);

    var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
    return Task.WhenAll(tasks);
}

private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
    ...
    // Get list of file Uris
    ...
    foreach(var fileNameUri in fileNameUris)
        await targetBlock.SendAsync(fileNameUri, token);

    targetBlock.Complete();
}

private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
    var httpClient = new HttpClient();
    try
    {
        using (var stream = await httpClient.GetStreamAsync(fileNameUri))
        using (var sr = new StreamReader(stream))
        using (var jsonTextReader = new JsonTextReader(sr))
        {
            while (jsonTextReader.Read())
            {
                if (jsonTextReader.TokenType == JsonToken.StartObject)
                {
                    try
                    {
                        var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
                        await _messageBufferBlock.SendAsync(data, token);
                    }
                    catch (Exception ex)
                    {
                        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
                    }
                }
            }
        }
    }
    catch(Exception ex)
    {
        // Should throw?
        // Or if converted to block then report using Fault() method?
    }
    finally
    {
        httpClient.Dispose();
        buffer.Complete();
    }
}

private void PrepareDataflow(CancellationToken token)
{
    _fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
    {
        CancellationToken = token
    });

    var actionExecuteOptions = new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = ProcessingSize,
        MaxMessagesPerTask = 1,
        MaxDegreeOfParallelism = ProcessingSize
    };
    _processingBlock = new ActionBlock<string>(async fileName =>
    {
        try
        {
            await ProcessFileAsync(fileName, token);
        }
        catch (Exception ex)
        {
            _logger.Fatal(ex, $"Failed to process file: {fileName}, Error: {ex.Message}");
            // Should fault the block?
        }
    }, actionExecuteOptions);

    _fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });

    _messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
    {
        CancellationToken = token,
        BoundedCapacity = 50000
    });
    _messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}

In the above code, I am not using IEnumerable<DataType> and yield return, as I cannot use them with async/await. So I am linking the input buffer to an ActionBlock<string>, which in turn posts to another queue. However, by using ActionBlock<>, I cannot link it to the next block for processing and have to manually Post/SendAsync from the ActionBlock<> to the BufferBlock<>. Also, in this case, I am not sure how to track completion.

This code works, but I am sure there could be a better solution than this, one where I can just link all the blocks (instead of using an ActionBlock<string> and then sending messages from it to the BufferBlock<DataType>).

Another option could be to convert IEnumerable<> to IObservable<> using Rx, but again I am not very familiar with Rx and don't know exactly how to mix TPL Dataflow and Rx.

Tejas Vora
  • Your processing is CPU bound. Therefore, async IO is pointless. It does not save you one millisecond of processing time. Delete everything async and the problem becomes easy. – usr Feb 12 '16 at 22:20
  • @usr: I haven't looked closely at this specific scenario; the question is too broadly stated, and doesn't provide a good [mcve] with which one would actually fully understand the context. It may well be that async operations here are not useful. However, it is IMHO a fallacy to think that just because processing is CPU bound, async I/O is "pointless". Async operations provide architectural benefits independent of possible performance benefits, and the lack of the latter does not preclude the possibility of the former. – Peter Duniho Feb 12 '16 at 23:36
  • @PeterDuniho What architectural benefits are there? You can always simulate any form of concurrency or parallelism using threads. The only point of async IO is being threadless (and in case of async IO plus await being awesome with GUI scenarios). The code quality detriments are significant, however. – usr Feb 12 '16 at 23:53
  • Going to refrain from closing this. Somehow I think there is a good core to this question. Since it is novel material, as opposed to the 100 rote async questions per day ("Oh my app locked up because I called Result or Wait!"), I'll give this the benefit of the doubt. @PeterDuniho – usr Feb 12 '16 at 23:53
  • _"The only point of async IO is being threadless"_ -- I guess we'll have to agree to disagree. First, async I/O isn't even "threadless"; it just happens to use the IOCP thread pool instead of requiring additional explicitly-created threads. Second, the `async` idiom in C# provides a very good, clean way to implement asynchronous code in a virtually non-asynchronous form, which is useful regardless of any performance benefits. YMMV. – Peter Duniho Feb 12 '16 at 23:58
  • Actually, there is not even an IOCP thread. http://blog.stephencleary.com/2013/11/there-is-no-thread.html If there are 10m sockets open and being read, there are not 10m IOCP threads. Maybe a few dozen. I love await, though :) – usr Feb 13 '16 at 00:02
  • There will be IO bound work here. The part which reads the file - either from Disk or using `HttpClient` and opens a stream - which I feed into `StreamReader` and `JsonTextReader`. - So yes, to answer your comments (@usr) - there will be IO work and also `async/await` provides a cleaner way to implement code – Tejas Vora Feb 13 '16 at 02:49
  • Though `JSON.NET` does not support `async/await` yet. Think of a scenario, whereby I don't have file on disk, but hosted on web server. So, I would have something like `stream = await httpClient.GetStreamAsync(uri)` and then pass that stream to `StreamReader` and in turn `JsonTextReader`. Also, the file would be really large, so instead of deserializing whole file at once, I would like to process it record by record in file. That way I would not hit `OutOfMemoryException` – Tejas Vora Feb 13 '16 at 02:55
  • @TejasVora, the `_fileBufferBlock` implementation feels a bit... dirty. All you're doing is dumping your file names into the `BufferBlock`, which does not have a capacity limit, incurring `async/await` overheads for *zero* benefit. You could reduce the number of moving parts and just post or send each of your filenames directly to your `ActionBlock` instead. That `ActionBlock` *also* has a `BoundedCapacity`, and so will throttle the producer for you, thereby managing the back-pressure (which may be important as you stated that your producer is much faster than consumer). – Kirill Shlenskiy Feb 13 '16 at 03:23
  • You would want to introduce a cache or queue in between when the producer and consumer run at different paces (usually the producer is faster than the consumer). Depending on your requirements, the queue could be anything from something as simple as MSMQ to a high-performance one like RabbitMQ, Redis and so on. – Saleem Feb 13 '16 at 03:24
  • @Saleem, respectfully, TPL Dataflow has perfectly functional controls for synchronising producer and consumer blocks, so bringing in a monster like MSMQ into this is absolutely, totally unnecessary. – Kirill Shlenskiy Feb 13 '16 at 03:26
  • @KirillShlenskiy You are correct. Instead of posting to `BufferBlock<>`, I can directly post to `ActionBlock<>` or `TransformBlock<,>` or `IPropagatorBlock<,>` and set `BoundedCapacity` and `MaxMessagesPerTask` and `MaxDegreeOfParallelism` on that block. The code above is sort of sample code. – Tejas Vora Feb 13 '16 at 04:58
  • @TejasVora are there fragments of a file that could be processed in parallel, or it is strictly sequential? – alexm Feb 13 '16 at 06:29
  • @alexm it's strictly sequential, as it contains JSON list. So I can't break it down and have different tasks process different parts. – Tejas Vora Feb 13 '16 at 06:35
  • OK, another question: how complex is it to process a single DataType structure? – alexm Feb 13 '16 at 06:36
  • @alexm It's a fairly complex structure. With lots of nested classes and arrays. – Tejas Vora Feb 13 '16 at 06:37
  • I should have clarified the question: after DataType instance is constructed is there still some *expensive* processing to be done? – alexm Feb 13 '16 at 07:08
  • @alexm yes, need to push that data to various datastore/database – Tejas Vora Feb 13 '16 at 07:27
  • @TejasVora: If it is acceptable to read all `DataType` instances for a file at the same time (into an array), then you can use `TransformManyBlock`. If it's not, then what you have is as good as dataflow can do (Rx can do more). – Stephen Cleary Feb 13 '16 at 13:28
  • @StephenCleary that's the problem. I cannot read everything from the file into an array or list; this will cause an out of memory exception. So the next choice I have is to use `IEnumerable<>` or `IObservable<>` and process the records from the file. – Tejas Vora Feb 13 '16 at 20:11
  • @TejasVora: See my answer. Since the JSON readers force synchrony, that's the best you can do. – Stephen Cleary Feb 13 '16 at 21:56

3 Answers

Question 1

You plug an IEnumerable<T> producer into your TPL Dataflow chain by using Post or SendAsync directly on the consumer block, as follows:

foreach (string fileNameUri in fileNameUris)
{
    await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}

You can also use a BufferBlock<T>, but in your case it actually seems rather unnecessary (or even harmful - see the next part).

Question 2

When would you prefer SendAsync instead of Post? If your producer runs faster than the URIs can be processed (and you have indicated this to be the case), and you choose to give your _processingBlock a BoundedCapacity, then when the block's internal buffer reaches the specified capacity, your SendAsync will "hang" until a buffer slot frees up, and your foreach loop will be throttled. This feedback mechanism creates back pressure and ensures that you don't run out of memory.
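
The difference can be seen in a minimal sketch, assuming a bounded ActionBlock with a deliberately slow delegate. The class and method names here are hypothetical and only for illustration. Post returns immediately and reports whether the item was accepted, so the second Post should be declined while the block is still busy; SendAsync instead waits until a slot frees up.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BackPressureSketch
{
    public static async Task<(bool firstPost, bool secondPost, bool sent)> RunAsync()
    {
        // Slow consumer that holds at most one item (queued or in progress).
        var block = new ActionBlock<int>(
            async n => await Task.Delay(200),
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);        // capacity available, so accepted
        bool secondPost = block.Post(2);       // block is full, so declined immediately
        bool sent = await block.SendAsync(3);  // waits for a free slot, then accepted

        block.Complete();
        await block.Completion;
        return (firstPost, secondPost, sent);
    }
}
```

With a bounded block, a declined Post silently drops the item unless the return value is checked, which is why awaiting SendAsync is the safer choice for a fast producer.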

Question 3

You should definitely use the LinkTo method to link your blocks in most cases. Unfortunately yours is a corner case due to the interplay of IDisposable and very large (potentially) sequences. So your completion will flow automatically between the buffer and processing blocks (due to LinkTo), but after that - you need to propagate it manually. This is tricky, but doable.

I'll illustrate this with a "Hello World" example where the producer iterates over each character and the consumer (which is really slow) outputs each character to the Debug window.

Note: LinkTo is not present.

// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
    await Task.Delay(100);

    Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

var producer = new ActionBlock<string>(async s =>
{
    foreach (char c in s)
    {
        await consumer.SendAsync(c);

        Debug.Print($"Yielded {c}");
    }
});

try
{
    producer.Post("Hello world");
    producer.Complete();

    await producer.Completion;
}
finally
{
    consumer.Complete();
}

// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);

This outputs:

Yielded H
H
Yielded e
e
Yielded l
l
Yielded l
l
Yielded o
o
Yielded  

Yielded w
w
Yielded o
o
Yielded r
r
Yielded l
l
Yielded d
d

As you can see from the output above, the producer is throttled and the handover buffer between the blocks never grows too large.

EDIT

You might find it cleaner to propagate completion via

producer.Completion.ContinueWith(
    _ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);

... right after producer definition. This allows you to slightly reduce producer/consumer coupling - but at the end you still have to remember to observe Task.WhenAll(producer.Completion, consumer.Completion).

Kirill Shlenskiy

In order to process these files in parallel and not run out of memory, I am trying to use IEnumerable<> with yield return and then further process the data.

I don't believe this step is necessary. What you're actually avoiding here is just a list of filenames. Even if you had millions of files, the list of filenames is just not going to take up a significant amount of memory.

I am linking input buffer to ActionBlock which in turn posts to another queue. However by using ActionBlock<>, I cannot link it to next block for processing and have to manually Post/SendAsync from ActionBlock<> to BufferBlock<>. Also, in this case, not sure, how to track completion.

ActionBlock<TInput> is an "end of the line" block. It only accepts input and does not produce any output. In your case, you don't want ActionBlock<TInput>; you want TransformManyBlock<TInput, TOutput>, which takes input, runs a function on it, and produces output (with any number of output items for each input item).

Another point to keep in mind is that dataflow blocks such as ActionBlock and TransformManyBlock already have their own input buffer, so the extra BufferBlock is unnecessary.

Finally, if you're already in "dataflow land", it's usually best to end with a dataflow block that actually does something (e.g., ActionBlock instead of BufferBlock). In this case, you could use the BufferBlock as a bounded producer/consumer queue, where some other code is consuming the results. Personally, I would consider that it may be cleaner to rewrite the consuming code as the action of an ActionBlock, but it may also be cleaner to keep the consumer independent of the dataflow. For the code below, I left in the final bounded BufferBlock, but if you use this solution, consider changing that final block to a bounded ActionBlock instead.

private const int ProcessingSize = 4;
private static readonly HttpClient HttpClient = new HttpClient();
private TransformManyBlock<string, DataType> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;

public Task ProduceAsync()
{
  PrepareDataflow(token);
  ListFiles(_processingBlock, token);
  _processingBlock.Complete();
  return _processingBlock.Completion;
}

private void ListFiles(ITargetBlock<string> targetBlock, CancellationToken token)
{
  ... // Get list of file Uris, occasionally calling token.ThrowIfCancellationRequested()
  foreach(var fileNameUri in fileNameUris)
    targetBlock.Post(fileNameUri);
}

private async Task<IEnumerable<DataType>> ProcessFileAsync(string fileNameUri, CancellationToken token)
{
  return Process(await HttpClient.GetStreamAsync(fileNameUri), fileNameUri, token);
}

private IEnumerable<DataType> Process(Stream stream, string fileNameUri, CancellationToken token)
{
  using (stream)
  using (var sr = new StreamReader(stream))
  using (var jsonTextReader = new JsonTextReader(sr))
  {
    while (jsonTextReader.Read())
    {
      token.ThrowIfCancellationRequested();
      if (jsonTextReader.TokenType != JsonToken.StartObject)
        continue;

      // C# does not allow yield return inside a try block that has a catch clause,
      // so deserialize inside the try and yield afterwards.
      DataType data;
      try
      {
        data = _jsonSerializer.Deserialize<DataType>(jsonTextReader);
      }
      catch (Exception ex)
      {
        _logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
        continue;
      }
      yield return data;
    }
  }
}

private void PrepareDataflow(CancellationToken token)
{
  var executeOptions = new ExecutionDataflowBlockOptions
  {
    CancellationToken = token,
    MaxDegreeOfParallelism = ProcessingSize
  };
  _processingBlock = new TransformManyBlock<string, DataType>(fileName =>
      ProcessFileAsync(fileName, token), executeOptions);

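  _messageBufferBlock = new BufferBlock<DataType>(new DataflowBlockOptions
  {
    CancellationToken = token,
    BoundedCapacity = 50000
  });

  // Link the blocks so that output and completion flow into the bounded buffer.
  _processingBlock.LinkTo(_messageBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });
}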
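
The suggestion of ending the mesh with a bounded ActionBlock could look roughly like the following sketch. It is not the code above: ReadRecords is a hypothetical stand-in for streaming records out of a file, the record type is simplified to string, and the capacities are arbitrary assumptions. The point is only that LinkTo with PropagateCompletion lets you complete the first block and await the last one.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class LinkedPipelineSketch
{
    // Hypothetical stand-in for lazily reading the records of one file.
    private static IEnumerable<string> ReadRecords(string file)
    {
        for (int i = 0; i < 3; i++)
            yield return $"{file}#{i}";
    }

    public static async Task<int> RunAsync(IEnumerable<string> files)
    {
        int stored = 0;

        // One input (a file name) fans out into many outputs (its records).
        var parse = new TransformManyBlock<string, string>(
            file => ReadRecords(file),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4, BoundedCapacity = 4 });

        // Bounded "end of the line" block that does the real work per record.
        var store = new ActionBlock<string>(
            record => { Interlocked.Increment(ref stored); },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 100 });

        parse.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var file in files)
            await parse.SendAsync(file); // throttled by the bounded input buffer

        parse.Complete();          // no more input
        await store.Completion;    // completes after every record has been handled
        return stored;
    }
}
```

Calling LinkedPipelineSketch.RunAsync(new[] { "a", "b" }) would return the number of records the final block handled, without any manual completion plumbing.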

Alternatively, you could use Rx. Learning Rx can be pretty difficult though, especially for mixed asynchronous and parallel dataflow situations, which you have here.

As for your other questions:

How to use IEnumerable<> and yield return with async/await and dataflow.

async and yield are not compatible at all. At least in today's language. In your situation, the JSON readers have to read from the stream synchronously anyway (they don't support asynchronous reading), so the actual stream processing is synchronous and can be used with yield. Doing the initial back-and-forth to get the stream itself can still be asynchronous and can be used with async. This is as good as we can get today, until the JSON readers support asynchronous reading and the language supports async yield. (Rx could do an "async yield" today, but the JSON reader still doesn't support async reading, so it won't help in this particular situation).
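
Later versions of the language (C# 8.0 and newer) did add async iterators, where yield return can appear in an async method that returns IAsyncEnumerable<T>. A minimal sketch follows, assuming a plain TextReader as a stand-in for the file stream; the class and method names are hypothetical, and this says nothing about whether a given JSON reader can read asynchronously.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class AsyncIteratorSketch
{
    // Yields one line at a time; each read is awaited instead of blocking.
    public static async IAsyncEnumerable<string> ReadLinesAsync(TextReader reader)
    {
        string line;
        while ((line = await reader.ReadLineAsync()) != null)
            yield return line;
    }

    // Consumes the async sequence with await foreach.
    public static async Task<int> CountAsync(TextReader reader)
    {
        int count = 0;
        await foreach (var line in ReadLinesAsync(reader))
            count++;
        return count;
    }
}
```

Only one line is held in memory at a time, which is the same streaming property the synchronous yield version provides.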

In this case, how to track completion.

If the JSON readers did support asynchronous reading, then the solution above would not be the best one. In that case, you would want to use a manual SendAsync call, and would need to link just the completion of these blocks, which can be done as such:

_processingBlock.Completion.ContinueWith(
    task =>
    {
      if (task.IsFaulted)
        ((IDataflowBlock)_messageBufferBlock).Fault(task.Exception);
      else if (!task.IsCanceled)
        _messageBufferBlock.Complete();
    },
    CancellationToken.None,
    TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
    TaskScheduler.Default);

Should I use LinkTo feature of datablocks to connect various blocks? or use method such as OutputAvailableAsync() and ReceiveAsync() to propagate data from one block to another.

Use LinkTo whenever you can. It handles all the corner cases for you.

// Should throw? // Should fault the block?

That's entirely up to you. By default, when any processing of any item fails, the block faults, and if you are propagating completion, the entire chain of blocks would fault.

Faulting blocks are rather drastic; they throw away any work in progress and refuse to continue processing. You have to build a new dataflow mesh if you want to retry.

If you prefer a "softer" error strategy, you can either catch the exceptions and do something like log them (which your code currently does), or you can change the nature of your dataflow block to pass along the exceptions as data items.
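
Passing exceptions along as data items might look like the following sketch. The Outcome<T> wrapper and the int.Parse workload are hypothetical illustrations, not part of TPL Dataflow: the transform catches the failure and emits it as a value, so the block never faults and later items keep flowing.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical wrapper: carries either a value or the exception that replaced it.
public sealed class Outcome<T>
{
    public T Value { get; }
    public Exception Error { get; }
    public bool Succeeded => Error == null;

    private Outcome(T value, Exception error) { Value = value; Error = error; }
    public static Outcome<T> Ok(T value) => new Outcome<T>(value, null);
    public static Outcome<T> Fail(Exception error) => new Outcome<T>(default(T), error);
}

public static class SoftErrorSketch
{
    public static async Task<(int parsed, int failed)> RunAsync(IEnumerable<string> inputs)
    {
        int parsed = 0, failed = 0;

        var parse = new TransformBlock<string, Outcome<int>>(text =>
        {
            try { return Outcome<int>.Ok(int.Parse(text)); }
            catch (FormatException ex) { return Outcome<int>.Fail(ex); } // becomes data, not a fault
        });

        // Default MaxDegreeOfParallelism is 1, so the counters need no locking here.
        var sink = new ActionBlock<Outcome<int>>(o =>
        {
            if (o.Succeeded) parsed++; else failed++;
        });

        parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var text in inputs)
            await parse.SendAsync(text);

        parse.Complete();
        await sink.Completion;
        return (parsed, failed);
    }
}
```

The downstream block decides what a failure means (log it, count it, route it elsewhere), while the mesh itself stays alive for the remaining items.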

Stephen Cleary

It would be worth looking at Rx. Unless I'm missing something, all the code you need (apart from your existing ProcessFileAsync method) would look like this:

var query =
    fileNameUris
        .Select(fileNameUri =>
            Observable
                .FromAsync(ct => ProcessFileAsync(fileNameUri, ct)))
        .Merge(maxConcurrent : 4);

var subscription =
    query
        .Subscribe(
            u => { },
            () => { Console.WriteLine("Done."); });

Done. It runs asynchronously, it's cancellable by calling subscription.Dispose(), and you can specify the maximum parallelism.

Enigmativity