kokoslolos

Lockfree approach on a Single reader, Multiple Writer queue

Hi all,

For a system I'm currently designing and developing, I need a queue to which N threads add items and from which a single thread consumes them.

I have created a class and tested it. It works well in my opinion, and I thought I would share it here to get your opinion as well. Maybe I missed something and it's not actually thread safe.

The basic idea is a "ring" buffer and two indexes for read and write access. I also took for granted that reading an integer is an atomic operation. Please keep in mind that the code is a draft and definitely needs polishing and more checks. And here it is:

unit uMWSRList;

interface

uses Windows, Classes, SyncObjs, SysUtils;

const
  MAX_SIZE = 32;
  
type
  TMWSRList = class(TObject)
  private    
    FWriteIndex : Integer;
    FReadIndex : Integer;
    FAvailableCount : Integer;
    FItems : Array [0..MAX_SIZE-1] of Integer;
    FOnNewItemAdd : TNotifyEvent;
  public   
    constructor Create;
    function AddItem(pItem : Integer):Integer;
    function GetItem():Integer;
    function ItemsAvailable():Boolean;

    property OnNewItemAdd : TNotifyEvent read FOnNewItemAdd write FOnNewItemAdd;
  end;   

implementation

{ TMWSRList }
constructor TMWSRList.Create;
begin
  FWriteIndex := -1;
  FReadIndex := -1;
end;

function TMWSRList.AddItem(pItem : Integer):Integer;
var
  aOverFlow : Integer;
begin
  Result := InterlockedIncrement(FWriteIndex);

  if Result >= MAX_SIZE then
  begin
    aOverFlow := Result - MAX_SIZE;
	
    { Reset FWriteIndex when it reaches MAX_SIZE }
    InterlockedCompareExchange(FWriteIndex, -1, MAX_SIZE + aOverFlow);
    Result := InterlockedIncrement(FWriteIndex);
  end;

  FItems[Result] := pItem;
  InterlockedIncrement(FAvailableCount);

  if Assigned(FOnNewItemAdd) then  
    FOnNewItemAdd(self);
end;

function TMWSRList.GetItem():Integer;
begin
  if FAvailableCount > 0 then
  begin
    Inc(FReadIndex);
    
    if FReadIndex = MAX_SIZE then      
      FReadIndex := 0;
    
    Result := FItems[FReadIndex];
        
    InterlockedDecrement(FAvailableCount);
  end
  else Result := -1;
end;

function TMWSRList.ItemsAvailable():Boolean;
begin
  Result := FAvailableCount > 0;
end;

end.

Regards.

Definitely doesn't work correctly. For starters, two parallel readers would clash horribly.

If you want to implement lock-free structures, start by writing thorough stress-tests. They should run different scenarios - one reader, a few readers, many readers ... very many readers (more than the number of cores in the system) - in a loop which runs for a long time (minutes, at least, preferably more).
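
A minimal sketch of such a stress test, assuming Delphi on Windows. Because the queue in this thread has a single reader, the knob to vary here is the number of writer threads rather than readers. The lock-guarded TryPush/TryPop pair is a hypothetical stand-in (these names are not part of the original class) that exists only so the harness compiles on its own; the idea is to replace it with calls into the queue under test. The check is deliberately crude: it compares the item count and a checksum, so it can show that something went wrong but cannot show that the queue is correct.

```pascal
program QueueStress;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils, Classes, SyncObjs;

const
  CAPACITY = 32;
  WRITERS = 8;                 // also try more writers than CPU cores
  ITEMS_PER_WRITER = 100000;   // raise this for runs that last minutes

var
  // Hypothetical stand-in queue guarded by a lock; replace TryPush/TryPop
  // with calls into the queue under test.
  Lock: TCriticalSection;
  Items: array [0..CAPACITY - 1] of Integer;
  Head, Count: Integer;

function TryPush(Value: Integer): Boolean;
begin
  Lock.Acquire;
  try
    Result := Count < CAPACITY;            // refuse instead of overwriting
    if Result then
    begin
      Items[(Head + Count) mod CAPACITY] := Value;
      Inc(Count);
    end;
  finally
    Lock.Release;
  end;
end;

function TryPop(out Value: Integer): Boolean;
begin
  Lock.Acquire;
  try
    Result := Count > 0;
    if Result then
    begin
      Value := Items[Head];
      Head := (Head + 1) mod CAPACITY;
      Dec(Count);
    end;
  finally
    Lock.Release;
  end;
end;

type
  TWriter = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TWriter.Execute;
var
  i: Integer;
begin
  for i := 1 to ITEMS_PER_WRITER do
    while not TryPush(i) do
      Sleep(0);                            // queue full: yield to the reader
end;

var
  Threads: array [0..WRITERS - 1] of TWriter;
  i, Value, Received: Integer;
  Sum, Expected: Int64;
begin
  Lock := TCriticalSection.Create;
  for i := 0 to WRITERS - 1 do
    Threads[i] := TWriter.Create(False);
  Received := 0;
  Sum := 0;
  while Received < WRITERS * ITEMS_PER_WRITER do   // the single reader
    if TryPop(Value) then
    begin
      Inc(Received);
      Inc(Sum, Value);
    end
    else
      Sleep(0);
  for i := 0 to WRITERS - 1 do
  begin
    Threads[i].WaitFor;
    Threads[i].Free;
  end;
  Lock.Free;
  Expected := Int64(WRITERS) * ITEMS_PER_WRITER * (ITEMS_PER_WRITER + 1) div 2;
  if Sum = Expected then
    Writeln('OK: count and checksum match')
  else
    Writeln('MISMATCH: got ', Sum, ', expected ', Expected);
end.
```

Running it once against the locked stand-in validates the harness itself. After that, swap in the lock-free queue and repeat with different WRITERS values and much longer runs.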

If you want to use something that is well-tested, take TOmniBoundedQueue from the OmniThreadLibrary.

I know for sure that it doesn't work for more than one reader; my implementation is specific to a single reader, and I plan to add checks to make sure only one thread reads. If you look at GetItem(), I intentionally used Inc() and not the Interlocked version, knowing that only one reader will exist. With that in mind, Primož Gabrijelčič, do you think it's a good start?

Also thank you dummzeuch for your reply, will check your implementation.

48 minutes ago, Primož Gabrijelčič said:

two parallel readers would clash horribly.

It explicitly says "single reader", so that's a non-issue.

I agree with the rest of your post though.

19 minutes ago, dummzeuch said:

It explicitly says "single reader", so that's a non-issue.

Aaaaah, single READER! Sorry, folks. That is such a weird concept (although, I admit, useful in some situations) that my brain autocorrected it into single WRITER. 🙂 🙂 Silly brain!

1 hour ago, dummzeuch said:

Just in case you are interested: Here is my implementation of a lock free MultiWriteSingeRead queue. Beware that I have never really tested it.

There's at least one race condition. Stopped reading once I spotted it:

if InterlockedDecrement(FActiveWriters) = 0 then
  FActiveWritersZeroEvent.SetEvent;


The writer also doesn't work.

function TMWSRList.AddItem(pItem : Integer):Integer;
var
  aOverFlow : Integer;
begin
  Result := InterlockedIncrement(FWriteIndex);

  if Result >= MAX_SIZE then
  begin
    aOverFlow := Result - MAX_SIZE;
	
    { Reset FWriteIndex when it reaches MAX_SIZE }
    InterlockedCompareExchange(FWriteIndex, -1, MAX_SIZE + aOverFlow);
    Result := InterlockedIncrement(FWriteIndex);
  end;

  FItems[Result] := pItem;
  InterlockedIncrement(FAvailableCount);

  if Assigned(FOnNewItemAdd) then  
    FOnNewItemAdd(self);
end;
  1. FWriteIndex is at the end of array.
  2. Two threads call AddItem.
  3. Both threads increment FWriteIndex.
  4. Both threads enter `if`.
  5. Second thread executes InterlockedCompareExchange and InterlockedIncrement. That will skip slot 0 which will never be used, if I'm not mistaken.

Plus - what use is InterlockedCompareExchange if you never test if the exchange was successful?

Plus - the code doesn't work if the queue overflows (reader stops but writers keep writing). But that is by design, I guess?
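
One way to address the unchecked exchange, sketched here under assumptions and not as a fix for the whole class: advance the write index with a compare-and-swap loop and retry whenever the exchange did not succeed. Tracing the original AddItem, one bad interleaving appears to be: FWriteIndex is 31, thread A increments it to 32, thread B to 33, A's compare against 32 fails, A increments again and gets 34, B's compare against 33 fails, B gets 35, and both write past the end of FItems. ReserveSlot, TWriter, WRITERS, ROUNDS and Hits are names invented for this demo.

```pascal
program ReserveSlotDemo;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils, Classes;

const
  MAX_SIZE = 32;
  WRITERS = 4;     // hypothetical demo settings
  ROUNDS = 1000;   // each writer reserves ROUNDS * MAX_SIZE slots

var
  WriteIndex: Integer = -1;                  // same start value as the original class
  Hits: array [0..MAX_SIZE - 1] of Integer;  // how often each slot was handed out

// Advances WriteIndex by one, wrapping to 0, and returns the reserved slot.
// The result of the exchange is checked: if another writer moved the index
// in the meantime, the loop recomputes and tries again.
function ReserveSlot: Integer;
var
  OldIdx: Integer;
begin
  repeat
    OldIdx := WriteIndex;
    Result := OldIdx + 1;
    if Result >= MAX_SIZE then
      Result := 0;
  until InterlockedCompareExchange(WriteIndex, Result, OldIdx) = OldIdx;
end;

type
  TWriter = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TWriter.Execute;
var
  i: Integer;
begin
  for i := 1 to ROUNDS * MAX_SIZE do
    InterlockedIncrement(Hits[ReserveSlot]);
end;

var
  Threads: array [0..WRITERS - 1] of TWriter;
  i: Integer;
  Ok: Boolean;
begin
  for i := 0 to WRITERS - 1 do
    Threads[i] := TWriter.Create(False);
  for i := 0 to WRITERS - 1 do
  begin
    Threads[i].WaitFor;
    Threads[i].Free;
  end;
  // Every successful exchange moves the index exactly one step around the
  // ring, so each slot must have been handed out equally often.
  Ok := True;
  for i := 0 to MAX_SIZE - 1 do
    if Hits[i] <> WRITERS * ROUNDS then
      Ok := False;
  if Ok then
    Writeln('every slot handed out equally often')
  else
    Writeln('slots were skipped or handed out unevenly');
end.
```

This only covers slot reservation. It does not detect a full queue, and it does not stop the reader from picking up a slot that has been reserved but not yet written: a writer that reserved a later slot can bump FAvailableCount before an earlier writer has stored its item.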


If you can live with LIFO/FILO/stack semantics then you can use Windows SLists. They're lock free and can be used with multiple readers and writers.

Edited by Anders Melander
fixed FIFO
24 minutes ago, Primož Gabrijelčič said:

The writer also doesn't work.

  1. FWriteIndex is at the end of array.
  2. Two threads call AddItem.
  3. Both threads increment FWriteIndex.
  4. Both threads enter `if`.
  5. Second thread executes InterlockedCompareExchange and InterlockedIncrement. That will skip slot 0 which will never be used, if I'm not mistaken.

Plus - what use is InterlockedCompareExchange if you never test if the exchange was successful?

Plus - the code doesn't work if the queue overflows (reader stops but writers keep writing). But that is by design, I guess?

I expect that more than two threads will enter "if", but only one will reset the FWriteIndex because of the "MAX_SIZE + aOverFlow" passed to InterlockedCompareExchange. The writer I believe works ok :). About the overflow, it's one of the checks I plan to add, even though I believe the single reader will always be fast enough to consume the items.

Thank you for your time!


This can be done without locks, but it takes two flags, an atomic test-and-set operation (or increment-and-return-value), as well as a strict protocol that must be followed by everybody.

I don't see evidence of that here.

As an aside, what options are there in Delphi for an atomic test-and-set or incr-and-return-value operation?

I'm not sure anything at Delphi's source language level could be considered "atomic" for concurrency purposes.
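
On the question of atomic primitives: the code earlier in this thread already relies on the Windows Interlocked functions, which Delphi exposes through the Windows unit. A minimal sketch, assuming a Delphi version where these take Integer arguments (as the calls in the original post imply): InterlockedIncrement returns the value after the increment, InterlockedExchange returns the previous value and so works as a test-and-set, and InterlockedCompareExchange stores only if the current value matches the comparand and also returns the previous value. Counter and Flag are names made up for the demo. Newer Delphi versions also offer TInterlocked in SyncObjs for the same purpose.

```pascal
program AtomicDemo;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils;

var
  Counter: Integer = 0;
  Flag: Integer = 0;      // 0 = free, 1 = taken
  Ticket, Previous: Integer;
begin
  // Increment-and-return-value: the result is the value after the increment.
  Ticket := InterlockedIncrement(Counter);
  Writeln('ticket = ', Ticket);

  // Test-and-set: unconditionally store 1 and learn what was there before.
  Previous := InterlockedExchange(Flag, 1);
  if Previous = 0 then
    Writeln('flag acquired')
  else
    Writeln('flag was already taken');

  // Compare-and-swap: store 1 only if Flag is still 0. The flag is already
  // set here, so nothing is stored and the previous value (1) comes back.
  Previous := InterlockedCompareExchange(Flag, 1, 0);
  Writeln('compare-exchange saw ', Previous);
end.
```

The return value is what makes these usable as building blocks: a caller that ignores it cannot tell whether it won or lost the race.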

TRIVIA FACTOID: There was never a real-time or multi-tasking version of any OS built that ran in "protected mode" on Intel's 80286 chip, because the microinstructions that implemented the PUSH IP operation in protected mode could be interrupted between incrementing SP and setting [SP] = IP.

15 hours ago, Anders Melander said:

There's at least one race condition. Stopped reading once I spotted it:

if InterlockedDecrement(FActiveWriters) = 0 then
  FActiveWritersZeroEvent.SetEvent;

I think we should not discuss my version in this thread as it is mainly about kokoslolos' implementation. As I said: I wrote that back when I read about lock free structures but never really tested it thoroughly and lost interest later.

It would be too confusing to discuss both in the same thread.

8 minutes ago, dummzeuch said:

I think we should not discuss my version in this thread as it is mainly about kokoslolos' implementation.

Then you shouldn't have mentioned it.

If you post something as an example of a lock free implementation and it's clearly not thread safe, then I think it's fair that people are warned about that fact.


I have asked an admin to remove that post. I have no time now to actually check that race condition you mention.

16 hours ago, Primož Gabrijelčič said:

Aaaaah, single READER! Sorry, folks. That is such a weird concept (although, I admit, useful in some situations) that my brain autocorrected it into single WRITER. 🙂 🙂 Silly brain!

I use MWSR queues for stuff like logging. Multiple threads queue things for logging, and a single log thread does the actual work.

Basically, it is a useable model for any kind of data that in the end needs to be serialized to storage.


BTW, OmniThreadLibrary's TOmniBoundedQueue was implemented in 2008. Then we needed about a year to find a few weird boundary conditions where the code would crash and fix them.

After that the queue was working perfectly until 2018, when I found a race condition which caused the queue to say "sorry, can't add an item, the queue is full", when the queue has actually just become empty. Nobody noticed the problem in the meantime, we all thought the code is perfect, and the unit/stress test did not test for this specific scenario ...

Writing lock-free structures is harder than you think. (Applies to me, too.)

1 minute ago, Primož Gabrijelčič said:

BTW, OmniThreadLibrary's TOmniBoundedQueue was implemented in 2008. Then we needed about a year to find a few weird boundary conditions where the code would crash and fix them.

After that the queue was working perfectly until 2018, when I found a race condition which caused the queue to say "sorry, can't add an item, the queue is full", when the queue has actually just become empty. Nobody noticed the problem in the meantime, we all thought the code is perfect, and the unit/stress test did not test for this specific scenario ...

Writing lock-free structures is harder than you think. (Applies to me, too.)

Obviously testing thread-related code is hard too. ;-)

1 hour ago, Rudy Velthuis said:

Obviously testing thread-related code is hard too. ;-)

Indeed.

In single-threaded code, good unit tests will show that the code obviously has no errors.

In multi-threaded code, good unit tests will show that the code has no obvious errors.

IOW, unit tests can only prove that your multi-threaded code doesn't work, not that it does.

Edited by Primož Gabrijelčič
9 minutes ago, Primož Gabrijelčič said:

IOW, unit tests can only prove that your multi-threaded code doesn't work, not that it does.

I would argue that the same is true for single threaded code as well.

2 hours ago, Primož Gabrijelčič said:

In single-threaded code, good unit tests will show that the code obviously has no errors.

In multi-threaded code, good unit tests will show that the code has no obvious errors.

Aw, come on, you got that somewhere else. <g>

Ah, here it is: "There are two ways of constructing a software design: One way is to make it so simple that there are obviously no deficiencies, and the other way is to make it so complicated that there are no obvious deficiencies. The first method is far more difficult." -- C.A.R. Hoare

And: "Program testing can be used to show the presence of bugs, but never to show their absence!" -- Edsger W. Dijkstra.

After all, I still have this quotes file I use for XanaNews. <g>

Edited by Rudy Velthuis

Indeed, I got the "obvious" part from Hoare 🙂

The parallel part is all mine though. And I stand behind it.

If one writes a unit test which tests the specification in full, it can in theory prove that a single-threaded unit complies with the specification.

Throwing parallel processing into the mix, however, keeps us from even that result. The most one can say is that the unit probably works as expected inside one test environment on one computer.
