Skip to content

Instantly share code, notes, and snippets.

@DamianEdwards
Last active April 17, 2026 05:34
Show Gist options
  • Select an option

  • Save DamianEdwards/55e0b790c0e012641cdfcf34d9588889 to your computer and use it in GitHub Desktop.

Select an option

Save DamianEdwards/55e0b790c0e012641cdfcf34d9588889 to your computer and use it in GitHub Desktop.
hex1b daemon spawner sample
#!/usr/bin/env dotnet
#:property SuppressTrimAnalysisWarnings=false
#:property TrimmerSingleWarn=false
#:package System.CommandLine@2.0.6
#:package Spectre.Console@0.55.0
#:package Hex1b@0.128.0
using System.CommandLine;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using Hex1b;
using Hex1b.Automation;
using Hex1b.Input;
using Hex1b.Tokens;
using Hex1b.Widgets;
using Spectre.Console;
const string TargetCommand = "copilot";
const string TargetPrompt = "We're just testing remote sessions here in the context of a proof regarding the spawning and management of background instances of `copilot`. Stay running and await further instructions.";
const string TargetWorkingDirectory = @"D:\src\GitHub\DamianEdwards\kusto-cli";
const string DotnetHost = "dotnet";
const int MaxTrackedInstances = 5;
const string ControllerLogFileName = "hex1b-daemon.log";
const int WorkloadLogTailCharacterLimit = 4000;
TimeSpan JoinSocketReadyTimeout = TimeSpan.FromSeconds(30);
string TargetArguments = $"--yolo --remote -i \"{TargetPrompt}\"";
string[] TargetArgumentList = ["--yolo", "--remote", "-i", TargetPrompt];
TimeSpan SecondaryCtrlCDelay = TimeSpan.FromSeconds(1);
TimeSpan StartupTrackingDelay = TimeSpan.FromSeconds(3);
TimeSpan GracefulShutdownTimeout = TimeSpan.FromSeconds(15);
if (!OperatingSystem.IsWindows())
{
AnsiConsole.MarkupLine("[red]This sample is only supported on Windows.[/]");
return 1;
}
return await BuildRootCommand().Parse(args).InvokeAsync();
RootCommand BuildRootCommand()
{
RootCommand root = new("Daemon-style controller for copilot sessions hosted through Hex1b PTY terminals.");
Option<bool> backgroundOption = new("--background")
{
Description = "Run without keyboard input.",
Hidden = true,
};
Command runCommand = new("run", "Run the controller in the foreground and listen for keyboard input.");
runCommand.Add(backgroundOption);
runCommand.SetAction((parseResult, cancellationToken) => RunAsync(parseResult.GetValue(backgroundOption), cancellationToken));
root.Add(runCommand);
Command startCommand = new("start", "Start a background run instance for the current directory.");
startCommand.SetAction((_, cancellationToken) => StartAsync(cancellationToken));
root.Add(startCommand);
Command stopCommand = new("stop", "Stop the background run instance for the current directory.");
stopCommand.SetAction((_, cancellationToken) => StopAsync(cancellationToken));
root.Add(stopCommand);
Command joinCommand = new("join", "Join the running background instance in the current terminal.");
joinCommand.SetAction((_, cancellationToken) => JoinAsync(cancellationToken));
Command joinSessionCommand = new("session", "Join a tracked copilot session hosted by the background controller.");
joinSessionCommand.SetAction((_, cancellationToken) => JoinSessionCommandAsync(cancellationToken));
joinCommand.Add(joinSessionCommand);
root.Add(joinCommand);
root.SetAction(_ =>
{
RenderBanner();
AnsiConsole.MarkupLine("[yellow]Choose one of:[/] [aqua]run[/], [aqua]start[/], [aqua]join[/], [aqua]join session[/], or [aqua]stop[/].");
return 1;
});
return root;
}
async Task<int> RunAsync(bool backgroundMode, CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
string controllerLogPath = GetControllerLogPath(currentDirectory);
string stopEventName = GetStopEventName(currentDirectory);
using FileStream? runLock = TryAcquireRunLock(currentDirectory);
if (runLock is null)
{
AnsiConsole.MarkupLine($"[red]A run instance is already active for[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine("[yellow]Use[/] [aqua]dotnet hex1bapp.cs -- stop[/] [yellow]to stop it first.[/]");
return 1;
}
CleanupStalePidFile(currentDirectory, pidFilePath);
using EventWaitHandle stopEvent = new(false, EventResetMode.AutoReset, stopEventName);
RenderBanner();
AnsiConsole.MarkupLine($"[green]Hex1b controller running in[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine($"[grey]Controller log:[/] [aqua]{Escape(controllerLogPath)}[/]");
AppendLog(controllerLogPath, $"Controller started in {currentDirectory} (backgroundMode={backgroundMode}).");
AppendLog(controllerLogPath, $"Target command: {TargetCommand} {TargetArguments}");
AppendLog(controllerLogPath, $"Target working directory: {TargetWorkingDirectory}");
if (backgroundMode)
{
AnsiConsole.MarkupLine("[grey]Background mode is active. Use[/] [aqua]dotnet hex1bapp.cs -- stop[/] [grey]to stop this controller.[/]");
}
else
{
AnsiConsole.MarkupLine($"[grey]Press[/] [aqua]S[/] [grey]to start[/] [aqua]{Escape(TargetCommand)} {Escape(TargetArguments)}[/] [grey]in[/] [aqua]{Escape(TargetWorkingDirectory)}[/][grey],[/] [aqua]1[/]-[aqua]5[/] [grey]to stop an instance,[/] [aqua]J[/] [grey]then[/] [aqua]1[/]-[aqua]5[/] [grey]to join a tracked session terminal, or[/] [aqua]Ctrl+C[/] [grey]to stop everything and exit.[/]");
}
List<TrackedSession> trackedSessions = [];
int nextSessionToken = 1;
using CancellationTokenSource keyboardCts = new();
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, keyboardCts.Token);
using CancellationTokenSource stopMonitorCts = new();
int shutdownRequested = 0;
bool sessionJoinActive = false;
string latestBackgroundStatus = "Background controller is running.";
DaemonJoinServer? joinServer = backgroundMode ? new DaemonJoinServer(GetJoinPipeName(currentDirectory), message => AppendLog(controllerLogPath, message)) : null;
joinServer?.Start();
void RequestShutdown(string message)
{
if (Interlocked.Exchange(ref shutdownRequested, 1) != 0)
{
return;
}
AppendLog(controllerLogPath, $"Shutdown requested: {Markup.Remove(message)}");
latestBackgroundStatus = Markup.Remove(message);
if (joinServer is not null)
{
_ = joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus, isShuttingDown: true));
}
AnsiConsole.MarkupLine(message);
keyboardCts.Cancel();
}
Task stopSignalTask = Task.Run(() =>
{
int signaledIndex = WaitHandle.WaitAny([stopEvent, stopMonitorCts.Token.WaitHandle]);
if (signaledIndex == 0)
{
RequestShutdown("[yellow]Stop signal received. Stopping tracked instances before exit...[/]");
}
}, CancellationToken.None);
ConsoleCancelEventHandler? cancelHandler = null;
cancelHandler = (_, eventArgs) =>
{
eventArgs.Cancel = true;
if (sessionJoinActive)
{
return;
}
RequestShutdown("[yellow]Ctrl+C received. Stopping tracked instances before exit...[/]");
};
Console.CancelKeyPress += cancelHandler;
try
{
if (backgroundMode)
{
try
{
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken++, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AnsiConsole.MarkupLine($"[green]Auto-started instance[/] [aqua]1[/] [green]for background mode.[/]");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(session.WorkloadLogPath)}[/]");
AppendLog(controllerLogPath, $"Auto-started session token {session.SessionToken}. Workload log: {session.WorkloadLogPath}");
latestBackgroundStatus = "Auto-started instance 1 for background mode.";
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus));
}
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AnsiConsole.MarkupLine($"[red]Failed to auto-start {Escape(TargetCommand)}:[/] {Escape(ex.Message)}");
AppendLog(controllerLogPath, $"Failed to auto-start {TargetCommand}: {ex}");
return 1;
}
RenderTrackedSessions(trackedSessions);
try
{
while (!linkedCts.IsCancellationRequested)
{
List<string> reconcileMessages = await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: false);
if (reconcileMessages.Count > 0)
{
latestBackgroundStatus = reconcileMessages[^1];
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus));
}
}
if (joinServer is not null)
{
while (joinServer.TryReadRequest(out DaemonJoinRequest? request))
{
latestBackgroundStatus = await ProcessJoinRequestAsync(request, trackedSessions, currentDirectory, controllerLogPath, cancellationToken, RequestShutdown);
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus, request.Command == DaemonJoinCommand.Shutdown));
}
}
await Task.Delay(TimeSpan.FromMilliseconds(250), linkedCts.Token);
}
}
catch (OperationCanceledException) when (keyboardCts.IsCancellationRequested)
{
}
}
else
{
RenderTrackedSessions(trackedSessions);
bool awaitingSessionJoinOrdinal = false;
while (!linkedCts.IsCancellationRequested)
{
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: true);
ConsoleKeyInfo? keyInfo;
try
{
keyInfo = await AnsiConsole.Console.Input.ReadKeyAsync(true, linkedCts.Token);
}
catch (OperationCanceledException) when (keyboardCts.IsCancellationRequested)
{
break;
}
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: true);
if (keyInfo is null)
{
continue;
}
ConsoleKeyInfo key = keyInfo.Value;
char keyChar = key.KeyChar;
if (awaitingSessionJoinOrdinal)
{
if (key.Key == ConsoleKey.Escape)
{
awaitingSessionJoinOrdinal = false;
AnsiConsole.MarkupLine("[grey]Session join canceled.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
if (!char.IsDigit(keyChar))
{
awaitingSessionJoinOrdinal = false;
AnsiConsole.MarkupLine($"[yellow]Expected a session number after[/] [aqua]J[/][yellow], but got[/] [aqua]{Escape(DescribeKey(key))}[/][yellow].[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
awaitingSessionJoinOrdinal = false;
int joinOrdinal = keyChar - '0';
if (joinOrdinal < 1 || joinOrdinal > trackedSessions.Count)
{
AnsiConsole.MarkupLine($"[yellow]No tracked session matches key[/] [aqua]{joinOrdinal}[/].");
RenderTrackedSessions(trackedSessions);
continue;
}
TrackedSession sessionToJoin = trackedSessions[joinOrdinal - 1];
AnsiConsole.MarkupLine($"[grey]Joining tracked session[/] [aqua]{joinOrdinal}[/] [grey](session token {sessionToJoin.SessionToken})[/][grey]. Detach from the session to return here.[/]");
AppendLog(controllerLogPath, $"User requested join for tracked session {joinOrdinal} (session token {sessionToJoin.SessionToken}).");
sessionJoinActive = true;
try
{
await JoinTrackedSessionAsync(currentDirectory, sessionToJoin.SessionToken, cancellationToken);
}
finally
{
sessionJoinActive = false;
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (char.ToUpperInvariant(keyChar) == 'S')
{
if (trackedSessions.Count >= MaxTrackedInstances)
{
AnsiConsole.MarkupLine($"[yellow]Already tracking the maximum of {MaxTrackedInstances} instances.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
try
{
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken++, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AnsiConsole.MarkupLine($"[green]Started instance[/] [aqua]{trackedSessions.Count}[/] [grey](session token {session.SessionToken})[/].");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(session.WorkloadLogPath)}[/]");
AppendLog(controllerLogPath, $"Started tracked session token {session.SessionToken}. Workload log: {session.WorkloadLogPath}");
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AnsiConsole.MarkupLine($"[red]Failed to start {Escape(TargetCommand)}:[/] {Escape(ex.Message)}");
AppendLog(controllerLogPath, $"Failed to start {TargetCommand}: {ex}");
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (char.ToUpperInvariant(keyChar) == 'J')
{
if (trackedSessions.Count == 0)
{
AnsiConsole.MarkupLine("[yellow]There are no tracked copilot sessions available to join.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
awaitingSessionJoinOrdinal = true;
AnsiConsole.MarkupLine("[grey]Join mode:[/] [aqua]press 1-5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/] [grey]to cancel.[/]");
continue;
}
if (char.IsDigit(keyChar))
{
int ordinal = keyChar - '0';
if (ordinal < 1 || ordinal > trackedSessions.Count)
{
AnsiConsole.MarkupLine($"[yellow]No tracked instance matches key[/] [aqua]{ordinal}[/].");
RenderTrackedSessions(trackedSessions);
continue;
}
TrackedSession session = trackedSessions[ordinal - 1];
AppendLog(controllerLogPath, $"User requested shutdown of instance {ordinal} (session token {session.SessionToken}).");
StopProcessOutcome outcome = await StopTrackedSessionAsync(session, $"instance {ordinal}", cancellationToken, controllerLogPath);
trackedSessions.RemoveAt(ordinal - 1);
if (outcome == StopProcessOutcome.AlreadyExited)
{
AnsiConsole.MarkupLine($"[grey]Instance {ordinal} was already stopped.[/]");
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (key.Key == ConsoleKey.Escape)
{
AnsiConsole.MarkupLine("[grey]Use Ctrl+C to shut down the controller and all tracked instances.[/]");
continue;
}
AnsiConsole.MarkupLine($"[grey]Ignored key[/] [aqua]{Escape(DescribeKey(key))}[/][grey].[/]");
}
}
}
finally
{
Console.CancelKeyPress -= cancelHandler;
stopMonitorCts.Cancel();
await stopSignalTask;
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: false);
if (trackedSessions.Count > 0)
{
AnsiConsole.MarkupLine($"[yellow]Stopping {trackedSessions.Count} tracked instance(s)...[/]");
AppendLog(controllerLogPath, $"Stopping {trackedSessions.Count} tracked session(s) during controller shutdown.");
await Task.WhenAll(trackedSessions.Select(session => StopTrackedSessionAsync(session, $"session {session.SessionToken}", CancellationToken.None, controllerLogPath)));
trackedSessions.Clear();
}
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, "Controller stopped.", isShuttingDown: true));
await joinServer.DisposeAsync();
}
CleanupOwnedPidFile(currentDirectory, pidFilePath);
AppendLog(controllerLogPath, "Controller stopped.");
RenderTrackedSessions(trackedSessions);
AnsiConsole.MarkupLine("[green]Controller stopped.[/]");
}
return 0;
}
async Task<int> StartAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
if (TryReadDaemonState(pidFilePath, out DaemonState? existingState) &&
TryGetVerifiedProcess(existingState!, out Process? existingProcess))
{
using (existingProcess)
{
AnsiConsole.MarkupLine($"[yellow]A background controller is already running for[/] [aqua]{Escape(currentDirectory)}[/] [grey](PID {existingState!.Pid})[/].");
return 1;
}
}
CleanupStalePidFile(currentDirectory, pidFilePath);
SelfLaunchSpec selfLaunchSpec = GetSelfLaunchSpec();
ProcessStartInfo startInfo = new()
{
FileName = selfLaunchSpec.FileName,
WorkingDirectory = currentDirectory,
UseShellExecute = false,
CreateNoWindow = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
};
foreach (string argument in selfLaunchSpec.ArgumentPrefix)
{
startInfo.ArgumentList.Add(argument);
}
startInfo.ArgumentList.Add("run");
startInfo.ArgumentList.Add("--background");
using Process process = Process.Start(startInfo)
?? throw new InvalidOperationException("Failed to start the background controller process.");
await Task.Delay(500, cancellationToken);
if (process.HasExited)
{
string output = (await process.StandardOutput.ReadToEndAsync(cancellationToken)).Trim();
string error = (await process.StandardError.ReadToEndAsync(cancellationToken)).Trim();
string details = string.Join(Environment.NewLine, new[] { output, error }.Where(value => !string.IsNullOrWhiteSpace(value)));
if (string.IsNullOrWhiteSpace(details))
{
details = "The background controller exited before it could finish starting.";
}
AnsiConsole.MarkupLine($"[red]{Escape(details)}[/]");
return 1;
}
DaemonState state = new()
{
Pid = process.Id,
StartedAtUtc = process.StartTime.ToUniversalTime(),
};
WriteDaemonState(pidFilePath, state);
AnsiConsole.MarkupLine($"[green]Started background controller[/] [grey](PID {process.Id})[/] [green]for[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine($"[grey]State file:[/] [aqua]{Escape(pidFilePath)}[/]");
return 0;
}
async Task<int> StopAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
if (!TryGetActiveDaemonProcess(currentDirectory, out string pidFilePath, out DaemonState? state, out Process? process))
{
return 1;
}
using (process)
{
AnsiConsole.MarkupLine($"[yellow]Sending stop signal to background controller[/] [grey](PID {state!.Pid})[/][yellow]...[/]");
AppendLog(GetControllerLogPath(currentDirectory), $"Stop command requested shutdown of background controller PID {state.Pid}.");
bool signalSent = SignalStopEvent(currentDirectory);
if (!signalSent)
{
AnsiConsole.MarkupLine("[yellow]The controller stop signal was unavailable. Waiting for exit before forcing a kill.[/]");
}
bool exitedGracefully = await WaitForExitAsync(process, GracefulShutdownTimeout, cancellationToken);
if (!exitedGracefully)
{
AnsiConsole.MarkupLine("[red]Background controller did not exit within 15 seconds; killing it.[/]");
process.Kill(entireProcessTree: true);
await process.WaitForExitAsync(cancellationToken);
}
}
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine("[green]Background controller stopped.[/]");
return 0;
}
async Task<int> JoinAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
if (!TryGetActiveDaemonProcess(currentDirectory, out _, out DaemonState? state, out Process? process))
{
return 1;
}
using (process)
{
string pipeName = GetJoinPipeName(currentDirectory);
await using DaemonJoinClient joinClient = new(pipeName, currentDirectory);
bool connected = await joinClient.ConnectAsync(JoinSocketReadyTimeout, cancellationToken);
if (!connected)
{
AnsiConsole.MarkupLine($"[red]No joinable background controller became available for PID {state!.Pid}.[/]");
AnsiConsole.MarkupLine($"[grey]Expected join pipe:[/] [aqua]{Escape(pipeName)}[/]");
return 1;
}
return await joinClient.RunAsync(cancellationToken);
}
}
async Task<int> JoinSessionCommandAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
if (!TryGetActiveDaemonProcess(currentDirectory, out _, out DaemonState? state, out Process? process))
{
return 1;
}
using (process)
{
DaemonJoinState? joinState = await TryReadDaemonJoinStateAsync(GetJoinPipeName(currentDirectory), cancellationToken);
if (joinState is null)
{
AnsiConsole.MarkupLine($"[red]No joinable background controller became available for PID {state!.Pid}.[/]");
return 1;
}
if (joinState.Sessions.Count == 0)
{
AnsiConsole.MarkupLine("[yellow]There are no tracked copilot instances available to join.[/]");
return 1;
}
RenderJoinableDaemonSessions(joinState.Sessions);
int ordinal = PromptForSessionOrdinal(joinState.Sessions);
DaemonJoinTrackedSession selectedSession = joinState.Sessions.Single(session => session.Ordinal == ordinal);
return await JoinTrackedSessionAsync(currentDirectory, selectedSession.SessionToken, cancellationToken);
}
}
async Task<DaemonJoinState?> TryReadDaemonJoinStateAsync(string pipeName, CancellationToken cancellationToken)
{
try
{
using NamedPipeClientStream pipe = new(".", pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(JoinSocketReadyTimeout);
await pipe.ConnectAsync(timeoutCts.Token);
using StreamReader reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
string? line = await reader.ReadLineAsync(timeoutCts.Token);
if (string.IsNullOrWhiteSpace(line))
{
return null;
}
return JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return null;
}
catch (IOException)
{
return null;
}
catch (JsonException)
{
return null;
}
}
int PromptForSessionOrdinal(IReadOnlyList<DaemonJoinTrackedSession> sessions)
{
HashSet<int> validOrdinals = sessions.Select(session => session.Ordinal).ToHashSet();
TextPrompt<int> prompt = new TextPrompt<int>("[grey]Enter the session number to join:[/]")
.Validate(value => validOrdinals.Contains(value)
? Spectre.Console.ValidationResult.Success()
: Spectre.Console.ValidationResult.Error("[red]Choose one of the listed session numbers.[/]"));
return AnsiConsole.Prompt(prompt);
}
void RenderJoinableDaemonSessions(IReadOnlyList<DaemonJoinTrackedSession> sessions)
{
AppShared.RenderSessionTable(
"Joinable copilot sessions",
sessions.Select(session => (session.Ordinal, session.SessionToken, session.StartedAtUtc)));
}
async Task<int> JoinTrackedSessionAsync(string currentDirectory, int sessionToken, CancellationToken cancellationToken)
{
string pipeName = GetSessionJoinPipeName(currentDirectory, sessionToken);
await using PipeAttachTransport transport = new(pipeName);
await using JoinSessionClient client = new(transport);
AttachTransportResult connectResult = await transport.ConnectAsync(cancellationToken);
if (!connectResult.Success)
{
AnsiConsole.MarkupLine($"[red]Tracked session {sessionToken} is not currently joinable:[/] {Escape(connectResult.Error ?? "Unknown error.")}");
return 1;
}
return await client.RunAsync(connectResult, cancellationToken);
}
async Task<string> ProcessJoinRequestAsync(DaemonJoinRequest request, List<TrackedSession> trackedSessions, string currentDirectory, string controllerLogPath, CancellationToken cancellationToken, Action<string> requestShutdown)
{
switch (request.Command)
{
case DaemonJoinCommand.Start:
if (trackedSessions.Count >= MaxTrackedInstances)
{
return $"Already tracking the maximum of {MaxTrackedInstances} instances.";
}
try
{
int nextSessionToken = trackedSessions.Count == 0 ? 1 : trackedSessions.Max(session => session.SessionToken) + 1;
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AppendLog(controllerLogPath, $"Join client started tracked session token {session.SessionToken}.");
return $"Started instance {trackedSessions.Count} (session token {session.SessionToken}).";
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AppendLog(controllerLogPath, $"Join client failed to start {TargetCommand}: {ex}");
return $"Failed to start {TargetCommand}: {ex.Message}";
}
case DaemonJoinCommand.Stop:
if (request.Ordinal is null || request.Ordinal < 1 || request.Ordinal > trackedSessions.Count)
{
return $"No tracked instance matches key {request.Ordinal ?? 0}.";
}
int ordinal = request.Ordinal.Value;
TrackedSession sessionToStop = trackedSessions[ordinal - 1];
StopProcessOutcome outcome = await StopTrackedSessionAsync(sessionToStop, $"instance {ordinal}", cancellationToken, controllerLogPath);
trackedSessions.RemoveAt(ordinal - 1);
return outcome == StopProcessOutcome.AlreadyExited
? $"Instance {ordinal} was already stopped."
: $"Stopped instance {ordinal}.";
case DaemonJoinCommand.Shutdown:
AppendLog(controllerLogPath, "Join client requested controller shutdown.");
requestShutdown("[yellow]Join requested daemon shutdown. Stopping tracked instances before exit...[/]");
return "Joined client requested daemon shutdown.";
default:
return "Join request ignored.";
}
}
async Task<TrackedSession> StartTrackedSessionAsync(string currentDirectory, int sessionToken, string controllerLogPath, CancellationToken cancellationToken, bool enableDiagnostics)
{
if (!Directory.Exists(TargetWorkingDirectory))
{
throw new DirectoryNotFoundException($"The configured repo path does not exist: {TargetWorkingDirectory}");
}
string workloadLogPath = CreateLogPath(sessionToken, "");
string sessionDiagnosticLogPath = CreateLogPath(sessionToken, ".controller");
string sessionJoinPipeName = GetSessionJoinPipeName(currentDirectory, sessionToken);
SessionJoinPresentationFilter sessionJoinFilter = new(sessionJoinPipeName, message => AppendLog(controllerLogPath, message));
AppendLog(sessionDiagnosticLogPath, $"Preparing Hex1b PTY host for {TargetCommand} at {TargetWorkingDirectory}.");
Hex1bTerminal terminal = CreateTargetTerminal(workloadLogPath, sessionJoinFilter, enableDiagnostics);
CancellationTokenSource runCts = new();
Task<TrackedSessionOutcome> completion = RunTrackedSessionAsync(sessionToken, terminal, sessionJoinFilter, runCts.Token, sessionDiagnosticLogPath, controllerLogPath);
TrackedSession session = new(sessionToken, terminal, sessionJoinFilter, sessionJoinPipeName, runCts, completion, workloadLogPath, sessionDiagnosticLogPath, DateTimeOffset.UtcNow);
AppendLog(controllerLogPath, $"Created Hex1b terminal for session token {sessionToken}. Workload log: {workloadLogPath}. Session join pipe: {sessionJoinPipeName}");
await Task.Delay(StartupTrackingDelay, cancellationToken);
if (completion.IsCompleted)
{
TrackedSessionOutcome outcome = await completion;
throw new InvalidOperationException(BuildStartupFailureMessage($"{TargetCommand} exited during startup.", session, outcome));
}
return session;
}
async Task<TrackedSessionOutcome> RunTrackedSessionAsync(int sessionToken, Hex1bTerminal terminal, SessionJoinPresentationFilter sessionJoinFilter, CancellationToken cancellationToken, string sessionDiagnosticLogPath, string controllerLogPath)
{
try
{
int exitCode = await terminal.RunAsync(cancellationToken);
AppendLog(controllerLogPath, $"Session token {sessionToken} exited with code {exitCode}.");
AppendLog(sessionDiagnosticLogPath, $"Hex1b terminal completed with exit code {exitCode}.");
return new TrackedSessionOutcome(exitCode, false, null, DateTimeOffset.UtcNow);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
AppendLog(controllerLogPath, $"Session token {sessionToken} was cancelled.");
AppendLog(sessionDiagnosticLogPath, "Hex1b terminal cancellation requested.");
return new TrackedSessionOutcome(null, true, null, DateTimeOffset.UtcNow);
}
catch (Exception ex)
{
AppendLog(controllerLogPath, $"Session token {sessionToken} faulted: {ex}");
AppendLog(sessionDiagnosticLogPath, $"Hex1b terminal faulted: {ex}");
return new TrackedSessionOutcome(null, false, ex, DateTimeOffset.UtcNow);
}
finally
{
await sessionJoinFilter.DisposeAsync();
await terminal.DisposeAsync();
}
}
async Task<StopProcessOutcome> StopTrackedSessionAsync(TrackedSession session, string label, CancellationToken cancellationToken, string controllerLogPath)
{
if (session.Completion.IsCompleted)
{
AppendLog(controllerLogPath, $"Requested stop for {label} (session token {session.SessionToken}), but it had already exited.");
AppendLog(session.SessionDiagnosticLogPath, $"Controller requested stop for {label}, but the session had already exited.");
return StopProcessOutcome.AlreadyExited;
}
int signalAttempt = 0;
foreach (TimeSpan waitWindow in new[] { SecondaryCtrlCDelay, GracefulShutdownTimeout - SecondaryCtrlCDelay })
{
if (waitWindow <= TimeSpan.Zero)
{
continue;
}
signalAttempt++;
AppendLog(controllerLogPath, $"Sending Hex1b Ctrl+C attempt {signalAttempt} to {label} (session token {session.SessionToken}).");
AppendLog(session.SessionDiagnosticLogPath, $"Controller sending Hex1b Ctrl+C attempt {signalAttempt} to {label}.");
AnsiConsole.MarkupLine($"[yellow]Sending Ctrl+C to[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/][yellow]...[/]");
await SendCtrlCThroughHex1bAsync(session.Terminal, cancellationToken);
bool exitedGracefully = await WaitForSessionExitAsync(session, waitWindow, cancellationToken);
if (exitedGracefully)
{
AppendLog(controllerLogPath, $"Stopped {label} (session token {session.SessionToken}) gracefully.");
AppendLog(session.SessionDiagnosticLogPath, $"Controller observed graceful shutdown after Ctrl+C.");
AnsiConsole.MarkupLine($"[green]Stopped[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/] [green]gracefully.[/]");
return StopProcessOutcome.Graceful;
}
}
AppendLog(controllerLogPath, $"{label} (session token {session.SessionToken}) did not exit after two Ctrl+C attempts; disposing terminal.");
AppendLog(session.SessionDiagnosticLogPath, "Session did not exit after two Ctrl+C attempts; disposing terminal.");
AnsiConsole.MarkupLine($"[red]{Escape(label)} did not exit within 15 seconds after two Ctrl+C attempts; disposing terminal.[/]");
session.RunCts.Cancel();
await session.Terminal.DisposeAsync();
await WaitForSessionExitAsync(session, TimeSpan.FromSeconds(5), CancellationToken.None);
AnsiConsole.MarkupLine($"[green]Disposed[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/].");
AppendLog(controllerLogPath, $"Disposed {label} (session token {session.SessionToken}) after timeout.");
return StopProcessOutcome.Killed;
}
async Task SendCtrlCThroughHex1bAsync(Hex1bTerminal terminal, CancellationToken cancellationToken)
{
Hex1bTerminalInputSequence sequence = new Hex1bTerminalInputSequenceBuilder()
.Key(Hex1bKey.C, Hex1bModifiers.Control)
.Build();
await sequence.ApplyAsync(terminal, cancellationToken);
}
static async Task<bool> WaitForSessionExitAsync(TrackedSession session, TimeSpan timeout, CancellationToken cancellationToken)
{
if (session.Completion.IsCompleted)
{
return true;
}
Task completedTask = await Task.WhenAny(session.Completion, Task.Delay(timeout, cancellationToken));
return completedTask == session.Completion;
}
static async Task<bool> WaitForExitAsync(Process process, TimeSpan timeout, CancellationToken cancellationToken)
{
if (process.HasExited)
{
return true;
}
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
try
{
await process.WaitForExitAsync(timeoutCts.Token);
return true;
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return process.HasExited;
}
}
async Task<List<string>> ReconcileExitedSessionsAsync(List<TrackedSession> trackedSessions, string controllerLogPath, bool echoToConsole)
{
List<TrackedSession> removedSessions = [];
List<string> reconcileMessages = [];
for (int index = trackedSessions.Count - 1; index >= 0; index--)
{
TrackedSession session = trackedSessions[index];
if (!session.Completion.IsCompleted)
{
continue;
}
trackedSessions.RemoveAt(index);
removedSessions.Add(session);
}
removedSessions.Reverse();
if (removedSessions.Count == 0)
{
return reconcileMessages;
}
foreach (TrackedSession removedSession in removedSessions)
{
TrackedSessionOutcome outcome = await removedSession.Completion;
string statusText = DescribeOutcome(outcome);
string message = $"Tracked instance exited on its own (session token {removedSession.SessionToken}). Outcome: {statusText}";
reconcileMessages.Add(message);
AppendLog(controllerLogPath, $"Tracked session token {removedSession.SessionToken} exited on its own. Outcome: {statusText}. Workload log: {removedSession.WorkloadLogPath}");
if (echoToConsole)
{
AnsiConsole.MarkupLine($"[yellow]Tracked instance exited on its own[/] [grey](session token {removedSession.SessionToken})[/][yellow]. Outcome:[/] {Escape(statusText)}");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(removedSession.WorkloadLogPath)}[/]");
}
string workloadLogTail = ReadWorkloadLogTail(removedSession.WorkloadLogPath);
if (!string.IsNullOrWhiteSpace(workloadLogTail))
{
AppendLog(controllerLogPath, $"Last workload output for session token {removedSession.SessionToken}:{Environment.NewLine}{workloadLogTail}");
if (echoToConsole)
{
AnsiConsole.MarkupLine($"[grey]Last workload output:[/]{Environment.NewLine}{Escape(workloadLogTail)}");
}
}
}
AppendLog(controllerLogPath, $"Tracked instance count is now {trackedSessions.Count}.");
if (echoToConsole)
{
RenderTrackedSessions(trackedSessions);
}
return reconcileMessages;
}
void RenderBanner()
{
Panel panel = new(new Markup("[bold aqua]Hex1b Graceful Process Controller[/]"))
{
Border = BoxBorder.Rounded,
Header = new PanelHeader("daemon-style CLI"),
Expand = false,
};
AnsiConsole.Write(panel);
}
void RenderTrackedSessions(IReadOnlyList<TrackedSession> trackedSessions)
{
if (trackedSessions.Count == 0)
{
AnsiConsole.Write(new Panel(new Markup("[grey]No tracked instances are running.[/]"))
{
Header = new PanelHeader("tracked instances"),
Border = BoxBorder.Rounded,
});
return;
}
AppShared.RenderSessionTable(
"Tracked running instances",
trackedSessions.Select((session, index) => (index + 1, session.SessionToken, session.StartedAtUtc)));
}
static bool TryReadDaemonState(string pidFilePath, out DaemonState? state)
{
state = null;
if (!File.Exists(pidFilePath))
{
return false;
}
try
{
string text = File.ReadAllText(pidFilePath).Trim();
state = JsonSerializer.Deserialize(text, Hex1bAppJsonSerializerContext.Default.DaemonState);
if (state is not null)
{
return true;
}
if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out int legacyPid))
{
state = new DaemonState
{
Pid = legacyPid,
StartedAtUtc = default,
};
return true;
}
return false;
}
catch (JsonException)
{
return false;
}
}
static void WriteDaemonState(string pidFilePath, DaemonState state)
{
string json = JsonSerializer.Serialize(state, Hex1bAppJsonSerializerContext.Default.DaemonState);
File.WriteAllText(pidFilePath, json);
}
void CleanupStalePidFile(string currentDirectory, string pidFilePath)
{
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
return;
}
if (TryGetVerifiedProcess(state!, out Process? process))
{
process.Dispose();
return;
}
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine($"[grey]Removed stale pid.json from[/] [aqua]{Escape(currentDirectory)}[/][grey].[/]");
}
void CleanupOwnedPidFile(string currentDirectory, string pidFilePath)
{
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
return;
}
if (state!.Pid != Environment.ProcessId)
{
return;
}
try
{
using Process current = Process.GetCurrentProcess();
DateTimeOffset actualStartedAtUtc = current.StartTime.ToUniversalTime();
if (state.StartedAtUtc == default || state.StartedAtUtc == actualStartedAtUtc)
{
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine($"[grey]Removed pid.json for[/] [aqua]{Escape(currentDirectory)}[/][grey].[/]");
}
}
catch (Win32Exception)
{
}
}
static void DeletePidFileIfPresent(string pidFilePath)
{
try
{
if (File.Exists(pidFilePath))
{
File.Delete(pidFilePath);
}
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
static bool SignalStopEvent(string currentDirectory)
{
if (!OperatingSystem.IsWindows())
{
return false;
}
try
{
using EventWaitHandle stopEvent = EventWaitHandle.OpenExisting(GetStopEventName(currentDirectory));
return stopEvent.Set();
}
catch (WaitHandleCannotBeOpenedException)
{
return false;
}
}
static bool TryGetVerifiedProcess(DaemonState state, [NotNullWhen(true)] out Process? process) =>
TryGetVerifiedProcessById(state.Pid, state.StartedAtUtc, out process);
static bool TryGetVerifiedProcessById(int pid, DateTimeOffset startedAtUtc, [NotNullWhen(true)] out Process? process)
{
process = null;
try
{
process = Process.GetProcessById(pid);
if (process.HasExited)
{
process.Dispose();
process = null;
return false;
}
if (startedAtUtc != default && process.StartTime.ToUniversalTime() != startedAtUtc.UtcDateTime)
{
process.Dispose();
process = null;
return false;
}
return true;
}
catch (ArgumentException)
{
process?.Dispose();
process = null;
return false;
}
catch (InvalidOperationException)
{
process?.Dispose();
process = null;
return false;
}
catch (Win32Exception)
{
process?.Dispose();
process = null;
return false;
}
}
bool TryGetActiveDaemonProcess(
string currentDirectory,
out string pidFilePath,
[NotNullWhen(true)] out DaemonState? state,
[NotNullWhen(true)] out Process? process)
{
pidFilePath = GetPidFilePath(currentDirectory);
state = null;
process = null;
if (!TryReadDaemonState(pidFilePath, out state))
{
AnsiConsole.MarkupLine($"[yellow]No pid.json was found in[/] [aqua]{Escape(currentDirectory)}[/].");
return false;
}
DaemonState verifiedState = state ?? throw new InvalidOperationException("Daemon state should have been loaded.");
if (TryGetVerifiedProcess(verifiedState, out process))
{
state = verifiedState;
return true;
}
CleanupStalePidFile(currentDirectory, pidFilePath);
AnsiConsole.MarkupLine("[yellow]The recorded background controller is no longer running. Removed stale pid.json.[/]");
return false;
}
static string GetPidFilePath(string currentDirectory) => Path.Combine(currentDirectory, "pid.json");
static string GetControllerLogPath(string currentDirectory) => Path.Combine(currentDirectory, ControllerLogFileName);
static string GetStopEventName(string currentDirectory) => $@"Local\Hex1bGracefulProcessShutdown-Stop-{AppShared.ComputeStableHash(currentDirectory)}";
static string GetJoinPipeName(string currentDirectory) => $"Hex1bGracefulProcessShutdown-Join-{AppShared.ComputeStableHash(currentDirectory)}";
static string GetSessionJoinPipeName(string currentDirectory, int sessionToken) => AppShared.GetSessionJoinPipeName(currentDirectory, sessionToken);
static DaemonJoinState BuildJoinState(IReadOnlyList<TrackedSession> trackedSessions, string statusMessage, bool isShuttingDown = false) =>
new()
{
StatusMessage = statusMessage,
IsShuttingDown = isShuttingDown,
Sessions =
[
.. trackedSessions.Select((session, index) => new DaemonJoinTrackedSession
{
Ordinal = index + 1,
SessionToken = session.SessionToken,
StartedAtUtc = session.StartedAtUtc,
})
],
};
static SelfLaunchSpec GetSelfLaunchSpec()
{
string? processPath = Environment.ProcessPath;
if (!string.IsNullOrWhiteSpace(processPath) &&
File.Exists(processPath) &&
!IsDotnetHostPath(processPath))
{
return new SelfLaunchSpec(processPath, []);
}
return new SelfLaunchSpec(DotnetHost, [GetScriptPath(), "--"]);
}
static bool IsDotnetHostPath(string path)
{
string fileName = Path.GetFileName(path);
return string.Equals(fileName, "dotnet", StringComparison.OrdinalIgnoreCase) ||
string.Equals(fileName, "dotnet.exe", StringComparison.OrdinalIgnoreCase);
}
static FileStream? TryAcquireRunLock(string currentDirectory)
{
try
{
string lockDirectoryPath = Path.Combine(Path.GetTempPath(), "Hex1bGracefulProcessShutdown");
Directory.CreateDirectory(lockDirectoryPath);
string lockPath = Path.Combine(lockDirectoryPath, $"{AppShared.ComputeStableHash(currentDirectory)}.lock");
return new FileStream(lockPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None);
}
catch (IOException)
{
return null;
}
}
Hex1bTerminal CreateTargetTerminal(string workloadLogPath, SessionJoinPresentationFilter sessionJoinFilter, bool enableDiagnostics)
{
Hex1bTerminalBuilder builder = Hex1bTerminal.CreateBuilder()
.WithDimensions(120, 40)
.WithHeadless()
.WithWorkloadLogging(workloadLogPath)
.AddPresentationFilter(sessionJoinFilter);
if (enableDiagnostics)
{
builder = builder.WithDiagnostics($"hex1bapp-background-{Environment.ProcessId}", forceEnable: true);
}
return builder
.WithPtyProcess(options =>
{
options.FileName = TargetCommand;
options.Arguments = new List<string>(TargetArgumentList);
options.WorkingDirectory = TargetWorkingDirectory;
options.WindowsPtyMode = WindowsPtyMode.RequireProxy;
options.InheritEnvironment = true;
})
.Build();
}
static string CreateLogPath(int sessionToken, string suffix)
{
string logDirectoryPath = Path.Combine(Path.GetTempPath(), "Hex1bGracefulProcessShutdown", "logs");
Directory.CreateDirectory(logDirectoryPath);
return Path.Combine(logDirectoryPath, $"copilot-session-{sessionToken}-{DateTimeOffset.UtcNow:yyyyMMddHHmmssfff}{suffix}.log");
}
static string DescribeOutcome(TrackedSessionOutcome outcome)
{
if (outcome.Failure is not null)
{
return $"faulted: {outcome.Failure.Message}";
}
if (outcome.Cancelled)
{
return "cancelled";
}
return $"exit code {outcome.ExitCode ?? 0}";
}
static string BuildStartupFailureMessage(string prefix, TrackedSession session, TrackedSessionOutcome outcome)
{
string details = DescribeOutcome(outcome);
string workloadLogTail = ReadWorkloadLogTail(session.WorkloadLogPath);
string sessionDiagnosticLogTail = ReadWorkloadLogTail(session.SessionDiagnosticLogPath);
if (!string.IsNullOrWhiteSpace(sessionDiagnosticLogTail))
{
details = $"{details}{Environment.NewLine}Controller session log:{Environment.NewLine}{sessionDiagnosticLogTail}";
}
if (string.IsNullOrWhiteSpace(workloadLogTail))
{
return $"{prefix} Outcome: {details}";
}
return $"{prefix} Outcome: {details}{Environment.NewLine}Workload log:{Environment.NewLine}{workloadLogTail}";
}
static void AppendLog(string path, string message)
{
string line = $"[{DateTimeOffset.Now:yyyy-MM-dd HH:mm:ss.fff}] {message}{Environment.NewLine}";
TryWriteLogFile(path, line, append: true);
}
static bool TryWriteLogFile(string path, string content, bool append)
{
string? directoryPath = Path.GetDirectoryName(path);
if (!string.IsNullOrWhiteSpace(directoryPath))
{
Directory.CreateDirectory(directoryPath);
}
for (int attempt = 0; attempt < 5; attempt++)
{
try
{
FileMode fileMode = append ? FileMode.Append : FileMode.Create;
using FileStream stream = new(path, fileMode, FileAccess.Write, FileShare.ReadWrite | FileShare.Delete);
using StreamWriter writer = new(stream, Encoding.UTF8);
writer.Write(content);
writer.Flush();
return true;
}
catch (IOException)
{
if (attempt == 4)
{
return false;
}
Thread.Sleep(25);
}
catch (UnauthorizedAccessException)
{
if (attempt == 4)
{
return false;
}
Thread.Sleep(25);
}
}
return false;
}
static string ReadWorkloadLogTail(string workloadLogPath)
{
try
{
if (!File.Exists(workloadLogPath))
{
return string.Empty;
}
string logContent = File.ReadAllText(workloadLogPath).Trim();
if (string.IsNullOrWhiteSpace(logContent))
{
return string.Empty;
}
if (logContent.Length <= WorkloadLogTailCharacterLimit)
{
return logContent;
}
return $"...{logContent[^WorkloadLogTailCharacterLimit..]}";
}
catch (IOException)
{
return string.Empty;
}
catch (UnauthorizedAccessException)
{
return string.Empty;
}
}
static string Escape(string value) => Markup.Escape(value);
static string DescribeKey(ConsoleKeyInfo key) => AppShared.DescribeKey(key);
static string GetScriptPath([CallerFilePath] string path = "") => path;
static class AppShared
{
public static string DescribeKey(ConsoleKeyInfo key) =>
key.KeyChar == '\0' ? key.Key.ToString() : key.KeyChar.ToString();
public static string ComputeStableHash(string value)
{
byte[] bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value.ToUpperInvariant()));
return Convert.ToHexString(bytes);
}
public static string GetSessionJoinPipeName(string currentDirectory, int sessionToken) =>
$"Hex1bGracefulProcessShutdown-Session-{ComputeStableHash(currentDirectory)}-{sessionToken.ToString(CultureInfo.InvariantCulture)}";
public static void RenderSessionTable(string title, IEnumerable<(int Ordinal, int SessionToken, DateTimeOffset StartedAtUtc)> sessions)
{
Table table = new();
table.Border = TableBorder.Rounded;
table.Title = new TableTitle(title);
table.AddColumn("#");
table.AddColumn("Session token");
table.AddColumn("Started (local)");
foreach ((int ordinal, int sessionToken, DateTimeOffset startedAtUtc) in sessions)
{
table.AddRow(
ordinal.ToString(CultureInfo.InvariantCulture),
sessionToken.ToString(CultureInfo.InvariantCulture),
startedAtUtc.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture));
}
AnsiConsole.Write(table);
}
}
sealed record DaemonState
{
public required int Pid { get; init; }
public required DateTimeOffset StartedAtUtc { get; init; }
}
sealed record AttachSocketRequest
{
[JsonPropertyName("method")]
public required string Method { get; init; }
}
sealed record AttachSocketResponse
{
[JsonPropertyName("success")]
public bool Success { get; init; }
[JsonPropertyName("width")]
public int? Width { get; init; }
[JsonPropertyName("height")]
public int? Height { get; init; }
[JsonPropertyName("leader")]
public bool? Leader { get; init; }
[JsonPropertyName("data")]
public string? Data { get; init; }
[JsonPropertyName("error")]
public string? Error { get; init; }
}
enum DaemonJoinCommand
{
Snapshot,
Start,
Stop,
Shutdown,
}
sealed record DaemonJoinRequest
{
public required DaemonJoinCommand Command { get; init; }
public int? Ordinal { get; init; }
}
sealed record DaemonJoinTrackedSession
{
public required int Ordinal { get; init; }
public required int SessionToken { get; init; }
public required DateTimeOffset StartedAtUtc { get; init; }
}
sealed record DaemonJoinState
{
public required string StatusMessage { get; init; }
public required bool IsShuttingDown { get; init; }
public required List<DaemonJoinTrackedSession> Sessions { get; init; }
}
sealed class TrackedSession(
int sessionToken,
Hex1bTerminal terminal,
SessionJoinPresentationFilter sessionJoinFilter,
string sessionJoinPipeName,
CancellationTokenSource runCts,
Task<TrackedSessionOutcome> completion,
string workloadLogPath,
string sessionDiagnosticLogPath,
DateTimeOffset startedAtUtc)
{
public int SessionToken { get; } = sessionToken;
public Hex1bTerminal Terminal { get; } = terminal;
public SessionJoinPresentationFilter SessionJoinFilter { get; } = sessionJoinFilter;
public string SessionJoinPipeName { get; } = sessionJoinPipeName;
public CancellationTokenSource RunCts { get; } = runCts;
public Task<TrackedSessionOutcome> Completion { get; } = completion;
public string WorkloadLogPath { get; } = workloadLogPath;
public string SessionDiagnosticLogPath { get; } = sessionDiagnosticLogPath;
public DateTimeOffset StartedAtUtc { get; } = startedAtUtc;
}
sealed record TrackedSessionOutcome(int? ExitCode, bool Cancelled, Exception? Failure, DateTimeOffset EndedAtUtc);
sealed record SelfLaunchSpec(string FileName, IReadOnlyList<string> ArgumentPrefix);
[JsonSerializable(typeof(DaemonState))]
[JsonSerializable(typeof(AttachSocketRequest))]
[JsonSerializable(typeof(AttachSocketResponse))]
[JsonSerializable(typeof(DaemonJoinRequest))]
[JsonSerializable(typeof(DaemonJoinState))]
sealed partial class Hex1bAppJsonSerializerContext : JsonSerializerContext;
enum StopProcessOutcome
{
Graceful,
Killed,
AlreadyExited,
}
sealed class DaemonJoinServer(string pipeName, Action<string> log) : IAsyncDisposable
{
private readonly string _pipeName = pipeName;
private readonly Action<string> _log = log;
private readonly CancellationTokenSource _cts = new();
private readonly Channel<DaemonJoinRequest> _requests = Channel.CreateUnbounded<DaemonJoinRequest>();
private readonly object _connectionLock = new();
private DaemonJoinConnection? _connection;
private DaemonJoinState _latestState = new()
{
StatusMessage = "Background controller is starting.",
IsShuttingDown = false,
Sessions = [],
};
private Task? _acceptLoop;
public void Start()
{
_log($"Join server starting on pipe '{_pipeName}'.");
_acceptLoop = AcceptLoopAsync(_cts.Token);
}
public bool TryReadRequest([NotNullWhen(true)] out DaemonJoinRequest? request) => _requests.Reader.TryRead(out request);
public async Task PublishStateAsync(DaemonJoinState state)
{
_latestState = state;
DaemonJoinConnection? connection;
lock (_connectionLock)
{
connection = _connection;
}
if (connection is not null)
{
_log($"Join server publishing state: '{state.StatusMessage}' (sessions={state.Sessions.Count}, shuttingDown={state.IsShuttingDown}).");
await connection.SendStateAsync(state);
}
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
if (_acceptLoop is not null)
{
try
{
await _acceptLoop;
}
catch (OperationCanceledException)
{
}
}
DaemonJoinConnection? connection;
lock (_connectionLock)
{
connection = _connection;
_connection = null;
}
if (connection is not null)
{
await connection.DisposeAsync();
}
_cts.Dispose();
}
private async Task AcceptLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
_log($"Join server waiting for connection on pipe '{_pipeName}'.");
NamedPipeServerStream server = new(
_pipeName,
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous);
try
{
await server.WaitForConnectionAsync(cancellationToken);
_log("Join server accepted a client connection.");
}
catch (OperationCanceledException)
{
await server.DisposeAsync();
break;
}
DaemonJoinConnection connection = new(server, _latestState, _requests.Writer, ClearConnectionAsync, _log);
lock (_connectionLock)
{
_connection = connection;
}
await connection.RunAsync(cancellationToken);
}
}
private ValueTask ClearConnectionAsync(DaemonJoinConnection connection)
{
lock (_connectionLock)
{
if (ReferenceEquals(_connection, connection))
{
_connection = null;
}
}
return ValueTask.CompletedTask;
}
}
sealed class DaemonJoinConnection(
NamedPipeServerStream pipe,
DaemonJoinState initialState,
ChannelWriter<DaemonJoinRequest> requestWriter,
Func<DaemonJoinConnection, ValueTask> onClosed,
Action<string> log) : IAsyncDisposable
{
private readonly NamedPipeServerStream _pipe = pipe;
private readonly DaemonJoinState _initialState = initialState;
private readonly ChannelWriter<DaemonJoinRequest> _requestWriter = requestWriter;
private readonly Func<DaemonJoinConnection, ValueTask> _onClosed = onClosed;
private readonly Action<string> _log = log;
private readonly SemaphoreSlim _writeLock = new(1, 1);
private readonly StreamReader _reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
private readonly StreamWriter _writer = new(pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
private bool _disposed;
public async Task RunAsync(CancellationToken cancellationToken)
{
try
{
_log($"Join connection sending initial state '{_initialState.StatusMessage}' with {_initialState.Sessions.Count} session(s).");
await SendStateAsync(_initialState);
while (!cancellationToken.IsCancellationRequested)
{
string? line = await _reader.ReadLineAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
{
break;
}
DaemonJoinRequest? request = JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinRequest);
if (request is null)
{
continue;
}
_log($"Join connection received request {request.Command} (ordinal={request.Ordinal?.ToString(CultureInfo.InvariantCulture) ?? "null"}).");
await _requestWriter.WriteAsync(request, cancellationToken);
if (request.Command == DaemonJoinCommand.Shutdown)
{
break;
}
}
}
catch (OperationCanceledException)
{
}
catch (IOException)
{
_log("Join connection ended because the pipe closed.");
}
finally
{
_log("Join connection closing.");
await DisposeAsync();
await _onClosed(this);
}
}
public async Task SendStateAsync(DaemonJoinState state)
{
if (_disposed)
{
return;
}
await _writeLock.WaitAsync();
try
{
if (_disposed)
{
return;
}
string payload = JsonSerializer.Serialize(state, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
_log($"Join connection writing state payload: '{state.StatusMessage}' (sessions={state.Sessions.Count}, shuttingDown={state.IsShuttingDown}).");
await _writer.WriteLineAsync(payload);
}
catch (IOException)
{
_log("Join connection write failed because the pipe closed.");
await DisposeAsync();
}
finally
{
_writeLock.Release();
}
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
_reader.Dispose();
await _writer.DisposeAsync();
await _pipe.DisposeAsync();
_writeLock.Dispose();
}
}
sealed class DaemonJoinClient(string pipeName, string currentDirectory) : IAsyncDisposable
{
private readonly string _pipeName = pipeName;
private readonly string _currentDirectory = currentDirectory;
private readonly Channel<DaemonJoinState> _stateUpdates = Channel.CreateUnbounded<DaemonJoinState>();
private NamedPipeClientStream? _pipe;
private StreamReader? _reader;
private StreamWriter? _writer;
private DaemonJoinState _state = new()
{
StatusMessage = "Connecting to background controller...",
IsShuttingDown = false,
Sessions = [],
};
private bool _awaitingSessionJoinOrdinal;
private bool _sessionJoinActive;
private string? _localStatusMessage;
public async Task<bool> ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
try
{
_pipe = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
await _pipe.ConnectAsync(timeoutCts.Token);
_reader = new StreamReader(_pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
_writer = new StreamWriter(_pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
return true;
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return false;
}
catch (TimeoutException)
{
return false;
}
catch (IOException)
{
return false;
}
}
public async Task<int> RunAsync(CancellationToken cancellationToken)
{
if (_reader is null || _writer is null)
{
return 1;
}
using CancellationTokenSource joinCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
ConsoleCancelEventHandler? cancelHandler = null;
cancelHandler = (_, eventArgs) =>
{
eventArgs.Cancel = true;
if (_sessionJoinActive)
{
return;
}
joinCts.Cancel();
};
Console.CancelKeyPress += cancelHandler;
Task listenTask = ListenAsync(joinCts.Token);
try
{
RenderState();
Task<ConsoleKeyInfo?> keyTask = AnsiConsole.Console.Input.ReadKeyAsync(true, joinCts.Token);
Task<DaemonJoinState> stateTask = _stateUpdates.Reader.ReadAsync(joinCts.Token).AsTask();
while (!joinCts.IsCancellationRequested)
{
Task completedTask;
try
{
completedTask = await Task.WhenAny(keyTask, stateTask);
}
catch (OperationCanceledException)
{
break;
}
if (completedTask == stateTask)
{
try
{
_state = await stateTask;
}
catch (OperationCanceledException)
{
break;
}
catch (ChannelClosedException)
{
break;
}
if (!_awaitingSessionJoinOrdinal)
{
_localStatusMessage = null;
}
RenderState();
if (_state.IsShuttingDown)
{
joinCts.Cancel();
break;
}
stateTask = _stateUpdates.Reader.ReadAsync(joinCts.Token).AsTask();
continue;
}
ConsoleKeyInfo? keyInfo;
try
{
keyInfo = await keyTask;
}
catch (OperationCanceledException)
{
break;
}
keyTask = AnsiConsole.Console.Input.ReadKeyAsync(true, joinCts.Token);
if (keyInfo is null)
{
continue;
}
ConsoleKeyInfo key = keyInfo.Value;
char keyChar = key.KeyChar;
if (_awaitingSessionJoinOrdinal)
{
if (key.Key == ConsoleKey.Escape)
{
_awaitingSessionJoinOrdinal = false;
_localStatusMessage = "Session join canceled.";
RenderState();
continue;
}
if (!char.IsDigit(keyChar))
{
_awaitingSessionJoinOrdinal = false;
_localStatusMessage = $"Expected a session number after J, but got {AppShared.DescribeKey(key)}.";
RenderState();
continue;
}
_awaitingSessionJoinOrdinal = false;
int ordinal = keyChar - '0';
DaemonJoinTrackedSession? sessionToJoin = _state.Sessions.FirstOrDefault(session => session.Ordinal == ordinal);
if (sessionToJoin is null)
{
_localStatusMessage = $"No tracked session matches key {ordinal}.";
RenderState();
continue;
}
_localStatusMessage = $"Joining tracked session {ordinal} (session token {sessionToJoin.SessionToken}). Detach from it to return here.";
RenderState();
await using PipeAttachTransport transport = new(AppShared.GetSessionJoinPipeName(_currentDirectory, sessionToJoin.SessionToken));
AttachTransportResult joinResult = await transport.ConnectAsync(joinCts.Token);
if (!joinResult.Success)
{
_localStatusMessage = $"Tracked session {ordinal} is no longer joinable: {joinResult.Error ?? "Unknown error."}";
RenderState();
continue;
}
await using JoinSessionClient sessionClient = new(transport);
_sessionJoinActive = true;
try
{
await sessionClient.RunAsync(joinResult, joinCts.Token);
}
finally
{
_sessionJoinActive = false;
}
_localStatusMessage = $"Returned from tracked session {ordinal}.";
RenderState();
continue;
}
if (char.ToUpperInvariant(keyChar) == 'S')
{
await SendRequestAsync(new DaemonJoinRequest { Command = DaemonJoinCommand.Start });
continue;
}
if (char.ToUpperInvariant(keyChar) == 'J')
{
if (_state.Sessions.Count == 0)
{
_localStatusMessage = "There are no tracked copilot sessions available to join.";
}
else
{
_awaitingSessionJoinOrdinal = true;
_localStatusMessage = "Join mode: press 1-5 to choose a tracked session, or Esc to cancel.";
}
RenderState();
continue;
}
if (char.IsDigit(keyChar))
{
await SendRequestAsync(new DaemonJoinRequest
{
Command = DaemonJoinCommand.Stop,
Ordinal = keyChar - '0',
});
continue;
}
if (key.Key == ConsoleKey.Escape)
{
joinCts.Cancel();
break;
}
}
}
finally
{
Console.CancelKeyPress -= cancelHandler;
joinCts.Cancel();
try
{
await listenTask;
}
catch
{
}
}
return 0;
}
public async ValueTask DisposeAsync()
{
if (_writer is not null)
{
await _writer.DisposeAsync();
_writer = null;
}
_reader?.Dispose();
_reader = null;
if (_pipe is not null)
{
await _pipe.DisposeAsync();
_pipe = null;
}
}
private async Task ListenAsync(CancellationToken cancellationToken)
{
if (_reader is null)
{
_stateUpdates.Writer.TryComplete();
return;
}
try
{
while (!cancellationToken.IsCancellationRequested)
{
string? line;
try
{
line = await _reader.ReadLineAsync(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (IOException)
{
break;
}
if (string.IsNullOrWhiteSpace(line))
{
break;
}
DaemonJoinState? state = JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
if (state is null)
{
continue;
}
await _stateUpdates.Writer.WriteAsync(state, cancellationToken);
if (state.IsShuttingDown)
{
break;
}
}
}
finally
{
_stateUpdates.Writer.TryComplete();
}
}
private Task SendRequestAsync(DaemonJoinRequest request)
{
if (_writer is null)
{
return Task.CompletedTask;
}
string payload = JsonSerializer.Serialize(request, Hex1bAppJsonSerializerContext.Default.DaemonJoinRequest);
return _writer.WriteLineAsync(payload);
}
private void RenderState()
{
AnsiConsole.Clear();
Panel panel = new(new Markup("[bold aqua]Hex1b Graceful Process Controller[/]"))
{
Border = BoxBorder.Rounded,
Header = new PanelHeader("joined background daemon"),
Expand = false,
};
AnsiConsole.Write(panel);
AnsiConsole.MarkupLine($"[green]Joined background controller in[/] [aqua]{Markup.Escape(_currentDirectory)}[/].");
if (_awaitingSessionJoinOrdinal)
{
AnsiConsole.MarkupLine("[grey]Join mode is active:[/] [aqua]press 1-5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/]/[aqua]Ctrl+C[/] [grey]to cancel or detach.[/]");
}
else
{
AnsiConsole.MarkupLine("[grey]Press[/] [aqua]S[/] [grey]to start an instance,[/] [aqua]1[/]-[aqua]5[/] [grey]to stop an instance,[/] [aqua]J[/] [grey]then[/] [aqua]1[/]-[aqua]5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/]/[aqua]Ctrl+C[/] [grey]to detach.[/]");
}
AnsiConsole.MarkupLine($"[grey]Status:[/] {Markup.Escape(_localStatusMessage ?? _state.StatusMessage)}");
if (_state.Sessions.Count == 0)
{
AnsiConsole.Write(new Panel(new Markup("[grey]No tracked instances are running.[/]"))
{
Header = new PanelHeader("tracked instances"),
Border = BoxBorder.Rounded,
});
return;
}
AppShared.RenderSessionTable(
"Tracked running instances",
_state.Sessions.Select(session => (session.Ordinal, session.SessionToken, session.StartedAtUtc)));
}
}
sealed class SessionJoinPresentationFilter(string pipeName, Action<string> log) : ITerminalAwarePresentationFilter, IAsyncDisposable
{
private static readonly Action<Hex1bTerminal, int, int>? ResizeWithWorkloadInvoker = CreateResizeWithWorkloadInvoker();
private readonly string _pipeName = pipeName;
private readonly Action<string> _log = log;
private readonly CancellationTokenSource _cts = new();
private readonly List<SessionJoinAttachSession> _sessions = [];
private readonly object _attachLock = new();
private SessionJoinAttachSession? _leaderSession;
private bool _mouseTrackingEnabled;
private bool _sgrMouseModeEnabled;
private bool _bracketedPasteEnabled;
private Hex1bTerminal? _terminal;
private Task? _acceptLoop;
private bool _started;
private bool _disposed;
public void SetTerminal(Hex1bTerminal terminal) => _terminal = terminal;
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default)
{
StartListening();
return ValueTask.CompletedTask;
}
public ValueTask<IReadOnlyList<AnsiToken>> OnOutputAsync(IReadOnlyList<AppliedToken> appliedTokens, TimeSpan elapsed, CancellationToken ct = default)
{
IReadOnlyList<AnsiToken> tokens = [.. appliedTokens.Select(token => token.Token)];
foreach (AnsiToken token in tokens)
{
if (token is PrivateModeToken privateMode)
{
switch (privateMode.Mode)
{
case 1000 or 1002 or 1003:
_mouseTrackingEnabled = privateMode.Enable;
break;
case 1006:
_sgrMouseModeEnabled = privateMode.Enable;
break;
case 2004:
_bracketedPasteEnabled = privateMode.Enable;
break;
}
}
}
lock (_attachLock)
{
if (_sessions.Count > 0)
{
byte[] ansi = Encoding.UTF8.GetBytes(AnsiTokenSerializer.Serialize(tokens));
if (ansi.Length > 0)
{
AttachTransportFrame frame = AttachTransportFrame.Output(ansi);
for (int index = _sessions.Count - 1; index >= 0; index--)
{
if (!_sessions[index].Channel.Writer.TryWrite(frame))
{
_sessions.RemoveAt(index);
}
}
}
}
}
return ValueTask.FromResult(tokens);
}
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => DisposeAsync();
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
await _cts.CancelAsync();
if (_acceptLoop is not null)
{
try
{
await _acceptLoop;
}
catch (OperationCanceledException)
{
}
}
List<SessionJoinAttachSession> sessionsToClose;
lock (_attachLock)
{
sessionsToClose = [.. _sessions];
_sessions.Clear();
_leaderSession = null;
}
foreach (SessionJoinAttachSession session in sessionsToClose)
{
session.Channel.Writer.TryWrite(AttachTransportFrame.Exit());
session.Channel.Writer.TryComplete();
}
_cts.Dispose();
}
private void StartListening()
{
if (_started || _disposed)
{
return;
}
_started = true;
_log($"Session join server starting on pipe '{_pipeName}'.");
_acceptLoop = AcceptLoopAsync(_cts.Token);
}
private async Task AcceptLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
NamedPipeServerStream server = new(
_pipeName,
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous);
try
{
await server.WaitForConnectionAsync(cancellationToken);
_log($"Session join server accepted a client on '{_pipeName}'.");
await HandleConnectionAsync(server, cancellationToken);
}
catch (OperationCanceledException)
{
await server.DisposeAsync();
break;
}
catch (IOException)
{
await server.DisposeAsync();
}
}
}
private async Task HandleConnectionAsync(NamedPipeServerStream pipe, CancellationToken cancellationToken)
{
await using (pipe)
{
await using StreamWriter writer = new(pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
using StreamReader reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
try
{
string? requestLine = await reader.ReadLineAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(requestLine))
{
return;
}
AttachSocketRequest? request = JsonSerializer.Deserialize(requestLine, Hex1bAppJsonSerializerContext.Default.AttachSocketRequest);
if (!string.Equals(request?.Method, "attach", StringComparison.OrdinalIgnoreCase))
{
AttachSocketResponse errorResponse = new() { Success = false, Error = "Only attach is supported for session joins." };
await writer.WriteLineAsync(JsonSerializer.Serialize(errorResponse, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse).AsMemory(), cancellationToken);
return;
}
await using SessionJoinAttachSession session = CreateAttachSession();
AttachSocketResponse attachResponse = new()
{
Success = true,
Width = session.Width,
Height = session.Height,
Leader = session.IsLeader,
Data = session.InitialScreen,
};
await writer.WriteLineAsync(JsonSerializer.Serialize(attachResponse, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse).AsMemory(), cancellationToken);
using CancellationTokenSource detachCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task outputTask = StreamOutputFromSessionAsync(session, writer, detachCts.Token);
Task inputTask = StreamInputToSessionAsync(session, reader, detachCts);
await Task.WhenAny(outputTask, inputTask);
await detachCts.CancelAsync();
try
{
await Task.WhenAll(outputTask, inputTask);
}
catch (OperationCanceledException)
{
}
}
catch (IOException)
{
}
}
}
private SessionJoinAttachSession CreateAttachSession()
{
if (_terminal is null)
{
throw new InvalidOperationException("Terminal not initialized.");
}
Channel<AttachTransportFrame> channel = Channel.CreateBounded<AttachTransportFrame>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false,
});
using var snapshot = _terminal.CreateSnapshot();
string initialAnsi = snapshot.ToAnsi(new TerminalAnsiOptions
{
IncludeClearScreen = true,
IncludeTrailingNewline = true,
});
SessionJoinAttachSession session = new(this, channel, snapshot.Width, snapshot.Height, false, initialAnsi);
lock (_attachLock)
{
_sessions.Add(session);
if (_leaderSession is null)
{
_leaderSession = session;
session.IsLeader = true;
}
}
string modeReplay = BuildModeReplaySequence();
if (modeReplay.Length > 0)
{
session.Channel.Writer.TryWrite(AttachTransportFrame.Output(Encoding.UTF8.GetBytes(modeReplay)));
}
return session;
}
private static async Task StreamOutputFromSessionAsync(SessionJoinAttachSession session, StreamWriter writer, CancellationToken cancellationToken)
{
try
{
await foreach (AttachTransportFrame frame in session.Frames.ReadAllAsync(cancellationToken))
{
switch (frame.Kind)
{
case AttachTransportFrameKind.Output:
await writer.WriteLineAsync($"o:{Convert.ToBase64String(frame.Data.Span)}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.Resize:
(int width, int height) = frame.GetResize();
await writer.WriteLineAsync($"r:{width},{height}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.LeaderChanged:
await writer.WriteLineAsync($"leader:{(frame.GetIsLeader() ? "true" : "false")}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.Exit:
await writer.WriteLineAsync("exit".AsMemory(), cancellationToken);
return;
}
}
}
catch (OperationCanceledException)
{
}
}
private async Task StreamInputToSessionAsync(SessionJoinAttachSession session, StreamReader reader, CancellationTokenSource detachCts)
{
try
{
while (!detachCts.Token.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync(detachCts.Token);
if (line is null || string.Equals(line, "detach", StringComparison.Ordinal))
{
await detachCts.CancelAsync();
return;
}
if (string.Equals(line, "shutdown", StringComparison.Ordinal))
{
await detachCts.CancelAsync();
return;
}
if (line.StartsWith("i:", StringComparison.Ordinal))
{
await session.SendInputAsync(Convert.FromBase64String(line[2..]));
continue;
}
if (line.StartsWith("r:", StringComparison.Ordinal))
{
string[] parts = line[2..].Split(',');
if (parts.Length == 2 &&
int.TryParse(parts[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int width) &&
int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out int height))
{
await session.SendResizeAsync(width, height);
}
continue;
}
if (string.Equals(line, "lead", StringComparison.Ordinal))
{
await session.ClaimLeadAsync();
}
}
}
catch (OperationCanceledException)
{
}
catch (IOException)
{
await detachCts.CancelAsync();
}
}
internal Task SendInputFromSessionAsync(SessionJoinAttachSession session, byte[] data) =>
_terminal?.SendInputAsync(data) ?? Task.CompletedTask;
internal Task SendResizeFromSessionAsync(SessionJoinAttachSession session, int width, int height)
{
if (_terminal is null)
{
return Task.CompletedTask;
}
bool isLeader;
lock (_attachLock)
{
isLeader = _leaderSession == session;
}
if (!isLeader)
{
return Task.CompletedTask;
}
ResizeTerminalAndWorkload(_terminal, width, height);
AttachTransportFrame frame = AttachTransportFrame.Resize(width, height);
lock (_attachLock)
{
foreach (SessionJoinAttachSession attachedSession in _sessions)
{
if (!ReferenceEquals(attachedSession, session))
{
attachedSession.Channel.Writer.TryWrite(frame);
}
}
}
return Task.CompletedTask;
}
[DynamicDependency(DynamicallyAccessedMemberTypes.NonPublicMethods, typeof(Hex1bTerminal))]
private static Action<Hex1bTerminal, int, int>? CreateResizeWithWorkloadInvoker()
{
MethodInfo? method = typeof(Hex1bTerminal).GetMethod(
"ResizeWithWorkload",
BindingFlags.Instance | BindingFlags.NonPublic,
binder: null,
types: [typeof(int), typeof(int)],
modifiers: null);
if (method is null)
{
return null;
}
return (terminal, width, height) => method.Invoke(terminal, [width, height]);
}
private static void ResizeTerminalAndWorkload(Hex1bTerminal terminal, int width, int height)
{
if (ResizeWithWorkloadInvoker is { } invoker)
{
invoker(terminal, width, height);
return;
}
terminal.Resize(width, height);
}
internal Task ClaimLeadFromSessionAsync(SessionJoinAttachSession session)
{
SessionJoinAttachSession? oldLeader;
lock (_attachLock)
{
oldLeader = _leaderSession;
_leaderSession = session;
session.IsLeader = true;
}
if (oldLeader is not null && !ReferenceEquals(oldLeader, session))
{
oldLeader.IsLeader = false;
oldLeader.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(false));
}
session.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(true));
return Task.CompletedTask;
}
internal void RemoveSession(SessionJoinAttachSession session)
{
lock (_attachLock)
{
_sessions.Remove(session);
if (ReferenceEquals(_leaderSession, session))
{
_leaderSession = _sessions.Count > 0 ? _sessions[0] : null;
if (_leaderSession is not null)
{
_leaderSession.IsLeader = true;
_leaderSession.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(true));
}
}
}
}
private string BuildModeReplaySequence()
{
StringBuilder builder = new();
if (_mouseTrackingEnabled)
{
builder.Append("\x1b[?1000h");
builder.Append("\x1b[?1002h");
builder.Append("\x1b[?1003h");
}
if (_sgrMouseModeEnabled)
{
builder.Append("\x1b[?1006h");
}
if (_bracketedPasteEnabled)
{
builder.Append("\x1b[?2004h");
}
return builder.ToString();
}
}
sealed class SessionJoinAttachSession : IAsyncDisposable
{
private readonly SessionJoinPresentationFilter _filter;
private bool _disposed;
public SessionJoinAttachSession(SessionJoinPresentationFilter filter, Channel<AttachTransportFrame> channel, int width, int height, bool isLeader, string? initialScreen)
{
_filter = filter;
Channel = channel;
IsLeader = isLeader;
Width = width;
Height = height;
InitialScreen = initialScreen;
}
public ChannelReader<AttachTransportFrame> Frames => Channel.Reader;
public Channel<AttachTransportFrame> Channel { get; }
public bool IsLeader { get; set; }
public int Width { get; }
public int Height { get; }
public string? InitialScreen { get; }
public Task SendInputAsync(byte[] data) => _filter.SendInputFromSessionAsync(this, data);
public Task SendResizeAsync(int width, int height) => _filter.SendResizeFromSessionAsync(this, width, height);
public Task ClaimLeadAsync() => _filter.ClaimLeadFromSessionAsync(this);
public ValueTask DisposeAsync()
{
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_filter.RemoveSession(this);
Channel.Writer.TryComplete();
return ValueTask.CompletedTask;
}
}
enum AttachTransportFrameKind
{
Output,
Resize,
LeaderChanged,
Exit,
}
readonly record struct AttachTransportFrame(AttachTransportFrameKind Kind, ReadOnlyMemory<byte> Data)
{
public static AttachTransportFrame Output(ReadOnlyMemory<byte> data) => new(AttachTransportFrameKind.Output, data);
public static AttachTransportFrame Resize(int width, int height)
{
byte[] bytes = new byte[8];
BitConverter.TryWriteBytes(bytes.AsSpan(0, 4), width);
BitConverter.TryWriteBytes(bytes.AsSpan(4, 4), height);
return new AttachTransportFrame(AttachTransportFrameKind.Resize, bytes);
}
public static AttachTransportFrame LeaderChanged(bool isLeader) =>
new(AttachTransportFrameKind.LeaderChanged, new byte[] { isLeader ? (byte)1 : (byte)0 });
public static AttachTransportFrame Exit() => new(AttachTransportFrameKind.Exit, default);
public (int Width, int Height) GetResize()
{
ReadOnlySpan<byte> span = Data.Span;
return (BitConverter.ToInt32(span[..4]), BitConverter.ToInt32(span[4..8]));
}
public bool GetIsLeader() => !Data.IsEmpty && Data.Span[0] != 0;
}
sealed record AttachTransportResult(
bool Success,
int Width,
int Height,
bool IsLeader,
string? InitialScreen,
string? Error);
interface IAttachTransport : IAsyncDisposable
{
Task<AttachTransportResult> ConnectAsync(CancellationToken cancellationToken);
Task SendInputAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken);
Task SendResizeAsync(int width, int height, CancellationToken cancellationToken);
Task ClaimLeadAsync(CancellationToken cancellationToken);
Task DetachAsync(CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
IAsyncEnumerable<AttachTransportFrame> ReadFramesAsync(CancellationToken cancellationToken);
}
sealed class PipeAttachTransport(string pipeName) : IAttachTransport
{
private readonly string _pipeName = pipeName;
private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);
private NamedPipeClientStream? _pipe;
private StreamReader? _reader;
private StreamWriter? _writer;
public async Task<AttachTransportResult> ConnectAsync(CancellationToken cancellationToken)
{
try
{
_pipe = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(ConnectTimeout);
await _pipe.ConnectAsync(timeoutCts.Token);
_reader = new StreamReader(_pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
_writer = new StreamWriter(_pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
AttachSocketRequest request = new() { Method = "attach" };
string requestJson = JsonSerializer.Serialize(request, Hex1bAppJsonSerializerContext.Default.AttachSocketRequest);
await _writer.WriteLineAsync(requestJson.AsMemory(), timeoutCts.Token);
string? responseLine = await _reader.ReadLineAsync(timeoutCts.Token);
if (string.IsNullOrWhiteSpace(responseLine))
{
return new AttachTransportResult(false, 0, 0, false, null, "Empty response from session host.");
}
AttachSocketResponse? response = JsonSerializer.Deserialize(responseLine, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse);
if (response is null)
{
return new AttachTransportResult(false, 0, 0, false, null, "Failed to deserialize session attach response.");
}
if (!response.Success)
{
return new AttachTransportResult(false, 0, 0, false, null, response.Error ?? "Session attach failed.");
}
return new AttachTransportResult(true, response.Width ?? 0, response.Height ?? 0, response.Leader ?? false, response.Data, null);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return new AttachTransportResult(false, 0, 0, false, null, "Timed out waiting for the session join pipe.");
}
catch (IOException ex)
{
return new AttachTransportResult(false, 0, 0, false, null, ex.Message);
}
catch (JsonException ex)
{
return new AttachTransportResult(false, 0, 0, false, null, ex.Message);
}
}
public Task SendInputAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) =>
WriteLineAsync($"i:{Convert.ToBase64String(data.Span)}", cancellationToken);
public Task SendResizeAsync(int width, int height, CancellationToken cancellationToken) =>
WriteLineAsync($"r:{width},{height}", cancellationToken);
public Task ClaimLeadAsync(CancellationToken cancellationToken) =>
WriteLineAsync("lead", cancellationToken);
public Task DetachAsync(CancellationToken cancellationToken) =>
WriteLineAsync("detach", cancellationToken);
public Task ShutdownAsync(CancellationToken cancellationToken) =>
WriteLineAsync("shutdown", cancellationToken);
public async IAsyncEnumerable<AttachTransportFrame> ReadFramesAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
if (_reader is null)
{
yield break;
}
while (!cancellationToken.IsCancellationRequested)
{
string? line;
try
{
line = await _reader.ReadLineAsync(cancellationToken);
}
catch (OperationCanceledException)
{
yield break;
}
catch (IOException)
{
yield break;
}
if (line is null)
{
yield break;
}
if (line.StartsWith("o:", StringComparison.Ordinal))
{
yield return AttachTransportFrame.Output(Convert.FromBase64String(line[2..]));
continue;
}
if (line.StartsWith("r:", StringComparison.Ordinal))
{
string[] parts = line[2..].Split(',');
if (parts.Length == 2 &&
int.TryParse(parts[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int width) &&
int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out int height))
{
yield return AttachTransportFrame.Resize(width, height);
}
continue;
}
if (line.StartsWith("leader:", StringComparison.Ordinal))
{
yield return AttachTransportFrame.LeaderChanged(string.Equals(line[7..], "true", StringComparison.OrdinalIgnoreCase));
continue;
}
if (string.Equals(line, "exit", StringComparison.OrdinalIgnoreCase))
{
yield return AttachTransportFrame.Exit();
yield break;
}
}
}
public async ValueTask DisposeAsync()
{
if (_writer is not null)
{
await _writer.DisposeAsync();
_writer = null;
}
_reader?.Dispose();
_reader = null;
if (_pipe is not null)
{
await _pipe.DisposeAsync();
_pipe = null;
}
}
private Task WriteLineAsync(string line, CancellationToken cancellationToken)
{
if (_writer is null)
{
return Task.CompletedTask;
}
return _writer.WriteLineAsync(line.AsMemory(), cancellationToken);
}
}
sealed class JoinSessionClient(IAttachTransport transport) : IAsyncDisposable
{
private readonly IAttachTransport _transport = transport;
private readonly Pipe _outputPipe = new();
private int _initialScreenReplayStarted;
private bool _isLeader;
private bool _shutdownRequested;
private int _remoteWidth;
private int _remoteHeight;
private int _displayWidth;
private int _displayHeight;
private Hex1bTerminal? _terminal;
private CancellationTokenSource? _sessionCts;
public async Task<int> RunAsync(AttachTransportResult connectResult, CancellationToken cancellationToken)
{
if (!connectResult.Success)
{
AnsiConsole.MarkupLine($"[red]Failed to join the terminal session:[/] {Markup.Escape(connectResult.Error ?? "Unknown error.")}");
return 1;
}
_isLeader = connectResult.IsLeader;
_remoteWidth = connectResult.Width;
_remoteHeight = connectResult.Height;
bool preSizedToDisplay = await TryApplyInitialResizeAsync(cancellationToken);
InputInterceptStream inputIntercept = new(this);
StreamWorkloadAdapter workload = new(_outputPipe.Reader.AsStream(), inputIntercept)
{
Width = _remoteWidth,
Height = _remoteHeight,
};
_terminal = Hex1bTerminal.CreateBuilder()
.WithDimensions(_remoteWidth, _remoteHeight)
.WithWorkload(workload)
.AddPresentationFilter(new ResizeFilter(this))
.AddWorkloadFilter(new CtrlCDetachFilter(this))
.Build();
_sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
TaskCompletionSource firstRemoteOutput = new(TaskCreationOptions.RunContinuationsAsynchronously);
Task networkOutputTask = PumpNetworkOutputAsync(firstRemoteOutput, _sessionCts.Token);
if (!preSizedToDisplay && !string.IsNullOrEmpty(connectResult.InitialScreen))
{
await WriteInitialScreenAsync(connectResult.InitialScreen, _sessionCts.Token);
}
else if (preSizedToDisplay && !string.IsNullOrEmpty(connectResult.InitialScreen))
{
_ = Task.Run(
async () =>
{
try
{
Task completed = await Task.WhenAny(firstRemoteOutput.Task, Task.Delay(750, _sessionCts.Token));
if (completed != firstRemoteOutput.Task)
{
await WriteInitialScreenAsync(connectResult.InitialScreen, _sessionCts.Token);
}
}
catch (OperationCanceledException)
{
}
});
}
ConsoleCancelEventHandler cancelHandler = (_, args) =>
{
args.Cancel = true;
RequestStop();
};
Console.CancelKeyPress += cancelHandler;
try
{
await _terminal.RunAsync(_sessionCts.Token);
}
catch (OperationCanceledException)
{
}
finally
{
if (_sessionCts is not null)
{
await _sessionCts.CancelAsync();
}
if (_shutdownRequested)
{
try
{
await _transport.ShutdownAsync(CancellationToken.None);
}
catch
{
}
}
else
{
try
{
await _transport.DetachAsync(CancellationToken.None);
}
catch
{
}
}
_outputPipe.Writer.Complete();
try
{
await networkOutputTask;
}
catch
{
}
Console.CancelKeyPress -= cancelHandler;
}
return 0;
}
public async ValueTask DisposeAsync()
{
if (_sessionCts is not null)
{
await _sessionCts.CancelAsync();
_sessionCts.Dispose();
_sessionCts = null;
}
if (_terminal is not null)
{
await _terminal.DisposeAsync();
_terminal = null;
}
}
internal async Task SendInputAsync(byte[] buffer, int offset, int count)
{
await _transport.SendInputAsync(buffer.AsMemory(offset, count), CancellationToken.None);
}
internal async Task HandleCommandByteAsync(byte commandByte)
{
switch (commandByte)
{
case (byte)'c':
case (byte)'C':
await _transport.SendInputAsync(new byte[] { 0x03 }, CancellationToken.None);
break;
case (byte)'d':
case (byte)'D':
RequestStop();
break;
case (byte)'l':
case (byte)'L':
await _transport.ClaimLeadAsync(CancellationToken.None);
break;
case (byte)'q':
case (byte)'Q':
_shutdownRequested = true;
RequestStop();
break;
case 0x1D:
await _transport.SendInputAsync(new byte[] { 0x1D }, CancellationToken.None);
break;
}
}
internal void HandleLocalCtrlCIntercept(int count)
{
if (count <= 0)
{
return;
}
RequestStop();
}
private async Task WriteInitialScreenAsync(string? initialScreen, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(initialScreen))
{
return;
}
if (Interlocked.Exchange(ref _initialScreenReplayStarted, 1) != 0)
{
return;
}
byte[] initialBytes = Encoding.UTF8.GetBytes(initialScreen);
await _outputPipe.Writer.WriteAsync(initialBytes, cancellationToken);
await _outputPipe.Writer.FlushAsync(cancellationToken);
}
private async Task PumpNetworkOutputAsync(TaskCompletionSource firstRemoteOutput, CancellationToken cancellationToken)
{
try
{
await foreach (AttachTransportFrame frame in _transport.ReadFramesAsync(cancellationToken))
{
switch (frame.Kind)
{
case AttachTransportFrameKind.Output:
await _outputPipe.Writer.WriteAsync(frame.Data, cancellationToken);
await _outputPipe.Writer.FlushAsync(cancellationToken);
firstRemoteOutput.TrySetResult();
break;
case AttachTransportFrameKind.Resize:
(int width, int height) = frame.GetResize();
_remoteWidth = width;
_remoteHeight = height;
_terminal?.Resize(width, height);
break;
case AttachTransportFrameKind.LeaderChanged:
_isLeader = frame.GetIsLeader();
if (_isLeader)
{
await SendResizeForCurrentDisplayAsync();
}
break;
case AttachTransportFrameKind.Exit:
RequestStop();
return;
}
}
}
catch (OperationCanceledException)
{
}
catch
{
RequestStop();
}
finally
{
firstRemoteOutput.TrySetResult();
_outputPipe.Writer.Complete();
}
}
private async Task HandleDisplayResizeAsync(int displayWidth, int displayHeight)
{
_displayWidth = displayWidth;
_displayHeight = displayHeight;
if (_isLeader)
{
await SendResizeForCurrentDisplayAsync();
}
}
private async Task<bool> TryApplyInitialResizeAsync(CancellationToken cancellationToken)
{
if (!_isLeader)
{
return false;
}
(int Width, int Height)? displaySize = TryGetCurrentDisplaySize();
if (displaySize is null)
{
return false;
}
_displayWidth = displaySize.Value.Width;
_displayHeight = displaySize.Value.Height;
(int Width, int Height)? targetSize = CalculateResizeTarget(_displayWidth, _displayHeight, _remoteWidth, _remoteHeight);
if (targetSize is null)
{
return false;
}
_remoteWidth = targetSize.Value.Width;
_remoteHeight = targetSize.Value.Height;
await _transport.SendResizeAsync(_remoteWidth, _remoteHeight, cancellationToken);
return true;
}
private async Task SendResizeForCurrentDisplayAsync()
{
(int Width, int Height)? targetSize = CalculateResizeTarget(_displayWidth, _displayHeight, _remoteWidth, _remoteHeight);
if (targetSize is null)
{
return;
}
_remoteWidth = targetSize.Value.Width;
_remoteHeight = targetSize.Value.Height;
await _transport.SendResizeAsync(_remoteWidth, _remoteHeight, CancellationToken.None);
}
private static (int Width, int Height)? CalculateResizeTarget(int displayWidth, int displayHeight, int remoteWidth, int remoteHeight)
{
int targetWidth = displayWidth;
int targetHeight = displayHeight;
if (targetWidth < 1 || targetHeight < 1)
{
return null;
}
if (targetWidth == remoteWidth && targetHeight == remoteHeight)
{
return null;
}
return (targetWidth, targetHeight);
}
private static (int Width, int Height)? TryGetCurrentDisplaySize()
{
try
{
int width = Console.WindowWidth;
int height = Console.WindowHeight;
return width > 0 && height > 0 ? (width, height) : null;
}
catch (IOException)
{
return null;
}
catch (InvalidOperationException)
{
return null;
}
}
private void RequestStop()
{
if (_sessionCts is { IsCancellationRequested: false } sessionCts)
{
_ = sessionCts.CancelAsync();
}
}
private sealed class ResizeFilter(JoinSessionClient client) : IHex1bTerminalPresentationFilter
{
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default) =>
new(client.HandleDisplayResizeAsync(width, height));
public ValueTask<IReadOnlyList<AnsiToken>> OnOutputAsync(IReadOnlyList<AppliedToken> appliedTokens, TimeSpan elapsed, CancellationToken ct = default) =>
ValueTask.FromResult<IReadOnlyList<AnsiToken>>([.. appliedTokens.Select(token => token.Token)]);
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) =>
new(client.HandleDisplayResizeAsync(width, height));
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
}
private sealed class CtrlCDetachFilter(JoinSessionClient client) : IHex1bTerminalWorkloadFilter
{
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default) => default;
public ValueTask OnOutputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnFrameCompleteAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default)
{
int ctrlCCount = tokens.OfType<ControlCharacterToken>().Count(token => token.Character == '\u0003');
if (ctrlCCount > 0)
{
client.HandleLocalCtrlCIntercept(ctrlCCount);
}
return default;
}
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
}
private sealed class InputInterceptStream(JoinSessionClient client) : Stream
{
private bool _inCommandMode;
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Flush()
{
}
public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) =>
WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
const byte CtrlC = 0x03;
const byte CtrlRightBracket = 0x1D;
int start = offset;
for (int index = offset; index < offset + count; index++)
{
if (_inCommandMode)
{
_inCommandMode = false;
await client.HandleCommandByteAsync(buffer[index]);
start = index + 1;
continue;
}
if (buffer[index] == CtrlC)
{
if (index > start)
{
await client.SendInputAsync(buffer, start, index - start);
}
client.HandleLocalCtrlCIntercept(1);
start = index + 1;
continue;
}
if (buffer[index] == CtrlRightBracket)
{
if (index > start)
{
await client.SendInputAsync(buffer, start, index - start);
}
_inCommandMode = true;
start = index + 1;
}
}
if (start < offset + count && !_inCommandMode)
{
await client.SendInputAsync(buffer, start, offset + count - start);
}
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
byte[] copy = buffer.ToArray();
await WriteAsync(copy, 0, copy.Length, cancellationToken);
}
}
}
#!/usr/bin/env dotnet
#:property SuppressTrimAnalysisWarnings=false
#:property TrimmerSingleWarn=false
#:package System.CommandLine@2.0.6
#:package Spectre.Console@0.55.0
#:package Hex1b@0.128.0
using System.CommandLine;
using System.ComponentModel;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Globalization;
using System.IO.Pipelines;
using System.IO.Pipes;
using System.Net.Sockets;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Channels;
using Hex1b;
using Hex1b.Automation;
using Hex1b.Diagnostics;
using Hex1b.Input;
using Hex1b.Tokens;
using Hex1b.Widgets;
using Spectre.Console;
const string TargetCommand = "copilot";
const string TargetPrompt = "We're just testing remote sessions here in the context of a proof regarding the spawning and management of background instances of `copilot`. Stay running and await further instructions.";
const string TargetWorkingDirectory = @"D:\src\GitHub\DamianEdwards\kusto-cli";
const string DotnetHost = "dotnet";
const int MaxTrackedInstances = 5;
const string ControllerLogFileName = "hex1b-daemon.log";
const int WorkloadLogTailCharacterLimit = 4000;
TimeSpan JoinSocketReadyTimeout = TimeSpan.FromSeconds(30);
string TargetArguments = $"--yolo --remote -i \"{TargetPrompt}\"";
string[] TargetArgumentList = ["--yolo", "--remote", "-i", TargetPrompt];
TimeSpan SecondaryCtrlCDelay = TimeSpan.FromSeconds(1);
TimeSpan StartupTrackingDelay = TimeSpan.FromSeconds(3);
TimeSpan GracefulShutdownTimeout = TimeSpan.FromSeconds(15);
if (!OperatingSystem.IsWindows())
{
AnsiConsole.MarkupLine("[red]This sample is only supported on Windows.[/]");
return 1;
}
return await BuildRootCommand().Parse(args).InvokeAsync();
RootCommand BuildRootCommand()
{
RootCommand root = new("Daemon-style controller for copilot sessions hosted through Hex1b PTY terminals.");
Option<bool> backgroundOption = new("--background")
{
Description = "Run without keyboard input.",
Hidden = true,
};
Command runCommand = new("run", "Run the controller in the foreground and listen for keyboard input.");
runCommand.Add(backgroundOption);
runCommand.SetAction((parseResult, cancellationToken) => RunAsync(parseResult.GetValue(backgroundOption), cancellationToken));
root.Add(runCommand);
Command startCommand = new("start", "Start a background run instance for the current directory.");
startCommand.SetAction((_, cancellationToken) => StartAsync(cancellationToken));
root.Add(startCommand);
Command stopCommand = new("stop", "Stop the background run instance for the current directory.");
stopCommand.SetAction((_, cancellationToken) => StopAsync(cancellationToken));
root.Add(stopCommand);
Command joinCommand = new("join", "Join the running background instance in the current terminal.");
joinCommand.SetAction((_, cancellationToken) => JoinAsync(cancellationToken));
Command joinSessionCommand = new("session", "Join a tracked copilot session hosted by the background controller.");
joinSessionCommand.SetAction((_, cancellationToken) => JoinSessionCommandAsync(cancellationToken));
joinCommand.Add(joinSessionCommand);
root.Add(joinCommand);
root.SetAction(_ =>
{
RenderBanner();
AnsiConsole.MarkupLine("[yellow]Choose one of:[/] [aqua]run[/], [aqua]start[/], [aqua]join[/], [aqua]join session[/], or [aqua]stop[/].");
return 1;
});
return root;
}
async Task<int> RunAsync(bool backgroundMode, CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
string controllerLogPath = GetControllerLogPath(currentDirectory);
string stopEventName = GetStopEventName(currentDirectory);
using SingleInstanceLock? runLock = TryAcquireRunLock(currentDirectory);
if (runLock is null)
{
AnsiConsole.MarkupLine($"[red]A run instance is already active for[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine("[yellow]Use[/] [aqua]dotnet hex1bapp.cs -- stop[/] [yellow]to stop it first.[/]");
return 1;
}
CleanupStalePidFile(currentDirectory, pidFilePath);
using EventWaitHandle stopEvent = new(false, EventResetMode.AutoReset, stopEventName);
RenderBanner();
AnsiConsole.MarkupLine($"[green]Hex1b controller running in[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine($"[grey]Controller log:[/] [aqua]{Escape(controllerLogPath)}[/]");
AppendControllerLog(controllerLogPath, $"Controller started in {currentDirectory} (backgroundMode={backgroundMode}).");
AppendControllerLog(controllerLogPath, $"Target command: {TargetCommand} {TargetArguments}");
AppendControllerLog(controllerLogPath, $"Target working directory: {TargetWorkingDirectory}");
if (backgroundMode)
{
AnsiConsole.MarkupLine("[grey]Background mode is active. Use[/] [aqua]dotnet hex1bapp.cs -- stop[/] [grey]to stop this controller.[/]");
}
else
{
AnsiConsole.MarkupLine($"[grey]Press[/] [aqua]S[/] [grey]to start[/] [aqua]{Escape(TargetCommand)} {Escape(TargetArguments)}[/] [grey]in[/] [aqua]{Escape(TargetWorkingDirectory)}[/][grey],[/] [aqua]1[/]-[aqua]5[/] [grey]to stop an instance,[/] [aqua]J[/] [grey]then[/] [aqua]1[/]-[aqua]5[/] [grey]to join a tracked session terminal, or[/] [aqua]Ctrl+C[/] [grey]to stop everything and exit.[/]");
}
List<TrackedSession> trackedSessions = [];
int nextSessionToken = 1;
using CancellationTokenSource keyboardCts = new();
using CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, keyboardCts.Token);
using CancellationTokenSource stopMonitorCts = new();
int shutdownRequested = 0;
bool sessionJoinActive = false;
string latestBackgroundStatus = "Background controller is running.";
DaemonJoinServer? joinServer = backgroundMode ? new DaemonJoinServer(GetJoinPipeName(currentDirectory), message => AppendControllerLog(controllerLogPath, message)) : null;
joinServer?.Start();
void RequestShutdown(string message)
{
if (Interlocked.Exchange(ref shutdownRequested, 1) != 0)
{
return;
}
AppendControllerLog(controllerLogPath, $"Shutdown requested: {Markup.Remove(message)}");
latestBackgroundStatus = Markup.Remove(message);
if (joinServer is not null)
{
_ = joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus, isShuttingDown: true));
}
AnsiConsole.MarkupLine(message);
keyboardCts.Cancel();
}
Task stopSignalTask = Task.Run(() =>
{
int signaledIndex = WaitHandle.WaitAny([stopEvent, stopMonitorCts.Token.WaitHandle]);
if (signaledIndex == 0)
{
RequestShutdown("[yellow]Stop signal received. Stopping tracked instances before exit...[/]");
}
}, CancellationToken.None);
ConsoleCancelEventHandler? cancelHandler = null;
cancelHandler = (_, eventArgs) =>
{
eventArgs.Cancel = true;
if (sessionJoinActive)
{
return;
}
RequestShutdown("[yellow]Ctrl+C received. Stopping tracked instances before exit...[/]");
};
Console.CancelKeyPress += cancelHandler;
try
{
if (backgroundMode)
{
try
{
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken++, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AnsiConsole.MarkupLine($"[green]Auto-started instance[/] [aqua]1[/] [green]for background mode.[/]");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(session.WorkloadLogPath)}[/]");
AppendControllerLog(controllerLogPath, $"Auto-started session token {session.SessionToken}. Workload log: {session.WorkloadLogPath}");
latestBackgroundStatus = "Auto-started instance 1 for background mode.";
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus));
}
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AnsiConsole.MarkupLine($"[red]Failed to auto-start {Escape(TargetCommand)}:[/] {Escape(ex.Message)}");
AppendControllerLog(controllerLogPath, $"Failed to auto-start {TargetCommand}: {ex}");
return 1;
}
RenderTrackedSessions(trackedSessions);
try
{
while (!linkedCts.IsCancellationRequested)
{
List<string> reconcileMessages = await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: false);
if (reconcileMessages.Count > 0)
{
latestBackgroundStatus = reconcileMessages[^1];
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus));
}
}
if (joinServer is not null)
{
while (joinServer.TryReadRequest(out DaemonJoinRequest? request))
{
latestBackgroundStatus = await ProcessJoinRequestAsync(request, trackedSessions, currentDirectory, controllerLogPath, cancellationToken, RequestShutdown);
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, latestBackgroundStatus, request.Command == DaemonJoinCommand.Shutdown));
}
}
await Task.Delay(TimeSpan.FromMilliseconds(250), linkedCts.Token);
}
}
catch (OperationCanceledException) when (keyboardCts.IsCancellationRequested)
{
}
}
else
{
RenderTrackedSessions(trackedSessions);
bool awaitingSessionJoinOrdinal = false;
while (!linkedCts.IsCancellationRequested)
{
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: true);
ConsoleKeyInfo? keyInfo;
try
{
keyInfo = await AnsiConsole.Console.Input.ReadKeyAsync(true, linkedCts.Token);
}
catch (OperationCanceledException) when (keyboardCts.IsCancellationRequested)
{
break;
}
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: true);
if (keyInfo is null)
{
continue;
}
ConsoleKeyInfo key = keyInfo.Value;
char keyChar = key.KeyChar;
if (awaitingSessionJoinOrdinal)
{
if (key.Key == ConsoleKey.Escape)
{
awaitingSessionJoinOrdinal = false;
AnsiConsole.MarkupLine("[grey]Session join canceled.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
if (!char.IsDigit(keyChar))
{
awaitingSessionJoinOrdinal = false;
AnsiConsole.MarkupLine($"[yellow]Expected a session number after[/] [aqua]J[/][yellow], but got[/] [aqua]{Escape(DescribeKey(key))}[/][yellow].[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
awaitingSessionJoinOrdinal = false;
int joinOrdinal = keyChar - '0';
if (joinOrdinal < 1 || joinOrdinal > trackedSessions.Count)
{
AnsiConsole.MarkupLine($"[yellow]No tracked session matches key[/] [aqua]{joinOrdinal}[/].");
RenderTrackedSessions(trackedSessions);
continue;
}
TrackedSession sessionToJoin = trackedSessions[joinOrdinal - 1];
AnsiConsole.MarkupLine($"[grey]Joining tracked session[/] [aqua]{joinOrdinal}[/] [grey](session token {sessionToJoin.SessionToken})[/][grey]. Detach from the session to return here.[/]");
AppendControllerLog(controllerLogPath, $"User requested join for tracked session {joinOrdinal} (session token {sessionToJoin.SessionToken}).");
sessionJoinActive = true;
try
{
await JoinTrackedSessionAsync(currentDirectory, sessionToJoin.SessionToken, cancellationToken);
}
finally
{
sessionJoinActive = false;
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (char.ToUpperInvariant(keyChar) == 'S')
{
if (trackedSessions.Count >= MaxTrackedInstances)
{
AnsiConsole.MarkupLine($"[yellow]Already tracking the maximum of {MaxTrackedInstances} instances.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
try
{
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken++, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AnsiConsole.MarkupLine($"[green]Started instance[/] [aqua]{trackedSessions.Count}[/] [grey](session token {session.SessionToken})[/].");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(session.WorkloadLogPath)}[/]");
AppendControllerLog(controllerLogPath, $"Started tracked session token {session.SessionToken}. Workload log: {session.WorkloadLogPath}");
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AnsiConsole.MarkupLine($"[red]Failed to start {Escape(TargetCommand)}:[/] {Escape(ex.Message)}");
AppendControllerLog(controllerLogPath, $"Failed to start {TargetCommand}: {ex}");
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (char.ToUpperInvariant(keyChar) == 'J')
{
if (trackedSessions.Count == 0)
{
AnsiConsole.MarkupLine("[yellow]There are no tracked copilot sessions available to join.[/]");
RenderTrackedSessions(trackedSessions);
continue;
}
awaitingSessionJoinOrdinal = true;
AnsiConsole.MarkupLine("[grey]Join mode:[/] [aqua]press 1-5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/] [grey]to cancel.[/]");
continue;
}
if (char.IsDigit(keyChar))
{
int ordinal = keyChar - '0';
if (ordinal < 1 || ordinal > trackedSessions.Count)
{
AnsiConsole.MarkupLine($"[yellow]No tracked instance matches key[/] [aqua]{ordinal}[/].");
RenderTrackedSessions(trackedSessions);
continue;
}
TrackedSession session = trackedSessions[ordinal - 1];
AppendControllerLog(controllerLogPath, $"User requested shutdown of instance {ordinal} (session token {session.SessionToken}).");
StopProcessOutcome outcome = await StopTrackedSessionAsync(session, $"instance {ordinal}", cancellationToken, controllerLogPath);
trackedSessions.RemoveAt(ordinal - 1);
if (outcome == StopProcessOutcome.AlreadyExited)
{
AnsiConsole.MarkupLine($"[grey]Instance {ordinal} was already stopped.[/]");
}
RenderTrackedSessions(trackedSessions);
continue;
}
if (key.Key == ConsoleKey.Escape)
{
AnsiConsole.MarkupLine("[grey]Use Ctrl+C to shut down the controller and all tracked instances.[/]");
continue;
}
AnsiConsole.MarkupLine($"[grey]Ignored key[/] [aqua]{Escape(DescribeKey(key))}[/][grey].[/]");
}
}
}
finally
{
Console.CancelKeyPress -= cancelHandler;
stopMonitorCts.Cancel();
await stopSignalTask;
await ReconcileExitedSessionsAsync(trackedSessions, controllerLogPath, echoToConsole: false);
if (trackedSessions.Count > 0)
{
AnsiConsole.MarkupLine($"[yellow]Stopping {trackedSessions.Count} tracked instance(s)...[/]");
AppendControllerLog(controllerLogPath, $"Stopping {trackedSessions.Count} tracked session(s) during controller shutdown.");
await Task.WhenAll(trackedSessions.Select(session => StopTrackedSessionAsync(session, $"session {session.SessionToken}", CancellationToken.None, controllerLogPath)));
trackedSessions.Clear();
}
if (joinServer is not null)
{
await joinServer.PublishStateAsync(BuildJoinState(trackedSessions, "Controller stopped.", isShuttingDown: true));
await joinServer.DisposeAsync();
}
CleanupOwnedPidFile(currentDirectory, pidFilePath);
AppendControllerLog(controllerLogPath, "Controller stopped.");
RenderTrackedSessions(trackedSessions);
AnsiConsole.MarkupLine("[green]Controller stopped.[/]");
}
return 0;
}
async Task<int> StartAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
if (TryReadDaemonState(pidFilePath, out DaemonState? existingState) &&
TryGetVerifiedProcess(existingState!, out Process? existingProcess))
{
using (existingProcess)
{
AnsiConsole.MarkupLine($"[yellow]A background controller is already running for[/] [aqua]{Escape(currentDirectory)}[/] [grey](PID {existingState!.Pid})[/].");
return 1;
}
}
CleanupStalePidFile(currentDirectory, pidFilePath);
SelfLaunchSpec selfLaunchSpec = GetSelfLaunchSpec();
ProcessStartInfo startInfo = new()
{
FileName = selfLaunchSpec.FileName,
WorkingDirectory = currentDirectory,
UseShellExecute = false,
CreateNoWindow = true,
RedirectStandardOutput = true,
RedirectStandardError = true,
};
foreach (string argument in selfLaunchSpec.ArgumentPrefix)
{
startInfo.ArgumentList.Add(argument);
}
startInfo.ArgumentList.Add("run");
startInfo.ArgumentList.Add("--background");
using Process process = Process.Start(startInfo)
?? throw new InvalidOperationException("Failed to start the background controller process.");
await Task.Delay(500, cancellationToken);
if (process.HasExited)
{
string output = (await process.StandardOutput.ReadToEndAsync(cancellationToken)).Trim();
string error = (await process.StandardError.ReadToEndAsync(cancellationToken)).Trim();
string details = string.Join(Environment.NewLine, new[] { output, error }.Where(value => !string.IsNullOrWhiteSpace(value)));
if (string.IsNullOrWhiteSpace(details))
{
details = "The background controller exited before it could finish starting.";
}
AnsiConsole.MarkupLine($"[red]{Escape(details)}[/]");
return 1;
}
DaemonState state = new()
{
Pid = process.Id,
StartedAtUtc = process.StartTime.ToUniversalTime(),
};
WriteDaemonState(pidFilePath, state);
AnsiConsole.MarkupLine($"[green]Started background controller[/] [grey](PID {process.Id})[/] [green]for[/] [aqua]{Escape(currentDirectory)}[/].");
AnsiConsole.MarkupLine($"[grey]State file:[/] [aqua]{Escape(pidFilePath)}[/]");
return 0;
}
async Task<int> StopAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
AnsiConsole.MarkupLine($"[yellow]No pid.json was found in[/] [aqua]{Escape(currentDirectory)}[/].");
return 1;
}
if (!TryGetVerifiedProcess(state!, out Process? process))
{
CleanupStalePidFile(currentDirectory, pidFilePath);
AnsiConsole.MarkupLine("[yellow]The recorded background controller is no longer running. Removed stale pid.json.[/]");
return 1;
}
using (process)
{
AnsiConsole.MarkupLine($"[yellow]Sending stop signal to background controller[/] [grey](PID {state!.Pid})[/][yellow]...[/]");
AppendControllerLog(GetControllerLogPath(currentDirectory), $"Stop command requested shutdown of background controller PID {state.Pid}.");
bool signalSent = SignalStopEvent(currentDirectory);
if (!signalSent)
{
AnsiConsole.MarkupLine("[yellow]The controller stop signal was unavailable. Waiting for exit before forcing a kill.[/]");
}
bool exitedGracefully = await WaitForExitAsync(process, GracefulShutdownTimeout, cancellationToken);
if (!exitedGracefully)
{
AnsiConsole.MarkupLine("[red]Background controller did not exit within 15 seconds; killing it.[/]");
process.Kill(entireProcessTree: true);
await process.WaitForExitAsync(cancellationToken);
}
}
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine("[green]Background controller stopped.[/]");
return 0;
}
async Task<int> JoinAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
AnsiConsole.MarkupLine($"[yellow]No pid.json was found in[/] [aqua]{Escape(currentDirectory)}[/].");
return 1;
}
if (!TryGetVerifiedProcess(state!, out Process? process))
{
CleanupStalePidFile(currentDirectory, pidFilePath);
AnsiConsole.MarkupLine("[yellow]The recorded background controller is no longer running. Removed stale pid.json.[/]");
return 1;
}
using (process)
{
string pipeName = GetJoinPipeName(currentDirectory);
await using DaemonJoinClient joinClient = new(pipeName, currentDirectory);
bool connected = await joinClient.ConnectAsync(JoinSocketReadyTimeout, cancellationToken);
if (!connected)
{
AnsiConsole.MarkupLine($"[red]No joinable background controller became available for PID {state!.Pid}.[/]");
AnsiConsole.MarkupLine($"[grey]Expected join pipe:[/] [aqua]{Escape(pipeName)}[/]");
return 1;
}
return await joinClient.RunAsync(cancellationToken);
}
}
async Task<int> JoinSessionCommandAsync(CancellationToken cancellationToken)
{
string currentDirectory = Path.GetFullPath(Environment.CurrentDirectory);
string pidFilePath = GetPidFilePath(currentDirectory);
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
AnsiConsole.MarkupLine($"[yellow]No pid.json was found in[/] [aqua]{Escape(currentDirectory)}[/].");
return 1;
}
if (!TryGetVerifiedProcess(state!, out Process? process))
{
CleanupStalePidFile(currentDirectory, pidFilePath);
AnsiConsole.MarkupLine("[yellow]The recorded background controller is no longer running. Removed stale pid.json.[/]");
return 1;
}
using (process)
{
DaemonJoinState? joinState = await TryReadDaemonJoinStateAsync(GetJoinPipeName(currentDirectory), cancellationToken);
if (joinState is null)
{
AnsiConsole.MarkupLine($"[red]No joinable background controller became available for PID {state!.Pid}.[/]");
return 1;
}
if (joinState.Sessions.Count == 0)
{
AnsiConsole.MarkupLine("[yellow]There are no tracked copilot instances available to join.[/]");
return 1;
}
RenderJoinableDaemonSessions(joinState.Sessions);
int ordinal = PromptForSessionOrdinal(joinState.Sessions);
DaemonJoinTrackedSession selectedSession = joinState.Sessions.Single(session => session.Ordinal == ordinal);
return await JoinTrackedSessionAsync(currentDirectory, selectedSession.SessionToken, cancellationToken);
}
}
async Task<DaemonJoinState?> TryReadDaemonJoinStateAsync(string pipeName, CancellationToken cancellationToken)
{
try
{
using NamedPipeClientStream pipe = new(".", pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(JoinSocketReadyTimeout);
await pipe.ConnectAsync(timeoutCts.Token);
using StreamReader reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
string? line = await reader.ReadLineAsync(timeoutCts.Token);
if (string.IsNullOrWhiteSpace(line))
{
return null;
}
return JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return null;
}
catch (IOException)
{
return null;
}
catch (JsonException)
{
return null;
}
}
int PromptForSessionOrdinal(IReadOnlyList<DaemonJoinTrackedSession> sessions)
{
HashSet<int> validOrdinals = sessions.Select(session => session.Ordinal).ToHashSet();
TextPrompt<int> prompt = new TextPrompt<int>("[grey]Enter the session number to join:[/]")
.Validate(value => validOrdinals.Contains(value)
? Spectre.Console.ValidationResult.Success()
: Spectre.Console.ValidationResult.Error("[red]Choose one of the listed session numbers.[/]"));
return AnsiConsole.Prompt(prompt);
}
void RenderJoinableDaemonSessions(IReadOnlyList<DaemonJoinTrackedSession> sessions)
{
Table table = new();
table.Border = TableBorder.Rounded;
table.Title = new TableTitle("Joinable copilot sessions");
table.AddColumn("#");
table.AddColumn("Session token");
table.AddColumn("Started (local)");
foreach (DaemonJoinTrackedSession session in sessions)
{
table.AddRow(
session.Ordinal.ToString(CultureInfo.InvariantCulture),
session.SessionToken.ToString(CultureInfo.InvariantCulture),
session.StartedAtUtc.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture));
}
AnsiConsole.Write(table);
}
async Task<int> JoinTrackedSessionAsync(string currentDirectory, int sessionToken, CancellationToken cancellationToken)
{
string pipeName = GetSessionJoinPipeName(currentDirectory, sessionToken);
await using PipeAttachTransport transport = new(pipeName);
await using JoinSessionClient client = new(transport);
AttachTransportResult connectResult = await transport.ConnectAsync(cancellationToken);
if (!connectResult.Success)
{
AnsiConsole.MarkupLine($"[red]Tracked session {sessionToken} is not currently joinable:[/] {Escape(connectResult.Error ?? "Unknown error.")}");
return 1;
}
return await client.RunAsync(connectResult, cancellationToken);
}
async Task<string> ProcessJoinRequestAsync(DaemonJoinRequest request, List<TrackedSession> trackedSessions, string currentDirectory, string controllerLogPath, CancellationToken cancellationToken, Action<string> requestShutdown)
{
switch (request.Command)
{
case DaemonJoinCommand.Start:
if (trackedSessions.Count >= MaxTrackedInstances)
{
return $"Already tracking the maximum of {MaxTrackedInstances} instances.";
}
try
{
int nextSessionToken = trackedSessions.Count == 0 ? 1 : trackedSessions.Max(session => session.SessionToken) + 1;
TrackedSession session = await StartTrackedSessionAsync(currentDirectory, nextSessionToken, controllerLogPath, cancellationToken, enableDiagnostics: false);
trackedSessions.Add(session);
AppendControllerLog(controllerLogPath, $"Join client started tracked session token {session.SessionToken}.");
return $"Started instance {trackedSessions.Count} (session token {session.SessionToken}).";
}
catch (Exception ex) when (ex is Win32Exception or InvalidOperationException or DirectoryNotFoundException)
{
AppendControllerLog(controllerLogPath, $"Join client failed to start {TargetCommand}: {ex}");
return $"Failed to start {TargetCommand}: {ex.Message}";
}
case DaemonJoinCommand.Stop:
if (request.Ordinal is null || request.Ordinal < 1 || request.Ordinal > trackedSessions.Count)
{
return $"No tracked instance matches key {request.Ordinal ?? 0}.";
}
int ordinal = request.Ordinal.Value;
TrackedSession sessionToStop = trackedSessions[ordinal - 1];
StopProcessOutcome outcome = await StopTrackedSessionAsync(sessionToStop, $"instance {ordinal}", cancellationToken, controllerLogPath);
trackedSessions.RemoveAt(ordinal - 1);
return outcome == StopProcessOutcome.AlreadyExited
? $"Instance {ordinal} was already stopped."
: $"Stopped instance {ordinal}.";
case DaemonJoinCommand.Shutdown:
AppendControllerLog(controllerLogPath, "Join client requested controller shutdown.");
requestShutdown("[yellow]Join requested daemon shutdown. Stopping tracked instances before exit...[/]");
return "Joined client requested daemon shutdown.";
default:
return "Join request ignored.";
}
}
async Task<TrackedSession> StartTrackedSessionAsync(string currentDirectory, int sessionToken, string controllerLogPath, CancellationToken cancellationToken, bool enableDiagnostics)
{
if (!Directory.Exists(TargetWorkingDirectory))
{
throw new DirectoryNotFoundException($"The configured repo path does not exist: {TargetWorkingDirectory}");
}
string workloadLogPath = CreateWorkloadLogPath(sessionToken);
string sessionDiagnosticLogPath = CreateSessionDiagnosticLogPath(sessionToken);
string sessionJoinPipeName = GetSessionJoinPipeName(currentDirectory, sessionToken);
SessionJoinPresentationFilter sessionJoinFilter = new(sessionJoinPipeName, message => AppendControllerLog(controllerLogPath, message));
AppendSessionDiagnostic(sessionDiagnosticLogPath, $"Preparing Hex1b PTY host for {TargetCommand} at {TargetWorkingDirectory}.");
Hex1bTerminal terminal = CreateTargetTerminal(workloadLogPath, sessionJoinFilter, enableDiagnostics);
CancellationTokenSource runCts = new();
Task<TrackedSessionOutcome> completion = RunTrackedSessionAsync(sessionToken, terminal, sessionJoinFilter, runCts.Token, sessionDiagnosticLogPath, controllerLogPath);
TrackedSession session = new(sessionToken, terminal, sessionJoinFilter, sessionJoinPipeName, runCts, completion, workloadLogPath, sessionDiagnosticLogPath, DateTimeOffset.UtcNow);
AppendControllerLog(controllerLogPath, $"Created Hex1b terminal for session token {sessionToken}. Workload log: {workloadLogPath}. Session join pipe: {sessionJoinPipeName}");
await Task.Delay(StartupTrackingDelay, cancellationToken);
if (completion.IsCompleted)
{
TrackedSessionOutcome outcome = await completion;
throw new InvalidOperationException(BuildStartupFailureMessage($"{TargetCommand} exited during startup.", session, outcome));
}
return session;
}
async Task<TrackedSessionOutcome> RunTrackedSessionAsync(int sessionToken, Hex1bTerminal terminal, SessionJoinPresentationFilter sessionJoinFilter, CancellationToken cancellationToken, string sessionDiagnosticLogPath, string controllerLogPath)
{
try
{
int exitCode = await terminal.RunAsync(cancellationToken);
AppendControllerLog(controllerLogPath, $"Session token {sessionToken} exited with code {exitCode}.");
AppendSessionDiagnostic(sessionDiagnosticLogPath, $"Hex1b terminal completed with exit code {exitCode}.");
return new TrackedSessionOutcome(exitCode, false, null, DateTimeOffset.UtcNow);
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
AppendControllerLog(controllerLogPath, $"Session token {sessionToken} was cancelled.");
AppendSessionDiagnostic(sessionDiagnosticLogPath, "Hex1b terminal cancellation requested.");
return new TrackedSessionOutcome(null, true, null, DateTimeOffset.UtcNow);
}
catch (Exception ex)
{
AppendControllerLog(controllerLogPath, $"Session token {sessionToken} faulted: {ex}");
AppendSessionDiagnostic(sessionDiagnosticLogPath, $"Hex1b terminal faulted: {ex}");
return new TrackedSessionOutcome(null, false, ex, DateTimeOffset.UtcNow);
}
finally
{
await sessionJoinFilter.DisposeAsync();
await terminal.DisposeAsync();
}
}
async Task<StopProcessOutcome> StopTrackedSessionAsync(TrackedSession session, string label, CancellationToken cancellationToken, string controllerLogPath)
{
if (session.Completion.IsCompleted)
{
AppendControllerLog(controllerLogPath, $"Requested stop for {label} (session token {session.SessionToken}), but it had already exited.");
AppendSessionDiagnostic(session.SessionDiagnosticLogPath, $"Controller requested stop for {label}, but the session had already exited.");
return StopProcessOutcome.AlreadyExited;
}
int signalAttempt = 0;
foreach (TimeSpan waitWindow in new[] { SecondaryCtrlCDelay, GracefulShutdownTimeout - SecondaryCtrlCDelay })
{
if (waitWindow <= TimeSpan.Zero)
{
continue;
}
signalAttempt++;
AppendControllerLog(controllerLogPath, $"Sending Hex1b Ctrl+C attempt {signalAttempt} to {label} (session token {session.SessionToken}).");
AppendSessionDiagnostic(session.SessionDiagnosticLogPath, $"Controller sending Hex1b Ctrl+C attempt {signalAttempt} to {label}.");
AnsiConsole.MarkupLine($"[yellow]Sending Ctrl+C to[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/][yellow]...[/]");
await SendCtrlCThroughHex1bAsync(session.Terminal, cancellationToken);
bool exitedGracefully = await WaitForSessionExitAsync(session, waitWindow, cancellationToken);
if (exitedGracefully)
{
AppendControllerLog(controllerLogPath, $"Stopped {label} (session token {session.SessionToken}) gracefully.");
AppendSessionDiagnostic(session.SessionDiagnosticLogPath, $"Controller observed graceful shutdown after Ctrl+C.");
AnsiConsole.MarkupLine($"[green]Stopped[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/] [green]gracefully.[/]");
return StopProcessOutcome.Graceful;
}
}
AppendControllerLog(controllerLogPath, $"{label} (session token {session.SessionToken}) did not exit after two Ctrl+C attempts; disposing terminal.");
AppendSessionDiagnostic(session.SessionDiagnosticLogPath, "Session did not exit after two Ctrl+C attempts; disposing terminal.");
AnsiConsole.MarkupLine($"[red]{Escape(label)} did not exit within 15 seconds after two Ctrl+C attempts; disposing terminal.[/]");
session.RunCts.Cancel();
await session.Terminal.DisposeAsync();
await WaitForSessionExitAsync(session, TimeSpan.FromSeconds(5), CancellationToken.None);
AnsiConsole.MarkupLine($"[green]Disposed[/] [aqua]{Escape(label)}[/] [grey](session token {session.SessionToken})[/].");
AppendControllerLog(controllerLogPath, $"Disposed {label} (session token {session.SessionToken}) after timeout.");
return StopProcessOutcome.Killed;
}
async Task SendCtrlCThroughHex1bAsync(Hex1bTerminal terminal, CancellationToken cancellationToken)
{
Hex1bTerminalInputSequence sequence = new Hex1bTerminalInputSequenceBuilder()
.Key(Hex1bKey.C, Hex1bModifiers.Control)
.Build();
await sequence.ApplyAsync(terminal, cancellationToken);
}
static async Task<bool> WaitForSessionExitAsync(TrackedSession session, TimeSpan timeout, CancellationToken cancellationToken)
{
if (session.Completion.IsCompleted)
{
return true;
}
Task completedTask = await Task.WhenAny(session.Completion, Task.Delay(timeout, cancellationToken));
return completedTask == session.Completion;
}
static async Task<bool> WaitForExitAsync(Process process, TimeSpan timeout, CancellationToken cancellationToken)
{
if (process.HasExited)
{
return true;
}
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
try
{
await process.WaitForExitAsync(timeoutCts.Token);
return true;
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return process.HasExited;
}
}
async Task<List<string>> ReconcileExitedSessionsAsync(List<TrackedSession> trackedSessions, string controllerLogPath, bool echoToConsole)
{
List<TrackedSession> removedSessions = [];
List<string> reconcileMessages = [];
for (int index = trackedSessions.Count - 1; index >= 0; index--)
{
TrackedSession session = trackedSessions[index];
if (!session.Completion.IsCompleted)
{
continue;
}
trackedSessions.RemoveAt(index);
removedSessions.Add(session);
}
removedSessions.Reverse();
if (removedSessions.Count == 0)
{
return reconcileMessages;
}
foreach (TrackedSession removedSession in removedSessions)
{
TrackedSessionOutcome outcome = await removedSession.Completion;
string statusText = DescribeOutcome(outcome);
string message = $"Tracked instance exited on its own (session token {removedSession.SessionToken}). Outcome: {statusText}";
reconcileMessages.Add(message);
AppendControllerLog(controllerLogPath, $"Tracked session token {removedSession.SessionToken} exited on its own. Outcome: {statusText}. Workload log: {removedSession.WorkloadLogPath}");
if (echoToConsole)
{
AnsiConsole.MarkupLine($"[yellow]Tracked instance exited on its own[/] [grey](session token {removedSession.SessionToken})[/][yellow]. Outcome:[/] {Escape(statusText)}");
AnsiConsole.MarkupLine($"[grey]Workload log:[/] [aqua]{Escape(removedSession.WorkloadLogPath)}[/]");
}
string workloadLogTail = ReadWorkloadLogTail(removedSession.WorkloadLogPath);
if (!string.IsNullOrWhiteSpace(workloadLogTail))
{
AppendControllerLog(controllerLogPath, $"Last workload output for session token {removedSession.SessionToken}:{Environment.NewLine}{workloadLogTail}");
if (echoToConsole)
{
AnsiConsole.MarkupLine($"[grey]Last workload output:[/]{Environment.NewLine}{Escape(workloadLogTail)}");
}
}
}
AppendControllerLog(controllerLogPath, $"Tracked instance count is now {trackedSessions.Count}.");
if (echoToConsole)
{
RenderTrackedSessions(trackedSessions);
}
return reconcileMessages;
}
void RenderBanner()
{
Panel panel = new(new Markup("[bold aqua]Hex1b Graceful Process Controller[/]"))
{
Border = BoxBorder.Rounded,
Header = new PanelHeader("daemon-style CLI"),
Expand = false,
};
AnsiConsole.Write(panel);
}
void RenderTrackedSessions(IReadOnlyList<TrackedSession> trackedSessions)
{
if (trackedSessions.Count == 0)
{
AnsiConsole.Write(new Panel(new Markup("[grey]No tracked instances are running.[/]"))
{
Header = new PanelHeader("tracked instances"),
Border = BoxBorder.Rounded,
});
return;
}
Table table = new();
table.Border = TableBorder.Rounded;
table.Title = new TableTitle("Tracked running instances");
table.AddColumn("#");
table.AddColumn("Session token");
table.AddColumn("Started (local)");
for (int index = 0; index < trackedSessions.Count; index++)
{
TrackedSession session = trackedSessions[index];
table.AddRow(
(index + 1).ToString(CultureInfo.InvariantCulture),
session.SessionToken.ToString(CultureInfo.InvariantCulture),
session.StartedAtUtc.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture));
}
AnsiConsole.Write(table);
}
static bool TryReadDaemonState(string pidFilePath, out DaemonState? state)
{
state = null;
if (!File.Exists(pidFilePath))
{
return false;
}
try
{
string json = File.ReadAllText(pidFilePath).Trim();
state = JsonSerializer.Deserialize(json, Hex1bAppJsonSerializerContext.Default.DaemonState);
if (state is not null)
{
return true;
}
if (int.TryParse(json, NumberStyles.Integer, CultureInfo.InvariantCulture, out int legacyPid))
{
state = new DaemonState
{
Pid = legacyPid,
StartedAtUtc = default,
};
return true;
}
return false;
}
catch (JsonException)
{
string text = File.ReadAllText(pidFilePath).Trim();
if (int.TryParse(text, NumberStyles.Integer, CultureInfo.InvariantCulture, out int legacyPid))
{
state = new DaemonState
{
Pid = legacyPid,
StartedAtUtc = default,
};
return true;
}
return false;
}
}
static void WriteDaemonState(string pidFilePath, DaemonState state)
{
string json = JsonSerializer.Serialize(state, Hex1bAppJsonSerializerContext.Default.DaemonState);
File.WriteAllText(pidFilePath, json);
}
void CleanupStalePidFile(string currentDirectory, string pidFilePath)
{
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
return;
}
if (TryGetVerifiedProcess(state!, out Process? process))
{
process.Dispose();
return;
}
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine($"[grey]Removed stale pid.json from[/] [aqua]{Escape(currentDirectory)}[/][grey].[/]");
}
void CleanupOwnedPidFile(string currentDirectory, string pidFilePath)
{
if (!TryReadDaemonState(pidFilePath, out DaemonState? state))
{
return;
}
if (state!.Pid != Environment.ProcessId)
{
return;
}
try
{
using Process current = Process.GetCurrentProcess();
DateTimeOffset actualStartedAtUtc = current.StartTime.ToUniversalTime();
if (state.StartedAtUtc == default || state.StartedAtUtc == actualStartedAtUtc)
{
DeletePidFileIfPresent(pidFilePath);
AnsiConsole.MarkupLine($"[grey]Removed pid.json for[/] [aqua]{Escape(currentDirectory)}[/][grey].[/]");
}
}
catch (Win32Exception)
{
}
}
static void DeletePidFileIfPresent(string pidFilePath)
{
try
{
if (File.Exists(pidFilePath))
{
File.Delete(pidFilePath);
}
}
catch (IOException)
{
}
catch (UnauthorizedAccessException)
{
}
}
static bool SignalStopEvent(string currentDirectory)
{
if (!OperatingSystem.IsWindows())
{
return false;
}
try
{
using EventWaitHandle stopEvent = EventWaitHandle.OpenExisting(GetStopEventName(currentDirectory));
return stopEvent.Set();
}
catch (WaitHandleCannotBeOpenedException)
{
return false;
}
}
static bool TryGetVerifiedProcess(DaemonState state, [NotNullWhen(true)] out Process? process) =>
TryGetVerifiedProcessById(state.Pid, state.StartedAtUtc, out process);
static bool TryGetVerifiedProcessById(int pid, DateTimeOffset startedAtUtc, [NotNullWhen(true)] out Process? process)
{
process = null;
try
{
process = Process.GetProcessById(pid);
if (process.HasExited)
{
process.Dispose();
process = null;
return false;
}
if (startedAtUtc != default && process.StartTime.ToUniversalTime() != startedAtUtc.UtcDateTime)
{
process.Dispose();
process = null;
return false;
}
return true;
}
catch (ArgumentException)
{
process?.Dispose();
process = null;
return false;
}
catch (InvalidOperationException)
{
process?.Dispose();
process = null;
return false;
}
catch (Win32Exception)
{
process?.Dispose();
process = null;
return false;
}
}
static string GetPidFilePath(string currentDirectory) => Path.Combine(currentDirectory, "pid.json");
static string GetControllerLogPath(string currentDirectory) => Path.Combine(currentDirectory, ControllerLogFileName);
static string GetStopEventName(string currentDirectory) => $@"Local\Hex1bGracefulProcessShutdown-Stop-{ComputeStableHash(currentDirectory)}";
static string GetJoinPipeName(string currentDirectory) => $"Hex1bGracefulProcessShutdown-Join-{ComputeStableHash(currentDirectory)}";
static string GetSessionJoinPipeName(string currentDirectory, int sessionToken) => $"Hex1bGracefulProcessShutdown-Session-{ComputeStableHash(currentDirectory)}-{sessionToken.ToString(CultureInfo.InvariantCulture)}";
static DaemonJoinState BuildJoinState(IReadOnlyList<TrackedSession> trackedSessions, string statusMessage, bool isShuttingDown = false) =>
new()
{
StatusMessage = statusMessage,
IsShuttingDown = isShuttingDown,
Sessions =
[
.. trackedSessions.Select((session, index) => new DaemonJoinTrackedSession
{
Ordinal = index + 1,
SessionToken = session.SessionToken,
StartedAtUtc = session.StartedAtUtc,
})
],
};
static SelfLaunchSpec GetSelfLaunchSpec()
{
string? processPath = Environment.ProcessPath;
if (!string.IsNullOrWhiteSpace(processPath) &&
File.Exists(processPath) &&
!IsDotnetHostPath(processPath))
{
return new SelfLaunchSpec(processPath, []);
}
return new SelfLaunchSpec(DotnetHost, [GetScriptPath(), "--"]);
}
static bool IsDotnetHostPath(string path)
{
string fileName = Path.GetFileName(path);
return string.Equals(fileName, "dotnet", StringComparison.OrdinalIgnoreCase) ||
string.Equals(fileName, "dotnet.exe", StringComparison.OrdinalIgnoreCase);
}
static SingleInstanceLock? TryAcquireRunLock(string currentDirectory)
{
try
{
string lockDirectoryPath = Path.Combine(Path.GetTempPath(), "Hex1bGracefulProcessShutdown");
Directory.CreateDirectory(lockDirectoryPath);
string lockPath = Path.Combine(lockDirectoryPath, $"{ComputeStableHash(currentDirectory)}.lock");
FileStream lockStream = new(lockPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None);
return new SingleInstanceLock(lockStream);
}
catch (IOException)
{
return null;
}
}
static string ComputeStableHash(string value)
{
byte[] bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value.ToUpperInvariant()));
return Convert.ToHexString(bytes);
}
Hex1bTerminal CreateTargetTerminal(string workloadLogPath, SessionJoinPresentationFilter sessionJoinFilter, bool enableDiagnostics)
{
Hex1bTerminalBuilder builder = Hex1bTerminal.CreateBuilder()
.WithDimensions(120, 40)
.WithHeadless()
.WithWorkloadLogging(workloadLogPath)
.AddPresentationFilter(sessionJoinFilter);
if (enableDiagnostics)
{
builder = builder.WithDiagnostics($"hex1bapp-background-{Environment.ProcessId}", forceEnable: true);
}
return builder
.WithPtyProcess(options =>
{
options.FileName = TargetCommand;
options.Arguments = new List<string>(TargetArgumentList);
options.WorkingDirectory = TargetWorkingDirectory;
options.WindowsPtyMode = WindowsPtyMode.RequireProxy;
options.InheritEnvironment = true;
})
.Build();
}
static string CreateWorkloadLogPath(int sessionToken)
{
string logDirectoryPath = Path.Combine(Path.GetTempPath(), "Hex1bGracefulProcessShutdown", "logs");
Directory.CreateDirectory(logDirectoryPath);
return Path.Combine(logDirectoryPath, $"copilot-session-{sessionToken}-{DateTimeOffset.UtcNow:yyyyMMddHHmmssfff}.log");
}
static string CreateSessionDiagnosticLogPath(int sessionToken)
{
string logDirectoryPath = Path.Combine(Path.GetTempPath(), "Hex1bGracefulProcessShutdown", "logs");
Directory.CreateDirectory(logDirectoryPath);
return Path.Combine(logDirectoryPath, $"copilot-session-{sessionToken}-{DateTimeOffset.UtcNow:yyyyMMddHHmmssfff}.controller.log");
}
static string DescribeOutcome(TrackedSessionOutcome outcome)
{
if (outcome.Failure is not null)
{
return $"faulted: {outcome.Failure.Message}";
}
if (outcome.Cancelled)
{
return "cancelled";
}
return $"exit code {outcome.ExitCode ?? 0}";
}
static string BuildStartupFailureMessage(string prefix, TrackedSession session, TrackedSessionOutcome outcome)
{
string details = DescribeOutcome(outcome);
string workloadLogTail = ReadWorkloadLogTail(session.WorkloadLogPath);
string sessionDiagnosticLogTail = ReadWorkloadLogTail(session.SessionDiagnosticLogPath);
if (!string.IsNullOrWhiteSpace(sessionDiagnosticLogTail))
{
details = $"{details}{Environment.NewLine}Controller session log:{Environment.NewLine}{sessionDiagnosticLogTail}";
}
if (string.IsNullOrWhiteSpace(workloadLogTail))
{
return $"{prefix} Outcome: {details}";
}
return $"{prefix} Outcome: {details}{Environment.NewLine}Workload log:{Environment.NewLine}{workloadLogTail}";
}
static void AppendControllerLog(string controllerLogPath, string message)
{
string line = $"[{DateTimeOffset.Now:yyyy-MM-dd HH:mm:ss.fff}] {message}{Environment.NewLine}";
TryWriteLogFile(controllerLogPath, line, append: true);
}
static void AppendSessionDiagnostic(string sessionDiagnosticLogPath, string message)
{
string line = $"[{DateTimeOffset.Now:yyyy-MM-dd HH:mm:ss.fff}] {message}{Environment.NewLine}";
TryWriteLogFile(sessionDiagnosticLogPath, line, append: true);
}
static bool TryWriteLogFile(string path, string content, bool append)
{
string? directoryPath = Path.GetDirectoryName(path);
if (!string.IsNullOrWhiteSpace(directoryPath))
{
Directory.CreateDirectory(directoryPath);
}
for (int attempt = 0; attempt < 5; attempt++)
{
try
{
FileMode fileMode = append ? FileMode.Append : FileMode.Create;
using FileStream stream = new(path, fileMode, FileAccess.Write, FileShare.ReadWrite | FileShare.Delete);
using StreamWriter writer = new(stream, Encoding.UTF8);
writer.Write(content);
writer.Flush();
return true;
}
catch (IOException)
{
if (attempt == 4)
{
return false;
}
Thread.Sleep(25);
}
catch (UnauthorizedAccessException)
{
if (attempt == 4)
{
return false;
}
Thread.Sleep(25);
}
}
return false;
}
static string ReadWorkloadLogTail(string workloadLogPath)
{
try
{
if (!File.Exists(workloadLogPath))
{
return string.Empty;
}
string logContent = File.ReadAllText(workloadLogPath).Trim();
if (string.IsNullOrWhiteSpace(logContent))
{
return string.Empty;
}
if (logContent.Length <= WorkloadLogTailCharacterLimit)
{
return logContent;
}
return $"...{logContent[^WorkloadLogTailCharacterLimit..]}";
}
catch (IOException)
{
return string.Empty;
}
catch (UnauthorizedAccessException)
{
return string.Empty;
}
}
static string Escape(string value) => Markup.Escape(value);
static string DescribeKey(ConsoleKeyInfo key) =>
key.KeyChar == '\0' ? key.Key.ToString() : key.KeyChar.ToString();
static string GetScriptPath([CallerFilePath] string path = "") => path;
sealed record DaemonState
{
public required int Pid { get; init; }
public required DateTimeOffset StartedAtUtc { get; init; }
}
sealed record AttachSocketRequest
{
[JsonPropertyName("method")]
public required string Method { get; init; }
}
sealed record AttachSocketResponse
{
[JsonPropertyName("success")]
public bool Success { get; init; }
[JsonPropertyName("width")]
public int? Width { get; init; }
[JsonPropertyName("height")]
public int? Height { get; init; }
[JsonPropertyName("leader")]
public bool? Leader { get; init; }
[JsonPropertyName("data")]
public string? Data { get; init; }
[JsonPropertyName("error")]
public string? Error { get; init; }
}
enum DaemonJoinCommand
{
Snapshot,
Start,
Stop,
Shutdown,
}
sealed record DaemonJoinRequest
{
public required DaemonJoinCommand Command { get; init; }
public int? Ordinal { get; init; }
}
sealed record DaemonJoinTrackedSession
{
public required int Ordinal { get; init; }
public required int SessionToken { get; init; }
public required DateTimeOffset StartedAtUtc { get; init; }
}
sealed record DaemonJoinState
{
public required string StatusMessage { get; init; }
public required bool IsShuttingDown { get; init; }
public required List<DaemonJoinTrackedSession> Sessions { get; init; }
}
sealed class TrackedSession(
int sessionToken,
Hex1bTerminal terminal,
SessionJoinPresentationFilter sessionJoinFilter,
string sessionJoinPipeName,
CancellationTokenSource runCts,
Task<TrackedSessionOutcome> completion,
string workloadLogPath,
string sessionDiagnosticLogPath,
DateTimeOffset startedAtUtc)
{
public int SessionToken { get; } = sessionToken;
public Hex1bTerminal Terminal { get; } = terminal;
public SessionJoinPresentationFilter SessionJoinFilter { get; } = sessionJoinFilter;
public string SessionJoinPipeName { get; } = sessionJoinPipeName;
public CancellationTokenSource RunCts { get; } = runCts;
public Task<TrackedSessionOutcome> Completion { get; } = completion;
public string WorkloadLogPath { get; } = workloadLogPath;
public string SessionDiagnosticLogPath { get; } = sessionDiagnosticLogPath;
public DateTimeOffset StartedAtUtc { get; } = startedAtUtc;
}
sealed record TrackedSessionOutcome(int? ExitCode, bool Cancelled, Exception? Failure, DateTimeOffset EndedAtUtc);
sealed record SelfLaunchSpec(string FileName, IReadOnlyList<string> ArgumentPrefix);
[JsonSerializable(typeof(DaemonState))]
[JsonSerializable(typeof(AttachSocketRequest))]
[JsonSerializable(typeof(AttachSocketResponse))]
[JsonSerializable(typeof(DaemonJoinRequest))]
[JsonSerializable(typeof(DaemonJoinState))]
sealed partial class Hex1bAppJsonSerializerContext : JsonSerializerContext;
enum StopProcessOutcome
{
Graceful,
Killed,
AlreadyExited,
}
sealed class SingleInstanceLock(FileStream lockStream) : IDisposable
{
private readonly FileStream _lockStream = lockStream;
private bool _disposed;
public void Dispose()
{
if (_disposed)
{
return;
}
_lockStream.Dispose();
_disposed = true;
}
}
sealed class DaemonJoinServer(string pipeName, Action<string> log) : IAsyncDisposable
{
private readonly string _pipeName = pipeName;
private readonly Action<string> _log = log;
private readonly CancellationTokenSource _cts = new();
private readonly Channel<DaemonJoinRequest> _requests = Channel.CreateUnbounded<DaemonJoinRequest>();
private readonly object _connectionLock = new();
private DaemonJoinConnection? _connection;
private DaemonJoinState _latestState = new()
{
StatusMessage = "Background controller is starting.",
IsShuttingDown = false,
Sessions = [],
};
private Task? _acceptLoop;
public void Start()
{
_log($"Join server starting on pipe '{_pipeName}'.");
_acceptLoop = AcceptLoopAsync(_cts.Token);
}
public bool TryReadRequest([NotNullWhen(true)] out DaemonJoinRequest? request) => _requests.Reader.TryRead(out request);
public async Task PublishStateAsync(DaemonJoinState state)
{
_latestState = state;
DaemonJoinConnection? connection;
lock (_connectionLock)
{
connection = _connection;
}
if (connection is not null)
{
_log($"Join server publishing state: '{state.StatusMessage}' (sessions={state.Sessions.Count}, shuttingDown={state.IsShuttingDown}).");
await connection.SendStateAsync(state);
}
}
public async ValueTask DisposeAsync()
{
await _cts.CancelAsync();
if (_acceptLoop is not null)
{
try
{
await _acceptLoop;
}
catch (OperationCanceledException)
{
}
}
DaemonJoinConnection? connection;
lock (_connectionLock)
{
connection = _connection;
_connection = null;
}
if (connection is not null)
{
await connection.DisposeAsync();
}
_cts.Dispose();
}
private async Task AcceptLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
_log($"Join server waiting for connection on pipe '{_pipeName}'.");
NamedPipeServerStream server = new(
_pipeName,
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous);
try
{
await server.WaitForConnectionAsync(cancellationToken);
_log("Join server accepted a client connection.");
}
catch (OperationCanceledException)
{
await server.DisposeAsync();
break;
}
DaemonJoinConnection connection = new(server, _latestState, _requests.Writer, ClearConnectionAsync, _log);
lock (_connectionLock)
{
_connection = connection;
}
await connection.RunAsync(cancellationToken);
}
}
private ValueTask ClearConnectionAsync(DaemonJoinConnection connection)
{
lock (_connectionLock)
{
if (ReferenceEquals(_connection, connection))
{
_connection = null;
}
}
return ValueTask.CompletedTask;
}
}
sealed class DaemonJoinConnection(
NamedPipeServerStream pipe,
DaemonJoinState initialState,
ChannelWriter<DaemonJoinRequest> requestWriter,
Func<DaemonJoinConnection, ValueTask> onClosed,
Action<string> log) : IAsyncDisposable
{
private readonly NamedPipeServerStream _pipe = pipe;
private readonly DaemonJoinState _initialState = initialState;
private readonly ChannelWriter<DaemonJoinRequest> _requestWriter = requestWriter;
private readonly Func<DaemonJoinConnection, ValueTask> _onClosed = onClosed;
private readonly Action<string> _log = log;
private readonly SemaphoreSlim _writeLock = new(1, 1);
private readonly StreamReader _reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
private readonly StreamWriter _writer = new(pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
private bool _disposed;
public async Task RunAsync(CancellationToken cancellationToken)
{
try
{
_log($"Join connection sending initial state '{_initialState.StatusMessage}' with {_initialState.Sessions.Count} session(s).");
await SendStateAsync(_initialState);
while (!cancellationToken.IsCancellationRequested)
{
string? line = await _reader.ReadLineAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(line))
{
break;
}
DaemonJoinRequest? request = JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinRequest);
if (request is null)
{
continue;
}
_log($"Join connection received request {request.Command} (ordinal={request.Ordinal?.ToString(CultureInfo.InvariantCulture) ?? "null"}).");
await _requestWriter.WriteAsync(request, cancellationToken);
if (request.Command == DaemonJoinCommand.Shutdown)
{
break;
}
}
}
catch (OperationCanceledException)
{
}
catch (IOException)
{
_log("Join connection ended because the pipe closed.");
}
finally
{
_log("Join connection closing.");
await DisposeAsync();
await _onClosed(this);
}
}
public async Task SendStateAsync(DaemonJoinState state)
{
if (_disposed)
{
return;
}
await _writeLock.WaitAsync();
try
{
if (_disposed)
{
return;
}
string payload = JsonSerializer.Serialize(state, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
_log($"Join connection writing state payload: '{state.StatusMessage}' (sessions={state.Sessions.Count}, shuttingDown={state.IsShuttingDown}).");
await _writer.WriteLineAsync(payload);
}
catch (IOException)
{
_log("Join connection write failed because the pipe closed.");
await DisposeAsync();
}
finally
{
_writeLock.Release();
}
}
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
_reader.Dispose();
await _writer.DisposeAsync();
await _pipe.DisposeAsync();
_writeLock.Dispose();
}
}
sealed class DaemonJoinClient(string pipeName, string currentDirectory) : IAsyncDisposable
{
private readonly string _pipeName = pipeName;
private readonly string _currentDirectory = currentDirectory;
private readonly Channel<DaemonJoinState> _stateUpdates = Channel.CreateUnbounded<DaemonJoinState>();
private NamedPipeClientStream? _pipe;
private StreamReader? _reader;
private StreamWriter? _writer;
private DaemonJoinState _state = new()
{
StatusMessage = "Connecting to background controller...",
IsShuttingDown = false,
Sessions = [],
};
private bool _awaitingSessionJoinOrdinal;
private bool _sessionJoinActive;
private string? _localStatusMessage;
public async Task<bool> ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
{
try
{
_pipe = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(timeout);
await _pipe.ConnectAsync(timeoutCts.Token);
_reader = new StreamReader(_pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
_writer = new StreamWriter(_pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
return true;
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return false;
}
catch (TimeoutException)
{
return false;
}
catch (IOException)
{
return false;
}
}
public async Task<int> RunAsync(CancellationToken cancellationToken)
{
if (_reader is null || _writer is null)
{
return 1;
}
using CancellationTokenSource joinCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
ConsoleCancelEventHandler? cancelHandler = null;
cancelHandler = (_, eventArgs) =>
{
eventArgs.Cancel = true;
if (_sessionJoinActive)
{
return;
}
joinCts.Cancel();
};
Console.CancelKeyPress += cancelHandler;
Task listenTask = ListenAsync(joinCts.Token);
try
{
RenderState();
Task<ConsoleKeyInfo?> keyTask = AnsiConsole.Console.Input.ReadKeyAsync(true, joinCts.Token);
Task<DaemonJoinState> stateTask = _stateUpdates.Reader.ReadAsync(joinCts.Token).AsTask();
while (!joinCts.IsCancellationRequested)
{
Task completedTask;
try
{
completedTask = await Task.WhenAny(keyTask, stateTask);
}
catch (OperationCanceledException)
{
break;
}
if (completedTask == stateTask)
{
try
{
_state = await stateTask;
}
catch (OperationCanceledException)
{
break;
}
catch (ChannelClosedException)
{
break;
}
if (!_awaitingSessionJoinOrdinal)
{
_localStatusMessage = null;
}
RenderState();
if (_state.IsShuttingDown)
{
joinCts.Cancel();
break;
}
stateTask = _stateUpdates.Reader.ReadAsync(joinCts.Token).AsTask();
continue;
}
ConsoleKeyInfo? keyInfo;
try
{
keyInfo = await keyTask;
}
catch (OperationCanceledException)
{
break;
}
keyTask = AnsiConsole.Console.Input.ReadKeyAsync(true, joinCts.Token);
if (keyInfo is null)
{
continue;
}
ConsoleKeyInfo key = keyInfo.Value;
char keyChar = key.KeyChar;
if (_awaitingSessionJoinOrdinal)
{
if (key.Key == ConsoleKey.Escape)
{
_awaitingSessionJoinOrdinal = false;
_localStatusMessage = "Session join canceled.";
RenderState();
continue;
}
if (!char.IsDigit(keyChar))
{
_awaitingSessionJoinOrdinal = false;
_localStatusMessage = $"Expected a session number after J, but got {DescribeKey(key)}.";
RenderState();
continue;
}
_awaitingSessionJoinOrdinal = false;
int ordinal = keyChar - '0';
DaemonJoinTrackedSession? sessionToJoin = _state.Sessions.FirstOrDefault(session => session.Ordinal == ordinal);
if (sessionToJoin is null)
{
_localStatusMessage = $"No tracked session matches key {ordinal}.";
RenderState();
continue;
}
_localStatusMessage = $"Joining tracked session {ordinal} (session token {sessionToJoin.SessionToken}). Detach from it to return here.";
RenderState();
await using PipeAttachTransport transport = new(GetSessionJoinPipeName(_currentDirectory, sessionToJoin.SessionToken));
AttachTransportResult joinResult = await transport.ConnectAsync(joinCts.Token);
if (!joinResult.Success)
{
_localStatusMessage = $"Tracked session {ordinal} is no longer joinable: {joinResult.Error ?? "Unknown error."}";
RenderState();
continue;
}
await using JoinSessionClient sessionClient = new(transport);
_sessionJoinActive = true;
try
{
await sessionClient.RunAsync(joinResult, joinCts.Token);
}
finally
{
_sessionJoinActive = false;
}
_localStatusMessage = $"Returned from tracked session {ordinal}.";
RenderState();
continue;
}
if (char.ToUpperInvariant(keyChar) == 'S')
{
await SendRequestAsync(new DaemonJoinRequest { Command = DaemonJoinCommand.Start });
continue;
}
if (char.ToUpperInvariant(keyChar) == 'J')
{
if (_state.Sessions.Count == 0)
{
_localStatusMessage = "There are no tracked copilot sessions available to join.";
}
else
{
_awaitingSessionJoinOrdinal = true;
_localStatusMessage = "Join mode: press 1-5 to choose a tracked session, or Esc to cancel.";
}
RenderState();
continue;
}
if (char.IsDigit(keyChar))
{
await SendRequestAsync(new DaemonJoinRequest
{
Command = DaemonJoinCommand.Stop,
Ordinal = keyChar - '0',
});
continue;
}
if (key.Key == ConsoleKey.Escape)
{
joinCts.Cancel();
break;
}
}
}
finally
{
Console.CancelKeyPress -= cancelHandler;
joinCts.Cancel();
try
{
await listenTask;
}
catch
{
}
}
return 0;
}
public async ValueTask DisposeAsync()
{
if (_writer is not null)
{
await _writer.DisposeAsync();
_writer = null;
}
_reader?.Dispose();
_reader = null;
if (_pipe is not null)
{
await _pipe.DisposeAsync();
_pipe = null;
}
}
private async Task ListenAsync(CancellationToken cancellationToken)
{
if (_reader is null)
{
_stateUpdates.Writer.TryComplete();
return;
}
try
{
while (!cancellationToken.IsCancellationRequested)
{
string? line;
try
{
line = await _reader.ReadLineAsync(cancellationToken);
}
catch (OperationCanceledException)
{
break;
}
catch (IOException)
{
break;
}
if (string.IsNullOrWhiteSpace(line))
{
break;
}
DaemonJoinState? state = JsonSerializer.Deserialize(line, Hex1bAppJsonSerializerContext.Default.DaemonJoinState);
if (state is null)
{
continue;
}
await _stateUpdates.Writer.WriteAsync(state, cancellationToken);
if (state.IsShuttingDown)
{
break;
}
}
}
finally
{
_stateUpdates.Writer.TryComplete();
}
}
private Task SendRequestAsync(DaemonJoinRequest request)
{
if (_writer is null)
{
return Task.CompletedTask;
}
string payload = JsonSerializer.Serialize(request, Hex1bAppJsonSerializerContext.Default.DaemonJoinRequest);
return _writer.WriteLineAsync(payload);
}
private void RenderState()
{
AnsiConsole.Clear();
Panel panel = new(new Markup("[bold aqua]Hex1b Graceful Process Controller[/]"))
{
Border = BoxBorder.Rounded,
Header = new PanelHeader("joined background daemon"),
Expand = false,
};
AnsiConsole.Write(panel);
AnsiConsole.MarkupLine($"[green]Joined background controller in[/] [aqua]{Markup.Escape(_currentDirectory)}[/].");
if (_awaitingSessionJoinOrdinal)
{
AnsiConsole.MarkupLine("[grey]Join mode is active:[/] [aqua]press 1-5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/]/[aqua]Ctrl+C[/] [grey]to cancel or detach.[/]");
}
else
{
AnsiConsole.MarkupLine("[grey]Press[/] [aqua]S[/] [grey]to start an instance,[/] [aqua]1[/]-[aqua]5[/] [grey]to stop an instance,[/] [aqua]J[/] [grey]then[/] [aqua]1[/]-[aqua]5[/] [grey]to join a tracked session, or[/] [aqua]Esc[/]/[aqua]Ctrl+C[/] [grey]to detach.[/]");
}
AnsiConsole.MarkupLine($"[grey]Status:[/] {Markup.Escape(_localStatusMessage ?? _state.StatusMessage)}");
if (_state.Sessions.Count == 0)
{
AnsiConsole.Write(new Panel(new Markup("[grey]No tracked instances are running.[/]"))
{
Header = new PanelHeader("tracked instances"),
Border = BoxBorder.Rounded,
});
return;
}
Table table = new();
table.Border = TableBorder.Rounded;
table.Title = new TableTitle("Tracked running instances");
table.AddColumn("#");
table.AddColumn("Session token");
table.AddColumn("Started (local)");
foreach (DaemonJoinTrackedSession session in _state.Sessions)
{
table.AddRow(
session.Ordinal.ToString(CultureInfo.InvariantCulture),
session.SessionToken.ToString(CultureInfo.InvariantCulture),
session.StartedAtUtc.ToLocalTime().ToString("yyyy-MM-dd HH:mm:ss", CultureInfo.InvariantCulture));
}
AnsiConsole.Write(table);
}
private static string DescribeKey(ConsoleKeyInfo key) =>
key.KeyChar == '\0' ? key.Key.ToString() : key.KeyChar.ToString();
private static string GetSessionJoinPipeName(string currentDirectory, int sessionToken) =>
$"Hex1bGracefulProcessShutdown-Session-{ComputeStableHash(currentDirectory)}-{sessionToken.ToString(CultureInfo.InvariantCulture)}";
private static string ComputeStableHash(string value)
{
byte[] bytes = SHA256.HashData(Encoding.UTF8.GetBytes(value.ToUpperInvariant()));
return Convert.ToHexString(bytes);
}
}
sealed class SessionJoinPresentationFilter(string pipeName, Action<string> log) : ITerminalAwarePresentationFilter, IAsyncDisposable
{
private static readonly Action<Hex1bTerminal, int, int>? ResizeWithWorkloadInvoker = CreateResizeWithWorkloadInvoker();
private readonly string _pipeName = pipeName;
private readonly Action<string> _log = log;
private readonly CancellationTokenSource _cts = new();
private readonly List<SessionJoinAttachSession> _sessions = [];
private readonly object _attachLock = new();
private SessionJoinAttachSession? _leaderSession;
private bool _mouseTrackingEnabled;
private bool _sgrMouseModeEnabled;
private bool _bracketedPasteEnabled;
private Hex1bTerminal? _terminal;
private Task? _acceptLoop;
private bool _started;
private bool _disposed;
public void SetTerminal(Hex1bTerminal terminal) => _terminal = terminal;
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default)
{
StartListening();
return ValueTask.CompletedTask;
}
public ValueTask<IReadOnlyList<AnsiToken>> OnOutputAsync(IReadOnlyList<AppliedToken> appliedTokens, TimeSpan elapsed, CancellationToken ct = default)
{
IReadOnlyList<AnsiToken> tokens = [.. appliedTokens.Select(token => token.Token)];
foreach (AnsiToken token in tokens)
{
if (token is PrivateModeToken privateMode)
{
switch (privateMode.Mode)
{
case 1000 or 1002 or 1003:
_mouseTrackingEnabled = privateMode.Enable;
break;
case 1006:
_sgrMouseModeEnabled = privateMode.Enable;
break;
case 2004:
_bracketedPasteEnabled = privateMode.Enable;
break;
}
}
}
lock (_attachLock)
{
if (_sessions.Count > 0)
{
byte[] ansi = Encoding.UTF8.GetBytes(AnsiTokenSerializer.Serialize(tokens));
if (ansi.Length > 0)
{
AttachTransportFrame frame = AttachTransportFrame.Output(ansi);
for (int index = _sessions.Count - 1; index >= 0; index--)
{
if (!_sessions[index].Channel.Writer.TryWrite(frame))
{
_sessions.RemoveAt(index);
}
}
}
}
}
return ValueTask.FromResult(tokens);
}
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => DisposeAsync();
public async ValueTask DisposeAsync()
{
if (_disposed)
{
return;
}
_disposed = true;
await _cts.CancelAsync();
if (_acceptLoop is not null)
{
try
{
await _acceptLoop;
}
catch (OperationCanceledException)
{
}
}
List<SessionJoinAttachSession> sessionsToClose;
lock (_attachLock)
{
sessionsToClose = [.. _sessions];
_sessions.Clear();
_leaderSession = null;
}
foreach (SessionJoinAttachSession session in sessionsToClose)
{
session.Channel.Writer.TryWrite(AttachTransportFrame.Exit());
session.Channel.Writer.TryComplete();
}
_cts.Dispose();
}
private void StartListening()
{
if (_started || _disposed)
{
return;
}
_started = true;
_log($"Session join server starting on pipe '{_pipeName}'.");
_acceptLoop = AcceptLoopAsync(_cts.Token);
}
private async Task AcceptLoopAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
NamedPipeServerStream server = new(
_pipeName,
PipeDirection.InOut,
1,
PipeTransmissionMode.Byte,
System.IO.Pipes.PipeOptions.Asynchronous);
try
{
await server.WaitForConnectionAsync(cancellationToken);
_log($"Session join server accepted a client on '{_pipeName}'.");
await HandleConnectionAsync(server, cancellationToken);
}
catch (OperationCanceledException)
{
await server.DisposeAsync();
break;
}
catch (IOException)
{
await server.DisposeAsync();
}
}
}
private async Task HandleConnectionAsync(NamedPipeServerStream pipe, CancellationToken cancellationToken)
{
await using (pipe)
{
await using StreamWriter writer = new(pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
using StreamReader reader = new(pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
try
{
string? requestLine = await reader.ReadLineAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(requestLine))
{
return;
}
AttachSocketRequest? request = JsonSerializer.Deserialize(requestLine, Hex1bAppJsonSerializerContext.Default.AttachSocketRequest);
if (!string.Equals(request?.Method, "attach", StringComparison.OrdinalIgnoreCase))
{
AttachSocketResponse errorResponse = new() { Success = false, Error = "Only attach is supported for session joins." };
await writer.WriteLineAsync(JsonSerializer.Serialize(errorResponse, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse).AsMemory(), cancellationToken);
return;
}
await using SessionJoinAttachSession session = CreateAttachSession();
AttachSocketResponse attachResponse = new()
{
Success = true,
Width = session.Width,
Height = session.Height,
Leader = session.IsLeader,
Data = session.InitialScreen,
};
await writer.WriteLineAsync(JsonSerializer.Serialize(attachResponse, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse).AsMemory(), cancellationToken);
using CancellationTokenSource detachCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task outputTask = StreamOutputFromSessionAsync(session, writer, detachCts.Token);
Task inputTask = StreamInputToSessionAsync(session, reader, detachCts);
await Task.WhenAny(outputTask, inputTask);
await detachCts.CancelAsync();
try
{
await Task.WhenAll(outputTask, inputTask);
}
catch (OperationCanceledException)
{
}
}
catch (IOException)
{
}
}
}
private SessionJoinAttachSession CreateAttachSession()
{
if (_terminal is null)
{
throw new InvalidOperationException("Terminal not initialized.");
}
Channel<AttachTransportFrame> channel = Channel.CreateBounded<AttachTransportFrame>(new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleReader = true,
SingleWriter = false,
});
using var snapshot = _terminal.CreateSnapshot();
string initialAnsi = snapshot.ToAnsi(new TerminalAnsiOptions
{
IncludeClearScreen = true,
IncludeTrailingNewline = true,
});
SessionJoinAttachSession session = new(this, channel, snapshot.Width, snapshot.Height, false, initialAnsi);
lock (_attachLock)
{
_sessions.Add(session);
if (_leaderSession is null)
{
_leaderSession = session;
session.IsLeader = true;
}
}
string modeReplay = BuildModeReplaySequence();
if (modeReplay.Length > 0)
{
session.Channel.Writer.TryWrite(AttachTransportFrame.Output(Encoding.UTF8.GetBytes(modeReplay)));
}
return session;
}
private static async Task StreamOutputFromSessionAsync(SessionJoinAttachSession session, StreamWriter writer, CancellationToken cancellationToken)
{
try
{
await foreach (AttachTransportFrame frame in session.Frames.ReadAllAsync(cancellationToken))
{
switch (frame.Kind)
{
case AttachTransportFrameKind.Output:
await writer.WriteLineAsync($"o:{Convert.ToBase64String(frame.Data.Span)}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.Resize:
(int width, int height) = frame.GetResize();
await writer.WriteLineAsync($"r:{width},{height}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.LeaderChanged:
await writer.WriteLineAsync($"leader:{(frame.GetIsLeader() ? "true" : "false")}".AsMemory(), cancellationToken);
break;
case AttachTransportFrameKind.Exit:
await writer.WriteLineAsync("exit".AsMemory(), cancellationToken);
return;
}
}
}
catch (OperationCanceledException)
{
}
}
private async Task StreamInputToSessionAsync(SessionJoinAttachSession session, StreamReader reader, CancellationTokenSource detachCts)
{
try
{
while (!detachCts.Token.IsCancellationRequested)
{
string? line = await reader.ReadLineAsync(detachCts.Token);
if (line is null || string.Equals(line, "detach", StringComparison.Ordinal))
{
await detachCts.CancelAsync();
return;
}
if (string.Equals(line, "shutdown", StringComparison.Ordinal))
{
await detachCts.CancelAsync();
return;
}
if (line.StartsWith("i:", StringComparison.Ordinal))
{
await session.SendInputAsync(Convert.FromBase64String(line[2..]));
continue;
}
if (line.StartsWith("r:", StringComparison.Ordinal))
{
string[] parts = line[2..].Split(',');
if (parts.Length == 2 &&
int.TryParse(parts[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int width) &&
int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out int height))
{
await session.SendResizeAsync(width, height);
}
continue;
}
if (string.Equals(line, "lead", StringComparison.Ordinal))
{
await session.ClaimLeadAsync();
}
}
}
catch (OperationCanceledException)
{
}
catch (IOException)
{
await detachCts.CancelAsync();
}
}
internal Task SendInputFromSessionAsync(SessionJoinAttachSession session, byte[] data) =>
_terminal?.SendInputAsync(data) ?? Task.CompletedTask;
internal Task SendResizeFromSessionAsync(SessionJoinAttachSession session, int width, int height)
{
if (_terminal is null)
{
return Task.CompletedTask;
}
bool isLeader;
lock (_attachLock)
{
isLeader = _leaderSession == session;
}
if (!isLeader)
{
return Task.CompletedTask;
}
ResizeTerminalAndWorkload(_terminal, width, height);
AttachTransportFrame frame = AttachTransportFrame.Resize(width, height);
lock (_attachLock)
{
foreach (SessionJoinAttachSession attachedSession in _sessions)
{
if (!ReferenceEquals(attachedSession, session))
{
attachedSession.Channel.Writer.TryWrite(frame);
}
}
}
return Task.CompletedTask;
}
[DynamicDependency(DynamicallyAccessedMemberTypes.NonPublicMethods, typeof(Hex1bTerminal))]
private static Action<Hex1bTerminal, int, int>? CreateResizeWithWorkloadInvoker()
{
MethodInfo? method = typeof(Hex1bTerminal).GetMethod(
"ResizeWithWorkload",
BindingFlags.Instance | BindingFlags.NonPublic,
binder: null,
types: [typeof(int), typeof(int)],
modifiers: null);
if (method is null)
{
return null;
}
return (terminal, width, height) => method.Invoke(terminal, [width, height]);
}
private static void ResizeTerminalAndWorkload(Hex1bTerminal terminal, int width, int height)
{
if (ResizeWithWorkloadInvoker is { } invoker)
{
invoker(terminal, width, height);
return;
}
terminal.Resize(width, height);
}
internal Task ClaimLeadFromSessionAsync(SessionJoinAttachSession session)
{
SessionJoinAttachSession? oldLeader;
lock (_attachLock)
{
oldLeader = _leaderSession;
_leaderSession = session;
session.IsLeader = true;
}
if (oldLeader is not null && !ReferenceEquals(oldLeader, session))
{
oldLeader.IsLeader = false;
oldLeader.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(false));
}
session.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(true));
return Task.CompletedTask;
}
internal void RemoveSession(SessionJoinAttachSession session)
{
lock (_attachLock)
{
_sessions.Remove(session);
if (ReferenceEquals(_leaderSession, session))
{
_leaderSession = _sessions.Count > 0 ? _sessions[0] : null;
if (_leaderSession is not null)
{
_leaderSession.IsLeader = true;
_leaderSession.Channel.Writer.TryWrite(AttachTransportFrame.LeaderChanged(true));
}
}
}
}
private string BuildModeReplaySequence()
{
StringBuilder builder = new();
if (_mouseTrackingEnabled)
{
builder.Append("\x1b[?1000h");
builder.Append("\x1b[?1002h");
builder.Append("\x1b[?1003h");
}
if (_sgrMouseModeEnabled)
{
builder.Append("\x1b[?1006h");
}
if (_bracketedPasteEnabled)
{
builder.Append("\x1b[?2004h");
}
return builder.ToString();
}
}
sealed class SessionJoinAttachSession : IAsyncDisposable
{
private readonly SessionJoinPresentationFilter _filter;
private bool _disposed;
public SessionJoinAttachSession(SessionJoinPresentationFilter filter, Channel<AttachTransportFrame> channel, int width, int height, bool isLeader, string? initialScreen)
{
_filter = filter;
Channel = channel;
IsLeader = isLeader;
Width = width;
Height = height;
InitialScreen = initialScreen;
}
public ChannelReader<AttachTransportFrame> Frames => Channel.Reader;
public Channel<AttachTransportFrame> Channel { get; }
public bool IsLeader { get; set; }
public int Width { get; }
public int Height { get; }
public string? InitialScreen { get; }
public Task SendInputAsync(byte[] data) => _filter.SendInputFromSessionAsync(this, data);
public Task SendResizeAsync(int width, int height) => _filter.SendResizeFromSessionAsync(this, width, height);
public Task ClaimLeadAsync() => _filter.ClaimLeadFromSessionAsync(this);
public ValueTask DisposeAsync()
{
if (_disposed)
{
return ValueTask.CompletedTask;
}
_disposed = true;
_filter.RemoveSession(this);
Channel.Writer.TryComplete();
return ValueTask.CompletedTask;
}
}
enum AttachTransportFrameKind
{
Output,
Resize,
LeaderChanged,
Exit,
}
readonly record struct AttachTransportFrame(AttachTransportFrameKind Kind, ReadOnlyMemory<byte> Data)
{
public static AttachTransportFrame Output(ReadOnlyMemory<byte> data) => new(AttachTransportFrameKind.Output, data);
public static AttachTransportFrame Resize(int width, int height)
{
byte[] bytes = new byte[8];
BitConverter.TryWriteBytes(bytes.AsSpan(0, 4), width);
BitConverter.TryWriteBytes(bytes.AsSpan(4, 4), height);
return new AttachTransportFrame(AttachTransportFrameKind.Resize, bytes);
}
public static AttachTransportFrame LeaderChanged(bool isLeader) =>
new(AttachTransportFrameKind.LeaderChanged, new byte[] { isLeader ? (byte)1 : (byte)0 });
public static AttachTransportFrame Exit() => new(AttachTransportFrameKind.Exit, default);
public (int Width, int Height) GetResize()
{
ReadOnlySpan<byte> span = Data.Span;
return (BitConverter.ToInt32(span[..4]), BitConverter.ToInt32(span[4..8]));
}
public bool GetIsLeader() => !Data.IsEmpty && Data.Span[0] != 0;
}
sealed record AttachTransportResult(
bool Success,
int Width,
int Height,
bool IsLeader,
string? InitialScreen,
string? Error);
interface IAttachTransport : IAsyncDisposable
{
Task<AttachTransportResult> ConnectAsync(CancellationToken cancellationToken);
Task SendInputAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken);
Task SendResizeAsync(int width, int height, CancellationToken cancellationToken);
Task ClaimLeadAsync(CancellationToken cancellationToken);
Task DetachAsync(CancellationToken cancellationToken);
Task ShutdownAsync(CancellationToken cancellationToken);
IAsyncEnumerable<AttachTransportFrame> ReadFramesAsync(CancellationToken cancellationToken);
}
sealed class SocketAttachTransport(string socketPath) : IAttachTransport
{
private readonly string _socketPath = socketPath;
private Socket? _socket;
private NetworkStream? _stream;
private StreamReader? _reader;
private StreamWriter? _writer;
public async Task<AttachTransportResult> ConnectAsync(CancellationToken cancellationToken)
{
try
{
_socket = new Socket(AddressFamily.Unix, SocketType.Stream, ProtocolType.Unspecified);
await _socket.ConnectAsync(new UnixDomainSocketEndPoint(_socketPath), cancellationToken);
_stream = new NetworkStream(_socket, ownsSocket: false);
_reader = new StreamReader(_stream, Encoding.UTF8, leaveOpen: true);
_writer = new StreamWriter(_stream, Encoding.UTF8, bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
AttachSocketRequest request = new() { Method = "attach" };
string requestJson = JsonSerializer.Serialize(request, Hex1bAppJsonSerializerContext.Default.AttachSocketRequest);
await _writer.WriteLineAsync(requestJson.AsMemory(), cancellationToken);
string? responseLine = await _reader.ReadLineAsync(cancellationToken);
if (string.IsNullOrWhiteSpace(responseLine))
{
return new AttachTransportResult(false, 0, 0, false, null, "Empty response from terminal host.");
}
AttachSocketResponse? response = JsonSerializer.Deserialize(responseLine, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse);
if (response is null)
{
return new AttachTransportResult(false, 0, 0, false, null, "Failed to deserialize attach response.");
}
if (!response.Success)
{
return new AttachTransportResult(false, 0, 0, false, null, response.Error ?? "Attach failed.");
}
return new AttachTransportResult(true, response.Width ?? 0, response.Height ?? 0, response.Leader ?? false, response.Data, null);
}
catch (Exception ex) when (ex is IOException or SocketException or JsonException)
{
return new AttachTransportResult(false, 0, 0, false, null, ex.Message);
}
}
public Task SendInputAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) =>
WriteLineAsync($"i:{Convert.ToBase64String(data.Span)}", cancellationToken);
public Task SendResizeAsync(int width, int height, CancellationToken cancellationToken) =>
WriteLineAsync($"r:{width},{height}", cancellationToken);
public Task ClaimLeadAsync(CancellationToken cancellationToken) =>
WriteLineAsync("lead", cancellationToken);
public Task DetachAsync(CancellationToken cancellationToken) =>
WriteLineAsync("detach", cancellationToken);
public Task ShutdownAsync(CancellationToken cancellationToken) =>
WriteLineAsync("shutdown", cancellationToken);
public async IAsyncEnumerable<AttachTransportFrame> ReadFramesAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
if (_reader is null)
{
yield break;
}
while (!cancellationToken.IsCancellationRequested)
{
string? line;
try
{
line = await _reader.ReadLineAsync(cancellationToken);
}
catch (OperationCanceledException)
{
yield break;
}
catch (IOException)
{
yield break;
}
if (line is null)
{
yield break;
}
if (line.StartsWith("o:", StringComparison.Ordinal))
{
byte[] outputBytes = Convert.FromBase64String(line[2..]);
yield return AttachTransportFrame.Output(outputBytes);
continue;
}
if (line.StartsWith("r:", StringComparison.Ordinal))
{
string[] parts = line[2..].Split(',');
if (parts.Length == 2 &&
int.TryParse(parts[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int width) &&
int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out int height))
{
yield return AttachTransportFrame.Resize(width, height);
}
continue;
}
if (line.StartsWith("leader:", StringComparison.Ordinal))
{
yield return AttachTransportFrame.LeaderChanged(string.Equals(line[7..], "true", StringComparison.OrdinalIgnoreCase));
continue;
}
if (string.Equals(line, "exit", StringComparison.OrdinalIgnoreCase))
{
yield return AttachTransportFrame.Exit();
yield break;
}
}
}
public async ValueTask DisposeAsync()
{
if (_writer is not null)
{
await _writer.DisposeAsync();
_writer = null;
}
_reader?.Dispose();
_reader = null;
if (_stream is not null)
{
await _stream.DisposeAsync();
_stream = null;
}
_socket?.Dispose();
_socket = null;
}
private Task WriteLineAsync(string line, CancellationToken cancellationToken)
{
if (_writer is null)
{
return Task.CompletedTask;
}
return _writer.WriteLineAsync(line.AsMemory(), cancellationToken);
}
}
sealed class PipeAttachTransport(string pipeName) : IAttachTransport
{
private readonly string _pipeName = pipeName;
private static readonly TimeSpan ConnectTimeout = TimeSpan.FromSeconds(30);
private NamedPipeClientStream? _pipe;
private StreamReader? _reader;
private StreamWriter? _writer;
public async Task<AttachTransportResult> ConnectAsync(CancellationToken cancellationToken)
{
try
{
_pipe = new NamedPipeClientStream(".", _pipeName, PipeDirection.InOut, System.IO.Pipes.PipeOptions.Asynchronous);
using CancellationTokenSource timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
timeoutCts.CancelAfter(ConnectTimeout);
await _pipe.ConnectAsync(timeoutCts.Token);
_reader = new StreamReader(_pipe, new UTF8Encoding(false), detectEncodingFromByteOrderMarks: false, bufferSize: 1024, leaveOpen: true);
_writer = new StreamWriter(_pipe, new UTF8Encoding(false), bufferSize: 1024, leaveOpen: true) { AutoFlush = true };
AttachSocketRequest request = new() { Method = "attach" };
string requestJson = JsonSerializer.Serialize(request, Hex1bAppJsonSerializerContext.Default.AttachSocketRequest);
await _writer.WriteLineAsync(requestJson.AsMemory(), timeoutCts.Token);
string? responseLine = await _reader.ReadLineAsync(timeoutCts.Token);
if (string.IsNullOrWhiteSpace(responseLine))
{
return new AttachTransportResult(false, 0, 0, false, null, "Empty response from session host.");
}
AttachSocketResponse? response = JsonSerializer.Deserialize(responseLine, Hex1bAppJsonSerializerContext.Default.AttachSocketResponse);
if (response is null)
{
return new AttachTransportResult(false, 0, 0, false, null, "Failed to deserialize session attach response.");
}
if (!response.Success)
{
return new AttachTransportResult(false, 0, 0, false, null, response.Error ?? "Session attach failed.");
}
return new AttachTransportResult(true, response.Width ?? 0, response.Height ?? 0, response.Leader ?? false, response.Data, null);
}
catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
{
return new AttachTransportResult(false, 0, 0, false, null, "Timed out waiting for the session join pipe.");
}
catch (IOException ex)
{
return new AttachTransportResult(false, 0, 0, false, null, ex.Message);
}
catch (JsonException ex)
{
return new AttachTransportResult(false, 0, 0, false, null, ex.Message);
}
}
public Task SendInputAsync(ReadOnlyMemory<byte> data, CancellationToken cancellationToken) =>
WriteLineAsync($"i:{Convert.ToBase64String(data.Span)}", cancellationToken);
public Task SendResizeAsync(int width, int height, CancellationToken cancellationToken) =>
WriteLineAsync($"r:{width},{height}", cancellationToken);
public Task ClaimLeadAsync(CancellationToken cancellationToken) =>
WriteLineAsync("lead", cancellationToken);
public Task DetachAsync(CancellationToken cancellationToken) =>
WriteLineAsync("detach", cancellationToken);
public Task ShutdownAsync(CancellationToken cancellationToken) =>
WriteLineAsync("shutdown", cancellationToken);
public async IAsyncEnumerable<AttachTransportFrame> ReadFramesAsync([EnumeratorCancellation] CancellationToken cancellationToken)
{
if (_reader is null)
{
yield break;
}
while (!cancellationToken.IsCancellationRequested)
{
string? line;
try
{
line = await _reader.ReadLineAsync(cancellationToken);
}
catch (OperationCanceledException)
{
yield break;
}
catch (IOException)
{
yield break;
}
if (line is null)
{
yield break;
}
if (line.StartsWith("o:", StringComparison.Ordinal))
{
yield return AttachTransportFrame.Output(Convert.FromBase64String(line[2..]));
continue;
}
if (line.StartsWith("r:", StringComparison.Ordinal))
{
string[] parts = line[2..].Split(',');
if (parts.Length == 2 &&
int.TryParse(parts[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out int width) &&
int.TryParse(parts[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out int height))
{
yield return AttachTransportFrame.Resize(width, height);
}
continue;
}
if (line.StartsWith("leader:", StringComparison.Ordinal))
{
yield return AttachTransportFrame.LeaderChanged(string.Equals(line[7..], "true", StringComparison.OrdinalIgnoreCase));
continue;
}
if (string.Equals(line, "exit", StringComparison.OrdinalIgnoreCase))
{
yield return AttachTransportFrame.Exit();
yield break;
}
}
}
public async ValueTask DisposeAsync()
{
if (_writer is not null)
{
await _writer.DisposeAsync();
_writer = null;
}
_reader?.Dispose();
_reader = null;
if (_pipe is not null)
{
await _pipe.DisposeAsync();
_pipe = null;
}
}
private Task WriteLineAsync(string line, CancellationToken cancellationToken)
{
if (_writer is null)
{
return Task.CompletedTask;
}
return _writer.WriteLineAsync(line.AsMemory(), cancellationToken);
}
}
sealed class JoinSessionClient(IAttachTransport transport) : IAsyncDisposable
{
private readonly IAttachTransport _transport = transport;
private readonly Pipe _outputPipe = new();
private int _initialScreenReplayStarted;
private bool _isLeader;
private bool _shutdownRequested;
private int _remoteWidth;
private int _remoteHeight;
private int _displayWidth;
private int _displayHeight;
private Hex1bTerminal? _terminal;
private CancellationTokenSource? _sessionCts;
public async Task<int> RunAsync(AttachTransportResult connectResult, CancellationToken cancellationToken)
{
if (!connectResult.Success)
{
AnsiConsole.MarkupLine($"[red]Failed to join the terminal session:[/] {Markup.Escape(connectResult.Error ?? "Unknown error.")}");
return 1;
}
_isLeader = connectResult.IsLeader;
_remoteWidth = connectResult.Width;
_remoteHeight = connectResult.Height;
bool preSizedToDisplay = await TryApplyInitialResizeAsync(cancellationToken);
InputInterceptStream inputIntercept = new(this);
StreamWorkloadAdapter workload = new(_outputPipe.Reader.AsStream(), inputIntercept)
{
Width = _remoteWidth,
Height = _remoteHeight,
};
_terminal = Hex1bTerminal.CreateBuilder()
.WithDimensions(_remoteWidth, _remoteHeight)
.WithWorkload(workload)
.AddPresentationFilter(new ResizeFilter(this))
.AddWorkloadFilter(new CtrlCDetachFilter(this))
.Build();
_sessionCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
TaskCompletionSource firstRemoteOutput = new(TaskCreationOptions.RunContinuationsAsynchronously);
Task networkOutputTask = PumpNetworkOutputAsync(firstRemoteOutput, _sessionCts.Token);
if (!preSizedToDisplay && !string.IsNullOrEmpty(connectResult.InitialScreen))
{
await WriteInitialScreenAsync(connectResult.InitialScreen, _sessionCts.Token);
}
else if (preSizedToDisplay && !string.IsNullOrEmpty(connectResult.InitialScreen))
{
_ = Task.Run(
async () =>
{
try
{
Task completed = await Task.WhenAny(firstRemoteOutput.Task, Task.Delay(750, _sessionCts.Token));
if (completed != firstRemoteOutput.Task)
{
await WriteInitialScreenAsync(connectResult.InitialScreen, _sessionCts.Token);
}
}
catch (OperationCanceledException)
{
}
});
}
ConsoleCancelEventHandler cancelHandler = (_, args) =>
{
args.Cancel = true;
RequestStop();
};
Console.CancelKeyPress += cancelHandler;
try
{
await _terminal.RunAsync(_sessionCts.Token);
}
catch (OperationCanceledException)
{
}
finally
{
if (_sessionCts is not null)
{
await _sessionCts.CancelAsync();
}
if (_shutdownRequested)
{
try
{
await _transport.ShutdownAsync(CancellationToken.None);
}
catch
{
}
}
else
{
try
{
await _transport.DetachAsync(CancellationToken.None);
}
catch
{
}
}
_outputPipe.Writer.Complete();
try
{
await networkOutputTask;
}
catch
{
}
Console.CancelKeyPress -= cancelHandler;
}
return 0;
}
public async ValueTask DisposeAsync()
{
if (_sessionCts is not null)
{
await _sessionCts.CancelAsync();
_sessionCts.Dispose();
_sessionCts = null;
}
if (_terminal is not null)
{
await _terminal.DisposeAsync();
_terminal = null;
}
}
internal async Task SendInputAsync(byte[] buffer, int offset, int count)
{
await _transport.SendInputAsync(buffer.AsMemory(offset, count), CancellationToken.None);
}
internal async Task HandleCommandByteAsync(byte commandByte)
{
switch (commandByte)
{
case (byte)'c':
case (byte)'C':
await _transport.SendInputAsync(new byte[] { 0x03 }, CancellationToken.None);
break;
case (byte)'d':
case (byte)'D':
RequestStop();
break;
case (byte)'l':
case (byte)'L':
await _transport.ClaimLeadAsync(CancellationToken.None);
break;
case (byte)'q':
case (byte)'Q':
_shutdownRequested = true;
RequestStop();
break;
case 0x1D:
await _transport.SendInputAsync(new byte[] { 0x1D }, CancellationToken.None);
break;
}
}
internal void HandleLocalCtrlCIntercept(int count)
{
if (count <= 0)
{
return;
}
RequestStop();
}
private async Task WriteInitialScreenAsync(string? initialScreen, CancellationToken cancellationToken)
{
if (string.IsNullOrEmpty(initialScreen))
{
return;
}
if (Interlocked.Exchange(ref _initialScreenReplayStarted, 1) != 0)
{
return;
}
byte[] initialBytes = Encoding.UTF8.GetBytes(initialScreen);
await _outputPipe.Writer.WriteAsync(initialBytes, cancellationToken);
await _outputPipe.Writer.FlushAsync(cancellationToken);
}
private async Task PumpNetworkOutputAsync(TaskCompletionSource firstRemoteOutput, CancellationToken cancellationToken)
{
try
{
await foreach (AttachTransportFrame frame in _transport.ReadFramesAsync(cancellationToken))
{
switch (frame.Kind)
{
case AttachTransportFrameKind.Output:
await _outputPipe.Writer.WriteAsync(frame.Data, cancellationToken);
await _outputPipe.Writer.FlushAsync(cancellationToken);
firstRemoteOutput.TrySetResult();
break;
case AttachTransportFrameKind.Resize:
(int width, int height) = frame.GetResize();
_remoteWidth = width;
_remoteHeight = height;
_terminal?.Resize(width, height);
break;
case AttachTransportFrameKind.LeaderChanged:
_isLeader = frame.GetIsLeader();
if (_isLeader)
{
await SendResizeForCurrentDisplayAsync();
}
break;
case AttachTransportFrameKind.Exit:
RequestStop();
return;
}
}
}
catch (OperationCanceledException)
{
}
catch
{
RequestStop();
}
finally
{
firstRemoteOutput.TrySetResult();
_outputPipe.Writer.Complete();
}
}
private async Task HandleDisplayResizeAsync(int displayWidth, int displayHeight)
{
_displayWidth = displayWidth;
_displayHeight = displayHeight;
if (_isLeader)
{
await SendResizeForCurrentDisplayAsync();
}
}
private async Task<bool> TryApplyInitialResizeAsync(CancellationToken cancellationToken)
{
if (!_isLeader)
{
return false;
}
(int Width, int Height)? displaySize = TryGetCurrentDisplaySize();
if (displaySize is null)
{
return false;
}
_displayWidth = displaySize.Value.Width;
_displayHeight = displaySize.Value.Height;
(int Width, int Height)? targetSize = CalculateResizeTarget(_displayWidth, _displayHeight, _remoteWidth, _remoteHeight);
if (targetSize is null)
{
return false;
}
_remoteWidth = targetSize.Value.Width;
_remoteHeight = targetSize.Value.Height;
await _transport.SendResizeAsync(_remoteWidth, _remoteHeight, cancellationToken);
return true;
}
private async Task SendResizeForCurrentDisplayAsync()
{
(int Width, int Height)? targetSize = CalculateResizeTarget(_displayWidth, _displayHeight, _remoteWidth, _remoteHeight);
if (targetSize is null)
{
return;
}
_remoteWidth = targetSize.Value.Width;
_remoteHeight = targetSize.Value.Height;
await _transport.SendResizeAsync(_remoteWidth, _remoteHeight, CancellationToken.None);
}
private static (int Width, int Height)? CalculateResizeTarget(int displayWidth, int displayHeight, int remoteWidth, int remoteHeight)
{
int targetWidth = displayWidth;
int targetHeight = displayHeight;
if (targetWidth < 1 || targetHeight < 1)
{
return null;
}
if (targetWidth == remoteWidth && targetHeight == remoteHeight)
{
return null;
}
return (targetWidth, targetHeight);
}
private static (int Width, int Height)? TryGetCurrentDisplaySize()
{
try
{
int width = Console.WindowWidth;
int height = Console.WindowHeight;
return width > 0 && height > 0 ? (width, height) : null;
}
catch (IOException)
{
return null;
}
catch (InvalidOperationException)
{
return null;
}
}
private void RequestStop()
{
if (_sessionCts is { IsCancellationRequested: false } sessionCts)
{
_ = sessionCts.CancelAsync();
}
}
private sealed class ResizeFilter(JoinSessionClient client) : IHex1bTerminalPresentationFilter
{
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default) =>
new(client.HandleDisplayResizeAsync(width, height));
public ValueTask<IReadOnlyList<AnsiToken>> OnOutputAsync(IReadOnlyList<AppliedToken> appliedTokens, TimeSpan elapsed, CancellationToken ct = default) =>
ValueTask.FromResult<IReadOnlyList<AnsiToken>>([.. appliedTokens.Select(token => token.Token)]);
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) =>
new(client.HandleDisplayResizeAsync(width, height));
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
}
private sealed class CtrlCDetachFilter(JoinSessionClient client) : IHex1bTerminalWorkloadFilter
{
public ValueTask OnSessionStartAsync(int width, int height, DateTimeOffset timestamp, CancellationToken ct = default) => default;
public ValueTask OnOutputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnFrameCompleteAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnInputAsync(IReadOnlyList<AnsiToken> tokens, TimeSpan elapsed, CancellationToken ct = default)
{
int ctrlCCount = tokens.OfType<ControlCharacterToken>().Count(token => token.Character == '\u0003');
if (ctrlCCount > 0)
{
client.HandleLocalCtrlCIntercept(ctrlCCount);
}
return default;
}
public ValueTask OnResizeAsync(int width, int height, TimeSpan elapsed, CancellationToken ct = default) => default;
public ValueTask OnSessionEndAsync(TimeSpan elapsed, CancellationToken ct = default) => default;
}
private sealed class InputInterceptStream(JoinSessionClient client) : Stream
{
private bool _inCommandMode;
public override bool CanRead => false;
public override bool CanSeek => false;
public override bool CanWrite => true;
public override long Length => throw new NotSupportedException();
public override long Position
{
get => throw new NotSupportedException();
set => throw new NotSupportedException();
}
public override void Flush()
{
}
public override Task FlushAsync(CancellationToken cancellationToken) => Task.CompletedTask;
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
public override void SetLength(long value) => throw new NotSupportedException();
public override void Write(byte[] buffer, int offset, int count) =>
WriteAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
const byte CtrlC = 0x03;
const byte CtrlRightBracket = 0x1D;
int start = offset;
for (int index = offset; index < offset + count; index++)
{
if (_inCommandMode)
{
_inCommandMode = false;
await client.HandleCommandByteAsync(buffer[index]);
start = index + 1;
continue;
}
if (buffer[index] == CtrlC)
{
if (index > start)
{
await client.SendInputAsync(buffer, start, index - start);
}
client.HandleLocalCtrlCIntercept(1);
start = index + 1;
continue;
}
if (buffer[index] == CtrlRightBracket)
{
if (index > start)
{
await client.SendInputAsync(buffer, start, index - start);
}
_inCommandMode = true;
start = index + 1;
}
}
if (start < offset + count && !_inCommandMode)
{
await client.SendInputAsync(buffer, start, offset + count - start);
}
}
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
byte[] copy = buffer.ToArray();
await WriteAsync(copy, 0, copy.Length, cancellationToken);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment