@cowboyd
cowboyd / use-resource.ts
Last active October 10, 2023 20:49
A useResource hook for using an Effection v3 resource in React.
import { run, type Operation } from "effection";
import { useEffect, useState, type DependencyList } from "react";
export type ResourceHandle<T> = {
type: 'pending';
} | {
type: 'resolved';
value: T;
} | {
type: 'rejected';
@cowboyd
cowboyd / event.ts
Last active September 19, 2023 14:35
Simple Event Notification using Delimited Continuations
import { type Computation, shift, reset } from "../deps.ts";
import { type Resolve } from "../types.ts";
export function* createEvent<T>(): Computation<[Resolve<T>, Computation<T>]> {
let result: { value: T } | void = void 0;
let listeners: Resolve<T>[] = [];
let resolve = yield* reset<Resolve<T>>(function*() {
let value = yield* shift<T>(function*(k) {
return k.tail;
@cowboyd
cowboyd / 01-sleep.mjs
Last active September 19, 2023 11:07
Austin JS Structured Concurrency Talk, code samples
async function sleep(duration) {
await new Promise(resolve => setTimeout(resolve, duration));
}
async function main() {
await sleep(1000);
}
await main();
@cowboyd
cowboyd / benchmark.ts
Created September 18, 2023 19:41
Benchmark Effection memory pressure
import { each, main, sleep, spawn, createSignal } from "./mod.ts";
await main(function*() {
let perf = globalThis.performance;
let signal = createSignal<string>();
for (let i = 0; i < 1000; i ++) {
yield* spawn(function*() {
@cowboyd
cowboyd / zip.ts
Last active August 10, 2023 19:25
Implement a stream zip using Effection
import {
all,
createChannel,
type Operation,
resource,
spawn,
type Stream,
} from "effection";
export function zip<T>(streams: Stream<T, never>[]): Stream<T[], never> {
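
The preview above is cut off before the body of `zip`, so the gist's actual implementation is not shown here. As a rough illustration of the zip idea only, here is a minimal sketch that uses plain async iterables instead of Effection streams. The name `zipAsync` is hypothetical and not part of Effection. One assumption differs from the gist's signature: the gist zips streams typed `Stream<T, never>`, which never close, whereas this sketch stops as soon as any source is exhausted.

```typescript
// Hypothetical analogue of a stream zip, written against async iterables.
// Not the gist's implementation and not an Effection API.
async function* zipAsync<T>(
  sources: AsyncIterable<T>[],
): AsyncGenerator<T[], void, undefined> {
  // With no sources there is nothing to zip; avoid yielding [] forever.
  if (sources.length === 0) return;

  const iterators = sources.map((source) => source[Symbol.asyncIterator]());
  try {
    while (true) {
      // Pull one item from every source concurrently, then emit them together.
      const results = await Promise.all(iterators.map((it) => it.next()));
      // Assumption: stop at the shortest source.
      if (results.some((result) => result.done)) return;
      yield results.map((result) => result.value as T);
    }
  } finally {
    // Give each source a chance to release whatever it holds.
    await Promise.all(iterators.map((it) => it.return?.()));
  }
}
```

Consuming it with `for await (const tuple of zipAsync([a, b]))` yields one array per round, with the items in the same order as the sources.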
@cowboyd
cowboyd / adder.ts
Last active August 9, 2023 17:51
Streaming Bitwise Adder in Effection
import { createChannel, resource, all, spawn, main, type Stream } from "effection";
type bit = 0 | 1;
type Bit = Stream<bit, never>;
function create1BitAdder(a$: Bit, b$: Bit, c$: Bit): Stream<[bit, bit], never> {
// The easiest way to represent a stream is as a resource: a stream is an
// operation that yields a subscription, so a resource that "provides" a
// subscription is by definition a stream.
return resource(function*(provide) {
@cowboyd
cowboyd / 01-queue.ts
Last active July 28, 2023 13:32
A Queue that implements the v3 Subscription interface. Useful when working with low-level subscription APIs.
import type { Resolve, Subscription } from "./types.ts";
import { action } from "./instructions.ts";
export interface Queue<T, TClose> extends Subscription<T, TClose> {
add(item: T): void;
close(value: TClose): void;
}
export function createQueue<T, TClose>(): Queue<T, TClose> {
type Item = IteratorResult<T, TClose>;
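
The preview stops before the body of `createQueue`, so the gist's implementation is not visible here. To illustrate how a queue with this `add`/`close` shape can hand items to a consumer, here is a minimal sketch that uses Promises rather than Effection's `action`. The names `PromiseQueue` and `createPromiseQueue` are hypothetical, and returning a Promise from `next()` is an assumption made to keep the example self-contained.

```typescript
// Hypothetical Promise-based analogue of the Queue interface above.
// Not the gist's implementation and not an Effection API.
interface PromiseQueue<T, TClose> {
  add(item: T): void;
  close(value: TClose): void;
  next(): Promise<IteratorResult<T, TClose>>;
}

function createPromiseQueue<T, TClose>(): PromiseQueue<T, TClose> {
  type Item = IteratorResult<T, TClose>;
  const items: Item[] = []; // added but not yet consumed
  const waiters: Array<(item: Item) => void> = []; // consumers awaiting an item
  let closeResult: Item | undefined; // set once the queue is closed

  return {
    add(value) {
      if (closeResult) return; // items added after close are ignored
      const waiter = waiters.shift();
      if (waiter) {
        waiter({ done: false, value });
      } else {
        items.push({ done: false, value });
      }
    },
    close(value) {
      if (closeResult) return; // closing twice has no effect
      const result: Item = { done: true, value };
      closeResult = result;
      // Waiters only exist while `items` is empty, so release all of them.
      for (const waiter of waiters.splice(0)) waiter(result);
    },
    next() {
      const item = items.shift();
      if (item) return Promise.resolve(item);
      if (closeResult) return Promise.resolve(closeResult);
      return new Promise<Item>((resolve) => waiters.push(resolve));
    },
  };
}
```

Items added before anyone calls `next()` are kept in order, and a `next()` issued before any item exists settles when the following `add` or `close` arrives.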
@cowboyd
cowboyd / buffer.ts
Last active July 18, 2023 08:38
Throttle any Effection stream using a fixed size buffer
/**
* Takes a buffer limit and returns a stream combinator that converts a stream into a stream that
* limits the number of in-flight items to that limit when it is subscribed to. Use with pipe:
* ```ts
* let doubleclicks = pipe(events, buffer(200), filter(isDoubleClick));
* ```
* Or as a standalone:
* let buffered = buffer(200)(events);
*
* No buffer is actually allocated until the resulting stream is subscribed to.
@cowboyd
cowboyd / take-filter.ts
Last active July 17, 2023 14:29
Use a stream combinator to filter the actions from a channel and return a stream that is only
import { filter } from "effection";
export function* useActions(pattern: ActionPattern): Stream<AnyAction> {
let match = matcher(pattern);
let { output } = yield* ActionContext;
// return a subscription to the filtered actions.
return yield* filter(match)(output);
}
@cowboyd
cowboyd / 01-signal.ts
Last active July 13, 2023 11:34
Create a context-free function that dispatches values to a subscriber context
import { createChannel, run, type Stream } from "./mod.ts";
export function createSignal<T>(): [(event: T) => void, Stream<T, never>] {
let { input, output } = createChannel<T, never>();
let pulse = (event: T) => run(() => input.send(event));
return [pulse, output];
}