Tags: multithreading, powershell, concurrency, parallel-processing, runspace

Is there an easier way to run commands in parallel while keeping it efficient in Windows PowerShell?


This self-answer aims to provide an easy and efficient parallelism alternative for those who are stuck with Windows PowerShell and cannot install modules because of, for example, company policies.

In Windows PowerShell, the built-in alternatives for local parallel invocation are Start-Job and workflow. Both are known to be very slow and inefficient, and workflow is not even recommended for use and is no longer available in newer versions of PowerShell.
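
To make the overhead concrete, here is a minimal sketch (the job count and the stopwatch are illustrative choices, not part of the original question). Each Start-Job call runs its script block in a separate child process, which the differing process IDs show:

```powershell
# Illustrative sketch: every job started by Start-Job runs in its own child process.
$sw = [System.Diagnostics.Stopwatch]::StartNew()

$jobs = 1..3 | ForEach-Object {
    # Inside the job, $PID is the ID of the child process hosting that job
    Start-Job -ScriptBlock { $PID }
}

# Wait for all jobs, collect their output, and remove them afterwards
$jobPids = $jobs | Receive-Job -Wait -AutoRemoveJob

$sw.Stop()

'Current process : {0}' -f $PID
'Job processes   : {0}' -f ($jobPids -join ', ')
'Elapsed (ms)    : {0}' -f $sw.ElapsedMilliseconds
```

The elapsed time depends on the machine, so no figure is quoted here. The point is only that none of the job process IDs matches the current one.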

The other alternative is to rely on the PowerShell SDK and write our own parallel logic using what the System.Management.Automation.Runspaces namespace has to offer. This is definitely the most efficient approach, and it is what ForEach-Object -Parallel (in PowerShell Core) and Start-ThreadJob (preinstalled in PowerShell Core and available in Windows PowerShell through the PowerShell Gallery) use behind the scenes.

A simple example:

$throttlelimit = 3

$pool = [runspacefactory]::CreateRunspacePool(1, $throttlelimit)
$pool.Open()

$tasks = 0..10 | ForEach-Object {
    $ps = [powershell]::Create().AddScript({
        'hello world from {0}' -f [runspace]::DefaultRunspace.InstanceId
        Start-Sleep 3
    })
    $ps.RunspacePool = $pool

    @{ Instance = $ps; AsyncResult = $ps.BeginInvoke() }
}

$tasks | ForEach-Object {
    $_.Instance.EndInvoke($_.AsyncResult)
}

$tasks.Instance, $pool | ForEach-Object Dispose
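
As a sketch of where the complexity starts to creep in, the variation below passes each input item to its runspace with AddArgument and reads the error stream after EndInvoke. The failing item (3) and the variable names are made up for illustration:

```powershell
$pool = [runspacefactory]::CreateRunspacePool(1, 3)
$pool.Open()

$tasks = foreach ($item in 1..5) {
    $ps = [powershell]::Create().AddScript({
        param($number)

        if ($number -eq 3) {
            # Non-terminating error, simulated for this example
            Write-Error "cannot process $number"
            return
        }

        $number * 2
    }).AddArgument($item)   # bound positionally to $number
    $ps.RunspacePool = $pool

    @{ Instance = $ps; AsyncResult = $ps.BeginInvoke() }
}

$errors = [System.Collections.Generic.List[System.Management.Automation.ErrorRecord]]::new()

# EndInvoke blocks until its own task has finished, so output follows input order here
$results = foreach ($task in $tasks) {
    $task.Instance.EndInvoke($task.AsyncResult)

    # Errors written inside the runspace stay in the instance's error stream
    foreach ($err in $task.Instance.Streams.Error) {
        $errors.Add($err)
    }
}

foreach ($task in $tasks) { $task.Instance.Dispose() }
$pool.Dispose()

$results   # doubled values for every item except the failing one
$errors    # the single simulated error
```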

This works well, but it gets tedious and often complicated as the code grows more complex, and as a consequence it raises lots of questions.

Is there an easier way to do it?


Solution

  • NOTE

    Further updates to this function will be published to the GitHub repo as well as to the PowerShell Gallery. The code in this answer will no longer be maintained.

    The documentation for the function, as well as usage examples, can be found here. Note that the module version no longer has a -ThreadOptions parameter and implements -UseNewRunspace and -TimeoutSeconds parameters; however, its usage should be the same.

    Contributions are more than welcome. If you wish to contribute, fork the repo and submit a pull request with the changes.


    Since this topic can be confusing and often brings questions to the site, I decided to create this function to simplify the tedious task and help those stuck with Windows PowerShell. The aim is to keep it as simple and friendly as possible. It should also be a function that can be copy-pasted into our $PROFILE and reused whenever needed, without requiring the installation of a module (as stated in the question).

    This function has been greatly inspired by RamblingCookieMonster's Invoke-Parallel and Boe Prox's PoshRSJob and is merely a simplified take on those with a few improvements.

    DEFINITION

    using namespace System.Collections
    using namespace System.Collections.Generic
    using namespace System.Management.Automation
    using namespace System.Management.Automation.Language
    using namespace System.Management.Automation.Runspaces
    using namespace System.Threading
    using namespace System.Text
    
    # The function must run in the scope of a Module.
    # `New-Module` must be used for portability. Otherwise store the
    # function in a `.psm1` and import it via `Import-Module`.
    New-Module PSParallelPipeline -ScriptBlock {
        class CommandCompleter : IArgumentCompleter {
            [IEnumerable[CompletionResult]] CompleteArgument(
                [string] $commandName,
                [string] $parameterName,
                [string] $wordToComplete,
                [CommandAst] $commandAst,
                [IDictionary] $fakeBoundParameters) {
    
                return [CompletionCompleters]::CompleteCommand(
                    $wordToComplete,
                    [NullString]::Value,
                    [CommandTypes]::Function)
            }
        }
    
        function Invoke-Parallel {
            [CmdletBinding(PositionalBinding = $false)]
            [Alias('parallel')]
            param(
                [Parameter(Mandatory, ValueFromPipeline)]
                [object] $InputObject,
    
                [Parameter(Mandatory, Position = 0)]
                [scriptblock] $ScriptBlock,
    
                [Parameter()]
                [ValidateRange(1, 63)]
                [int] $ThrottleLimit = 5,
    
                [Parameter()]
                [hashtable] $Variables,
    
                [Parameter()]
                [ValidateNotNullOrEmpty()]
                [ArgumentCompleter([CommandCompleter])]
                [string[]] $Functions,
    
                [Parameter()]
                [ValidateSet('ReuseThread', 'UseNewThread')]
                [PSThreadOptions] $ThreadOptions = [PSThreadOptions]::ReuseThread
            )
    
            begin {
                try {
                    $iss = [initialsessionstate]::CreateDefault2()
    
                    foreach ($key in $Variables.PSBase.Keys) {
                        if ($Variables[$key] -is [scriptblock]) {
                            $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                                [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                                'VariableCannotBeScriptBlock',
                                [ErrorCategory]::InvalidType,
                                $Variables[$key]))
                        }
    
                        $iss.Variables.Add([SessionStateVariableEntry]::new($key, $Variables[$key], ''))
                    }
    
                    foreach ($function in $Functions) {
                        $def = (Get-Command $function).Definition
                        $iss.Commands.Add([SessionStateFunctionEntry]::new($function, $def))
                    }
    
                    $usingParams = @{}
                    foreach ($usingstatement in $ScriptBlock.Ast.FindAll({ $args[0] -is [UsingExpressionAst] }, $true)) {
                        $variableAst = [UsingExpressionAst]::ExtractUsingVariable($usingstatement)
                        $varPath = $variableAst.VariablePath.UserPath
                        $varText = $usingstatement.ToString()
    
                        if ($usingstatement.SubExpression -is [VariableExpressionAst]) {
                            $varText = $varText.ToLowerInvariant()
                        }
    
                        $key = [Convert]::ToBase64String([Encoding]::Unicode.GetBytes($varText))
    
                        if ($usingParams.ContainsKey($key)) {
                            continue
                        }
    
                        $value = $PSCmdlet.SessionState.PSVariable.GetValue($varPath)
    
                        if ($value -is [scriptblock]) {
                            $PSCmdlet.ThrowTerminatingError([ErrorRecord]::new(
                                [PSArgumentException]::new('Passed-in script block variables are not supported.'),
                                'VariableCannotBeScriptBlock',
                                [ErrorCategory]::InvalidType,
                                $value))
                        }
    
                        if ($usingstatement.SubExpression -isnot [VariableExpressionAst]) {
                            [Stack[Ast]] $subexpressionStack = $usingstatement.SubExpression.FindAll({
                                $args[0] -is [IndexExpressionAst] -or
                                $args[0] -is [MemberExpressionAst] },
                                $false)
    
                            while ($subexpressionStack.Count) {
                                $subexpression = $subexpressionStack.Pop()
                                if ($subexpression -is [IndexExpressionAst]) {
                                    $idx = $subexpression.Index.SafeGetValue()
                                    $value = $value[$idx]
                                    continue
                                }
    
                                if ($subexpression -is [MemberExpressionAst]) {
                                    $member = $subexpression.Member.SafeGetValue()
                                    $value = $value.$member
                                }
                            }
                        }
    
                        $usingParams.Add($key, $value)
                    }
    
                    $pool = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
                    $tasks = [List[hashtable]]::new()
                    $pool.ThreadOptions = $ThreadOptions
                    $pool.Open()
                }
                catch {
                    $PSCmdlet.ThrowTerminatingError($_)
                }
            }
            process {
                try {
                    # Thanks to Patrick Meinecke for his help here.
                    # https://github.com/SeeminglyScience/
                    $ps = [powershell]::Create().
                        AddScript({ $args[0].InvokeWithContext($null, [psvariable]::new('_', $args[1])) }).
                        AddArgument($ScriptBlock.Ast.GetScriptBlock()).
                        AddArgument($InputObject)
    
                    # This is how `Start-Job` does its magic.
                    # Thanks to Jordan Borean for his help here.
                    # https://github.com/jborean93
                    if ($usingParams.Count) {
                        $null = $ps.AddParameters(@{ '--%' = $usingParams })
                    }
    
                    $ps.RunspacePool = $pool
    
                    $tasks.Add(@{
                        Instance    = $ps
                        AsyncResult = $ps.BeginInvoke()
                    })
                }
                catch {
                    $PSCmdlet.WriteError($_)
                }
            }
            end {
                try {
                    while ($tasks.Count) {
                        $id = [WaitHandle]::WaitAny($tasks.AsyncResult.AsyncWaitHandle, 200)
    
                        if ($id -eq [WaitHandle]::WaitTimeout) {
                            continue
                        }
    
                        $task = $tasks[$id]
                        $task.Instance.EndInvoke($task.AsyncResult)
    
                        foreach ($err in $task.Instance.Streams.Error) {
                            $PSCmdlet.WriteError($err)
                        }
    
                        $tasks.RemoveAt($id)
                    }
                }
                catch {
                    $PSCmdlet.WriteError($_)
                }
                finally {
                    foreach ($task in $tasks.Instance) {
                        if ($task -is [IDisposable]) {
                            $task.Dispose()
                        }
                    }
    
                    if ($pool -is [IDisposable]) {
                        $pool.Dispose()
                    }
                }
            }
        }
    } -Function Invoke-Parallel | Import-Module -Force
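
The two less obvious parts of the definition above, seeding the pool through an initial session state (as -Variables and -Functions do) and collecting results as tasks complete with WaitAny, can be tried in isolation. This is a reduced sketch, not the function itself: Get-Square and $label are hypothetical names, and only four tasks are queued:

```powershell
# Hypothetical helper and variable, used only for this illustration
function Get-Square { param($n) $n * $n }
$label = 'square'

# Every runspace opened by the pool starts from this session state
$iss = [initialsessionstate]::CreateDefault2()
$iss.Variables.Add(
    [System.Management.Automation.Runspaces.SessionStateVariableEntry]::new('label', $label, ''))
$iss.Commands.Add(
    [System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new(
        'Get-Square', (Get-Command Get-Square).Definition))

$pool = [runspacefactory]::CreateRunspacePool(1, 2, $iss, $Host)
$pool.Open()

$tasks = [System.Collections.Generic.List[hashtable]]::new()
foreach ($i in 1..4) {
    $ps = [powershell]::Create().AddScript({
        param($n)
        # $label and Get-Square are resolved inside the runspace, not in the caller
        '{0} of {1} is {2}' -f $label, $n, (Get-Square $n)
    }).AddArgument($i)
    $ps.RunspacePool = $pool
    $tasks.Add(@{ Instance = $ps; AsyncResult = $ps.BeginInvoke() })
}

$output = try {
    while ($tasks.Count) {
        # Index of the first finished task, or WaitTimeout after 200 ms
        $id = [System.Threading.WaitHandle]::WaitAny($tasks.AsyncResult.AsyncWaitHandle, 200)
        if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
            continue
        }

        $task = $tasks[$id]
        $task.Instance.EndInvoke($task.AsyncResult)
        $task.Instance.Dispose()
        $tasks.RemoveAt($id)
    }
}
finally {
    # Dispose whatever is left if the loop was interrupted
    foreach ($task in $tasks) { $task.Instance.Dispose() }
    $pool.Dispose()
}

$output
```

Results arrive in completion order rather than input order, so sort them if the order matters. WaitHandle.WaitAny accepts at most 64 handles per call, which is why this sketch keeps the number of queued tasks small.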