
Experts,

This is what happens when you can't find a solution anywhere on the Interwebs, and you just have to hack at it until you get it looking good [enough].

I have a situation where I need to parse objects coming in at high speed, but the parsing takes a relatively long time, far slower than the rate at which they come in... I can tell how many cores are on a box pretty easily, and then I can carve up a team of worker threads (Tasks) to divide and conquer! But the problem with ANY multi-threaded application is "How many threads?"

There is no hard & fast answer, so don't bother looking. My approach is to build something flexible that my main thread can monitor, to see whether throughput (aggregate work completed in X amount of time) is at the maximum for whatever machine it happens to be running on. Also, that same machine's load, available RAM, etc. may vary over time, so you can't just set it & forget it...

I'm asking just so I can answer my own question, which SO encourages.

  • What was the point of asking the question if you already had a solution? – Jim Mischel Mar 26 '14 at 03:07
  • @JimMischel [To share it with others.](http://blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/) – svick Mar 26 '14 at 10:42
  • Thanks! I rely a lot on SO for insight, or flat-out answers, to programming questions... probably more than other sites I visit under various aliases... So as I solve a problem (even the trivial ones), I am trying to find time to post, and save others' time in the process. Sorry @JimMischel if this seemed a daft thing to do! :) But even the smallest problems, the ones that seem silly and trivial to us now, were once 'big' problems! :) I know I said in the question that SO encourages this. I read that here: http://blog.stackoverflow.com/2011/07/its-ok-to-ask-and-answer-your-own-questions/ – Compassionate Narcissist Mar 26 '14 at 14:54

2 Answers


If the parsers are expected to be busy all the time (or nearly so), it makes no sense to have more parser threads than CPU threads that can work on them. Having 10 parser threads when you have only 4 cores is pointless because there will be overhead in thread context switches. So allocate 3 or 4 worker threads (consumers) that service the queue.

See https://stackoverflow.com/a/2670568/56778 to determine the number of logical processors that are on your system. It makes no sense to have more worker threads than that. Whether it makes sense to use your complicated scheme of dynamically allocating workers is ... a matter of opinion. I'd be more inclined to have a timer that examines the queue state (number of items) once a minute, and have it allocate or deallocate worker threads appropriately, adding or deleting one worker per minute in order to avoid overshooting the mark. That would probably make very good use of available resources.
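A minimal VB.NET sketch of that sizing rule (VB.NET being the language of the other answer): Environment.ProcessorCount gives the number of logical processors, and a hypothetical ClampWorkers helper applies the one-worker-per-tick adjustment without leaving the range 1 to ProcessorCount. The helper and its name are illustrative assumptions, not part of either answer.

```vbnet
Imports System

Module WorkerSizing
    ' Hypothetical helper: apply an adjustment (+1, -1 or 0) to the worker count,
    ' never going below one worker or above the number of logical processors.
    Public Function ClampWorkers(current As Integer, delta As Integer, maxWorkers As Integer) As Integer
        Return Math.Max(1, Math.Min(maxWorkers, current + delta))
    End Function

    Sub Main()
        ' Logical processors visible to this process (hyper-threads count as processors)
        Dim maxWorkers As Integer = Environment.ProcessorCount
        ' Start at one worker per logical processor
        Dim workers As Integer = maxWorkers

        ' A timer tick that sees a long queue asks for one more worker; the cap holds
        workers = ClampWorkers(workers, 1, maxWorkers)
        Console.WriteLine("Workers: {0} of {1}", workers, maxWorkers)

        ' A tick that sees an empty queue retires one (but never the last one)
        workers = ClampWorkers(workers, -1, maxWorkers)
        Console.WriteLine("Workers: {0} of {1}", workers, maxWorkers)
    End Sub
End Module
```

Adjusting by one per tick, as suggested above, keeps a single noisy reading from swinging the pool size by more than one thread.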

Removing a worker would be very easy. If you create an AutoResetEvent that each thread checks each time through its loop, then the first thread that sees it can exit. For example:

private AutoResetEvent _killAThread = new AutoResetEvent(false);

// in each thread
while (!_killAThread.WaitOne(0) && !cancelToken.IsCancellationRequested)  // WaitOne(0) polls without blocking
{
    // thread waits for an item from the queue and processes it.
}

Granted, this could cause you to have too many threads waiting if the queue is empty for a long period, but I get the sense that you don't expect that condition to occur very often (if at all).

And, of course, adding a new consumer is easy enough. Your timer tick handler compares the current queue size against some threshold and spins up a new thread if required. Or, if there are too many threads, it calls _killAThread.Set(), and the next thread to finish its processing will check the event, see that it's set, and exit.
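For readers following the VB.NET answer, here is the same kill-one-worker idea as a self-contained VB.NET sketch. The names are illustrative, and the 100 ms TryTake timeout is an assumption added so that an idle worker, instead of blocking forever on an empty queue, still gets around to checking the event.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks

Module KillOneWorkerDemo
    ' Each Set() releases exactly one WaitOne(), then the event resets itself
    Private ReadOnly killAWorker As New AutoResetEvent(False)
    Private ReadOnly queue As New BlockingCollection(Of Integer)()
    Private liveWorkers As Integer = 0

    Private Sub Worker(id As Integer, ct As CancellationToken)
        Interlocked.Increment(liveWorkers)
        Try
            ' WaitOne(0) polls the event without blocking
            While Not killAWorker.WaitOne(0) AndAlso Not ct.IsCancellationRequested
                Dim item As Integer
                ' Wait at most 100 ms so an idle worker still re-checks the kill event
                If queue.TryTake(item, 100, ct) Then
                    Thread.Sleep(10) ' stand-in for parsing the item
                End If
            End While
        Catch ex As OperationCanceledException
            ' TryTake observed the shutdown token
        Finally
            Interlocked.Decrement(liveWorkers)
            Console.WriteLine("Worker {0} exited", id)
        End Try
    End Sub

    ' Starts three workers, retires one, and reports how many were still running
    Public Function RunDemo() As Integer
        Using cts As New CancellationTokenSource()
            Dim tasks As New List(Of Task)
            For i As Integer = 1 To 3
                Dim id As Integer = i
                tasks.Add(Task.Factory.StartNew(Sub() Worker(id, cts.Token), cts.Token,
                                                TaskCreationOptions.LongRunning, TaskScheduler.Default))
            Next
            Thread.Sleep(200)      ' let all three start
            killAWorker.Set()      ' retire exactly one worker
            Thread.Sleep(500)      ' give it time to notice and exit
            Dim stillRunning As Integer = Volatile.Read(liveWorkers)
            cts.Cancel()           ' shut the rest down
            Task.WaitAll(tasks.ToArray())
            Return stillRunning
        End Using
    End Function

    Sub Main()
        Console.WriteLine("Workers still running after one Set(): {0}", RunDemo())
    End Sub
End Module
```

Because the event is an AutoResetEvent, a single Set() is consumed by whichever worker polls it first; calling Set() twice in quick succession may still retire only one worker if no worker polls in between, so a timer that retires one worker per tick fits it well.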

Jim Mischel
  • You're right about the number of cores: I mention this in the question (2nd paragraph), and in the last paragraph of the answer. The AutoResetEvent logic eludes me, as the BlockingCollection family is 'blocking' by nature. My threads already wait for a new item, but only the first Task to grab it gets (or even sees) it. Are you suggesting the above for a scenario my initial answer doesn't take into consideration? Also, I added multiple producers to make it a lab/bench, not for my specific need. It is a complete test program to see how multiple threads affect performance given x cores. Thanks for writing! :) – Compassionate Narcissist Mar 26 '14 at 15:14

[Note: Line numbers refer to those inserted by Crayon for WordPress in the original blog post, which is also where you can download a 7z of the entire VS 2012 solution.]

So this is the idea:

[Diagram: image not available]

So let's see how I solved the issue of the blocking collection and then dishing out work to the Parser Tasks... I used a very generic, easily reusable Producer/Consumer approach. In fact, in my example below, you can even adjust the number of Producers as well. You can adjust my code below to more closely approximate your work, and then adjust the number of threads/tasks/Consumers to see how well parallelism can work for you.

First, the usual suspects in Imports:

Imports System.Diagnostics ' Stopwatch
Imports System.Linq ' ElementAt()
Imports System.Threading
Imports System.Threading.Tasks
Imports System.Collections.Concurrent

Module modStartHere
    '
    ' Producer
    '
    Dim itemsToProduce = 10
    Dim sleepProducer = 10 ' in milliseconds
    Dim producerStartID = 1
    Dim producersNumToStart = 1
    Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
    Dim moreItemsToAdd As Boolean = True
    '
    ' Consumer
    '
    Dim sleepConsumer = 1000 ' in milliseconds
    Dim consumerStartID = 100
    Dim consumersNumToStart = 3
    Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)

The Producers and Consumers are initially set up with the values above. While itemsToProduce doesn't change during the program, the number of Producers & Consumers will. This is a very rough draft, and it will no doubt be streamlined in your own code at some point, but it demonstrates the approach.

I used "IDs" to show in the output which thread was doing what. The only thing needed in production is the list of CTS instances:

'
' the multi-thread-safe queue that is produced to and consumed from
'
Dim bc As New BlockingCollection(Of Integer)
'
' this # will be what is actually produced & consumed (1, 2, 3, ...)
'
Dim itemId As Integer = 0
'

The main machinery here is that one little line:

Dim bc As New BlockingCollection(Of Integer)

Microsoft says:

BlockingCollection Overview .NET Framework 4.5

BlockingCollection(Of T) is a thread-safe collection class that provides the following features:

- An implementation of the Producer-Consumer pattern.
- Concurrent adding and taking of items from multiple threads.
- Optional maximum capacity.
- Insertion and removal operations that block when collection is empty or full.
- Insertion and removal "try" operations that do not block or that block up to a specified period of time.
- Encapsulates any collection type that implements IProducerConsumerCollection(Of T)
- Cancellation with cancellation tokens.
- Two kinds of enumeration with foreach (For Each in Visual Basic):
  - Read-only enumeration.
  - Enumeration that removes items as they are enumerated.
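To make a couple of those features concrete, here is a minimal VB.NET sketch (BoundedDemo and RunDemo are illustrative names) of the optional maximum capacity and the "try" operations that give up after a timeout instead of blocking forever:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module BoundedDemo
    ' Returns a log of what each "try" call did against a collection bounded to 2 items
    Public Function RunDemo() As String
        Dim log As New List(Of String)
        ' Optional maximum capacity: at most 2 items may be waiting at once
        Using bc As New BlockingCollection(Of Integer)(2)
            log.Add("add 1: " & bc.TryAdd(1))
            log.Add("add 2: " & bc.TryAdd(2))
            ' Full: waits up to 50 ms for space, then gives up (a plain Add would block)
            log.Add("add 3: " & bc.TryAdd(3, 50))

            Dim item As Integer
            While bc.TryTake(item)           ' non-blocking: returns False once empty
                log.Add("took " & item)
            End While
            ' Empty: waits up to 50 ms for an item, then gives up (a plain Take would block)
            log.Add("take: " & bc.TryTake(item, 50))
        End Using
        Return String.Join("; ", log.ToArray())
    End Function

    Sub Main()
        ' add 1: True; add 2: True; add 3: False; took 1; took 2; take: False
        Console.WriteLine(RunDemo())
    End Sub
End Module
```

A bounded capacity is one way to apply back-pressure: if producers ever outrun the parsers by too much, Add() simply waits instead of letting the queue grow without limit.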

itemId is just a variable that holds the fake payload. Producers increment it by one to simulate a different object instance, or unit of work. To carry real work, just change the type the BlockingCollection holds...

I don't specify an ordering explicitly in this demo, but the parameterless BlockingCollection(Of T) constructor wraps a ConcurrentQueue(Of T), so it is already FIFO (which is what I need in production). You can also choose the ordering yourself, even LIFO, as per Microsoft:

When you create a BlockingCollection(Of T) object, you can specify not only the bounded capacity but also the type of collection to use. For example, you could specify a ConcurrentQueue(Of T) object for first in, first out (FIFO) behavior, or a ConcurrentStack(Of T) object for last in, first out (LIFO) behavior.

Now THAT is useful! For my specific need I want FIFO, as in the diagram at the top, and the default already gives me that.
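A minimal VB.NET sketch of how the choice of underlying collection changes the order in which items come out; OrderingDemo and Drain are illustrative names, and the capacity of 100 is arbitrary:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module OrderingDemo
    ' Adds 1, 2, 3, then drains the collection and returns the order items came out
    Public Function Drain(bc As BlockingCollection(Of Integer)) As String
        For i As Integer = 1 To 3
            bc.Add(i)
        Next
        bc.CompleteAdding()
        Dim order As New List(Of String)
        For Each item As Integer In bc.GetConsumingEnumerable()
            order.Add(item.ToString())
        Next
        Return String.Join(" ", order.ToArray())
    End Function

    Sub Main()
        ' Parameterless constructor: wraps a ConcurrentQueue(Of T), so FIFO -> "1 2 3"
        Console.WriteLine("Default: " & Drain(New BlockingCollection(Of Integer)()))
        ' Explicit ConcurrentQueue(Of T), this time with a bounded capacity of 100 -> "1 2 3"
        Console.WriteLine("Queue:   " & Drain(New BlockingCollection(Of Integer)(New ConcurrentQueue(Of Integer)(), 100)))
        ' ConcurrentStack(Of T): LIFO -> "3 2 1"
        Console.WriteLine("Stack:   " & Drain(New BlockingCollection(Of Integer)(New ConcurrentStack(Of Integer)())))
    End Sub
End Module
```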

Later, you will see the functions and subroutines, but the real magic here is in 2 collections, one for Producers and one for Consumers:

Dim ProducerCTSs As New ConcurrentBag(Of CancellationTokenSource)
Dim ConsumerCTSs As New ConcurrentBag(Of CancellationTokenSource)

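The Magic: as each Task (thread) is created, its CancellationTokenSource is added to the appropriate collection above, and cancelling that source is how the Task gets shut down. (In this draft, cancelled sources are never removed from the bags, since a ConcurrentBag cannot remove a specific item, so a later random pick can land on a Task that was already cancelled.)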

Seriously, that is it! :)
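Because ConcurrentBag(Of T) cannot remove a specific item, one possible alternative (an assumption, not what this demo does) is to key each CancellationTokenSource by its task ID in a ConcurrentDictionary(Of TKey, TValue), so that cancelling a task also removes it. A minimal sketch; WorkerRegistry, Register, CancelRandom and RegisteredCount are hypothetical names:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Linq
Imports System.Threading

Module WorkerRegistry
    ' Hypothetical registry: task id -> the CancellationTokenSource for that task
    Private ReadOnly workers As New ConcurrentDictionary(Of Integer, CancellationTokenSource)()
    ' Random is not thread-safe: call CancelRandom() from one thread (e.g. the monitor)
    Private ReadOnly rnd As New Random()

    Public Function Register(id As Integer) As CancellationTokenSource
        Dim cts As New CancellationTokenSource()
        If Not workers.TryAdd(id, cts) Then
            Throw New ArgumentException("Task id already registered: " & id)
        End If
        Return cts
    End Function

    ' Removes one registered task at random and cancels it, so the same task can
    ' never be picked twice. Returns its id, or -1 if nothing is registered.
    Public Function CancelRandom() As Integer
        Dim ids As Integer() = workers.Keys.ToArray()   ' snapshot of current ids
        If ids.Length = 0 Then Return -1
        Dim id As Integer = ids(rnd.Next(ids.Length))   ' upper bound is exclusive
        Dim cts As CancellationTokenSource = Nothing
        If workers.TryRemove(id, cts) Then
            cts.Cancel()
            Return id
        End If
        Return -1   ' another thread removed it first
    End Function

    Public Function RegisteredCount() As Integer
        Return workers.Count
    End Function

    Sub Main()
        For id As Integer = 100 To 102
            Register(id)
        Next
        Dim cancelled As Integer = CancelRandom()
        Console.WriteLine("Cancelled {0}; {1} still registered", cancelled, RegisteredCount())
    End Sub
End Module
```

With the dictionary, RegisteredCount() reflects only live tasks, which is the number a throughput monitor would actually want to compare against the core count.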

Next in the code, the initial producers and consumers are created:

'===============================
'
' start demo
'
Sub Main()
    '
    '===============================
    '
    ' initial state:
    '
    ' start our producer(s)
    '
    For ps As Integer = producerStartID To producerStartID + producersNumToStart - 1
        CreateTask(ps, "Producer")
    Next
    '
    ' start our consumer(s)
    '
    For cs As Integer = consumerStartID To consumerStartID + consumersNumToStart - 1
        CreateTask(cs, "Consumer")
    Next
    '
    '=========================================

Aside from a few Thread.Sleep() calls, all the next part does is add or remove producer and consumer Tasks(threads). You can vary the initial values at the top to put it through the paces.

To create a Task: CreateTask(<taskId>, <"Producer" or "Consumer">)

To remove a Task, you (in one line) both get a random CTS and then .Cancel() it:

GetRandomCTS(ProducerCTSs).Cancel()
GetRandomCTS(ConsumerCTSs).Cancel()

GetRandomCTS() takes the collection of CTS instances, picks one at random, and returns it; the caller then calls Cancel() on it.

'
Thread.Sleep(2000)
'
' create a producer
'
Console.WriteLine("creating producer 555...")
CreateTask(555, "Producer")
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(2000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
Thread.Sleep(1000)
'
' create a consumer
'
Console.WriteLine("creating consumer 222...")
CreateTask(222, "consumer")
Thread.Sleep(1000)
'
' cancel a producer
'
Console.WriteLine("cancelling random producer...")
GetRandomCTS(ProducerCTSs).Cancel()
Thread.Sleep(1000)
'
' cancel a consumer
'
Console.WriteLine("cancelling random consumer...")
GetRandomCTS(ConsumerCTSs).Cancel()
'
'==========================================
'
Console.ReadLine()

End Sub

And that is it!

Now for the fun parts:

#Region "Utilities"
    ''' <summary>
    ''' Retrieves a random cancellation token source from the given list of current threads...
    ''' Works for either producer or consumer
    ''' </summary>
    ''' <param name="ctsBag">ConcurrentBag(Of CancellationTokenSource)</param>
    ''' <returns>CancellationTokenSource, or Nothing if the bag is empty</returns>
    ''' <remarks>Cancelled sources stay in the bag, so the result may already be cancelled.</remarks>
    Function GetRandomCTS(ctsBag As ConcurrentBag(Of CancellationTokenSource)) As CancellationTokenSource
        ' One time-seeded Random shared across calls; a New Random(12345) per call
        ' would produce the same "random" index every time
        Static rndNum As New Random()
        Dim cts As CancellationTokenSource = Nothing
        Dim rndIndex As Integer = 0
        Try
            If ctsBag.Count = 0 Then
                Console.WriteLine("There are no threads to cancel!")
            Else
                rndIndex = rndNum.Next(0, ctsBag.Count) ' upper bound is exclusive, so every index 0..Count-1 can be picked
                cts = ctsBag.ElementAt(rndIndex)
            End If
        Catch ex As Exception
            Console.WriteLine("GetRandomCTS() Exception: " & ex.StackTrace)
        End Try
        Return cts
    End Function

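Line 7 (Dim cts As CancellationTokenSource = Nothing): this is what we'll be returning, a CancellationTokenSource.

Line 16 (cts = ctsBag.ElementAt(rndIndex)): ctsBag.ElementAt(), a LINQ extension method, lets us pull out a specific CTS instance by its position in the bag.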

Below, CreateTask() takes an argument for the number you want it to display when running (just for the demo, to see which thread is doing what), and a string to tell it whether you want a new Producer or Consumer. Sure, I could have made it more complex, but this is just a rough draft. :)

    Private Function CreateTask(taskId As Integer, taskType As String) As CancellationTokenSource
        Dim t As Task = Nothing
        Dim cts As New CancellationTokenSource()
        Dim token As CancellationToken = cts.Token
        Try
            If taskType.ToLower = "producer" Then
                t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                ProducerCTSs.Add(cts)
            ElseIf taskType.ToLower = "consumer" Then
                t = Task.Factory.StartNew(Sub() Consumer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)
                ConsumerCTSs.Add(cts)
            Else
                Throw New ArgumentException("Unknown task type: " & taskType)
            End If
            Console.WriteLine("{0} Task {1} ({2}) running!", taskType, taskId.ToString("000"), t.Id)
        Catch ex As Exception
            Console.WriteLine("Task {0} CreateTask({1}) Exception: {2}", taskId.ToString("000"), taskType, ex.Message)
        End Try
        Return cts
    End Function
#End Region

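Lines 7 & 10: These call the Producer() or Consumer() subroutines below, passing each one the CancellationToken from its CancellationTokenSource, so that it can be cancelled cleanly while running without corrupting any data.

t = Task.Factory.StartNew(Sub() Producer(taskId, token), token, TaskCreationOptions.LongRunning, TaskScheduler.Default)

Note the fourth argument: TaskFactory.StartNew has no (Action, CancellationToken, TaskCreationOptions) overload, so passing TaskScheduler.Default is what makes token the Task's own cancellation token. (With only three arguments, VB's relaxed delegate conversion appears to bind the call to the StartNew(Action(Of Object), state, options) overload instead, passing token merely as the state object.)

Did you notice TaskCreationOptions.LongRunning? That is a good fit in my case: it hints to the default scheduler that the task will run for a long time, so in practice it gets a dedicated thread instead of tying up a thread-pool thread. It has nothing to do with how cancellation is handled.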
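A small VB.NET sketch of what registering the token with the Task buys you: if the token passed to the four-argument StartNew overload is already cancelled, the Task goes straight to the Canceled state without running its delegate at all. PreCancelledTaskDemo is an illustrative name.

```vbnet
Imports System
Imports System.Threading
Imports System.Threading.Tasks

Module PreCancelledTaskDemo
    ' Starts a task whose token is already cancelled and reports its final status
    Public Function StatusWhenPreCancelled() As TaskStatus
        Dim delegateRan As Boolean = False
        Using cts As New CancellationTokenSource()
            cts.Cancel()
            ' Four-argument overload: the token is registered with the Task itself
            Dim t As Task = Task.Factory.StartNew(Sub() delegateRan = True, cts.Token,
                                                  TaskCreationOptions.LongRunning, TaskScheduler.Default)
            Try
                t.Wait()
            Catch ex As AggregateException
                ' Expected: the task was cancelled, so Wait() throws
            End Try
            Console.WriteLine("Delegate ran: {0}", delegateRan)   ' False
            Return t.Status                                       ' Canceled
        End Using
    End Function

    Sub Main()
        Console.WriteLine("Status: {0}", StatusWhenPreCancelled())
    End Sub
End Module
```

This is the Task-level counterpart of the "was cancellation already requested?" check at the top of Producer() and Consumer(): the check in the subroutine still matters for cancellations that arrive after the delegate has started.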

So what does the Producer() subroutine look like?

#Region "Producer(s)"
    Public Sub Producer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
        ' Was cancellation already requested?  
        If ct.IsCancellationRequested = True Then
            Console.WriteLine("Producer Task {0} was cancelled before Producer thread created!", taskNum.ToString("000"))
            ct.ThrowIfCancellationRequested()
        End If
        '
        'Dim r As Random = New Random(123)
        Dim sw As New Stopwatch
        Dim numAdded As Integer = 0
        sw.Start()
        While moreItemsToAdd = True
            ' Dim itemIn As Integer = r.Next(1, 1000)
            Dim payload As Integer = Interlocked.Increment(itemId) ' the payload (atomic, since several producers may run at once)
            Try
                bc.Add(payload)
                Console.WriteLine("--> " & taskNum.ToString("000") & " --> [+1 => Q has: " & bc.Count & "] added: " & payload)
                numAdded += 1
                If ct.IsCancellationRequested Then
                    Console.WriteLine("Producer Task {0} cancelled", taskNum.ToString("000"))
                    ct.ThrowIfCancellationRequested()
                End If
                Thread.Sleep(sleepProducer)
            Catch ex As OperationCanceledException
                Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
                Exit While
            Catch ex As Exception
                Console.WriteLine("Producer() Exception: " & ex.StackTrace)
            End Try
            If bc.Count >= itemsToProduce Then
                moreItemsToAdd = False
            End If
        End While
        sw.Stop()
        ' Let consumer know we are done.
        Console.WriteLine("Producer stopped adding items! Added " & numAdded & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
        bc.CompleteAdding()
    End Sub
#End Region

I know, I know... it looks complicated! But really it isn't. I'm not that smart! Half the code is just to catch & handle the cancellation requests so that the processing doesn't corrupt any data. That, and a cheesy Stopwatch to time things... And yes, there are artifacts of earlier versions still commented out. Like I said, "ROUGH"...

Line 17 (bc.Add(...)): simply adds the payload (our fake itemId, which could be anything) to the BlockingCollection (bc).

Line 20 (If ct.IsCancellationRequested Then): if cancellation has been requested, we handle it here, at a known point between items, rather than at some random point in the function where it could leave things half-done.

Line 31 (If bc.Count >= itemsToProduce Then): a cheesy way to tell the Producers when to stop ...producing. Note that it compares against the number of items currently waiting in the queue, not the number produced so far, which is why the run below added 11 items: a consumer had already taken one by the time the queue reached 10. The limit (itemsToProduce) is set at the top of the code.

Line 38 (bc.CompleteAdding()): a signal to everyone using bc (the BlockingCollection) that no more items will be added. This way, once the queue is also empty (bc.IsCompleted becomes True), the consumers know when to stop ...consuming!

"Why would they want to do that?"

Well, suppose you wanted a short-running task, or tasks, and needed to know they were done in order to continue... Yes, in my case they are long-running, and in production I'll be starting each task with TaskCreationOptions.LongRunning, just as this demo does.
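A minimal VB.NET sketch of those CompleteAdding() semantics in isolation (CompleteAddingDemo is an illustrative name): items already added can still be taken, further adds throw, and IsCompleted only becomes True once the collection is both marked complete and empty.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module CompleteAddingDemo
    ' Walks a BlockingCollection through CompleteAdding() and logs each state change
    Public Function Run() As String
        Dim log As New List(Of String)
        Using bc As New BlockingCollection(Of Integer)()
            bc.Add(1)
            bc.Add(2)
            bc.CompleteAdding()                       ' "no more items are coming"
            log.Add("IsAddingCompleted=" & bc.IsAddingCompleted)
            log.Add("IsCompleted=" & bc.IsCompleted)  ' still False: 2 items left
            Try
                bc.Add(3)
            Catch ex As InvalidOperationException
                log.Add("Add() rejected")             ' adding after completion throws
            End Try
            Dim item As Integer
            While bc.TryTake(item)                    ' consumers can still drain the queue
                log.Add("took " & item)
            End While
            log.Add("IsCompleted=" & bc.IsCompleted)  ' now True: completed and empty
        End Using
        Return String.Join("; ", log.ToArray())
    End Function

    Sub Main()
        Console.WriteLine(Run())
    End Sub
End Module
```

That distinction is why Consumer() loops on bc.IsCompleted rather than bc.IsAddingCompleted: the latter turns True while work may still be waiting in the queue.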

The Consumer() subroutine is almost identical, with just a few small differences:

#Region "Consumer(s)"
    Public Sub Consumer(ByVal taskNum As Integer, ByVal ct As CancellationToken)
        If ct.IsCancellationRequested = True Then ' Was cancellation already requested?
            Console.WriteLine("Consumer Task {0} was cancelled before Consumer thread created!", taskNum.ToString("000"))
            ct.ThrowIfCancellationRequested()
        End If
        Dim totalTaken As Integer = 0
        Dim sw As New Stopwatch
        sw.Start()
        While bc.IsCompleted = False
            Dim itemOut As Integer = Nothing ' the payload
            Try
                itemOut = bc.Take()
                Console.WriteLine("<-- " & taskNum.ToString("000") & " <-- [-1 => Q has: " & bc.Count & "] took: " & itemOut)
                If ct.IsCancellationRequested Then
                    Console.WriteLine("Consumer Task {0} cancelled", taskNum.ToString("000"))
                    ct.ThrowIfCancellationRequested()
                End If
                totalTaken += 1
            Catch ex As OperationCanceledException
                Console.WriteLine("Task " & taskNum.ToString("000") & " cancelling by request!")
                Exit While
            Catch e As InvalidOperationException
                ' IOE means that Take() was called on a completed collection.
                ' In this example, we can simply catch the exception since the  
                ' loop will break on the next iteration.
            End Try
            If (Not itemOut = Nothing) Then
                Thread.Sleep(sleepConsumer)
            End If
        End While
        sw.Stop()
        If bc.IsCompleted = True Then
            Console.WriteLine(vbCrLf & "Task " & taskNum.ToString("000") & " - No more items to take. Took " & totalTaken & " items in " & sw.Elapsed.TotalSeconds & " seconds!")
        End If
    End Sub
#End Region
End Module

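Line 3 (If ct.IsCancellationRequested = True Then): in both subroutines, we check right at the top whether we've been cancelled. This way, a Task that was cancelled before it even started running exits immediately instead of doing any work.

Line 13 (itemOut = bc.Take()): here we grab the next item (FIFO or LIFO, depending on the collection the BlockingCollection wraps, as discussed above). This BlockingCollection does it all! One caveat: Take() is not given the token, so a consumer that is cancelled while blocked on an empty queue only notices once another item arrives or CompleteAdding() is called. And because the cancellation check comes after Take(), a cancelled consumer takes one more item and exits without processing it (item 11 in the run below).

When you sit back and look at it, all the other code in this subroutine is just to dress up Line 13!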
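If you would rather a consumer respond to cancellation even while it is waiting on an empty queue, one option (an alternative to the Take() loop above, not what the demo uses) is to pass the token to GetConsumingEnumerable. A minimal VB.NET sketch with illustrative names:

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Threading
Imports System.Threading.Tasks

Module CancellableConsumerDemo
    ' Consumer loop built on GetConsumingEnumerable(ct): it blocks while the queue is
    ' empty, ends normally after CompleteAdding() once the queue drains, and throws
    ' OperationCanceledException as soon as ct is cancelled, even mid-wait.
    Public Function Consume(bc As BlockingCollection(Of Integer), ct As CancellationToken) As Integer
        Dim taken As Integer = 0
        Try
            For Each item As Integer In bc.GetConsumingEnumerable(ct)
                taken += 1   ' stand-in for parsing item
            Next
        Catch ex As OperationCanceledException
            Console.WriteLine("Consumer cancelled while waiting for work")
        End Try
        Return taken
    End Function

    Public Function RunDemo() As Integer
        Using bc As New BlockingCollection(Of Integer)()
            Using cts As New CancellationTokenSource()
                bc.Add(1)
                bc.Add(2)
                ' CompleteAdding() is deliberately never called, so a plain Take() would wait forever
                Dim consumer As Task(Of Integer) = Task(Of Integer).Factory.StartNew(Function() Consume(bc, cts.Token))
                Thread.Sleep(200)   ' give it time to take both items and start waiting
                cts.Cancel()        ' wakes the waiting consumer
                Return consumer.Result
            End Using
        End Using
    End Function

    Sub Main()
        Console.WriteLine("Items taken before cancellation: {0}", RunDemo())
    End Sub
End Module
```

Since cancellation is observed while waiting rather than after taking, an item is never pulled off the queue by a consumer that is about to exit.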

So let's fire this puppy up!

Producer Task 001 (1) running!
--> 001 --> [+1 => Q has: 1] added: 1
<-- 100 <-- [-1 => Q has: 0] took: 1
Consumer Task 100 (2) running!
Consumer Task 101 (3) running!
Consumer Task 102 (4) running!
--> 001 --> [+1 => Q has: 1] added: 2
--> 001 --> [+1 => Q has: 2] added: 3
--> 001 --> [+1 => Q has: 3] added: 4
--> 001 --> [+1 => Q has: 4] added: 5
--> 001 --> [+1 => Q has: 5] added: 6
--> 001 --> [+1 => Q has: 6] added: 7
--> 001 --> [+1 => Q has: 7] added: 8
--> 001 --> [+1 => Q has: 8] added: 9
--> 001 --> [+1 => Q has: 9] added: 10
--> 001 --> [+1 => Q has: 10] added: 11
Producer stopped adding items! Added 11 items in 0.1631605 seconds!
<-- 101 <-- [-1 => Q has: 9] took: 2
<-- 100 <-- [-1 => Q has: 8] took: 3
<-- 101 <-- [-1 => Q has: 7] took: 4
<-- 102 <-- [-1 => Q has: 6] took: 5
creating producer 555...
Producer Task 555 (5) running!
<-- 100 <-- [-1 => Q has: 5] took: 6
Producer stopped adding items! Added 0 items in 1.09E-05 seconds!
<-- 101 <-- [-1 => Q has: 4] took: 7
<-- 102 <-- [-1 => Q has: 3] took: 8
cancelling random consumer...
<-- 100 <-- [-1 => Q has: 2] took: 9
<-- 101 <-- [-1 => Q has: 1] took: 10
<-- 102 <-- [-1 => Q has: 0] took: 11
Consumer Task 102 cancelled
Task 102 cancelling by request!

Task 102 - No more items to take. Took 2 items in 2.0128301 seconds!

Task 100 - No more items to take. Took 4 items in 4.0183264 seconds!

Task 101 - No more items to take. Took 4 items in 4.0007338 seconds!
cancelling random consumer...
creating consumer 222...

Task 222 - No more items to take. Took 0 items in 2.8E-06 seconds!
consumer Task 222 (6) running!
cancelling random producer...
cancelling random consumer...

Was this the output you expected?

The entire VS 2012 solution, 7z'd up for you, is available for download from the original blog post.

It took me a while to figure out the whole CancellationToken concept, but now that I'm using it, along with the bullet-proof BlockingCollection, I'm confident my application can handle hundreds of objects per second without messing anything up.

My production application will read the number of cores on the host machine and use that to set the initial number of consumers. I will then adjust up & down, monitoring the time to complete (in aggregate), thus making the best of the host machine's resources, understanding that the host machine may be doing many other things at the same time as my application.
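A minimal VB.NET sketch of the "adjust up & down" step, assuming a simple hill-climbing rule: keep moving the consumer count in the same direction while throughput holds or improves, and reverse when it drops. The rule, the names and the numbers in Main are all hypothetical, not measurements.

```vbnet
Imports System

Module ThroughputTuner
    ' Hypothetical hill-climbing rule: if the last adjustment did not hurt
    ' throughput, repeat it; if throughput dropped, reverse direction.
    ' lastStep is +1 (added a worker) or -1 (removed one).
    Public Function NextStep(previousThroughput As Double, currentThroughput As Double,
                             lastStep As Integer) As Integer
        If currentThroughput >= previousThroughput Then
            Return lastStep
        End If
        Return -lastStep
    End Function

    ' Throughput for one monitoring interval, in items per second
    Public Function ItemsPerSecond(itemsCompleted As Long, interval As TimeSpan) As Double
        Return itemsCompleted / interval.TotalSeconds
    End Function

    Sub Main()
        Dim maxWorkers As Integer = Environment.ProcessorCount
        Dim workers As Integer = Math.Max(1, maxWorkers - 1)
        ' Hypothetical counts for two consecutive 10-second intervals
        Dim before As Double = ItemsPerSecond(1200, TimeSpan.FromSeconds(10))   ' 120 items/s
        Dim after As Double = ItemsPerSecond(1100, TimeSpan.FromSeconds(10))    ' 110 items/s
        ' We had just added a worker (+1) and throughput fell, so back off
        Dim stepDir As Integer = NextStep(before, after, 1)
        workers = Math.Max(1, Math.Min(maxWorkers, workers + stepDir))
        Console.WriteLine("Step {0}: now using {1} of {2} workers", stepDir, workers, maxWorkers)
    End Sub
End Module
```

In a real monitor, itemsCompleted would come from a counter the consumers bump with Interlocked.Increment, read and reset once per interval; averaging over a few intervals before deciding would make the rule less sensitive to noise from other work on the host.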

Thanks everyone!