I've been trying to implement a producer-consumer pattern with multiple producers using BlockingCollection<>, Start-ThreadJob and ForEach-Object -Parallel. The results were mixed: some code runs, some freezes, and some just crashes PowerShell. So I suspect I'm doing something fundamentally wrong:
using namespace System.Collections.Concurrent

class TestProducerConsumer
{
    [int] $result = 0
    [BlockingCollection[int]] $queue = [BlockingCollection[int]]::new()

    [void] producer([int]$i) { $this.queue.Add($i) }

    [void] consumer() {
        $sum = 0
        $it = $this.queue.GetConsumingEnumerable()
        foreach( $i in $it ) { $sum += $i }
        $this.result = $sum
    }

    [void] Run() {
        $job = Start-ThreadJob { ($using:this).consumer() }
        1..10 | ForEach-Object -Parallel {
            #($using:this).producer($_)  # freezing
            ($using:this).queue.Add($_)  # working
        }
        #Start-Sleep -Seconds 1  # freezing
        $this.queue.CompleteAdding()
        $job | Receive-Job -Wait
    }
}

$t = [TestProducerConsumer]::new(); $t.Run(); $t
In the simplified test case above, there are two lines doing the same thing: One is getting the queue member from the instance and adding the value directly; the other is calling a method on the instance to add the value to the queue. The former works, the latter freezes!?
Also, adding the Start-Sleep line back in freezes the process.
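For comparison, here is a minimal sketch of the working variant with the class stripped away, assuming the queue is shared directly through $using: in both the thread job and the parallel block. The variable names ($queue, $job, $result) are only illustrative, and this doesn't explain the freeze; it just isolates the pattern that runs for me.

```powershell
# Shared queue, referenced directly instead of through a class instance.
$queue = [System.Collections.Concurrent.BlockingCollection[int]]::new()

# Consumer: sums items until CompleteAdding() is called and the queue is drained.
$job = Start-ThreadJob {
    $q = $using:queue
    $sum = 0
    foreach ($i in $q.GetConsumingEnumerable()) { $sum += $i }
    $sum  # emit the total as the job's output
}

# Producers: each parallel iteration adds its input straight to the queue,
# without calling a method on a class instance.
1..10 | ForEach-Object -Parallel {
    ($using:queue).Add($_)
}

# Signal the consumer that no more items will arrive, then collect its output.
$queue.CompleteAdding()
$result = $job | Receive-Job -Wait -AutoRemoveJob
$result
```

The only difference from the class version is that nothing here goes through a method of a PowerShell class instance; the producers and the consumer touch the BlockingCollection object directly.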
Tested on Windows 10 with various PowerShell 7.* versions.
EDIT: Probably related to the issue "ForEach-Object -Parallel situationally drops pipeline input" and similar issues.