Skip to content

Instantly share code, notes, and snippets.

@andyjessop
Created October 10, 2024 09:06
Show Gist options
  • Save andyjessop/baa511956b50ec062cdc4b90335f22a0 to your computer and use it in GitHub Desktop.
Save andyjessop/baa511956b50ec062cdc4b90335f22a0 to your computer and use it in GitHub Desktop.
Server-based Worker Registry
// registry-server.ts
import express from 'express';
import { Server } from 'http';
const app = express();
const port = 3000;
app.use(express.json());
interface ClientInstance {
id: string;
lastHeartbeat: number;
}
const clients: ClientInstance[] = [];
const HEARTBEAT_TIMEOUT = 15000; // 15 seconds
const SERVER_SHUTDOWN_TIMEOUT = 30000; // 30 seconds
let serverShutdownTimeout: NodeJS.Timeout | null = null;
app.get('/health', (req, res) => {
res.status(200).send('OK');
});
app.post('/heartbeat', (req, res) => {
const { instanceId } = req.body;
const clientIndex = clients.findIndex(client => client.id === instanceId);
if (clientIndex !== -1) {
clients[clientIndex].lastHeartbeat = Date.now();
} else {
clients.push({ id: instanceId, lastHeartbeat: Date.now() });
}
if (serverShutdownTimeout) {
clearTimeout(serverShutdownTimeout);
serverShutdownTimeout = null;
}
res.status(200).send('Heartbeat received');
});
app.post('/unregister', (req, res) => {
const { instanceId } = req.body;
const clientIndex = clients.findIndex(client => client.id === instanceId);
if (clientIndex !== -1) {
clients.splice(clientIndex, 1);
}
res.status(200).send('Unregistered');
});
const server: Server = app.listen(port, () => {
console.log(`Registry server listening at http://localhost:${port}`);
});
function checkHeartbeats() {
const now = Date.now();
clients.forEach((client, index) => {
if (now - client.lastHeartbeat > HEARTBEAT_TIMEOUT) {
clients.splice(index, 1);
console.log(`Removed inactive client: ${client.id}`);
}
});
if (clients.length === 0 && !serverShutdownTimeout) {
console.log('No active clients, scheduling server shutdown');
serverShutdownTimeout = setTimeout(() => {
console.log('No clients reconnected, shutting down server');
server.close(() => {
process.exit(0);
});
}, SERVER_SHUTDOWN_TIMEOUT);
}
}
setInterval(checkHeartbeats, 5000); // Check heartbeats every 5 seconds
// WorkerRegistry.ts
import { spawn, ChildProcess } from 'child_process';
import axios from 'axios';
import { v4 as uuidv4 } from 'uuid';
export class WorkerRegistry {
private serverProcess: ChildProcess | null = null;
private serverUrl: string = 'http://localhost:3000';
private heartbeatInterval: NodeJS.Timeout | null = null;
private instanceId: string = uuidv4();
async initialize(): Promise<void> {
if (await this.isServerRunning()) {
console.log('Existing server found');
} else {
await this.startServer();
}
this.startHeartbeat();
}
private async isServerRunning(): Promise<boolean> {
try {
await axios.get(`${this.serverUrl}/health`);
return true;
} catch (error) {
return false;
}
}
private async startServer(): Promise<void> {
this.serverProcess = spawn('node', ['registry-server.js'], {
detached: true,
stdio: 'ignore'
});
this.serverProcess.unref();
// Wait for the server to start
await new Promise<void>((resolve) => {
const checkInterval = setInterval(async () => {
if (await this.isServerRunning()) {
clearInterval(checkInterval);
resolve();
}
}, 100);
});
console.log('New server started');
}
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(async () => {
try {
await axios.post(`${this.serverUrl}/heartbeat`, { instanceId: this.instanceId });
} catch (error) {
console.error('Failed to send heartbeat:', error);
}
}, 5000); // Send heartbeat every 5 seconds
}
async close(): Promise<void> {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
}
try {
await axios.post(`${this.serverUrl}/unregister`, { instanceId: this.instanceId });
} catch (error) {
console.error('Failed to unregister:', error);
}
}
// Add methods for registering and managing workers here
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment