Jump to content

Recommended Posts

The following code works perfectly with one task but fails to process one input variable with two tasks, two input variables with three tasks, etc.

 

    pipeline := Parallel
      .Pipeline
      .Stage(

procedure(const input, output: IOmniBlockingCollection)
var
  NewAmt, OldAmt: Currency;
  aDataObj: TDataObj;
  aValue: TOmniValue;
  aDm: TdmRetriever;
begin
  aDm := TdmRetriever.Create(nil);
  aDm.SetUp(DataPath);
  aDataObj := TDataObj.Create('', 0, 0);

  try
    for aValue in input do // input is a stringlist of AcctNum
    begin
      s := aValue.AsString;
      MySL.Add(s);

      aDm.qRetrieverNew.SQL.Text := 'SELECT SUM(Amount) AS CurrYrAmt FROM TransBS'
        + #13 + 'WHERE (dDate <= ' + QuotedStr(ThruDateStr)
        + ') AND (AcctNum = ' + QuotedStr(s) + ')';

      aDm.qRetrieverOld.SQL.Text := 'SELECT SUM(Amount) AS PrevYrAmt FROM TransBS'
        + #13 + 'WHERE (dDate <= ' + QuotedStr(PrevThruDateStr)
        + ') AND (AcctNum = ' + QuotedStr(s) + ')';

      aDm.qRetrieverNew.Open;
      aDm.qRetrieverOld.Open;

      // Everything continues fine with one task; execution continues with first AcctNum
      // The first AcctNum is not processed below with two tasks
      // The first and second AcctNum are not processed below with three tasks

      if not VarIsNull(aDm.qRetrieverNew.FieldByName('CurrYrAmt').AsVariant) then
        NewAmt := aDm.qRetrieverNew.FieldByName('CurrYrAmt').AsCurrency
      else
        NewAmt := 0;

      if not VarIsNull(aDm.qRetrieverOld.FieldByName('PrevYrAmt').AsVariant) then
        OldAmt := aDm.qRetrieverOld.FieldByName('PrevYrAmt').AsCurrency
      else
        OldAmt := 0;

      MySL.Add(s + ', ' + CurrToStr(NewAmt) + ', ' + CurrToStr(OldAmt) );

      aDataObj.fAcctNum := s;
      aDataObj.fCurrYr := NewAmt;
      aDataObj.fPrevYr := OldAmt;


      if not output.TryAdd(aDataObj) then
        break;
    end;
    aDm.SessionRetriever.CloseDatabase(aDm.dbRetriever);
    aDm.SessionRetriever.DropConnections;
  finally
    aDataObj.Free;
    aDm.Free;
  end;
end
).NumTasks(aNumTasks)
      .Stage(

The results, as logged to the MySL stringlist, are attached.

 

I have no idea how to fix this. "aDm" is datamodule I create which contains the queries. I have placed the query openingss in a try-except and never received an error. I have placed query.isempty in a while loop since the query for AcctNum 1111 takes about 4 seconds.

 

The answer is probably something simple, but I'm at a loss as to what I am doing wrong. Will appreciate any help.

Log.txt

Share this post


Link to post
Guest

Is MySL a simple TStringList?

 

Then it is not threadsafe and can cause a wrong log result.

Share this post


Link to post

Yes, it is a simple TStringList. However, when the output is posted in the final inserter stage, the inserted output matches the stringlist results. One thread works, two threads fail to process the first input, three threads fail to process the first two inputs, and so forth. Results are the same with or without logging.

Share this post


Link to post
Guest

You have a race condition in your code but you did not show all the code to us.

 

How could we give you an answer how to fix it?

 

Does each thread has its own database connection?

Edited by Guest

Share this post


Link to post
58 minutes ago, OmahaMax said:

Yes, it is a simple TStringList. However, when the output is posted in the final inserter stage, the inserted output matches the stringlist results. One thread works, two threads fail to process the first input, three threads fail to process the first two inputs, and so forth. Results are the same with or without logging.

Would be easier for people to help if you could post a minimal repro. 

Share this post


Link to post
4 hours ago, Schokohase said:

Does each thread has its own database connection?

Yes. As you  can see in the code provided above, each thread creates a tdatamodule (aDm). aDm is set up using the following Procedure SetUp(DataPath). The TAbsSession component has property AutoSessionName set to True.

 

procedure TdmRetriever.SetUp(const aDataPath: string);
begin
  Randomize;
  DBRetriever.DatabaseName := 'BSJob' + IntToStr(Random(1000));
  DBRetriever.DatabaseFileName := aDataPath + 'ReporterDB.abs';
  DBRetriever.SessionName := SessionRetriever.SessionName;
  DBRetriever.MultiUser := True;
  DBRetriever.Open;

  tblTransBSRetriever.SessionName := SessionRetriever.SessionName;
  tblTransBSRetriever.DatabaseName := DBRetriever.DatabaseName;
  tblTransBSRetriever.TableName := 'TransBS';
  tblTransBSRetriever.IndexName := 'AcctNumDate';
  tblTransBSRetriever.Open;

  qRetrieverNew.SessionName := SessionRetriever.SessionName;
  qRetrieverNew.DatabaseName := DBRetriever.DatabaseName;

  qRetrieverOld.SessionName := SessionRetriever.SessionName;
  qRetrieverOld.DatabaseName := DBRetriever.DatabaseName;

end;

So, I believe each thread does have its own database connection. I'm awfully new to multi-threaded database work, so maybe I'm wrong.
 

Edited by OmahaMax
Omitted a word

Share this post


Link to post
Guest

Sorry, but we cannot see anything that would make it clear.

 

In general we do not see, if you are using shared instances which are not threadsafe within your threads.

 

BTW

 

Please check the DatabaseName for each thread and you should see, every thread has the same DatabaseName - not very random.

You should call Randomize once per application. A VCL/FMX application will call it already internally, so you do not have to worry about.

 

Edited by Guest

Share this post


Link to post

@Schokohase: Thank you so much for taking the time to help and your suggestions. I'm going to take another route for the time being. However, when I have time, if I can figure out create a simple reproduction (as per David's suggestion) I will post/upload it.

Share this post


Link to post

If any of the database components of your datamodule has a dependency on COM libraries (such as tADOconnection) you must do a coInitialize (inside the context of the thread) before creating the datamodule.

Share this post


Link to post

No, thank goodness. I am using Absolute Database from Component Ace. I've been very happy with it.

Share this post


Link to post

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×