Skip to content

Instantly share code, notes, and snippets.

@WomB0ComB0
Created November 15, 2024 18:56
Show Gist options
  • Save WomB0ComB0/53c20e004878adcb69e25d2c47646833 to your computer and use it in GitHub Desktop.
Save WomB0ComB0/53c20e004878adcb69e25d2c47646833 to your computer and use it in GitHub Desktop.
TypeScript multi-threading template (opinionated changes will be applied soon)
// types.ts
/**
 * Envelope for every message exchanged between the thread pool and a worker.
 *
 * @typeParam T - Type of the `payload` carried by the message.
 */
export interface WorkerMessage<T = any> {
/** Correlation id: the pool sets it to the task id and the worker copies it into its reply. */
id: string;
/**
 * Message kind. The pool sends `TASK`; the worker answers with `RESULT` or `ERROR`.
 * `STATUS` is declared but not sent or handled by the code shown in this file.
 */
type: 'TASK' | 'RESULT' | 'ERROR' | 'STATUS';
/** Message body; its shape depends on `type` (a task for `TASK`, an outcome for `RESULT`/`ERROR`). */
payload: T;
/** Creation time of the message as returned by `Date.now()` (milliseconds since the Unix epoch). */
timestamp: number;
}
/**
 * A unit of work queued by the pool and shipped to a worker as the payload of a `TASK` message.
 */
export interface WorkerTask {
/** Unique task id; the pool generates it with `crypto.randomUUID()`. */
id: string;
/** Argument handed unchanged to the task handler selected by `type`. */
data: any;
/** Name of the task handler the worker should run for this task. */
type: string;
}
/**
 * Outcome of a single task as reported by a worker in a `RESULT` or `ERROR` message.
 *
 * A success payload carries `result` and `executionTime`; a failure payload carries
 * `error` instead. `result` is therefore optional: the failure payload built by the
 * worker contains only `taskId` and `error`.
 */
export interface WorkerResult {
  /** Id of the task this outcome belongs to. */
  taskId: string;
  /** Value returned by the task handler; absent when the task failed. */
  result?: any;
  /** Failure message; present only when the task handler threw. */
  error?: string;
  /** Handler run time in milliseconds (difference of two `performance.now()` readings); success only. */
  executionTime?: number;
}
// worker.ts
/**
 * Worker-side message dispatcher.
 *
 * On construction it installs itself as the `onmessage` handler of the worker
 * global scope (`self`). Every incoming `TASK` message is routed to the handler
 * registered for the task's `type`, and exactly one `RESULT` or `ERROR` message
 * is posted back for it. Messages of any other type are ignored.
 */
class WorkerThread {
  /** Task handlers keyed by task type. */
  private taskHandlers: Map<string, (data: any) => Promise<any>> = new Map();

  constructor() {
    // Receive messages posted to this worker.
    self.onmessage = this.handleMessage.bind(this);
    // Fallback handler: echoes the task data back unchanged.
    this.registerTaskHandler('DEFAULT', async (data) => {
      return data;
    });
  }

  /**
   * Runs the handler for a `TASK` message and posts the outcome back.
   *
   * Task types without a registered handler fall back to the `DEFAULT` handler.
   * Any failure, including a message that arrives without a payload, is reported
   * as an `ERROR` message rather than thrown, so the sender always gets a reply.
   *
   * @param event - Message event whose `data` is the message envelope.
   */
  private async handleMessage(event: MessageEvent<WorkerMessage>) {
    const { id, type, payload } = event.data;
    if (type !== 'TASK') {
      return;
    }

    // Resolved outside the try block so the error reply can never throw while
    // reading the task id; falls back to the envelope id for malformed payloads.
    const taskId = payload?.id ?? id;

    try {
      if (payload == null) {
        throw new Error('TASK message is missing its payload');
      }
      const startTime = performance.now();
      const handler = this.taskHandlers.get(payload.type) ?? this.taskHandlers.get('DEFAULT');
      if (!handler) {
        // Defensive only: the constructor always registers DEFAULT.
        throw new Error(`No handler registered for task type: ${payload.type}`);
      }
      const result = await handler(payload.data);
      const executionTime = performance.now() - startTime;
      const response: WorkerMessage<WorkerResult> = {
        id,
        type: 'RESULT',
        payload: {
          taskId,
          result,
          executionTime
        },
        timestamp: Date.now()
      };
      self.postMessage(response);
    } catch (error) {
      const errorResponse: WorkerMessage = {
        id,
        type: 'ERROR',
        payload: {
          taskId,
          error: error instanceof Error ? error.message : String(error)
        },
        timestamp: Date.now()
      };
      self.postMessage(errorResponse);
    }
  }

