powershell asynchronous pipeline

PowerShell pipeline accumulating buffer


Take a basic pipeline:

CommandA | CommandB

CommandA is very slow, while CommandB requires user input to proceed. When CommandB moves on to the next pipeline item, it experiences a considerable delay while CommandA does its thing. The whole pipeline seems to proceed one item at a time.

I'd like to do something like:

CommandA | PrefetchPipeline -size 10 | CommandB

PrefetchPipeline should buffer up to 10 items from CommandA but hand the next item to CommandB immediately whenever possible. There should be no delay for the first item, and if CommandB takes a long time to finish processing an item, further items should still be fetched from CommandA in the background.

Is this possible in PowerShell?

Clarification:

1..100 | foreach-object { Start-Sleep 3; $_ } |
  MagicFunction |
  foreach-object { write-host $_; [void]$Host.UI.RawUI.ReadKey() }

Can one write a MagicFunction that accumulates items from the left and sends them to the right as soon as possible? If you wait 30 seconds without pressing a key, you should then get 10 items in quick succession (i.e. without waiting 3 seconds for each), but without any additional delays (i.e. the MagicFunction must not simply wait until it has X items and then release them all at once).


Solution

  • The PowerShell pipeline is inherently synchronous, so only one command in the chain can run at any given time (even with ForEach-Object -Parallel, the pipeline as a whole is still synchronous).

    With PowerShell (Core) 7+, you can implement an asynchronous producer/consumer pattern using a thread job and a thread-safe collection:

    # Create a thread-safe collection (queue) with an upper limit of 5 items
    $buffer = [Collections.Concurrent.BlockingCollection[int]]::new( 5 )
    
    # Start producer as a background thread
    $job = Start-ThreadJob -ScriptBlock { 
        
        1..100 | ForEach-Object {
            
            Start-Sleep -Seconds 1  # simulate a slow operation
    
            # Add() blocks while the buffer is full, which throttles the producer
            ($using:buffer).Add( $_ )
        }
    
        # Signal that we have finished using the buffer
        ($using:buffer).CompleteAdding()
    }
    
    # Consumer loop runs until producer thread calls CompleteAdding() and
    # the buffer is empty.
    while( -not $buffer.IsCompleted ) {
    
        # Fetch and output any new items from the buffer
        $item = 0
        while( $buffer.TryTake( [ref] $item ) ) { 
            Write-Host "Received: $item"
        }
    
        Write-Host "Press any key to fetch the next batch of items..."
        $null = $Host.UI.RawUI.ReadKey()
    }
    
    # Cleanup
    $job | Receive-Job -Wait -AutoRemoveJob 
    $buffer.Dispose()
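
The same pattern can be wrapped in a function that sits at the start of a pipeline, which comes close to the PrefetchPipeline asked for in the question. This is only a minimal sketch: the name Invoke-PrefetchPipeline and its -Producer and -Size parameters are made up for illustration. Because the pipeline is synchronous, the slow command cannot be piped in from the left; it has to be passed as a script block that runs in the thread job. The sketch assumes the producer script is self-contained, since it is recreated from its text and therefore loses access to the caller's variables and functions.

```powershell
# Hypothetical helper (not a built-in cmdlet): runs -Producer in a background
# thread and streams its output to the pipeline, buffering up to -Size items.
function Invoke-PrefetchPipeline {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)]
        [scriptblock] $Producer,

        [int] $Size = 10
    )

    # Bounded, thread-safe queue shared between producer and consumer
    $buffer = [System.Collections.Concurrent.BlockingCollection[object]]::new( $Size )

    # Script blocks are bound to the runspace that created them, so pass the
    # source text and recreate the block inside the job (assumption: the
    # producer does not depend on variables or functions of the caller).
    $job = Start-ThreadJob -ScriptBlock {
        param( $code, $queue )
        try {
            & ([scriptblock]::Create( $code )) | ForEach-Object { $queue.Add( $_ ) }
        }
        finally {
            # Always signal completion so the consumer loop can end
            $queue.CompleteAdding()
        }
    } -ArgumentList $Producer.ToString(), $buffer

    try {
        # Blocks until an item is available; ends once the producer has
        # called CompleteAdding() and the buffer is empty.
        foreach( $item in $buffer.GetConsumingEnumerable() ) {
            $item   # emit to the next command in the pipeline
        }
    }
    finally {
        # Also runs when a downstream command stops the pipeline early
        $buffer.CompleteAdding()
        $job | Remove-Job -Force
        $buffer.Dispose()
    }
}

# Usage along the lines of the question (interactive, so commented out here):
# Invoke-PrefetchPipeline -Size 10 -Producer {
#     1..100 | ForEach-Object { Start-Sleep -Seconds 3; $_ }
# } | ForEach-Object { Write-Host $_; [void] $Host.UI.RawUI.ReadKey() }
```

Unlike the batch-style consumer loop above, this variant hands items downstream one at a time, so the consuming command sees each item as soon as both it and the producer are ready. Errors raised inside the producer are not surfaced in this sketch; that would need a Receive-Job call before the job is removed.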