Created
November 15, 2024 18:56
-
-
Save WomB0ComB0/53c20e004878adcb69e25d2c47646833 to your computer and use it in GitHub Desktop.
Typescript multi-threading template (will apply opinionated changes soon)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// types.ts | |
export interface WorkerMessage<T = any> { | |
id: string; | |
type: 'TASK' | 'RESULT' | 'ERROR' | 'STATUS'; | |
payload: T; | |
timestamp: number; | |
} | |
export interface WorkerTask { | |
id: string; | |
data: any; | |
type: string; | |
} | |
export interface WorkerResult { | |
taskId: string; | |
result: any; | |
error?: string; | |
executionTime?: number; | |
} | |
// worker.ts | |
class WorkerThread { | |
private taskHandlers: Map<string, (data: any) => Promise<any>> = new Map(); | |
constructor() { | |
// Register message handler | |
self.onmessage = this.handleMessage.bind(this); | |
// Register default task handlers | |
this.registerTaskHandler('DEFAULT', async (data) => { | |
// Default task implementation | |
return data; | |
}); | |
} | |
private async handleMessage(event: MessageEvent<WorkerMessage>) { | |
const { id, type, payload } = event.data; | |
if (type === 'TASK') { | |
try { | |
const startTime = performance.now(); | |
const handler = this.taskHandlers.get(payload.type) || this.taskHandlers.get('DEFAULT'); | |
if (!handler) { | |
throw new Error(`No handler registered for task type: ${payload.type}`); | |
} | |
const result = await handler(payload.data); | |
const executionTime = performance.now() - startTime; | |
const response: WorkerMessage<WorkerResult> = { | |
id, | |
type: 'RESULT', | |
payload: { | |
taskId: payload.id, | |
result, | |
executionTime | |
}, | |
timestamp: Date.now() | |
}; | |
self.postMessage(response); | |
} catch (error) { | |
const errorResponse: WorkerMessage = { | |
id, | |
type: 'ERROR', | |
payload: { | |
taskId: payload.id, | |
error: error instanceof Error ? error.message : String(error) | |
}, | |
timestamp: Date.now() | |
}; | |
self.postMessage(errorResponse); | |
} | |
} | |
} | |
public registerTaskHandler(type: string, handler: (data: any) => Promise<any>) { | |
this.taskHandlers.set(type, handler); | |
} | |
} | |
// Initialize worker | |
const worker = new WorkerThread(); | |
// Register task handlers | |
worker.registerTaskHandler('COMPUTE_INTENSIVE', async (data) => { | |
// Example compute-intensive task | |
let result = 0; | |
for (let i = 0; i < data.iterations; i++) { | |
result += Math.sqrt(i); | |
} | |
return result; | |
}); | |
// threadPool.ts | |
export class ThreadPool { | |
private workers: Worker[] = []; | |
private taskQueue: WorkerTask[] = []; | |
private activeWorkers: Set<Worker> = new Set(); | |
private results: Map<string, WorkerResult> = new Map(); | |
private callbacks: Map<string, [(result: any) => void, (error: any) => void]> = new Map(); | |
private workerScript: string; | |
private maxWorkers: number; | |
constructor(workerScript: string, maxWorkers = navigator.hardwareConcurrency || 4) { | |
this.workerScript = workerScript; | |
this.maxWorkers = maxWorkers; | |
} | |
private createWorker(): Worker { | |
const worker = new Worker(this.workerScript, { type: 'module' }); | |
worker.onmessage = (event: MessageEvent<WorkerMessage>) => { | |
const { id, type, payload } = event.data; | |
if (type === 'RESULT' || type === 'ERROR') { | |
this.handleWorkerComplete(worker, id, payload); | |
} | |
}; | |
worker.onerror = (error) => { | |
console.error('Worker error:', error); | |
this.handleWorkerError(worker, error); | |
}; | |
return worker; | |
} | |
private handleWorkerComplete(worker: Worker, taskId: string, result: WorkerResult) { | |
this.activeWorkers.delete(worker); | |
this.results.set(taskId, result); | |
const callbacks = this.callbacks.get(taskId); | |
if (callbacks) { | |
const [resolve, reject] = callbacks; | |
if (result.error) { | |
reject(result.error); | |
} else { | |
resolve(result.result); | |
} | |
this.callbacks.delete(taskId); | |
} | |
this.processNextTask(); | |
} | |
private handleWorkerError(worker: Worker, error: ErrorEvent) { | |
this.activeWorkers.delete(worker); | |
worker.terminate(); | |
const index = this.workers.indexOf(worker); | |
if (index !== -1) { | |
this.workers.splice(index, 1); | |
} | |
this.processNextTask(); | |
} | |
private async processNextTask() { | |
if (this.taskQueue.length === 0) return; | |
let worker: Worker | undefined; | |
// Reuse existing idle worker | |
for (const w of this.workers) { | |
if (!this.activeWorkers.has(w)) { | |
worker = w; | |
break; | |
} | |
} | |
// Create new worker if needed and possible | |
if (!worker && this.workers.length < this.maxWorkers) { | |
worker = this.createWorker(); | |
this.workers.push(worker); | |
} | |
if (worker) { | |
const task = this.taskQueue.shift(); | |
if (task) { | |
this.activeWorkers.add(worker); | |
const message: WorkerMessage = { | |
id: task.id, | |
type: 'TASK', | |
payload: task, | |
timestamp: Date.now() | |
}; | |
worker.postMessage(message); | |
} | |
} | |
} | |
public async submitTask<T = any>(type: string, data: any): Promise<T> { | |
const task: WorkerTask = { | |
id: crypto.randomUUID(), | |
type, | |
data | |
}; | |
return new Promise((resolve, reject) => { | |
this.callbacks.set(task.id, [resolve, reject]); | |
this.taskQueue.push(task); | |
this.processNextTask(); | |
}); | |
} | |
public shutdown() { | |
for (const worker of this.workers) { | |
worker.terminate(); | |
} | |
this.workers = []; | |
this.activeWorkers.clear(); | |
this.taskQueue = []; | |
this.callbacks.clear(); | |
this.results.clear(); | |
} | |
public getActiveTaskCount(): number { | |
return this.activeWorkers.size; | |
} | |
public getQueuedTaskCount(): number { | |
return this.taskQueue.length; | |
} | |
public getAllResults(): Map<string, WorkerResult> { | |
return new Map(this.results); | |
} | |
} | |
// example-usage.ts | |
async function example() { | |
// Initialize thread pool | |
const threadPool = new ThreadPool('worker.js'); | |
try { | |
// Submit multiple tasks | |
const tasks = []; | |
for (let i = 0; i < 10; i++) { | |
const task = threadPool.submitTask('COMPUTE_INTENSIVE', { | |
iterations: 1000000 | |
}); | |
tasks.push(task); | |
} | |
// Wait for all tasks to complete | |
const results = await Promise.all(tasks); | |
console.log('All tasks completed:', results); | |
// Get execution statistics | |
const allResults = threadPool.getAllResults(); | |
const totalExecutionTime = Array.from(allResults.values()) | |
.reduce((sum, result) => sum + (result.executionTime || 0), 0); | |
console.log('Total execution time:', totalExecutionTime); | |
} catch (error) { | |
console.error('Error executing tasks:', error); | |
} finally { | |
// Clean up | |
threadPool.shutdown(); | |
} | |
} | |
// Start the example | |
example().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment