
I am designing two components that asynchronously receive objects of a custom class (TMELogMessage) and store them in a thread-safe internal container. The first component (TMEFileLogger) is non-visual and, unsurprisingly, should write some info from these objects to a log file. The second component (TMELogGrid) is a visual FMX.Grid descendant that should display some info from these objects in the UI. What they do with these objects is, I think, irrelevant.

The problem I am facing is that these components do not actually know when these objects will be enqueued in their internal container, so they have to check the container themselves to see if there are any new objects that need processing, process them and remove them from the queue. Ideally I'd want this to be done when the application is not too busy, in a way similar to action updating, so as not to bog down the UI.

It is obviously wrong for a component to hook an event handler like Application.OnIdle... I could maybe subscribe to the TIdleMessage, but I'm not sure that is a good idea, as I've read that some applications could never go idle. Using an internal timer seems a bit old-school. I could maybe use a low-priority thread to poll the queue and then synchronize with the UI only when I find an object to process. I don't have other ideas though.

What is the proper way to do this in Delphi + multiplatform FireMonkey?

Frazz
  • I tend to use a somewhat lazy update mode for this. An update event is triggered only every n milliseconds, regardless of how many items were received. And yes, you need a timer that will fire the event after the delay – Sir Rufo Jul 19 '16 at 15:11
  • *some applications could never go idle* - if that occurs then the UI thread is blocked and the user cannot communicate with the application anymore and that is a design failure of the application. – Sir Rufo Jul 19 '16 at 16:23
  • "*It is obviously wrong for a component to hook an event handler like Application.OnIdle...*" - you can use the `TApplicationEvents` components for that. It allows multiple handlers to be assigned for the same `TApplication` events. "*I could maybe subscribe to the TIdleMessage*" - that is certainly an option. Or you can have your container use `TThread.Queue()` to schedule tasks on the main thread when new objects are pushed into the container. – Remy Lebeau Jul 19 '16 at 16:41

2 Answers


I don't like to answer my own questions, but I wanted this question to be answered, as it might be helpful to others. While Deltics' answer is useful, that is not the way I decided to go. I followed the advice in Remy's comment and encapsulated everything in a message handling class that components and forms can use. So both the TMEFileLogger and the TMELogGrid now use an instance of this new TMEMessageHandler class.

Here is some interface code to explain what I did. Keep in mind that this is meant to be a substitute for, and an enhancement of, the RTL System.Messaging unit. The problem with the RTL messaging system is that it only provides for sending synchronous messages. I wanted a richer interface. This is what my message manager looks like:

  TMEMessageManager = Class
    ...
  Public
    ...
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostDelayedMessage(Const Sender: TObject; AMessage: TMessage; Const DelayMSec: Cardinal; Const ADispose: Boolean = True); Inline;
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure PostMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope; Const ADispose: Boolean = True); Inline;
    Procedure SendMessage(Const Sender: TObject; AMessage: TMessage; Const ADispose: Boolean = True); Inline;

    Function Subscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver): Integer; Overload;
    Function Subscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';
    Function Subscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure): Integer; Overload; Deprecated 'Use TMEMessageManager.Subscribe(AMessageClass, AReceiver)';

    Procedure Unsubscribe(Const AMessageClass: TClass; ID: Integer; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AReceiver: IMEEnvelopeReceiver; Const Immediate: Boolean = False); Overload;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AMethod: TMessageMethod; Const Immediate: Boolean = False); Overload; Deprecated;
    Procedure Unsubscribe(Const AMessageClass: TClass; Const AProcedure: TMessageProcedure; Const Immediate: Boolean = False); Overload; Deprecated;
    ...
  End;

The TMEMessageEnvelope is a wrapper for messages, defined as:

Type
  TMEMessageEnvelope = Class(TMEPersistent)
  Public
    ...
    Property Infos: TMEMessageInfos Read FInfos;
    Property Sender: TObject Read FSender;
    Property Msg: TMessage Read FMsg;
  End;

Receivers that subscribe via an envelope receiver will receive both synchronous and asynchronous messages. This is the preferred subscription method. Receivers that subscribe via an object method or via a procedure will only receive synchronous messages. This is maintained for compatibility with the RTL messaging system, but is deprecated.

The problem is that RTL messages cannot be posted as they are. The subscribed consumers just provide a procedure or an object method that consumes the message right away. To post a message so that it can be consumed later, asynchronously, it needs to be wrapped and enqueued. This way the sender is isolated from the receivers. So messages are actually posted (immediately or delayed) by first wrapping them in envelopes.

Here are the base interfaces involved in this messaging system:

Type

  IMEClonableMessage = Interface(IInterface)
    ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
    Function Clone: TMessage;
  End;

  IMEMessageSender = Interface(IInterface)
    ['{99AFDC4A-CE30-41A3-9AA5-D49F2F1106BD}']
    Procedure PostDelayedMessage(const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendMessage(Const M: TMessage);
  End;

  IMEEnvelopeSender = Interface(IInterface)
    ['{C3AEC52C-A558-40AB-B07B-3000ECDB9C0C}']
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
  End;

  IMEEnvelopeReceiver = Interface(IInterface)
    ['{7D464713-2F25-4666-AAF8-757AF07688C3}']
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
  End;

The IMEClonableMessage interface is used to clone messages. Asynchronous messages must be cloned because, if there are many subscribers to the same message, each will receive and consume it at a different time, so it is best that each has its own copy of the message.
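To make the cloning idea concrete, here is a minimal console sketch. It assumes a non-ARC desktop compiler; TSketchLogMessage, its Text field and the CloneForReceiver helper are hypothetical names used only for illustration, not part of the actual library. Because the RTL TMessage is a plain TObject descendant, the sketch implements the IInterface methods by hand with reference counting disabled.

```pascal
program ClonableMessageSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Messaging;

type
  IMEClonableMessage = interface(IInterface)
    ['{45B223E2-DCA8-4E42-9847-6A3FCC910891}']
    function Clone: TMessage;
  end;

  // Hypothetical message class; the Text field is an assumption.
  TSketchLogMessage = class(TMessage, IMEClonableMessage)
  private
    FText: string;
  protected
    // Hand-written IInterface support without reference counting.
    function QueryInterface(const IID: TGUID; out Obj): HResult; stdcall;
    function _AddRef: Integer; stdcall;
    function _Release: Integer; stdcall;
  public
    constructor Create(const AText: string);
    function Clone: TMessage;
    property Text: string read FText;
  end;

constructor TSketchLogMessage.Create(const AText: string);
begin
  inherited Create;
  FText := AText;
end;

function TSketchLogMessage.Clone: TMessage;
begin
  // Each call returns a new, independent instance with the same content.
  Result := TSketchLogMessage.Create(FText);
end;

function TSketchLogMessage.QueryInterface(const IID: TGUID; out Obj): HResult;
begin
  if GetInterface(IID, Obj) then
    Result := S_OK
  else
    Result := E_NOINTERFACE;
end;

function TSketchLogMessage._AddRef: Integer;
begin
  Result := -1; // lifetime is managed manually
end;

function TSketchLogMessage._Release: Integer;
begin
  Result := -1; // lifetime is managed manually
end;

// Returns a private copy when the message supports cloning, otherwise nil.
function CloneForReceiver(const M: TMessage): TMessage;
var
  Clonable: IMEClonableMessage;
begin
  if Supports(M, IMEClonableMessage, Clonable) then
    Result := Clonable.Clone
  else
    Result := nil;
end;

var
  Original, Copy1, Copy2: TMessage;
begin
  Original := TSketchLogMessage.Create('disk almost full');
  Copy1 := CloneForReceiver(Original);
  Copy2 := CloneForReceiver(Original);
  try
    // Two receivers get two distinct objects carrying the same text.
    Writeln('distinct copies: ', Copy1 <> Copy2);
    Writeln('text of copy 1: ', (Copy1 as TSketchLogMessage).Text);
  finally
    Copy2.Free;
    Copy1.Free;
    Original.Free;
  end;
end.
```

Each receiver can then free its own copy whenever it has finished with it, without affecting the other subscribers.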

The other interfaces are, I think, self-explanatory.

And finally here is the TMEMessageHandler class:

  TMEMessageHandler = Class(TMEPersistent, IMEMessageSender, IMEEnvelopeSender, IMEEnvelopeReceiver)
    /// <summary>Basic thread-safe class that can send and receive synchronous and asynchronous messages and envelopes.</summary>
  Private
    FLock:                 TObject;
    FMessageManager:       TMEMessageManager;
    FSubscriptions:        TDictionary<TClass, Integer>;
    FEnvelopes:            TObjectList<TMEMessageEnvelope>;
    FOnReceiveEnvelope:    TReceiveEnvelopeEvent;
    FAutoProcessEnvelopes: Boolean;
    Procedure _Lock;
    Procedure _Unlock;
    Procedure ClearSubscriptions;
    Function GetMessageManager: TMEMessageManager;
    Procedure SetAutoProcessEnvelopes(Const Value: Boolean);
    Procedure SetMessageManager(Const Value: TMEMessageManager);
  Protected
    Function QueryInterface(Const IID: TGuid; Out Obj): HResult; Stdcall;
    Function _AddRef: Integer; Stdcall;
    Function _Release: Integer; Stdcall;
    Procedure DoReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostDelayedEnvelope(Const Envelope: TMEMessageEnvelope; Const DelayMSec: Cardinal);
    Procedure PostDelayedMessage(Const M: TMessage; Const DelayMSec: Cardinal);
    Procedure PostEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure PostMessage(Const M: TMessage);
    Procedure SendEnvelope(Const Envelope: TMEMessageEnvelope);
    Procedure SendMessage(Const M: TMessage);
    Function QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
    Procedure ReceiveEnvelope(Const Envelope: TMEMessageEnvelope);
  Public
    Constructor Create; Override;
    Destructor Destroy; Override;
    Procedure ClearEnvelopes;
    Procedure ProcessEnvelope;
    Procedure ProcessEnvelopes;
    Procedure Subscribe(Const AMessageClass: TClass);
    Procedure Unsubscribe(Const AMessageClass: TClass);
    Property AutoProcessEnvelopes: Boolean Read FAutoProcessEnvelopes Write SetAutoProcessEnvelopes Default True;
    Property MessageManager: TMEMessageManager Read GetMessageManager Write SetMessageManager;
    Property OnReceiveEnvelope: TReceiveEnvelopeEvent Read FOnReceiveEnvelope Write FOnReceiveEnvelope;
  End;

How all this works

The TMEMessageHandler immediately delegates to the MessageManager any Subscribe and Unsubscribe calls. It will always subscribe providing itself as an IMEEnvelopeReceiver. It keeps track of subscriptions in its internal dictionary so as to be more efficient when it unsubscribes.

It also immediately delegates any call to the Send, Post and PostDelayed methods. The TMEMessageManager:

  • Sends messages to subscribed procedures (as RTL)
  • Sends messages to subscribed object methods (as RTL)
  • Sends envelopes to subscribed receivers by calling their ReceiveEnvelope method
  • Posts envelopes (and envelope-wrapped messages) to subscribed receivers by calling their QueueEnvelope method with a cloned copy of the envelope
  • Posts delayed envelopes (and envelope-wrapped messages) to subscribed receivers by enqueuing them first in an internal lightweight thread (TMEDelayedEnvelopeDeliverer) which has the message manager itself deliver them when the delay has passed

As a receiver, the TMEMessageHandler implements the ReceiveEnvelope by simply delegating to the OnReceiveEnvelope event-handler.

Posted envelopes are received by the QueueEnvelope method, which adds the envelope to its thread-safe queue and then, but only if AutoProcessEnvelopes is True, uses the main thread's Queue to call its own ProcessEnvelope method (as per Remy's suggestion):

Function TMEMessageHandler.QueueEnvelope(Const Envelope: TMEMessageEnvelope): Integer;
Begin
  _Lock;
  Try
    FEnvelopes.Add(Envelope);
    Result := FEnvelopes.Count;
  Finally
    _Unlock;
  End;
  If AutoProcessEnvelopes Then
    TThread.Queue(Nil,
      Procedure
      Begin
        ProcessEnvelope;
      End);
End;
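For anyone who wants to try the pattern in isolation, here is a minimal console sketch of the same idea: add under a lock, then schedule the processing on the main thread with TThread.Queue. It uses plain strings instead of envelopes, and TSketchHandler, Enqueue and ProcessOne are hypothetical names rather than the real classes. A console program has no message loop, so the sketch calls CheckSynchronize by hand; in an FMX or VCL application the framework does that for you.

```pascal
program MainThreadQueueSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.Generics.Collections;

type
  // Hypothetical stand-in for the handler: strings instead of envelopes.
  TSketchHandler = class
  private
    FItems: TQueue<string>;
  public
    constructor Create;
    destructor Destroy; override;
    function Enqueue(const Item: string): Integer;
    procedure ProcessOne;
  end;

constructor TSketchHandler.Create;
begin
  inherited Create;
  FItems := TQueue<string>.Create;
end;

destructor TSketchHandler.Destroy;
begin
  FItems.Free;
  inherited;
end;

function TSketchHandler.Enqueue(const Item: string): Integer;
begin
  TMonitor.Enter(FItems);
  try
    FItems.Enqueue(Item);
    Result := FItems.Count;
  finally
    TMonitor.Exit(FItems);
  end;
  // One queued call per item; it runs later, on the main thread.
  TThread.Queue(nil,
    procedure
    begin
      ProcessOne;
    end);
end;

procedure TSketchHandler.ProcessOne;
var
  Item: string;
  Found: Boolean;
begin
  // Check and dequeue under the same lock.
  TMonitor.Enter(FItems);
  try
    Found := FItems.Count > 0;
    if Found then
      Item := FItems.Dequeue;
  finally
    TMonitor.Exit(FItems);
  end;
  if Found then
    Writeln('processed on main thread: ', Item);
end;

var
  Handler: TSketchHandler;
  Producer: TThread;
begin
  Handler := TSketchHandler.Create;
  try
    Producer := TThread.CreateAnonymousThread(
      procedure
      var
        N: Integer;
      begin
        for N := 1 to 3 do
          Handler.Enqueue('message ' + IntToStr(N));
      end);
    Producer.FreeOnTerminate := False;
    Producer.Start;
    Producer.WaitFor;
    Producer.Free;
    // Pump queued calls until nothing is left to run.
    repeat
    until not CheckSynchronize(100);
  finally
    Handler.Free;
  end;
end.
```

Note that the queued anonymous method captures the handler instance, so in real code the handler must outlive any calls that are still pending.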

The ProcessEnvelope method extracts the envelope from the thread-safe queue, calls the ReceiveEnvelope method (same as called by the message manager for synchronous messages) and then Frees the envelope (remember that this was a cloned copy just for this receiver):

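Procedure TMEMessageHandler.ProcessEnvelope;
Var
  E: TMEMessageEnvelope;
Begin
  E := Nil;
  _Lock;
  Try
    // Check and extract under the same lock, so that a concurrent
    // consumer cannot empty the list between the two steps
    If FEnvelopes.Count > 0 Then
      E := FEnvelopes.Extract(FEnvelopes[0]);
  Finally
    _Unlock;
  End;
  If E = Nil Then
    Exit;
  Try
    E.UpdateInfo(mieReceived);
    ReceiveEnvelope(E);
  Finally
    // The envelope is this receiver's own cloned copy
    E.Free;
  End;
End;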

The ProcessEnvelopes method just calls the former as many times as necessary to empty the asynchronous message queue:

Procedure TMEMessageHandler.ProcessEnvelopes;
Begin
  While FEnvelopes.Count > 0 Do
    ProcessEnvelope;
End;

How the TMEMessageHandler is used

Having defined TMELogMessage as an IMEClonableMessage to handle information to log, a minimal implementation for TMEFileLogger and other components looks like this:

Type
  TMEFileLogger = Class(TMEComponent)
  Private
    ...
    FMessageHandler:    TMEMessageHandler;
  Protected
    ...
    Procedure ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
    Property MessageHandler: TMEMessageHandler Read FMessageHandler;
  Public
    Constructor Create(AOwner: TComponent); Override;
    Destructor Destroy; Override;
    ...
  End;

Constructor TMEFileLogger.Create(AOwner: TComponent);
Begin
  Inherited;
  ...
  FMessageHandler                  := TMEMessageHandler.Create;
  MessageHandler.OnReceiveEnvelope := ReceiveLogEnvelope;
  MessageHandler.Subscribe(TMELogMessage);
End;

Destructor TMEFileLogger.Destroy;
Begin
  MessageHandler.Unsubscribe(TMELogMessage);
  MessageHandler.ProcessEnvelopes;
  FreeAndNil(FMessageHandler);
  ...
  Inherited;
End;

Procedure TMEFileLogger.ReceiveLogEnvelope(Const Envelope: TMEMessageEnvelope);
Begin
  If Envelope.Msg Is TMELogMessage Then
    With Envelope.Msg As TMELogMessage Do
      ... something useful ...
End;

Sorry for the long post, but I hope this can be useful to someone else.

Frazz

Queue implementations typically implement an event (an OS synchronization object, not a VCL 'event') which application code can wait on. The event is set/fired/triggered (however you want to think of it) whenever an item is added to an empty queue or, if multiple items are added in a "batch", after they have all been added; the precise pattern may vary. If the queue in your case is your own implementation then I would consider adding such a mechanism to your implementation.

To avoid blocking the UI the application code creates a lightweight thread with the sole purpose of waiting on that queue event, de-queuing items from the queue into a UI thread-safe container and then notifying the UI thread that there are items to be processed. The monitoring thread then resumes waiting for the event to signal that there are yet more items in the queue.

In a VCL application the mechanism by which the monitoring thread notifies the UI could be a naive Synchronized procedure or (as I would recommend) a message-based notification posted to some form responsible for the UI processing of the items.

NOTE: The queue monitoring thread is also typically responsible for handling the case where the application/UI no longer cares about processing items (e.g. is shutting down) and so also listens for a "cancel" or "terminate" event which signals the thread to de-queue items but discard them (or deal with them in whatever way suits the application needs at this time) and then terminate (that is, the monitoring thread exits).
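As a rough illustration of the pattern described above, here is a minimal console sketch. TQueueMonitor and its members are hypothetical names, the items are plain strings, and two simplifications are made: the same auto-reset event doubles as the "cancel" signal (it is set from TerminatedSet), and the UI is notified with TThread.Queue rather than a posted window message. A console program has no message loop, so CheckSynchronize is called by hand.

```pascal
program QueueMonitorSketch;
{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Generics.Collections;

type
  // Hypothetical monitor: owns the queue and the event it waits on.
  TQueueMonitor = class(TThread)
  private
    FItems: TQueue<string>;
    FWake: TEvent;
    procedure Notify(Batch: TArray<string>);
  protected
    procedure Execute; override;
    procedure TerminatedSet; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Push(const Item: string);
  end;

constructor TQueueMonitor.Create;
begin
  FItems := TQueue<string>.Create;
  FWake := TEvent.Create(nil, False, False, ''); // auto-reset, unsignalled
  inherited Create(False);
end;

destructor TQueueMonitor.Destroy;
begin
  inherited; // terminates the thread and waits for it first
  FWake.Free;
  FItems.Free;
end;

procedure TQueueMonitor.Push(const Item: string);
begin
  TMonitor.Enter(FItems);
  try
    FItems.Enqueue(Item);
  finally
    TMonitor.Exit(FItems);
  end;
  FWake.SetEvent; // wake the monitoring thread
end;

procedure TQueueMonitor.TerminatedSet;
begin
  inherited;
  FWake.SetEvent; // wake the thread so that it can notice Terminated
end;

procedure TQueueMonitor.Notify(Batch: TArray<string>);
begin
  // Batch is a value parameter, so the closure keeps its own reference.
  TThread.Queue(nil,
    procedure
    var
      S: string;
    begin
      for S in Batch do
        Writeln('main thread got: ', S);
    end);
end;

procedure TQueueMonitor.Execute;
var
  Batch: TArray<string>;
begin
  while not Terminated do
  begin
    FWake.WaitFor; // blocks until Push or Terminate signals the event
    TMonitor.Enter(FItems);
    try
      Batch := FItems.ToArray; // move everything out of the queue
      FItems.Clear;
    finally
      TMonitor.Exit(FItems);
    end;
    // On termination the remaining items are simply discarded.
    if (not Terminated) and (Length(Batch) > 0) then
      Notify(Batch);
  end;
end;

var
  Monitor: TQueueMonitor;
  I: Integer;
begin
  Monitor := TQueueMonitor.Create;
  try
    for I := 1 to 3 do
      Monitor.Push('item ' + IntToStr(I));
    // No message loop in a console program, so pump queued calls by hand.
    for I := 1 to 10 do
      CheckSynchronize(50);
  finally
    Monitor.Free;
  end;
end.
```

The monitoring thread spends almost all of its time blocked in WaitFor, so it costs nothing while the queue is empty, and the main thread only does work when a batch actually arrives.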

Deltics