
I'm curious to test out the performance/usefulness of asynchronous tasks in PowerShell with Start-ThreadJob, Start-Job and Start-Process. I have a folder with about 100 zip files, so I came up with the following test:

New-Item "000" -ItemType Directory -Force   # Move the old zip files in here
foreach ($i in $zipfiles) {
    $name = $i -split ".zip"
    Start-Job -scriptblock {
        7z.exe x -o"$name" .\$name
        Move-Item $i 000\ -Force
        7z.exe a $i .\$name\*.*
    }
}

The problem with this is that it would start jobs for all 100 zip files at once, which would probably be too much, so I want to set a value $numjobs, say 5, which I can change, such that only $numjobs jobs are started at the same time, and the script then waits for all 5 of those jobs to finish before the next block of 5 starts. I'd then like to watch CPU and memory usage for different values of $numjobs.

How would I tell a loop only to run 5 times, then wait for the Jobs to finish before continuing?

I see that it's easy to wait for jobs to finish

$jobs = $commands | Foreach-Object { Start-ThreadJob $_ }
$jobs | Receive-Job -Wait -AutoRemoveJob

but how might I wait for Start-Process tasks to end?

Although I would like to use ForEach-Object -Parallel, the enterprises that I work in will be solidly tied to PowerShell 5.1 for the next 3-4 years, I expect, with no chance to install PowerShell 7.x (although I would be curious to test ForEach-Object -Parallel on my home system to compare all approaches).

YorSubs
  • You can still get _thread_-based parallelism in WinPS, namely if you install the `ThreadJob` module from the PowerShell Gallery; it supports throttling how many threads run at a time. See [this answer](https://stackoverflow.com/a/56612574/45375) for more information. – mklement0 Oct 08 '22 at 13:16
  • Excellent, so I can run my test on all three, `Start-ThreadJob`, `Start-Job` and `Start-Process` (but I'm guessing that `Start-ThreadJob` will blow away the other two; it will be interesting to try my throttling test on them all). – YorSubs Oct 08 '22 at 13:20
  • I see from that answer that waiting for the jobs to end is easy (`$jobs = $commands | Foreach-Object { Start-ThreadJob $_ }`, `$jobs | Receive-Job -Wait -AutoRemoveJob`), so really I'm just curious how I can stagger my `foreach` loop and how I might wait for `Start-Process` instances to end. – YorSubs Oct 08 '22 at 13:25
  • I highly doubt you will get better performance doing this in parallel. Zipping in parallel is not recommended IIRC – Santiago Squarzon Oct 08 '22 at 14:02
  • OK, I might not get good results from this specific task then, but in principle I'm quite interested in trying this structure on other things, so it will still be interesting / useful. – YorSubs Oct 08 '22 at 14:15
  • I've added a helper function that may greatly simplify writing your own runspace code; the syntax is more or less similar to `ForEach-Object -Parallel`, though it has its limitations – Santiago Squarzon Oct 08 '22 at 23:57
  • It's all amazing, from both you and @mklement0. Tons for me to digest here. Very useful and fascinating techniques that I'm going to make good use of, thanks. – YorSubs Oct 09 '22 at 06:22

3 Answers


ForEach-Object -Parallel and Start-ThreadJob have built-in functionality to limit the number of threads that can run at the same time. The same applies to runspaces via a RunspacePool, which is what both cmdlets use behind the scenes.

Start-Job does not offer such functionality because each job runs in a separate process, as opposed to the cmdlets mentioned before, which run on different threads in the same process. I would also personally not consider it a parallelism alternative: it is pretty slow, and in most cases a linear loop will be faster. Serialization and deserialization can be a problem in some cases too.
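
To make the serialization point concrete, here is a minimal sketch (any cmdlet that emits live .NET objects shows the same effect; Get-Item is just an example):

# A live object in the caller's session:
(Get-Item $PSHOME).pstypenames[0]                    # System.IO.DirectoryInfo

# The same call routed through Start-Job crosses a process boundary and comes
# back rehydrated from XML: a property snapshot without the original methods.
$job = Start-Job { Get-Item $PSHOME }
($job | Receive-Job -Wait -AutoRemoveJob).pstypenames[0]
# Deserialized.System.IO.DirectoryInfo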

How to limit the number of running threads?

Both cmdlets offer the -ThrottleLimit parameter for this.

How would the code look?

$dir = (New-Item "000" -ItemType Directory -Force).FullName

# ForEach-Object -Parallel
$zipfiles | ForEach-Object -Parallel {
    $name = [IO.Path]::GetFileNameWithoutExtension($_)
    7z.exe x -o"$name" .\$_
    Move-Item $_ $using:dir -Force
    7z.exe a $_ .\$name\*.*
} -ThrottleLimit 5

# Start-ThreadJob
$jobs = foreach ($i in $zipfiles) {
    Start-ThreadJob {
        $name = [IO.Path]::GetFileNameWithoutExtension($using:i)
        7z.exe x -o"$name" .\$using:i
        Move-Item $using:i $using:dir -Force
        7z.exe a $using:i .\$name\*.*
    } -ThrottleLimit 5
}
$jobs | Receive-Job -Wait -AutoRemoveJob

How to achieve the same having only PowerShell 5.1 available and no ability to install new modules?

The RunspacePool offers this same functionality, either with its .SetMaxRunspaces(Int32) method or by targeting one of the RunspaceFactory.CreateRunspacePool overloads that takes a maxRunspaces limit as an argument.
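
For reference, a minimal sketch of the .SetMaxRunspaces() route; it has the same effect as passing maxRunspaces to the factory, as the code below does:

$pool = [runspacefactory]::CreateRunspacePool()   # parameterless overload, limit defaults to 1
$null = $pool.SetMaxRunspaces(5)                  # returns a bool indicating success; discard it
$pool.Open()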

How would the code look?

$dir   = (New-Item "000" -ItemType Directory -Force).FullName
$limit = 5
$iss   = [initialsessionstate]::CreateDefault2()
$pool  = [runspacefactory]::CreateRunspacePool(1, $limit, $iss, $Host)
$pool.ThreadOptions = [Management.Automation.Runspaces.PSThreadOptions]::ReuseThread
$pool.Open()

$tasks  = foreach ($i in $zipfiles) {
    $ps = [powershell]::Create().AddScript({
        param($path, $dir)

        $name = [IO.Path]::GetFileNameWithoutExtension($path)
        7z.exe x -o"$name" .\$path
        Move-Item $path $dir -Force
        7z.exe a $path .\$name\*.*
    }).AddParameters(@{ path = $i; dir = $dir })
    $ps.RunspacePool = $pool

    @{ Instance = $ps; AsyncResult = $ps.BeginInvoke() }
}

foreach($task in $tasks) {
    $task['Instance'].EndInvoke($task['AsyncResult'])
    $task['Instance'].Dispose()
}
$pool.Dispose()

Note that, for all examples, it's unclear whether the 7-Zip code is correct; this answer attempts to demonstrate how async is done in PowerShell, not how to zip files / folders.
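
That said, for reference only, here is a sequential sketch of what a correct 7-Zip round trip for a single file might look like. It assumes 7z.exe is on PATH, that extracting into a folder named after the archive is acceptable, and that $dir is the folder created above; 'example.zip' is a hypothetical name. Note that 7-Zip's -o switch takes its value with no space:

$zip  = 'example.zip'                                  # hypothetical file name
$name = [IO.Path]::GetFileNameWithoutExtension($zip)
7z.exe x $zip "-o$name"         # extract into .\example\
Move-Item $zip $dir -Force      # park the original in the folder created above
7z.exe a $zip ".\$name\*"       # re-create the archive from the extracted content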


Below is a helper function that can simplify the process of parallel invocation; it tries to emulate ForEach-Object -Parallel and is compatible with PowerShell 5.1, though it shouldn't be taken as a robust solution:

NOTE This Q&A offers a much better and robust alternative to below function.

using namespace System.Management.Automation
using namespace System.Management.Automation.Runspaces
using namespace System.Collections.Generic

function Invoke-Parallel {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory, ValueFromPipeline, DontShow)]
        [object] $InputObject,

        [Parameter(Mandatory, Position = 0)]
        [scriptblock] $ScriptBlock,

        [Parameter()]
        [int] $ThrottleLimit = 5,

        [Parameter()]
        [hashtable] $ArgumentList
    )

    begin {
        $iss = [initialsessionstate]::CreateDefault2()
        if($PSBoundParameters.ContainsKey('ArgumentList')) {
            foreach($argument in $ArgumentList.GetEnumerator()) {
                $iss.Variables.Add([SessionStateVariableEntry]::new($argument.Key, $argument.Value, ''))
            }
        }
        $pool  = [runspacefactory]::CreateRunspacePool(1, $ThrottleLimit, $iss, $Host)
        $tasks = [List[hashtable]]::new()
        $pool.ThreadOptions = [PSThreadOptions]::ReuseThread
        $pool.Open()
    }
    process {
        try {
            $ps = [powershell]::Create().AddScript({
                $args[0].InvokeWithContext($null, [psvariable]::new("_", $args[1]))
            }).AddArgument($ScriptBlock.Ast.GetScriptBlock()).AddArgument($InputObject)

            $ps.RunspacePool = $pool
            $invocationInput = [PSDataCollection[object]]::new(1)
            $invocationInput.Add($InputObject)

            $tasks.Add(@{
                Instance    = $ps
                AsyncResult = $ps.BeginInvoke($invocationInput)
            })
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
    }
    end {
        try {
            foreach($task in $tasks) {
                $task['Instance'].EndInvoke($task['AsyncResult'])
                if($task['Instance'].HadErrors) {
                    $task['Instance'].Streams.Error
                }
                $task['Instance'].Dispose()
            }
        }
        catch {
            $PSCmdlet.WriteError($_)
        }
        finally {
            if($pool) { $pool.Dispose() }
        }
    }
}

An example of how it works:

# Hashtable Key becomes the Variable Name inside the Runspace!
$outsideVariables = @{ Message = 'Hello from {0}' }
0..10 | Invoke-Parallel {
    "[Item $_] - " + $message -f [runspace]::DefaultRunspace.InstanceId
    Start-Sleep 5
} -ArgumentList $outsideVariables -ThrottleLimit 3
Santiago Squarzon

To add to Santiago Squarzon's helpful answer:

Below is helper function Measure-Parallel, which allows you to compare the speed of the following approaches to parallelism:

  • Start-Job:

    • Child-process-based: creates a child PowerShell process behind the scenes, which makes this approach both slow and resource-intensive.
  • Start-ThreadJob - ships with PowerShell (Core) (v6+); installable via Install-Module ThreadJob in Windows PowerShell v5.1:

    • Thread-based: Much lighter-weight than Start-Job while providing the same functionality; additionally avoids potential loss of type fidelity due to cross-process serialization / deserialization.
  • ForEach-Object -Parallel - available only in PowerShell (Core) 7.0+:

    • Thread-based: In essence a simplified wrapper around Start-ThreadJob with support for direct pipeline input and direct output, with invariably synchronous overall execution (all launched threads are waited for).
  • Start-Process

    • Child-process-based: Invokes an external program asynchronously by default, on Windows in a new window by default.
    • Note that this approach only makes sense if your parallel tasks only consist of a single call to an external program, as opposed to needing to execute a block of PowerShell code.
    • Notably, the only way to capture output with this approach is by redirecting it to a file, invariably as text only (a minimal launch-and-wait sketch follows this list).
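
A minimal sketch of launching and waiting for Start-Process tasks, which also answers the question's "how might I wait for Start-Process tasks to end?". It assumes $zipfiles holds the archive names; the 7z.exe arguments and the .log file names are illustrative assumptions only:

# Launch one external call per zip file (up to 5 here), keep the process
# objects via -PassThru, then block until all of them have exited.
$procs = foreach ($i in $zipfiles | Select-Object -First 5) {
    Start-Process 7z.exe -ArgumentList "x $i" -NoNewWindow -PassThru `
                  -RedirectStandardOutput "$i.log"   # output only capturable as text in a file
}
$procs | Wait-Process   # returns once every launched process has terminated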

Note:

  • Given that the tests below wrap a single call to an external executable (such as 7z.exe in your case), the Start-Process approach will perform best, because it doesn't have the overhead of job management. However, as noted above, this approach has fundamental limitations.

  • Due to its complexity, the runspace-pool-based approach from Santiago's answer wasn't included; if Start-ThreadJob or ForEach-Object -Parallel are available to you, you won't need to resort to this approach.

Sample Measure-Parallel call, which contrasts the runtime performance of the approaches:

# Run 20 jobs / processes in parallel, 5 at a time, comparing
# all approaches.
# Note: Omit the -Approach argument to enter interactive mode.
Measure-Parallel -Approach All -BatchSize 5 -JobCount 20

Sample output from a macOS machine running PowerShell 7.2.6 (timings vary based on many factors, but the ratios should provide a sense of relative performance):

# ... output from the jobs

JobCount                         : 20
BatchSize                        : 5
BatchCount                       : 4
Start-Job (secs.)                : 2.20
Start-ThreadJob (secs.)          : 1.17
Start-Process (secs.)            : 0.84
ForEach-Object -Parallel (secs.) : 0.94

Conclusions:

  • ForEach-Object -Parallel adds the least thread/job-management overhead, followed by Start-ThreadJob

  • Start-Job, due to needing an extra child process - for the hidden PowerShell instance running each task - is noticeably slower. It seems that on Windows the performance discrepancy is much more pronounced.


Measure-Parallel source code:

  • Important:

    • The function hard-codes sample input objects as well as what external program to invoke - you'll have to edit it yourself as needed; the hard-coded external program is the platform-native shell in this case (cmd.exe on Windows, /bin/sh on Unix-like platforms), which is passed a command that simply echoes each input object.

      • It wouldn't be too hard to modify the function to accept a script block as an argument, and to receive input objects for the jobs via the pipeline (though that would preclude the Start-Process approach, except if you explicitly call the block via the PowerShell CLI - but in that case Start-Job could just be used).
    • The output from the jobs / processes goes directly to the display and cannot be captured.

  • The batch size, which defaults to 5, can be modified with -BatchSize; for the thread-based approaches, the batch size is also used as the -ThrottleLimit argument, i.e. the limit on how many threads are allowed to run at the same time. By default, a single batch is run, but you may request multiple batches indirectly by passing the total number of parallel runs to the -JobCount parameter.

  • You can select approaches via the array-valued -Approach parameter, which supports Job, ThreadJob, Process, ForEachParallel, and All, which combines all of the preceding.

    • If -Approach isn't specified, interactive mode is entered, where you're (repeatedly) prompted for the desired approach.
  • Except in interactive mode, a custom object with comparative timings is output.

function Measure-Parallel {

  [CmdletBinding()]
  param(
    [ValidateRange(2, 2147483647)] [int] $BatchSize = 5,
    [ValidateSet('Job', 'ThreadJob', 'Process', 'ForEachParallel', 'All')] [string[]] $Approach,
    [ValidateRange(2, 2147483647)] [int] $JobCount = $BatchSize # pass a higher count to run multiple batches
  )

  $noForEachParallel = $PSVersionTable.PSVersion.Major -lt 7
  $noStartThreadJob = -not (Get-Command -ErrorAction Ignore Start-ThreadJob)

  $interactive = -not $Approach
  if (-not $interactive) {
    # Translate the approach arguments into their corresponding hashtable keys (see below).
    if ('All' -eq $Approach) { $Approach = 'Job', 'ThreadJob', 'Process', 'ForEachParallel' }
    $approaches = $Approach.ForEach({
      if ($_ -eq 'ForEachParallel') { 'ForEach-Object -Parallel' }
      else { $_ -replace '^', 'Start-' }
    })
  }

  if ($noStartThreadJob) {
    if ($interactive -or $approaches -contains 'Start-ThreadJob') {
      Write-Warning "Start-ThreadJob is not installed, omitting its test; install it with ``Install-Module ThreadJob``"
      $approaches = $approaches.Where({ $_ -ne 'Start-ThreadJob' })
    }
  }
  if ($noForEachParallel) {
    if ($interactive -or $approaches -contains 'ForEach-Object -Parallel') {
      Write-Warning "ForEach-Object -Parallel is not available in this PowerShell version (requires v7+), omitting its test."
      $approaches = $approaches.Where({ $_ -ne 'ForEach-Object -Parallel' })
    }
  }

  # Simulated input: Create 'f0.zip', 'f1'.zip', ... file names.
  $zipFiles = 0..($JobCount - 1) -replace '^', 'f' -replace '$', '.zip'

  # Sample executables to run - here, the native shell is called to simply 
  # echo the argument given.
  # The external program to invoke.
  $exe = if ($env:OS -eq 'Windows_NT') { 'cmd.exe' } else { 'sh' }
  # The list of its arguments *as a single string* - use '{0}' as the placeholder for where the input object should go.
  $exeArgList = if ($env:OS -eq 'Windows_NT') { '/c "echo {0}"' } else { '-c "echo {0}"' }

  # A hashtable with script blocks that implement the 3 approaches to parallelism.
  $approachImpl = [ordered] @{}

  $approachImpl['Start-Job'] = { # child-process-based job
    param([array] $batch)
    $batch | 
    ForEach-Object {
      Start-Job { Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $args[0])) } -ArgumentList $_
    } |
    Receive-Job -Wait -AutoRemoveJob # wait for all jobs, relay their output, then remove them.
  }

  if (-not $noStartThreadJob) {
    # If Start-ThreadJob is available, add an approach for it.
    $approachImpl['Start-ThreadJob'] = { # thread-based job - requires Install-Module ThreadJob in WinPS
      param([array] $batch)
      $batch |
      ForEach-Object {
        Start-ThreadJob -ThrottleLimit $BatchSize { Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $args[0])) } -ArgumentList $_
      } |
      Receive-Job -Wait -AutoRemoveJob
    }
  }

  if (-not $noForEachParallel) {
    # If ForEach-Object -Parallel is supported (v7+), add an approach for it.
    $approachImpl['ForEach-Object -Parallel'] = {  
      param([array] $batch)
      $batch | ForEach-Object -ThrottleLimit $BatchSize -Parallel {
        Invoke-Expression ($using:exe + ' ' + ($using:exeArgList -f $_)) 
      }
    }
  }

  $approachImpl['Start-Process'] = { # direct execution of an external program
    param([array] $batch)
    $batch |
    ForEach-Object {
      Start-Process -NoNewWindow -PassThru $exe -ArgumentList ($exeArgList -f $_)
    } |
    Wait-Process # wait for all processes to terminate.
  }

  # Partition the array of all indices into subarrays (batches)
  $batches = @(
    0..([math]::Ceiling($zipFiles.Count / $batchSize) - 1) | ForEach-Object {
      , $zipFiles[($_ * $batchSize)..($_ * $batchSize + $batchSize - 1)]
    }
  )

  # In interactive use, print verbose messages by default
  if ($interactive) { $VerbosePreference = 'Continue' }

  :menu while ($true) {
    if ($interactive) {
      # Prompt for the approach to use.
      $choices = $approachImpl.Keys.ForEach({
        if ($_ -eq 'ForEach-Object -Parallel') { '&' + $_ }
        else { $_ -replace '-', '-&' }
      }) + '&Quit'
      $choice = $host.ui.PromptForChoice("Approach", "Select parallelism approach:", $choices, 0)
      if ($choice -eq $approachImpl.Count) { break }
      $approachKey = @($approachImpl.Keys)[$choice]
    }
    else {
      # Use the given approach(es)
      $approachKey = $approaches
    }
    $tsTotals = foreach ($appr in $approachKey) {
      $i = 0; $tsTotal = [timespan] 0
      $batches | ForEach-Object {
        $ts = Measure-Command { & $approachImpl[$appr] $_ | Out-Host }
        Write-Verbose "$batchSize-element '$appr' batch finished in $($ts.TotalSeconds.ToString('N2')) secs."
        $tsTotal += $ts
        if (++$i -eq $batches.Count) {
          # last batch processed.
          if ($batches.Count -gt 1) {
            Write-Verbose "'$appr' processing of $JobCount items overall finished in $($tsTotal.TotalSeconds.ToString('N2')) secs." 
          }
          $tsTotal # output the overall timing for this approach
        }
        elseif ($interactive) {
          $choice = $host.ui.PromptForChoice("Continue?", "Select action", ('&Next batch', '&Return to Menu', '&Quit'), 0)
          if ($choice -eq 1) { continue menu }
          if ($choice -eq 2) { break menu }
        }
      }
    }
    if (-not $interactive) {
      # Output a result object with the overall timings.
      $oht = [ordered] @{}; $i = 0
      $oht['JobCount'] = $JobCount
      $oht['BatchSize'] = $BatchSize
      $oht['BatchCount'] = $batches.Count
      foreach ($appr in $approachKey) {        
        $oht[($appr + ' (secs.)')] = $tsTotals[$i++].TotalSeconds.ToString('N2')
      }
      [pscustomobject] $oht
      break # break out of the infinite :menu loop
    }
  }

}
mklement0
  • nice function! I was wondering whether it was worth adding a proxy command around `Start-Job` with a `-ThrottleLimit` parameter to my answer, but I'm still not sure any effort should go there – Santiago Squarzon Oct 08 '22 at 17:56
  • Thanks, @Santiago. My guess is that it isn't worth it, unless you're stuck on a pre-v5 version. If `Start-ThreadJob` is available / installable, it'll do everything `Start-Job` does, only better and faster, including a `-ThrottleLimit` parameter. I've always felt that `Start-ThreadJob` isn't getting enough love. – mklement0 Oct 08 '22 at 18:05
  • And I have always considered `Start-Job` to be useless when `Start-ThreadJob` is available; and if, say, a company doesn't allow installing external modules, the .NET APIs are there. I guess it's not worth the trouble – Santiago Squarzon Oct 08 '22 at 18:08

You could add a counter to your foreach loop and break if the counter reaches your desired value

$numjobs = 5
$counter = 0
foreach ($i in $zipfiles) {
  if ($counter -ge $numjobs) {
    break
  }
  <your code>
  $counter++
}

or with PowerShell's ForEach-Object:

$numjobs = 5
$zipfiles | Select-Object -First $numjobs | ForEach-Object {
  <your code>
}

If you want to process the whole array in batches and wait for each batch to complete, you have to save the objects returned by Start-Job and pass them to Wait-Job, like this:

$items = 1..100

$batchsize = 5

while ($true) {
    $jobs = @()
    $counter = 0
    foreach ($i in $items) {
        if ($counter -ge $batchsize) {
            break
        }
        $jobs += Start-Job -ScriptBlock { Start-Sleep 10 }
        $counter++
    }
    # Remove the items that were just processed by rewriting the array;
    # once the last (possibly partial) batch has been started, $items becomes empty.
    $items = if ($counter -lt $items.Length) { @($items[$counter..($items.Length - 1)]) } else { @() }
    foreach ($job in $jobs) {
        $job | Wait-Job | Out-Null
    }
    if (!$items) {
        break
    }
}

By design, arrays have fixed lengths; that's why the whole array is rewritten after each batch with $items = $items[$counter..($items.Length - 1)]
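
As an alternative sketch that avoids re-slicing the array altogether, a queue can drive the same batching (same placeholder job as above):

$queue     = [System.Collections.Generic.Queue[object]]::new([object[]](1..100))
$batchsize = 5

while ($queue.Count -gt 0) {
    $jobs = foreach ($n in 1..([Math]::Min($batchsize, $queue.Count))) {
        $null = $queue.Dequeue()                   # the dequeued item would be handed to the job here
        Start-Job -ScriptBlock { Start-Sleep 10 }  # placeholder work, as above
    }
    $jobs | Wait-Job | Remove-Job                  # wait for the whole batch, then clean up
}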

yxc
  • I was hoping for it not to break out exactly, but to continue processing, waiting for each batch of `$numjob` jobs to completely finish before continuing with the next group of 5. – YorSubs Oct 08 '22 at 13:22
  • you can achieve this with `Wait-Job` - I will edit my answer to fit your problem – yxc Oct 08 '22 at 13:29