This is my take on how to write a consumer thread. The first piece of the jigsaw is a blocking queue. Mine looks like this:
unit BlockingQueue;
interface
uses
Windows, SyncObjs, Generics.Collections;
type
TBlockingQueue<T> = class
//see Duffy, Concurrent Programming on Windows, pp248
private
FCapacity: Integer;
FQueue: TQueue<T>;
FLock: TCriticalSection;
FNotEmpty: TEvent;
function DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
public
constructor Create(Capacity: Integer=-1);//default to unbounded
destructor Destroy; override;
function Enqueue(const Value: T): Boolean;
procedure ForceEnqueue(const Value: T);
function Dequeue: T;
end;
implementation
{ TBlockingQueue<T> }
constructor TBlockingQueue<T>.Create(Capacity: Integer);
begin
inherited Create;
FCapacity := Capacity;
FQueue := TQueue<T>.Create;
FLock := TCriticalSection.Create;
FNotEmpty := TEvent.Create(nil, True, False, '');
end;
destructor TBlockingQueue<T>.Destroy;
begin
FNotEmpty.Free;
FLock.Free;
FQueue.Free;
inherited;
end;
function TBlockingQueue<T>.DoEnqueue(const Value: T; IgnoreCapacity: Boolean): Boolean;
var
WasEmpty: Boolean;
begin
FLock.Acquire;
Try
Result := IgnoreCapacity or (FCapacity=-1) or (FQueue.Count<FCapacity);
if Result then begin
WasEmpty := FQueue.Count=0;
FQueue.Enqueue(Value);
if WasEmpty then begin
FNotEmpty.SetEvent;
end;
end;
Finally
FLock.Release;
End;
end;
function TBlockingQueue<T>.Enqueue(const Value: T): Boolean;
begin
Result := DoEnqueue(Value, False);
end;
procedure TBlockingQueue<T>.ForceEnqueue(const Value: T);
begin
DoEnqueue(Value, True);
end;
function TBlockingQueue<T>.Dequeue: T;
begin
FLock.Acquire;
Try
while FQueue.Count=0 do begin
FLock.Release;
Try
FNotEmpty.WaitFor;
Finally
FLock.Acquire;
End;
end;
Result := FQueue.Dequeue;
if FQueue.Count=0 then begin
FNotEmpty.ResetEvent;
end;
Finally
FLock.Release;
End;
end;
end.
It is completely threadsafe. Any thread can enqueue. Any thread can dequeue. The dequeue function will block if the queue is empty. The queue can be operated in either bounded or unbounded modes.
Next up we need a thread that works with such a queue. The thread simply pulls jobs off the queue until it is told to terminate. My consumer thread looks like this:
unit ConsumerThread;
interface
uses
SysUtils, Classes, BlockingQueue;
type
TConsumerThread = class(TThread)
private
FQueue: TBlockingQueue<TProc>;
FQueueFinished: Boolean;
procedure SetQueueFinished;
protected
procedure TerminatedSet; override;
procedure Execute; override;
public
constructor Create(Queue: TBlockingQueue<TProc>);
end;
implementation
{ TConsumerThread }
constructor TConsumerThread.Create(Queue: TBlockingQueue<TProc>);
begin
inherited Create(False);
FQueue := Queue;
end;
procedure TConsumerThread.SetQueueFinished;
begin
FQueueFinished := True;
end;
procedure TConsumerThread.TerminatedSet;
begin
inherited;
//ensure that, if the queue is empty, we wake up the thread so that it can quit
FQueue.ForceEnqueue(SetQueueFinished);
end;
procedure TConsumerThread.Execute;
var
Proc: TProc;
begin
while not FQueueFinished do begin
Proc := FQueue.Dequeue();
Proc();
Proc := nil;//clear Proc immediately, rather than waiting for Dequeue to return since it blocks
end;
end;
end.
This has the very property that you are looking for. Namely that when the thread is destroyed, it will process all pending tasks before completing the destructor.
To see it in action, here's a short demonstration program:
unit Main;
interface
uses
Windows, SysUtils, Classes, Controls, Forms, StdCtrls,
BlockingQueue, ConsumerThread;
type
TMainForm = class(TForm)
Memo1: TMemo;
TaskCount: TEdit;
Start: TButton;
Stop: TButton;
procedure StartClick(Sender: TObject);
procedure StopClick(Sender: TObject);
private
FQueue: TBlockingQueue<TProc>;
FThread: TConsumerThread;
procedure Proc;
procedure Output(const Msg: string);
end;
implementation
{$R *.dfm}
procedure TMainForm.Output(const Msg: string);
begin
TThread.Synchronize(FThread,
procedure
begin
Memo1.Lines.Add(Msg);
end
);
end;
procedure TMainForm.Proc;
begin
Output(Format('Consumer thread ID: %d', [GetCurrentThreadId]));
Sleep(1000);
end;
procedure TMainForm.StartClick(Sender: TObject);
var
i: Integer;
begin
Memo1.Clear;
Output(Format('Main thread ID: %d', [GetCurrentThreadId]));
FQueue := TBlockingQueue<TProc>.Create;
FThread := TConsumerThread.Create(FQueue);
for i := 1 to StrToInt(TaskCount.Text) do
FQueue.Enqueue(Proc);
end;
procedure TMainForm.StopClick(Sender: TObject);
begin
Output('Stop clicked, calling thread destructor');
FreeAndNil(FThread);
Output('Thread destroyed');
FreeAndNil(FQueue);
end;
end.
object MainForm: TMainForm
Caption = 'MainForm'
ClientHeight = 560
ClientWidth = 904
object Memo1: TMemo
Left = 0
Top = 96
Width = 904
Height = 464
Align = alBottom
end
object TaskCount: TEdit
Left = 8
Top = 8
Width = 121
Height = 21
Text = '10'
end
object Start: TButton
Left = 8
Top = 48
Width = 89
Height = 23
Caption = 'Start'
OnClick = StartClick
end
object Stop: TButton
Left = 120
Top = 48
Width = 75
Height = 23
Caption = 'Stop'
OnClick = StopClick
end
end