OmahaMax 0 Posted June 22, 2019

The following code works perfectly with one task but fails to process one input variable with two tasks, two input variables with three tasks, etc.

    pipeline := Parallel
      .Pipeline
      .Stage(
        procedure(const input, output: IOmniBlockingCollection)
        var
          NewAmt, OldAmt: Currency;
          aDataObj: TDataObj;
          aValue: TOmniValue;
          aDm: TdmRetriever;
        begin
          aDm := TdmRetriever.Create(nil);
          aDm.SetUp(DataPath);
          aDataObj := TDataObj.Create('', 0, 0);
          try
            for aValue in input do // input is a stringlist of AcctNum
            begin
              s := aValue.AsString;
              MySL.Add(s);
              aDm.qRetrieverNew.SQL.Text :=
                'SELECT SUM(Amount) AS CurrYrAmt FROM TransBS' + #13 +
                'WHERE (dDate <= ' + QuotedStr(ThruDateStr) + ') AND (AcctNum = ' + QuotedStr(s) + ')';
              aDm.qRetrieverOld.SQL.Text :=
                'SELECT SUM(Amount) AS PrevYrAmt FROM TransBS' + #13 +
                'WHERE (dDate <= ' + QuotedStr(PrevThruDateStr) + ') AND (AcctNum = ' + QuotedStr(s) + ')';
              aDm.qRetrieverNew.Open;
              aDm.qRetrieverOld.Open;
              // Everything continues fine with one task; execution continues with first AcctNum
              // The first AcctNum is not processed below with two tasks
              // The first and second AcctNum are not processed below with three tasks
              if not VarIsNull(aDm.qRetrieverNew.FieldByName('CurrYrAmt').AsVariant) then
                NewAmt := aDm.qRetrieverNew.FieldByName('CurrYrAmt').AsCurrency
              else
                NewAmt := 0;
              if not VarIsNull(aDm.qRetrieverOld.FieldByName('PrevYrAmt').AsVariant) then
                OldAmt := aDm.qRetrieverOld.FieldByName('PrevYrAmt').AsCurrency
              else
                OldAmt := 0;
              MySL.Add(s + ', ' + CurrToStr(NewAmt) + ', ' + CurrToStr(OldAmt));
              aDataObj.fAcctNum := s;
              aDataObj.fCurrYr := NewAmt;
              aDataObj.fPrevYr := OldAmt;
              if not output.TryAdd(aDataObj) then
                break;
            end;
            aDm.SessionRetriever.CloseDatabase(aDm.dbRetriever);
            aDm.SessionRetriever.DropConnections;
          finally
            aDataObj.Free;
            aDm.Free;
          end;
        end
      ).NumTasks(aNumTasks)
      .Stage(

The results, as logged to the MySL stringlist, are attached. I have no idea how to fix this.
"aDm" is a data module I create that contains the queries. I have wrapped the calls that open the queries in a try-except and never received an error. I have also polled query.IsEmpty in a while loop, since the query for AcctNum 1111 takes about 4 seconds. The answer is probably something simple, but I'm at a loss as to what I am doing wrong. I will appreciate any help.

Attachment: Log.txt
Guest Posted June 22, 2019

Is MySL a simple TStringList? If so, it is not thread-safe, and writing to it from several tasks can produce a wrong log result.
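To illustrate the point about the shared list, here is a minimal sketch of serializing writes to a TStringList with a critical section. The names `SafeLog`, `LogLock`, and `LogFromManyThreads` are hypothetical and do not appear in the posted code. The sketch uses `TParallel.For` from the Delphi RTL only to generate concurrent writers; it is not the OmniThreadLibrary pipeline from the question, and it is not claimed to be the fix for the reported problem.

```pascal
program SafeLogSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, System.SyncObjs, System.Threading;

var
  MySL: TStringList;          // shared log list, named as in the question
  LogLock: TCriticalSection;  // hypothetical guard, not in the original code

// Serializes every write to the shared list.
procedure SafeLog(const Line: string);
begin
  LogLock.Acquire;
  try
    MySL.Add(Line);
  finally
    LogLock.Release;
  end;
end;

// Adds Count lines from parallel workers and returns how many were stored.
function LogFromManyThreads(Count: Integer): Integer;
begin
  MySL := TStringList.Create;
  LogLock := TCriticalSection.Create;
  try
    // TParallel.For returns only after all iterations have finished.
    TParallel.For(1, Count,
      procedure(I: Integer)
      begin
        SafeLog('AcctNum ' + IntToStr(I));
      end);
    Result := MySL.Count;
  finally
    LogLock.Free;
    MySL.Free;
  end;
end;

begin
  Writeln(LogFromManyThreads(1000));
end.
```

With the lock in place, every call to `MySL.Add` runs alone, so the stored line count should match the number of iterations. Without it, lines may be lost or the list may be corrupted.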
OmahaMax 0 Posted June 22, 2019

Yes, it is a simple TStringList. However, when the output is posted in the final inserter stage, the inserted output matches the stringlist results. One thread works, two threads fail to process the first input, three threads fail to process the first two inputs, and so forth. Results are the same with or without logging.
Guest Posted June 22, 2019

You have a race condition in your code, but you did not show us all of the code, so how could we tell you how to fix it? Does each thread have its own database connection?
David Heffernan 2353 Posted June 22, 2019

58 minutes ago, OmahaMax said: Yes, it is a simple TStringList. However, when the output is posted in the final inserter stage, the inserted output matches the stringlist results. One thread works, two threads fail to process the first input, three threads fail to process the first two inputs, and so forth. Results are the same with or without logging.

It would be easier for people to help if you could post a minimal reproduction.
OmahaMax 0 Posted June 22, 2019

4 hours ago, Schokohase said: Does each thread have its own database connection?

Yes. As you can see in the code provided above, each thread creates a TDataModule (aDm). aDm is set up using the following procedure, SetUp(DataPath). The TAbsSession component has its AutoSessionName property set to True.

    procedure TdmRetriever.SetUp(const aDataPath: string);
    begin
      Randomize;
      DBRetriever.DatabaseName := 'BSJob' + IntToStr(Random(1000));
      DBRetriever.DatabaseFileName := aDataPath + 'ReporterDB.abs';
      DBRetriever.SessionName := SessionRetriever.SessionName;
      DBRetriever.MultiUser := True;
      DBRetriever.Open;
      tblTransBSRetriever.SessionName := SessionRetriever.SessionName;
      tblTransBSRetriever.DatabaseName := DBRetriever.DatabaseName;
      tblTransBSRetriever.TableName := 'TransBS';
      tblTransBSRetriever.IndexName := 'AcctNumDate';
      tblTransBSRetriever.Open;
      qRetrieverNew.SessionName := SessionRetriever.SessionName;
      qRetrieverNew.DatabaseName := DBRetriever.DatabaseName;
      qRetrieverOld.SessionName := SessionRetriever.SessionName;
      qRetrieverOld.DatabaseName := DBRetriever.DatabaseName;
    end;

So, I believe each thread does have its own database connection. I'm awfully new to multi-threaded database work, so maybe I'm wrong.
Guest Posted June 23, 2019

Sorry, but nothing you have posted makes the cause clear. In particular, we cannot tell whether your threads use shared instances that are not thread-safe.

BTW: please check the DatabaseName in each thread. You should see that every thread gets the same DatabaseName, which is not very random. Randomize should be called once per application. A VCL/FMX application already calls it internally, so you do not have to worry about it.
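One way to avoid relying on Randomize and Random for the name is to take it from a process-wide counter instead. The following is a minimal sketch under that assumption: `GDbCounter` and `NextDatabaseName` are hypothetical names, and nothing in the thread confirms that duplicate database names are the cause of the skipped inputs.

```pascal
program UniqueNameSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.SyncObjs;

var
  GDbCounter: Integer = 0;  // hypothetical process-wide counter

// Returns 'BSJob1', 'BSJob2', ... The atomic increment gives each caller
// a distinct number even when several threads call at the same time.
function NextDatabaseName: string;
begin
  Result := 'BSJob' + IntToStr(TInterlocked.Increment(GDbCounter));
end;

begin
  Writeln(NextDatabaseName);
  Writeln(NextDatabaseName);
end.
```

In SetUp, the assignment to DBRetriever.DatabaseName could then use such a function in place of the Randomize and Random(1000) pair, which also removes the small chance of two tasks drawing the same number.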
OmahaMax 0 Posted June 23, 2019

@Schokohase: Thank you so much for taking the time to help, and for your suggestions. I'm going to take another route for the time being. However, when I have time, if I can figure out how to create a simple reproduction (as per David's suggestion), I will post it.
A.M. Hoornweg 144 Posted June 24, 2019

If any of the database components in your data module depends on COM libraries (such as TADOConnection), you must call CoInitialize inside the context of the thread before creating the data module.
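A minimal, Windows-only sketch of that pattern follows. `TComWorker` and `RunComWorker` are hypothetical names, and a plain TThread stands in for a pipeline task. It only shows where the CoInitialize and CoUninitialize calls would sit relative to the data module's lifetime.

```pascal
program ComInitSketch;

{$APPTYPE CONSOLE}

uses
  System.SysUtils, System.Classes, Winapi.Windows, Winapi.ActiveX;

type
  // Hypothetical worker thread; stands in for one pipeline task.
  TComWorker = class(TThread)
  private
    FComReady: Boolean;
  protected
    procedure Execute; override;
  public
    property ComReady: Boolean read FComReady;
  end;

procedure TComWorker.Execute;
begin
  // COM is initialized per thread, so this call runs inside the thread itself.
  FComReady := Succeeded(CoInitialize(nil));
  if FComReady then
  try
    // Create the COM-dependent data module here, use it, and free it
    // before CoUninitialize is called.
  finally
    CoUninitialize;
  end;
end;

// Runs one worker to completion and reports whether COM initialization succeeded.
function RunComWorker: Boolean;
var
  Worker: TComWorker;
begin
  Worker := TComWorker.Create(False);
  try
    Worker.WaitFor;
    Result := Worker.ComReady;
  finally
    Worker.Free;
  end;
end;

begin
  Writeln(RunComWorker);
end.
```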
OmahaMax 0 Posted June 25, 2019

No, thank goodness. I am using Absolute Database from ComponentAce. I've been very happy with it.