Nested tasks or ContinueWith()? Where to put WhenAll()
I am new to TPL and I'd like some guidance on how to accomplish this the RIGHT way. I have read up on TPL using the following resources, but I'm still unclear about the best approach:
- Parallel Programming with MS .NET (read online) : https://msdn.microsoft.com/en-us/library/ff963553.aspx
- MS Parallel Programming forum : https://social.msdn.microsoft.com/Forums/en-US/home?forum=parallelextensions
- Samples for Parallel Programming with the .NET Framework
- Task-based Asynchronous Programming
- And others
The end goal is to submit more than 500,000 records to a third-party API. Attempting to do this in one step causes timeouts and other random errors.
Here is the current logic, which is only partially successful.
- AssignGroupNumbers assigns a group number to every record in the table, so each record ends up in one of n groups (up to 50 or so). The original idea was to process each group on its own thread, so the group count is user-defined to let us experiment with different numbers of threads. This may not be necessary with TPL.
- ProcessSourceData reads the records for a specific group and calls ProcessOneGroup.
- ProcessOneGroup creates small XML documents (about 100 records each) from the records in its group. The batch size is also user-defined to allow experimentation. ProcessOneGroup then calls SubmitToNLS, passing in the XML string.
- SubmitToNLS is the third-party API, which returns a success or failure status.
This diagram shows how I would break this up into tasks, so my questions are:
- How can I accomplish this without blocking the UI? I've tried Task.Run(() => Parallel.ForEach(...)), which works, but how do I follow up with the subsequent tasks?
- What is the best way to provide a progress bar, error handling, and user cancellation in this scenario?
I hope the diagram and the narrative provide enough information. I have tried several approaches and don't currently have a presentable code sample.
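One common way to approach both questions is sketched below in C#, under stated assumptions. Each stage is awaited in turn, so the second stage starts only after the first has finished and no ContinueWith or nested tasks are needed. Progress is reported through `IProgress<T>`, and cancellation goes through `ParallelOptions.CancellationToken`. Per-batch failures are collected instead of aborting the loop. `Pipeline`, `RunAsync`, `loadGroup`, and `submitBatch` are hypothetical names standing in for ProcessSourceData/ProcessOneGroup and SubmitToNLS. `MaxDegreeOfParallelism = 4` is an arbitrary starting value, not a recommendation for this API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: loadGroup and submitBatch are hypothetical stand-ins for
// ProcessSourceData/ProcessOneGroup and SubmitToNLS.
public static class Pipeline
{
    public static async Task<List<string>> RunAsync(
        int groupCount,
        Func<int, IEnumerable<string>> loadGroup,
        Action<string> submitBatch,
        IProgress<string> progress,
        CancellationToken token)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 4, // assumption: tune for the third-party API
            CancellationToken = token   // a cancelled token makes Parallel.For/ForEach throw OperationCanceledException
        };

        // Stage 1: build every batch off the UI thread.
        var batches = new ConcurrentBag<string>();
        await Task.Run(() => Parallel.For(1, groupCount + 1, options, g =>
        {
            foreach (var batch in loadGroup(g))
                batches.Add(batch);
            progress?.Report($"Built batches for group {g}");
        }));

        // Stage 2 runs only after stage 1 has completed: a plain await, no ContinueWith.
        var errors = new ConcurrentQueue<string>();
        await Task.Run(() => Parallel.ForEach(batches, options, batch =>
        {
            try
            {
                submitBatch(batch);
            }
            catch (Exception ex)
            {
                // Record the failure and keep going; InnerException is often null.
                errors.Enqueue("ERROR: " + (ex.InnerException ?? ex).Message);
            }
            progress?.Report("Submitted a batch");
        }));

        return new List<string>(errors);
    }
}
```

From a WinForms handler this might be called as `var errors = await Pipeline.RunAsync(groupCount, LoadGroup, SubmitBatch, new Progress<string>(msg => lstStatus.Items.Add(msg)), cts.Token);` (where `LoadGroup` and `SubmitBatch` are hypothetical wrappers), with `catch (OperationCanceledException)` detecting a user cancel. Because `Progress<T>` is created on the UI thread, it posts each report back to that thread, so no BeginInvoke is needed.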
UPDATE: After much reading and trial and error, I've written the code below, which seems to do the job. It doesn't use TPL Dataflow, but I intend to use that on my next project. Do you see any glaring problems with this code, especially with regard to exception handling and updating the UI?
using ....
namespace actX
{
    public partial class ImportactX_Parallel : Form
    {
        public ImportactX_Parallel()
        {
            InitializeComponent();
            toolStripStatusLabel1.Text = string.Empty;
            SetButtons("New");
        }
        Action _cancelWork;
        /// <summary>
        /// Assign Thread Numbers to each record in the selected dataset. Each group
        /// of records will be extracted individually and then broken up into
        /// batches of the size entered on this form.
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnInitialize_Click(object sender, EventArgs e)
        {
            .
            .
            .
                // ======================================================================== //
                // Offload the Thread Number assignment so the UI thread remains responsive //
                // ======================================================================== //
                await Task.Run(() =>
                {
                    try
                    {
                        using (var conn = new OracleConnection(oradb))
                        using (var cmd = new OracleCommand())
                        {
                            .
                            .
                            .
                            cmd.ExecuteNonQuery();
                        }
                        if (status.Contains("ERROR:"))
                            LogError("", process.Substring(0, 1), status);
                    }
                    catch (Exception ex)
                    {
                        status = "INITIALIZE ERROR: " + ex.Message;
                    }
                });
                // ================================================================= //
                if (status.Contains("ERROR:"))
                    LogError("", process.Substring(0, 1), status);
                .
                .
                .
                MessageBox.Show("Initialization is complete", "Complete");
            }
        }
        /// <summary>
        /// Extract batches and add to a list with Parallel.For()
        /// Submit each batch in the list with Parallel.ForEach()
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void btnSubmit_Click(object sender, EventArgs e)
        {
            .
            .
            .
                try
                {
                    // prepare to handle cancellation
                    var cts = new CancellationTokenSource();
                    var token = cts.Token;
                    this._cancelWork = () =>
                    {
                        this.btnCancel.Enabled = false;
                        cts.Cancel();
                    };
                    .
                    .
                    .
                    // ==========================
                    // Create the list of batches
                    // ==========================
                    await Task.Run(() => Parallel.For(1, threadCount + 1, options, (g, loopState) =>
                    {
                        var result = "Success";
                        try
                        {
                            using (var conn = new OracleConnection(oradb))
                            using (var cmdProcess = new OracleCommand())
                            {
                                .
                                .
                                .
                                // Dispose the reader once this group has been read
                                using (var rdr = cmdProcess.ExecuteReader())
                                {
                                    // Position the reader on the first row before any column is read
                                    moreRecordsExist = rdr.Read();
                                    while (moreRecordsExist)
                                    {
                                        // check for cancellation
                                        if (token.IsCancellationRequested)
                                        {
                                            loopState.Break();
                                            return;
                                        }
                                        if (totalRecordsInThisBatch == 0)
                                        {
                                            // Start a new batch
                                        }
                                        // Add the current record to the batch
                                        xmlBatch.Append(rdr["***"] + Environment.NewLine);
                                        // Read the next XML record
                                        moreRecordsExist = totalRecordsInThisBatch < 5 && rdr.Read(); // TEST: limit the record count for testing
                                        if (totalRecordsInThisBatch >= MaxRecordsInOneBatch || moreRecordsExist == false)
                                        {
                                            // End the current batch
                                            xmlBatch.Append("</actX>" + Environment.NewLine);
                                            // Add the batch to the list
                                            lock (lockList)
                                            {
                                                batchList.Add(xmlBatch.ToString());
                                            }
                                            // reset record count to trigger a new batch
                                            totalRecordsInThisBatch = 0;
                                        }
                                    }
                                }
                            }
                            // Update progress indicators
                            lock (lockProgress)
                            {
                                progressBar1.BeginInvoke((MethodInvoker)delegate
                                {
                                    progressBar1.Value++;
                                });
                                lstStatus.BeginInvoke((MethodInvoker)delegate
                                {
                                    lstStatus.Items.Add(String.Format("{0}: Building batch list: Thread {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), g));
                                    lstStatus.TopIndex = lstStatus.Items.Count - 1;
                                });
                            }
                        }
                        catch (Exception ex)
                        {
                            // InnerException is null for most exceptions, so fall back to the exception itself
                            result = String.Format("ERROR: {0}", (ex.InnerException ?? ex).Message);
                        }
                        if (result != "Success")
                            LogError("", process, result);
                    }));
                    lstStatus.Items.Add(String.Format("{0}: Building batch list: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
                    lstStatus.Items.Add("=============================================");
                    // ====================================================
                    // Submit all the batches in batchList to the processor
                    // ====================================================
                    var submitResult = await Task.Run(() => Parallel.ForEach(batchList, (batch, loopState) =>
                    {
                        var result = "Success";
                        // check for cancellation
                        if (token.IsCancellationRequested)
                        {
                            // toolStripStatusLabel1 is not a Control, so marshal the update through the form
                            this.BeginInvoke((MethodInvoker)delegate
                            {
                                toolStripStatusLabel1.Text = "Cancellation requested, please wait for running threads to complete...";
                                lstStatus.Items.Add(String.Format("{0}: Cancellation requested...", DateTime.Now.ToString("yyyyMMddHHmmss")));
                                lstStatus.TopIndex = lstStatus.Items.Count - 1;
                            });
                            loopState.Break();
                            return; // do not submit this batch after cancellation
                        }
                        // Initialize the ActiveX control for the current thread
                        Type actXType = Type.GetTypeFromProgID("activex control");
                        // GetTypeFromProgID returns null for an unregistered ProgID, so guard before CreateInstance
                        dynamic actX = actXType != null ? Activator.CreateInstance(actXType) : null;
                        if (actX != null)
                        {
                            // Verify the DB connection to actX
                            actX.ConnectionName = ConfigurationManager.AppSettings["actXConnectionName"];
                            if (actX.InitializedConnection())
                            {
                                actX.ImportString = batch;
                                if (actX.ImportXML == true)
                                {
                                    .
                                    .
                                    .
                                }
                                else
                                {
                                    result = "ERROR: " + actX.ErrorMessage;
                                }
                            }
                            else
                            {
                                // Without this branch a failed connection would be reported as "Success"
                                result = "ERROR: Unable to initialize the actX connection.";
                            }
                            actX = null;
                        }
                        else
                        {
                            result = "ERROR: Unable to create API object.";
                        }
                        if (result.Contains("ERROR:"))
                            LogError("", process, result);
                        // Update progress indicators
                        lock (lockProgress)
                        {
                            lstStatus.BeginInvoke((MethodInvoker)delegate
                            {
                                lstStatus.Items.Add(String.Format("{0}: Submission result: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), result));
                                lstStatus.TopIndex = lstStatus.Items.Count - 1;
                            });
                            progressBar1.BeginInvoke((MethodInvoker)delegate
                            {
                                progressBar1.Value++;
                            });
                        }
                    }));
                    var cancelledByUser = !submitResult.IsCompleted && submitResult.LowestBreakIteration.HasValue;
                    if (cancelledByUser)
                    {
                        toolStripStatusLabel1.Text = "Cancelled by user";
                        lstStatus.Items.Add(String.Format("{0}: Cancelled by user", DateTime.Now.ToString("yyyyMMddHHmmss")));
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                    else
                    {
                        toolStripStatusLabel1.Text = "Complete";
                        lstStatus.Items.Add(String.Format("{0}: Submitting batches: END", DateTime.Now.ToString("yyyyMMddHHmmss")));
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                }
                catch (AggregateException ae)
                {
                    // Handle exceptions
                    if (ae.InnerExceptions.Count > 0)
                    {
                        toolStripStatusLabel1.Text = "Completed: with errors.";
                        // Flatten() unwraps nested AggregateExceptions; each entry is the original exception
                        foreach (var ex in ae.Flatten().InnerExceptions)
                        {
                            lstStatus.Items.Add(String.Format("{0}: AGGREGATED ERRORS: {1}", DateTime.Now.ToString("yyyyMMddHHmmss"), ex.Message));
                            // Log the error
                        }
                        lstStatus.TopIndex = lstStatus.Items.Count - 1;
                    }
                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message);
                }
                finally
                {
                    // Save status to text file so we can verify that everything ran correctly
                    var logFile = "actXImport_" + DateTime.Now.ToString("yyyyMMddHHmmss") + ".log";
                    using (TextWriter tw = new StreamWriter(logFile, false))
                    {
                        for (int i = 0; i <= lstStatus.Items.Count - 1; i++)
                            tw.WriteLine(lstStatus.Items[i]);
                    }
                    MessageBox.Show("Import is complete." + Environment.NewLine + Environment.NewLine + "See the log (" + logFile + ") for details.", "Complete");
                    Application.Exit();
                }
                // set the buttons
                SetButtons("Processed");
                this._cancelWork = null;
            }
        }
        private void btnCancel_Click(object sender, EventArgs e)
        {
            if (this._cancelWork != null)
                this._cancelWork();
        }
        public void LogError(string actX_id, string process, string errMessage)
        {
            using (var conn = new OracleConnection(oradb))
            using (var cmd = new OracleCommand())
            {
                try
                {
                    if (errMessage.Length > 4000)
                        errMessage = errMessage.Substring(0, 3000);
                    .
                    .
                    .
                    cmd.ExecuteNonQuery();
                }
                catch (Exception)
                {
                    // Rethrow without losing the original exception type and stack trace
                    throw;
                }
            }
        }
        public void SelectionsChanged(object sender, EventArgs e)
        {
            SetButtons("New");
        }
        private void SetButtons(string mode)
        {
            this.btnInitialize.Enabled = mode == "New" || mode == "Processed" || mode == "Initialized";
            this.btnSubmit.Enabled = mode == "Initialized";
            this.btnCancel.Enabled = mode == "Initializing" || mode == "Processing";
            this.grpDataSet.Enabled = !btnCancel.Enabled;
            this.grpProcess.Enabled = !btnCancel.Enabled;
        }
    }
}
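On the title question of where WhenAll() belongs: the Parallel.ForEach over batchList above passes no ParallelOptions, so the number of concurrent API calls is left to the thread pool rather than set explicitly. Suppose the third-party API offered an asynchronous call; this is an assumption, since the ActiveX control used here is synchronous. In that case, a common pattern is to start one task per batch, throttle them with `SemaphoreSlim`, and await `Task.WhenAll` once every task has been created. A sketch follows; `ThrottledSubmit`, `SubmitAllAsync`, and `submitAsync` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch: at most maxConcurrent submissions in flight, then one WhenAll.
// submitAsync is a hypothetical async wrapper around the third-party API.
public static class ThrottledSubmit
{
    public static async Task<bool[]> SubmitAllAsync(
        IEnumerable<string> batches,
        Func<string, CancellationToken, Task<bool>> submitAsync,
        int maxConcurrent,
        CancellationToken token)
    {
        using (var gate = new SemaphoreSlim(maxConcurrent))
        {
            var tasks = batches.Select(async batch =>
            {
                await gate.WaitAsync(token); // wait for a free slot
                try
                {
                    return await submitAsync(batch, token);
                }
                finally
                {
                    gate.Release(); // free the slot even if the call failed
                }
            }).ToList(); // materialize so each task is started exactly once

            // WhenAll goes after all tasks exist; it completes when every submission has finished.
            return await Task.WhenAll(tasks);
        }
    }
}
```

`Task.WhenAll` returns the results in the same order as the input batches. Awaiting it rethrows only the first failure; the complete set is available from the `Exception` property of the task that `Task.WhenAll` returns.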