Shrinavat

How to pass a parameter to a certain stage of the pipeline?

I have a pipeline for downloading files to a specific database.

FFileDownloader := Parallel.Pipeline
  .Stage(Asy_URLBuilder)
  .Stage(Asy_URLRetriever)
  .Stage(Asy_DBInserter, Parallel.TaskConfig.OnMessage(Self))
  .Run;

procedure Asy_URLBuilder(const input, output: IOmniBlockingCollection);
var
  ovIN, ovOUT: TOmniValue;
begin
  for ovIN in input do begin
    // ... compose url for downloading
    output.TryAdd(ovOUT); // URL is in ovOUT
  end;
end;

procedure Asy_URLRetriever(const input, output: IOmniBlockingCollection);
var
  ovIN, ovOUT: TOmniValue;
begin
  for ovIN in input do begin
    // ... downloading
    output.TryAdd(ovOUT); // downloaded file is in ovOUT
  end;
end;

procedure Asy_DBInserter(const input, output: IOmniBlockingCollection; const task: IOmniTask);
var
  ovIN: TOmniValue;
  DB: TxxxDatabase;
begin
  DB := TxxxDatabase.Create(nil);
  DB.DatabaseName := ??? ;
  DB.Open;
  
  for ovIN in input do begin
    // ... insert downloaded file in specific database
  end;

  DB.Commit;
  task.Comm.Send(WM_TASK_COMPLETED);
end;

I run a pipeline for various databases. I need to pass DatabaseName to the third stage of the pipeline. How can I do that?

Can I use the SetParameter method of the task controller when creating a pipeline? And if so, how?

Any help will be appreciated!

Guest

You should be able to use a method of a class as a pipeline stage.

The class takes the database name as a constructor argument, and that's it.

Guest

This is what I mean:

  TDbClass = class
  private
    FDatabaseName: string;
  public
    constructor Create(const ADatabaseName: string);
    procedure Insert(const input, output: IOmniBlockingCollection; const task: IOmniTask);
  end;

constructor TDbClass.Create(const ADatabaseName: string);
begin
  inherited Create;
  FDatabaseName := ADatabaseName;
end;

procedure TDbClass.Insert(const input, output: IOmniBlockingCollection;
  const task: IOmniTask);
var
  ovIN: TOmniValue;
  // DB: TxxxDatabase;
begin
  // DB := TxxxDatabase.Create();
  // DB.DatabaseName := FDatabaseName;
  for ovIN in input do
  begin
    // ... insert downloaded file
  end;
  // DB.Commit;
end;

and use it in your pipeline-building code like this:

var
  dbInstance: TDbClass;

dbInstance := TDbClass.Create('foo'); // must stay alive until the pipeline has finished

FFileDownloader := Parallel.Pipeline
  .Stage(Asy_URLBuilder)
  .Stage(Asy_URLRetriever)
  .Stage(dbInstance.Insert, Parallel.TaskConfig.OnMessage(Self))
  .Run;
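
One thing to watch with the method-based approach is lifetime: the object has to outlive the stage that calls its method. The following is only a minimal console sketch of that idea, assuming OmniThreadLibrary is on the search path. TTagger, its Tag method and the sample file names are made up for illustration and stand in for the real database class.

```pascal
program MethodStageSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon,
  OtlCollections,
  OtlParallel;

type
  // Hypothetical stand-in for TDbClass: it only tags values with the name
  TTagger = class
  private
    FDatabaseName: string;
  public
    constructor Create(const ADatabaseName: string);
    procedure Tag(const input, output: IOmniBlockingCollection);
  end;

constructor TTagger.Create(const ADatabaseName: string);
begin
  inherited Create;
  FDatabaseName := ADatabaseName;
end;

procedure TTagger.Tag(const input, output: IOmniBlockingCollection);
var
  ov: TOmniValue;
begin
  // The parameter is available as a field of the object
  for ov in input do
    output.TryAdd(FDatabaseName + ':' + ov.AsString);
end;

var
  tagger: TTagger;
  pipeline: IOmniPipeline;
  ov: TOmniValue;
begin
  tagger := TTagger.Create('foo');
  try
    pipeline := Parallel.Pipeline.Stage(tagger.Tag).Run;
    pipeline.Input.Add('file1');
    pipeline.Input.Add('file2');
    pipeline.Input.CompleteAdding;
    // Draining the output keeps the object alive while the stage runs
    for ov in pipeline.Output do
      Writeln(ov.AsString);
  finally
    tagger.Free; // free only after the stage has finished its work
  end;
end.
```

In a GUI application the same rule applies: free the object only after the pipeline has signalled completion, for example in the handler for the completion message.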

 

Guest

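Yes, it is. For example, a function can build the stage as an anonymous method that captures the database name: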

function Build_DbInserterStage(const ADatabaseName: string): TPipelineStageDelegateEx;
begin
  // The anonymous method captures ADatabaseName, so every built stage carries its own value
  Result :=
    procedure(const input, output: IOmniBlockingCollection; const task: IOmniTask)
    var
      ovIN: TOmniValue;
      DB: TxxxDatabase;
    begin
      DB := TxxxDatabase.Create(nil);
      try
        DB.DatabaseName := ADatabaseName;
        DB.Open;
        for ovIN in input do
        begin
          // ... insert downloaded file
        end;
        DB.Commit;
      finally
        DB.Free;
      end;
    end;
end;

FFileDownloader := Parallel.Pipeline
  .Stage(Asy_URLBuilder)
  .Stage(Asy_URLRetriever)
  .Stage(Build_DbInserterStage('foo'), Parallel.TaskConfig.OnMessage(Self))
  .Run;

 


IOmniPipeline.PipelineStage[num] returns an interface which exposes the Input and Output collections for that stage:

 

type
  IOmniPipelineStage = interface ['{DFDA7A07-6B28-4AA6-9218-59D3DF9C4B8E}']
    function  GetInput: IOmniBlockingCollection;
    function  GetOutput: IOmniBlockingCollection;
  //
    property Input: IOmniBlockingCollection read GetInput;
    property Output: IOmniBlockingCollection read GetOutput;
  end;

  IOmniPipeline = interface
    function  GetPipelineStage(idxStage: integer): IOmniPipelineStage;
  //
    ...
    property PipelineStage[idxStage: integer]: IOmniPipelineStage read GetPipelineStage;
  end;

 

You can use them to send data to a specific pipeline stage, for example:

 

pipeline.PipelineStage[2].Input.Add(42);

The first pipeline stage has index 0.
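
To make the ordering concrete, here is a small console sketch, assuming an OmniThreadLibrary version that includes the PipelineStage property shown above. PassThrough, Consumer and the sample values are made up for illustration. The parameter is queued on the input of stage 2 before any data enters the pipeline, so it is the first value that stage sees.

```pascal
program StageParamSketch;

{$APPTYPE CONSOLE}

uses
  OtlCommon,
  OtlCollections,
  OtlParallel;

// Hypothetical stand-in for the first two stages: forwards values unchanged
procedure PassThrough(const input, output: IOmniBlockingCollection);
var
  ov: TOmniValue;
begin
  for ov in input do
    output.TryAdd(ov);
end;

// Hypothetical last stage: treats the first value it receives as its parameter
procedure Consumer(const input, output: IOmniBlockingCollection);
var
  ov: TOmniValue;
  dbName: string;
begin
  if not input.Take(ov) then
    Exit; // input was completed before any value arrived
  dbName := ov.AsString;
  for ov in input do
    output.TryAdd(dbName + ':' + ov.AsString);
end;

var
  pipeline: IOmniPipeline;
  ov: TOmniValue;
begin
  pipeline := Parallel.Pipeline
    .Stage(PassThrough)
    .Stage(PassThrough)
    .Stage(Consumer)
    .Run;
  // Send the parameter directly to stage 2 before feeding any data
  pipeline.PipelineStage[2].Input.Add('foo');
  pipeline.Input.Add('file1');
  pipeline.Input.Add('file2');
  pipeline.Input.CompleteAdding;
  for ov in pipeline.Output do
    Writeln(ov.AsString);
end.
```

If data were added to the pipeline input first, an earlier stage could deliver a value to stage 2 before the parameter arrives, so the order of the Add calls matters here.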

Edited by Primož Gabrijelčič


Forgot to write that? Oh, me! 😞

 

In this case, the code can just assume that the first value in the pipeline is a database name:

 

function Build_DbInserterStage: TPipelineStageDelegateEx;
begin
  Result :=
    procedure(const input, output: IOmniBlockingCollection; const task: IOmniTask)
    var
      ovIN: TOmniValue;
      DB: TxxxDatabase;
    begin
      // The first value in the input collection is the database name
      if not input.Take(ovIN) then
        Exit; // input was completed before a name arrived
      DB := TxxxDatabase.Create(nil);
      try
        DB.DatabaseName := ovIN.AsString;
        DB.Open;
        for ovIN in input do
        begin
          // ... insert downloaded file
        end;
        DB.Commit;
      finally
        DB.Free;
      end;
    end;
end;

 

In more general terms, you can declare special messages which carry metadata (such as the database name) instead of normal data (to be operated upon). The worker thread can then check the type of each message and react accordingly. You can use TOmniValue's array support for that:

 

    // stage indexes start at 0, so the third (DB inserter) stage is PipelineStage[2]
    pipeline.PipelineStage[2].Input.Add(TOmniValue.CreateNamed(['Type', 'Config', 'DBName', dbName]));
    pipeline.Input.Add(TOmniValue.CreateNamed(['Type', 'Data', 'Value', value]));

    for ov in input do
      if ov['Type'] = 'Config' then
        db.DatabaseName := ov['DBName']
      else
        Process(ov['Value']);

 

 

