The following may be more of an academic exercise than something useful in real life, because it relies on the Start-ThreadJob cmdlet, which offers a lightweight, much faster thread-based alternative to the child-process-based regular background jobs created with Start-Job, and which also preserves type fidelity: given how lightweight thread jobs are and how quickly they can be created, creating a new one for every background task, rather than maintaining a single job to which background tasks are delegated, may be good enough.
Start-ThreadJob comes with PowerShell (Core) 7+ and in Windows PowerShell can be installed on demand with, e.g., Install-Module ThreadJob -Scope CurrentUser.
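As a minimal sketch of the simpler one-job-per-task approach (the variable names and sample dates are illustrative only), the following creates a separate thread job for each task and then collects all results:

```powershell
# Illustrative tasks: each date string gets its own short-lived thread job.
$jobs = '1970-01-01', '2000-01-01' | ForEach-Object {
  Start-ThreadJob -ScriptBlock {
    param($dateString)
    Get-Date -Date $dateString
  } -ArgumentList $_
}

# Wait for all jobs to finish, collect their output, and remove the jobs.
$results = $jobs | Receive-Job -Wait -AutoRemoveJob

# Thread jobs run in-process, so the results are live [datetime] instances.
$results | ForEach-Object { $_.GetType().FullName }
```

With only a handful of tasks, this is likely to be simpler than keeping a long-running worker job around.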
Implementing the code below with a child-process-based Start-Job instance is possible, but would require either an intricate IPC mechanism to communicate work items, or a designated temporary file; either way, PowerShell's CLIXML serialization would have to be used, which invariably entails limited type fidelity for values passed from the caller and the results returned from the job.
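To illustrate the loss of type fidelity, here is a small sketch that simulates the cross-process round trip in-process with the PSSerializer class, rather than actually launching a Start-Job job; the choice of a DirectoryInfo instance as the sample object is arbitrary:

```powershell
# A live .NET object, as a thread job would return it.
$original = [System.IO.DirectoryInfo]::new([System.IO.Path]::GetTempPath())

# Simulate what crossing a process boundary entails:
# serialize to CLIXML, then deserialize again.
$clixml = [System.Management.Automation.PSSerializer]::Serialize($original)
$roundTripped = [System.Management.Automation.PSSerializer]::Deserialize($clixml)

# The original is a true DirectoryInfo instance...
$original -is [System.IO.DirectoryInfo]
# ...whereas the round-tripped object is a method-less emulation of it,
# whose type name carries a 'Deserialized.' prefix.
$roundTripped -is [System.IO.DirectoryInfo]
$roundTripped.pstypenames[0]
```

Only a few well-known types (strings, numbers, dates, and the like) are restored as true instances after such a round trip; most others come back as property-only emulations.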
The code below:
Defines function Get-WorkerJob, which returns a custom object containing a thread job (.Job property) and a synchronized queue (.Queue property) that the job monitors for work items in a polling loop and processes sequentially.
The caller can add work items, in the form of hashtables with a ScriptBlock entry and, if values from the caller must be passed, an ArgumentList entry. Note that $using: references are NOT supported.[1]
The caller is responsible for collecting results via Receive-Job, and for eventually cleaning up the job with Remove-Job -Force (a more graceful way to shut down the worker job could be implemented).
# Function that creates a thread-based worker job, using Start-ThreadJob.
function Get-WorkerJob {
  # Create a synchronized queue.
  $workerQueue = [System.Collections.Concurrent.ConcurrentQueue[hashtable]]::new()
  # Construct and output a custom object containing the queue
  # and a thread job that processes the queue.
  [pscustomobject] @{
    Job   = Start-ThreadJob {
      $workerQueue = $using:workerQueue
      $workItem = $null
      do {
        while ($workerQueue.TryDequeue([ref] $workItem)) {
          # [Console]::WriteLine('got one: ' + $workItem.ScriptBlock)
          # To avoid cross-thread issues, the script block must be rebuilt.
          # The challenge is that $using references are not supported in local
          # script-block invocation. Hence only passing *arguments* is supported.
          & $workItem.ScriptBlock.Ast.GetScriptBlock() $workItem.ArgumentList
        }
        # Queue is empty, sleep a little before checking for new work items.
        Start-Sleep -Milliseconds 500
      } while ($true)
    }
    Queue = $workerQueue
  }
}
# -- Sample use of the function above.
# Create a thread-based worker job.
$worker = Get-WorkerJob
# Pass two sample work items, one without and one with values from the caller.
@{ ScriptBlock = { Get-Date } },
@{ ScriptBlock = { Get-Date -Date $args[0] }; ArgumentList = '1/1/1970' } |
  ForEach-Object {
    # Send a work item to the worker job.
    $worker.Queue.Enqueue($_)
    Start-Sleep 1
    # Receive, consume, and output the results (so far).
    $worker.Job | Receive-Job
  }
# Remove the worker job.
$worker.Job | Remove-Job -Force
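One conceivable way to shut the worker down more gracefully is to treat a special work item as a stop signal, so that the job ends on its own once all previously queued items have been processed. The following is only a sketch under that assumption: the function name Get-StoppableWorkerJob and the Stop entry are hypothetical and not part of the code above.

```powershell
# Hypothetical variant of Get-WorkerJob whose job exits when it dequeues
# a work item that has a truthy 'Stop' entry.
function Get-StoppableWorkerJob {
  $workerQueue = [System.Collections.Concurrent.ConcurrentQueue[hashtable]]::new()
  [pscustomobject] @{
    Job   = Start-ThreadJob {
      $workerQueue = $using:workerQueue
      $workItem = $null
      $keepRunning = $true
      while ($keepRunning) {
        while ($workerQueue.TryDequeue([ref] $workItem)) {
          # A stop signal ends both loops; items queued after it are ignored.
          if ($workItem.Stop) { $keepRunning = $false; break }
          # As before, rebuild the script block to avoid cross-thread issues.
          $scriptBlock = $workItem.ScriptBlock.Ast.GetScriptBlock()
          & $scriptBlock $workItem.ArgumentList
        }
        # Queue is empty: sleep briefly before polling again.
        if ($keepRunning) { Start-Sleep -Milliseconds 100 }
      }
    }
    Queue = $workerQueue
  }
}

# Sample use: queue one work item, then the stop signal.
$stoppableWorker = Get-StoppableWorkerJob
$stoppableWorker.Queue.Enqueue(@{ ScriptBlock = { 'processed: ' + $args[0] }; ArgumentList = 'item 1' })
$stoppableWorker.Queue.Enqueue(@{ Stop = $true })
# Because the job now terminates by itself, -Wait returns once the queue
# has been drained, and -AutoRemoveJob cleans up without needing -Force.
$stopResults = $stoppableWorker.Job | Receive-Job -Wait -AutoRemoveJob
$stopResults
```

Since the stop signal travels through the same queue as regular work items, everything enqueued before it is still processed, which Remove-Job -Force does not guarantee.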
[1] Adding such support is possible, but is cumbersome and complex - see this answer for what it takes; the code there is also used in refined form in the PSParallelPipeline module.