Fulgan 1 Posted December 11, 2018 Hello, I have written a background task that communicate with a network-based message queue. Since the implementation makes use of SQLIte and SQLite does not like to access the same database from several threads at the same time, I serialized all access to the queue in a single thread and used the Otl message queue attached to the task to get and send the messages from the main thread (and other background threads). Messages in and out are posted into the task's "comm" property and I have a written a dedicated, thread-safe class that maintains a collections of listeners (lambda functions that are interested in a given message). Once a handler has been "triggered" it can (optionally) be removed from the list. This worked great but I encountered an issue that I can't quite find how to solve: I wanted to write a synchronous method that would post a message to the (network) queue and wait until it gets an answer or times out (because synchronous calls are easier to use for my co-workers). Yet, I don't want to block in the main loop because other messages will need to be received and handled. So, for the synchronous call, I do the following: - Create a Cancellation token. - Add a handler for the message I'm expecting. Once the message is received, the token is signaled and the handler removed from the list. - Send the "query" message to the queue. - Wait until either the token gets signaled or the timeout has elapsed. (the code is slightly more complex that this but that is the idea). My problem is that, in my first version of the code, my handler never received the reply (I could see it going through the message queue). So, I added a single call to "Application.processMessages" after sending the message out and then, suddenly, the code worked. Only, I can't understand what I'm doing wrong: the handler is added before the message is being sent and everything else is going through the tasks's comm facility. Is there any explanation about what's happening ? How can I removed that requirement to call application.processmessages ? Here is the code that I used (I cleared it up a bit): The management task is created like this (from the main thread): FTask := CreateTask( procedure(const task: IOmniTask) var locQueue: IGMQueue; // This interface represent the networked message queue system Timer: TStopWatch; AThreadmessage: TOmniMessage; Messages: IList<IGMQMessage>; // This is a generic list from Spring4d. begin locQueue := FMessageQueue.Clone; // Make a local copy of the (global) message queue to make sure we have an unconnected instance locQueue.Connect(true); // Connect to the network message queue Timer := TStopWatch.StartNew; // record when the thread starts while not Task.CancellationToken.IsSignalled and not Task.Terminated do // main thread loop begin // queue maintenance if Timer.ElapsedMilliseconds > 10000 then // every 10 seconds only begin locQueue.Maintenance(1000); Timer := TStopWatch.StartNew; // remember when the last maintenance was performed. end; // send out new messages while Task.Comm.Receive(AThreadmessage) do begin case AThreadmessage.MsgID of WM_SENDMESSAGE: begin if Supports(AThreadmessage.MsgData.AsInterface, IGMQMessage) then // IGMQMessage is an interface that abstract a single message queue message begin locQueue.Enqueue(AThreadmessage.MsgData.AsInterface as IGMQMessage); end; end; end; end; // receive all messages addressed to us if locQueue.TryDequeueAll(Messages) then begin MessageQueueRouter.RouteMessages(Messages); // route messages to handlers end; Sleep(100); end; end, 'Message queue processor') .CancelWith(FCancellationToken).Schedule; RouteMessages contains a multimap connection of message types and handlers (which are simply lambda to be called): when a message is received, all handlers that have been registered for that message type are invoked and the handler is optionally removed from the list. So, in order to perform by synchronous version, I wrote this: var AMessageType: String; Waitable: IOmniCancellationToken; ReceivedMessage: IGMQMessage; Timer: TStopWatch; begin result := nil; ReceivedMessage := nil; if IsActive then begin Waitable := CreateOmniCancellationToken; // add a handler to receive the first reply message from the queue FHandlers.Add( AMessageType, // a string that specify what message type I'm interested in TMessageHandlerData.Create( function(const Src: IGMQMessage; out reply: IGMQMessage): boolean begin // this code is called when a message of the correct type is received if not Waitable.IsSignalled then // check if there is still someone listening ReceivedMessage := Src; // copy the message Waitable.Signal; // signal that we have something result := false; // returning false will tell the handler management class to remove this handler from the list. end, Waitable // the FHandlers inplementation will check if this token is signalled and remove the handler from the queue if it is. )); // send the query message to the network message queue Ftask.Comm.Send(WM_SENDMESSAGE, SrcMessage);// SrcMessage is the source message. It will cause other applications connected to the same queue to send a message back. Application.ProcessMessages; // ---- this is what I don't get. if I don't put this call here, I don't get the message later Timer := TStopWatch.StartNew; // how long have we been waiting while (not Waitable.IsSignalled) and (Timer.ElapsedMilliseconds < TimeOut) do // loop until either we time out or we get a message back begin sleep(10); end; if Waitable.IsSignalled and (ReceivedMessage <> nil) then // cehck if we had an answer result := ReceivedMessage // yup, send it back else Waitable.Signal; // no, signal the token so the handler will know it cannot send the result back (if it is running) or the tells the FHandlers instance that it can ignore and remove the handler end; end; 1 Share this post Link to post
Primož Gabrijelčič 223 Posted December 11, 2018 When you send a message from a background thread to the main thread, the message is inserted into internal message queue and a Windows message is dispatched to a hidden window created by the main thread. (Actually, by the thread that created the task, but in your case this is the main thread.) When this hidden window receives and processes the message, all messages from the internal message queue are fetched and processed. The Application.ProcessMessage call triggers the message loop so Windows messages may be processed. Similarly, if a background thread creates OTL task, that background thread (the owner) must call MsgWaitForMultipleObjects[Ex] so that messages are processed. (Alternatively, if that background thread (the owner) is itself an OTL thread, you can create it with modifier .MsgWait.) IOW, if you want to use messaging subsystem, messages must be processed. You can pump the FTask.Comm channel manually during the wait. Instead of Sleep(10) you can call FTask.Comm.Receive(msg, 10). This will return False on no message and True if there was a message in the queue (in which case it is fetched and stored in the `msg` parameter.) Then you will not need Application.ProcessMessages. Share this post Link to post
Fulgan 1 Posted December 13, 2018 Thank you for the explanation. I have added the .MsgWait call to the management thread's contruction and it solved the issue. I have two questions left, however, regarding your suggestion that I add FTask.Comm.Receive(msg, 10) inside the wait loop: Won't that "steal" messages from the queue or that this will only grab messages coming from the management thread, not the ones going to it? Why doesn't the "Task.Comm.Receive(AThreadmessage)" instruction in the management thread do the same thing? Thanks again Share this post Link to post
Primož Gabrijelčič 223 Posted December 13, 2018 I thought your code that calls Application.ProcessMessages runs in the main thread. (Otherwise you really should not do that.) If it is running in a background thread, I don't know. I would have to see a working (and as minimal as possible) test program that exhibits your problem to tell you more. Re 1: I assumed FTask is your task controller. So reading from its Comm property would read messages sent FROM the task. Re 2: Task.Comm.Receive runs in the task thread and it DOES receive messages that were sent TO the task. Share this post Link to post
Fulgan 1 Posted December 13, 2018 The code that call Appliction.processMessages do run in the main thread (or should, really). The management thread is the one that will read messages from the network queue and pass then to the local (OmniThread) queue. It's that process that, somehow, didn't work (the code in the main thread would loop on sleep(10) waiting for a reply to be sent from the network and would always timeout. It was only after the code returned from the whole code that the message with the reply would appear in the queue). I'll try to write something simple that demonstrate the problem I was facing. In any case, thank you again for you help 🙂 Share this post Link to post