Jump to content
RadStudio 10.3.1 was released today Read more... ×

Fulgan

Members
  • Content Count

    5
  • Joined

  • Last visited

Community Reputation

1 Neutral

Recent Profile Visitors

The recent visitors block is disabled and is not being shown to other users.

  1. No, they just get notified that a new message has arrived and can request it the from the object that manages the queue. Ok, thanks 🙂 I'll work something out. I'll try to, sure.
  2. Hello, I have an issue with the TOmniContainerWindowsMessageObserver and observers in general. I use one instance of TOmniMessageQueue to send data from a background thread to the main thread. However, it happens that several window are interested in receiving messages from the queue. So, I setup things like this: each form will create a TOmniContainerWindowsMessageObserver observer and attach it to the TOmniMessageQueue like this during the form's show event: MQObserver := CreateContainerWindowsMessageObserver(handle, WM_NEW_ITEM, 0, 0); // Create an observer that will notify the main form when new messages are received UserMessageQueue.ContainerSubject.Attach(MQObserver, coiNotifyOnAllInserts); // attach the observer to the message queue Likewise, when the form closes, the observer is detached and destroyed: UserMessageQueue.ContainerSubject.Detach(MQObserver, coiNotifyOnAllInserts); // we don't need the observer anymore FreeAndNil(MQObserver); My problem is that as soon as one observer is detached from the queue, all other observer stops notifying the other windows of the fact that a new message has been inserted. What am I missing here?
  3. The code that call Appliction.processMessages do run in the main thread (or should, really). The management thread is the one that will read messages from the network queue and pass then to the local (OmniThread) queue. It's that process that, somehow, didn't work (the code in the main thread would loop on sleep(10) waiting for a reply to be sent from the network and would always timeout. It was only after the code returned from the whole code that the message with the reply would appear in the queue). I'll try to write something simple that demonstrate the problem I was facing. In any case, thank you again for you help 🙂
  4. Thank you for the explanation. I have added the .MsgWait call to the management thread's contruction and it solved the issue. I have two questions left, however, regarding your suggestion that I add FTask.Comm.Receive(msg, 10) inside the wait loop: Won't that "steal" messages from the queue or that this will only grab messages coming from the management thread, not the ones going to it? Why doesn't the "Task.Comm.Receive(AThreadmessage)" instruction in the management thread do the same thing? Thanks again
  5. Hello, I have written a background task that communicate with a network-based message queue. Since the implementation makes use of SQLIte and SQLite does not like to access the same database from several threads at the same time, I serialized all access to the queue in a single thread and used the Otl message queue attached to the task to get and send the messages from the main thread (and other background threads). Messages in and out are posted into the task's "comm" property and I have a written a dedicated, thread-safe class that maintains a collections of listeners (lambda functions that are interested in a given message). Once a handler has been "triggered" it can (optionally) be removed from the list. This worked great but I encountered an issue that I can't quite find how to solve: I wanted to write a synchronous method that would post a message to the (network) queue and wait until it gets an answer or times out (because synchronous calls are easier to use for my co-workers). Yet, I don't want to block in the main loop because other messages will need to be received and handled. So, for the synchronous call, I do the following: - Create a Cancellation token. - Add a handler for the message I'm expecting. Once the message is received, the token is signaled and the handler removed from the list. - Send the "query" message to the queue. - Wait until either the token gets signaled or the timeout has elapsed. (the code is slightly more complex that this but that is the idea). My problem is that, in my first version of the code, my handler never received the reply (I could see it going through the message queue). So, I added a single call to "Application.processMessages" after sending the message out and then, suddenly, the code worked. Only, I can't understand what I'm doing wrong: the handler is added before the message is being sent and everything else is going through the tasks's comm facility. Is there any explanation about what's happening ? How can I removed that requirement to call application.processmessages ? Here is the code that I used (I cleared it up a bit): The management task is created like this (from the main thread): FTask := CreateTask( procedure(const task: IOmniTask) var locQueue: IGMQueue; // This interface represent the networked message queue system Timer: TStopWatch; AThreadmessage: TOmniMessage; Messages: IList<IGMQMessage>; // This is a generic list from Spring4d. begin locQueue := FMessageQueue.Clone; // Make a local copy of the (global) message queue to make sure we have an unconnected instance locQueue.Connect(true); // Connect to the network message queue Timer := TStopWatch.StartNew; // record when the thread starts while not Task.CancellationToken.IsSignalled and not Task.Terminated do // main thread loop begin // queue maintenance if Timer.ElapsedMilliseconds > 10000 then // every 10 seconds only begin locQueue.Maintenance(1000); Timer := TStopWatch.StartNew; // remember when the last maintenance was performed. end; // send out new messages while Task.Comm.Receive(AThreadmessage) do begin case AThreadmessage.MsgID of WM_SENDMESSAGE: begin if Supports(AThreadmessage.MsgData.AsInterface, IGMQMessage) then // IGMQMessage is an interface that abstract a single message queue message begin locQueue.Enqueue(AThreadmessage.MsgData.AsInterface as IGMQMessage); end; end; end; end; // receive all messages addressed to us if locQueue.TryDequeueAll(Messages) then begin MessageQueueRouter.RouteMessages(Messages); // route messages to handlers end; Sleep(100); end; end, 'Message queue processor') .CancelWith(FCancellationToken).Schedule; RouteMessages contains a multimap connection of message types and handlers (which are simply lambda to be called): when a message is received, all handlers that have been registered for that message type are invoked and the handler is optionally removed from the list. So, in order to perform by synchronous version, I wrote this: var AMessageType: String; Waitable: IOmniCancellationToken; ReceivedMessage: IGMQMessage; Timer: TStopWatch; begin result := nil; ReceivedMessage := nil; if IsActive then begin Waitable := CreateOmniCancellationToken; // add a handler to receive the first reply message from the queue FHandlers.Add( AMessageType, // a string that specify what message type I'm interested in TMessageHandlerData.Create( function(const Src: IGMQMessage; out reply: IGMQMessage): boolean begin // this code is called when a message of the correct type is received if not Waitable.IsSignalled then // check if there is still someone listening ReceivedMessage := Src; // copy the message Waitable.Signal; // signal that we have something result := false; // returning false will tell the handler management class to remove this handler from the list. end, Waitable // the FHandlers inplementation will check if this token is signalled and remove the handler from the queue if it is. )); // send the query message to the network message queue Ftask.Comm.Send(WM_SENDMESSAGE, SrcMessage);// SrcMessage is the source message. It will cause other applications connected to the same queue to send a message back. Application.ProcessMessages; // ---- this is what I don't get. if I don't put this call here, I don't get the message later Timer := TStopWatch.StartNew; // how long have we been waiting while (not Waitable.IsSignalled) and (Timer.ElapsedMilliseconds < TimeOut) do // loop until either we time out or we get a message back begin sleep(10); end; if Waitable.IsSignalled and (ReceivedMessage <> nil) then // cehck if we had an answer result := ReceivedMessage // yup, send it back else Waitable.Signal; // no, signal the token so the handler will know it cannot send the result back (if it is running) or the tells the FHandlers instance that it can ignore and remove the handler end; end;
×