3

I have some time-consuming code in a foreach loop that uses async/await. It includes pulling data from the database, generating HTML, POSTing that to an API, and saving the replies to the DB.

A mock-up looks like this

List<label> labels = db.labels.ToList();
foreach (var x in list) 
{
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid == y.userid)
                                               .Select(y => y.ID)
                                               .Contains(q.id));

    //Render the HTML
    //do some fast stuff with objects

    List<response> res = await api.sendMessage(object);  //POST

    //put all the responses in the db
    foreach (var r in res) 
    {
        db.responses.Add(r);
    }

    db.SaveChanges();
}

Time wise, generating the Html and posting it to the API seem to be taking most of the time.

Ideally it would be great if I could generate the HTML for the next item while waiting for the current POST to finish, before posting the next item.

Other ideas are also welcome. How would one go about this?

I first thought of declaring a Task above the foreach and awaiting it before making the next POST, but then how do I handle the last iteration... it feels messy...
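
Something like this rough sketch is what I have in mind (RenderHtml and SaveResponses are just placeholders for the real code):

Task<List<response>> pendingPost = null;

foreach (var x in list)
{
    var html = RenderHtml(x);               // CPU-bound work overlaps the pending POST

    if (pendingPost != null)
    {
        SaveResponses(await pendingPost);   // wait for the previous POST and save its replies
    }

    pendingPost = api.sendMessage(html);    // start the next POST without awaiting it yet
}

// the last POST still has to be awaited outside the loop
if (pendingPost != null)
{
    SaveResponses(await pendingPost);
}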

Yuval Itzchakov
Stefanvds

4 Answers

5

You can do it in parallel, but you will need a different context in each task.

Entity Framework is not thread safe, so you can't use one context across parallel tasks.

var tasks = myLabels.Select(async label =>
{
    using (var db = new MyDbContext())
    {
        // do processing...
        var response = await api.getresponse();
        db.Responses.Add(response);
        await db.SaveChangesAsync();
    }
});

await Task.WhenAll(tasks);

In this case, all tasks will appear to run in parallel, and each task will have its own context.

If you don't create a new context per task, you will get the error mentioned in this question: Does Entity Framework support parallel async queries?
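
If firing all the tasks at once hammers the API too hard, a SemaphoreSlim can cap how many requests are in flight at the same time; a sketch of that variation (the limit of 5 is just an example):

var throttle = new SemaphoreSlim(5);

var tasks = myLabels.Select(async label =>
{
    await throttle.WaitAsync();
    try
    {
        using (var db = new MyDbContext())
        {
            var response = await api.getresponse();
            db.Responses.Add(response);
            await db.SaveChangesAsync();
        }
    }
    finally
    {
        throttle.Release();             // let the next request start
    }
});

await Task.WhenAll(tasks);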

Akash Kava
    It's maybe a very minor comment but I think the last line should be: `await Task.WhenAll(tasks); ` without **S** in **Task** as we are using [System.Threading.Tasks.Task Class](https://learn.microsoft.com/en-us/dotnet/api/system.threading.tasks.task?view=netcore-3.1) – Sebastian Inones Aug 24 '22 at 10:25
2

It's more of an architecture problem than a code issue here, imo.

You could split your work into two separate parts:

  1. Get data from database and generate HTML
  2. Send API request and save response to database

You could run them both in parallel and use a queue to coordinate them: whenever your HTML is ready it's added to the queue, and another worker picks it up from there, taking that HTML and sending it to the API.

Both parts can be done in a multithreaded way too, e.g. you can process multiple items from the queue at the same time by having a set of workers looking for items to process in the queue.
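
A rough sketch of that queue idea using System.Threading.Channels (HtmlItem, RenderHtml and PostToApiAsync are placeholder names):

var channel = Channel.CreateBounded<HtmlItem>(100);

// Producer: pull data, render the HTML, put it on the queue.
var producerTask = Task.Run(async () =>
{
    foreach (var x in list)
    {
        var item = RenderHtml(x);                   // CPU-bound HTML generation
        await channel.Writer.WriteAsync(item);      // hand it over to the consumer
    }
    channel.Writer.Complete();                      // no more items coming
});

// Consumer: take rendered HTML off the queue and POST it.
var consumerTask = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        await PostToApiAsync(item);                 // I/O-bound API call
    }
});

await Task.WhenAll(producerTask, consumerTask);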

MarcinJuraszek
2

This screams for the producer/consumer pattern: one producer produces data at a speed different from the rate at which the consumer consumes it. Once the producer has nothing left to produce, it notifies the consumer that no more data is expected.

MSDN has a nice example of this pattern where several dataflow blocks are chained together: the output of one block is the input of another block.

Walkthrough: Creating a Dataflow Pipeline

The idea is as follows:

  • Create a class that will generate the HTML.
  • This class has an object of type System.Threading.Tasks.Dataflow.BufferBlock<T>.
  • An async procedure creates all HTML output and awaits SendAsync to pass the data to the buffer block.
  • The buffer block implements the interface ISourceBlock<T>. The class exposes this as a get property:

The code:

// requires: using System.Threading.Tasks.Dataflow;
class MyProducer<T>
{
    private BufferBlock<T> bufferBlock = new BufferBlock<T>();

    public ISourceBlock<T> Output { get { return this.bufferBlock; } }

    public async Task ProcessAsync()
    {
        while (somethingToProduce)
        {
            T producedData = ProduceOutput(...);
            await this.bufferBlock.SendAsync(producedData);
        }
        // no data to send anymore. Mark the output complete:
        this.bufferBlock.Complete();
    }
}
  • A second class takes this ISourceBlock. It will wait at this source block until data arrives and processes it.
  • do this in an async function
  • stop when no more data is available

The code:

public class MyConsumer<T>
{
    public ISourceBlock<T> Source { get; set; }

    public async Task ProcessAsync()
    {
        while (await this.Source.OutputAvailableAsync())
        {   // there is input of type T, read it:
            var input = await this.Source.ReceiveAsync();
            // process input
        }
        // if here, no more input expected. Finish.
    }
}

Now put it together:

private async Task ProduceOutput<T>()
{
    var producer = new MyProducer<T>();
    var consumer = new MyConsumer<T>() { Source = producer.Output };
    var producerTask = Task.Run(() => producer.ProcessAsync());
    var consumerTask = Task.Run(() => consumer.ProcessAsync());
    // while both tasks are working you can do other things.
    // wait until both tasks are finished:
    await Task.WhenAll(producerTask, consumerTask);
}

For simplicity I've left out exception handling and cancellation. Stack Overflow has articles about exception handling and cancellation of tasks.
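
The same pipeline can also be sketched with two linked dataflow blocks instead of the hand-rolled classes (Item, RenderHtml and PostAsync are placeholder names; MaxDegreeOfParallelism controls how many POSTs run at once):

var render = new TransformBlock<Item, string>(item => RenderHtml(item));
var post = new ActionBlock<string>(async html => await PostAsync(html),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// connect the blocks so completion flows from render to post
render.LinkTo(post, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var item in items)
    await render.SendAsync(item);

render.Complete();        // no more input
await post.Completion;    // wait until every POST has finished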

Harald Coppoolse
  • Thanks for your input. This sounds great, but I am guessing it is slightly overkill for this case, because the generation of 10k html's only takes something like 5 seconds. Meaning the whole procedure would only save that many seconds. Since I'm talking to an API, I love the fact that I can easily choose how many parallel POSTs I make (in my solution) – Stefanvds Aug 05 '15 at 02:52
  • You're welcome. For me it was a nice exercise to create such a thing, especially when my producer sent downloaded files to consumers who had to interpret them, and send the converted data to another consumer while sending the original file to yet another consumer etc. It was nice to see in a windows form that all processes were cooperating in their own pace without me having to schedule anything. – Harald Coppoolse Aug 05 '15 at 14:20
  • I'm now working with a bigger data-set and am thinking of using this method. In your case I'm assuming your creator of tasks is the slower of the two? For me the processing/consumer would be the slower of the two. The problem here is I'd have to have like 5 parallel consumers for 1 'creator' of tasks. Any guidelines on that? – Stefanvds Sep 04 '15 at 03:40
0

This is what I ended up using: (https://stackoverflow.com/a/25877042/275990)

List<ToSend> sendToAPI = new List<ToSend>();
List<label> labels = db.labels.ToList();
foreach (var x in list) {
    var myLabels = labels.Where(q => !db.filter.Where(y => x.userid == y.userid)
                                               .Select(y => y.ID)
                                               .Contains(q.id));

    //Render the HTML
    //do some fast stuff with objects
    sendToAPI.Add(objectWithHtml);  // the object with the rendered HTML
}

int maxParallelPOSTs=5;
await TaskHelper.ForEachAsync(sendToAPI, maxParallelPOSTs, async i => {
    using (NasContext db2 = new NasContext()) {
        List<response> res = await api.sendMessage(i.object);  //POST

        //put all the responses in the db
        foreach (var r in res) 
        {
            db2.responses.Add(r);
        }

        db2.SaveChanges();
    }
});


The ForEachAsync extension method comes from the linked answer:

    // Extension method; must be declared in a static class.
    // Requires System.Collections.Concurrent (Partitioner) and System.Linq.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body) {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate {
                using (partition)
                    while (partition.MoveNext()) {
                        await body(partition.Current).ContinueWith(t => {
                            if (t.Exception != null) {
                                string problem = t.Exception.ToString();
                            }
                            //observe exceptions
                        });
                    }
            }));
    }

Basically this lets me generate the HTML synchronously, which is fine since it only takes a few seconds to generate thousands, but lets me POST and save to the DB asynchronously, with as many parallel tasks as I predefine. In this case I'm posting to the Mandrill API; parallel posts are no problem.
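
As an aside, if the project can target .NET 6 or later, the built-in Parallel.ForEachAsync does the same throttled fan-out, so the custom ForEachAsync extension is no longer needed:

// assumes .NET 6+; mirrors the usage above with the same placeholder types
await Parallel.ForEachAsync(sendToAPI,
    new ParallelOptions { MaxDegreeOfParallelism = maxParallelPOSTs },
    async (i, ct) =>
    {
        using (NasContext db2 = new NasContext())
        {
            List<response> res = await api.sendMessage(i.object);  //POST

            foreach (var r in res)
            {
                db2.responses.Add(r);
            }

            db2.SaveChanges();
        }
    });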

Stefanvds