Skip to content

Instantly share code, notes, and snippets.

@AlexGeb
Created January 16, 2024 13:47
Show Gist options
  • Save AlexGeb/801dec58873bef78fa6c4b0c22b7d565 to your computer and use it in GitHub Desktop.
Save AlexGeb/801dec58873bef78fa6c4b0c22b7d565 to your computer and use it in GitHub Desktop.
Priority Queue implementation with Effect
import { Effect, Queue, pipe } from 'effect';
import type { LazyArg } from 'effect/Function';
class PQueue {
private boundedQueue;
constructor({ concurrency }: { concurrency: number }) {
this.boundedQueue = Effect.runSync(Queue.bounded<any>(concurrency));
}
private queueTask<T>(task: Effect.Effect<never, never, T>) {
const self = this;
return Effect.gen(function* (_) {
yield* _(Queue.offer(self.boundedQueue, 'task')); // wait until queue is empty
const result = yield* _(task); // execute task
yield* _(Queue.take(self.boundedQueue)); // Remove from queue when task is done
return result;
});
}
add<T>(task: LazyArg<Promise<T>>) {
return Effect.runPromise(this.queueTask(Effect.promise(task)));
}
}
describe('Priority Queue', () => {
it('should work', async () => {
const task1 = () =>
pipe(
Effect.log(`task 1 start`),
Effect.flatMap(() => Effect.sleep(1000)),
Effect.tap(() => Effect.log(`task 1 done`)),
Effect.runPromise,
);
const task2 = () =>
pipe(
Effect.log(`task 2 start`),
Effect.flatMap(() => Effect.sleep(1000)),
Effect.tap(() => Effect.log(`task 2 done`)),
Effect.runPromise,
);
const pqueue = new PQueue({ concurrency: 1 });
pqueue.add(task1); // Add 1 task to the queue
pqueue.add(task2);
await Effect.runPromise(Effect.sleep(3000));
});
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment