I have a list of project numbers that I need to process. A project can have about 8,000 items; I need to get the data for each item in the project and then push that data to a list of servers. Can anybody please tell me the following?

1) I have 1000 items in iR but only 998 were written to the servers. Did I lose items by using BroadcastBlock?

2) Am I doing the await on all ActionBlocks correctly?

3) How do I make the database call async?

Here is the database code

    public MemcachedDTO GetIR(MemcachedDTO dtoItem)
    {

        string[] Tables = new string[] { "iowa", "la" };
        using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString))
        {
            using (SqlCommand command = new SqlCommand("test", connection))
            {
                DataSet Result = new DataSet();
                command.CommandType = CommandType.StoredProcedure;

                command.Parameters.Add("@ProjectId", SqlDbType.VarChar);
                command.Parameters["@ProjectId"].Value = dtoItem.ProjectId;


                connection.Open();
                Result.EnforceConstraints = false;
                Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables);
                dtoItem.test = Result;
            }
        }
        return dtoItem;
    }

Update: I have updated the code to the version below. It hangs when I run it and writes only a quarter of the data to the servers. Can you please let me know what I am doing wrong?

    public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            }, new ExecutionDataflowBlockOptions
            {
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

    [HttpGet]
    public async Task<HttpResponseMessage> ReloadItem(string projectQuery)
    {
        try
        {
            var linkCompletion = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 2
            };
            var cts = new CancellationTokenSource();
            var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token };

            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>();


            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }
            foreach (string s in serverList)
            {
                var action = new ActionBlock<MemcachedDTO>(
                async dto => await PostEachServerAsync(dto, s, "setitemcache"));
                actionList.Add(action);
            }
            var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions);

            foreach (MemcachedDTO d in dtoList)
            {
                await iR.SendAsync(d);
            }

            iR.Complete();
            iR.LinkTo(bBlock);
            await Task.WhenAll(actionList.Select(action => action.Completion).ToList());

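            // string.Join lists the ids; calling ToString() on the list would only print its type name.
            return Request.CreateResponse(HttpStatusCode.OK, new { message = string.Join(",", projectIds) + " reload success" });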
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }
klkj898
  • Related: [BroadcastBlock with Guaranteed Delivery in TPL Dataflow](https://stackoverflow.com/questions/22127660/broadcastblock-with-guaranteed-delivery-in-tpl-dataflow) – Theodor Zoulias Nov 17 '20 at 17:10

1 Answer

1) I have 1000 items in iR but only 998 were written to the servers. Did I lose items by using BroadcastBlock?

Yes. In the code below you set BoundedCapacity to 1; if at any time your BroadcastBlock cannot pass an item along to a target, that target never sees it, because the BroadcastBlock replaces the item with the next one instead of waiting. Additionally, a BroadcastBlock will only propagate completion to one target block, so do not use PropagateCompletion = true here. If you want all blocks to complete, you need to handle completion manually. This can be done by attaching a ContinueWith to the BroadcastBlock's Completion task that passes completion on to all of the connected targets.

    var action = new ActionBlock<MemcachedDTO>(
        dto => PostEachServerAsync(dto, s, "set"),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 });
    broadcast.LinkTo(action, linkCompletion);
    actionList.Add(action);

Option: instead of the BroadcastBlock, use a properly bounded BufferBlock. When your downstream blocks are bounded to one item, they cannot receive additional items until they finish processing what they have. That allows the BufferBlock to offer its items to another, possibly idle, ActionBlock.
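As a rough illustration of that option, here is a minimal sketch of a bounded BufferBlock feeding several ActionBlocks that each hold one item at a time. The class name `BufferBlockDistributionSketch`, the integer payload, and the `Task.Delay` standing in for the server call are assumptions made for the example, not part of the original code. Note that each item reaches exactly one worker, so this is load balancing and not a broadcast.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockDistributionSketch
{
    // Pushes itemCount integers through a bounded BufferBlock linked to
    // workerCount ActionBlocks and returns how many items each worker handled.
    public static async Task<int[]> RunAsync(int itemCount, int workerCount)
    {
        var handled = new int[workerCount];
        var buffer = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 10 });
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        var workers = Enumerable.Range(0, workerCount).Select(index =>
        {
            var worker = new ActionBlock<int>(async item =>
            {
                await Task.Delay(5); // hypothetical stand-in for the real server call
                Interlocked.Increment(ref handled[index]);
            },
            // Capacity 1: a busy worker postpones new items, so the buffer
            // offers them to the next linked worker instead.
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

            buffer.LinkTo(worker, linkOptions);
            return worker;
        }).ToList();

        for (int i = 0; i < itemCount; i++)
        {
            await buffer.SendAsync(i); // waits while the buffer is full
        }

        buffer.Complete();
        await Task.WhenAll(workers.Select(w => w.Completion));
        return handled;
    }
}
```

Calling `await BufferBlockDistributionSketch.RunAsync(20, 3)` returns three counts that add up to 20; how the items split between workers depends on timing.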

When you add items to a throttled flow, i.e. a flow with a BoundedCapacity less than Unbounded, you need to use the SendAsync method or at least check the return value of Post. I'd recommend simply using SendAsync:

    foreach (MemcachedDTO d in dtoList)
    {
        await iR.SendAsync(d);
    }

That will force your method signature to become:

    public async Task<HttpResponseMessage> ReloadItem(string projectQuery)
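To make the difference between Post and SendAsync concrete, here is a small sketch against a block with BoundedCapacity = 1. The class name `BoundedPostSketch` and the `gate` used to keep the first item "in progress" are hypothetical, introduced only so the behavior is repeatable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedPostSketch
{
    public static async Task<(bool FirstPost, bool SecondPost, bool SendAccepted, List<int> Processed)> RunAsync()
    {
        var processed = new List<int>();
        // The gate keeps item 1 in progress so the block stays full.
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var block = new ActionBlock<int>(async item =>
        {
            processed.Add(item); // safe: the block handles one item at a time
            await gate.Task;
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool firstPost = block.Post(1);  // accepted: the block was empty
        bool secondPost = block.Post(2); // declined: the block is full, so item 2 is lost

        // SendAsync does not give up; its task completes once the block has room.
        Task<bool> pendingSend = block.SendAsync(3);
        gate.SetResult(true);            // let item 1 finish
        bool sendAccepted = await pendingSend;

        block.Complete();
        await block.Completion;
        return (firstPost, secondPost, sendAccepted, processed);
    }
}
```

In this sketch the second Post returns false and item 2 is never processed, while the SendAsync for item 3 is accepted as soon as item 1 finishes. Ignoring the return value of Post is one way items can silently go missing from a bounded flow.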

2) Am I doing the await on all actionBlocks correctly?

The previous change lets you drop the blocking Wait call in favor of an awaited Task.WhenAll. Change this:

    iR.Complete();
    actionList.ForEach(x => x.Completion.Wait());

To:

    iR.Complete();
    await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete()));
    await Task.WhenAll(actionList.Select(action => action.Completion).ToList());
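For reference, a self-contained sketch of this hand-off with a BroadcastBlock might look like the following. The class name `ManualCompletionSketch` and the integer payload are assumptions for the example; the targets are deliberately left unbounded so that nothing is dropped, and completion (or a fault) is forwarded by hand instead of through PropagateCompletion.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ManualCompletionSketch
{
    // Broadcasts itemCount integers to targetCount unbounded ActionBlocks
    // and returns how many items each target received.
    public static async Task<int[]> RunAsync(int itemCount, int targetCount)
    {
        var received = new int[targetCount];
        var broadcast = new BroadcastBlock<int>(item => item); // cloning function; ints need no copy

        var targets = Enumerable.Range(0, targetCount).Select(index =>
        {
            var target = new ActionBlock<int>(item =>
            {
                Interlocked.Increment(ref received[index]);
            });
            broadcast.LinkTo(target); // no PropagateCompletion: forwarded manually below
            return target;
        }).ToList();

        for (int i = 0; i < itemCount; i++)
        {
            await broadcast.SendAsync(i);
        }
        broadcast.Complete();

        // Forward completion, or the fault, to every linked target.
        await broadcast.Completion.ContinueWith(task =>
        {
            foreach (var target in targets)
            {
                if (task.IsFaulted)
                    ((IDataflowBlock)target).Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        await Task.WhenAll(targets.Select(t => t.Completion));
        return received;
    }
}
```

The cast to `IDataflowBlock` is needed because `ActionBlock<T>` implements `Fault` explicitly. With unbounded targets, every target in this sketch receives every item.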

3) How do I make the database call async?

I'm going to leave this open because it should be a separate question unrelated to TPL Dataflow, but in short: use an async API to access your database, and async will naturally grow through your code base. This should get you started.
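As one possible starting point, here is a hedged sketch of the question's GetIR rewritten around the async ADO.NET calls `OpenAsync` and `ExecuteReaderAsync`. It is written against the provider-neutral `DbConnection` base class, which is an assumption made so the example stands alone; the class name `AsyncDbSketch` is hypothetical, and the stored procedure name and table names are simply copied from the question.

```csharp
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncDbSketch
{
    // Opens the connection and runs the stored procedure without blocking a thread.
    // The caller supplies the connection, e.g. a SqlConnection built from config.
    public static async Task<DataSet> GetIRAsync(
        DbConnection connection,
        string projectId,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        using (DbCommand command = connection.CreateCommand())
        {
            command.CommandText = "test"; // stored procedure name from the question
            command.CommandType = CommandType.StoredProcedure;

            DbParameter parameter = command.CreateParameter();
            parameter.ParameterName = "@ProjectId";
            parameter.DbType = DbType.AnsiString; // counterpart of SqlDbType.VarChar
            parameter.Value = projectId;
            command.Parameters.Add(parameter);

            await connection.OpenAsync(cancellationToken);

            using (DbDataReader reader = await command.ExecuteReaderAsync(
                CommandBehavior.CloseConnection, cancellationToken))
            {
                var result = new DataSet { EnforceConstraints = false };
                // DataSet.Load has no async overload, so the rows are still read synchronously.
                result.Load(reader, LoadOption.OverwriteChanges, "iowa", "la");
                return result;
            }
        }
    }
}
```

The TransformBlock delegate could then become an async lambda that awaits this method and assigns the result to the DTO, which lets the block release its thread while the query is in flight.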

BufferBlock vs BroadcastBlock

After re-reading your previous question and the answer from @VMAtm, it seems you want each item sent to all five servers; in that case you will need a BroadcastBlock. You would use a BufferBlock to distribute the messages relatively evenly across a flexible pool of servers, each of which could handle any message. Nonetheless, you will still need to take control of propagating completion and faults to all the connected ActionBlocks by awaiting the completion of the BroadcastBlock.

To Prevent BroadcastBlock Dropped Messages

In general you have two options. Set your ActionBlocks to be unbounded, which is their default value:

    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = DataflowBlockOptions.Unbounded }

Or broadcast the messages yourself, using a construction of your own. Here is an example implementation from @i3arnon, and another from @svick.
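A minimal sketch of the second option, wired into a small pipeline, could look like this. The fan-out block awaits SendAsync on every target, so bounded targets slow the flow down instead of dropping items. The class name `GuaranteedFanOutSketch`, the integer payload, the doubling transform, and the `Task.Delay` are placeholders for the real DTO, GetIR, and PostEachServerAsync; the capacities are arbitrary. Note that the blocks are linked before any item is sent.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class GuaranteedFanOutSketch
{
    // Sends itemCount integers to every one of serverCount bounded targets
    // and returns how many items each target received.
    public static async Task<int[]> RunAsync(int itemCount, int serverCount)
    {
        var delivered = new int[serverCount];

        var servers = Enumerable.Range(0, serverCount).Select(index =>
            new ActionBlock<int>(async item =>
            {
                await Task.Delay(1); // hypothetical stand-in for PostEachServerAsync
                Interlocked.Increment(ref delivered[index]);
            },
            new ExecutionDataflowBlockOptions { BoundedCapacity = 4 })).ToList();

        // Hand-rolled broadcast: waits until each target has accepted the item.
        var fanOut = new ActionBlock<int>(async item =>
        {
            foreach (var server in servers)
            {
                await server.SendAsync(item);
            }
        },
        new ExecutionDataflowBlockOptions { BoundedCapacity = 4 });

        var load = new TransformBlock<int, int>(
            id => id * 2, // hypothetical stand-in for the database lookup
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 4 });

        // Link first, then send; otherwise output piles up with nowhere to go.
        load.LinkTo(fanOut, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < itemCount; i++)
        {
            await load.SendAsync(i);
        }
        load.Complete();

        try
        {
            await fanOut.Completion; // rethrows if the fan-out block faulted
        }
        finally
        {
            foreach (var server in servers)
            {
                server.Complete(); // completion is handed to every target manually
            }
        }

        await Task.WhenAll(servers.Select(s => s.Completion));
        return delivered;
    }
}
```

Because every block is bounded, only a handful of items are in memory at once, which also keeps memory use in check for large projects.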

JSteward
  • Thank you for your detailed response @JSteward. I have updated my code in the above post and am still having issues... The application does not complete (I removed PropagateCompletion) and only writes 1/4 of the records. Could you please point me in the right direction? Thanks – klkj898 Jun 05 '17 at 18:48
  • The first thing that jumps out is: you're not linking your `TransformBlock` to your `BroadcastBlock` until after all data is sent into the flow. What happens when you fix that part? – JSteward Jun 06 '17 at 01:19
  • Another possibility, `PostEachServerAsync` is not awaited, if it's running `async` then it's being treated as fire and forget and may not complete by the time your flow completes. – JSteward Jun 06 '17 at 01:41
  • I have updated the code above @JSteward - please see the updated code. I have linked the transform block to the broadcast block and awaited PostEachServerAsync. Now I am getting an exception: 'System.OutOfMemoryException' in System.Data.dll. Do you have any thoughts on how this can be fixed? – klkj898 Jun 06 '17 at 03:13
  • I don't actually see any update to your code, but you do have an unbounded flow, meaning that every `MemcachedDTO` is in memory. At this point you need to profile your code and find what's taking up so much memory. You can try throttling your flow to only allow _x_ number of items through at a given time. If you need further help, you'll need a [minimal reproducible example](https://stackoverflow.com/help/mcve) and can post another question. – JSteward Jun 06 '17 at 12:50
  • I do see the `await` on `PostEachServerAsync`, but an `OutOfMemoryException` likely means you're trying to handle too much data all at once. You need to profile your code and find out what's eating up your memory. A [minimal reproducible example](https://stackoverflow.com/help/mcve) would be your best bet to get any further help with the exception. – JSteward Jun 06 '17 at 13:24