I'm trying to understand parallel processing in PowerShell 5 (PS5). Everything was straightforward until I came across the overload of BeginInvoke that takes two parameters: BeginInvoke<TInput,TOutput>. I found something on a blog that works well for the TOutput parameter:
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = { Get-Random -Maximum 100 }
$Instances = (1..5) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
Running this and looking at $Outputs yields the expected result:
PS C:\Users\xyz> $Outputs
10
74
41
56
59
Now, when I try to pass something to the script block via $Inputs, I never succeed. I know that you can do it with AddParameters, but I don't like to leave a stone unturned and would like to understand how to do it with this overload. I've spent a week looking at resources on the net but could not find a way to do it correctly. From what I understand, the input is passed via the pipeline, just as the results end up in $Outputs. Here is one way (out of the thousand I tried...) that doesn't work:
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
# Let's say I want to add a bias to the random value
$Inputs.Add([PSCustomObject]@{ Bias = 100 })
$ScriptBlock = {
    Param
    (
        # Hoping to get value from pipeline
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [System.Management.Automation.PSDataCollection]$Bias
    )
    $BiasValue = [PSCustomObject]$Bias[0]
    (Get-Random -Maximum 100) + $BiasValue[0].'Bias'
}
# Create the threads
$Instances = (1..10) | ForEach-Object {
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    [PSCustomObject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs,$Outputs)
    }
}
#Wait for all threads to finish
while ( $Instances.State.IsCompleted -contains $False) { Start-Sleep -Milliseconds 100 }
Of course, this code doesn't do anything useful; it's just a test to find out how to get the $Inputs value in the ScriptBlock. Now $Outputs is completely empty, which points to an error in the ScriptBlock.
Any help would be appreciated.
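Errors raised inside a runspace are not written to the calling console, which is why an empty $Outputs is the only visible symptom here. A minimal sketch of where to look, using standard members of the PowerShell class: non-terminating errors collect in the instance's Streams.Error, and a terminating error is rethrown when EndInvoke is called. The script body below is a hypothetical stand-in for the failing ScriptBlock.

```powershell
# Hypothetical script: one non-terminating error, then normal output
$ps = [powershell]::Create().AddScript({
    Write-Error 'something went wrong'
    'still produced output'
})

$handle = $ps.BeginInvoke()
# EndInvoke waits for completion; a terminating error would be rethrown here
$result = @($ps.EndInvoke($handle))

# Non-terminating errors are kept in the instance's Error stream
$errorMessages = $ps.Streams.Error | ForEach-Object { $_.Exception.Message }
$ps.Dispose()

$result          # → still produced output
$errorMessages   # → something went wrong
```

In the question's code, EndInvoke is never called and the instances are never inspected, so whatever went wrong inside the ScriptBlock stays invisible.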
Hopefully this example helps you get a better understanding. The key issue with your code is that your scriptblock is missing its process block, and the parameter of the scriptblock should be psobject or just object, since what the threads receive from the pipeline (TInput) is just a pscustomobject; they do not receive the entire PSDataCollection<TInput>. (A bare [System.Management.Automation.PSDataCollection] type constraint cannot work anyway, because PSDataCollection only exists as the generic PSDataCollection<T>.) It's also hard to give an example from PowerShell because it is single-threaded.
$RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, 5)
$RunspacePool.Open()
$Inputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$Outputs = New-Object 'System.Management.Automation.PSDataCollection[PSObject]'
$ScriptBlock = {
    Param(
        # all threads will receive this and process it in parallel
        [Parameter(Mandatory = $true, ValueFromPipeline = $true)]
        [psobject] $Bias
    )
    process {
        # runs once per pipeline input object
        [pscustomobject]@{
            # Id of the runspace (from the pool) that handled this item
            RunspaceId    = [runspace]::DefaultRunspace.Id
            PipelineInput = $Bias
            Result        = (Get-Random -Maximum 100) + $Bias.Value
        }
        Start-Sleep 1
    }
}
$jobs = [System.Collections.Generic.List[object]]::new()
# Create the threads
1..10 | ForEach-Object {
    # simulate input from pipeline
    $Inputs.Add([pscustomobject]@{ Value = $_ })
    # now start processing
    $Instance = [powershell]::Create().AddScript($ScriptBlock)
    $Instance.RunspacePool = $RunspacePool
    $jobs.Add([PSCustomObject]@{
        Instance = $Instance
        State    = $Instance.BeginInvoke($Inputs, $Outputs)
    })
    # simulate output: ReadAll() returns everything currently in $Outputs
    # as a new collection and removes those items from $Outputs,
    # so nothing is read twice
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }
}
# signal that no more input is coming
$Inputs.Complete()
# now block until processing is done
# (WaitAny accepts at most 64 wait handles)
do {
    $id = [System.Threading.WaitHandle]::WaitAny($jobs.State.AsyncWaitHandle, 200)
    # if there is any output from threads, consume it
    if ($Outputs.Count) {
        $Outputs.ReadAll()
    }
    if ($id -eq [System.Threading.WaitHandle]::WaitTimeout) {
        continue
    }
    $job = $jobs[$id]
    $job.Instance.EndInvoke($job.State)
    $job.Instance.Dispose()
    $jobs.RemoveAt($id)
}
while ($jobs.Count)
$RunspacePool.Dispose()
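Stripped of the runspace pool and the job bookkeeping, the BeginInvoke<TInput,TOutput> call reduces to the sketch below: one PowerShell instance, an input collection that is filled and marked complete, and EndInvoke to wait for the result. It assumes input objects with a Value property as in the answer; the + 100 offset is only for illustration.

```powershell
$Inputs  = [System.Management.Automation.PSDataCollection[psobject]]::new()
$Outputs = [System.Management.Automation.PSDataCollection[psobject]]::new()

# Each item added here reaches the script as one pipeline object
1..3 | ForEach-Object { $Inputs.Add([pscustomobject]@{ Value = $_ }) }
$Inputs.Complete()   # no more input will be added

$ps = [powershell]::Create().AddScript({
    param([Parameter(ValueFromPipeline = $true)] [psobject] $Bias)
    process { $Bias.Value + 100 }
})
$handle = $ps.BeginInvoke($Inputs, $Outputs)

# EndInvoke blocks until the script has finished. Because $Outputs was
# supplied by the caller, the results are collected there.
$null = $ps.EndInvoke($handle)
$results = @($Outputs)
$ps.Dispose()

$results   # → 101, 102, 103
```

Copying $Outputs into $results before Dispose keeps the sketch independent of how the PowerShell instance treats the buffers on disposal.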