1. <#
  2. .SYNOPSIS
  3. A parallel ForEach that uses runspaces
  4.  
  5. .PARAMETER ScriptBlock
  6. ScriptBlock to execute for each InputObject
  7.  
  8. .PARAMETER ScriptFile
  9. Script file to execute for each InputObject
  10.  
  11. .PARAMETER InputObject
  12. Object(s) to run script against in parallel
  13.  
  14. .PARAMETER Throttle
  15. Maximum number of threads to run at one time.  Default: 5
  16.  
  17. .PARAMETER Timeout
  18. Stop each thread after this many minutes.  Default: 0 (Do not timeout)
  19.  
  20. WARNING:  This parameter should be used as a failsafe only
  21. Set it for roughly the entire duration you expect for all threads to complete
  22.  
  23. .PARAMETER SleepTimer
  24. When looping through open threads, wait this many milliseconds before looping again.  Default: 200
  25.  
  26. .PARAMETER ProgressId
  27. Identity of the progress bar to use for display.  Default: 0
  28.  
  29. .PARAMETER Activity
  30. String to pass to the progress bar for the -Activity parameter. No progress is displayed unless this is specified.  Default: 'Default' (a sentinel value meaning no progress is displayed).
  31.  
  32. .EXAMPLE
  33. (1..50) | ForEach-Parallel -Throttle 4 -ScriptBlock { $this; Start-Sleep -Seconds (Get-Random -Minimum 0 -Maximum 5) } -Activity "One to fifty"
  34.  
  35. Send the numbers 1 through 50 to the scriptblock.  For each, display the number and then sleep for 0 to 5 seconds.  Display a progress bar.  Only execute 4 threads concurrently.
  36.  
  37. .EXAMPLE
  38. Get-Content .\servers.txt | Foreach-Parallel -Throttle 20 -Timeout 10 -SleepTimer 200 -ScriptFile .\query.ps1 -Verbose
  39.  
  40. Run query.ps1 against each computer in the servers.txt file.  Run 20 concurrently, timeout a thread if it takes longer than 10 minutes to run, give verbose output.
  41.  
  42. .FUNCTIONALITY
  43. PowerShell Language
  44.  
  45. .NOTES
  46. Credit to Tome Tanasovski for the original ForEach-Parallel, from which this script has been modified.
  47. http://powertoe.wordpress.com/2012/05/03/foreach-parallel/
  48.  
  49. Added:
  50.  - Progress bar
  51.  - Verbose output
  52.  - Changed $_ to $this within the scriptblock to avoid pipeline contention
  53. #>
  54. [cmdletbinding()]
  55. PARAM (
  56.     [Parameter(Mandatory=$false,position=0,ParameterSetName='ScriptBlock')]
  57.         [scriptblock]$ScriptBlock,
  58.  
  59.     [Parameter(Mandatory=$false,ParameterSetName='ScriptFile')]
  60.     [ValidateScript({test-path $_ -pathtype leaf})]
  61.         $ScriptFile,
  62.  
  63.     [Parameter(Mandatory=$true,ValueFromPipeline=$true)]
  64.         [PSObject]$InputObject,
  65.  
  66.         [int]$Throttle = 5,
  67.  
  68.         [double]$SleepTimer = 200,
  69.  
  70.         [double]$Timeout = 0,
  71.        
  72.         [int]$ProgressId = 0,
  73.        
  74.         [string]$Activity = 'Default'
  75. )
  76.  
  77. BEGIN {
  78.     $stopWatch = [Diagnostics.Stopwatch]::StartNew()
  79.    
  80.     if ($Activity -ne 'Default') { Write-Progress -Id $ProgressId -Activity $Activity -Status "Preparing" -PercentComplete 0 -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" }
  81.    
  82.     #Build the scriptblock depending on the parameter used
  83.     switch ($PSCmdlet.ParameterSetName) {
  84.         'ScriptBlock' {$ScriptBlock = $ExecutionContext.InvokeCommand.NewScriptBlock("param(`$this)`r`n" + $Scriptblock.ToString())}
  85.         'ScriptFile' {$scriptblock = [scriptblock]::Create($(get-content $ScriptFile | out-string))}
  86.         Default {Write-Error ("Must provide ScriptBlock or ScriptFile"); Return}
  87.     }
  88.    
  89.     #Define the initial sessionstate, create the runspacepool
  90.     Write-Verbose "Creating runspace pool with $Throttle threads"
  91.     $sessionState = [system.management.automation.runspaces.initialsessionstate]::CreateDefault()
  92.     $pool = [runspacefactory]::CreateRunspacePool(1, $Throttle, $sessionState, $host)
  93.     $pool.open()
  94.    
  95.     #array to hold details on each thread
  96.     $threads = @()
  97.  
  98.     #If inputObject is bound get a total count and set bound to true
  99.     $bound = $false
  100.     if( $PSBoundParameters.ContainsKey("inputObject") ) {
  101.         $bound = $true
  102.         $totalCount = $inputObject.count
  103.     }
  104.    
  105.     if ($Activity -ne 'Default') { Write-Progress -Id $ProgressId -Activity $Activity -Status "Creating threads" -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" }
  106.    
  107.     [int]$ProgressTotal = 0 # Total number of threads created
  108.     [int]$ProgressCount = 0 # Number of threads completed
  109. $runtemplate = @'
  110.     #For each pipeline object, create a new powershell instance, add to runspacepool
  111.     $powershell = [powershell]::Create().AddScript($ScriptBlock).AddArgument($InputObject)
  112.     $powershell.runspacepool=$pool
  113.     $startTime = Get-Date
  114.    
  115.     #add references to inputobject, instance, handle and startTime to threads array
  116.     $threads     += New-Object psobject -Property @{
  117.         Object    = $inputObject;
  118.         instance  = $powershell;
  119.         handle    = $powershell.begininvoke();
  120.         startTime = $startTime
  121.     }
  122.    
  123.     $ProgressTotal++
  124.     Write-Verbose "Added $inputobject to the runspacepool at $startTime"
  125.     if ($Activity -ne 'Default') { Write-Progress -Id $ProgressId -Activity $Activity -Status "$ProgressTotal threads created" -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" -PercentComplete ([math]::Min($ProgressTotal,95)) }
  126. '@
  127. }
  128.  
  129. PROCESS {
  130.     if($bound) {
  131.         $run = $runtemplate -replace 'inputObject', 'object'
  132.         foreach($object in $inputObject) {
  133.             Invoke-Expression -command $run
  134.         }
  135.     }
  136.     else {
  137.         Invoke-Expression -command $run
  138.     }
  139. }
  140.  
  141. END {
  142.     $notdone       = $true
  143.     $ProgressTotal = $threads.count;
  144.     $ProgressCount = 0
  145.     if ($Activity -ne 'Default') { Write-Progress -Id $ProgressId -Activity $Activity -Status "$ProgressCount of $ProgressTotal threads completed" -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" -PercentComplete (100*$ProgressCount/$ProgressTotal) }
  146.    
  147.     #Loop through threads.
  148.     while ($notdone) {
  149.  
  150.         $notdone = $false
  151.  
  152.         for ($i=0; $i -lt $threads.count; $i++) {
  153.             $thread = $threads[$i]
  154.             if ($thread) {
  155.  
  156.                 #If thread is complete, dispose of it.
  157.                 if ($thread.handle.iscompleted) {
  158.                     Write-verbose "Closing thread for $($thread.Object)"
  159.                     $thread.instance.endinvoke($thread.handle)
  160.                     $thread.instance.dispose()
  161.                     $threads[$i] = $null
  162.                     $ProgressCount++
  163.                 }
  164.  
  165.                 #Thread exceeded maxruntime timeout threshold
  166.                 elseif( $Timeout -ne 0 -and ( (get-date) - $thread.startTime ).totalminutes -gt $Timeout ) {
  167.                     Write-Error "Closing thread for $($thread.Object): Thread exceeded $Timeout minute limit" -TargetObject $thread.inputObject
  168.                     $thread.instance.dispose()
  169.                     $threads[$i] = $null
  170.                     $ProgressCount++
  171.                 }
  172.  
  173.                 #Thread is running, loop again!
  174.                 else {
  175.                     $notdone = $true
  176.                 }
  177.             }          
  178.         }
  179.         if ($Activity -ne 'Default') {Write-Progress -Id $ProgressId -Activity $Activity -Status "$ProgressCount of $ProgressTotal threads completed" -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" -PercentComplete (100*$ProgressCount/$ProgressTotal) }
  180.        
  181.         Start-Sleep -Milliseconds $SleepTimer
  182.     }
  183.     if ($Activity -ne 'Default') { Write-Progress -Id $ProgressId -Activity $Activity -Status "Completed" -CurrentOperation "Elapsed time: $($stopWatch.Elapsed.toString().subString(0,8))" -Completed }
  184. }