This is my first question in SO, i'm new using DataFlow with BroadcastBlock and ActionBlock, i hope i can get solution in here. Here's the structure.
class SampleModel
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public bool Success { get; set; } = true;
public object UniqueData { get; set; }
public override string ToString()
{
StringBuilder sb = new StringBuilder();
sb.AppendLine($"Id - {Id}");
sb.AppendLine($"Success - {Success}");
sb.AppendLine($"UniqueData - {UniqueData}");
string tmp = sb.ToString();
sb.Clear();
return tmp;
}
}
class CreateDownloadTask
{
public async Task VeryLongProcess()
{
await Task.Run(async () =>
{
Console.WriteLine("Long Process Working..");
await Task.Delay(TimeSpan.FromSeconds(5));
Console.WriteLine("Long Process Done..");
});
}
public async Task CreateSimpleBroadcastX<T>(T data)
{
Action<T> process = async model =>
{
Console.WriteLine("Working..");
await VeryLongProcess();
Console.WriteLine("Done");
};
var broad = new BroadcastBlock<T>(null);
var action = new ActionBlock<T>(process);
var dflo = new DataflowLinkOptions { PropagateCompletion = true };
broad.LinkTo(action, dflo);
await broad.SendAsync(data);
broad.Complete();
await action.Completion.ContinueWith(async tsk =>
{
Console.WriteLine("Continue data");
}).ConfigureAwait(false);
Console.WriteLine("All Done");
}
}
var task = cdt.CreateSimpleBroadcastX<SampleModel>(new SampleModel
{
UniqueData = cdt.GetHashCode()
});
task.GetAwaiter().GetResult();
Console.WriteLin("Completed");
Working..
Long Process Working..
Long Process Done..
Done
Continue data
All Done
Completed
Working..
Long Process Working..
Continue data
All Done
Completed
Long Process Done..
Done
This is happen when ther is async-await
inside of ActionBlock.
Now, the question is, is that possible to make the result as i expected without WaitHandle
?
That mean, ActionBlock.Completion will be wait until the Action
or Delegate
inside the ActionBlock is complete executed?
Or i'm i doing it wrong with that patter?
Thanks in Advance, and sorry for my bad english.
Your problem is here:
Action<T> process = async model => ...
That code creates an async void
method, which should be avoided. One of the reasons you should avoid async void
is because it is difficult to know when the method has completed. And this is exactly what is happening: the ActionBlock<T>
cannot know when your delegate has completed because it is async void
.
The proper delegate type for an asynchronous method without a return value that takes a single argument is Func<T, Task>
:
Func<T, Task> process = async model => ...
Now that the asynchronous method returns a Task
, the ActionBlock
can know when it completes.