0

I'm writing a simple producer/consumer application, but I'm noticing a really strange behaviour..This is the code:

private Thread _timelineThread = null;
private BufferBlock<RtpPacket> _queue = null;
private AutoResetEvent _stopping = new AutoResetEvent(false);

static void Main(string[] args)
{  
  // Start consumer thread
  Consume();
  
  // Produce
  var t = new Thread(() =>
  {
    while (true)
    {
      var packet = RtpPacket.GetNext();
      _queue.Post(packet);
      Thread.Sleep(70);
    }
  }
  t.Join();
}

static void Consume()
{
  _timelineThread = new Thread(async () =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (await _queue.OutputAvailableAsync())
      {
        var packet = await _queue.ReceiveAsync();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

This is intended to be an infinite loop (until I route the _stopping signal). But, when _timelineThread hits the first await _queue.OutputAvailableAsync(), the thread changes state to 'Stopped'. There is something wrong that I'm not considering ?

If I change the Consume() function to this:

static void Consume()
{
  _timelineThread = new Thread(() =>
  {
    while (_stopping.WaitOne(0) == false)
    {
      // Start consuming...
      while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
      {
        var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
        // Some processing...
      }
    }
  });
  _timelineThread.Start();   
}

the thread runs without any problem..but the code is almost identical to the previous one..

EDIT: after one hour also this 'hack' doesn't seems to work..thread is 'Running' but I don't receive any data from the queue..

1 Answers1

0

The Thread constructor does not understand async delegates. You can read about this here:

My suggestion is to use a synchronous BlockingCollection<RtpPacket> instead of the BufferBlock<RtpPacket>, and consume it by enumerating the GetConsumingEnumerable method:

var _queue = new BlockingCollection<RtpPacket>();

var producer = new Thread(() =>
{
    while (true)
    {
        var packet = RtpPacket.GetNext();
        if (packet == null) { _queue.CompleteAdding(); break; }
        _queue.Add(packet);
        Thread.Sleep(70);
    }
});

var consumer = new Thread(() =>
{
    foreach (var packet in _queue.GetConsumingEnumerable())
    {
        // Some processing...
    }
});

producer.Start();
consumer.Start();

producer.Join();
consumer.Join();
Theodor Zoulias
  • 34,835
  • 7
  • 69
  • 104
  • Thanks, now my thread doesn't die but after one hour it hangs..so the problem is still opened. Actually the producer thread works and I'm seeing my `_queue` enlarging overtime. But the consumer thread doesn't process my packets. In `debug mode` I can see that the thread is still alive but `_queue.GetConsumingEnumerable()` never returns. – Mario Vaccaro Nov 15 '21 at 10:34
  • @MarioVaccaro well, it depends on what kind of processing you are doing with the packets, if you are cleaning up all resources associated with processing each packet etc. FYI the `BlockingCollection` supports also a bounded mode, where the number of items stored in the collection is limited. When the limit is reached, the producer will get blocked when calling `Add`, until empty space is again available. – Theodor Zoulias Nov 15 '21 at 10:46
  • 1
    Seems that you're right. Commenting out the processing method, consumer thread is still running after 3 hours. Processing method regards just the RTP header strip-off, MP3 payload convertion to PCM and finally write to file. Processing is inside a try..catch block so exception was well handled. If I can find solution I'll update the main thread. – Mario Vaccaro Nov 15 '21 at 14:42
  • @MarioVaccaro in case processing a packet takes lots of time, you could use more than one consumers. The `BlockingCollection` supports multiple consumers (as well as producers). Alternatively you could use an `ActioBlock` instead of the `BlockingCollection`, and configure its `MaxDegreeOfParallelism` option. – Theodor Zoulias Nov 15 '21 at 16:03
  • 1
    Thanks for your reply. Probably using multiple consumers will be the best approach for the processing, but these packets should converge (ordered) into one file, so I need atleast another queue for the writer. Meanwhile I've found the problem. Seems that `SharpMediaFoundationReader` (MP3 to PCM) has a memory leak somewhere... – Mario Vaccaro Nov 16 '21 at 17:26
  • @MarioVaccaro if you have more work to do with the packets after they are processed, like saving them into a file, you could use a parallelized `TransformBlock` as the first consumer, and link it to a non-parallel `ActionBlock` for the final processing. The `TransformBlock` by default preserves the order of the messages is receives, unless you explicitly tell it otherwise with the `EnsureOrdered = false` configuration. – Theodor Zoulias Nov 16 '21 at 17:40
  • 1
    Wow this seems very interesting, I'll delve into the matter. Thank you very much!! – Mario Vaccaro Nov 16 '21 at 18:41