  /**
   * Registers (or replaces) the handler used for tasks of the given type.
   *
   * @param type - Task type the handler is responsible for.
   * @param handler - Async function receiving the task data and returning its result.
   */
  public registerTaskHandler(type: string, handler: (data: any) => Promise<any>) {
    this.taskHandlers.set(type, handler);
  }
}
// Create the dispatcher that serves messages posted to this worker script
const worker = new WorkerThread();

// Attach the task handlers this worker supports
worker.registerTaskHandler('COMPUTE_INTENSIVE', async (data) => {
  // Sample CPU-bound workload: add up sqrt(i) for every integer i below data.iterations
  let total = 0;
  let step = 0;
  while (step < data.iterations) {
    total += Math.sqrt(step);
    step++;
  }
  return total;
});
// threadPool.ts
/**
 * Pool of Web Workers that runs submitted tasks with at most `maxWorkers`
 * tasks executing at the same time.
 *
 * Workers are created lazily, reused once idle, and replaced after a crash.
 * Every promise returned by `submitTask` is settled: it resolves with the task
 * result, or rejects when the task fails, its worker crashes, the task cannot
 * be posted to the worker, or the pool is shut down first.
 */
export class ThreadPool {
  /** Every live worker owned by the pool, busy or idle. */
  private workers: Worker[] = [];
  /** Tasks accepted by `submitTask` that have not been handed to a worker yet (FIFO). */
  private taskQueue: WorkerTask[] = [];
  /** Workers that currently have a task in flight. */
  private activeWorkers: Set<Worker> = new Set();
  /** Id of the task each busy worker is running, so a crash can be attributed to it. */
  private runningTasks: Map<Worker, string> = new Map();
  /** Payload of every RESULT/ERROR message received, keyed by task id; kept until `shutdown`. */
  private results: Map<string, WorkerResult> = new Map();
  /** `[resolve, reject]` of the promise returned by `submitTask`, keyed by task id. */
  private callbacks: Map<string, [(result: any) => void, (error: any) => void]> = new Map();
  private workerScript: string;
  private maxWorkers: number;

  /**
   * @param workerScript - URL of the worker script; loaded as a module worker.
   * @param maxWorkers - Upper bound on concurrently running workers. Defaults to
   *   `navigator.hardwareConcurrency`, or 4 when that value is falsy.
   */
  constructor(workerScript: string, maxWorkers = navigator.hardwareConcurrency || 4) {
    this.workerScript = workerScript;
    this.maxWorkers = maxWorkers;
  }

  /** Creates a worker and wires its message and error events to the pool. */
  private createWorker(): Worker {
    const worker = new Worker(this.workerScript, { type: 'module' });
    worker.onmessage = (event: MessageEvent<WorkerMessage>) => {
      const { id, type, payload } = event.data;
      if (type === 'RESULT' || type === 'ERROR') {
        this.handleWorkerComplete(worker, id, payload);
      }
    };
    worker.onerror = (error) => {
      console.error('Worker error:', error);
      this.handleWorkerError(worker, error);
    };
    return worker;
  }

  /** Rejects the promise of a task that is still pending; does nothing if it was already settled. */
  private rejectTask(taskId: string, reason: unknown) {
    const callbacks = this.callbacks.get(taskId);
    if (!callbacks) return;
    this.callbacks.delete(taskId);
    callbacks[1](reason);
  }

  /**
   * Records a task outcome reported by a worker, settles the task's promise,
   * frees the worker, and dispatches the next queued task.
   */
  private handleWorkerComplete(worker: Worker, taskId: string, result: WorkerResult) {
    this.activeWorkers.delete(worker);
    this.runningTasks.delete(worker);
    this.results.set(taskId, result);
    const callbacks = this.callbacks.get(taskId);
    if (callbacks) {
      const [resolve, reject] = callbacks;
      this.callbacks.delete(taskId);
      // Compare against null/undefined rather than truthiness: an empty error
      // message (e.g. from `throw new Error()`) is still a failure.
      if (result?.error != null) {
        reject(result.error);
      } else {
        resolve(result?.result);
      }
    }
    this.processNextTask();
  }

  /**
   * Handles a worker-level failure: discards the worker, rejects the task it
   * was running (it will never report back), and dispatches the next queued task.
   */
  private handleWorkerError(worker: Worker, error: ErrorEvent) {
    this.activeWorkers.delete(worker);
    worker.terminate();
    const index = this.workers.indexOf(worker);
    if (index !== -1) {
      this.workers.splice(index, 1);
    }
    const taskId = this.runningTasks.get(worker);
    this.runningTasks.delete(worker);
    if (taskId !== undefined) {
      const detail = error?.message || 'unknown error';
      this.rejectTask(taskId, new Error(`Worker crashed while running task ${taskId}: ${detail}`));
    }
    this.processNextTask();
  }

