Created
January 4, 2025 10:57
-
-
Save monotykamary/1d4e156a3be88f0966b12790f06c6dee to your computer and use it in GitHub Desktop.
BEAM-inspired scheduler in RxJS
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Scheduler } from 'rxjs'; | |
import { Subscription } from 'rxjs'; | |
import { Action } from 'rxjs/internal/scheduler/Action'; | |
export function arrRemove<T>(arr: T[] | undefined | null, item: T) { | |
if (arr) { | |
const index = arr.indexOf(item); | |
0 <= index && arr.splice(index, 1); | |
} | |
} | |
export type TimerHandle = number | ReturnType<typeof setTimeout>; | |
type SetIntervalFunction = ( | |
handler: () => void, | |
timeout?: number, | |
...args: any[] | |
) => TimerHandle; | |
type ClearIntervalFunction = (handle: TimerHandle) => void; | |
interface IntervalProvider { | |
setInterval: SetIntervalFunction; | |
clearInterval: ClearIntervalFunction; | |
delegate: | |
| { | |
setInterval: SetIntervalFunction; | |
clearInterval: ClearIntervalFunction; | |
} | |
| undefined; | |
} | |
export const intervalProvider: IntervalProvider = { | |
// When accessing the delegate, use the variable rather than `this` so that | |
// the functions can be called without being bound to the provider. | |
setInterval(handler: () => void, timeout?: number, ...args) { | |
const { delegate } = intervalProvider; | |
if (delegate?.setInterval) { | |
return delegate.setInterval(handler, timeout, ...args); | |
} | |
return setInterval(handler, timeout, ...args); | |
}, | |
clearInterval(handle) { | |
const { delegate } = intervalProvider; | |
return (delegate?.clearInterval || clearInterval)(handle as any); | |
}, | |
delegate: undefined, | |
}; | |
export class BeamAction<T> extends Action<T> { | |
public id: any; | |
public state?: T; | |
public delay!: number; | |
protected pending: boolean = false; | |
private reductionsLeft: number; | |
constructor( | |
protected scheduler: BeamScheduler, | |
protected work: (this: Action<T>, state?: T) => void | |
) { | |
super(scheduler, work); | |
this.reductionsLeft = scheduler.REDUCTIONS_PER_SLICE; | |
} | |
public schedule(state?: T, delay: number = 0): Subscription { | |
if (this.closed) { | |
return this; | |
} | |
this.state = state; | |
const id = this.id; | |
const scheduler = this.scheduler; | |
if (id != null) { | |
this.id = this.recycleAsyncId(scheduler, id, delay); | |
} | |
this.pending = true; | |
this.delay = delay; | |
this.id = this.id ?? this.requestAsyncId(scheduler, this.id, delay); | |
return this; | |
} | |
protected requestAsyncId( | |
scheduler: BeamScheduler, | |
_id?: any, | |
delay: number = 0 | |
): any { | |
return intervalProvider.setInterval( | |
scheduler.flush.bind(scheduler, this), | |
delay || scheduler.schedulerPeriodMs | |
); | |
} | |
protected recycleAsyncId( | |
_scheduler: BeamScheduler, | |
id?: any, | |
delay: number | null = 0 | |
): any { | |
if (delay != null && this.delay === delay && this.pending === false) { | |
return id; | |
} | |
if (id != null) { | |
intervalProvider.clearInterval(id); | |
} | |
return undefined; | |
} | |
public execute(state: T, delay: number): any { | |
if (this.closed) { | |
return new Error('executing a cancelled action'); | |
} | |
this.pending = false; | |
if (this.reductionsLeft <= 0) { | |
// Reset reductions and reschedule | |
this.reductionsLeft = this.scheduler.REDUCTIONS_PER_SLICE; | |
this.reschedule(); | |
return undefined; | |
} | |
const error = this._execute(state, delay); | |
this.reductionsLeft--; | |
if (error) { | |
return error; | |
} else if (this.pending === false && this.id != null) { | |
this.id = this.recycleAsyncId(this.scheduler, this.id, null); | |
} | |
} | |
private reschedule(): void { | |
const scheduler = this.scheduler; | |
this.schedule(this.state, scheduler.schedulerPeriodMs); | |
} | |
protected _execute(state: T, _delay: number): any { | |
let errored: boolean = false; | |
let errorValue: any; | |
try { | |
this.work(state); | |
} catch (e) { | |
errored = true; | |
errorValue = e ? e : new Error('Scheduled action threw falsy error'); | |
} | |
if (errored) { | |
this.unsubscribe(); | |
return errorValue; | |
} | |
} | |
unsubscribe() { | |
if (!this.closed) { | |
const { id, scheduler } = this; | |
const { actions } = scheduler; | |
this.work = this.state = this.scheduler = null!; | |
this.pending = false; | |
arrRemove(actions, this); | |
if (id != null) { | |
this.id = this.recycleAsyncId(scheduler, id, null); | |
} | |
this.delay = null!; | |
super.unsubscribe(); | |
} | |
} | |
} | |
export class BeamScheduler extends Scheduler { | |
public actions: Array<BeamAction<any>> = []; | |
public _active: boolean = false; | |
public readonly REDUCTIONS_PER_SLICE = 2000; | |
constructor(public schedulerPeriodMs = 4) { | |
super(BeamAction); | |
} | |
public flush(action: BeamAction<any>): void { | |
const { actions } = this; | |
if (this._active) { | |
actions.push(action); | |
return; | |
} | |
let error: any; | |
this._active = true; | |
do { | |
if ((error = action.execute(action.state, action.delay))) { | |
break; | |
} | |
} while ((action = actions.shift()!)); | |
this._active = false; | |
if (error) { | |
while ((action = actions.shift()!)) { | |
action.unsubscribe(); | |
} | |
throw error; | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example 1: Basic CPU-intensive task with cooperative scheduling | |
function demonstrateBasicScheduling() { | |
const scheduler = new BeamScheduler(4); // 4ms scheduling period | |
const heavyComputation = new Observable((subscriber) => { | |
let count = 0; | |
const compute = () => { | |
// Simulate CPU-intensive work | |
for (let i = 0; i < 1000000; i++) { | |
count += Math.sqrt(i); | |
} | |
subscriber.next(count); | |
if (!subscriber.closed) { | |
scheduler.schedule(compute); | |
} | |
}; | |
scheduler.schedule(compute); | |
}); | |
heavyComputation.subscribe({ | |
next: (value) => console.log('Computed value:', value), | |
complete: () => console.log('Computation complete'), | |
}); | |
} | |
// Example 2: Multiple concurrent tasks with fair scheduling | |
function demonstrateConcurrentScheduling() { | |
const scheduler = new BeamScheduler(4); | |
const task1 = new Observable((subscriber) => { | |
let iterations = 0; | |
const work = () => { | |
iterations++; | |
subscriber.next(`Task 1: ${iterations}`); | |
if (iterations < 100) { | |
scheduler.schedule(work); | |
} else { | |
subscriber.complete(); | |
} | |
}; | |
scheduler.schedule(work); | |
}); | |
const task2 = new Observable((subscriber) => { | |
let iterations = 0; | |
const work = () => { | |
iterations++; | |
subscriber.next(`Task 2: ${iterations}`); | |
if (iterations < 100) { | |
scheduler.schedule(work); | |
} else { | |
subscriber.complete(); | |
} | |
}; | |
scheduler.schedule(work); | |
}); | |
// Run both tasks concurrently | |
task1.subscribe(console.log); | |
task2.subscribe(console.log); | |
} | |
// Example 3: Priority-based scheduling | |
class PriorityBeamScheduler extends BeamScheduler { | |
private priorityQueue: Array<[number, BeamAction<any>]> = []; | |
constructor(schedulerPeriodMs = 4) { | |
super(schedulerPeriodMs); | |
} | |
scheduleWithPriority(work: () => void, priority: number = 0) { | |
const action = new BeamAction(this, () => work()); | |
this.priorityQueue.push([priority, action]); | |
this.priorityQueue.sort((a, b) => b[0] - a[0]); // Higher priority first | |
action.schedule(); | |
} | |
public flush(action: BeamAction<any>): void { | |
if (this._active) { | |
this.actions.push(action); | |
return; | |
} | |
this._active = true; | |
let error: any; | |
// Process priority queue first | |
while (this.priorityQueue.length > 0) { | |
const [_, priorityAction] = this.priorityQueue.shift()!; | |
if ( | |
(error = priorityAction.execute( | |
priorityAction.state, | |
priorityAction.delay | |
)) | |
) { | |
break; | |
} | |
} | |
// Then process regular actions | |
do { | |
if ((error = action.execute(action.state, action.delay))) { | |
break; | |
} | |
} while ((action = this.actions.shift()!)); | |
this._active = false; | |
if (error) { | |
while ((action = this.actions.shift()!)) { | |
action.unsubscribe(); | |
} | |
throw error; | |
} | |
} | |
} | |
// Example usage of priority scheduling | |
function demonstratePriorityScheduling() { | |
const scheduler = new PriorityBeamScheduler(4); | |
// High priority task | |
scheduler.scheduleWithPriority(() => { | |
console.log('High priority task executed'); | |
}, 2); | |
// Medium priority task | |
scheduler.scheduleWithPriority(() => { | |
console.log('Medium priority task executed'); | |
}, 1); | |
// Low priority task | |
scheduler.scheduleWithPriority(() => { | |
console.log('Low priority task executed'); | |
}, 0); | |
} | |
// Example 4: Adaptive scheduling based on system load | |
class AdaptiveBeamScheduler extends BeamScheduler { | |
private loadFactor: number = 1.0; | |
private readonly MIN_PERIOD_MS = 1; | |
private readonly MAX_PERIOD_MS = 16; | |
constructor(initialPeriodMs = 4) { | |
super(initialPeriodMs); | |
this.monitorSystemLoad(); | |
} | |
private monitorSystemLoad() { | |
setInterval(() => { | |
// Simple load monitoring based on queue size | |
const queueSize = this.actions.length; | |
if (queueSize > 100) { | |
this.loadFactor = Math.min(this.loadFactor * 1.2, 2.0); | |
} else if (queueSize < 10) { | |
this.loadFactor = Math.max(this.loadFactor * 0.8, 0.5); | |
} | |
// Adjust scheduler period based on load | |
this.schedulerPeriodMs = Math.max( | |
this.MIN_PERIOD_MS, | |
Math.min(this.MAX_PERIOD_MS, Math.floor(4 * this.loadFactor)) | |
); | |
}, 1000); | |
} | |
} | |
// Usage | |
const adaptiveScheduler = new AdaptiveBeamScheduler(); | |
// The scheduler will automatically adjust its period based on system load |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment