Last active
December 11, 2024 01:06
-
-
Save tim-smart/bc32caa3d0a817c7418d58421c2a7d5d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
Data, | |
Effect, | |
Layer, | |
pipe, | |
PubSub, | |
Schedule, | |
Scope, | |
Stream, | |
} from "effect" | |
import { Socket } from "@effect/platform" | |
import type { | |
At, | |
ComAtprotoSyncSubscribeRepos, | |
Records, | |
} from "@atcute/client/lexicons" | |
import "@atcute/bluesky/lexicons" | |
import { NodeRuntime, NodeSocket } from "@effect/platform-node" | |
import { Jetstream as SWJetstream } from "@skyware/jetstream" | |
export class JetstreamError extends Data.TaggedError("JetstreamError")<{ | |
message: string | |
cause: unknown | |
}> {} | |
class Jetstream extends Effect.Service<Jetstream>()("Jetstream", { | |
effect: Effect.gen(function* () { | |
// access the WebSocket constructor for the current platform | |
const ws = yield* Socket.WebSocketConstructor | |
const subscribe = < | |
WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, | |
>(options: { | |
readonly endpoint?: string | |
readonly wantedCollections?: Array<WantedCollections> | |
readonly startCursor?: number | |
}) => | |
Effect.gen(function* () { | |
const scope = yield* Effect.scope | |
// create a URL for the Jetstream endpoint | |
const url = new URL( | |
options.endpoint ?? "wss://jetstream1.us-east.bsky.network/subscribe", | |
) | |
options.wantedCollections?.forEach((collection) => { | |
url.searchParams.append("wantedCollections", collection) | |
}) | |
// create a mailbox to receive events | |
const pubsub = yield* PubSub.unbounded< | |
| CommitEvent<ResolveLexiconWildcard<WantedCollections>> | |
| AccountEvent | |
| IdentityEvent | |
>() | |
yield* Scope.addFinalizer(scope, pubsub.shutdown) | |
// track the cursor for the last event received | |
let cursor = options.startCursor ?? 0 | |
// create a WebSocket connection to the Jetstream endpoint | |
// It effectfully constructs the URL based on the current cursor | |
const socket = yield* Socket.makeWebSocket( | |
Effect.sync(() => { | |
if (cursor > 0) { | |
url.searchParams.set("cursor", cursor.toString()) | |
} | |
return url.toString() | |
}), | |
).pipe(Effect.provideService(Socket.WebSocketConstructor, ws)) | |
yield* pipe( | |
socket.runRaw((message) => { | |
try { | |
const event = JSON.parse(message as string) as | |
| CommitEvent<ResolveLexiconWildcard<WantedCollections>> | |
| AccountEvent | |
| IdentityEvent | |
if (event.time_us < cursor) return | |
cursor = event.time_us | |
switch (event.kind) { | |
case EventType.Commit: { | |
if ( | |
!event.commit?.collection || | |
!event.commit.rkey || | |
!event.commit.rev | |
) { | |
return | |
} | |
if ( | |
event.commit.operation === CommitType.Create && | |
!event.commit.record | |
) { | |
return | |
} | |
pubsub.unsafeOffer(event) | |
return | |
} | |
case EventType.Account: { | |
if (!event.account?.did) return | |
pubsub.unsafeOffer(event) | |
return | |
} | |
case EventType.Identity: { | |
if (!event.identity?.did) return | |
pubsub.unsafeOffer(event) | |
return | |
} | |
} | |
} catch (error) { | |
return new JetstreamError({ | |
message: "Failed to parse event", | |
cause: error, | |
}) | |
} | |
}), | |
Effect.tapErrorCause(Effect.logWarning), | |
Effect.retry( | |
Schedule.exponential("1 second").pipe( | |
Schedule.union(Schedule.spaced("10 second")), | |
), | |
), | |
Effect.forkIn(scope), | |
) | |
return pubsub | |
}) | |
const stream = < | |
WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, | |
>(options: { | |
readonly endpoint?: string | |
readonly wantedCollections?: Array<WantedCollections> | |
readonly startCursor?: number | |
}) => | |
subscribe(options).pipe( | |
Effect.map((pubsub) => Stream.fromPubSub(pubsub)), | |
Stream.unwrapScoped, | |
) | |
return { subscribe, stream } as const | |
}), | |
}) {} | |
// ---------------------------------------------------------------------------- | |
Effect.gen(function* () { | |
const jetstream = yield* Jetstream | |
yield* jetstream | |
.stream({ wantedCollections: ["app.bsky.feed.post"] }) | |
.pipe(Stream.runForEach(Effect.log)) | |
}).pipe( | |
Effect.provide( | |
Jetstream.Default.pipe(Layer.provide(NodeSocket.layerWebSocketConstructor)), | |
), | |
NodeRuntime.runMain, | |
) | |
// ---------------------------------------------------------------------------- | |
/** Resolves a lexicon name to its record operation. */ | |
export type ResolveLexicon<T extends string> = T extends keyof Records | |
? Records[T] | |
: { $type: T } | |
/** Checks if any member of a union is assignable to a given operation. */ | |
type UnionMemberIsAssignableTo<Union, AssignableTo> = | |
// Distribute over union members | |
Union extends Union | |
? // `Union` here refers to a given union member | |
Union extends AssignableTo | |
? true | |
: never | |
: never | |
/** Resolves a wildcard string to the record types it matches. */ | |
export type ResolveLexiconWildcard<T extends string> = | |
// Match the prefix | |
T extends `${infer Prefix}*` | |
? // Check that at least one collection name matches the prefix (we use `true extends` because `never` extends everything) | |
true extends UnionMemberIsAssignableTo< | |
keyof Records, | |
`${Prefix}${string}` | |
> | |
? // If so, return known matching collection names | |
keyof Records & `${Prefix}${string}` extends infer Lexicon extends | |
string | |
? Lexicon | |
: never | |
: // If no collection name matches the prefix, return as a operation-level wildcard string | |
`${Prefix}${string}` | |
: // If there's no wildcard, return the original string | |
T | |
/** The name of a collection. */ | |
export type Collection = keyof Records | (string & {}) | |
/** Generates all possible wildcard strings that match a given collection name. */ | |
type PossibleCollectionWildcards<CollectionName extends string> = | |
CollectionName extends `${infer Prefix}.${infer Suffix}` | |
? `${Prefix}.*` | `${Prefix}.${PossibleCollectionWildcards<Suffix>}` | |
: never | |
/** The name of a collection or a wildcard string matching multiple collections. */ | |
export type CollectionOrWildcard = | |
| PossibleCollectionWildcards<keyof Records> | |
| Collection | |
/** | |
* The types of events that are emitted by {@link Jetstream}. | |
* @enum | |
*/ | |
export const EventType = { | |
/** A new commit. */ | |
Commit: "commit", | |
/** An account's status was updated. */ | |
Account: "account", | |
/** An account's identity was updated. */ | |
Identity: "identity", | |
} as const | |
export type EventType = (typeof EventType)[keyof typeof EventType] | |
/** | |
* The types of commits that can be received. | |
* @enum | |
*/ | |
export const CommitType = { | |
/** A record was created. */ | |
Create: "create", | |
/** A record was updated. */ | |
Update: "update", | |
/** A record was deleted. */ | |
Delete: "delete", | |
} as const | |
export type CommitType = (typeof CommitType)[keyof typeof CommitType] | |
/** | |
* The base operation for events emitted by the {@link Jetstream} class. | |
*/ | |
export interface EventBase { | |
did: At.DID | |
time_us: number | |
kind: EventType | |
} | |
/** | |
* A commit event. Represents a commit to a user repository. | |
*/ | |
export interface CommitEvent<RecordType extends string> extends EventBase { | |
kind: typeof EventType.Commit | |
commit: Commit<RecordType> | |
} | |
/** A commit event where a record was created. */ | |
export interface CommitCreateEvent<RecordType extends string> | |
extends CommitEvent<RecordType> { | |
commit: CommitCreate<RecordType> | |
} | |
/** A commit event where a record was updated. */ | |
export interface CommitUpdateEvent<RecordType extends string> | |
extends CommitEvent<RecordType> { | |
commit: CommitUpdate<RecordType> | |
} | |
/** A commit event where a record was deleted. */ | |
export interface CommitDeleteEvent<RecordType extends string> | |
extends CommitEvent<RecordType> { | |
commit: CommitDelete<RecordType> | |
} | |
/** | |
* An account event. Represents a change to an account's status on a host (e.g. PDS or Relay). | |
*/ | |
export interface AccountEvent extends EventBase { | |
kind: typeof EventType.Account | |
account: ComAtprotoSyncSubscribeRepos.Account | |
} | |
/** | |
* An identity event. Represents a change to an account's identity. | |
*/ | |
export interface IdentityEvent extends EventBase { | |
kind: typeof EventType.Identity | |
identity: ComAtprotoSyncSubscribeRepos.Identity | |
} | |
/** | |
* The base operation for commit events. | |
*/ | |
export interface CommitBase<RecordType extends string> { | |
operation: CommitType | |
rev: string | |
collection: RecordType | |
rkey: string | |
} | |
/** | |
* A commit event representing a new record. | |
*/ | |
export interface CommitCreate<RecordType extends string> | |
extends CommitBase<RecordType> { | |
operation: typeof CommitType.Create | |
record: ResolveLexicon<RecordType> | |
cid: At.CID | |
} | |
/** | |
* A commit event representing an update to an existing record. | |
*/ | |
export interface CommitUpdate<RecordType extends string> | |
extends CommitBase<RecordType> { | |
operation: typeof CommitType.Update | |
record: ResolveLexicon<RecordType> | |
cid: At.CID | |
} | |
/** | |
* A commit event representing a deletion of an existing record. | |
*/ | |
export interface CommitDelete<RecordType extends string> | |
extends CommitBase<RecordType> { | |
operation: typeof CommitType.Delete | |
} | |
/** | |
* A commit event. | |
*/ | |
export type Commit<RecordType extends string> = | |
| CommitCreate<RecordType> | |
| CommitUpdate<RecordType> | |
| CommitDelete<RecordType> | |
Stream.asyncPush<any, Error>((emit) => | |
Effect.gen(function* () { | |
const jetstream = yield* Effect.acquireRelease( | |
Effect.sync( | |
() => | |
new SWJetstream({ | |
wantedCollections: ["app.bsky.feed.post"], | |
}), | |
), | |
(js) => Effect.sync(() => js.close()), | |
) | |
jetstream.start() | |
jetstream.onCreate("app.bsky.feed.post", (record) => { | |
emit.single(record) | |
}) | |
jetstream.on("error", (error) => { | |
emit.fail(error) | |
}) | |
}), | |
).pipe(Stream.retry(Schedule.exponential("1 second"))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment