pyscripter

Revisiting TThreadedQueue and TMonitor


@Darian Miller has published a very nice article about the state of TThreadedQueue and TMonitor in Delphi. He has also published a stress test on GitHub that shows how TThreadedQueue still fails under stress.

 

I have played with his stress test and concluded that the problem is almost certainly in TMonitor. TMonitor uses a lock-free stack to recycle events created with the CreateEvent function. The relevant code in SysUtils is:

 

var
  EventCache: PEventItemHolder;
  EventItemHolders: PEventItemHolder;

procedure Push(var Stack: PEventItemHolder; EventItem: PEventItemHolder);
var
  LStack: PEventItemHolder;
begin
  repeat
    LStack := Stack;
    EventItem.Next := LStack;
  until AtomicCmpExchange(Pointer(Stack), EventItem, LStack) = LStack;
end;

function Pop(var Stack: PEventItemHolder): PEventItemHolder;
begin
  repeat
    Result := Stack;
    if Result = nil then
      Exit;
  until AtomicCmpExchange(Pointer(Stack), Result.Next, Result) = Result;
end;

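This lock-free stack is used by NewWaitObj and FreeWaitObj, which are part of the Monitor support protocol used by TMonitor. It works reasonably well, but under stress it fails. The failure is caused by what is known as the ABA problem: in Pop, a thread can read Result and Result.Next, get preempted while other threads pop and re-push the same item, and then its AtomicCmpExchange still succeeds because the head pointer looks unchanged, installing a stale Next. The problem is discussed in a similar context in a series of excellent blog posts by @Primož Gabrijelčič: blog post 1, blog post 2, blog post 3.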
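To make the ABA window in Pop concrete, here is a minimal sketch (not from the thread) that replays the bad interleaving by hand in a single thread. TNode, Top, CasHead and ReplayABA are illustrative names, not RTL code, and the two "threads" are just sequential steps, so nothing here is actually concurrent.

```pascal
program ABADemo;
{$APPTYPE CONSOLE}
// Replays, step by step, the interleaving that breaks the untagged Pop.
// Illustrative only: the two "threads" are simulated sequentially.

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Name: Char;
  end;

var
  Top: PNode;      // stack head, playing the role of EventCache
  A, B, C: TNode;

// The CAS at the heart of Pop: install NewHead only if the head is still Expected.
function CasHead(var Stack: PNode; Expected, NewHead: PNode): Boolean;
begin
  Result := AtomicCmpExchange(Pointer(Stack), NewHead, Expected) = Expected;
end;

function ReplayABA: Char;
var
  T1Head, T1Next: PNode;
begin
  A.Name := 'A'; B.Name := 'B'; C.Name := 'C';
  C.Next := nil; B.Next := @C; A.Next := @B;
  Top := @A;                      // stack: A -> B -> C

  // Thread 1 enters Pop: reads Result = A and Result.Next = B, then is preempted.
  T1Head := Top;
  T1Next := T1Head.Next;

  // Thread 2 pops A, pops B (B now belongs to thread 2), then pushes A back.
  Top := A.Next;                  // pop A: head is B
  Top := B.Next;                  // pop B: head is C
  A.Next := Top;                  // push A: A -> C
  Top := @A;

  // Thread 1 resumes: the head is A again, so its CAS succeeds
  // and installs B, which is no longer on the stack.
  if CasHead(Top, T1Head, T1Next) then
    Result := Top.Name
  else
    Result := '?';
end;

begin
  Writeln('Head after the stale CAS: ', ReplayABA); // B
end.
```

The stale CAS "succeeds" only because a plain pointer cannot tell the re-pushed A apart from the A that thread 1 originally saw.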

 

His OmniThreadLibrary contains the following routine that he uses to deal with this problem.

//either 8-byte or 16-byte CAS, depending on the platform; destination must be properly aligned (8- or 16-byte)
function CAS(const oldData: pointer; oldReference: NativeInt; newData: pointer;
  newReference: NativeInt; var destination): boolean;
asm
{$IFNDEF CPUX64}
  push  edi
  push  ebx
  mov   ebx, newData
  mov   ecx, newReference
  mov   edi, destination
  lock cmpxchg8b qword ptr [edi]
  pop   ebx
  pop   edi
{$ELSE CPUX64}
  .noframe
  push  rbx                     //rsp := rsp - 8 !
  mov   rax, oldData
  mov   rbx, newData
  mov   rcx, newReference
  mov   r8, [destination + 8]   //+8 with respect to .noframe
  lock cmpxchg16b [r8]
  pop   rbx
{$ENDIF CPUX64}
  setz  al
end; { CAS }

I have tried to use this function to provide a solution for TMonitor similar to the one in OmniThreadLibrary (see the attached iaStressTest.TThreadedQueue.PopItem, which can be used with the original stress test). Whilst still not perfect, it helps a lot in 32 bits with, say, up to 100 threads. However, it crashes in 64 bits and I do not know why. I am posting this here in case anyone with better knowledge of assembler and thread programming than mine can help with the challenge of fixing TMonitor. It would be nice to try to get a fix included in 10.4. And even if it is not included, it can easily be used as a patch, in the same way as in the attached code.


iaStressTest.TThreadedQueue.PopItem.pas

Edited by pyscripter

Not sure it's relevant, but that x64 code should use .PUSHNV to preserve RBX. And I think that also makes .NOFRAME incorrect.

12 hours ago, pyscripter said:

Whilst still not perfect it helps a lot in 32 bits with say up to 100 threads.

You can't be doing multithreaded programming where "whilst still not perfect" is a valid statement. It's got to be right.

7 minutes ago, David Heffernan said:

You can't be doing multithreaded programming where "whilst still not perfect" is a valid statement. It's got to be right.

Absolutely right. This is why I am passing the challenge to people like you, with much deeper knowledge than mine. I would very much hope that @Primož Gabrijelčič, for instance, has a go at providing a solution, since he has faced the very same issue in OmniThreadLibrary.

Edited by pyscripter

15 hours ago, pyscripter said:

 The reason it fails is known as the ABA problem

Ouch!

One would have thought they knew not to write code like that.

 

As far as I can tell it can be solved "simply" by changing the head of the two stacks from a pointer to a record containing the pointer and a counter:

type
  TEventStack = record
    Counter: int64;
    Head: PEventItemHolder;
  end;
  
var
  EventCache: TEventStack;
  ...
  EventItemHolders: TEventStack;

...and then change the Push and Pop functions to increment the transaction counter on each operation. Finally we just need a version of AtomicCmpExchange that operates on 128 bits (e.g. the 16-byte CAS mentioned above) to update the transaction Counter and the Head pointer in one go. I believe this will take care of the ABA problem.
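The same interleaving can be replayed against a counter-tagged head, as a sketch of why the transaction counter helps. CompareAndSwapPair is a hypothetical, deliberately non-atomic stand-in that only models the decision a double-width CAS (CMPXCHG8B on Win32, CMPXCHG16B on Win64) would make, and TTaggedHead uses a NativeInt counter (an assumption) so that the pair is exactly two machine words.

```pascal
program TaggedHeadDemo;
{$APPTYPE CONSOLE}
// Same interleaving as a plain ABA replay, but the head carries a counter that
// every Push/Pop increments, and the compare covers (Head, Counter) together.

type
  PNode = ^TNode;
  TNode = record
    Next: PNode;
    Name: Char;
  end;

  TTaggedHead = record
    Head: PNode;
    Counter: NativeInt;   // assumption: two machine words in total
  end;

var
  Top: TTaggedHead;
  A, B, C: TNode;

// Hypothetical helper, NOT atomic: models the outcome of a double-width CAS.
function CompareAndSwapPair(var Dest: TTaggedHead;
  const NewValue, Expected: TTaggedHead): Boolean;
begin
  Result := (Dest.Head = Expected.Head) and (Dest.Counter = Expected.Counter);
  if Result then
    Dest := NewValue;
end;

function ReplayWithCounter: Boolean;
var
  Seen, Wanted: TTaggedHead;
begin
  A.Name := 'A'; B.Name := 'B'; C.Name := 'C';
  C.Next := nil; B.Next := @C; A.Next := @B;
  Top.Head := @A;
  Top.Counter := 0;                            // (A, 0)

  // Thread 1 snapshots (A, 0) and prepares (B, 1), then is preempted.
  Seen := Top;
  Wanted.Head := Seen.Head.Next;
  Wanted.Counter := Seen.Counter + 1;

  // Thread 2: pop A, pop B, push A; each operation bumps the counter.
  Top.Head := A.Next; Inc(Top.Counter);        // (B, 1)
  Top.Head := B.Next; Inc(Top.Counter);        // (C, 2)
  A.Next := Top.Head;
  Top.Head := @A; Inc(Top.Counter);            // (A, 3)

  // Thread 1 resumes: Head matches but Counter does not (3 vs 0), so the
  // swap is refused and a real Pop would retry with a fresh snapshot.
  Result := CompareAndSwapPair(Top, Wanted, Seen);
end;

begin
  Writeln('Stale CAS accepted: ', ReplayWithCounter);
  Writeln('Head is still: ', Top.Head.Name);
end.
```

In real code the compare and the store must of course happen in one atomic instruction; that is exactly what the 8- or 16-byte CAS provides.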

33 minutes ago, Anders Melander said:

As far as I can tell it can be solved "simply" by changing the head of the two stacks from a pointer to a record containing the pointer and a counter:

If you have a look at the posted file, this is what I tried to do. Please have a go if you have the time.

19 minutes ago, pyscripter said:

If you have a look at the posted file, this is what I tried to do.

Oh. I generally don't bother with attached or linked files.

However from a brief glance at your code this looks fishy:

  until (AtomicCmpExchange(Stack.Spin, Spin + 1, Spin) = Spin) and
    CAS(Result, Spin + 1, Result.Next, Spin + 2, Stack);

I would think you'd need to test and update both Spin and Next in one go. That's why you need the 128-bit CAS.

I don't see the need for both the AtomicCmpExchange and the CAS.

 

You also need to modify the Push function, since it has the same problem as Pop.

 

I might give it a go this week-end (if the weather is bad 🙂) but I suspect someone else might beat me to it.

16 minutes ago, Anders Melander said:

if the weather is bad

Then I hope the weather will be bad 😀


It's been raining so here you go (completely untested):

type
  TEventStack = record
    Counter: int64;
    Head: PEventItemHolder;

    procedure Push(EventItem: PEventItemHolder);
    function Pop: PEventItemHolder;
  end;

var
  EventCache: TEventStack;
  ...
  EventItemHolders: TEventStack;

procedure TEventStack.Push(EventItem: PEventItemHolder);
var
  Current, Next: TEventStack;
begin
  repeat
    // We don't need to copy atomically since the test below will detect tearing
    // but since the members should be aligned tearing should not occur anyway.
    Current := Self;

    EventItem.Next := Current.Head;
    Next.Head := EventItem;
    Next.Counter := Current.Counter + 1;
    // I'm assuming TInterlocked.CompareExchange returns a boolean: True on success.
  until TInterlocked.CompareExchange(Self, Next, Current);
end;

function TEventStack.Pop: PEventItemHolder;
var
  Current, Next: TEventStack;
begin
  repeat
    Current := Self;
    if (Current.Head = nil) then
      Exit(nil);
    Next.Head := Current.Head.Next;
    Next.Counter := Current.Counter + 1;
  until TInterlocked.CompareExchange(Self, Next, Current);
  Result := Current.Head;
end;

I've made the two functions members of the record to get rid of the first parameter.

What you (or someone else) need to do is provide an implementation of TInterlocked.CompareExchange, AtomicCmpExchange or CAS that handles 16 bytes and returns a boolean indicating success.

15 hours ago, Anders Melander said:

What you (or someone else) need to do is provide an implementation of TInterlocked.CompareExchange, AtomicCmpExchange or CAS that handles 16 bytes and returns a boolean indicating success.

function InterlockedCompareExchage(var Dest: TEventStack; NewValue, CurrentValue: TEventStack): Boolean;
begin
  {$IFDEF CPUX64}
  Result := InterlockedCompareExchange128(@Dest, Int64(NewValue.Head), NewValue.Counter,  @CurrentValue);
  {$ELSE CPUX64}
  Result := InterlockedCompareExchange64(Int64(Dest), Int64(NewValue), Int64(CurrentValue)) = Int64(CurrentValue);
  {$ENDIF CPUX64}
end;

I have tried the above. It works well in 32 bits but crashes in 64 bits.

The implementation of InterlockedCompareExchange128 in WinApi.Windows.pas is: 

function InterlockedCompareExchange128(Destination: Pointer; ExchangeHigh, ExchangeLow: Int64; ComparandResult: Pointer): Bool; stdcall;
asm
      MOV   R10,RCX
      MOV   RBX,R8
      MOV   RCX,RDX
      MOV   RDX,[R9+8]
      MOV   RAX,[R9]
 LOCK CMPXCHG16B [R10]
      SETZ  AL
end;

 


I can't see where RBX is restored in that InterlockedCompareExchange128 code. It's a non-volatile (NV) register. The function needs to start with .PUSHNV RBX.

 

I can't vouch for the rest of what it does.

Edited by David Heffernan

2 hours ago, pyscripter said:

function InterlockedCompareExchage(var Dest: TEventStack; NewValue, CurrentValue: TEventStack): Boolean;

You're missing an "n" in Exchange and the NewValue and CurrentValue parameters should be declared as const or [ref].
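Combining the two remarks above, here is a hedged sketch of the wrapper (not verified against any particular RTL version): the name is spelled out, the records are passed as const, and the Win64 path goes through a local CAS128 that is the quoted InterlockedCompareExchange128 code plus .PUSHNV RBX. CAS128, PEventStack and the demo are hypothetical; the record uses a Head-first layout with a NativeInt counter (an assumption) so that it is exactly two machine words, and Dest must be 16-byte aligned on Win64. Writing the observed value back into ComparandResult mirrors the documented behaviour of the Windows API function of the same name and is not part of the quoted RTL code.

```pascal
program StackCasDemo;
{$APPTYPE CONSOLE}

uses
  System.SyncObjs;

type
  // Two machine words: 8 bytes on Win32, 16 bytes on Win64 (assumed layout).
  TEventStack = record
    Head: Pointer;          // stand-in for PEventItemHolder
    Counter: NativeInt;
  end;
  PEventStack = ^TEventStack;

{$IFDEF CPUX64}
// Hypothetical helper: the RTL routine quoted above, with RBX saved/restored
// by the compiler-generated prologue/epilogue via .PUSHNV.
function CAS128(Destination: Pointer; ExchangeHigh, ExchangeLow: Int64;
  ComparandResult: Pointer): Boolean;
asm
  .PUSHNV RBX
  MOV   R10, RCX            // R10 := Destination
  MOV   RBX, R8             // RCX:RBX := new value (high:low)
  MOV   RCX, RDX
  MOV   RDX, [R9 + 8]       // RDX:RAX := expected value (high:low)
  MOV   RAX, [R9]
  LOCK CMPXCHG16B [R10]     // ZF = 1 on success; RDX:RAX = value seen in memory
  MOV   [R9], RAX           // write the seen value back (MOV leaves ZF intact)
  MOV   [R9 + 8], RDX
  SETZ  AL
end;
{$ENDIF}

// "Exchange" spelled out; const passes the records by reference.
function InterlockedCompareExchangeStack(var Dest: TEventStack;
  const NewValue, CurrentValue: TEventStack): Boolean;
var
  Comparand: TEventStack;   // local copy, because CAS128 writes into it
begin
  Comparand := CurrentValue;
  {$IFDEF CPUX64}
  // Head is the low qword, Counter the high qword.
  Result := CAS128(@Dest, NewValue.Counter, Int64(NewValue.Head), @Comparand);
  {$ELSE}
  Result := TInterlocked.CompareExchange(Int64(Dest), Int64(NewValue),
    Int64(Comparand)) = Int64(Comparand);
  {$ENDIF}
end;

var
  Buf: array[0..31] of Byte;
  Stack: PEventStack;
  OldTop, NewTop: TEventStack;
  First, Second: Boolean;
begin
  // Carve a 16-byte aligned slot out of Buf (more than enough on Win32).
  Stack := PEventStack((NativeUInt(@Buf[0]) + 15) and not NativeUInt(15));
  Stack.Head := nil;
  Stack.Counter := 0;
  OldTop := Stack^;
  NewTop.Head := @Buf;
  NewTop.Counter := OldTop.Counter + 1;
  First := InterlockedCompareExchangeStack(Stack^, NewTop, OldTop);  // swaps
  Second := InterlockedCompareExchangeStack(Stack^, NewTop, OldTop); // stale Counter
  Writeln(First, ' ', Second, ' ', Stack.Counter);
end.
```

The local Comparand copy matters once ComparandResult is written to: passing @CurrentValue directly would let the callee overwrite the caller's const argument.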


On my Windows 10 system, the Win32 version of your code fails after 6 seconds: about 130 threads are created before PopItem throws a premature timeout (Delphi 10.3.3).

 

1 hour ago, pyscripter said:

I am attaching a new version of the test code

The test is IMO doing too much so it's hard to tell where the problem originates.

Since we've been focusing on the TEventItemHolder stack, I would start by verifying that the original code fails (although it's obvious that it has the ABA problem) and that the new code doesn't fail.

Then I would validate TMonitor.Wait and only then would I validate TThreadedQueue.PopItem.

5 hours ago, Darian Miller said:

On my Windows 10 system, the Win32 version of your code fails after 6 seconds: about 130 threads are created before PopItem throws a premature timeout (Delphi 10.3.3).

 

Over here: Win32, 300 threads, 10 minutes:

2020-05-17 01.01.08: StressTestPopItem Start: Waiting up to [600] seconds for PopItem to prematurely timeout.
2020-05-17 01.01.08: Note: Using [10] as PopTimeout on TThreadedQueue creation
2020-05-17 01.01.09: TThreadCreator Start: Creating up to [300] threads
2020-05-17 01.01.09: Note: Creating threads with a StackSize of [65536]
2020-05-17 01.01.09: TThreadCreator End: [300] worker threads created
...
2020-05-17 01.11.05: New Wait objects created: 14800000
2020-05-17 01.11.08: StressTestPopItem End: Overall maximum time limit reached for this test without an error detected...we will call this a success!
2020-05-17 01.11.08: Hit <enter> to exit

NewWaitObj was called 15 million times.

 

And Win32, 2000 threads, 30 seconds:

2020-05-17 01.30.28: StressTestPopItem Start: Waiting up to [30] seconds for PopItem to prematurely timeout.
2020-05-17 01.30.28: Note: Using [10] as PopTimeout on TThreadedQueue creation
2020-05-17 01.30.29: TThreadCreator Start: Creating up to [2000] threads
2020-05-17 01.30.29: Note: Creating threads with a StackSize of [65536]
2020-05-17 01.30.29: TThreadCreator End: [2000] worker threads created
...
2020-05-17 01.30.58: New Wait objects created: 5000000
2020-05-17 01.30.58: StressTestPopItem End: Overall maximum time limit reached for this test without an error detected...we will call this a success!
2020-05-17 01.30.58: Hit <enter> to exit

NewWaitObj was called 5 million times in 30 seconds.

 

Sometimes, with many threads (say more than 300), I get an early failure before all threads are created, and I am not sure why. Once all the threads are created I get no errors. If you add Sleep(1000) at the top of TTestThread.Execute() to give all the threads time to be created, these startup errors are avoided.

Edited by pyscripter

11 hours ago, David Heffernan said:

I can't see where RBX is restored in that InterlockedCompareExchange128 code. It's a non-volatile (NV) register. The function needs to start with .PUSHNV RBX.

 

I can't vouch for the rest of what it does.

For reference, this is the implementation from FPC (Free Pascal):

    function InterlockedCompareExchange128(var Target: Int128Rec; NewValue: Int128Rec; Comperand: Int128Rec): Int128Rec; assembler;
     {
        win64:
          rcx ... pointer to result
          rdx ... target
          r8  ... NewValue
          r9  ... Comperand
      }
    {$ifdef win64}
      asm
        pushq %rbx

        { store result pointer for later use }
        pushq %rcx

        { load new value }
        movq (%r8),%rbx
        movq 8(%r8),%rcx

        { save target pointer for later use }
        movq %rdx,%r8

        { load comperand }
        movq (%r9),%rax
        movq 8(%r9),%rdx

        {$ifdef oldbinutils}
           .byte 0xF0,0x49,0x0F,0xC7,0x08
        {$else}
        lock cmpxchg16b (%r8)
        {$endif}
        { restore result pointer }
        popq %rcx

        { store result }
        movq %rax,(%rcx)
        movq %rdx,8(%rcx)

        popq %rbx
      end;

Any chance of anyone coming up with a working InterlockedCompareExchange128 for Delphi?


Here's a simple test of the stack only. I have only shown the test of the old code; the test of the new code is similar.

 

The old stack code fails after a short while on both 32-bit and 64-bit with an invalid pointer operation when freeing an item popped from the stack (a short while = ~4 seconds, approximately 1 million stack operations by 100 threads on a 4-core system).

The reason it fails so quickly is that I have tried to limit the calls to the memory manager as that is a bottleneck which tends to serialize the operations a bit.

 

The new stack code succeeds on 32-bit but fails immediately on 64-bit with an AV on the LOCK CMPXCHG16B [R10] instruction in InterlockedCompareExchange128.

 

uses
  System.SyncObjs;

const
  THREAD_COUNT = 100;
  ITERATIONS = 1000000;

var
  Threads: array[0..THREAD_COUNT-1] of TThread;

type
  PEventItemHolder = ^TEventItemHolder;
  TEventItemHolder = record
    Next: PEventItemHolder;
    Event: Pointer;
  end;

  TSyncEventItem = record
    Lock: Integer;
    Event: Pointer;
  end;

procedure Push(var Stack: PEventItemHolder; EventItem: PEventItemHolder);
var
  LStack: PEventItemHolder;
begin
  repeat
    LStack := Stack;
    EventItem.Next := LStack;
  until AtomicCmpExchange(Pointer(Stack), EventItem, LStack) = LStack;
end;

function Pop(var Stack: PEventItemHolder): PEventItemHolder;
begin
  repeat
    Result := Stack;
    if Result = nil then
      Exit;
  until AtomicCmpExchange(Pointer(Stack), Result.Next, Result) = Result;
end;

type
  TPushPopThread = class(TThread)
  private
    class var FEventCache: PEventItemHolder;
    class var FStackCount: integer;
    class var FReadyCount: integer;
    class var FLiveCount: integer;
    class var FOperations: int64;
    class var FFailure: boolean;
  private
    FEvent: TEvent;
  protected
    procedure Execute; override;
  public
    constructor Create(AEvent: TEvent);
    class property StackCount: integer read FStackCount;
    class property ReadyCount: integer read FReadyCount;
    class property LiveCount: integer read FLiveCount;
    class property Failure: boolean read FFailure;
    class property Operations: int64 read FOperations;
  end;

constructor TPushPopThread.Create(AEvent: TEvent);
begin
  inherited Create;
  FEvent := AEvent;
end;

procedure TPushPopThread.Execute;
var
  Item, PoppedItem: PEventItemHolder;
begin
  TInterlocked.Increment(FReadyCount);
  try

    FEvent.WaitFor(INFINITE);

    TInterlocked.Increment(FLiveCount);
    try
      try
        Item := nil;
        try

          for var i := 0 to ITERATIONS-1 do
          begin
            if (Terminated) or (FFailure) then
              exit;

            if (Random(4) = 0) then // A lot more Pops than Pushes
            begin
              if (Item = nil) then
                New(Item);
              Item.Event := pointer(TInterlocked.Increment(FStackCount));
              Push(FEventCache, Item);
              Item := nil;
            end else
            begin
              PoppedItem := Pop(FEventCache);
              if (PoppedItem <> nil) then
              begin
                TInterlocked.Decrement(FStackCount);
                if (Item <> nil) then
                  FreeMem(Item);
                Item := PoppedItem;
              end;
            end;
            TInterlocked.Increment(FOperations);
          end;

          // Pop and free remaining items
          PoppedItem := Pop(FEventCache);
          while (PoppedItem <> nil) do
          begin
            TInterlocked.Decrement(FStackCount);
            FreeMem(PoppedItem);
            PoppedItem := Pop(FEventCache);
          end;

        finally
          if (Item <> nil) then
            FreeMem(Item);
        end;
      except
        FFailure := True;
        raise;
      end;
    finally
      TInterlocked.Decrement(FLiveCount);
    end;
  finally
    TInterlocked.Decrement(FReadyCount);
  end;
end;

procedure TForm11.ButtonOldStartClick(Sender: TObject);
begin
  var Event := TEvent.Create(nil, True, False, '');
  try

    for var i := 0 to THREAD_COUNT-1 do
      if (Threads[i] = nil) then
        Threads[i] := TPushPopThread.Create(Event);

    // Wait for all threads ready to execute
    while (TPushPopThread.ReadyCount < THREAD_COUNT) do
      Sleep(1);

    // Start all threads
    Event.SetEvent;

    // Wait for thread completion
    while (TPushPopThread.LiveCount > 0) do
      Sleep(1);

  finally
    Event.Free;
  end;

  if (TPushPopThread.Failure) then
    ShowMessage('Failed')
  else
  if (TPushPopThread.StackCount > 0) then
    ShowMessage('Stack lost items')
  else
    ShowMessage('Success');

  for var i := 0 to THREAD_COUNT-1 do
    if (Threads[i] <> nil) then
    begin
      Threads[i].WaitFor;
      FreeAndNil(Threads[i]);
    end;
end;

 

Guest

@pyscripter To save you time
1) The assembly is good and has nothing that needs fixing.
2) The problem is alignment! Per the documentation, both InterlockedCompareExchange64 and InterlockedCompareExchange128 require aligned operands (8-byte and 16-byte respectively), and I don't see any alignment in this code.
3) Even the original file has this comment above the assembly code:

Quote

//eighter 8-byte or 16-byte CAS, depending on the platform; destination must be propely aligned (8- or 16-byte)

4) Even when it works perfectly on your side for a long time, just try compiling with a different IDE and you will see corruption, or let's call it unpredictable behaviour. Adding or removing a global variable can knock the others out of alignment, or bring them back into alignment by luck, and simply changing the order of functions might make it start working fine or go completely wrong. That is a big problem, and forcing alignment per function and per global variable is a much-needed feature in the Delphi compiler. I wish there were something like that; LLVM has it, but Embarcadero went ahead and removed/disabled that feature instead of evolving the Delphi compiler to produce such code.

The feature that is badly needed is something like this:
var
  XBY16 : array [0..31] of Byte; aligned 16;

or 
procedure AlignedProc; align 8;

Guest

@toms is right; notice that it is used with pointers allocated by AllocateAlignedMemory.

That function has a long implementation and could be shorter.

I use a short and simple implementation:

function TElBuiltInAESSymmetricCrypto.AllocateAligned16(
  var Buffer: ByteArray;Size:Integer): Pointer;
begin
  SetLength(Buffer,Size+$10);
  Result:=@Buffer[$10-NativeUInt(Buffer) and $F];
end;

procedure TElBuiltInAESSymmetricCrypto.AllocateMAlignedKeys;
var
  Offset:Cardinal;
begin
  FKey256:=PAESExpandedKey256(AllocateAligned16(FInternalKey256,SizeOf(TAESExpandedKey256)));

Buffer should be kept alongside the result of AllocateAligned16 and used to free the allocated memory with SetLength(Buffer, 0).
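A self-contained variant of that helper, as a sketch: TBytes from System.SysUtils replaces the ByteArray type used above (an assumption about what ByteArray is), and the offset computation is spelled out with explicit parentheses. Otherwise it is the same over-allocate-by-16-and-offset trick.

```pascal
program Aligned16Demo;
{$APPTYPE CONSOLE}

uses
  System.SysUtils;

// Over-allocate by 16 bytes and return the first 16-byte aligned address
// inside Buffer. Buffer owns the memory and must outlive the returned pointer.
function AllocateAligned16(var Buffer: TBytes; Size: Integer): Pointer;
var
  Offset: NativeUInt;
begin
  SetLength(Buffer, Size + $10);
  // Offset is in 1..16, so Size bytes always fit after the aligned address.
  Offset := $10 - (NativeUInt(@Buffer[0]) and $F);
  Result := @Buffer[Offset];
end;

var
  Buf: TBytes;
  P: Pointer;
begin
  P := AllocateAligned16(Buf, 64);
  Writeln('16-byte aligned: ', NativeUInt(P) and $F = 0);
  SetLength(Buf, 0); // frees the memory; P must not be used after this
end.
```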

Edited by Guest


@Kas Ob. Thanks for the info about alignment.

 

Actually, Delphi does offer support for alignment (an undocumented feature; see @Uwe Raabe's response in this Stack Overflow question). If you change the definition of TEventStack to:

  TEventStack = record
    Head: PEventItemHolder;
    Counter: NativeInt;
    procedure Push(EventItem: PEventItemHolder);
    function Pop: PEventItemHolder;
  end align {$IFDEF CPUX64}16{$ELSE CPUX64}8{$ENDIF CPUX64};

there are no longer any crashes. The test still fails, though, and I am investigating why.

 

And by the way, there is also the [Align(8)] attribute, which seems to work with fields, global variables and records.
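Because alignment that holds only by luck can change when globals are added or reordered, a cheap safeguard is to check it at startup. A sketch, assuming the undocumented `align` record directive behaves as reported above; IsAligned and Filler are illustrative names. Note that Assert is compiled out when assertions are disabled ($C-).

```pascal
program AlignCheckDemo;
{$APPTYPE CONSOLE}

type
  // Undocumented directive (assumption: behaves as reported in the thread).
  TEventStack = record
    Head: Pointer;
    Counter: NativeInt;
  end align {$IFDEF CPUX64}16{$ELSE}8{$ENDIF};

var
  Filler: Byte;              // an unrelated global that could shift the layout
  EventCache: TEventStack;

// True when P is a multiple of Alignment (Alignment must be a power of two).
function IsAligned(P: Pointer; Alignment: NativeUInt): Boolean;
begin
  Result := NativeUInt(P) and (Alignment - 1) = 0;
end;

begin
  Filler := 0;
  // Fail fast instead of crashing later inside CMPXCHG16B.
  Assert(IsAligned(@EventCache, SizeOf(TEventStack)),
    'EventCache is not aligned for a double-width CAS');
  Writeln('SizeOf(TEventStack) = ', SizeOf(TEventStack));
end.
```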

Edited by pyscripter

