Last active
February 7, 2025 21:43
-
-
Save jpbruckler/dd78471241e372de864c4dd47d120fd1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function Invoke-IamWorkflow { | |
<# | |
.SYNOPSIS | |
Invokes one or more worker scripts in parallel using a runspace pool, passing in items and a shared thread-safe state. | |
.DESCRIPTION | |
The Invoke-IamWorkflow function takes an array of input objects and an array of worker script names. | |
It creates a shared ConcurrentDictionary to store global or aggregated state in a thread-safe manner. | |
Each worker script is executed in its own runspace, When all worker scripts have finished executing, | |
the function outputs the per-run results as well as the final shared state. | |
The shared state is available for all workers to read from and write to. This allows workers to share | |
data between each other, or to aggregate data from multiple runs. When writing to the shared state, | |
workers should use the $sharedState.AddOrUpdate() method to ensure thread safety. | |
.PARAMETER InputObject | |
Specifies the objects to process. Each object is passed to every worker script. | |
.PARAMETER Workers | |
Specifies the list of worker script names. Each worker script corresponds to a file named <Worker>.worker.ps1 | |
that is located under $env:ProgramData\PowerIAM\workers. | |
.EXAMPLE | |
PS C:\> 1..3 | Invoke-IamWorkflow -Workers 'TestWorker','AnotherWorker' | |
Demonstrates piping integers 1, 2, and 3 to this function. Each input integer is processed by both | |
TestWorker.worker.ps1 and AnotherWorker.worker.ps1. The final output includes the pipeline result | |
and shared state. | |
.EXAMPLE | |
PS> $users = Get-ADUser -Filter * -Properties Department | Select-Object -First 10 | |
PS> Invoke-IamWorkflow -InputObject $users -Workers .\Set-Department.ps1, .\Verify-User.ps1 | |
Processes 10 AD users through two worker scripts concurrently, maintaining shared state between executions. | |
.OUTPUTS | |
Outputs an object containing: | |
- Results: An array of pipeline output and errors from each run. | |
- SharedState: A ConcurrentDictionary object containing any shared data set by the worker scripts. | |
.NOTES | |
Thread Safety: | |
- Uses ConcurrentDictionary for shared state management | |
- Each runspace has isolated execution context | |
- Worker scripts should avoid modifying global state | |
Requires: PowerShell 7 or later | |
#> | |
[CmdletBinding()] | |
param( | |
[Parameter(Mandatory, ValueFromPipeline)] | |
[object[]]$InputObject, | |
[Parameter(Mandatory)] | |
[string[]]$Workers | |
) | |
begin { | |
# Create thread-safe shared dictionary | |
$sharedState = [System.Collections.Concurrent.ConcurrentDictionary[string, object]]::new() | |
# Create runspace pool with initial session state | |
$initialSessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault() | |
$initialSessionState.ImportPSModule('ActiveDirectory') | |
$runspacePool = [RunspaceFactory]::CreateRunspacePool($initialSessionState) | |
$runspacePool.Open() | |
$runspaces = @() | |
} | |
process { | |
foreach ($item in $InputObject) { | |
foreach ($worker in $Workers) { | |
#$scriptPath = Join-Path (Get-PSFConfigValue PowerIAM.Path.DataRoot) -ChildPath 'workers' -AdditionalChildPath "$worker.worker.ps1" | |
$workerPath = Join-Path $env:ProgramData -ChildPath 'PowerIAM' -AdditionalChildPath 'workers', "$worker.worker.ps1" | |
# Create PowerShell instance with dedicated variables | |
$ps = [PowerShell]::Create() | |
$ps.RunspacePool = $runspacePool | |
[void]$ps.AddScript({ | |
param($item, $shared, $scriptPath) | |
# Dot-source the worker script | |
. $scriptPath | |
}).AddArgument($item).AddArgument($sharedState).AddArgument((Resolve-Path $workerPath)) | |
# Store async handle with input object reference | |
$runspaces += [PSCustomObject]@{ | |
PowerShell = $ps | |
AsyncResult = $ps.BeginInvoke() | |
Item = $item | |
Worker = $worker | |
} | |
} | |
} | |
} | |
end { | |
# Process results and clean up | |
try { | |
$results = foreach ($rs in $runspaces) { | |
$output = $rs.PowerShell.EndInvoke($rs.AsyncResult) | |
[PSCustomObject]@{ | |
Item = $rs.Item | |
Worker = $rs.Worker | |
PipelineOutput = $output | |
Errors = $rs.PowerShell.Streams.Error | |
} | |
} | |
} | |
finally { | |
$runspaces | ForEach-Object { | |
$_.PowerShell.Dispose() | |
} | |
$runspacePool.Close() | |
$runspacePool.Dispose() | |
} | |
# Output results and shared state | |
[PSCustomObject]@{ | |
Results = $results | |
SharedState = $sharedState | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment