In this question, it was explained how to add to a concurrent, thread-safe collection: "Powershell: How to add Result to an Array (ForEach-Object -Parallel)".
I have a simpler use case, where I would just like to increment a single value (an integer).
Is it possible to do this in PowerShell using some sort of atomic integer data type?
$myAtomicCounter = 0
$myItems | ForEach-Object -Parallel {
#...other work
$myAtomicCounter.ThreadSafeAdd(2)
# .. some more work using counter
}
Write-Host($myAtomicCounter)
In PowerShell, when updating a single value from multiple threads you must use a locking mechanism, for example Mutex, SemaphoreSlim or even Monitor.Enter, otherwise the update will not be thread safe. A synchronized hashtable does not ensure that updating the key values is thread safe: it only makes individual reads and writes safe, so a read-modify-write such as ++ can still lose updates.
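To see why a synchronized hashtable is not enough, here is a minimal single-threaded sketch that replays, by hand, one possible interleaving of two threads doing $sync['Value']++. The variable names $readByA and $readByB are made up for illustration, and the expansion of ++ into a separate read and write is a simplification of what PowerShell does:

```powershell
$sync = [hashtable]::Synchronized(@{ Value = 0 })

# "Thread A" and "thread B" both read the current value (each read is safe on its own)
$readByA = $sync['Value']
$readByB = $sync['Value']

# Both write back their own result (each write is safe on its own)
$sync['Value'] = $readByA + 1
$sync['Value'] = $readByB + 1

# Two increments happened but only one survived
$sync['Value'] -eq 1
```

The lock has to cover the read and the write together, which is what the Mutex, Monitor and SemaphoreSlim examples do.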
Below is a simple demo that proves what is stated above:
$sync = [hashtable]::Synchronized(@{ })
$iteration = 0
do {
$sync['Value'] = 0
$iteration++
0..10 | ForEach-Object -Parallel {
$sync = $using:sync
Start-Sleep -Milliseconds 200
$sync['Value']++
} -ThrottleLimit 11
}
while ($sync['Value'] -eq 11)
"On iteration $iteration the value was $($sync['Value'])"
Interlocked.Increment is also not thread-safe in PowerShell, as a PSReference is not the same as a C# by-ref value. Again, a simple example to prove it:
$iteration = 0
do {
$iteration++
$i = [ref] 0
0..10 | ForEach-Object -Parallel {
$i = $using:i
# [System.Threading.Monitor]::Enter($i) => makes this thread safe
Start-Sleep -Milliseconds 200
$null = [System.Threading.Interlocked]::Increment($i)
# [System.Threading.Monitor]::Exit($i) => makes this thread safe
}
}
until ($i.Value -ne 11)
"On iteration $iteration the value was $($i.Value)"
A few examples of how we can increment the value in a thread-safe manner:
Mutex:
$processedItems = [hashtable]::Synchronized(@{
Lock = [System.Threading.Mutex]::new()
Counter = 0
})
0..10 | ForEach-Object -Parallel {
# using sleep to emulate doing something here
Start-Sleep -Milliseconds (Get-Random -Maximum 200)
# bring the local variable to this scope
$ref = $using:processedItems
# lock this thread until I can write
if ($ref['Lock'].WaitOne()) {
# when I can write, update the value
$ref['Counter']++
# and release this lock so other threads can write
$ref['Lock'].ReleaseMutex()
}
}
# Should be True:
$processedItems['Counter'] -eq 11
Monitor.Enter and a custom function that tries to resemble the C# lock statement:
function lock {
param(
[Parameter(Mandatory)]
[object] $Object,
[Parameter(Mandatory)]
[scriptblock] $ScriptBlock
)
# acquire the lock before the try, so that finally only releases a lock we actually hold
[System.Threading.Monitor]::Enter($Object)
try {
& $ScriptBlock
}
finally {
[System.Threading.Monitor]::Exit($Object)
}
}
$utils = [hashtable]::Synchronized(@{
LockFunc = $function:lock.ToString()
Counter = @(0)
})
0..10 | ForEach-Object -Parallel {
# bring the utils var to this scope
$utils = $using:utils
# define the `lock` function here
$function:lock = $utils['LockFunc']
Start-Sleep -Milliseconds (Get-Random -Maximum 200)
# lock the counter array
lock($utils['Counter'].SyncRoot) {
# increment and release when done
$utils['Counter'][0]++
}
}
# Should be True:
$utils['Counter'][0] -eq 11
SemaphoreSlim:
$utils = [hashtable]::Synchronized(@{
Semaphore = [System.Threading.SemaphoreSlim]::new(1, 1)
Counter = @(0)
})
0..10 | ForEach-Object -Parallel {
# bring the utils var to this scope
$utils = $using:utils
Start-Sleep -Milliseconds (Get-Random -Maximum 200)
# wait here until we can write
$utils['Semaphore'].Wait()
# once i can write, increment the value
$utils['Counter'][0]++
# and release so other threads can write
$null = $utils['Semaphore'].Release()
}
# Should be True:
$utils['Counter'][0] -eq 11
Alternatively, avoid shared state altogether: output from the parallel loop and update the counter in a regular, sequential loop further down the pipeline:
$counter = 0
0..10 | ForEach-Object -Parallel {
# using sleep to emulate doing something here
Start-Sleep -Milliseconds (Get-Random -Maximum 200)
# when this thread is done, output from the parallel
$_
} | ForEach-Object {
# then the output from the parallel loop is received in this linear
# thread safe loop where we can update the counter
$counter++
}
# Should be True:
$counter -eq 11
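If you really want something closer to an atomic integer, one option is to keep the value in a field of a small compiled type, so that Interlocked operates on a real by-ref field instead of a PSReference. This is only a sketch: AtomicCounter and its Add method are hypothetical names defined here with Add-Type, not a built-in type.

```powershell
Add-Type -TypeDefinition @'
using System.Threading;

// Hypothetical helper type, defined only for this example
public sealed class AtomicCounter
{
    private int _value;

    // Read the current value
    public int Value { get { return Volatile.Read(ref _value); } }

    // Atomically add to the field and return the new value
    public int Add(int amount) { return Interlocked.Add(ref _value, amount); }
}
'@

$counter = [AtomicCounter]::new()

0..10 | ForEach-Object -Parallel {
    # bring the shared counter object to this scope
    $c = $using:counter
    Start-Sleep -Milliseconds (Get-Random -Maximum 200)
    # the add happens inside the compiled method, on the real field
    $null = $c.Add(2)
}

# Should be True:
$counter.Value -eq 22
```

Add returns the new value, so each thread can keep working with the number it just produced without reading the counter again.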