This question explains how to add to a concurrent, thread-safe collection: Powershell: How to add Result to an Array (ForEach-Object -Parallel)

I have a simpler use case, where I would just like to increment a single integer value.

Is it possible to do this in PowerShell using some sort of atomic integer data type?

$myAtomicCounter = 0

$myItems | ForEach-Object -Parallel {
    #...other work

    $myAtomicCounter.ThreadSafeAdd(2)

    # .. some more work using counter
}

Write-Host($myAtomicCounter)
Santiago Squarzon
Ryu S.
  • This is possible, but it also means that only one thread can update that value at a given time (meaning other threads are blocked while this happens). Can you explain why you want to do this? There is possibly a better way – Santiago Squarzon Jan 26 '23 at 21:37
  • @SantiagoSquarzon I would like to keep track of a sum of a value located in each item, e.g. if I have processed item1, item3, item4, then when I'm processing item2, I want to have the sum of all item1.val, item2.val and item4.val on hand to use. I could build a job hashmap and scan to see which ones are done, then sum each time, but I'd prefer to just use a counter like this – Ryu S. Jan 26 '23 at 21:49
  • The simplest way to do it is to output the processed count from your parallel loop into a new linear pipeline loop that keeps updating the count of processed items in a thread-safe manner – Santiago Squarzon Jan 26 '23 at 22:01

1 Answer

In PowerShell, when you update a single value from multiple threads you must use a locking mechanism, for example Mutex, SemaphoreSlim or even Monitor.Enter; otherwise the update is not thread safe. A synchronized hashtable does not make updating the value of a key thread safe, because an operation such as ++ is a read followed by a separate write, and two threads can read the same value before either one writes.

Below is a simple demo that proves what is stated above:

$sync = [hashtable]::Synchronized(@{ })
$attempts = 0

do {
    $sync['Value'] = 0
    $attempts++
    0..10 | ForEach-Object -Parallel {
        $sync = $using:sync
        Start-Sleep -Milliseconds 200
        $sync['Value']++
    } -ThrottleLimit 11
}
while ($sync['Value'] -eq 11)

"It took $attempts attempts to fail..."
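The reason the loop above eventually fails can be reproduced deterministically. The following is only a single-threaded sketch that interleaves the two steps of `++` by hand; `$readByA` and `$readByB` are illustrative names standing in for two threads, not part of the demo above:

```powershell
$sync = [hashtable]::Synchronized(@{ Value = 0 })

# `$sync['Value']++` is a read followed by a separate write.
# Simulate two threads that both read before either one writes:
$readByA = $sync['Value']        # "thread A" reads 0
$readByB = $sync['Value']        # "thread B" also reads 0

$sync['Value'] = $readByA + 1    # A writes 1
$sync['Value'] = $readByB + 1    # B writes 1 as well, A's update is lost

# two increments happened, but the stored value is 1
$sync['Value']
```

Each individual read and write on the synchronized hashtable is safe on its own; it is the gap between them that a lock has to cover.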

Supposing we have an array of arrays:

$toProcess = 0..10 | ForEach-Object {
    , (Get-Random -Count (Get-Random -Minimum 5 -Maximum 10))
}

And you want to keep a running count of the items processed across all of the arrays, here is how you could do it using a Mutex:

$processedItems = [hashtable]::Synchronized(@{
    Lock    = [System.Threading.Mutex]::new()
    Counter = 0
})

$toProcess | ForEach-Object -Parallel {
    # use sleep to emulate doing some work here
    Start-Sleep (Get-Random -Maximum 5)

    # bring the local variable to this scope
    $ref = $using:processedItems
    # block this thread until it owns the mutex
    if ($ref['Lock'].WaitOne()) {
        try {
            # now that we hold the lock, update the value
            $ref['Counter'] += $_.Count
        }
        finally {
            # always release the lock so other threads can write
            $ref['Lock'].ReleaseMutex()
        }
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$processedItems['Counter'] -eq $processedCount
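SemaphoreSlim, mentioned at the top, can be used in the same way. This is a minimal sketch rather than a definitive implementation; it assumes PowerShell 7+ (for `-Parallel`), and `$state`, `$expected` and the sample `$toProcess` built here are illustrative names chosen so the snippet runs on its own:

```powershell
# sample input (illustrative): 11 arrays of 5 to 9 items each
$toProcess = 0..10 | ForEach-Object { , (1..(Get-Random -Minimum 5 -Maximum 10)) }

$state = [hashtable]::Synchronized(@{
    # initial count 1, maximum count 1: only one thread may enter at a time
    Lock    = [System.Threading.SemaphoreSlim]::new(1, 1)
    Counter = 0
})

$toProcess | ForEach-Object -Parallel {
    $ref = $using:state
    # block until the semaphore is available
    $ref['Lock'].Wait()
    try {
        $ref['Counter'] += $_.Count
    }
    finally {
        # Release() returns the previous count, which is not needed here
        $null = $ref['Lock'].Release()
    }
}

# compare against a plain sequential count
$expected = 0
foreach ($array in $toProcess) { $expected += $array.Count }

# Should be True:
$state['Counter'] -eq $expected
```

Putting the release in `finally` means an error while updating does not leave the other threads waiting forever.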

Another example of thread-safe incrementing of a counter, using Monitor.Enter and a custom function that tries to resemble the C# lock statement:

function lock {
    param(
        [Parameter(Mandatory)]
        [object] $Object,

        [Parameter(Mandatory)]
        [scriptblock] $ScriptBlock
    )

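    # acquire the lock before entering `try`, so that `finally`
    # only releases a lock that was actually taken
    [System.Threading.Monitor]::Enter($Object)
    try {
        & $ScriptBlock
    }
    finally {
        [System.Threading.Monitor]::Exit($Object)
    }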
}

$utils = [hashtable]::Synchronized(@{
    LockFunc = $function:lock.ToString()
    Counter  = @(0)
})

$toProcess | ForEach-Object -Parallel {
    # bring the utils var to this scope
    $utils = $using:utils
    # define the `lock` function here
    $function:lock = $utils['LockFunc']

    Start-Sleep (Get-Random -Maximum 5)

    # lock the counter array
    lock($utils['Counter'].SyncRoot) {
        # increment and release when done
        $utils['Counter'][0] += $_.Count
    }
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$utils['Counter'][0] -eq $processedCount

A much simpler approach in PowerShell would be to output from your parallel loop into a linear loop where you can safely update the counter without having to care about thread safety:

$counter = 0

$toProcess | ForEach-Object -Parallel {
    # use sleep to emulate doing some work here
    Start-Sleep (Get-Random -Maximum 5)

    # when this thread is done,
    # output this array of processed items
    $_
    
} | ForEach-Object {
    # then the output from the parallel loop is received in this linear
    # thread safe loop where we can update the counter
    $counter += $_.Count
}

$processedCount = ($toProcess | Write-Output | Measure-Object).Count

# Should be True:
$counter -eq $processedCount
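If the running total is needed while work is still in progress, as described in the comments on the question, the downstream loop can also emit it after every finished item. This is only a sketch under that assumption, for PowerShell 7+; `$runningTotals`, `$expected` and the sample `$toProcess` are illustrative names:

```powershell
# sample input (illustrative): 11 arrays of 5 to 9 items each
$toProcess = 0..10 | ForEach-Object { , (1..(Get-Random -Minimum 5 -Maximum 10)) }

$counter = 0

$runningTotals = $toProcess | ForEach-Object -Parallel {
    # emulate some work
    Start-Sleep -Milliseconds (Get-Random -Maximum 300)
    # output how many items this worker processed
    $_.Count
} | ForEach-Object {
    # this loop runs on a single thread: add the worker's count,
    # then emit the total accumulated so far
    $counter += $_
    $counter
}

# the last running total should equal the overall count
$expected = 0
foreach ($array in $toProcess) { $expected += $array.Count }

# Should be True:
$runningTotals[-1] -eq $expected
```

Note that the total is available in the downstream loop, not inside the parallel workers; a worker that needs to read it would still have to go through one of the locks shown earlier.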
Santiago Squarzon
  • What about [`Interlocked.Increment`](https://learn.microsoft.com/en-us/dotnet/api/system.threading.interlocked.increment?view=net-7.0) as [suggested](https://stackoverflow.com/questions/13181740/c-sharp-thread-safe-fastest-counter) by the C# guys? – zett42 Jan 27 '23 at 10:34
  • @zett42 definitely really cool, didn't know about it. You should add it as answer. Worth noting this is also a locking mechanism – Santiago Squarzon Jan 27 '23 at 12:29
  • mmm doesn't seem to play well in PowerShell @zett42, I'm not getting consistent results: `$i = [ref] 0; 0..100 | ForEach-Object -Parallel { $i = $using:i; Start-Sleep (Get-Random -Maximum 4); $null = [System.Threading.Interlocked]::Increment($i) }` sometimes gives 101 as expected, other times less than that – Santiago Squarzon Jan 27 '23 at 12:34
  • I haven't used `Interlocked.Increment` in PowerShell yet. Thanks for testing! That it's not working reliably probably has to do with the fact that PowerShell's `[ref]` is not a real reference, just a wrapper class. – zett42 Jan 27 '23 at 13:19
  • How would I go about this using the class System.Collections.Concurrent.ConcurrentDictionary? – Dennis Jun 22 '23 at 17:13
  • @Dennis same concept, you still need a lock. The only thing that changes is how you define using `[...ConcurrentDictionary[string, object]]::new()` instead of `[hashtable]::Synchronized(@{...` – Santiago Squarzon Jun 22 '23 at 17:16
  • But doesn't `ConcurrentDictionary` provide the locks for me when using the specific methods for that in the class [How to add and remove items from a ConcurrentDictionary](https://learn.microsoft.com/en-us/dotnet/standard/collections/thread-safe/how-to-add-and-remove-items)? – Dennis Jun 22 '23 at 17:24
  • @Dennis please just try to increment a single key in a concurrent dictionary as shown in the example of Github and see if it's thread safe or not. I can assure you it's not. Otherwise feel free to ask a new question – Santiago Squarzon Jun 22 '23 at 17:25
  • That example uses `++`. Is there no practical usage with the methods `AddOrUpdate`, `GetOrAdd`, `TryAdd`, `TryGetValue`, `TryUpdate`, `TryRemove`? – Dennis Jun 22 '23 at 17:28
  • Is this inaccurate? "If you call AddOrUpdate simultaneously on different threads, addValueFactory may be called multiple times, but its key/value pair might not be added to the dictionary for every call." as stated in [ConcurrentDictionary.AddOrUpdate Method](https://learn.microsoft.com/en-us/dotnet/api/system.collections.concurrent.concurrentdictionary-2.addorupdate?view=net-7.0). I realize some values might be lost, but the structure won't break. – Dennis Jun 22 '23 at 17:50
  • @Dennis how does this relate to the question being asked here? This question was about incrementing a single value. Please ask a new question if you have doubts. – Santiago Squarzon Jun 22 '23 at 17:55
  • You've lost me. We are talking about incrementing the value in a thread-safe way? And I was asking why we couldn't use the method `AddOrUpdate` for that (then I found out). – Dennis Jun 22 '23 at 18:00
  • @Dennis `AddOrUpdate` can do what its name implies, add or update a key/value pair; however, to increment a single value you need to know said value beforehand in order to add `+ 1` to it. The reason why a lock is required before incrementing is quite simple: 2 or more threads can attempt to read the value at the same time and then try to update said value at the same time, creating a race condition. The lock ensures that only 1 thread can read and update said value at any given time. – Santiago Squarzon Jun 22 '23 at 18:10
  • Ok, now I'm with you. Thanks :) – Dennis Jun 22 '23 at 18:13