pyscripter, posted May 14, 2020 (edited)

@Darian Miller has published a very nice article about the state of TThreadedQueue and TMonitor in Delphi. He has also published on GitHub a stress test that shows how TThreadedQueue still fails under stress. I have played with his stress test and concluded that the problem is almost certainly in TMonitor.

TMonitor implements a lock-free stack to recycle events created with the CreateEvent function. The relevant code in SysUtils is:

    var
      EventCache: PEventItemHolder;
      EventItemHolders: PEventItemHolder;

    procedure Push(var Stack: PEventItemHolder; EventItem: PEventItemHolder);
    var
      LStack: PEventItemHolder;
    begin
      repeat
        LStack := Stack;
        EventItem.Next := LStack;
      until AtomicCmpExchange(Pointer(Stack), EventItem, LStack) = LStack;
    end;

    function Pop(var Stack: PEventItemHolder): PEventItemHolder;
    begin
      repeat
        Result := Stack;
        if Result = nil then
          Exit;
      until AtomicCmpExchange(Pointer(Stack), Result.Next, Result) = Result;
    end;

This lock-free stack is used by NewWaitObj and FreeWaitObj, which are part of the Monitor support protocol and are used by TMonitor. It works reasonably well, but under stress it fails. The reason is known as the ABA problem: between the moment Pop reads Result and Result.Next and the moment its AtomicCmpExchange runs, other threads can pop Result, pop the item after it, and push Result back, so the comparison still succeeds even though Result.Next is stale. It is discussed in a similar context in a series of excellent blog posts by @Primož Gabrijelčič: blog post 1, blog post 2, blog post 3. His OmniThreadLibrary contains the following routine, which he uses to deal with this problem:

    //either 8-byte or 16-byte CAS, depending on the platform; destination must be properly aligned (8- or 16-byte)
    function CAS(const oldData: pointer; oldReference: NativeInt; newData: pointer; newReference: NativeInt; var destination): boolean;
    asm
    {$IFNDEF CPUX64}
      push edi
      push ebx
      mov ebx, newData
      mov ecx, newReference
      mov edi, destination
      lock cmpxchg8b qword ptr [edi]
      pop ebx
      pop edi
    {$ELSE CPUX64}
      .noframe
      push rbx                    //rsp := rsp - 8 !
      mov rax, oldData
      mov rbx, newData
      mov rcx, newReference
      mov r8, [destination + 8]   //+8 with respect to .noframe
      lock cmpxchg16b [r8]
      pop rbx
    {$ENDIF CPUX64}
      setz al
    end; { CAS }

I have tried to use this function to provide a solution for TMonitor similar to the one in OmniThreadLibrary (see the attached iaStressTest.TThreadedQueue.PopItem, which can be used with the original stress test). Whilst still not perfect, it helps a lot in 32-bit with, say, up to 100 threads. However, it crashes in 64-bit and I do not know why.

I am posting this here in case anyone with better knowledge of assembler and thread programming than mine can help with the challenge of fixing TMonitor. It would be nice to get a fix included in 10.4. And even if it is not included, it can easily be used as a patch in the same way as in the attached code.

Attachment: iaStressTest.TThreadedQueue.PopItem.pas

Edited May 14, 2020 by pyscripter
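A deterministic, single-threaded replay of the interleaving described above may make the ABA problem easier to see than a stress test. This is only a sketch: CasPtr and CasTagged are hypothetical, non-atomic stand-ins for AtomicCmpExchange and for a double-width CAS, and the "threads" are simply consecutive steps in the main block. The first half shows a plain pointer head accepting a stale swap; the second half shows a head carrying a version counter rejecting the same swap.

```pascal
program ABAReplay;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Name: Char;
  end;

  // Hypothetical tagged head: pointer plus version counter, compared as one unit.
  TTaggedHead = record
    Head: PNode;
    Counter: NativeInt;
  end;

// Non-atomic stand-ins for CAS: the interleaving is replayed by hand in a
// single thread, so atomicity is not needed to show the logic.
function CasPtr(var Target: PNode; NewValue, Comparand: PNode): Boolean;
begin
  Result := Target = Comparand;
  if Result then
    Target := NewValue;
end;

function CasTagged(var Target: TTaggedHead; const NewValue, Comparand: TTaggedHead): Boolean;
begin
  Result := (Target.Head = Comparand.Head) and (Target.Counter = Comparand.Counter);
  if Result then
    Target := NewValue;
end;

var
  A, B, C: TNode;
  Stack, Seen, SeenNext: PNode;
  Tagged, SeenTagged, NewTagged: TTaggedHead;
  PlainCasOk, TaggedCasOk: Boolean;

procedure BuildList; // A -> B -> C
begin
  A.Name := 'A'; B.Name := 'B'; C.Name := 'C';
  A.Next := @B; B.Next := @C; C.Next := nil;
end;

begin
  // Plain pointer head, as in the SysUtils Push/Pop
  BuildList;
  Stack := @A;
  Seen := Stack;           // thread 1 starts Pop: reads head A ...
  SeenNext := Seen^.Next;  // ... and A^.Next = B, then is preempted
  Stack := Stack^.Next;    // thread 2 pops A
  Stack := Stack^.Next;    // thread 2 pops B and keeps it
  A.Next := Stack;         // thread 2 pushes A back: A -> C
  Stack := @A;
  PlainCasOk := CasPtr(Stack, SeenNext, Seen); // head is A again, so this succeeds
  // The head is now B, a node thread 2 still owns, and C is unreachable.
  WriteLn('plain CAS succeeded: ', PlainCasOk, ', new head: ', Stack^.Name);

  // Tagged head: every successful operation bumps Counter
  BuildList;
  Tagged.Head := @A;
  Tagged.Counter := 0;
  SeenTagged := Tagged;                        // thread 1 snapshot {A, 0}
  NewTagged.Head := SeenTagged.Head^.Next;     // wants to install {B, 1}
  NewTagged.Counter := SeenTagged.Counter + 1;
  Tagged.Head := Tagged.Head^.Next; Inc(Tagged.Counter); // thread 2 pops A
  Tagged.Head := Tagged.Head^.Next; Inc(Tagged.Counter); // thread 2 pops B
  A.Next := Tagged.Head;                                 // thread 2 pushes A
  Tagged.Head := @A; Inc(Tagged.Counter);                // head is {A, 3}
  TaggedCasOk := CasTagged(Tagged, NewTagged, SeenTagged); // counter differs: fails
  WriteLn('tagged CAS succeeded: ', TaggedCasOk, ', head: ', Tagged.Head^.Name);
end.
```

The counter is what the 8-/16-byte CAS above is for: the pointer alone cannot distinguish "still A" from "A again".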
David Heffernan, posted May 14, 2020

Not sure it's relevant, but that x64 code should use .PUSHNV to preserve RBX. And I think that also makes .NOFRAME incorrect.
David Heffernan, posted May 15, 2020

12 hours ago, pyscripter said:
> Whilst still not perfect it helps a lot in 32 bits with say up to 100 threads.

You can't be doing multithreaded programming where "whilst still not perfect" is a valid statement. It's got to be right.
pyscripter, posted May 15, 2020 (edited)

7 minutes ago, David Heffernan said:
> You can't be doing multithreaded programming where "whilst still not perfect" is a valid statement. It's got to be right.

Absolutely right. This is why I am passing the challenge to people like you, with much deeper knowledge than mine. I very much hope that @Primož Gabrijelčič, for instance, will have a go at providing a solution, since he has faced the very same issue in OmniThreadLibrary.

Edited May 15, 2020 by pyscripter
Leif Uneus, posted May 15, 2020

A reference article on CAS and the ABA problem:
http://ithare.com/cas-reactor-for-non-blocking-multithreaded-primitives/
Anders Melander, posted May 15, 2020

15 hours ago, pyscripter said:
> The reason it fails is known as the ABA problem

Ouch! One would have thought they knew not to write code like that.

As far as I can tell, it can be solved "simply" by changing the head of the two stacks from a pointer to a record containing the pointer and a counter:

    type
      TEventStack = record
        Counter: int64;
        Head: PEventItemHolder;
      end;

    var
      EventCache: TEventStack;
      ...
      EventItemHolders: TEventStack;

...and then changing the Push and Pop functions to increment the transaction counter on each operation. Finally, we just need a version of AtomicCmpExchange that operates on 128 bits (e.g. the 16-byte CAS mentioned above) to update the transaction Counter and the Head pointer in one go. I believe this will take care of the ABA problem.
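One layout detail is worth checking with this approach: a single CAS can only cover both fields if the record is exactly as wide as the CAS instruction, that is 8 bytes (cmpxchg8b) on 32-bit and 16 bytes (cmpxchg16b) on 64-bit. The sketch below, with the illustrative type names TEventStackInt64 and TEventStackNative (a stand-in PEventItemHolder is declared locally), prints the sizes. With an Int64 counter the record is wider than 8 bytes on 32-bit targets, whereas a pointer-sized counter such as NativeInt fits both platforms.

```pascal
program TaggedHeadLayout;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  // Local stand-in for the RTL's holder type.
  PEventItemHolder = ^TEventItemHolder;
  TEventItemHolder = record
    Next: PEventItemHolder;
    Event: Pointer;
  end;

  // 64-bit counter on every platform.
  TEventStackInt64 = record
    Counter: Int64;
    Head: PEventItemHolder;
  end;

  // Pointer-sized counter: the pair is exactly two pointers wide,
  // which matches the 8-byte (32-bit) or 16-byte (64-bit) CAS.
  TEventStackNative = record
    Head: PEventItemHolder;
    Counter: NativeInt;
  end;

begin
  WriteLn('SizeOf(Pointer)           = ', SizeOf(Pointer));
  WriteLn('SizeOf(TEventStackInt64)  = ', SizeOf(TEventStackInt64));
  WriteLn('SizeOf(TEventStackNative) = ', SizeOf(TEventStackNative));
end.
```

A 32-bit pointer-sized counter wraps far sooner than an Int64 one, but for ABA protection it only needs to change between a thread's read and its CAS.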
pyscripter, posted May 15, 2020

33 minutes ago, Anders Melander said:
> As far as I can tell it can be solved "simply" by changing the head of the two stacks from a pointer to record containing the pointer and a counter:

If you have a look at the posted file, this is what I tried to do. Please have a go if you have the time.
Anders Melander, posted May 15, 2020

19 minutes ago, pyscripter said:
> If you have a look at the posted file, this is what I tried to do.

Oh. I generally don't bother with attached or linked files. However, from a brief glance at your code, this looks fishy:

    until (AtomicCmpExchange(Stack.Spin, Spin + 1, Spin) = Spin) and CAS(Result, Spin + 1, Result.Next, Spin + 2, Stack);

I would think you'd need to test and update both Spin and Next in one go. That's why you need the 128-bit CAS. I don't see the need for both the AtomicCmpExchange and the CAS. You also need to modify the Push function, since it has the same problem as Pop.

I might give it a go this weekend (if the weather is bad 🙂), but I suspect someone else might beat me to it.
pyscripter, posted May 15, 2020

16 minutes ago, Anders Melander said:
> if the weather is bad

Then I hope the weather will be bad 😀
Anders Melander, posted May 15, 2020

It's been raining, so here you go (completely untested):

    type
      TEventStack = record
        Counter: int64;
        Head: PEventItemHolder;
        procedure Push(EventItem: PEventItemHolder);
        function Pop: PEventItemHolder;
      end;

    var
      EventCache: TEventStack;
      ...
      EventItemHolders: TEventStack;

    procedure TEventStack.Push(EventItem: PEventItemHolder);
    var
      Current, Next: TEventStack;
    begin
      repeat
        // We don't need to copy atomically since the test below will detect tearing
        // but since the members should be aligned tearing should not occur anyway.
        Current := Self;
        EventItem.Next := Current.Head;
        Next.Head := EventItem;
        Next.Counter := Current.Counter + 1;
        // I'm assuming TInterlocked.CompareExchange returns a boolean: True on success.
      until TInterlocked.CompareExchange(Self, Next, Current);
    end;

    function TEventStack.Pop: PEventItemHolder;
    var
      Current, Next: TEventStack;
    begin
      repeat
        Current := Self;
        if (Current.Head = nil) then
          Exit(nil);
        Next.Head := Current.Head.Next;
        Next.Counter := Current.Counter + 1;
      until TInterlocked.CompareExchange(Self, Next, Current);
      Result := Current.Head;
    end;

I've made the two functions members of the record to get rid of the first parameter. What you (or someone else) need to do is provide an implementation of TInterlocked.CompareExchange, AtomicCmpExchange or CAS that handles 16 bytes and returns a boolean indicating success.
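For 32-bit targets only, the Push/Pop logic above can be made concrete without any assembler: with a pointer-sized counter the head is 8 bytes, and the existing Int64 overload of TInterlocked.CompareExchange in System.SyncObjs can act as the double-width CAS. This is a sketch under several assumptions: Delphi XE8 or later (for the CPU32BITS symbol), a variant record that overlays Head and Counter on one Int64 (an illustrative layout, not the RTL's), standalone procedures instead of record methods, and an 8-byte-aligned Stack variable. The usage is a single-threaded sanity check, not a stress test.

```pascal
program TaggedStack32;
{$APPTYPE CONSOLE}
{$Q-} // the counter is allowed to wrap around

uses
  System.SyncObjs;

{$IFNDEF CPU32BITS}
  {$MESSAGE FATAL 'This sketch relies on an 8-byte CAS and only targets 32-bit'}
{$ENDIF}

type
  PEventItemHolder = ^TEventItemHolder;
  TEventItemHolder = record
    Next: PEventItemHolder;
    Event: Pointer;
  end;

  // Head and Counter overlaid on one Int64, so a single 8-byte CAS
  // compares and swaps both together.
  TEventStack = record
    case Integer of
      0: (Head: PEventItemHolder; Counter: Integer);
      1: (Value: Int64);
  end;

procedure Push(var Stack: TEventStack; EventItem: PEventItemHolder);
var
  Current, Next: TEventStack;
begin
  repeat
    Current.Value := Stack.Value; // may tear; the CAS re-checks all 8 bytes
    EventItem.Next := Current.Head;
    Next.Head := EventItem;
    Next.Counter := Current.Counter + 1;
  until TInterlocked.CompareExchange(Stack.Value, Next.Value, Current.Value) = Current.Value;
end;

function Pop(var Stack: TEventStack): PEventItemHolder;
var
  Current, Next: TEventStack;
begin
  repeat
    Current.Value := Stack.Value;
    if Current.Head = nil then
      Exit(nil);
    // Reading Current.Head.Next is only safe if holders are recycled and
    // never freed while another thread might still be reading them.
    Next.Head := Current.Head.Next;
    Next.Counter := Current.Counter + 1;
  until TInterlocked.CompareExchange(Stack.Value, Next.Value, Current.Value) = Current.Value;
  Result := Current.Head;
end;

var
  Stack: TEventStack; // global, so zero-initialized: empty, counter 0
  Items: array[0..2] of TEventItemHolder;
  FirstPopped: PEventItemHolder;
  I: Integer;
begin
  for I := 0 to High(Items) do
    Push(Stack, @Items[I]);
  FirstPopped := Pop(Stack);
  while Pop(Stack) <> nil do
    ;
  WriteLn('LIFO ok: ', FirstPopped = @Items[2], ', counter: ', Stack.Counter);
end.
```

Three pushes and three successful pops leave the counter at 6; the final Pop that finds the stack empty does not change it. The 64-bit case still needs a 16-byte CAS, which is what the rest of the thread is about.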
Uwe Raabe, posted May 15, 2020

For Windows 8+ there is InterlockedCompareExchange128.
Anders Melander, posted May 15, 2020

1 hour ago, Uwe Raabe said:
> For Windows 8+ there is InterlockedCompareExchange128

Quote:
> This function is [...] implemented using a compiler intrinsic.
pyscripter, posted May 16, 2020

15 hours ago, Anders Melander said:
> What you (or someone else) need to do is provide an implementation of TInterlocked.CompareExchange, AtomicCmpExchange or CAS that handles 16 bytes and returns a boolean indicating success.

    function InterlockedCompareExchage(var Dest: TEventStack; NewValue, CurrentValue: TEventStack): Boolean;
    begin
    {$IFDEF CPUX64}
      Result := InterlockedCompareExchange128(@Dest, Int64(NewValue.Head), NewValue.Counter, @CurrentValue);
    {$ELSE CPUX64}
      Result := InterlockedCompareExchange64(Int64(Dest), Int64(NewValue), Int64(CurrentValue)) = Int64(CurrentValue);
    {$ENDIF CPUX64}
    end;

I have tried the above. It works well in 32-bit, but crashes in 64-bit. The implementation of InterlockedCompareExchange128 in WinApi.Windows.pas is:

    function InterlockedCompareExchange128(Destination: Pointer; ExchangeHigh, ExchangeLow: Int64; ComparandResult: Pointer): Bool; stdcall;
    asm
      MOV R10,RCX
      MOV RBX,R8
      MOV RCX,RDX
      MOV RDX,[R9+8]
      MOV RAX,[R9]
      LOCK CMPXCHG16B [R10]
      SETZ AL
    end;
David Heffernan, posted May 16, 2020 (edited)

I can't see where RBX is restored in that InterlockedCompareExchange128 code. It's a non-volatile (NV) register. The function needs to start with .PUSHNV RBX. I can't vouch for the rest of what it does.

Edited May 16, 2020 by David Heffernan
Anders Melander, posted May 16, 2020

2 hours ago, pyscripter said:
> function InterlockedCompareExchage(var Dest: TEventStack; NewValue, CurrentValue: TEventStack): Boolean;

You're missing an "n" in Exchange, and the NewValue and CurrentValue parameters should be declared as const or [ref].
pyscripter, posted May 16, 2020

I am attaching a new version of the test code incorporating the suggestions of @Anders Melander, in case anyone wants to try it (see the attached iaStressTest.TThreadedQueue.PopItem, which can be used with the original stress test by @Darian Miller).

Attachment: iaStressTest.TThreadedQueue.PopItem.pas
Darian Miller, posted May 16, 2020

On my Windows 10 system (Delphi 10.3.3), the Win32 version of your code fails after 6 seconds: about 130 threads are created before PopItem throws a premature timeout.
Anders Melander, posted May 16, 2020

1 hour ago, pyscripter said:
> I am attaching a new version of the test code

The test is, IMO, doing too much, so it's hard to tell where the problem originates. Since we've been focusing on the TEventItemHolder stack, I would start by verifying that the original code fails (although it's obvious that it has the ABA problem) and that the new code doesn't. Then I would validate TMonitor.Wait, and only then would I validate TThreadedQueue.PopItem.
pyscripter, posted May 16, 2020 (edited)

5 hours ago, Darian Miller said:
> On my Windows 10 system, the Win32 version of your code fails after 6 seconds with about 130 threads created before PopItem throws a premature timeout. (10.3.3)

Over here, Win32 with 300 threads for 10 minutes:

    2020-05-17 01.01.08: StressTestPopItem Start: Waiting up to [600] seconds for PopItem to prematurely timeout.
    2020-05-17 01.01.08: Note: Using [10] as PopTimeout on TThreadedQueue creation
    2020-05-17 01.01.09: TThreadCreator Start: Creating up to [300] threads
    2020-05-17 01.01.09: Note: Creating threads with a StackSize of [65536]
    2020-05-17 01.01.09: TThreadCreator End: [300] worker threads created
    ...
    2020-05-17 01.11.05: New Wait objects created: 14800000
    2020-05-17 01.11.08: StressTestPopItem End: Overall maximum time limit reached for this test without an error detected...we will call this a success!
    2020-05-17 01.11.08: Hit <enter> to exit

NewWaitObj was called 15 million times.

And Win32 with 2000 threads for 30 seconds:

    2020-05-17 01.30.28: StressTestPopItem Start: Waiting up to [30] seconds for PopItem to prematurely timeout.
    2020-05-17 01.30.28: Note: Using [10] as PopTimeout on TThreadedQueue creation
    2020-05-17 01.30.29: TThreadCreator Start: Creating up to [2000] threads
    2020-05-17 01.30.29: Note: Creating threads with a StackSize of [65536]
    2020-05-17 01.30.29: TThreadCreator End: [2000] worker threads created
    ...
    2020-05-17 01.30.58: New Wait objects created: 5000000
    2020-05-17 01.30.58: StressTestPopItem End: Overall maximum time limit reached for this test without an error detected...we will call this a success!
    2020-05-17 01.30.58: Hit <enter> to exit

NewWaitObj was called 5 million times in 30 seconds.

Sometimes, with many threads (say more than 300), I get an early failure before all threads are created, and I am not sure why. Once all the threads are created I get no errors. If you add Sleep(1000) at the top of TTestThread.Execute() to allow time for all the threads to be created, these startup errors are avoided.

Edited May 17, 2020 by pyscripter
pyscripter, posted May 16, 2020

11 hours ago, David Heffernan said:
> I can't see where RBX is restored in that InterlockedCompareExchange128 code. It's a NV register. Function needs to start with .PUSHNV RBX. I can't vouch for the rest of what it does.

For reference, this is the version from FPC:

    function InterlockedCompareExchange128(var Target: Int128Rec; NewValue: Int128Rec; Comperand: Int128Rec): Int128Rec; assembler;
    {
      win64:
        rcx ... pointer to result
        rdx ... target
        r8  ... NewValue
        r9  ... Comperand
    }
    {$ifdef win64}
    asm
      pushq %rbx
      { store result pointer for later use }
      pushq %rcx
      { load new value }
      movq (%r8),%rbx
      movq 8(%r8),%rcx
      { save target pointer for later use }
      movq %rdx,%r8
      { load comperand }
      movq (%r9),%rax
      movq 8(%r9),%rdx
      {$ifdef oldbinutils}
      .byte 0xF0,0x49,0x0F,0xC7,0x08
      {$else}
      lock cmpxchg16b (%r8)
      {$endif}
      { restore result pointer }
      popq %rcx
      { store result }
      movq %rax,(%rcx)
      movq %rdx,8(%rcx)
      popq %rbx
    end;

Any chance of anyone coming up with a working InterlockedCompareExchange128 for Delphi?
Anders Melander, posted May 16, 2020

Here's a simple test of the stack only. I have only shown the test of the old code; the test of the new code is similar.

The old stack code fails after a short while [*] on both 32-bit and 64-bit, with an invalid pointer operation when freeing an item popped from the stack. The reason it fails so quickly is that I have tried to limit the calls to the memory manager, as that is a bottleneck which tends to serialize the operations a bit.

[*] A short while = ~4 seconds, approximately 1 million stack operations by 100 threads on a 4-core system.

The new stack code succeeds on 32-bit but fails immediately on 64-bit with an AV on the LOCK CMPXCHG16B [R10] in InterlockedCompareExchange128.

    uses
      System.SyncObjs;

    const
      THREAD_COUNT = 100;
      ITERATIONS = 1000000;

    var
      Threads: array[0..THREAD_COUNT-1] of TThread;

    type
      PEventItemHolder = ^TEventItemHolder;
      TEventItemHolder = record
        Next: PEventItemHolder;
        Event: Pointer;
      end;

      TSyncEventItem = record
        Lock: Integer;
        Event: Pointer;
      end;

    procedure Push(var Stack: PEventItemHolder; EventItem: PEventItemHolder);
    var
      LStack: PEventItemHolder;
    begin
      repeat
        LStack := Stack;
        EventItem.Next := LStack;
      until AtomicCmpExchange(Pointer(Stack), EventItem, LStack) = LStack;
    end;

    function Pop(var Stack: PEventItemHolder): PEventItemHolder;
    begin
      repeat
        Result := Stack;
        if Result = nil then
          Exit;
      until AtomicCmpExchange(Pointer(Stack), Result.Next, Result) = Result;
    end;

    type
      TPushPopThread = class(TThread)
      private
        class var FEventCache: PEventItemHolder;
        class var FStackCount: integer;
        class var FReadyCount: integer;
        class var FLiveCount: integer;
        class var FOperations: int64;
        class var FFailure: boolean;
      private
        FEvent: TEvent;
      protected
        procedure Execute; override;
      public
        constructor Create(AEvent: TEvent);
        class property StackCount: integer read FStackCount;
        class property ReadyCount: integer read FReadyCount;
        class property LiveCount: integer read FLiveCount;
        class property Failure: boolean read FFailure;
        class property Operations: int64 read FOperations;
      end;

    constructor TPushPopThread.Create(AEvent: TEvent);
    begin
      inherited Create;
      FEvent := AEvent;
    end;

    procedure TPushPopThread.Execute;
    var
      Item, PoppedItem: PEventItemHolder;
    begin
      TInterlocked.Increment(FReadyCount);
      try
        FEvent.WaitFor(INFINITE);
        TInterlocked.Increment(FLiveCount);
        try
          try
            Item := nil;
            try
              for var i := 0 to ITERATIONS-1 do
              begin
                if (Terminated) or (FFailure) then
                  exit;
                if (Random(4) = 0) then // A lot more Pops than Pushes
                begin
                  if (Item = nil) then
                    New(Item);
                  Item.Event := pointer(TInterlocked.Increment(FStackCount));
                  Push(FEventCache, Item);
                  Item := nil;
                end else
                begin
                  PoppedItem := Pop(FEventCache);
                  if (PoppedItem <> nil) then
                  begin
                    TInterlocked.Decrement(FStackCount);
                    if (Item <> nil) then
                      FreeMem(Item);
                    Item := PoppedItem;
                  end;
                end;
                TInterlocked.Increment(FOperations);
              end;

              // Pop and free remaining items
              PoppedItem := Pop(FEventCache);
              while (PoppedItem <> nil) do
              begin
                TInterlocked.Decrement(FStackCount);
                FreeMem(PoppedItem);
                PoppedItem := Pop(FEventCache);
              end;
            finally
              if (Item <> nil) then
                FreeMem(Item);
            end;
          except
            FFailure := True;
            raise;
          end;
        finally
          TInterlocked.Decrement(FLiveCount);
        end;
      finally
        TInterlocked.Decrement(FReadyCount);
      end;
    end;

    procedure TForm11.ButtonOldStartClick(Sender: TObject);
    begin
      var Event := TEvent.Create(nil, True, False, '');
      try
        for var i := 0 to THREAD_COUNT-1 do
          if (Threads[i] = nil) then
            Threads[i] := TPushPopThread.Create(Event);

        // Wait for all threads ready to execute
        while (TPushPopThread.ReadyCount < THREAD_COUNT) do
          Sleep(1);

        // Start all threads
        Event.SetEvent;

        // Wait for thread completion
        while (TPushPopThread.LiveCount > 0) do
          Sleep(1);
      finally
        Event.Free;
      end;

      if (TPushPopThread.Failure) then
        ShowMessage('Failed')
      else
      if (TPushPopThread.StackCount > 0) then
        ShowMessage('Stack lost items')
      else
        ShowMessage('Success');

      for var i := 0 to THREAD_COUNT-1 do
        if (Threads[i] <> nil) then
        begin
          Threads[i].WaitFor;
          FreeAndNil(Threads[i]);
        end;
    end;
Guest, posted May 16, 2020

@pyscripter To save you time:

1) The assembly is good and has nothing that needs fixing.
2) The problem is alignment! Both InterlockedCompareExchange64 and InterlockedCompareExchange128 require aligned operands according to the documentation, and I don't see any alignment in this code.
3) Even in the original file there is this comment above the assembly part:
> //either 8-byte or 16-byte CAS, depending on the platform; destination must be properly aligned (8- or 16-byte)
4) Even when it works perfectly on your side for a long time, just try compiling with a different IDE and you will see the corruption, or let's call it unpredictable behaviour. If you add or remove a global variable, other variables may lose their alignment or regain it by luck; you might also simply change the order of the functions, and it might start to work fine or go completely wrong. That is a big problem, and forced alignment per function and per global variable is a much-needed feature in the Delphi compiler. I wish there were something like that. LLVM has it, but Embarcadero went ahead and removed/disabled that feature instead of evolving Delphi to produce such code with its compiler. The feature that is very much needed is something like this:

    var
      XBY16: array [0..31] of Byte; aligned 16;

or

    procedure AlignedProc; align 8;
toms, posted May 16, 2020

InterlockedCompareExchange128 for Delphi:
https://github.com/Purik/AIO/blob/dc9f8d95c75aff993b3b7dcc679657bd228d7068/PasMP.pas#L2683
Guest, posted May 16, 2020 (edited)

@toms Right, and if you notice, it is used with pointers allocated by AllocateAlignedMemory. That function has a long implementation and could be shorter. I use a short and simple implementation:

    function TElBuiltInAESSymmetricCrypto.AllocateAligned16(var Buffer: ByteArray; Size: Integer): Pointer;
    begin
      SetLength(Buffer, Size + $10);
      Result := @Buffer[$10 - NativeUInt(Buffer) and $F];
    end;

    procedure TElBuiltInAESSymmetricCrypto.AllocateMAlignedKeys;
    var
      Offset: Cardinal;
    begin
      FKey256 := PAESExpandedKey256(AllocateAligned16(FInternalKey256, SizeOf(TAESExpandedKey256)));

Buffer should be kept alongside the result of AllocateAligned16 and used to free the allocated memory with SetLength(BufferResult,0).

Edited May 16, 2020 by Guest
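The same idea (over-allocate, then round the address up to the next multiple of the alignment) can also be written with GetMem/FreeMem instead of a dynamic array. A minimal sketch, where AllocAligned, FreeAligned and TAlignedBlock are hypothetical names and Alignment is assumed to be a power of two; as with the dynamic-array version, the original pointer has to be kept for freeing.

```pascal
program AlignedAlloc;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}
{$IFDEF MSWINDOWS}{$APPTYPE CONSOLE}{$ENDIF}

type
  // Hypothetical helper type: Raw is what FreeMem needs, Aligned is for data.
  TAlignedBlock = record
    Raw: Pointer;
    Aligned: Pointer;
  end;

// Alignment must be a power of two (e.g. 8 or 16 for the CAS targets).
function AllocAligned(Size, Alignment: NativeUInt): TAlignedBlock;
begin
  // Over-allocate by Alignment - 1 bytes so an aligned address followed by
  // Size bytes always fits inside the raw block.
  GetMem(Result.Raw, Size + Alignment - 1);
  // Round the raw address up to the next multiple of Alignment.
  Result.Aligned := Pointer((NativeUInt(Result.Raw) + Alignment - 1) and not (Alignment - 1));
end;

procedure FreeAligned(var Block: TAlignedBlock);
begin
  FreeMem(Block.Raw); // free the original pointer, never the aligned one
  Block.Raw := nil;
  Block.Aligned := nil;
end;

var
  Block: TAlignedBlock;
  AlignedOk, FitsOk: Boolean;
begin
  Block := AllocAligned(16, 16);
  AlignedOk := NativeUInt(Block.Aligned) and 15 = 0;
  FitsOk := NativeUInt(Block.Aligned) - NativeUInt(Block.Raw) <= 15;
  WriteLn('aligned: ', AlignedOk, ', within block: ', FitsOk);
  FreeAligned(Block);
end.
```

Over-allocating by Alignment - 1 bytes rather than a full Alignment is enough, because an address that is already aligned is used as-is.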
pyscripter, posted May 17, 2020 (edited)

@Kas Ob. Thanks for the info about alignment. Actually, Delphi does offer support for alignment (an undocumented feature; see @Uwe Raabe's response in this Stack Overflow question). If you change the definition of TEventStack to

    TEventStack = record
      Head: PEventItemHolder;
      Counter: NativeInt;
      procedure Push(EventItem: PEventItemHolder);
      function Pop: PEventItemHolder;
    end align {$IFDEF CPUX64}16{$ELSE CPUX64}8{$ENDIF CPUX64};

there are no longer any crashes. The test still fails, though, and I am investigating why.

And by the way, there is also the [Align(8)] attribute, which seems to work with fields, global variables and records.

Edited May 17, 2020 by pyscripter
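Since alignment of globals can reportedly change when unrelated declarations are added or reordered, a cheap safeguard is to assert it at startup. A sketch, assuming the undocumented `end align` syntax behaves as reported in this post; IsAlignedTo is a hypothetical helper, the record's methods are omitted, and the check uses SizeOf(TEventStack) as the required alignment (8 on 32-bit, 16 on 64-bit).

```pascal
program CheckAlignment;
{$APPTYPE CONSOLE}

type
  PEventItemHolder = ^TEventItemHolder;
  TEventItemHolder = record
    Next: PEventItemHolder;
    Event: Pointer;
  end;

  TEventStack = record
    Head: PEventItemHolder;
    Counter: NativeInt;
  end align {$IFDEF CPUX64}16{$ELSE CPUX64}8{$ENDIF CPUX64};

var
  EventCache: TEventStack;
  EventItemHolders: TEventStack;

// Hypothetical helper: True if P is a multiple of Alignment (a power of two).
function IsAlignedTo(P: Pointer; Alignment: NativeUInt): Boolean;
begin
  Result := NativeUInt(P) and (Alignment - 1) = 0;
end;

begin
  // The record must be exactly one CAS wide: 8 bytes on 32-bit, 16 on 64-bit.
  Assert(SizeOf(TEventStack) = 2 * SizeOf(Pointer));
  // Fails fast if a later change leaves either stack head misaligned.
  Assert(IsAlignedTo(@EventCache, SizeOf(TEventStack)), 'EventCache is misaligned');
  Assert(IsAlignedTo(@EventItemHolders, SizeOf(TEventStack)), 'EventItemHolders is misaligned');
  WriteLn('SizeOf(TEventStack) = ', SizeOf(TEventStack));
end.
```

In a unit, the same assertions would naturally go in the initialization section, next to the variables they protect.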