[Note: Line numbers refer to those inserted with Crayon for Wordpress at original blog post here. At that link, you can also download a 7z of the entire VS 2012 solution.]
So this is the idea:

So let's see how I solved the issue of the blocking collection and then dishing out work to the Parser Tasks... I used a very generic, easily reusable Producer and Consumer approach. In fact, in my example below, you can even adjust the number of Producers as well. You can adjust my code below to more closely approximate your work, and then adjust the number of threads/tasks/Consumers to see how well parallelism can work for you.
First, the usual suspects in Imports..
Imports System.Diagnostics ' Stopwatch
Imports System.Linq ' ElementAt() extension method
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent
Module modStartHere
'
' Producer
'
Dim itemsToProduce = 10
Dim sleepProducer = 10 ' in milliseconds
Dim producerStartID = 1
Dim producersNumToStart = 1
Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
Dim moreItemsToAdd As Boolean = True
'
' Consumer
'
Dim sleepConsumer = 1000 ' in milliseconds
Dim consumerStartID = 100
Dim consumersNumToStart = 3
Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)
The Producer(s) are initially set up with the above. While the itemsToProduce doesn't change during the program, the number of Producers & Consumers will. This is very rough as a draft, and it will no doubt be streamlined in your own code at some point, but this demonstrates how to solve this problem very well.
I used "IDs" to show in the output which thread was doing what. The only thing needed in production is the list of CTS instances:
'
' the multi-thread-safe queue that is produced to and consumed from
'
Dim bc As New BlockingCollection(Of Integer)
'
' this # will be what is actually produced & consumed (1, 2, 3, ...)
'
Dim itemId As Integer = 0
'
The main machine here is that one little line:
Dim bc As New BlockingCollection(Of Integer)
Microsoft says:
BlockingCollection Overview .NET Framework 4.5
BlockingCollection(Of T) is a thread-safe collection class that
provides the following features:
An implementation of the Producer-Consumer pattern.
Concurrent adding and taking of items from multiple threads.
Optional maximum capacity.
Insertion and removal operations that block when collection is empty or full.
Insertion and removal "try" operations that do not block or that block up to a specified period of time.
Encapsulates any collection type that implements IProducerConsumerCollection(Of T)
Cancellation with cancellation tokens.
Two kinds of enumeration with foreach (For Each in Visual Basic):
-Read-only enumeration.
-Enumeration that removes items as they are enumerated.
itemId is just a variable that holds the fake payload. Producers will increment it by one to simulate a different object instance, or unit of work. You just change the type the BlockingCollection holds...
Now, I'm not specifying a backing collection here, so I get the default, a ConcurrentQueue(Of T), which is FIFO (what I want in production). You can ask for that explicitly, or even for FILO (LIFO), as per Microsoft:
When you create a BlockingCollection(Of T) object, you can specify not
only the bounded capacity but also the type of collection to use. For
example, you could specify a ConcurrentQueue(Of T) object for first
in, first out (FIFO) behavior, or a ConcurrentStack(Of T) object for
last in, first out (LIFO) behavior.
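Now THAT is useful! Here in this demo, I just took the default and let several consumers grab items as they come, so the order in which items finish processing is not guaranteed... But like I said, for my specific need, I need FIFO, as in the diagram at the top...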
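To make the FIFO vs. LIFO choice concrete, here is a minimal sketch (not part of the original solution). The module name BackingStoreSketch and the helper Drain are made up for illustration; the constructors and GetConsumingEnumerable() are the standard BlockingCollection(Of T) members.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Collections.Concurrent

Module BackingStoreSketch
    ' Hypothetical helper: adds 1, 2, 3, then takes everything back out
    ' and reports the order in which the items came out.
    Function Drain(bc As BlockingCollection(Of Integer)) As String
        For i As Integer = 1 To 3
            bc.Add(i)
        Next
        bc.CompleteAdding() ' lets the consuming enumeration end once the collection is empty
        Dim taken As New List(Of String)
        For Each item As Integer In bc.GetConsumingEnumerable()
            taken.Add(item.ToString())
        Next
        Return String.Join(",", taken)
    End Function

    Sub Main()
        ' FIFO: ConcurrentQueue backing store, bounded to 100 items
        Dim fifo As New BlockingCollection(Of Integer)(New ConcurrentQueue(Of Integer)(), 100)
        ' LIFO: ConcurrentStack backing store, unbounded
        Dim lifo As New BlockingCollection(Of Integer)(New ConcurrentStack(Of Integer)())
        Console.WriteLine("FIFO: " & Drain(fifo)) ' FIFO: 1,2,3
        Console.WriteLine("LIFO: " & Drain(lifo)) ' LIFO: 3,2,1
    End Sub
End Module
```

With several consumers, FIFO only governs the order in which items are handed out, not the order in which the consumers finish working on them.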
Later, you will see the functions and subroutines, but the real magic here is in two collections, one for Producers and one for Consumers:
Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)
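The Magic: As each Task (thread) is created, its CancellationTokenSource is added to the appropriate collection above; closing a Task is then just a matter of picking a CTS from that collection and calling .Cancel() on it. (This demo never actually removes a cancelled CTS from its bag, so a random pick can land on one that was already cancelled.)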
Seriously, that is it! :)
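If you do want a cancelled CTS to disappear from the collection, one option is to key the collection by task ID. The following is only a sketch under that assumption, not what the demo does: CtsRegistrySketch, Register, CancelAndRemove and TrackedCount are hypothetical names, and it swaps the ConcurrentBag for a ConcurrentDictionary so that a specific entry can be removed.

```vb
Imports System
Imports System.Collections.Concurrent
Imports System.Threading

Module CtsRegistrySketch
    ' Assumption: one CTS per task ID, so a specific entry can be found and removed.
    Private ReadOnly registry As New ConcurrentDictionary(Of Integer, CancellationTokenSource)

    ' Creates a CTS for the task, remembers it, and returns the token to hand to the task.
    Function Register(taskId As Integer) As CancellationToken
        Dim cts As New CancellationTokenSource()
        registry(taskId) = cts
        Return cts.Token
    End Function

    ' Cancels the task's CTS and forgets it; returns False if the ID is not tracked.
    Function CancelAndRemove(taskId As Integer) As Boolean
        Dim cts As CancellationTokenSource = Nothing
        If Not registry.TryRemove(taskId, cts) Then Return False
        cts.Cancel()
        Return True
    End Function

    Function TrackedCount() As Integer
        Return registry.Count
    End Function

    Sub Main()
        Dim token As CancellationToken = Register(100)
        Console.WriteLine(CancelAndRemove(100))          ' True
        Console.WriteLine(token.IsCancellationRequested) ' True
        Console.WriteLine(CancelAndRemove(100))          ' False, already removed
    End Sub
End Module
```

The trade-off is that you lose the "pick any one at random" convenience of a bag, but you gain the ability to cancel a particular task and to know how many are still tracked.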
Next in the code, the initial producers and consumers are created:
'===============================
'
' start demo
'
Sub Main()
'
'===============================
'
' initial state:
'
' start our producer(s)
'
For ps As Integer = producerStartID To producerStartID + producersNumToStart - 1
CreateTask(ps, "Producer")
Next
'
' start our consumer(s)
'
For cs As Integer = consumerStartID To consumerStartID + consumersNumToStart - 1
CreateTask(cs, "Consumer")
Next
'
'=========================================
Aside from a few Thread.Sleep() calls, all the next part does is add or remove producer and consumer Tasks(threads). You can vary the initial values at the top to put it through the paces.
To create a Task... - CreateTask(<id>, <"Producer" or "Consumer">)
To remove a Task, you (in one line) both get a random CTS, and then .Cancel() it:
GetRandomCTS(ProducerCTSs).Cancel()
GetRandomCTS(ConsumerCTSs).Cancel()
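GetRandomCTS() takes the collection of CTS instances, picks one at random, and returns it; the caller then calls .Cancel() on it. It returns Nothing when there is nothing to pick, so production code should check the result before calling .Cancel().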
'
Thread.Sleep(2000)
'
' create a producer
'
Console.WriteLine("creating producer 555...")
CreateTask(555, "Producer")
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(2000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(1000)
'
' create a consumer
'
Console.WriteLine("creating consumer 222...")
CreateTask(222, "consumer")
Thread.Sleep(1000)
'
' cancel a producer
'
Console.WriteLine("cancelling random producer...")
GetRandomCTS(ProducerCTSs).Cancel()
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
'
'==========================================
'
Console.ReadLine()
End Sub
And that is it!
Now for the fun parts:
#Region "Utilities"
''' <summary>
''' Retrieves a random cancellation token source from the given list of current threads...
''' Works for either producer or consumer
''' </summary>
''' <param name="ctsBag">ConcurrentBag(Of CancellationTokenSource)</param>
''' <returns>CancellationTokenSource</returns>
''' <remarks></remarks>
Function GetRandomCTS(ctsBag As ConcurrentBag(Of CancellationTokenSource)) As CancellationTokenSource
Dim cts As CancellationTokenSource = Nothing
Dim rndNum As Random = Nothing
Dim rndIndex As Integer = Nothing
Try
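If ctsBag.Count = 0 Then ' an empty bag means there is nothing left to cancel
Console.WriteLine("There are no threads to cancel!")
Else
rndNum = New Random() ' time-based seed; a fixed seed would pick the same index on every run
rndIndex = rndNum.Next(0, ctsBag.Count) ' upper bound is exclusive, so this yields 0 .. Count - 1 for ElementAt()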
cts = ctsBag.ElementAt(rndIndex)
End If
Catch ex As Exception
Console.WriteLine("GetRandomCTS() Exception: " & ex.StackTrace)
End Try
Return cts
End Function
Line 7 (Dim cts As CancellationTokenSource): This is what we'll be returning, a CancellationTokenSource.
Line 16 (cts = ctsBag.ElementAt(rndIndex)): ctsBag.ElementAt() allows us to pull out a specific CTS instance by its zero-based index.
Below, CreateTask takes an argument for the # you want it to display when running (just for demo, to see which thread is doing what), and a string to tell it whether you want a new Producer or Consumer. Sure, I could have made it more complex, but this is just a rough draft. :)
Private Function CreateTask(taskId As Integer, taskType As String) As CancellationTokenSource
Dim t As Task = Nothing
Dim cts As New CancellationTokenSource()
Dim token As CancellationToken = cts.Token
Try
If taskType.ToLower = "producer" Then
t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default) ' token + options needs the overload that also takes a scheduler
ProducerCTSs.Add(cts)
ElseIf taskType.ToLower = "consumer" Then
t = Task.Factory.StartNew(Sub() Consumer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
ConsumerCTSs.Add(cts)
Else
Throw New ArgumentException("taskType must be ""Producer"" or ""Consumer""") ' otherwise t stays Nothing and t.Id below fails
End If
Console.WriteLine("{0} Task {1} ({2}) running!", taskType, taskId.ToString("000"), t.Id)
Catch ex As Exception
Console.WriteLine("Task {0} CreateTask({1}) Exception: {2}", taskId.ToString("000"), taskType, ex.StackTrace)
End Try
Return cts
End Function
#End Region
Line 7 & 10: These call the Producer() or Consumer() routines below, passing them the CancellationToken they need so that they can be cancelled gracefully while running, without corrupting any data.
t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
Did you notice TaskCreationOptions.LongRunning? That is good in my case: it is a hint to the scheduler that the task is a long-running, coarse-grained operation, so the default scheduler will typically give it a dedicated thread instead of tying up a thread-pool thread. Also note that to hand StartNew both the token and the creation options, you need the overload that takes a TaskScheduler as well.
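Here is a small, self-contained sketch of that call shape, separate from the demo code. LongRunningSketch and StartWorker are hypothetical names; the StartNew overload (action, token, options, scheduler) and the task status values are standard TPL.

```vb
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module LongRunningSketch
    ' Hypothetical worker: spins until cancellation is requested, then
    ' throws so that the task ends up in the Canceled state.
    Function StartWorker(token As CancellationToken) As Task
        Return Task.Factory.StartNew(
            Sub()
                While Not token.IsCancellationRequested
                    Thread.Sleep(10)
                End While
                token.ThrowIfCancellationRequested()
            End Sub,
            token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
    End Function

    Sub Main()
        Dim cts As New CancellationTokenSource()
        Dim t As Task = StartWorker(cts.Token)
        cts.Cancel()
        Try
            t.Wait()
        Catch ex As AggregateException
            ' Waiting on a cancelled task surfaces the cancellation here.
        End Try
        Console.WriteLine(t.Status) ' Canceled
    End Sub
End Module
```

Because the same token is passed to StartNew and thrown from inside the worker, the task is reported as Canceled rather than Faulted.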
So what does the Producer() routine look like?
#Region "Producer(s)"
Public Sub Producer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
' Was cancellation already requested?
If ct.IsCancellationRequested = True Then
Console.WriteLine("Producer Task {0} was cancelled before Producer thread created!", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
'
'Dim r As Random = New Random(123)
Dim sw As New Stopwatch
Dim numAdded As Integer = 0
sw.Start()
While moreItemsToAdd = True
' Dim itemIn As Integer = r.Next(1, 1000)
Dim item As Integer = Interlocked.Increment(itemId) ' the payload; atomic, so two producers never hand out the same ID
Try
bc.Add(item)
Console.WriteLine("--> " & taskNum.ToString("000") & " --> [+1 => Q has: " & bc.Count & "] added: " & item)
numAdded += 1
If ct.IsCancellationRequested Then
Console.WriteLine("Producer Task {0} cancelled", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
Thread.Sleep(sleepProducer)
Catch ex As OperationCanceledException
Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
Exit While
Catch ex As Exception
Console.WriteLine("Producer() Exception: " & ex.StackTrace)
End Try
If bc.Count >= itemsToProduce Then
moreItemsToAdd = False
End If
End While
sw.Stop()
' Let consumer know we are done.
Console.WriteLine("Producer stopped adding items! Added " & numAdded & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
bc.CompleteAdding()
End Sub
#End Region
I know, I know... it looks complicated! But really it isn't. I'm not that smart! Half the code is just there to catch & handle the cancellation requests so that the processing doesn't corrupt any data. That, and a cheesy Stopwatch to time things... And yes, there are artifacts of earlier versions still commented out. Like I said, "ROUGH"...
Line 17: Simply adds the payload (itemId here, but it could be anything) to the BlockingCollection (bc).
Line 20: If a cancellation has been requested, we take care of it here, at a known point, and not at some random part of the function, which would likely corrupt all kinds of things...
Line 31: I added this as a cheesy way to tell the Producers when to stop ...producing. The limit (itemsToProduce) is set at the top of the code; note that it is compared against how many items are sitting in the queue, not how many were produced in total.
Line 38: bc.CompleteAdding() - This is a signal to everyone using the bc (BlockingCollection) that there will be no more items added. This way, the consumers know when to stop ...consuming!
"Why would they want to do that?"
Well, suppose you wanted a short-running task, or tasks, and needed to know they were done in order to continue... Yes, in my case, they are long-running, and in production I'll be starting each task with "TaskCreationOptions.LongRunning"
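To show the "know when they are done" idea in isolation, here is a minimal sketch (not the demo code) where consumers loop over GetConsumingEnumerable() and simply fall out of the loop once CompleteAdding() has been called and the queue is empty. CompleteAddingSketch and RunPipeline are hypothetical names.

```vb
Imports System
Imports System.Collections.Generic
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module CompleteAddingSketch
    ' Hypothetical helper: produces itemCount items, lets consumerCount
    ' consumers drain them, and returns how many items were consumed in total.
    Function RunPipeline(itemCount As Integer, consumerCount As Integer) As Integer
        Dim queue As New BlockingCollection(Of Integer)
        Dim consumed As Integer = 0
        Dim consumers As New List(Of Task)
        For c As Integer = 1 To consumerCount
            consumers.Add(Task.Factory.StartNew(
                Sub()
                    ' Blocks while the queue is empty; ends after CompleteAdding() once it is drained.
                    For Each item As Integer In queue.GetConsumingEnumerable()
                        Interlocked.Increment(consumed)
                    Next
                End Sub))
        Next
        For i As Integer = 1 To itemCount
            queue.Add(i)
        Next
        queue.CompleteAdding()            ' no more items are coming
        Task.WaitAll(consumers.ToArray()) ' returns once every consumer has left its loop
        Return consumed
    End Function

    Sub Main()
        Console.WriteLine(RunPipeline(5, 2)) ' 5
    End Sub
End Module
```

Without the CompleteAdding() call, the consumers would block forever on the empty queue and WaitAll would never return.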
The Consumer() routine is almost identical, with just a few tiny differences:
#Region "Consumer(s)"
Public Sub Consumer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
If ct.IsCancellationRequested = True Then ' Was cancellation already requested?
Console.WriteLine("Consumer Task {0} was cancelled before Consumer thread created!", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
Dim totalTaken As Integer = 0
Dim sw As New Stopwatch
sw.Start()
While bc.IsCompleted = False
Dim itemOut As Integer = Nothing ' the payload
Try
itemOut = bc.Take(ct) ' passing the token lets a consumer that is waiting on an empty queue be cancelled too
Console.WriteLine("<-- " & taskNum.ToString("000") & " <-- [-1 => Q has: " & bc.Count & "] took: " & itemOut)
If ct.IsCancellationRequested Then
Console.WriteLine("Consumer Task {0} cancelled", taskNum.ToString("000"))
ct.ThrowIfCancellationRequested()
End If
totalTaken += 1
Catch ex As OperationCanceledException
Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
Exit While
Catch e As InvalidOperationException
' IOE means that Take() was called on a completed collection.
' In this example, we can simply catch the exception since the
' loop will break on the next iteration.
End Try
If (Not itemOut = Nothing) Then
Thread.Sleep(sleepConsumer)
End If
End While
sw.Stop()
If bc.IsCompleted = True Then
Console.WriteLine(vbCrLf & "Task " & taskNum.ToString("000") & " - No more items to take. Took " & totalTaken & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
End If
End Sub
#End Region
End Module
Line 3: In both routines, we make sure to check right at the top to see if we've been cancelled. This way, we waste no time or resources if another Task/thread did the last piece of work just as we were being started.
Line 13: the bc.Take call - Here we grab the next item (which one depends on FIFO or FILO/LIFO, as configured in the above discussion). This BlockingCollection does it all!
When you sit back and look at it, all the other code in this routine is just to dress up Line 13!
So let's fire this puppy up!
Producer Task 001 (1) running!
--> 001 --> [+1 => Q has: 1] added: 1
<-- 100 <-- [-1 => Q has: 0] took: 1
Consumer Task 100 (2) running!
Consumer Task 101 (3) running!
Consumer Task 102 (4) running!
--> 001 --> [+1 => Q has: 1] added: 2
--> 001 --> [+1 => Q has: 2] added: 3
--> 001 --> [+1 => Q has: 3] added: 4
--> 001 --> [+1 => Q has: 4] added: 5
--> 001 --> [+1 => Q has: 5] added: 6
--> 001 --> [+1 => Q has: 6] added: 7
--> 001 --> [+1 => Q has: 7] added: 8
--> 001 --> [+1 => Q has: 8] added: 9
--> 001 --> [+1 => Q has: 9] added: 10
--> 001 --> [+1 => Q has: 10] added: 11
Producer stopped adding items! Added 11 items in 0.1631605 seconds!
<-- 101 <-- [-1 => Q has: 9] took: 2
<-- 100 <-- [-1 => Q has: 8] took: 3
<-- 101 <-- [-1 => Q has: 7] took: 4
<-- 102 <-- [-1 => Q has: 6] took: 5
creating producer 555...
Producer Task 555 (5) running!
<-- 100 <-- [-1 => Q has: 5] took: 6
Producer stopped adding items! Added 0 items in 1.09E-05 seconds!
<-- 101 <-- [-1 => Q has: 4] took: 7
<-- 102 <-- [-1 => Q has: 3] took: 8
cancelling random consumer...
<-- 100 <-- [-1 => Q has: 2] took: 9
<-- 101 <-- [-1 => Q has: 1] took: 10
<-- 102 <-- [-1 => Q has: 0] took: 11
Consumer Task 102 cancelled
Task 102 cancelling by request!
Task 102 - No more items to take. Took 2 items in 2.0128301 seconds!
Task 100 - No more items to take. Took 4 items in 4.0183264 seconds!
Task 101 - No more items to take. Took 4 items in 4.0007338 seconds!
cancelling random consumer...
creating consumer 222...
Task 222 - No more items to take. Took 0 items in 2.8E-06 seconds!
consumer Task 222 (6) running!
cancelling random producer...
cancelling random consumer...
Was this the output you expected?
Grab the entire solution 7z'd up for you, at the link below...
Download Solution from HERE!
It took me a while to figure out the whole CancellationToken concept, but now that I'm using it, and the bullet-proof-ness of the BlockingCollection, I'm confident my application can handle hundreds of objects per second without messing anything up.
My production application will read the number of cores on the host machine, and use that to set the initial number of consumers. I will then adjust up & down, monitoring the time to complete (in an aggregate fashion), thus making the best of the host machine's resources, understanding that the host machine may be doing many other things at the same time as my application.
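As a rough sketch of that starting point (an assumption about how it might look, not my production code): Environment.ProcessorCount gives the number of logical cores, and the hypothetical helper InitialConsumerCount clamps it to limits you choose.

```vb
Imports System

Module ConsumerCountSketch
    ' Hypothetical helper: one consumer per logical core, clamped to [minimum, maximum].
    Function InitialConsumerCount(minimum As Integer, maximum As Integer) As Integer
        Return Math.Max(minimum, Math.Min(maximum, Environment.ProcessorCount))
    End Function

    Sub Main()
        Console.WriteLine("Logical cores: " & Environment.ProcessorCount)
        Console.WriteLine("Initial consumers: " & InitialConsumerCount(1, 16))
    End Sub
End Module
```

The result would replace the hard-coded consumersNumToStart; adjusting up and down from there is left to the monitoring logic.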
Thanks everyone!