@jpbruckler
Last active February 7, 2025 21:43
function Invoke-IamWorkflow {
<#
.SYNOPSIS
Invokes one or more worker scripts in parallel using a runspace pool, passing in items and a shared thread-safe state.
.DESCRIPTION
The Invoke-IamWorkflow function takes an array of input objects and an array of worker script names.
It creates a shared ConcurrentDictionary to store global or aggregated state in a thread-safe manner.
Each item/worker combination is run in a runspace drawn from the pool. When all worker scripts have
finished executing, the function outputs the per-run results as well as the final shared state.
The shared state is available for all workers to read from and write to. This allows workers to share
data between each other, or to aggregate data from multiple runs. Inside a worker script the current
input object is available as $item and the shared dictionary as $shared. When writing to the shared
state, workers should use the $shared.AddOrUpdate() method to ensure thread safety.
.PARAMETER InputObject
Specifies the objects to process. Each object is passed to every worker script.
.PARAMETER Workers
Specifies the list of worker script names. Each worker script corresponds to a file named <Worker>.worker.ps1
that is located under $env:ProgramData\PowerIAM\workers.
.EXAMPLE
PS C:\> 1..3 | Invoke-IamWorkflow -Workers 'TestWorker','AnotherWorker'
Demonstrates piping integers 1, 2, and 3 to this function. Each input integer is processed by both
TestWorker.worker.ps1 and AnotherWorker.worker.ps1. The final output includes the pipeline result
and shared state.
.EXAMPLE
PS> $users = Get-ADUser -Filter * -Properties Department | Select-Object -First 10
PS> Invoke-IamWorkflow -InputObject $users -Workers 'Set-Department', 'Verify-User'
Processes 10 AD users through two worker scripts (Set-Department.worker.ps1 and Verify-User.worker.ps1
in the workers folder) concurrently, maintaining shared state between executions.
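.EXAMPLE
PS> $shared = [System.Collections.Concurrent.ConcurrentDictionary[string, object]]::new()
PS> $increment = [Func[string, object, object]] { param($key, $current) [int]$current + 1 }
PS> 1..3 | ForEach-Object { [void]$shared.AddOrUpdate('Processed', 1, $increment) }
PS> $shared['Processed']
3
A minimal sketch of the AddOrUpdate pattern, run at the console rather than inside the workflow. The
'Processed' key and the $increment delegate are hypothetical names used only for illustration. The first
call stores 1; each later call replaces the stored value with the delegate's result. Inside a worker
script the dictionary is passed in by the wrapper script block as its $shared parameter, so a worker
would presumably need only the $increment and AddOrUpdate lines.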
.OUTPUTS
Outputs an object containing:
- Results: One object per item/worker run, with Item, Worker, PipelineOutput, and Errors properties.
- SharedState: A ConcurrentDictionary object containing any shared data set by the worker scripts.
.NOTES
Thread Safety:
- Uses ConcurrentDictionary for shared state management
- Each runspace has isolated execution context
- Worker scripts should avoid modifying global state
Requires: PowerShell 7 or later
#>
    [CmdletBinding()]
    param(
        [Parameter(Mandatory, ValueFromPipeline)]
        [object[]]$InputObject,

        [Parameter(Mandatory)]
        [string[]]$Workers
    )

    begin {
        # Create thread-safe shared dictionary
        $sharedState = [System.Collections.Concurrent.ConcurrentDictionary[string, object]]::new()

        # Create runspace pool with initial session state
        $initialSessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
        $initialSessionState.ImportPSModule('ActiveDirectory')

        # The overload that takes only a session state defaults to a maximum of one runspace, which
        # would run the workers one at a time. Allow up to one runspace per logical processor instead.
        $runspacePool = [RunspaceFactory]::CreateRunspacePool(1, [Environment]::ProcessorCount, $initialSessionState, $Host)
        $runspacePool.Open()
        $runspaces = @()
    }
    process {
        foreach ($item in $InputObject) {
            foreach ($worker in $Workers) {
                #$scriptPath = Join-Path (Get-PSFConfigValue PowerIAM.Path.DataRoot) -ChildPath 'workers' -AdditionalChildPath "$worker.worker.ps1"
                $workerPath = Join-Path $env:ProgramData -ChildPath 'PowerIAM' -AdditionalChildPath 'workers', "$worker.worker.ps1"

                # Create PowerShell instance with dedicated variables
                $ps = [PowerShell]::Create()
                $ps.RunspacePool = $runspacePool
                [void]$ps.AddScript({
                    param($item, $shared, $scriptPath)
                    # Dot-source the worker script so it can use $item and $shared directly
                    . $scriptPath
                }).AddArgument($item).AddArgument($sharedState).AddArgument((Resolve-Path -LiteralPath $workerPath).Path)

                # Store async handle with input object reference
                $runspaces += [PSCustomObject]@{
                    PowerShell  = $ps
                    AsyncResult = $ps.BeginInvoke()
                    Item        = $item
                    Worker      = $worker
                }
            }
        }
    }
    end {
        # Process results and clean up
        try {
            $results = foreach ($rs in $runspaces) {
                # EndInvoke blocks until this run has finished
                $output = $rs.PowerShell.EndInvoke($rs.AsyncResult)
                [PSCustomObject]@{
                    Item           = $rs.Item
                    Worker         = $rs.Worker
                    PipelineOutput = $output
                    # Copy the error records into an array so they do not depend on the
                    # PowerShell instance that is disposed in the finally block
                    Errors         = @($rs.PowerShell.Streams.Error)
                }
            }
        }
        finally {
            $runspaces | ForEach-Object {
                $_.PowerShell.Dispose()
            }
            $runspacePool.Close()
            $runspacePool.Dispose()
        }

        # Output results and shared state
        [PSCustomObject]@{
            Results     = $results
            SharedState = $sharedState
        }
    }
}
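
# Illustrative sketch, not part of the original function: the same runspace-pool pattern reduced
# to a self-contained form that needs no worker files and no ActiveDirectory module. The function
# name Test-RunspacePoolSketch, the 'Runs' key, and the pool size of 4 are assumptions chosen
# only for this illustration.
function Test-RunspacePoolSketch {
    param([int[]]$Numbers = (1..5))

    $shared = [System.Collections.Concurrent.ConcurrentDictionary[string, object]]::new()

    # Allow between 1 and 4 runspaces to be active at the same time.
    $pool = [RunspaceFactory]::CreateRunspacePool(1, 4)
    $pool.Open()
    try {
        # Queue one PowerShell instance per number; BeginInvoke() returns without waiting.
        $jobs = foreach ($n in $Numbers) {
            $ps = [PowerShell]::Create()
            $ps.RunspacePool = $pool
            [void]$ps.AddScript({
                param($item, $shared)
                # Atomically count this run in the shared dictionary.
                $increment = [Func[string, object, object]] { param($key, $current) [int]$current + 1 }
                [void]$shared.AddOrUpdate('Runs', 1, $increment)
                # Pipeline output for this run.
                $item * 2
            }).AddArgument($n).AddArgument($shared)
            [PSCustomObject]@{ PowerShell = $ps; AsyncResult = $ps.BeginInvoke() }
        }

        # EndInvoke() blocks until the corresponding run has finished; collecting in queue
        # order keeps the outputs in the same order as the inputs.
        $doubled = foreach ($job in $jobs) {
            $job.PowerShell.EndInvoke($job.AsyncResult)
            $job.PowerShell.Dispose()
        }
    }
    finally {
        $pool.Close()
        $pool.Dispose()
    }

    [PSCustomObject]@{
        Doubled = @($doubled)
        Runs    = $shared['Runs']
    }
}
# Usage: (Test-RunspacePoolSketch).Runs should equal the number of inputs, because AddOrUpdate
# retries until each increment is applied exactly once.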