0

I am restricted to .NET 3.0: this is an externally imposed requirement.

I need to process strings from a Generic.Queue. The main thread will Enqueue strings and the worker thread will Dequeue strings and then process them. At the top of the worker thread, it suspends itself and waits to be resumed when something enters the queue. Then it empties the queue (processes each item as they are dequeued) and then suspends itself when the queue is empty.

I created a small example program that succinctly illustrates the basic logic. I know the enqueue dequeue need to be protected but they are raw here as this is not the focus of my inquiry.

I looked at using Semaphores and Mutex objects but the way I understand the documentation, there does not seem to be a way to signal the event with out waiting on it. What I want is the resume to to unblock the worker thread (if it is blocked) so that it can loop through the queue. This signal happens only when the main thread adds and item to the queue. if the worker is already working then it will continue until the queue is empty. The need for the signal is only to start up the worker if was caught up and waiting for something to do.

using System;
using System.Collections.Generic;
using System.Threading;

namespace QueueProcessor
{
    class Program
    {
        static Queue<string> _Messages = new Queue<string>();
        static bool _Continue = true;
        static Thread _DequeueThread = null;

        static void Main( string[] args )
        {
            _DequeueThread = new Thread( DequeueThread );
            _DequeueThread.Start();
            for(;;)
            {
                Console.WriteLine( "Entersomething here:");
                string something = Console.ReadLine();
                if( something.Equals("quit") )
                {
                    _Continue = false;
                    ResumeThread();
                    break;
                }
                _Messages.Enqueue( something );
                ResumeThread();
            }
        }

        static void ResumeThread( )
        {
            try
            {
                _DequeueThread.Resume();  // .NET 3.0 is the limit
            }
            // If it is already resumed, the frame work throws an exception.
            // This seem unneccesary since if it is already resumed then what's the bother?
            catch( ThreadStateException ) 
            {
                Console.WriteLine( "Thread already running....." );
            }
        }

        static void SuspendThread()
        {
            _DequeueThread.Suspend(); // .NET 3.0 is the limit
        }

        static void DequeueThread()
        {
            Random randomTime = new Random();
            while( _Continue )
            {
                SuspendThread();
                while( _Messages.Count > 0)
                {
                    string message = _Messages.Dequeue();
                    Console.WriteLine( String.Format ( "Dequeue:{0}", message ) );
                    int timeout = randomTime.Next();
                    timeout %= 4000;
                    Thread.Sleep(timeout); // simulated taking a while to process message
                }
            }
        }
    }
}
Display name
  • 1,228
  • 1
  • 18
  • 29
  • Sorry, I also meant to note that the compiler tells me that Resume and Suspend are depreciated. I want to replace these calls but I'm not sure what actually works as a good replacement for them. – Display name May 23 '15 at 17:45
  • You are looking for Event (http://stackoverflow.com/questions/492020/blocking-and-waiting-for-an-event)... Not sure why you've not looked at source to see how ConcurrentQueue is implemented - http://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs,18bcbcbdddbcfdcb (or maybe you did - but forgot to add comments to post). – Alexei Levenkov May 23 '15 at 17:54
  • Thank you Alexei. AutoResetEvent is what I'm looking for. – Display name May 23 '15 at 18:14
  • "I am restricted to .NET 3.0: *this is an externally imposed requirement.*". I hope you are charging extra then as that restriction is clearly causing you more work. – David Arno May 23 '15 at 18:26
  • 1
    Please note that .NET version is completely unrelated to this. Suspending threads asynchronously from within the same process is completely broken for all applications, whether .NET or not. The related Win32 APIs are intended for use by debuggers, not general-purpose synchronization. – Ben Voigt May 23 '15 at 18:38

1 Answers1

0

This is the complete answer. Thank you Alexi for pointing me in the right direction.

using System;
using System.Collections.Generic;
using System.Threading;

namespace QueueProcessor
{

    class Program
    {
        static Queue<string> _Messages = new Queue<string>();
        static bool _Continue = true;
        static Thread _DequeueThread = null;
        static AutoResetEvent _Latch = null;

        static void Main( string[] args )
        {
            _DequeueThread = new Thread( DequeueThread );
            _Latch = new AutoResetEvent(false);
            _DequeueThread.Start();
            for(;;)
            {
                Console.WriteLine( "Entersomething here:");
                string something = Console.ReadLine();
                if( something.Equals("quit") )
                {
                    _Continue = false;
                    ResumeThread();
                    break;
                }
                lock( _Messages )
                {
                    _Messages.Enqueue( something );
                }
                ResumeThread();
            }
        }

        static void ResumeThread( )
        {
            _Latch.Set();
        }

        static void SuspendThread()
        {
            _Latch.WaitOne();
        }

        static void DequeueThread()
        {
            Random randomTime = new Random();
            while( _Continue )
            {
                SuspendThread();
                string message = string.Empty;
                for(;;)
                {
                    lock ( _Messages )
                    {
                        message = _Messages.Count == 0 ? string.Empty : _Messages.Dequeue();
                    }

                    if( String.IsNullOrEmpty( message ) ) break; // Loop exit condition

                    Console.WriteLine( String.Format ( "Dequeue:{0}", message ) );
                    int timeout = randomTime.Next();
                    timeout %= 4000;
                    Thread.Sleep(timeout); // simulated taking a while to process message
                }
            }
        }
    }
}
Display name
  • 1,228
  • 1
  • 18
  • 29