Message posted to a queue not processed until Application.ProcessMessages is called
Fulgan posted a topic in OmniThreadLibrary
Hello,

I have written a background task that communicates with a network-based message queue. Since the implementation makes use of SQLite, and SQLite does not like the same database being accessed from several threads at the same time, I serialized all access to the queue in a single thread and used the OTL message queue attached to the task to get and send the messages from the main thread (and other background threads).

Messages in and out are posted into the task's "Comm" property, and I have written a dedicated, thread-safe class that maintains a collection of listeners (lambda functions that are interested in a given message). Once a handler has been "triggered", it can (optionally) be removed from the list.

This worked great, but I encountered an issue that I can't quite figure out how to solve: I wanted to write a synchronous method that would post a message to the (network) queue and wait until it gets an answer or times out (because synchronous calls are easier to use for my co-workers). Yet I don't want to block in the main loop, because other messages will need to be received and handled.

So, for the synchronous call, I do the following:

- Create a cancellation token.
- Add a handler for the message I'm expecting. Once the message is received, the token is signaled and the handler is removed from the list.
- Send the "query" message to the queue.
- Wait until either the token gets signaled or the timeout has elapsed.

(The code is slightly more complex than this, but that is the idea.)

My problem is that, in the first version of the code, my handler never received the reply (although I could see it going through the message queue). So I added a single call to Application.ProcessMessages after sending the message out and then, suddenly, the code worked. Only, I can't understand what I'm doing wrong: the handler is added before the message is sent, and everything else goes through the task's Comm facility.

Is there any explanation for what's happening?
How can I remove the requirement to call Application.ProcessMessages?

Here is the code that I used (I cleaned it up a bit). The management task is created like this (from the main thread):

    FTask := CreateTask(
      procedure(const task: IOmniTask)
      var
        locQueue: IGMQueue; // This interface represents the networked message queue system
        Timer: TStopWatch;
        AThreadmessage: TOmniMessage;
        Messages: IList<IGMQMessage>; // This is a generic list from Spring4D.
      begin
        // Make a local copy of the (global) message queue to make sure we have an unconnected instance
        locQueue := FMessageQueue.Clone;
        locQueue.Connect(true); // Connect to the network message queue
        Timer := TStopWatch.StartNew; // record when the thread starts
        while not Task.CancellationToken.IsSignalled and not Task.Terminated do // main thread loop
        begin
          // queue maintenance
          if Timer.ElapsedMilliseconds > 10000 then // every 10 seconds only
          begin
            locQueue.Maintenance(1000);
            Timer := TStopWatch.StartNew; // remember when the last maintenance was performed.
          end;
          // send out new messages
          while Task.Comm.Receive(AThreadmessage) do
          begin
            case AThreadmessage.MsgID of
              WM_SENDMESSAGE:
                begin
                  // IGMQMessage is an interface that abstracts a single message queue message
                  if Supports(AThreadmessage.MsgData.AsInterface, IGMQMessage) then
                  begin
                    locQueue.Enqueue(AThreadmessage.MsgData.AsInterface as IGMQMessage);
                  end;
                end;
            end;
          end;
          // receive all messages addressed to us
          if locQueue.TryDequeueAll(Messages) then
          begin
            MessageQueueRouter.RouteMessages(Messages); // route messages to handlers
          end;
          Sleep(100);
        end;
      end,
      'Message queue processor')
      .CancelWith(FCancellationToken).Schedule;

RouteMessages uses a multimap collection of message types and handlers (which are simply lambdas to be called): when a message is received, all handlers that have been registered for that message type are invoked, and each handler is optionally removed from the list.
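To make that routing behaviour concrete, here is a minimal, self-contained sketch of such a router. It is not the real implementation: `TRouter`, `TMsg` and `THandler` are hypothetical names invented for illustration, `TMsg` merely stands in for `IGMQMessage`, and the sketch assumes a handler returns False when it wants to be removed.

```delphi
program RouterSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs, System.Generics.Collections;

type
  // Hypothetical stand-in for IGMQMessage.
  TMsg = record
    MsgType: string;
    Body: string;
  end;

  // Hypothetical handler type: return True to stay registered, False to be removed.
  THandler = reference to function(const Msg: TMsg): Boolean;

  // Hypothetical router: maps a message type to the list of handlers registered for it.
  TRouter = class
  private
    FLock: TCriticalSection;
    FHandlers: TObjectDictionary<string, TList<THandler>>;
  public
    constructor Create;
    destructor Destroy; override;
    procedure Add(const MsgType: string; const Handler: THandler);
    procedure Route(const Msg: TMsg);
  end;

constructor TRouter.Create;
begin
  inherited Create;
  FLock := TCriticalSection.Create;
  // The dictionary owns (and frees) the per-type handler lists.
  FHandlers := TObjectDictionary<string, TList<THandler>>.Create([doOwnsValues]);
end;

destructor TRouter.Destroy;
begin
  FHandlers.Free;
  FLock.Free;
  inherited;
end;

procedure TRouter.Add(const MsgType: string; const Handler: THandler);
var
  List: TList<THandler>;
begin
  FLock.Acquire;
  try
    if not FHandlers.TryGetValue(MsgType, List) then
    begin
      List := TList<THandler>.Create;
      FHandlers.Add(MsgType, List);
    end;
    List.Add(Handler);
  finally
    FLock.Release;
  end;
end;

procedure TRouter.Route(const Msg: TMsg);
var
  List: TList<THandler>;
  Handler: THandler;
  I: Integer;
begin
  FLock.Acquire;
  try
    if not FHandlers.TryGetValue(Msg.MsgType, List) then
      Exit;
    // Walk backwards so that deleting an entry does not shift the ones still to visit.
    for I := List.Count - 1 downto 0 do
    begin
      Handler := List[I];
      if not Handler(Msg) then
        List.Delete(I);
    end;
  finally
    FLock.Release;
  end;
end;

var
  Router: TRouter;
  Calls: Integer;
  Msg: TMsg;
begin
  Router := TRouter.Create;
  try
    Calls := 0;
    // One-shot handler: it counts the call and asks to be removed.
    Router.Add('reply',
      function(const M: TMsg): Boolean
      begin
        Inc(Calls);
        Result := False;
      end);
    Msg.MsgType := 'reply';
    Msg.Body := 'hello';
    Router.Route(Msg); // handler runs and is removed
    Router.Route(Msg); // no handler left for 'reply'
    Writeln(Calls);
  finally
    Router.Free;
  end;
end.
```

Note that in this sketch the handlers run on whichever thread calls `Route` and while the lock is held, and that they are invoked in reverse registration order; a real router may well make different choices on all three points.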
So, in order to perform my synchronous version, I wrote this:

    var
      AMessageType: String;
      Waitable: IOmniCancellationToken;
      ReceivedMessage: IGMQMessage;
      Timer: TStopWatch;
    begin
      result := nil;
      ReceivedMessage := nil;
      if IsActive then
      begin
        Waitable := CreateOmniCancellationToken;
        // add a handler to receive the first reply message from the queue
        FHandlers.Add(
          AMessageType, // a string that specifies what message type I'm interested in
          TMessageHandlerData.Create(
            function(const Src: IGMQMessage; out reply: IGMQMessage): boolean
            begin
              // this code is called when a message of the correct type is received
              if not Waitable.IsSignalled then // check if there is still someone listening
                ReceivedMessage := Src; // copy the message
              Waitable.Signal; // signal that we have something
              // returning false will tell the handler management class to remove this handler from the list.
              result := false;
            end,
            // the FHandlers implementation will check if this token is signalled
            // and remove the handler from the queue if it is.
            Waitable
          ));
        // send the query message to the network message queue
        // SrcMessage is the source message. It will cause other applications
        // connected to the same queue to send a message back.
        Ftask.Comm.Send(WM_SENDMESSAGE, SrcMessage);
        // ---- this is what I don't get: if I don't put this call here, I don't get the message later
        Application.ProcessMessages;
        Timer := TStopWatch.StartNew; // how long have we been waiting
        // loop until either we time out or we get a message back
        while (not Waitable.IsSignalled) and (Timer.ElapsedMilliseconds < TimeOut) do
        begin
          sleep(10);
        end;
        if Waitable.IsSignalled and (ReceivedMessage <> nil) then // check if we had an answer
          result := ReceivedMessage // yup, send it back
        else
          // no: signal the token so the handler will know it cannot send the result back
          // (if it is running), or to tell the FHandlers instance that it can ignore
          // and remove the handler
          Waitable.Signal;
      end;
    end;