Jump to content
Attila Kovacs

pipeline and visual feedback

Recommended Posts

What is the proper way to display the progress from a pipeline stage when the main thread is blocked by WaitFor()? VCL.

Share this post


Link to post

There is none. Only the main thread should update the UI in a VCL application so if it is blocked, you're out of options.

 

In theory, you could create a window purely by the Windows API and use it to show the progress, but that is probably much to much work. 

 

Better solution would be to not block the VCL application at all.

Share this post


Link to post

Thanks, does this mean pipeline is the wrong approach here or can I run the pipeline without Waitfor? Could not find anything in docs.

 

Edit: ok, I think I just have to omit WaitFor. I'll give it a try.

Edited by Attila Kovacs

Share this post


Link to post

Of course you can run a pipeline without a WaitFor.

 

You have different options to detect when a pipeline has finished its work.

 

a) The main program can count the number of items sent to the pipeline and number of items returned from. (If there is a simple correspondence between two - for example if each input produces exactly one output.)

b) The pipeline itself can detect that it has no more work and then it can signal this to the main program.

 

When you detect a terminating condition, you can shut down the pipeline (with WaitFor) and you'll done.

 

See the "folder scanner" in OTL examples folder for an example of the b) technique or read this chapter of the book: http://www.omnithreadlibrary.com/book/chap10.html#howto-webDownload

Share this post


Link to post

Sadly I'm stuck already, if I omit WaitFor(), nothing happens.

Occasionally one task will be executed if I'm hitting the start button like a maniac, but I have 4 values assigned to the input.

 

I'm playing with the example with retriever/inserter.

 

  pipeline := Parallel.pipeline //
    .Stage(Retriever) //
    .NumTasks(Environment.Process.Affinity.Count * 2) //
    .Stage(Inserter, Parallel.TaskConfig.OnMessage(Self)) //
    .Run //
    ;

  pipeline.input.Add(TThreadParam.Create(....));
  pipeline.input.Add(TThreadParam.Create(....));
  pipeline.input.Add(TThreadParam.Create(....));
  pipeline.input.Add(TThreadParam.Create(....));
  pipeline.input.CompleteAdding;
  // pipeline.WaitFor(INFINITE);

 

I've tried to put ".run" after completeadding as a 'trial and fail'attempt, and in this case on the first run 2 tasks are executed, and from the second run all tasks are executed.

 

I'm really missing the explanations  from the doc, there is also an example

"procedure TfrmOtlParallelExceptions.btnPipeline1Click(Sender: TObject);"

where is no waitfor() but processing output right after "CompleteAdding", which is also confusing.

 

// Provide input
37   with pipeline.Input do begin
38     // few normal elements
39     Add(1);
40     Add(2);
41     // then trigger the exception in the first stage;
42     // this exception should be 'corrected' in the second stage
43     Add('three');
44     Add(4);
45     CompleteAdding;
46   end;
47 
48   // Process output; there should be no exception in the output collection
49   for value in pipeline.Output do
50     Log(value.AsString);

 

 

Share this post


Link to post

I don't know. Show us the code. You are probably doing something after that "WaitFor" and I don't know what.

 

Quote

where is no waitfor() but processing output right after "CompleteAdding", which is also confusing.

 

What is confusing here? You don't have to wait for pipeline to terminate (WaitFor) to start processing its output.

Share this post


Link to post

As I put more code into the pipeline stages some questions are arising.

For example, some of the exceptions are arriving as "string" into the final output collection instead as "exception".

One case is "Abort" and the other one is if I'm re-raising an exception in the stage code. Why is that?

 

Also, what is the proper way to re-run a task or add a new one after "CompleteAdding"?

I would like to return the task with the exception and be able to re-fire the same task. (Network communication)

Do I need a new pipeline?

 

Edited by Attila Kovacs

Share this post


Link to post
5 minutes ago, Attila Kovacs said:

For example, some of the exceptions are arriving as "string" into the final output collection instead as "exception".

Without having a reproducible example, I have no idea.

5 minutes ago, Attila Kovacs said:

Do I need a new pipeline?

Yes, you need a new pipeline. After a pipeline goes into CompleteAdding state, it cannot be "revived".

  • Thanks 1

Share this post


Link to post
On 4/7/2020 at 4:26 PM, Primož Gabrijelčič said:

Without having a reproducible example, I have no idea.

Never underestimate this kind of answers, this tells me always that there is a user error. It was. Thx.

 

I have here something what I'm not getting and very hard to debug.

(got it, not even multi threading problem, it just revealed it \o/)

 

btw, I still can't figure out what "Parallel.TaskConfig.OnMessage(Self)" does, as 2nd parameter to a stage.

 

thx

 

 

 

 

Edited by Attila Kovacs

Share this post


Link to post

Ooookay, everything is working perfectly, it was a journey, transforming the legacy code to work with OTL, which I really enjoyed.

It's frightening how easy and how fast can you write complex and _working_ code with this lib.

The online documentation seems to be a bit behind the current release (didn't check the book yet), so I took a copy of the book at leanpub with a custom input in the box as a small donation and a big thank you.

Share this post


Link to post

Create an account or sign in to comment

You need to be a member in order to leave a comment

Create an account

Sign up for a new account in our community. It's easy!

Register a new account

Sign in

Already have an account? Sign in here.

Sign In Now
×