Shrinavat, posted February 25, 2019

I have a pipeline that downloads files and inserts them into a specific database.

    FFileDownloader := Parallel.Pipeline
      .Stage(Asy_URLBuilder)
      .Stage(Asy_URLRetriever)
      .Stage(Asy_DBInserter, Parallel.TaskConfig.OnMessage(Self))
      .Run;

    procedure Asy_URLBuilder(const input, output: IOmniBlockingCollection);
    var
      ovIN, ovOUT: TOmniValue;
    begin
      for ovIN in input do begin
        // ... compose url for downloading
        output.TryAdd(ovOUT); // url is in ovOUT
      end;
    end;

    procedure Asy_URLRetriever(const input, output: IOmniBlockingCollection);
    var
      ovIN, ovOUT: TOmniValue;
    begin
      for ovIN in input do begin
        // ... downloading
        output.TryAdd(ovOUT); // downloaded file is in ovOUT
      end;
    end;

    procedure Asy_DBInserter(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      ovIN: TOmniValue;
      DB: TxxxDatabase;
    begin
      DB := TxxxDatabase.Create(nil);
      DB.DatabaseName := ??? ;
      DB.Open;
      for ovIN in input do begin
        // ... insert downloaded file in specific database
      end;
      DB.Commit;
      task.Comm.Send(WM_TASK_COMPLETED);
    end;

I run the pipeline for various databases, so I need to pass DatabaseName to the third stage of the pipeline. How can I do that? Can I use the SetParameter method of the task controller when creating the pipeline? And if so, how?

Any help will be appreciated!
Guest, posted February 25, 2019

You should be able to use a method of a class as a pipeline stage. The class takes the database name as a constructor argument, and that's it.
Shrinavat, posted February 25, 2019

@Schokohase Sorry, I'm afraid I don't know what you're talking about. Which method of which class should I use? The pipeline does not have an Initialize method.
Guest, posted February 25, 2019

I'm talking about this:

    type
      TDbClass = class
      private
        FDatabaseName: string;
      public
        constructor Create(const ADatabaseName: string);
        procedure Insert(const input, output: IOmniBlockingCollection; const task: IOmniTask);
      end;

    constructor TDbClass.Create(const ADatabaseName: string);
    begin
      inherited Create;
      FDatabaseName := ADatabaseName;
    end;

    procedure TDbClass.Insert(const input, output: IOmniBlockingCollection; const task: IOmniTask);
    var
      ovIN: TOmniValue;
      // DB: TxxxDatabase;
    begin
      // DB := TxxxDatabase.Create();
      // DB.DatabaseName := FDatabaseName;
      for ovIN in input do begin
        // ... insert downloaded file
      end;
      // DB.Commit;
    end;

and use it in your pipeline build code like this:

    var
      dbInstance: TDbClass;

    dbInstance := TDbClass.Create('foo');
    FFileDownloader := Parallel.Pipeline
      .Stage(Asy_URLBuilder)
      .Stage(Asy_URLRetriever)
      .Stage(dbInstance.Insert, Parallel.TaskConfig.OnMessage(Self))
      .Run;
Shrinavat, posted February 25, 2019

@Schokohase Thank you, it works! I wonder if there is another solution, one that doesn't require creating an extra class?
Guest, posted February 25, 2019

Yes, there is. Return the stage as an anonymous method that captures the database name:

    function Build_DbInserterStage(const ADatabaseName: string): TPipelineStageDelegateEx;
    begin
      Result := (
        procedure(const input, output: IOmniBlockingCollection; const task: IOmniTask)
        var
          ovIN: TOmniValue;
          DB: TxxxDatabase;
        begin
          DB := TxxxDatabase.Create();
          DB.DatabaseName := ADatabaseName;
          for ovIN in input do begin
            // ... insert downloaded file
          end;
          DB.Commit;
        end);
    end;

    FFileDownloader := Parallel.Pipeline
      .Stage(Asy_URLBuilder)
      .Stage(Asy_URLRetriever)
      .Stage(Build_DbInserterStage('foo'), Parallel.TaskConfig.OnMessage(Self))
      .Run;
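The factory function above works because a Delphi anonymous method captures the parameters of the routine that creates it. A minimal sketch of just that capture mechanism, without OmniThreadLibrary, might look like the following. `TStageProc` and `BuildInserter` are hypothetical stand-ins invented for this illustration; they are not part of OTL or of the code in this thread.

```pascal
program ClosureCaptureDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils;

type
  // Hypothetical stand-in for a pipeline stage delegate (not an OTL type).
  TStageProc = reference to procedure(const item: string);

// Returns an anonymous method that remembers the ADatabaseName
// value it was built with.
function BuildInserter(const ADatabaseName: string): TStageProc;
begin
  Result :=
    procedure(const item: string)
    begin
      // ADatabaseName is captured from the enclosing function call.
      Writeln(Format('insert %s into %s', [item, ADatabaseName]));
    end;
end;

var
  toFoo, toBar: TStageProc;
begin
  toFoo := BuildInserter('foo');
  toBar := BuildInserter('bar');
  toFoo('file1'); // uses the name captured by the first call
  toBar('file2'); // uses the name captured by the second call
end.
```

Each call to the factory yields an independent closure, so the same pipeline-building code can be reused for several databases by passing a different name each time.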
Primož Gabrijelčič, posted February 25, 2019 (edited)

IOmniPipeline.PipelineStage[num] returns an interface which exposes the input and output collections of that stage:

    type
      IOmniPipelineStage = interface ['{DFDA7A07-6B28-4AA6-9218-59D3DF9C4B8E}']
        function GetInput: IOmniBlockingCollection;
        function GetOutput: IOmniBlockingCollection;
      //
        property Input: IOmniBlockingCollection read GetInput;
        property Output: IOmniBlockingCollection read GetOutput;
      end;

      IOmniPipeline = interface
        function GetPipelineStage(idxStage: integer): IOmniPipelineStage;
        // ...
        property PipelineStage[idxStage: integer]: IOmniPipelineStage read GetPipelineStage;
      end;

You can use them to send data to a specific pipeline stage, for example:

    pipeline.PipelineStage[2].Input.Add(42);

The first pipeline stage has index 0.
Guest, posted February 25, 2019

@Primož Gabrijelčič And how does this help the OP in this case (setting the database name in a stage)?
Primož Gabrijelčič, posted February 25, 2019

Forgot to write that? Oh, me! 😞

In this case, the code can just assume that the first value arriving at the stage's input is the database name:

    function Build_DbInserterStage: TPipelineStageDelegateEx;
    begin
      Result := (
        procedure(const input, output: IOmniBlockingCollection; const task: IOmniTask)
        var
          ovIN: TOmniValue;
          DB: TxxxDatabase;
        begin
          DB := TxxxDatabase.Create();
          DB.DatabaseName := input.Next; // first value in the stage input is the database name
          for ovIN in input do begin
            // ... insert downloaded file
          end;
          DB.Commit;
        end);
    end;

In more general terms, you can declare special messages which carry metadata (the database name) instead of normal data (to be operated upon). The worker thread can then check the type of each message and react accordingly. You can use TOmniValue's array support for that. Send the configuration directly to the database stage (index 2 in the three-stage pipeline above) and the data to the pipeline input:

    pipeline.PipelineStage[2].Input.Add(TOmniValue.CreateNamed(['Type', 'Config', 'DBName', dbName]));
    pipeline.Input.Add(TOmniValue.CreateNamed(['Type', 'Data', 'Value', value]));

Then, in the stage:

    for ov in input do
      if ov['Type'] = 'Config' then
        db.DatabaseName := ov['DBName']
      else
        Process(ov['Value']);
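The message-type dispatch described above can be tried in isolation, without building a whole pipeline. The following is a minimal single-threaded sketch, assuming OmniThreadLibrary is on the search path and that `TOmniBlockingCollection` (unit `OtlCollections`) and `TOmniValue.CreateNamed` (unit `OtlCommon`) behave as used in this thread. The program name, the sample file names, and the `Writeln` standing in for the real insert are illustrative assumptions.

```pascal
program ConfigMessageDemo;

{$APPTYPE CONSOLE}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections;

var
  input: IOmniBlockingCollection;
  ov: TOmniValue;
  dbName: string;
begin
  // Stand-in for the input collection of the database stage.
  input := TOmniBlockingCollection.Create;

  // One configuration message followed by two data messages.
  input.Add(TOmniValue.CreateNamed(['Type', 'Config', 'DBName', 'foo']));
  input.Add(TOmniValue.CreateNamed(['Type', 'Data', 'Value', 'file1']));
  input.Add(TOmniValue.CreateNamed(['Type', 'Data', 'Value', 'file2']));

  // Without this call the for..in loop below would block forever
  // waiting for more items.
  input.CompleteAdding;

  dbName := '';
  for ov in input do
    if ov['Type'].AsString = 'Config' then
      // Metadata message: remember the database name.
      dbName := ov['DBName'].AsString
    else
      // Data message: the real stage would insert into the database here.
      Writeln(Format('insert %s into %s', [ov['Value'].AsString, dbName]));
end.
```

In a real pipeline the configuration message and the data messages reach the stage from different threads, so the configuration should be added to the stage input before any data is fed to the pipeline; otherwise a data message could arrive before the database name is known.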