kokoslolos, posted March 5, 2019

Hi all,

For a system I'm currently designing and developing, I need a queue to which N threads add items and from which a single thread consumes them. I have created a class and tested it; it works well in my opinion, and I thought I'd share it here for your opinion as well. Maybe I missed something and it's not actually thread safe.

The basic idea is a "ring" buffer with two indexes, one for read and one for write access. I also took it for granted that reading an integer is an atomic operation.

Please keep in mind that the code is a draft and definitely needs polishing and more checks. Here it is:

    unit uMWSRList;

    interface

    uses
      Windows, Classes, SyncObjs, SysUtils;

    const
      MAX_SIZE = 32;

    type
      TMWSRList = class(TObject)
      private
        FWriteIndex : Integer;
        FReadIndex : Integer;
        FAvailableCount : Integer;
        FItems : Array [0..MAX_SIZE-1] of Integer;
        FOnNewItemAdd : TNotifyEvent;
      public
        constructor Create;
        function AddItem(pItem : Integer):Integer;
        function GetItem():Integer;
        function ItemsAvailable():Boolean;
        property OnNewItemAdd : TNotifyEvent read FOnNewItemAdd write FOnNewItemAdd;
      end;

    implementation

    { TMWSRList }

    constructor TMWSRList.Create;
    begin
      FWriteIndex := -1;
      FReadIndex := -1;
    end;

    function TMWSRList.AddItem(pItem : Integer):Integer;
    var
      aOverFlow : Integer;
    begin
      Result := InterlockedIncrement(FWriteIndex);
      if Result >= MAX_SIZE then
      begin
        aOverFlow := Result - MAX_SIZE;
        { Reset FWriteIndex when it reaches MAX_SIZE }
        InterlockedCompareExchange(FWriteIndex, -1, MAX_SIZE + aOverFlow);
        Result := InterlockedIncrement(FWriteIndex);
      end;
      FItems[Result] := pItem;
      InterlockedIncrement(FAvailableCount);
      if Assigned(FOnNewItemAdd) then
        FOnNewItemAdd(self);
    end;

    function TMWSRList.GetItem():Integer;
    begin
      if FAvailableCount > 0 then
      begin
        Inc(FReadIndex);
        if FReadIndex = MAX_SIZE then
          FReadIndex := 0;
        Result := FItems[FReadIndex];
        InterlockedDecrement(FAvailableCount);
      end
      else
        Result := -1;
    end;

    function TMWSRList.ItemsAvailable():Boolean;
    begin
      Result := FAvailableCount > 0;
    end;

    end.

Regards.
Primož Gabrijelčič, posted March 5, 2019

Definitely doesn't work correctly. For starters, two parallel readers would clash horribly.

If you want to implement lock-free structures, start by writing thorough stress tests. They should run different scenarios (one reader, a few readers, many readers ... very many readers, more than the number of cores in the system) in a loop which runs for a long time (minutes at least, preferably more).

If you want to use something that is well tested, take TOmniBoundedQueue from the OmniThreadLibrary.
kokoslolos, posted March 5, 2019

I know for sure that it doesn't work for more than one reader. My implementation is specific to a single reader, and I plan to add checks to make sure only one thread reads. If you look at GetItem(), I intentionally used Inc() and not the Interlocked version, knowing that only one reader will exist. With that in mind, Primož Gabrijelčič, do you think it's a good start?

Also, thank you dummzeuch for your reply, I will check your implementation.
dummzeuch, posted March 5, 2019

48 minutes ago, Primož Gabrijelčič said:
> two parallel readers would clash horribly.

It explicitly says "single reader", so that's a non-issue. I agree with the rest of your post though.
Primož Gabrijelčič, posted March 5, 2019

19 minutes ago, dummzeuch said:
> It explicitly says "single reader", so that's a non issue.

Aaaaah, single READER! Sorry, folks. That is such a weird concept (although, I admit, useful in some situations) that my brain autocorrected it into single WRITER. Silly brain!
Anders Melander, posted March 5, 2019

1 hour ago, dummzeuch said:
> Just in case you are interested: Here is my implementation of a lock free MultiWriteSingeRead queue. Beware that I have never really tested it.

There's at least one race condition. Stopped reading once I spotted it:

    if InterlockedDecrement(FActiveWriters) = 0 then
      FActiveWritersZeroEvent.SetEvent;
Primož Gabrijelčič, posted March 5, 2019

The writer also doesn't work.

    function TMWSRList.AddItem(pItem : Integer):Integer;
    var
      aOverFlow : Integer;
    begin
      Result := InterlockedIncrement(FWriteIndex);
      if Result >= MAX_SIZE then
      begin
        aOverFlow := Result - MAX_SIZE;
        { Reset FWriteIndex when it reaches MAX_SIZE }
        InterlockedCompareExchange(FWriteIndex, -1, MAX_SIZE + aOverFlow);
        Result := InterlockedIncrement(FWriteIndex);
      end;
      FItems[Result] := pItem;
      InterlockedIncrement(FAvailableCount);
      if Assigned(FOnNewItemAdd) then
        FOnNewItemAdd(self);
    end;

FWriteIndex is at the end of the array. Two threads call AddItem. Both threads increment FWriteIndex. Both threads enter the `if`. The second thread executes InterlockedCompareExchange and InterlockedIncrement. That will skip slot 0, which will never be used, if I'm not mistaken.

Plus - what use is InterlockedCompareExchange if you never test whether the exchange was successful?

Plus - the code doesn't work if the queue overflows (reader stops but writers keep writing). But that is by design, I guess?
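The question about the untested exchange can be made concrete with a replay. The following is a minimal sketch, not code from the thread: it runs the wrap-around steps of AddItem for two hypothetical writers, A and B, on a single thread in one fixed order, so the outcome is deterministic. It assumes a Delphi version that provides the System.Atomic* intrinsics, used here in place of the Windows Interlocked* calls; the program and variable names are made up for illustration.

```pascal
program WrapReplay;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

const
  MAX_SIZE = 32;

var
  // Start where the scenario starts: the write index sits on the last slot.
  WriteIndex: Integer = MAX_SIZE - 1;
  ResultA, ResultB, SeenByA, SeenByB: Integer;
begin
  // Both writers perform their first increment before either tries to reset.
  ResultA := AtomicIncrement(WriteIndex);   // A gets 32
  ResultB := AtomicIncrement(WriteIndex);   // B gets 33

  // A tries to reset, expecting 32. WriteIndex is already 33, so nothing is
  // exchanged. The return value (33) reveals that, but AddItem ignores it.
  SeenByA := AtomicCmpExchange(WriteIndex, -1, MAX_SIZE + (ResultA - MAX_SIZE));
  ResultA := AtomicIncrement(WriteIndex);   // A gets 34

  // B tries to reset, expecting 33. WriteIndex is now 34, so this fails too.
  SeenByB := AtomicCmpExchange(WriteIndex, -1, MAX_SIZE + (ResultB - MAX_SIZE));
  ResultB := AtomicIncrement(WriteIndex);   // B gets 35

  Writeln('A expected 32 but saw ', SeenByA, ', would write FItems[', ResultA, ']');
  Writeln('B expected 33 but saw ', SeenByB, ', would write FItems[', ResultB, ']');
  Writeln('valid indexes are 0..', MAX_SIZE - 1);
end.
```

In this order neither exchange finds the value it expects, nothing is reset, and both writers end up with an index past MAX_SIZE - 1. Whether a real scheduler produces this order depends on timing, which is why a short test run can easily miss it.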
Anders Melander, posted March 5, 2019 (edited)

If you can live with LIFO/FILO/stack semantics then you can use Windows SLists. They're lock free and can be used with multiple readers and writers.

Edited March 5, 2019 by Anders Melander: fixed FIFO
Anders Melander, posted March 5, 2019

You have a problem so you decide to use threads. you problems.2 have Now
kokoslolos, posted March 5, 2019

24 minutes ago, Primož Gabrijelčič said:
> The writer also doesn't work.
> FWriteIndex is at the end of array. Two threads call AddItem. Both threads increment FWriteIndex. Both threads enter `if`. Second thread executes InterlockedCompareExchange and InterlockedIncrement. That will skip slot 0 which will never be used, if I'm not mistaken.
> Plus - what use is InterlockedCompareExchange if you never test if the exchange was successful?
> Plus - the code doesn't work if the queue overflows (reader stops but writers keep writing). But that is by design, I guess?

I expect that more than two threads will enter the "if", but only one will reset FWriteIndex because of the "MAX_SIZE + aOverFlow" passed to InterlockedCompareExchange. I believe the writer works OK :).

About the overflow: it's one of the checks I plan to add, even though I believe the single reader will always be fast enough to consume the items.

Thank you for your time!
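For comparison, here is a sketch of a wrap-around that does test whether the exchange succeeded and retries when it did not. ClaimSlot is a hypothetical helper, not part of the posted class, and it assumes the System.Atomic* intrinsics are available. It keeps the posted convention that the index starts at -1 and the first claimed slot is 0. It covers the index arithmetic only: it does nothing about overflow, and nothing about the reader seeing a claimed slot before its writer has stored the item.

```pascal
program ClaimSlotDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

const
  MAX_SIZE = 32;

// Hypothetical helper: atomically advances WriteIndex by one position,
// wrapping inside 0..MAX_SIZE-1, and returns the slot claimed by the caller.
function ClaimSlot(var WriteIndex: Integer): Integer;
var
  Current, NextIdx: Integer;
begin
  repeat
    // Snapshot the index (assumes an aligned Integer read is atomic, the
    // same assumption the original post makes).
    Current := WriteIndex;
    NextIdx := (Current + 1) mod MAX_SIZE;
    // The exchange succeeds only if WriteIndex still holds the snapshot.
    // Otherwise another writer got in first, so take a new snapshot and retry.
  until AtomicCmpExchange(WriteIndex, NextIdx, Current) = Current;
  Result := NextIdx;
end;

var
  WriteIndex: Integer = -1;
  I, Slot: Integer;
begin
  // Single-threaded sanity check of the wrap-around arithmetic only.
  for I := 0 to MAX_SIZE + 1 do
  begin
    Slot := ClaimSlot(WriteIndex);
    if Slot <> I mod MAX_SIZE then
    begin
      Writeln('claim ', I, ' returned unexpected slot ', Slot);
      Halt(1);
    end;
  end;
  Writeln('all claims stayed inside 0..', MAX_SIZE - 1);
end.
```

Because the index never leaves the valid range, there is no separate reset step for two writers to race over. The check at the end exercises one thread only, so it says nothing about behaviour under contention.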
Primož Gabrijelčič, posted March 6, 2019

As I said before, don't "believe" that the writer works OK - run tests!
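What such a test might look like, as a rough sketch under stated assumptions: several writer threads each push a distinct range of integers, and the single reader checks that every value arrives exactly once. The RTL's blocking TThreadedQueue<Integer> is used purely as a stand-in so that the harness is self-contained; the queue under test would take its place. The constants, MakeWriter and RunStress are hypothetical names, and the run is far shorter than the minutes-long runs recommended earlier in the thread.

```pascal
program MwsrStress;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs,
  System.Generics.Collections;

const
  WRITER_COUNT     = 8;       // try more writers than cores as well
  ITEMS_PER_WRITER = 100000;
  TIMEOUT_MS       = 5000;

var
  // Stand-in for the queue under test; swap in the real implementation.
  Queue: TThreadedQueue<Integer>;
  PushFailures: Integer = 0;

function MakeWriter(WriterNo: Integer): TThread;
begin
  // A separate function, so each anonymous method captures its own WriterNo.
  Result := TThread.CreateAnonymousThread(
    procedure
    var
      K: Integer;
    begin
      // Every writer pushes values from a range no other writer uses.
      for K := 0 to ITEMS_PER_WRITER - 1 do
        if Queue.PushItem(WriterNo * ITEMS_PER_WRITER + K) <> wrSignaled then
          AtomicIncrement(PushFailures);
    end);
  Result.FreeOnTerminate := False;  // the test waits for and frees the thread
end;

procedure RunStress;
var
  Writers: array of TThread;
  Seen: array of Boolean;           // SetLength zero-initialises the flags
  I, Item, QueueSize, Received, Duplicates, OutOfRange: Integer;
begin
  SetLength(Writers, WRITER_COUNT);
  SetLength(Seen, WRITER_COUNT * ITEMS_PER_WRITER);
  for I := 0 to WRITER_COUNT - 1 do
    Writers[I] := MakeWriter(I);
  for I := 0 to WRITER_COUNT - 1 do
    Writers[I].Start;

  Received := 0;
  Duplicates := 0;
  OutOfRange := 0;
  // The single reader is this thread. It gives up if nothing arrives in time,
  // which is how lost items would show up.
  while Received < Length(Seen) do
  begin
    if Queue.PopItem(QueueSize, Item) <> wrSignaled then
      Break;
    Inc(Received);
    if (Item < 0) or (Item >= Length(Seen)) then
      Inc(OutOfRange)
    else
    begin
      if Seen[Item] then
        Inc(Duplicates);
      Seen[Item] := True;
    end;
  end;

  Queue.DoShutDown;  // intended to release writers still blocked on a full queue
  for I := 0 to WRITER_COUNT - 1 do
  begin
    Writers[I].WaitFor;
    Writers[I].Free;
  end;
  Writeln(Format('received %d of %d, duplicates %d, out of range %d, failed pushes %d',
    [Received, Length(Seen), Duplicates, OutOfRange, PushFailures]));
end;

begin
  Queue := TThreadedQueue<Integer>.Create(32, TIMEOUT_MS, TIMEOUT_MS);
  try
    RunStress;
  finally
    Queue.Free;
  end;
end.
```

A clean run reports every item received with no duplicates, no out-of-range values and no failed pushes. A clean run shows only that this scenario did not fail on this machine, so the writer count, queue depth and duration are worth varying.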
DiGi, posted March 6, 2019

Just one completely off-topic "funny" website: https://deadlockempire.github.io/ - find deadlocks yourself...
Primož Gabrijelčič, posted March 6, 2019

51 minutes ago, DiGi said:
> Just one complete offtopic "funny" web: https://deadlockempire.github.io/ - find deadlocks yourself...

There's a Delphi version, too: http://deadlockempire.4delphi.com/delphi/
David Schwartz, posted March 6, 2019

This can be done without locks, but it takes two flags, an atomic test-and-set operation (or increment-and-return-value), as well as a strict protocol that must be followed by everybody.

I don't see evidence of that here.

As an aside, what options are there in Delphi for an atomic test-and-set or incr-and-return-value operation?

I'm not sure anything at Delphi's source language level could be considered "atomic" for concurrency purposes.

TRIVIA FACTOID: There was never a real-time or multi-tasking version of any OS built that ran in "protected mode" on Intel's 80286 chip, because the microinstructions that implemented the PUSH IP operation in protected mode could be interrupted between incrementing SP and setting [SP] = IP.
Guest, posted March 6, 2019

41 minutes ago, David Schwartz said:
> what options are there in Delphi for an atomic test-and-set or incr-and-return-value operation?

- System.Atomic* routines (e.g. AtomicCmpExchange)
- TInterlocked in SyncObjs
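A small sketch of how the two options map onto the operations asked about. It assumes a Delphi version that ships the Atomic* intrinsics and the TInterlocked class; the program and variable names are illustrative only.

```pascal
program AtomicDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs;

var
  Counter: Integer = 0;
  Flag: Integer = 0;
  NewValue, OldFlag: Integer;
begin
  // Increment-and-return-value: the result is the value after the increment.
  NewValue := AtomicIncrement(Counter);
  Writeln('counter is now ', NewValue);

  // Test-and-set: store 1 only if Flag is still 0. The result is the value
  // Flag held before the call, so 0 means this caller set the flag.
  OldFlag := AtomicCmpExchange(Flag, 1, 0);
  if OldFlag = 0 then
    Writeln('first caller set the flag');

  // A second attempt finds 1 and leaves Flag unchanged.
  OldFlag := AtomicCmpExchange(Flag, 1, 0);
  if OldFlag <> 0 then
    Writeln('second caller found the flag already set');

  // The TInterlocked class methods in System.SyncObjs offer the same
  // operations; here the flag is cleared again if it currently holds 1.
  NewValue := TInterlocked.Increment(Counter);
  OldFlag := TInterlocked.CompareExchange(Flag, 0, 1);
  Writeln('counter is now ', NewValue, ', flag was ', OldFlag);
end.
```

In both APIs the compare-exchange returns the value found before the call, so comparing that result with the expected value is how a caller learns whether its exchange took place.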
dummzeuch, posted March 6, 2019

15 hours ago, Anders Melander said:
> There's at least one race condition. Stopped reading once I spotted it:
>
>     if InterlockedDecrement(FActiveWriters) = 0 then
>       FActiveWritersZeroEvent.SetEvent;

I think we should not discuss my version in this thread, as it is mainly about kokoslolos' implementation. As I said: I wrote that back when I read about lock-free structures, but I never really tested it thoroughly and lost interest later. It would be too confusing to discuss both in the same thread.
Anders Melander, posted March 6, 2019

8 minutes ago, dummzeuch said:
> I think we should not discuss my version in this thread as it is mainly about kokoslolos' implementation.

Then you shouldn't have mentioned it. If you post something as an example of a lock-free implementation and it's clearly not thread safe, then I think it's fair that people are warned about that fact.
dummzeuch, posted March 6, 2019

I have asked an admin to remove that post. I have no time now to actually check that race condition you mention.
Lars Fosdal, posted March 6, 2019

16 hours ago, Primož Gabrijelčič said:
> Aaaaah, single READER! Sorry, folks. That is such a weird concept (although, I admit, useful in some situations) that my brain autocorrected it into single WRITER. Silly brain!

I use MWSR queues for stuff like logging. Multiple threads queue things for logging, and a single log thread does the actual work. Basically, it is a usable model for any kind of data that in the end needs to be serialized to storage.
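The logging pattern described here can be sketched as follows. This is an illustration under assumptions, not the poster's code: it uses the RTL's blocking TThreadedQueue<string> rather than a lock-free queue, writes to the console instead of a log file, and stops the log thread with a made-up sentinel string (STOP_MARKER). MakeProducer and Run are hypothetical names.

```pascal
program LogQueueSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs,
  System.Generics.Collections;

const
  STOP_MARKER = '<<stop>>';  // hypothetical sentinel that ends the log thread

var
  LogQueue: TThreadedQueue<string>;

function MakeProducer(No: Integer): TThread;
begin
  // A separate function, so each anonymous method captures its own No.
  Result := TThread.CreateAnonymousThread(
    procedure
    var
      K: Integer;
    begin
      // Producers only enqueue; they never touch the output themselves.
      for K := 1 to 3 do
        LogQueue.PushItem(Format('worker %d: message %d', [No, K]));
    end);
  Result.FreeOnTerminate := False;
end;

procedure Run;
var
  Producers: array [0..3] of TThread;
  LogThread: TThread;
  I: Integer;
begin
  LogThread := TThread.CreateAnonymousThread(
    procedure
    var
      Line: string;
    begin
      // The single consumer: all output is serialized through this loop.
      repeat
        Line := LogQueue.PopItem;   // blocks until an item is available
        if Line <> STOP_MARKER then
          Writeln(Line);
      until Line = STOP_MARKER;
    end);
  LogThread.FreeOnTerminate := False;
  LogThread.Start;

  for I := Low(Producers) to High(Producers) do
  begin
    Producers[I] := MakeProducer(I);
    Producers[I].Start;
  end;
  for I := Low(Producers) to High(Producers) do
  begin
    Producers[I].WaitFor;
    Producers[I].Free;
  end;

  // All producers are done, so the sentinel is the last item in the queue.
  LogQueue.PushItem(STOP_MARKER);
  LogThread.WaitFor;
  LogThread.Free;
end;

begin
  LogQueue := TThreadedQueue<string>.Create(64);
  try
    Run;
  finally
    LogQueue.Free;
  end;
end.
```

Messages from one worker appear in the order that worker queued them, while the interleaving between workers varies from run to run.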
Primož Gabrijelčič, posted March 6, 2019

BTW, OmniThreadLibrary's TOmniBoundedQueue was implemented in 2008. Then we needed about a year to find a few weird boundary conditions where the code would crash and fix them.

After that the queue was working perfectly until 2018, when I found a race condition which caused the queue to say "sorry, can't add an item, the queue is full" when the queue had actually just become empty. Nobody noticed the problem in the meantime, we all thought the code was perfect, and the unit/stress tests did not test for this specific scenario ...

Writing lock-free structures is harder than you think. (Applies to me, too.)
Rudy Velthuis, posted March 6, 2019

1 minute ago, Primož Gabrijelčič said:
> BTW, OmniThreadLibrary's TOmniBoundedQueue was implemented in 2008. Then we needed about a year to find few weird boundary conditions where the code would crash and fix them.
> After that the queue was working perfectly until 2018, when I found a race condition which caused the queue to say "sorry, can't add an item, the queue is full", when the queue has actually just become empty. Nobody noticed the problem in the meantime, we all thought the code is perfect, and the unit/stress test did not test for this specific scenario ...
> Writing lock-free structures is harder than you think. (Applies to me, too.)

Obviously testing thread-related code is hard too.
Primož Gabrijelčič, posted March 6, 2019 (edited)

1 hour ago, Rudy Velthuis said:
> Obviously testing thread-related code is hard too.

Indeed.

In single-threaded code, good unit tests will show that the code obviously has no errors.
In multi-threaded code, good unit tests will show that the code has no obvious errors.

IOW, unit tests can only prove that your multi-threaded code doesn't work, not that it does.

Edited March 6, 2019 by Primož Gabrijelčič
M.Joos, posted March 6, 2019

9 minutes ago, Primož Gabrijelčič said:
> IOW, unit tests can only prove that your multi-threaded code doesn't work, not that it does.

I would argue that the same is true for single-threaded code as well.
Rudy Velthuis, posted March 6, 2019 (edited)

2 hours ago, Primož Gabrijelčič said:
> In single-threaded code, good unit tests will show that the code obviously has no errors.
> In multi-threaded code, good unit tests will show that the code has no obvious errors.

Aw come on, you got that somewhere else. <g>

Ah, here it is:

"There are two ways of constructing a software design: One way is to make it so simple that there are obviously no deficiencies, and the other way is to make it so complicated that there are no obvious deficiencies. The first method is far more difficult."
-- C.A.R. Hoare

And:

"Program testing can be used to show the presence of bugs, but never to show their absence!"
-- Edsger W. Dijkstra

After all, I still have this quotes file I use for XanaNews. <g>

Edited March 6, 2019 by Rudy Velthuis
Primož Gabrijelčič, posted March 6, 2019

Indeed, I got the "obvious" part from Hoare.

The parallel part is all mine though. And I stand behind it.

If one writes a unit test which tests the specification in full, it can in theory prove that a single-threaded unit complies with the specification.

Throwing parallel processing into the mix, however, prevents us from getting even that result. The most one can say is that the unit probably works as expected inside one test environment on one computer.