Created
July 27, 2021 17:40
-
-
Save witalobenicio/f0634b0e672d105d7d9d099ec18efbfa to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Observable, Subject } from 'rxjs'; | |
import { DataCallback } from '@integrations/types'; | |
import { Task } from '@models/Task'; | |
export interface BatchChange { | |
used: number; | |
available: number; | |
} | |
export default abstract class Processor { | |
protected queue: Subject<Task>; | |
protected errorQueue: Subject<{ error: Error, task: Task }>; | |
protected pipe: Observable<DataCallback>; | |
protected availableBatch; | |
protected subscribers: ((change: BatchChange) => void)[]; | |
protected constructor(batch: number) { | |
// Batch can be different depending on the type of Processor | |
this.availableBatch = batch; | |
this.errorQueue = new Subject<{ error: Error, task: Task }>(); | |
this.queue = new Subject<Task>(); | |
} | |
protected buildObservable(executor: (data: Task) => Observable<DataCallback>): void { | |
this.pipe = this.queue.pipe( | |
mergeMap(data => executor(data).pipe( | |
delay(this.getDelay()), | |
catchError(error => { | |
this.errorQueue.next(error); | |
return EMPTY; | |
}) | |
), this.getBatch()) | |
); | |
return this.pipe; | |
} | |
public getAvailableBatch(): number { | |
return this.availableBatch; | |
} | |
public subscribeToBatchChanges(subscriber: (change: BatchChange) => void): void { | |
if (!this.subscribers) { | |
this.subscribers = []; | |
} | |
subscriber({ used: this.getBatch() - this.availableBatch, available: this.availableBatch }); | |
this.subscribers.push(subscriber); | |
} | |
public subscribe(next: (data: DataCallback) => void, error: (err: any) => void): void { | |
if (this.errorQueue) { | |
this.errorQueue.subscribe(this.batchHandler(error)); | |
} | |
this.pipe.subscribe(this.batchHandler(next)); | |
} | |
public addManyToQueue(tasks: Task[]): void { | |
tasks.forEach(task => { | |
this.changeAvailableBatch(-1); | |
this.queue.next(task); | |
}); | |
} | |
public addToQueue(task: Task): void { | |
this.changeAvailableBatch(-1); | |
this.queue.next(task); | |
} | |
protected abstract getBatch(): number; | |
protected abstract getDelay(): number; | |
private changeAvailableBatch(quantity: -1 | 1): void { | |
this.availableBatch = this.availableBatch + quantity; | |
} | |
private batchHandler(func: (err: any) => void): (err: any) => void; | |
private batchHandler(func: (data: DataCallback) => void): (data: DataCallback) => void; | |
private batchHandler(func: any): any { | |
return (value) => { | |
this.changeAvailableBatch(1); | |
func(value); | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment