1

I am writing an application where I need to respond very quickly to UDP messages to trigger cameras while at the same time processing the results of previous images from those cameras. The image processing is highly parallel and I am using a Parallel.ForEach to run pattern comparisons. The issue I am seeing is that the Parallel.ForEach is slowing down the responsiveness of the trigger code and creating an unwanted delay. I've tried adding task priorities using a custom task scheduler to increase the camera trigger code to highest and this helps reduce the problem but does not eliminate it.

I'm curious if anyone has any tips or recommendations on how to ensure that a particular task will be processed in a timely manner when other tasks are very thread intensive. There's a good chance that I'm overparallelizing my code and the sheer quantity of threads is a likely culprit.

I'm working to fix this overparallelization, but I should be able to address these problems in isolation. I want to ensure that even if overparallelization did exist that it would not hurt these other critical tasks.

I looked into partitioning the Parallel.ForEach but my data size is only 200 and each operation is heavy, so I don't believe I will see any significant gains by grouping my data.

I'm looking at processor affinity, but really want something a bit cleaner.

http://blog.rebuildall.net/2010/03/08/Running_NET_threads_on_selected_processor_cores

VB.net Process Affinity

My UDP camera trigger code:

Public Class UDPCameraTrigger
    Implements System.ComponentModel.INotifyPropertyChanged

    Protected _Records As Concurrent.ConcurrentDictionary(Of Integer, clsVisionRecord)
    Protected _Cameras As IEnumerable(Of Acquisition.AsyncCamera)

    Protected HighPriorityTaskScheduler As TaskScheduler
    Protected HighPriorityTaskFactory As TaskFactory

    Protected groupEP As Net.IPEndPoint

    Protected Async Sub StartListener()
        Using listener As New Net.Sockets.UdpClient(groupEP)

            While True
                Try
                    Dim Result As Net.Sockets.UdpReceiveResult = Await HighPriorityTaskFactory.StartNew(AddressOf listener.ReceiveAsync).Unwrap
                    Dim packet As New TriggerEventPacket
                    packet.ParsePacketBytes(Result.Buffer)
                    Using Markers.EnterSpan("UDPTriggerEvent #" & packet.RecordID.ToString)

                        Dim VRecord As New clsVisionRecord(packet.RecordID)
                        Dim TotalDelay As Double = VRecord.StartTime.Subtract(packet.TimeStamp).TotalMilliseconds
                        If TotalDelay > 5 Then WriteToLog("VRecord created for " & packet.RecordID.ToString & " with delay of " & TotalDelay.ToString & "ms", EventLogEntryType.Information)
                        If _Records.TryAdd(packet.RecordID, VRecord) Then
                            For Each Camera As Acquisition.AsyncCamera In _Cameras
                                HighPriorityTaskFactory.StartNew(Sub() Camera.Trigger(VRecord))
                            Next
                        Else
                            WriteToLog("Duplicate received " & packet.RecordID, EventLogEntryType.Warning)
                        End If
                    End Using
                Catch ex As Exception
                    ex.WriteToLog()
                End Try
            End While
        End Using
    End Sub

    Public Sub New(Records As Concurrent.ConcurrentDictionary(Of Integer, clsVisionRecord), Cameras As IEnumerable(Of Acquisition.AsyncCamera), Port As Integer)
        If (Port < 1) Or (Port > 65535) Then Throw New ArgumentOutOfRangeException("Port", "Specified port of " & Port.ToString & " is invalid")

        groupEP = New Net.IPEndPoint(Net.IPAddress.Any, Port)
        HighPriorityTaskScheduler = New TaskSchedulerPrioritized(12, System.Threading.ApartmentState.MTA, System.Threading.ThreadPriority.Highest)
        HighPriorityTaskFactory = New TaskFactory(HighPriorityTaskScheduler)

        HighPriorityTaskFactory.StartNew(AddressOf StartListener)

        _Records = Records
        _Cameras = Cameras
    End Sub

    ' Create the OnPropertyChanged method to raise the event 
    Protected Sub OnPropertyChanged(ByVal name As String)
        RaiseEvent PropertyChanged(Me, New System.ComponentModel.PropertyChangedEventArgs(name))
    End Sub
    Public Event PropertyChanged(sender As Object, e As System.ComponentModel.PropertyChangedEventArgs) Implements System.ComponentModel.INotifyPropertyChanged.PropertyChanged
End Class

If I encounter a delay >50ms when triggering the camera, then it makes the image too far off-center to use. (capturing items moving on a conveyor at a decent clip so timing is critical). I have the option to do a hardware trigger, but I want to avoid this because I lose the ability to have robust ticketing between the various systems.

The camera returns the image and updates the record. The record is then moved into a pipeline that is based on TPL Dataflow blocks. Within these dataflow blocks there are multiple Parallel.ForEach methods used to perform a series of analysis tasks on the image. The code is too extensive to post here in full but I am happy to discuss what I have or post segments as necessary.

Community
  • 1
  • 1
VoteCoffee
  • 4,692
  • 1
  • 41
  • 44
  • It's better to move the high-load processing to a low priority scheduler because that implicitly prioritizes the entire rest of the process. You can't accidentally miss something (like a thread-pool thread). – usr Sep 04 '14 at 15:57
  • By default the TPL library uses a below normal priority I believe. – VoteCoffee Sep 04 '14 at 15:58
  • 1
    Yes, it uses the thread-pool. Make Parallel.* use low-priority threads. – usr Sep 04 '14 at 15:58
  • I see. I'll give it a whirl. – VoteCoffee Sep 04 '14 at 15:59
  • How do you perform the UDP IO? Asynchronously? In that case you have the thread-pool in your way. – usr Sep 04 '14 at 16:00
  • I'll edit my question to show that code. I am using highest priority along with await by using taskfactory and unwrap. – VoteCoffee Sep 04 '14 at 16:31
  • 1
    All async IO completion notifications are initially executed on the thread-pool. await helps dispatch that flow of control elsewhere *after running on the thread-pool*. That's why you need to run the high-load work on low-prio threads. – usr Sep 05 '14 at 00:49

0 Answers0