Fulgan

Message posted to a queue not processed until Application.ProcessMessages is called


Hello,

 

I have written a background task that communicates with a network-based message queue. Since the implementation makes use of SQLite, and SQLite does not like the same database being accessed from several threads at the same time, I serialized all access to the queue in a single thread and used the OTL message queue attached to the task to get and send messages from the main thread (and other background threads).

 

Messages in and out are posted to the task's "Comm" property, and I have written a dedicated, thread-safe class that maintains a collection of listeners (lambda functions that are interested in a given message). Once a handler has been "triggered" it can (optionally) be removed from the list.

 

This worked great, but I encountered an issue that I can't quite figure out how to solve: I wanted to write a synchronous method that would post a message to the (network) queue and wait until it gets an answer or times out (because synchronous calls are easier for my co-workers to use). Yet I don't want to block in the main loop, because other messages will need to be received and handled.

 

So, for the synchronous call, I do the following:

- Create a Cancellation token.

- Add a handler for the message I'm expecting. Once the message is received, the token is signaled and the handler removed from the list.

- Send the "query" message to the queue.

- Wait until either the token gets signaled or the timeout has elapsed.

 

(The code is slightly more complex than this, but that is the idea.)

 

My problem is that, in my first version of the code, my handler never received the reply (I could see it going through the message queue). So I added a single call to Application.ProcessMessages after sending the message out and then, suddenly, the code worked. Only, I can't understand what I'm doing wrong: the handler is added before the message is sent, and everything else goes through the task's comm facility.

 

Is there any explanation for what's happening? How can I remove the requirement to call Application.ProcessMessages?

 

Here is the code that I used (I cleared it up a bit):

 

 

The management task is created like this (from the main thread):

  FTask := CreateTask(
    procedure(const task: IOmniTask)
    var
      locQueue: IGMQueue; // This interface represents the networked message queue system
      Timer: TStopWatch;
      AThreadmessage: TOmniMessage;
      Messages: IList<IGMQMessage>; // This is a generic list from Spring4d.
    begin
      locQueue := FMessageQueue.Clone; // Make a local copy of the (global) message queue to make sure we have an unconnected instance
      locQueue.Connect(true);  // Connect to the network message queue
      Timer := TStopWatch.StartNew; // record when the thread starts
      while not Task.CancellationToken.IsSignalled and not Task.Terminated do // main thread loop
      begin
        // queue maintenance
        if Timer.ElapsedMilliseconds > 10000 then // every 10 seconds only
        begin
          locQueue.Maintenance(1000);
          Timer := TStopWatch.StartNew; // remember when the last maintenance was performed.
        end;
        // send out new messages
        while Task.Comm.Receive(AThreadmessage) do
        begin
          case AThreadmessage.MsgID of
            WM_SENDMESSAGE:
            begin
              if Supports(AThreadmessage.MsgData.AsInterface, IGMQMessage) then // IGMQMessage is an interface that abstracts a single message queue message
              begin
                locQueue.Enqueue(AThreadmessage.MsgData.AsInterface as IGMQMessage);
              end;
            end;
          end;
        end;
        // receive all messages addressed to us
        if locQueue.TryDequeueAll(Messages) then
        begin
          MessageQueueRouter.RouteMessages(Messages); // route messages to handlers
        end;
        Sleep(100);
      end;
    end,
    'Message queue processor')
    .CancelWith(FCancellationToken).Schedule;

 

RouteMessages uses a multimap collection of message types and handlers (which are simply lambdas to be called): when a message is received, all handlers that have been registered for that message type are invoked, and each handler is optionally removed from the list.

 

So, in order to implement my synchronous version, I wrote this:

 

var
  AMessageType: String;
  Waitable: IOmniCancellationToken;
  ReceivedMessage: IGMQMessage;
  Timer: TStopWatch;
begin
  result := nil;
  ReceivedMessage := nil;
  if IsActive then
  begin
    Waitable := CreateOmniCancellationToken;
    // add a handler to receive the first reply message from the queue
    FHandlers.Add(
          AMessageType, // a string that specifies what message type I'm interested in
            TMessageHandlerData.Create(
            function(const Src: IGMQMessage; out reply: IGMQMessage): boolean
            begin // this code is called when a message of the correct type is received
              if not Waitable.IsSignalled then // check if there is still someone listening
                ReceivedMessage := Src; // copy the message
              Waitable.Signal; // signal that we have something
              result := false; // returning false will tell the handler management class to remove this handler from the list.
            end,
            Waitable // the FHandlers implementation will check if this token is signalled and remove the handler from the queue if it is.
            ));
    // send the query message to the network message queue        
    Ftask.Comm.Send(WM_SENDMESSAGE, SrcMessage);// SrcMessage is the source message. It will cause other applications connected to the same queue to send a message back.
    Application.ProcessMessages; // ---- this is what I don't get. if I don't put this call here, I don't get the message later
    Timer := TStopWatch.StartNew; // how long have we been waiting
    while (not Waitable.IsSignalled) and (Timer.ElapsedMilliseconds < TimeOut)  do // loop until either we time out or we get a message back
    begin
      sleep(10);
    end;
    if Waitable.IsSignalled and (ReceivedMessage <> nil) then // check if we had an answer
      result := ReceivedMessage // yup, send it back
    else
      Waitable.Signal; // no: signal the token so that the handler (if it is running) knows it cannot send the result back, and so that the FHandlers instance can ignore and remove the handler
  end;
end;

 


When you send a message from a background thread to the main thread, the message is inserted into an internal message queue and a Windows message is dispatched to a hidden window created by the main thread. (Actually, by the thread that created the task, but in your case this is the main thread.) When this hidden window receives and processes the message, all messages from the internal message queue are fetched and processed.

 

The Application.ProcessMessages call triggers the message loop so that pending Windows messages can be processed.
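To see the mechanism in isolation, here is a minimal console sketch that does not use OTL at all. It only assumes that the hidden window behaves like any other window owned by a thread: a posted message sits in that thread's queue until the thread pumps it. TReceiver, WM_NOTIFY_QUEUE and PumpPendingMessages are made-up names for this illustration, not OTL identifiers.

```pascal
program PumpDemo;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, Winapi.Messages, System.Classes, System.SysUtils;

const
  WM_NOTIFY_QUEUE = WM_USER + 1; // hypothetical "queue has data" notification

type
  // Stand-in for a hidden notification window owned by the current thread.
  TReceiver = class
  private
    FWindow: HWND;
    procedure WndProc(var Msg: TMessage);
  public
    Handled: Boolean;
    constructor Create;
    destructor Destroy; override;
    property Window: HWND read FWindow;
  end;

constructor TReceiver.Create;
begin
  inherited Create;
  FWindow := AllocateHWnd(WndProc); // invisible window, bound to this thread
end;

destructor TReceiver.Destroy;
begin
  DeallocateHWnd(FWindow);
  inherited;
end;

procedure TReceiver.WndProc(var Msg: TMessage);
begin
  if Msg.Msg = WM_NOTIFY_QUEUE then
    Handled := True // a real handler would drain its internal queue here
  else
    Msg.Result := DefWindowProc(FWindow, Msg.Msg, Msg.WParam, Msg.LParam);
end;

// Dispatches everything currently waiting in this thread's message queue.
procedure PumpPendingMessages;
var
  M: TMsg;
begin
  while PeekMessage(M, 0, 0, 0, PM_REMOVE) do
  begin
    TranslateMessage(M);
    DispatchMessage(M);
  end;
end;

var
  R: TReceiver;
begin
  R := TReceiver.Create;
  try
    PostMessage(R.Window, WM_NOTIFY_QUEUE, 0, 0);
    Sleep(50); // sleeping does not dispatch posted messages
    Writeln('Handled before pumping: ', R.Handled);
    PumpPendingMessages;
    Writeln('Handled after pumping: ', R.Handled);
  finally
    R.Free;
  end;
end.
```

The first line should report that the handler has not run yet and the second that it has, which is the same effect a Sleep loop has on the thread that owns the task.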

 

Similarly, if a background thread creates an OTL task, that background thread (the owner) must call MsgWaitForMultipleObjects[Ex] so that messages are processed. (Alternatively, if that background thread (the owner) is itself an OTL thread, you can create it with the .MsgWait modifier.)
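As a rough sketch of what such an owner loop can look like with plain Win32 calls, the helper below waits for an event or a timeout while still dispatching window messages. WaitWhilePumping and the Done event are hypothetical names for this example. It is not how OTL implements .MsgWait internally, only an illustration of waiting with MsgWaitForMultipleObjects instead of Sleep.

```pascal
program MsgWaitDemo;

{$APPTYPE CONSOLE}

uses
  Winapi.Windows, System.SysUtils, System.Classes, System.Diagnostics;

// Waits until Event is signalled or TimeoutMs elapses, dispatching any window
// messages that arrive in the meantime. Returns True if the event was signalled.
function WaitWhilePumping(Event: THandle; TimeoutMs: Cardinal): Boolean;
var
  Watch: TStopwatch;
  Elapsed: Int64;
  WaitResult: DWORD;
  Msg: TMsg;
begin
  Watch := TStopwatch.StartNew;
  while True do
  begin
    Elapsed := Watch.ElapsedMilliseconds;
    if Elapsed >= TimeoutMs then
      Exit(False);
    // Wakes up when the event is set OR when a message is queued.
    WaitResult := MsgWaitForMultipleObjects(1, Event, False,
      TimeoutMs - Cardinal(Elapsed), QS_ALLINPUT);
    if WaitResult = WAIT_OBJECT_0 then
      Exit(True)
    else if WaitResult = WAIT_OBJECT_0 + 1 then
    begin
      // Drain the message queue, then go back to waiting.
      while PeekMessage(Msg, 0, 0, 0, PM_REMOVE) do
      begin
        TranslateMessage(Msg);
        DispatchMessage(Msg);
      end;
    end
    else if WaitResult = WAIT_FAILED then
      RaiseLastOSError;
    // On WAIT_TIMEOUT the elapsed-time check at the loop head returns False.
  end;
end;

var
  Done: THandle;
begin
  Done := CreateEvent(nil, True, False, nil); // manual-reset, not signalled
  try
    // Simulates a reply arriving from another thread after 100 ms.
    TThread.CreateAnonymousThread(
      procedure
      begin
        Sleep(100);
        SetEvent(Done);
      end).Start;
    Writeln('Signalled: ', WaitWhilePumping(Done, 2000));
  finally
    CloseHandle(Done);
  end;
end.
```

Unlike a Sleep(10) polling loop, this wait returns as soon as the event is set and keeps the owner's hidden windows serviced while it waits.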

 

IOW, if you want to use the messaging subsystem, messages must be processed.

 

You can pump the FTask.Comm channel manually during the wait. Instead of Sleep(10) you can call FTask.Comm.Receive(msg, 10). This will return False on no message and True if there was a message in the queue (in which case it is fetched and stored in the `msg` parameter.) Then you will not need Application.ProcessMessages.


Thank you for the explanation. I have added the .MsgWait call to the management thread's construction and it solved the issue.

 

I have two questions left, however, regarding your suggestion that I add FTask.Comm.Receive(msg, 10) inside the wait loop:

  • Won't that "steal" messages from the queue, or will it only grab messages coming from the management thread, not the ones going to it?
  • Why doesn't the "Task.Comm.Receive(AThreadmessage)" instruction in the management thread do the same thing?

 

Thanks again


I thought your code that calls Application.ProcessMessages runs in the main thread. (Otherwise you really should not do that.)

 

If it is running in a background thread, I don't know. I would have to see a working (and as minimal as possible) test program that exhibits your problem to tell you more.

 

Re 1: I assumed FTask is your task controller. So reading from its Comm property would read messages sent FROM the task. 

 

Re 2: Task.Comm.Receive runs in the task thread and it DOES receive messages that were sent TO the task.

 


The code that calls Application.ProcessMessages does run in the main thread (or should, really). The management thread is the one that reads messages from the network queue and passes them to the local (OmniThread) queue. It's that process that, somehow, didn't work: the code in the main thread would loop on Sleep(10) waiting for a reply to be sent from the network and would always time out. Only after the whole routine had returned would the message with the reply appear in the queue.

 

I'll try to write something simple that demonstrates the problem I was facing.

 

In any case, thank you again for your help 🙂
