I have a TPL Dataflow pipeline that parses MSBuild project files in order to obtain the location of the respective binary and then processes these binaries.
Here is what it looks like:
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
TransformBlock<IProjectEx, IProjectEx> initializeProjects = new(p =>
{
p.Initialize();
return p;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = config.InitializeProjects });
TransformBlock<IProjectEx, AsmFileProcessor.Output> processAsmFiles = new(...,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = config.ProcessAsmFiles });
ActionBlock<AsmFileProcessor.Output> aggregateOutputs = new(...);
initializeProjects.LinkTo(processAsmFiles, linkOptions);
processAsmFiles.LinkTo(aggregateOutputs, linkOptions);
foreach (var p in GetUninitializedProjects())
{
if (!initializeProjects.Post(p))
{
throw new ApplicationException("Code Bug");
}
}
initializeProjects.Complete();
aggregateOutputs.Completion.GetAwaiter().GetResult();
The processAsmFiles transform block mixes together IO (reading the binary file from disk) and CPU (processing its contents). I would like to separate IO from CPU in order to be able to provide distinct concurrency constraints.
One way to do it is to pair IProjectEx instances with the respective byte[] objects and forward them to the processAsmFiles block by introducing a readAsmFiles transform block:
TransformBlock<IProjectEx, (IProjectEx, byte[])> readAsmFiles = new(async p =>
(p, await File.ReadAllBytesAsync(p.TargetPath)),
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = config.ReadAsmFiles });
TransformBlock<(IProjectEx, byte[]), AsmFileProcessor.Output> processAsmFiles = new(...,
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = config.ProcessAsmFiles });
...
initializeProjects.LinkTo(readAsmFiles, linkOptions);
readAsmFiles.LinkTo(processAsmFiles, linkOptions);
...
However, what if I want to cache ALL the binaries first in a ConcurrentDictionary<string, byte[]> instance and only then proceed to the processAsmFiles block? Can I do it within the context of a single Dataflow pipeline?
The question is a bit unclear, and the title asks something different from the question body. It seems there's some confusion between a workflow and a dataflow.
The whole point of a CSP/Dataflow pipeline is to pass individual data/messages between processing blocks, like PowerShell or shell-script pipelines, not to load everything in memory and pass it from one method to another.
In a similar situation I have to parse several complex MB-sized files, extract ticket information, process it and then import it into a database. A TransformManyBlock searches for files and returns their contents, a second one parses them and extracts tickets, and another processes the tickets. No ConcurrentDictionaries or complex code.
record FileContent(string Path, string Content);
record Ticket(...., string Path);

// A lambda can't contain yield, so the crawler delegate is an iterator method.
IEnumerable<FileContent> Crawl(DirectoryInfo folder)
{
    foreach (var file in folder.EnumerateFiles("*.ext", SearchOption.AllDirectories))
    {
        using var reader = file.OpenText();
        yield return new FileContent(file.FullName, reader.ReadToEnd());
    }
}

var crawler = new TransformManyBlock<DirectoryInfo, FileContent>(Crawl);
var parserDop = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
var parser = new TransformManyBlock<FileContent, Ticket>(fc => ParseToTickets(fc), parserDop);
var batcher = new BatchBlock<Ticket>(5000);
var importer = new ActionBlock<Ticket[]>(...);
...
// Link the whole chain: crawler -> parser -> batcher -> importer.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
crawler.LinkTo(parser, linkOptions);
parser.LinkTo(batcher, linkOptions);
batcher.LinkTo(importer, linkOptions);
That's it. After that, I post the folder to the head block and wait for the final block to complete:
crawler.Post(someFolder);
crawler.Complete();
await importer.Completion;
The actual code is more complex, catching errors in each block and using Result<>-like or Envelope objects to wrap either a success or a failure in a message that can be redirected to a logging block. Another option is to pass error messages down the pipeline without processing them, thus avoiding redirections.
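As a rough illustration of that error-routing idea (not the actual code), a Result<T>-style wrapper combined with predicate links could look like this; Result<T>, Import and the logging block are assumptions here:
// Hypothetical wrapper: carries either the parsed payload or the exception that occurred.
record Result<T>(T? Value, Exception? Error)
{
    public bool IsSuccess => Error is null;
}

var parser = new TransformBlock<FileContent, Result<Ticket[]>>(fc =>
{
    try
    {
        return new Result<Ticket[]>(ParseToTickets(fc).ToArray(), null);
    }
    catch (Exception ex)
    {
        return new Result<Ticket[]>(null, ex);
    }
});

var logger = new ActionBlock<Result<Ticket[]>>(r => Console.Error.WriteLine(r.Error));
var importer = new ActionBlock<Result<Ticket[]>>(r => Import(r.Value!)); // Import is hypothetical

// Predicate links: failures are redirected to the logging block, successes continue on.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
parser.LinkTo(logger, linkOptions, r => !r.IsSuccess);
parser.LinkTo(importer, linkOptions, r => r.IsSuccess);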
I also use different DOPs for the crawler and parser because parsing such big files is more expensive than searching for and loading file contents, so I want more CPU cores to speed up parsing. It doesn't make sense to use a larger DOP than there are cores for CPU-bound work.
If I wanted to search multiple folders, I could raise the crawler's DOP to 2 or a bit higher, but not much more; up to a point, OS and file system caching can make the extra IO concurrency pay off.
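For example, the options might be split roughly like this (the numbers are illustrative, reusing the Crawl and ParseToTickets helpers from above):
// IO-bound crawling: a small DOP, disks don't gain much from more concurrent readers.
var crawlerOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
// CPU-bound parsing: at most one worker per core.
var parserOptions = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };

var crawler = new TransformManyBlock<DirectoryInfo, FileContent>(Crawl, crawlerOptions);
var parser = new TransformManyBlock<FileContent, Ticket>(fc => ParseToTickets(fc), parserOptions);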
In other projects with even larger files I return the FileInfo instead of loading the contents and use parsing code that loads from a file stream. Whenever possible I use a RecyclableMemoryStream to create buffers and reduce allocations and GC pressure.
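A minimal sketch of that variant, assuming the Microsoft.IO.RecyclableMemoryStream package and a hypothetical ParseStreamToTickets method that parses eagerly before the pooled stream is returned:
// One manager per process; it pools the underlying buffers to reduce allocations and GC pressure.
var streamManager = new RecyclableMemoryStreamManager();

// The crawler only emits FileInfo objects; nothing is read eagerly.
var crawler = new TransformManyBlock<DirectoryInfo, FileInfo>(folder =>
    folder.EnumerateFiles("*.ext", SearchOption.AllDirectories));

var parser = new TransformManyBlock<FileInfo, Ticket>(async file =>
{
    // Copy the file into a pooled stream instead of allocating a fresh byte[] per file.
    using var buffer = streamManager.GetStream();
    using (var fs = file.OpenRead())
    {
        await fs.CopyToAsync(buffer);
    }
    buffer.Position = 0;
    return ParseStreamToTickets(buffer); // hypothetical stream-based parser
});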
Batching/waiting until "all" results have been received
In some cases we may want to wait for previous blocks to complete before proceeding. Perhaps we want to wait until we've processed all rows from a single input file before we export them to a CSV file.
Dataflow architectures aren't built for that kind of flow control, but for simple cases we can emulate it with e.g. a special message that triggers a batch. That's where an Envelope<TMessage> class comes in handy, not only to pass Success/Failure but also to mark the final message.
The Custom Dataflow doc page shows how to either create a new block from scratch or use DataflowBlock.Encapsulate to batch items by time, using a Queue internally. The same code can be adapted to batch until the last message is received.
To make the code generic, let's assume there's an Envelope<T> that adds an IsLast property. The blocks pass around Envelope<T> messages, not just T:
record Envelope<T>(T Message, bool IsLast);
The method that batches until the last message could look like this:
public static IPropagatorBlock<Envelope<T>, Envelope<T[]>> BatchUntilDone<T>()
{
    var queue = new Queue<T>();
    var source = new BufferBlock<Envelope<T[]>>();
    var target = new ActionBlock<Envelope<T>>(item =>
    {
        queue.Enqueue(item.Message);
        if (item.IsLast)
        {
            source.Post(new Envelope<T[]>(queue.ToArray(), true));
            queue.Clear();
        }
    });
    target.Completion.ContinueWith(delegate
    {
        // Flush any leftovers in case the last message never arrived.
        if (queue.Count > 0)
        {
            source.Post(new Envelope<T[]>(queue.ToArray(), true));
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
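A usage sketch (not from the pipeline above): for the CSV-export scenario, the blocks could be wired like this, where Row, ProcessRow and ExportToCsv are hypothetical and the producer is responsible for setting IsLast on the final envelope:
var process = new TransformBlock<Envelope<Row>, Envelope<Row>>(e =>
    e with { Message = ProcessRow(e.Message) });             // hypothetical per-row work
var collect = BatchUntilDone<Row>();                         // buffers until IsLast arrives
var export = new ActionBlock<Envelope<Row[]>>(e => ExportToCsv(e.Message));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
process.LinkTo(collect, linkOptions);
collect.LinkTo(export, linkOptions);

// The producer marks the last row, e.g. process.Post(new Envelope<Row>(lastRow, true));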
This could become even more generic if the condition were passed as a parameter:
public static IPropagatorBlock<Envelope<T>, Envelope<T[]>> BatchUntilDone<T>(Func<Envelope<T>, bool> predicate)
{
    var queue = new Queue<T>();
    var source = new BufferBlock<Envelope<T[]>>();
    var target = new ActionBlock<Envelope<T>>(item =>
    {
        queue.Enqueue(item.Message);
        if (predicate(item))
        {
            source.Post(new Envelope<T[]>(queue.ToArray(), true));
            queue.Clear();
        }
    });
    target.Completion.ContinueWith(delegate
    {
        if (queue.Count > 0)
        {
            source.Post(new Envelope<T[]>(queue.ToArray(), true));
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
Another option is to use a BatchBlock with a very high capacity and trigger it explicitly by calling TriggerBatch. This could be done with code outside the blocks (a sketch of that follows the BatchUntil code below), or by a modified BatchUntil:
public static IPropagatorBlock<T, T[]> BatchUntil<T>(Func<T, bool> predicate)
{
    var source = new BatchBlock<T>(10_000);
    var target = new ActionBlock<T>(async item =>
    {
        await source.SendAsync(item);
        if (predicate(item))
        {
            source.TriggerBatch();
        }
    });
    target.Completion.ContinueWith(delegate
    {
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
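For completeness, the "code outside the blocks" alternative mentioned above might look like this sketch; files, ParseFile and Import are hypothetical:
var batcher = new BatchBlock<Ticket>(10_000);
var importer = new ActionBlock<Ticket[]>(batch => Import(batch));
batcher.LinkTo(importer, new DataflowLinkOptions { PropagateCompletion = true });

foreach (var file in files)
{
    foreach (var ticket in ParseFile(file))
    {
        batcher.Post(ticket);
    }
    // Flush everything from this file as one batch, regardless of count.
    batcher.TriggerBatch();
}

batcher.Complete();
await importer.Completion;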
Barrier - Waiting for completion
The same code can be used with e.g. a BroadcastBlock simply to wait for previous work to complete before starting the next block. That's somewhat of an edge case, as it's similar to waiting for one pipeline to complete before starting another. Perhaps we want to run some database code after everything is imported.
This time nothing is cached:
public static IPropagatorBlock<T, TMarker> Barrier<T, TMarker>(Func<T, bool> donePredicate, TMarker marker)
{
    var source = new BroadcastBlock<TMarker>(id => id);
    var done = false;
    var target = new ActionBlock<T>(item =>
    {
        if (!done && donePredicate(item))
        {
            done = true;
            source.Post(marker);
        }
    });
    target.Completion.ContinueWith(delegate
    {
        // Post the marker on completion only if the predicate never fired.
        if (!done)
        {
            source.Post(marker);
        }
        source.Complete();
    });
    return DataflowBlock.Encapsulate(target, source);
}
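A rough usage sketch for the ticket pipeline, where SaveTicketAsync, IsLast and RebuildIndexesAsync are hypothetical: the importer passes each ticket through after saving it, the barrier emits a single marker when the last ticket arrives, and the finalizer runs the database code once:
var importer = new TransformBlock<Ticket, Ticket>(async t =>
{
    await SaveTicketAsync(t);   // hypothetical per-ticket import
    return t;
});
var barrier = Barrier<Ticket, string>(t => t.IsLast, "done");
var finalize = new ActionBlock<string>(_ => RebuildIndexesAsync()); // hypothetical post-import work

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
importer.LinkTo(barrier, linkOptions);
barrier.LinkTo(finalize, linkOptions);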