Take a basic pipeline:
CommandA | CommandB
CommandA is very slow, while CommandB requires user input to proceed. When CommandB moves on to the next pipeline item, it experiences a considerable delay while CommandA does its thing. The whole pipeline seems to proceed on a one-at-a-time basis.
I'd like to do something like:
CommandA | PrefetchPipeline -size 10 | CommandB
So that PrefetchPipeline buffers up to 10 items from CommandA but immediately gives the next item to CommandB whenever possible. There should be no delay for the first item, but if CommandB then takes a long time to complete its processing, items should still be fetched in the background from CommandA.
Is this possible in PowerShell?
Clarification:
1..100 | ForEach-Object { Start-Sleep 3; $_ } |
    MagicFunction |
    ForEach-Object { Write-Host $_; [void]$Host.UI.RawUI.ReadKey() }
Can one write a MagicFunction that accumulates items from the left and sends them to the right as soon as possible? If you wait 30 seconds without pressing a key, you should then get 10 items in quick succession (i.e. without waiting 3 seconds for each), but without any additional delays (i.e. MagicFunction must not simply wait until it has X items and then release them all at once).
The PowerShell pipeline is inherently synchronous, so only one command in the chain can run at any given time (even with ForEach-Object -Parallel, the pipeline as a whole is still synchronous).
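A small sketch of that one-at-a-time behavior (the $log list is only there for illustration): each item is consumed before the next one is produced, so the left-hand command never runs ahead of the right-hand one.

```powershell
# Record the order in which the two pipeline stages actually run.
$log = [Collections.Generic.List[string]]::new()

1..3 |
    ForEach-Object { $log.Add( "produce $_" ); $_ } |   # stage 1: the "producer"
    ForEach-Object { $log.Add( "consume $_" ) }         # stage 2: the "consumer"

$log -join ', '
# → produce 1, consume 1, produce 2, consume 2, produce 3, consume 3
```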
With PowerShell (Core) 7+, you may implement an asynchronous producer/consumer pattern using a thread job and a thread-safe collection:
# Create a thread-safe collection (queue) with an upper limit of 5 items
$buffer = [Collections.Concurrent.BlockingCollection[int]]::new( 5 )

# Start producer as a background thread
$job = Start-ThreadJob -ScriptBlock {
    1..100 | ForEach-Object {
        Start-Sleep -Seconds 1   # simulate a slow operation
        ($using:buffer).Add( $_ )
    }
    # Signal that we have finished using the buffer
    ($using:buffer).CompleteAdding()
}

# Consumer loop runs until producer thread calls CompleteAdding() and
# the buffer is empty.
while( -not $buffer.IsCompleted ) {
    # Fetch and output any new items from the buffer
    $item = 0
    while( $buffer.TryTake( [ref] $item ) ) {
        Write-Host "Received: $item"
    }
    Write-Host "Press any key to fetch the next batch of items..."
    $null = $Host.UI.RawUI.ReadKey()
}

# Cleanup
$job | Receive-Job -Wait -AutoRemoveJob
$buffer.Dispose()
Notes:
The producer thread adds items to the queue until the upper limit is reached (the BlockingCollection constructor argument). If the queue is full, it has to wait until the consumer removes an item.
Because it uses the TryTake() method, the consumer won't block if there are no items in the queue. If there are items in the queue, it will remove each item and output it. Use the Take() method instead if you want the consumer to wait until a new item is in the queue.
While the consumer waits in the ReadKey() call, the producer thread is still able to enqueue new items.
The using: prefix is necessary to reference the $buffer variable from inside the thread job's script block.
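The same pattern could be wrapped into something close to the PrefetchPipeline from the question. Because a pipeline function only receives input while the upstream command is running on the same thread, this sketch takes the slow producer as a script block parameter instead of as pipeline input. Invoke-Prefetch, -Producer and -Size are hypothetical names, not built-in cmdlets or parameters. The producer script block is re-created from its text inside the thread job, so it cannot see the caller's variables. The consumer side uses GetConsumingEnumerable(), which blocks like Take() until an item arrives or adding is complete.

```powershell
# Hypothetical helper (not a built-in cmdlet): runs $Producer on a background
# thread and streams its output through a bounded buffer of $Size items.
function Invoke-Prefetch {
    param(
        [Parameter(Mandatory)] [scriptblock] $Producer,
        [int] $Size = 10
    )

    $buffer = [Collections.Concurrent.BlockingCollection[object]]::new( $Size )

    $job = Start-ThreadJob -ScriptBlock {
        param( $producerText, $queue )
        try {
            # Re-create the script block so it runs in this thread's runspace.
            & ([scriptblock]::Create( $producerText )) |
                ForEach-Object { $queue.Add( $_ ) }   # blocks while the buffer is full
        }
        finally {
            # Always signal completion, otherwise the consumer would wait forever.
            $queue.CompleteAdding()
        }
    } -ArgumentList $Producer.ToString(), $buffer

    try {
        # Blocks until an item is available; ends once adding is complete
        # and the buffer is empty. Each item is emitted downstream immediately.
        foreach( $item in $buffer.GetConsumingEnumerable() ) {
            $item
        }
    }
    finally {
        # Also runs if the downstream command stops early.
        $buffer.CompleteAdding()      # releases a producer waiting in Add()
        $job | Remove-Job -Force
        $buffer.Dispose()
    }
}

# Possible usage, mirroring the example from the question:
# Invoke-Prefetch -Size 10 -Producer { 1..100 | ForEach-Object { Start-Sleep 3; $_ } } |
#     ForEach-Object { Write-Host $_; [void] $Host.UI.RawUI.ReadKey() }
```

While the downstream command waits for a key press, the background thread keeps filling the buffer up to -Size items, so the next items arrive without the 3-second delay. Error handling and early termination are only lightly covered here and would need more care in real use.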