BusinessLogic Flowchart

Nested tasks or ContinueWith()? Where to put WhenAll()

I am new to TPL and I'd like some guidance on how to accomplish this the RIGHT way. I have read up on the TPL with several resources, but I'm still unclear on the best approach.

The end result is to submit over 500k records to a third party API. Attempts to do this in one step cause timeouts and other random errors.

Here is the current logic, which is only partially successful.

  • AssignGroupNumbers assigns a Group Number to every record in the table, so each record ends up in one of n groups (up to 50 or so). The original thought was to process each group on its own thread, so the GroupCount is user-defined, which lets us experiment with different numbers of threads. This may not be necessary with TPL.

  • ProcessSourceData reads the records for a specific group and calls ProcessOneGroup

  • ProcessOneGroup creates small XML documents from the records in each Group (about 100 records per document). The document size is also user-defined to allow experimentation. ProcessOneGroup then calls SubmitToNLS, passing in the XML string.

SubmitToNLS is the third party API which returns a success or failure status.

This diagram shows how I would break this up into tasks, so my questions are:

  • How can I accomplish this without blocking the UI? I've tried [Task.Run( --> Parallel.ForEach], which works, but how do I follow up with the subsequent tasks?

  • What is the best way to provide a progress bar, error handling and cancel-by-user in this scenario?

I hope the diagram and the narrative provide enough information. I have tried several approaches and don't currently have a presentable code sample.

UPDATE: After much reading and trial and error, I've written this code, which seems to do the job. It's not Dataflow, but I intend to use that on my next project. Do you see any glaring problems with this code, especially with regard to exception handling and updating the UI?

using ....

namespace actX
{
    public partial class ImportactX_Parallel : Form
    {
        public ImportactX_Parallel()
        {
            InitializeComponent();
            toolStripStatusLabel1.Text = string.Empty;
            SetButtons("New");
        }

        Action _cancelWork;

        /// <summary>
        /// Assign Thread Numbers to each record in the selected dataset. Each group 
        /// of records will be extracted individually and then broken up into 
        /// batches of the size entered on this form.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnInitialize_Click(object sender, EventArgs e)
        {
            .
            .
            .
                // ======================================================================== //
                // Offload the Thread Number assignment so the UI thread remains responsive //
                // ======================================================================== //
                await Task.Run(() =>
                {
                    try
                    {
                        using (var conn = new OracleConnection(oradb))
                        using (var cmd = new OracleCommand())
                        {
                            .
                            .
                            .
                            cmd.ExecuteNonQuery();
                        }
                        if (status.Contains("ERROR:"))
                            LogError("", process.Substring(0, 1), status);
                    }
                    catch (Exception ex)
                    {
                        status = "INITIALIZE ERROR: " + ex.Message;
                    }
                });
                // ================================================================= //

                if (status.Contains("ERROR:"))
                    LogError("", process.Substring(0, 1), status);
                .
                .
                .

                MessageBox.Show("Initialization is complete", "Complete");
            }
        }

        /// <summary>
        /// Extract batches and add to a list with Parallel.For() 
        /// Submit each batch in the list with Parallel.ForEach()
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnSubmit_Click(object sender, EventArgs e)
        {
            .
            .
            .
                try
                {
                    // prepare to handle cancellation
                    var cts = new CancellationTokenSource();
                    var token = cts.Token;
                    this._cancelWork = () =>
                    {
                        this.btnCancel.Enabled = false;
                        cts.Cancel();
                    };
                    .
                    .
                    .

                    // ==========================
                    // Create the list of batches 
                    // ==========================
                    await Task.Run(() => Parallel.For(1, threadCount + 1, options, (g, loopState) =>
                    {
                        var result = "Success";
                        try
                        {
                            using (var conn = new OracleConnection(oradb))
                            using (var cmdProcess = new OracleCommand())
                            {
                                .
                                .
                                .
                                var rdr = cmdProcess.ExecuteReader();
                                while (moreRecordsExist)
                                {
                                    // check for cancellation
                                    if (token.IsCancellationRequested)
                                    {
                                        loopState.Break();
                                        return;
                                    }

                                    if (totalRecordsInThisBatch == 0)
                                    {
                                        // Start a new batch
                                    }

                                    // Add the current record to the batch
                                    xmlBatch.Append(rdr["***"] + Environment.NewLine);

                                    // Read the next XML record
                                    moreRecordsExist = totalRecordsInThisBatch < 5 && rdr.Read();  // TEST: limit the record count for testing

                                    if (totalRecordsInThisBatch >= MaxRecordsInOneBatch || moreRecordsExist == false)
                                    {
                                        // End the current batch
                                        xmlBatch.Append("</actX>" + Environment.NewLine);

                                        // Add the batch to the list
                                        lock (lockList)
                                        {
                                            batchList.Add(xmlBatch.ToString());
                                        }

                                        // reset record count to trigger a new batch
                                        totalRecordsInThisBatch = 0;
                                    }
                                }

                            }

                            // Update progress indicators
                            lock (lockProgress)
                            {
                                progressBar1.BeginInvoke((MethodInvoker)delegate
                                {
                                    progressBar1.Value++;
                                });

                                lstStatus.BeginInvoke((MethodInvoker)delegate
                                {
                                    lstStatus.Items.Add(String.Format("{0}: Building batch list: Thread {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), g));
                                    lstStatus.TopIndex = lstStatus.Items.Count - 1;
                                });
                            }
                        }
                        catch (Exception ex)
                        {
                            // InnerException is often null here, so fall back to the exception itself
                            result = String.Format("ERROR: {0}", (ex.InnerException ?? ex).Message);
                        }

                        if (result != "Success")
                            LogError("", process, result);
                    }));
                    lstStatus.Items.Add(String.Format("{0}: Building batch list: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
                    lstStatus.Items.Add("=============================================");

                    // ====================================================
                    // Submit all the batches in batchList to the processor
                    // ====================================================
                    var submitResult = await Task.Run(() => Parallel.ForEach(batchList, (batch, loopState) =>
                    {
                        var result = "Success";

                        // check for cancellation
                        if (token.IsCancellationRequested)
                        {
                            // Controls may only be touched on the UI thread, so marshal both updates together
                            lstStatus.BeginInvoke((MethodInvoker)delegate
                            {
                                toolStripStatusLabel1.Text = "Cancellation requested, please wait for running threads to complete...";
                                lstStatus.Items.Add(String.Format("{0}: Cancellation requested...", DateTime.Now.ToString("yyyyMMddHHmmss")));
                                lstStatus.TopIndex = lstStatus.Items.Count - 1;
                            });
                            loopState.Break();
                            return; // Break() alone does not exit the current iteration
                        }

                        // Initialize the ActiveX control for the current thread
                        Type actXType = Type.GetTypeFromProgID("activex control");
                        dynamic actX = Activator.CreateInstance(actXType);
                        if (actX != null)
                        {
                            // Verify the DB connection to actX
                            actX.ConnectionName = ConfigurationManager.AppSettings["actXConnectionName"];
                            if (actX.InitializedConnection())
                            {
                                actX.ImportString = batch;
                                if (actX.ImportXML == true)
                                {
                                    .
                                    .
                                    .
                                }
                                else
                                {
                                    result = "ERROR: " + actX.ErrorMessage;
                                }
                            }
                            actX = null;
                        }
                        else
                        {
                            result = "ERROR: Unable to create API object.";
                        }
                        if (result.Contains("ERROR:"))
                            LogError("", process, result);

                        // Update progress indicators
                        lock (lockProgress)
                        {
                            lstStatus.BeginInvoke((MethodInvoker)delegate
                            {
                                lstStatus.Items.Add(String.Format("{0}: Submission result: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), result));
                                lstStatus.TopIndex = lstStatus.Items.Count - 1;
                            });

                            progressBar1.BeginInvoke((MethodInvoker)delegate
                            {
                                progressBar1.Value++;
                            });
                        }
                    }));

                    var cancelledByUser = submitResult.IsCompleted == false && submitResult.LowestBreakIteration.HasValue == true;
                    if (cancelledByUser)
                    {
                        toolStripStatusLabel1.Text = "Cancelled by user";
                        lstStatus.Items.Add(String.Format("{0}: Cancelled by user", DateTime.Now.ToString("yyyyMMddHHmmss")));
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                    else
                    {
                        toolStripStatusLabel1.Text = "Complete";
                        lstStatus.Items.Add(String.Format("{0}: Submitting batches: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                }
                catch (AggregateException ae)
                {
                    // Handle exceptions
                    if (ae.InnerExceptions.Count > 0)
                    {
                        toolStripStatusLabel1.Text = "Completed: with errors.";
                        foreach (var ex in ae.InnerExceptions)
                        {
                            lstStatus.Items.Add(String.Format("{0}: AGGREGATED ERRORS: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), (ex.InnerException ?? ex).Message));
                            // Log the error
                        }
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message);
                }
                finally
                {

                    // Save status to text file so we can verify that everything ran correctly
                    var logFile = "actXImport_" + DateTime.Now.ToString("yyyyMMddHHmmss") + ".log";
                    using (TextWriter tw = new StreamWriter(logFile, false))
                    {
                        for (int i = 0; i <= lstStatus.Items.Count - 1; i++)
                            tw.WriteLine(lstStatus.Items[i]);
                    }
                    MessageBox.Show("Import is complete." + Environment.NewLine + Environment.NewLine + "See the log (" + logFile + ") for details.", "Complete");
                    Application.Exit();
                }

                // set the buttons
                SetButtons("Processed");

                this._cancelWork = null;
            }
        }

        private void btnCancel_Click(object sender, EventArgs e)
        {
            if (this._cancelWork != null)
                this._cancelWork();
        }

        public void LogError(string actX_id, string process, string errMessage)
        {
            using (var conn = new OracleConnection(oradb))
            using (var cmd = new OracleCommand())
            {
                try
                {
                    if (errMessage.Length > 4000)
                        errMessage = errMessage.Substring(0, 3000);
                    .
                    .
                    .
                    cmd.ExecuteNonQuery();
                }
                catch (Exception)
                {
                    // Rethrow as-is to preserve the original exception type and stack trace
                    throw;
                }
            }
        }

        public void SelectionsChanged(object sender, EventArgs e)
        {
            SetButtons("New");
        }

        private void SetButtons(string mode)
        {
            this.btnInitialize.Enabled = mode == "New" || mode == "Processed" || mode == "Initialized";
            this.btnSubmit.Enabled = mode == "Initialized";
            this.btnCancel.Enabled = mode == "Initializing" || mode == "Processing";
            this.grpDataSet.Enabled = !btnCancel.Enabled;
            this.grpProcess.Enabled = !btnCancel.Enabled;
        }
    }
}

1 Answer

TPL by itself isn't a good fit for implementing a queue in this scenario. There are many options for building a producer/consumer queue in .NET: TPL Dataflow, Rx.NET, the ConcurrentQueue<T> class, you name it.

For example, your code with TPL Dataflow would look like this:

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

// store the group id
var buffer = new BufferBlock<int>();
// gather all the records for group
var transform = new TransformBlock<int, List<Record>>(groupId => GetSourceData(groupId));
buffer.LinkTo(transform, linkOptions);
// do something with records, no more than 4 groups simultaneously being handled
var action = new ActionBlock<List<Record>>(r => ProcessOneGroup(r), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
transform.LinkTo(action, linkOptions);

var n = 50;
AssignGroupNumbers();
for (var i = 0; i < n; ++i)
{
    await buffer.SendAsync(i);
}
buffer.Complete();
await action.Completion;

This is a fairly simple pipeline: it transforms each group id into that group's records and then processes them. The Dataflow solution can be tuned with a number of options, such as MaxDegreeOfParallelism, or even rewritten with TransformManyBlock to handle each record separately. You can also use async/await inside the blocks:

var buffer = new BufferBlock<int>();
var transform = new TransformManyBlock<int, Record>(groupId => GetSourceData(groupId));
buffer.LinkTo(transform, linkOptions);
var action = new ActionBlock<Record>(async r => await SubmitToNLS(r), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
transform.LinkTo(action, linkOptions);
...
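Since the question also asks about progress, error handling and cancel-by-user, here is a minimal, self-contained sketch of how those could be added to a pipeline like the one above. It is only an illustration under assumptions: `GetBatches` and `SubmitAsync` are hypothetical stand-ins for the database read and the third-party call, the parallelism and capacity numbers are arbitrary, and on .NET Framework the Dataflow types come from the System.Threading.Tasks.Dataflow NuGet package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowSketch
{
    // Hypothetical stand-in for reading one group's XML batches from the database.
    static IEnumerable<string> GetBatches(int groupId) =>
        Enumerable.Range(1, 3).Select(b => $"<batch group='{groupId}' n='{b}' />");

    // Hypothetical stand-in for the third-party submission call.
    static async Task<bool> SubmitAsync(string xml, CancellationToken token)
    {
        await Task.Delay(10, token);
        return true;
    }

    public static async Task<(int Succeeded, int Failed)> RunAsync(
        int groupCount, IProgress<int> progress, CancellationToken token)
    {
        int succeeded = 0, failed = 0, done = 0;
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Group id -> that group's batches; the token lets the block stop early.
        var expand = new TransformManyBlock<int, string>(
            groupId => GetBatches(groupId),
            new ExecutionDataflowBlockOptions { CancellationToken = token });

        var submit = new ActionBlock<string>(async xml =>
        {
            try
            {
                if (await SubmitAsync(xml, token)) Interlocked.Increment(ref succeeded);
                else Interlocked.Increment(ref failed);
            }
            catch (OperationCanceledException) { throw; } // do not count cancellation as a failure
            catch (Exception)
            {
                // Handle (or log) here: an exception that escapes the delegate faults the block.
                Interlocked.Increment(ref failed);
            }
            progress?.Report(Interlocked.Increment(ref done));
        },
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 4, // arbitrary value for the sketch
            BoundedCapacity = 16,       // arbitrary value for the sketch
            CancellationToken = token
        });

        expand.LinkTo(submit, link);

        for (var g = 1; g <= groupCount; g++)
            await expand.SendAsync(g, token);
        expand.Complete();

        // Throws OperationCanceledException if the token was cancelled.
        await submit.Completion;
        return (succeeded, failed);
    }
}
```

In a WinForms handler this might be called as `await DataflowSketch.RunAsync(50, new Progress<int>(n => progressBar1.Value = n), cts.Token)`. A `Progress<int>` created on the UI thread runs its callback there, so no BeginInvoke calls are needed, and the cancel button only has to call `cts.Cancel()`.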

Code similar to the above could be written with other libraries; the question itself is very broad.
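To illustrate the "other libraries" point, and the title's question about where WhenAll() goes, here is a rough sketch that uses only basic TPL: a SemaphoreSlim caps how many submissions run at once, and a single Task.WhenAll at the end waits for all of them. The `submitAsync` delegate is a hypothetical wrapper around the real API call, and `ThrottledSubmitter` is a made-up name; treat this as one possible shape, not a drop-in replacement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledSubmitter
{
    // Runs submitAsync over every batch, at most maxParallel at a time, and
    // returns the batches that failed together with the exception for each.
    public static async Task<IReadOnlyList<(string Batch, Exception Error)>> SubmitAllAsync(
        IEnumerable<string> batches,
        Func<string, CancellationToken, Task> submitAsync, // hypothetical wrapper around the API call
        int maxParallel,
        IProgress<int> progress,
        CancellationToken token)
    {
        var failures = new ConcurrentBag<(string Batch, Exception Error)>();
        var completed = 0;

        using (var gate = new SemaphoreSlim(maxParallel))
        {
            // ToList() starts every task now; each one waits at the gate for its turn.
            var tasks = batches.Select(async batch =>
            {
                await gate.WaitAsync(token);
                try
                {
                    await submitAsync(batch, token);
                }
                catch (OperationCanceledException) { throw; } // let cancellation surface to the caller
                catch (Exception ex)
                {
                    failures.Add((batch, ex)); // record the failure and keep going
                }
                finally
                {
                    gate.Release();
                }
                progress?.Report(Interlocked.Increment(ref completed));
            }).ToList();

            // The one WhenAll: completes when every batch has finished or been cancelled.
            await Task.WhenAll(tasks);
        }

        return failures.ToList();
    }
}
```

If the API call is synchronous, as an ActiveX call would be, one option is to pass something like `(xml, ct) => Task.Run(() => Submit(xml), ct)`, where `Submit` is again a hypothetical method. Whether the control tolerates being called from thread-pool threads is something to verify separately.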

VMAtm
  • TPL DataFlow does look promising... I guess it's time for more reading :) In the meantime, is there no way to accomplish this with basic TPL? – Larry Moody Jun 30 '17 at 21:33
  • Dataflow is a small library built on top of TPL. You could read its code and build something yourself, but it is a ready-to-go solution. – VMAtm Jun 30 '17 at 22:39
  • It may be a good starting point, but I still need to support Progress, Exception handling and Cancel-by-User. And since I have no experience with TPL DataFlow, I wouldn't start down that path without a more thorough understanding of the library. – Larry Moody Jun 30 '17 at 22:59
  • Progress can be reported from blocks in your pipeline, and cancellation is supported with a token, as anywhere in TPL. Exceptions must be handled inside the blocks. – VMAtm Jun 30 '17 at 23:05
  • Understood. I will work with this and post any questions I have. – Larry Moody Jun 30 '17 at 23:34