Skip to content

Instantly share code, notes, and snippets.

@monotykamary
Created January 4, 2025 10:57
Show Gist options
  • Save monotykamary/1d4e156a3be88f0966b12790f06c6dee to your computer and use it in GitHub Desktop.
Save monotykamary/1d4e156a3be88f0966b12790f06c6dee to your computer and use it in GitHub Desktop.
BEAM-inspired scheduler in RxJS
import { Observable, Scheduler } from 'rxjs';
import { Subscription } from 'rxjs';
import { Action } from 'rxjs/internal/scheduler/Action';
export function arrRemove<T>(arr: T[] | undefined | null, item: T) {
if (arr) {
const index = arr.indexOf(item);
0 <= index && arr.splice(index, 1);
}
}
export type TimerHandle = number | ReturnType<typeof setTimeout>;
type SetIntervalFunction = (
handler: () => void,
timeout?: number,
...args: any[]
) => TimerHandle;
type ClearIntervalFunction = (handle: TimerHandle) => void;
interface IntervalProvider {
setInterval: SetIntervalFunction;
clearInterval: ClearIntervalFunction;
delegate:
| {
setInterval: SetIntervalFunction;
clearInterval: ClearIntervalFunction;
}
| undefined;
}
export const intervalProvider: IntervalProvider = {
// When accessing the delegate, use the variable rather than `this` so that
// the functions can be called without being bound to the provider.
setInterval(handler: () => void, timeout?: number, ...args) {
const { delegate } = intervalProvider;
if (delegate?.setInterval) {
return delegate.setInterval(handler, timeout, ...args);
}
return setInterval(handler, timeout, ...args);
},
clearInterval(handle) {
const { delegate } = intervalProvider;
return (delegate?.clearInterval || clearInterval)(handle as any);
},
delegate: undefined,
};
export class BeamAction<T> extends Action<T> {
public id: any;
public state?: T;
public delay!: number;
protected pending: boolean = false;
private reductionsLeft: number;
constructor(
protected scheduler: BeamScheduler,
protected work: (this: Action<T>, state?: T) => void
) {
super(scheduler, work);
this.reductionsLeft = scheduler.REDUCTIONS_PER_SLICE;
}
public schedule(state?: T, delay: number = 0): Subscription {
if (this.closed) {
return this;
}
this.state = state;
const id = this.id;
const scheduler = this.scheduler;
if (id != null) {
this.id = this.recycleAsyncId(scheduler, id, delay);
}
this.pending = true;
this.delay = delay;
this.id = this.id ?? this.requestAsyncId(scheduler, this.id, delay);
return this;
}
protected requestAsyncId(
scheduler: BeamScheduler,
_id?: any,
delay: number = 0
): any {
return intervalProvider.setInterval(
scheduler.flush.bind(scheduler, this),
delay || scheduler.schedulerPeriodMs
);
}
protected recycleAsyncId(
_scheduler: BeamScheduler,
id?: any,
delay: number | null = 0
): any {
if (delay != null && this.delay === delay && this.pending === false) {
return id;
}
if (id != null) {
intervalProvider.clearInterval(id);
}
return undefined;
}
public execute(state: T, delay: number): any {
if (this.closed) {
return new Error('executing a cancelled action');
}
this.pending = false;
if (this.reductionsLeft <= 0) {
// Reset reductions and reschedule
this.reductionsLeft = this.scheduler.REDUCTIONS_PER_SLICE;
this.reschedule();
return undefined;
}
const error = this._execute(state, delay);
this.reductionsLeft--;
if (error) {
return error;
} else if (this.pending === false && this.id != null) {
this.id = this.recycleAsyncId(this.scheduler, this.id, null);
}
}
private reschedule(): void {
const scheduler = this.scheduler;
this.schedule(this.state, scheduler.schedulerPeriodMs);
}
protected _execute(state: T, _delay: number): any {
let errored: boolean = false;
let errorValue: any;
try {
this.work(state);
} catch (e) {
errored = true;
errorValue = e ? e : new Error('Scheduled action threw falsy error');
}
if (errored) {
this.unsubscribe();
return errorValue;
}
}
unsubscribe() {
if (!this.closed) {
const { id, scheduler } = this;
const { actions } = scheduler;
this.work = this.state = this.scheduler = null!;
this.pending = false;
arrRemove(actions, this);
if (id != null) {
this.id = this.recycleAsyncId(scheduler, id, null);
}
this.delay = null!;
super.unsubscribe();
}
}
}
export class BeamScheduler extends Scheduler {
public actions: Array<BeamAction<any>> = [];
public _active: boolean = false;
public readonly REDUCTIONS_PER_SLICE = 2000;
constructor(public schedulerPeriodMs = 4) {
super(BeamAction);
}
public flush(action: BeamAction<any>): void {
const { actions } = this;
if (this._active) {
actions.push(action);
return;
}
let error: any;
this._active = true;
do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while ((action = actions.shift()!));
this._active = false;
if (error) {
while ((action = actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
}
// Example 1: Basic CPU-intensive task with cooperative scheduling
function demonstrateBasicScheduling() {
const scheduler = new BeamScheduler(4); // 4ms scheduling period
const heavyComputation = new Observable((subscriber) => {
let count = 0;
const compute = () => {
// Simulate CPU-intensive work
for (let i = 0; i < 1000000; i++) {
count += Math.sqrt(i);
}
subscriber.next(count);
if (!subscriber.closed) {
scheduler.schedule(compute);
}
};
scheduler.schedule(compute);
});
heavyComputation.subscribe({
next: (value) => console.log('Computed value:', value),
complete: () => console.log('Computation complete'),
});
}
// Example 2: Multiple concurrent tasks with fair scheduling
function demonstrateConcurrentScheduling() {
const scheduler = new BeamScheduler(4);
const task1 = new Observable((subscriber) => {
let iterations = 0;
const work = () => {
iterations++;
subscriber.next(`Task 1: ${iterations}`);
if (iterations < 100) {
scheduler.schedule(work);
} else {
subscriber.complete();
}
};
scheduler.schedule(work);
});
const task2 = new Observable((subscriber) => {
let iterations = 0;
const work = () => {
iterations++;
subscriber.next(`Task 2: ${iterations}`);
if (iterations < 100) {
scheduler.schedule(work);
} else {
subscriber.complete();
}
};
scheduler.schedule(work);
});
// Run both tasks concurrently
task1.subscribe(console.log);
task2.subscribe(console.log);
}
// Example 3: Priority-based scheduling
class PriorityBeamScheduler extends BeamScheduler {
private priorityQueue: Array<[number, BeamAction<any>]> = [];
constructor(schedulerPeriodMs = 4) {
super(schedulerPeriodMs);
}
scheduleWithPriority(work: () => void, priority: number = 0) {
const action = new BeamAction(this, () => work());
this.priorityQueue.push([priority, action]);
this.priorityQueue.sort((a, b) => b[0] - a[0]); // Higher priority first
action.schedule();
}
public flush(action: BeamAction<any>): void {
if (this._active) {
this.actions.push(action);
return;
}
this._active = true;
let error: any;
// Process priority queue first
while (this.priorityQueue.length > 0) {
const [_, priorityAction] = this.priorityQueue.shift()!;
if (
(error = priorityAction.execute(
priorityAction.state,
priorityAction.delay
))
) {
break;
}
}
// Then process regular actions
do {
if ((error = action.execute(action.state, action.delay))) {
break;
}
} while ((action = this.actions.shift()!));
this._active = false;
if (error) {
while ((action = this.actions.shift()!)) {
action.unsubscribe();
}
throw error;
}
}
}
// Example usage of priority scheduling
function demonstratePriorityScheduling() {
const scheduler = new PriorityBeamScheduler(4);
// High priority task
scheduler.scheduleWithPriority(() => {
console.log('High priority task executed');
}, 2);
// Medium priority task
scheduler.scheduleWithPriority(() => {
console.log('Medium priority task executed');
}, 1);
// Low priority task
scheduler.scheduleWithPriority(() => {
console.log('Low priority task executed');
}, 0);
}
// Example 4: Adaptive scheduling based on system load
class AdaptiveBeamScheduler extends BeamScheduler {
private loadFactor: number = 1.0;
private readonly MIN_PERIOD_MS = 1;
private readonly MAX_PERIOD_MS = 16;
constructor(initialPeriodMs = 4) {
super(initialPeriodMs);
this.monitorSystemLoad();
}
private monitorSystemLoad() {
setInterval(() => {
// Simple load monitoring based on queue size
const queueSize = this.actions.length;
if (queueSize > 100) {
this.loadFactor = Math.min(this.loadFactor * 1.2, 2.0);
} else if (queueSize < 10) {
this.loadFactor = Math.max(this.loadFactor * 0.8, 0.5);
}
// Adjust scheduler period based on load
this.schedulerPeriodMs = Math.max(
this.MIN_PERIOD_MS,
Math.min(this.MAX_PERIOD_MS, Math.floor(4 * this.loadFactor))
);
}, 1000);
}
}
// Usage
const adaptiveScheduler = new AdaptiveBeamScheduler();
// The scheduler will automatically adjust its period based on system load
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment