
How can I improve this implementation of a foreach-parallel function in PowerShell v5?


I found this interesting function, ForEach-Parallel, which works in PowerShell v5. It is part of the chocolatey-oneget-provider repo on GitHub, and I have tried to improve on it.

The reason I'm not using PowerShell v7.x is that the other code base I'm working with only supports PowerShell v5.

But I'm having issues running network commands in parallel: the time saving is minimal. Is that a limitation of the Windows 10 TCP stack?
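To tell pool overhead apart from the network itself, it can help to time the same runspace pattern against a workload that has no network involved. The following is only a rough sketch: the 500 ms sleep is an assumed stand-in for a network call, and the variable names are made up for the illustration.

```powershell
# Assumed stand-in workload: a 500 ms sleep instead of a real network call.
$Work  = { Start-Sleep -Milliseconds 500 }
$Count = 4

# Sequential baseline: run the workload $Count times, one after another.
$Sequential = Measure-Command {
    foreach ($i in 1..$Count) { & $Work }
}

# Same workload on a runspace pool that allows one runspace per item.
$Parallel = Measure-Command {
    $Pool = [runspacefactory]::CreateRunspacePool(1, $Count)
    $Pool.Open()
    try {
        $Jobs = foreach ($i in 1..$Count) {
            $Shell = [powershell]::Create()
            $Shell.RunspacePool = $Pool
            [void]$Shell.AddScript($Work.ToString())
            [pscustomobject]@{ Shell = $Shell; Handle = $Shell.BeginInvoke() }
        }
        foreach ($Job in $Jobs) {
            [void]$Job.Shell.EndInvoke($Job.Handle)   # blocks until this item has finished
            $Job.Shell.Dispose()
        }
    }
    finally {
        $Pool.Close()
        $Pool.Dispose()
    }
}

'Sequential: {0:N0} ms' -f $Sequential.TotalMilliseconds
'Parallel:   {0:N0} ms' -f $Parallel.TotalMilliseconds
```

If the sleep workload speeds up clearly while the network commands do not, the bottleneck is probably somewhere other than the runspace pool.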

The second example in the help block below (the Test-NetConnection one) writes this error output before it starts:

One or more errors occurred.

Also, I'm not that happy with my attempt at hiding the -ScriptBlock parameter name by simply assuming it is in position 0.
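For comparison, ForEach-Object itself declares -Process at position 0, so binding the script block positionally is not unusual. Here is a minimal sketch of that pattern combined with a process block, so that the pipeline parameter is actually bound. Invoke-PositionalDemo and its parameter names are hypothetical and not part of the function below.

```powershell
function Invoke-PositionalDemo {
    [CmdletBinding()]
    param (
        # Bound once per pipeline item inside the process block.
        [Parameter(Mandatory, ValueFromPipeline)]
        [object]$InputObject,

        # The only positional parameter, so a bare script block lands here.
        [Parameter(Mandatory, Position = 0)]
        [scriptblock]$ScriptBlock
    )

    begin {
        $Collected = [System.Collections.Generic.List[object]]::new()
    }
    process {
        # Collect the items first; a parallel implementation would queue them here.
        $Collected.Add($InputObject)
    }
    end {
        foreach ($Item in $Collected) {
            # ForEach-Object exposes the item as $_ inside the script block.
            ForEach-Object -InputObject $Item -Process $ScriptBlock
        }
    }
}

$Result = 1..3 | Invoke-PositionalDemo { $_ * 2 }
$Result
```

With this shape the items arrive through $InputObject rather than the automatic $Input variable, and both `Invoke-PositionalDemo { ... }` and `Invoke-PositionalDemo -ScriptBlock { ... }` work.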

function Invoke-ForEachParallel {
    [CmdletBinding()]
    [Alias('ForEach-Parallel')]
    [Diagnostics.CodeAnalysis.SuppressMessageAttribute(
        "PSAvoidUsingAliases",
        '',
        Justification = 'The used verb makes the most sense in this case.'
    )]
    param (
        [Parameter(Mandatory, ValueFromPipeline)]
        [Array]$Items, #! Not used in the code as a param so will only work as piped value

        [Parameter(Mandatory, Position = 0)]
        [Alias ('Process')] #* Bad way of making the script more compatible with foreach syntax
        [scriptblock]$ScriptBlock,

        [Parameter()]
        [Object[]]$ArgumentList,

        [Parameter()]
        [int]$MaxRunspaces = 16,

        [Parameter()]
        [int]$TimeOut = (60 * 60 * 1000) #TODO: Handle seconds as input as well?
    )

    if ($Input) {
        try {
            # create the optional argument- & parameter-lists to be used in the script-block
            $Arguments  = ''
            $Parameters = ''

            if ($ArgumentList) {
                for ($Index = 0; $Index -lt $ArgumentList.Length; $Index++) {
                    $Arguments  += ", `$$Index"
                    $Parameters += " `$$Index"
                }
            }

            # create the script-block to be executed
            # - the provided script-block is wrapped, so the provided arguments (ArgumentList)
            #   can be passed along with the current item ($_)
            # - the current module is always loaded

            $ScriptText =
@"
[CmdletBinding()]
param (`$_$Arguments)

function Wrapper {
$ScriptBlock
}

Wrapper $Parameters
"@

            $SessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()

            # providing the current host makes the output of each runspace show up in the current host
            $Pool = [runspacefactory]::CreateRunspacePool(1, $MaxRunspaces, $sessionState, $Host)
            $Pool.Open()

            # create a new runspace for each item
            $Runspaces      = @()
            $AsyncResults   = @()
            $Exceptions     = @()

            foreach ($Item in $Input) {
                $Runspace = [powershell]::create()
                $Runspace.RunSpacePool = $Pool
                $Runspaces += $Runspace

                $Runspace.Streams.Error.add_DataAdded({
                    Param (
                        [Object]$Sender,
                        [System.Management.Automation.DataAddedEventArgs]$e #!$e what?
                    )

                    foreach ($Item in $Sender.ReadAll()) {
                        throw "$($Item.Exception.Message)"
                    }
                })

                # add the generated script-block, passing the current item and optional arguments
                [void]$Runspace.AddScript($ScriptText)
                [void]$Runspace.AddArgument($Item)

                if ($ArgumentList) {
                    [void]$Runspace.AddParameters($ArgumentList)
                }

                # pass the Verbose-parameter
                [void]$Runspace.AddParameter('Verbose', $VerbosePreference -eq 'Continue')

                # start the runspace asynchronously
                $AsyncResult    = $Runspace.BeginInvoke()
                $AsyncResults   += $AsyncResult

            }#end foreach

            # wait for all runspaces to finish (each handle is waited on in turn, up to $TimeOut ms)
            for ($Index = 0; $Index -lt $AsyncResults.Length; $Index++) {
                $null = $AsyncResults[$Index].AsyncWaitHandle.WaitOne($TimeOut)
            }

            # retrieve the result of each runspace
            $Errors = @()

            for ($Index = 0; $Index -lt $AsyncResults.Length; $Index++) {
                $AsyncResult    = $AsyncResults[$Index]
                $Runspace       = $Runspaces[$Index]

                # if needed, the following properties provide details of the runspace completion-status
                # $runspace.InvocationStateInfo.State
                # $runspace.InvocationStateInfo.Reason

                try {
                    Write-Output ($Runspace.EndInvoke($AsyncResult))
                }
                catch {
                    # collect each error, so they can be provided as a single error
                    $Errors += $_
                }

            }#end for

        }#end try

        finally {
            if ($Pool) {
                $Pool.Close()
            }
        }

        # handle the error(s)
        if ($Errors) {
            if ($Errors.Length -eq 1) {
                throw $Errors[0]
            }
            else {
                $Exceptions = [exception[]]($Errors).Exception
                $AllMessages = $([string]::Join("`n", $Exceptions.Message))
                throw (New-Object AggregateException -ArgumentList "One or more errors occurred:`n$AllMessages",$Exceptions)
            }
        }

    }#endif Input

<#
.SYNOPSIS
    Parallel foreach, using PowerShell runspaces.

.DESCRIPTION
    Parallel foreach, using PowerShell runspaces.

.NOTES
    MIT license of CodeblackNL

.LINK
    https://github.com/CodeblackNL/chocolatey-oneget-provider/tree/master

.PARAMETER Items
    The items for which to execute the ScriptBlock.

.PARAMETER ScriptBlock
    The script-block to execute for each item.

.PARAMETER ArgumentList
    Array of arguments as objects that will be passed to the ScriptBlock.

.PARAMETER MaxRunspaces
    The maximum number of runspaces (to attempt) to run in parallel.
    The actual number of runspaces executing in parallel is determined by the runtime
    and is e.g. limited by available cores.
    Default is 16.

.PARAMETER TimeOut
    The time to wait for each runspace to complete, in milliseconds.
    Default is 1 hour.

.EXAMPLE
    1..10 | foreach {Invoke-WebRequest -Uri www.bing.com | select StatusCode}

    1..10 | foreach-parallel {Invoke-WebRequest -Uri www.bing.com | select StatusCode}

    Get the status code of accessing a site in parallel ten times.

.EXAMPLE
    1..10 | foreach {Test-NetConnection www.bing.com -Port 443 | select TcpTestSucceeded}

    1..10 | foreach-parallel {Test-NetConnection www.bing.com -Port 443 | select TcpTestSucceeded}

    Test opening a TCP-port ten times.

.EXAMPLE
    (1..10) | foreach {'Sleeping'; sleep -Seconds 1}

    (1..10) | foreach-parallel {'Sleeping'; sleep -Seconds 1}

    Sleep 1 second ten times.

#>

}
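The function above throws from inside the DataAdded handler on the error stream. One alternative is to leave the stream alone while the runspaces run and read Streams.Error after each one completes. The sketch below is offered as an assumption about what might be easier to debug, not as a confirmed fix for the "One or more errors occurred" output. The workload, the 30 second timeout and the variable names are all made up for the illustration.

```powershell
# Hypothetical workload: item 3 writes a non-terminating error, the others return a value.
$Script = {
    param($Value)
    if ($Value -eq 3) { Write-Error "Item $Value failed" }
    else { $Value * 10 }
}

$Pool = [runspacefactory]::CreateRunspacePool(1, 4)
$Pool.Open()

$Results  = [System.Collections.Generic.List[object]]::new()
$Failures = [System.Collections.Generic.List[object]]::new()
$TimeOut  = 30000   # milliseconds; an assumed value for this sketch

try {
    # Queue one PowerShell instance per item and remember its async handle.
    $Jobs = foreach ($Item in 1..4) {
        $Shell = [powershell]::Create()
        $Shell.RunspacePool = $Pool
        [void]$Shell.AddScript($Script.ToString()).AddArgument($Item)
        [pscustomobject]@{ Shell = $Shell; Handle = $Shell.BeginInvoke() }
    }

    foreach ($Job in $Jobs) {
        if ($Job.Handle.AsyncWaitHandle.WaitOne($TimeOut)) {
            # Finished in time: collect the output of this item.
            foreach ($Output in $Job.Shell.EndInvoke($Job.Handle)) { $Results.Add($Output) }
        }
        else {
            # Timed out: stop the instance instead of waiting forever.
            $Job.Shell.Stop()
        }

        # Non-terminating errors stay on the error stream; read them after completion.
        foreach ($Record in $Job.Shell.Streams.Error) { $Failures.Add($Record) }
        $Job.Shell.Dispose()
    }
}
finally {
    $Pool.Close()
    $Pool.Dispose()
}

$Results
$Failures | ForEach-Object { "Failed: $($_.Exception.Message)" }
```

Collecting the error records this way keeps each original ErrorRecord intact, so they can be reported one by one or wrapped in an AggregateException afterwards, as the function above already does.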

Solution

  • The module PSParallelPipeline, as suggested by @santiago, is much better suited 👍

    It fixes (most of) my issues, although the delay when using network commands remains.

    Still, there is no need to reinvent the wheel.