
I am working on a solution where I have a list of payloads that need to be POSTed to an API.

The payload structure looks something like this, and I have an IList of thousands of such payloads:

{
    "Body": [{
        "Payload": {
            "emailaddress1": "test@email.com",
            "initials": "RP",
            "lastname": "Patil",
            "firstname": "Patil"
        },
        "EntityName": "person",
        "Url": "https://<someurl>/api/v1/persons",
        "CollectionName": "persons",
        "Method": "POST"
    },
    {
        "Payload": {
            "study_name": "Chemistry",
            "teacher_name": "John Doe"
        },
        "EntityName": "study",
        "Url": "https://<someurl>/api/v1/studies",
        "CollectionName": "studies",
        "Method": "POST"
    }]
}

The first object in this payload, `person`, needs to be created first. When I receive the ID of that entity, I add it to the second payload, `study`, after which that object looks like this:

{
    "Payload": {
        "person_id": <newly_generated_id>,
        "study_name": "Chemistry",
        "teacher_name": "John Doe"
    },
    "EntityName": "study",
    "Url": "https://<someurl>/api/v1/studies",
    "CollectionName": "studies",
    "Method": "POST"
}
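The step above (take the id returned for the person and add it to the study's "Payload") can be sketched with System.Text.Json.Nodes, which ships with .NET 6 and later. This is only an illustration under assumptions: the question's code indexes the payloads like Json.NET objects, the id value "12345" is made up, and the id is placed inside "Payload" to match the JSON shown.

```csharp
using System;
using System.Text.Json.Nodes;

// The "study" entry from the sample Body array, built in code for brevity.
var studyObj = new JsonObject
{
    ["Payload"] = new JsonObject
    {
        ["study_name"] = "Chemistry",
        ["teacher_name"] = "John Doe"
    },
    ["EntityName"] = "study",
    ["Url"] = "https://<someurl>/api/v1/studies",
    ["CollectionName"] = "studies",
    ["Method"] = "POST"
};

// Hypothetical id, standing in for whatever the person endpoint returns.
string personId = "12345";

// Add the id inside "Payload", matching the JSON shape shown above.
studyObj["Payload"]!["person_id"] = personId;

Console.WriteLine(studyObj["Payload"]!.ToJsonString());
// prints {"study_name":"Chemistry","teacher_name":"John Doe","person_id":"12345"}
```

JSON object members are unordered, so it should not matter to the API that person_id ends up last here rather than first.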

Currently I am doing it the traditional way: I iterate through the IList with foreach and use async/await, waiting for the response from the first API call before making the second one. However, because the foreach processes one payload at a time, it takes forever to create that many entities.

foreach (var payload in payloads)
{
    var personObj = payload["Body"].Where(obj => obj["EntityName"].ToString().Equals("person")).FirstOrDefault();
    var studyObj = payload["Body"].Where(obj => obj["EntityName"].ToString().Equals("study")).FirstOrDefault();

    // The study needs the person's id, so the person is created first
    var personId = await _apiService.AddPerson(personObj);

    if (personId != null)
    {
        // person_id goes inside "Payload", as in the JSON above
        studyObj["Payload"]["person_id"] = personId;
        var studyId = await _apiService.AddStudy(studyObj);

        // and more code... this is just sample code that demonstrates my implementation
    }
}

This works, but each payload waits for the previous one to finish, so nothing moves on until all the thousands of persons and their studies have been created in our system, one after another. Could it be sped up by spawning a separate thread for each iteration, or something similar?

Can anyone guide me towards a better approach for this particular scenario? Something like Parallel.ForEach?
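Regarding Parallel.ForEach: it is designed for synchronous work and does not await an async lambda (the lambda becomes async void), so it is not a good fit here as-is. .NET 6 added Parallel.ForEachAsync, which does await the body and lets you cap how many payloads run at once through ParallelOptions.MaxDegreeOfParallelism. A minimal sketch, assuming .NET 6 or later; AddPersonAsync and AddStudyAsync are hypothetical stand-ins for the service calls that only simulate latency with Task.Delay, and the limit of 10 is an arbitrary example value:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var created = new ConcurrentDictionary<int, string>(); // payload index -> person_id used for its study
var gate = new object();
int inFlight = 0, maxInFlight = 0;

// Hypothetical stand-in for _apiService.AddPerson: simulated latency, returns an id.
async Task<string> AddPersonAsync(int i, CancellationToken ct)
{
    await Task.Delay(20, ct);
    return $"person-{i}";
}

// Hypothetical stand-in for _apiService.AddStudy: records which person the study points to.
async Task AddStudyAsync(int i, string personId, CancellationToken ct)
{
    await Task.Delay(20, ct);
    created[i] = personId;
}

// At most 10 payloads are processed concurrently (arbitrary example limit).
var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };

await Parallel.ForEachAsync(Enumerable.Range(0, 100), options, async (i, ct) =>
{
    // Track the peak number of payloads being processed at the same time.
    int now = Interlocked.Increment(ref inFlight);
    lock (gate) { maxInFlight = Math.Max(maxInFlight, now); }

    // Within one payload the order is preserved: person first, then the study.
    string personId = await AddPersonAsync(i, ct);
    await AddStudyAsync(i, personId, ct);

    Interlocked.Decrement(ref inFlight);
});

Console.WriteLine($"{created.Count} payloads done, at most {maxInFlight} in flight");
```

Capping concurrency like this keeps the dependency between person and study inside each payload while avoiding thousands of simultaneous requests against the API.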

Mrinal Kamboj
Rahul Patil

2 Answers


Extract the logic inside the foreach into a single asynchronous method and call that method in the loop without awaiting it.
Await the results only after looping over the whole list:

var allTasks = new List<Task>();
foreach (var payload in payloads)
{
    // Start each payload's work without awaiting it here
    allTasks.Add(SendPayloadAsync(payload));
}

// Await everything only after all payloads have been started
await Task.WhenAll(allTasks);

// Assuming each payload is a Json.NET JObject, since it is indexed like one
private async Task SendPayloadAsync(JObject payload)
{
    var personObj = 
      payload["Body"].FirstOrDefault(obj => obj["EntityName"].ToString().Equals("person"));
    var studyObj = 
      payload["Body"].FirstOrDefault(obj => obj["EntityName"].ToString().Equals("study"));

    // The study depends on the person's id, so these two calls stay sequential
    var personId = await _apiService.AddPerson(personObj);

    if (personId != null)
    {
        studyObj["Payload"]["person_id"] = personId;
        var studyId = await _apiService.AddStudy(studyObj);
    }
}

This approach doesn't tie up a thread per request: while the API calls are in flight, no thread sits waiting for them.
You don't need to send every payload on its own thread; creating a new thread that only waits for a response from the service is a waste of resources.
async-await was designed for exactly this purpose.
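To see the effect without a real service, here is a small self-contained simulation of the pattern above, assuming .NET 6 or later (top-level statements). AddPersonAsync and AddStudyAsync are hypothetical stand-ins for _apiService.AddPerson and AddStudy that only wait 200 ms with Task.Delay; each payload still creates its person before its study, but the 1,000 payloads overlap instead of running one after another.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

var studies = new ConcurrentDictionary<string, string>(); // study name -> person_id it was created with

// Hypothetical stand-in for _apiService.AddPerson: simulated 200 ms call, returns an id.
async Task<string> AddPersonAsync(string lastname)
{
    await Task.Delay(200);
    return $"person-{lastname}";
}

// Hypothetical stand-in for _apiService.AddStudy: records which person the study points to.
async Task AddStudyAsync(string studyName, string personId)
{
    await Task.Delay(200);
    studies[studyName] = personId;
}

// Same shape as SendPayloadAsync: person first, then the dependent study.
async Task SendPayloadAsync(int i)
{
    var personId = await AddPersonAsync($"Patil{i}");
    if (personId != null)
    {
        await AddStudyAsync($"Study{i}", personId);
    }
}

var sw = Stopwatch.StartNew();

// Start every payload without awaiting it, then await them all at once.
var allTasks = new List<Task>();
for (int i = 0; i < 1000; i++)
{
    allTasks.Add(SendPayloadAsync(i));
}
await Task.WhenAll(allTasks);

sw.Stop();
Console.WriteLine($"{studies.Count} studies created in {sw.ElapsedMilliseconds} ms");
```

Run sequentially, 1,000 payloads with two 200 ms calls each would take about 400 s; with all tasks started up front, the total should be close to the latency of a single payload plus scheduling overhead. Against a real API you may still want to limit how many requests are in flight at once.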

Fabio

I like the idea of using Microsoft's Reactive Framework (NuGet "System.Reactive"). Then you can do this:

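using System;
using System.Linq;
using System.Reactive.Linq;   // NuGet "System.Reactive"
using Newtonsoft.Json.Linq;   // Json.NET

// Finds the Body entry whose EntityName matches t
Func<JObject, string, JToken> getToken =
    (p, t) => p["Body"].Where(obj => obj["EntityName"].ToString() == t).FirstOrDefault();

// Writes the new person id into the study's Payload
Func<JToken, string, JToken> setPersonId = (j, t) =>
{
    j["Payload"]["person_id"] = t;
    return j;
};

var query =
    from payload in payloads.ToObservable()
    let personObj = getToken(payload, "person")
    let studyObj = getToken(payload, "study")
    from personId in Observable.FromAsync(() => _apiService.AddPerson(personObj))
    where personId != null
    from studyId in Observable.FromAsync(() => _apiService.AddStudy(setPersonId(studyObj, personId)))
    select new { personObj, studyObj };

// Subscribe starts the pipeline; it does not wait for the requests to finish
query.Subscribe();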
Enigmativity
  • Can you tell me what the benefits of implementing it this way are? I would love to know more about this :-) – Rahul Patil Sep 29 '16 at 05:51
  • @RahulPatil - One of the key advantages is that this is a LINQ query, but based on observables, not enumerables, so it can nicely bring events and asynchronous operations together in a very compact form. It automatically handles the threading and you have full control of how the results are returned and handled. – Enigmativity Sep 29 '16 at 08:08
  • @Enigmativity, why would you consider using Reactive programming in this case? What are the observable events here, and what is the follow-up action after each event? The current code is quite similar to doing it with an Enumerable. – Mrinal Kamboj Sep 29 '16 at 18:52
  • @MrinalKamboj - I already gave a fairly good reason. Rx is a very powerful toolkit. I like it. – Enigmativity Sep 30 '16 at 00:12