  /**
   * Hands the oldest queued task to an idle worker, creating a worker when none
   * is idle and the pool is below `maxWorkers`. Dispatches at most one task per
   * call; a task that cannot be posted is rejected and the next one is tried.
   */
  private processNextTask() {
    while (this.taskQueue.length > 0) {
      // Reuse an idle worker before creating a new one.
      let worker = this.workers.find((candidate) => !this.activeWorkers.has(candidate));
      if (!worker && this.workers.length < this.maxWorkers) {
        worker = this.createWorker();
        this.workers.push(worker);
      }
      if (!worker) return; // pool saturated; a finishing worker will call back in

      const task = this.taskQueue.shift();
      if (!task) return;

      this.activeWorkers.add(worker);
      this.runningTasks.set(worker, task.id);
      const message: WorkerMessage = {
        id: task.id,
        type: 'TASK',
        payload: task,
        timestamp: Date.now()
      };
      try {
        worker.postMessage(message);
        return;
      } catch (error) {
        // postMessage throws synchronously when the data cannot be cloned.
        // Release the worker and fail only this task, then try the next one.
        this.activeWorkers.delete(worker);
        this.runningTasks.delete(worker);
        this.rejectTask(task.id, error);
      }
    }
  }

  /**
   * Queues a task and returns a promise for its result.
   *
   * @param type - Name of the worker-side task handler to run.
   * @param data - Argument for the handler; must be cloneable by `postMessage`.
   * @returns Promise resolving with the handler's return value. It rejects with
   *   the worker's error message (a string) when the handler throws, and with an
   *   error object when the worker crashes, the data cannot be posted, or the
   *   pool is shut down before the task completes.
   */
  public async submitTask<T = any>(type: string, data: any): Promise<T> {
    const task: WorkerTask = {
      id: crypto.randomUUID(),
      type,
      data
    };
    return new Promise<T>((resolve, reject) => {
      this.callbacks.set(task.id, [resolve, reject]);
      this.taskQueue.push(task);
      this.processNextTask();
    });
  }

  /**
   * Terminates all workers, discards queued tasks and stored results, and
   * rejects the promise of every task that has not completed yet.
   */
  public shutdown() {
    for (const worker of this.workers) {
      worker.terminate();
    }
    const pending = Array.from(this.callbacks.values());
    this.workers = [];
    this.activeWorkers.clear();
    this.runningTasks.clear();
    this.taskQueue = [];
    this.callbacks.clear();
    this.results.clear();
    // Settle after the state is reset so callers never observe a half-torn-down pool.
    for (const [, reject] of pending) {
      reject(new Error('ThreadPool was shut down before the task completed'));
    }
  }

  /** Number of tasks currently running on a worker. */
  public getActiveTaskCount(): number {
    return this.activeWorkers.size;
  }

  /** Number of tasks waiting for a free worker. */
  public getQueuedTaskCount(): number {
    return this.taskQueue.length;
  }

  /** Snapshot copy of every outcome received since creation or the last `shutdown`, keyed by task id. */
  public getAllResults(): Map<string, WorkerResult> {
    return new Map(this.results);
  }
}
// example-usage.ts
/**
 * Demo: runs ten COMPUTE_INTENSIVE tasks through a ThreadPool backed by
 * 'worker.js', logs their results and the summed execution time, and always
 * shuts the pool down afterwards.
 */
async function example() {
  // Pool backed by the compiled worker script
  const threadPool = new ThreadPool('worker.js');
  try {
    // Queue ten identical workloads up front so they can run concurrently
    const tasks = Array.from({ length: 10 }, () =>
      threadPool.submitTask('COMPUTE_INTENSIVE', {
        iterations: 1000000
      })
    );

    // Block until every task has reported back
    const results = await Promise.all(tasks);
    console.log('All tasks completed:', results);

    // Sum the per-task timings recorded by the pool
    let totalExecutionTime = 0;
    for (const outcome of threadPool.getAllResults().values()) {
      totalExecutionTime += outcome.executionTime || 0;
    }
    console.log('Total execution time:', totalExecutionTime);
  } catch (error) {
    console.error('Error executing tasks:', error);
  } finally {
    // Release the workers whether or not the run succeeded
    threadPool.shutdown();
  }
}

// Kick off the demo
example().catch(console.error);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment