Skip to content

Instantly share code, notes, and snippets.

@igalshilman
Last active March 4, 2025 08:04
Show Gist options
  • Save igalshilman/9b037183c9a4f675551090f85a58245c to your computer and use it in GitHub Desktop.
Save igalshilman/9b037183c9a4f675551090f85a58245c to your computer and use it in GitHub Desktop.
/**
 * Describes a handler that should be invoked for every published message.
 * When publishing, these fields are passed as the `service`, `method` and
 * `key` of the outgoing send.
 */
export type Subscriber = {
// Name of the service that owns the target handler.
service: string;
// Name of the handler to invoke on that service.
handler: string;
// Optional key forwarded with the send.
// NOTE(review): presumably the virtual object key of the target; omitted for unkeyed services — confirm.
key?: string;
};
/**
 * Returns true when both values describe the same subscription target.
 *
 * Subscribers are read back from state and received as handler input, so two
 * values describing the same target are never the same object instance;
 * identity therefore has to be decided field by field.
 */
function isSameSubscriber(a: Subscriber, b: Subscriber): boolean {
  return a.service === b.service && a.handler === b.handler && a.key === b.key;
}

/**
 * A minimal publish/subscribe object.
 *
 * The list of subscribers is stored in this object's state under the "sub"
 * entry; publishing sends the message to every stored subscriber.
 */
export const pubsub = restate.object({
  name: "pubsub",
  handlers: {
    /**
     * Add a subscriber to the list of subscribers.
     *
     * Subscribing the same (service, handler, key) target more than once is a
     * no-op, so a published message is delivered to it only once.
     */
    subscribe: async (ctx: restate.ObjectContext, subscriber: Subscriber) => {
      const current = (await ctx.get<Subscriber[]>("sub")) ?? [];
      if (current.some((s) => isSameSubscriber(s, subscriber))) {
        return;
      }
      ctx.set("sub", [...current, subscriber]);
    },
    /**
     * Remove a subscriber.
     *
     * Every stored entry with the same service, handler and key as
     * `subscriber` is removed. Removing an unknown subscriber leaves the list
     * unchanged.
     */
    unsubscribe: async (ctx: restate.ObjectContext, subscriber: Subscriber) => {
      const current = (await ctx.get<Subscriber[]>("sub")) ?? [];
      // Compare by value: a reference comparison would never match here.
      const remaining = current.filter((s) => !isSameSubscriber(s, subscriber));
      ctx.set("sub", remaining);
    },
    /**
     * Publish a message.
     *
     * Sends `data`, serialized with the JSON serde, to the handler of every
     * stored subscriber. The payload is forwarded as-is and never inspected.
     */
    publish: async (ctx: restate.ObjectContext, data: unknown) => {
      const current = (await ctx.get<Subscriber[]>("sub")) ?? [];
      for (const subscriber of current) {
        ctx.genericSend({
          service: subscriber.service,
          method: subscriber.handler,
          key: subscriber.key,
          parameter: data,
          inputSerde: restate.serde.json,
        });
      }
    },
  },
});
@igalshilman
Copy link
Author

igalshilman commented Mar 3, 2025

/**
 * Registers `subscriber` with the pubsub object addressed by `topic`.
 *
 * The request is issued through a send client and its result is not awaited.
 *
 * @param ctx - Context of the calling handler.
 * @param topic - Key of the pubsub object to subscribe to.
 * @param subscriber - Target that should be invoked for published messages.
 */
export function subscribe(
  ctx: restate.Context,
  topic: string,
  subscriber: Subscriber
): void {
  const topicClient = ctx.objectSendClient(pubsub, topic);
  topicClient.subscribe(subscriber);
}

// usage

/**
 * Example object showing how a handler subscribes itself to a pubsub topic
 * and receives the published events.
 */
const user = restate.object({
  name: "user",
  handlers: {
    /**
     * Subscribes this object's `onEvent` handler to the "emails" topic.
     */
    hello: async (ctx: restate.ObjectContext) => {
      // At this point a user is created and chooses to subscribe to messages
      // from the pubsub service for a given topic.
      // Note that since the pubsub service itself is a virtual object,
      // it can have wide or fine-grained topics (e.g. a topic per user that
      // includes all of that user's events in the system).
      subscribe(ctx, "emails", {
        service: "user",
        handler: "onEvent",
        key: ctx.key,
      });
    },

    /**
     * Receives the events published to the subscribed topic.
     * The payload shape is not known here, so it is typed as `unknown`.
     */
    onEvent: async (ctx: restate.ObjectContext, event: unknown) => {
      // Published events are delivered here.
      ctx.console.log("Received event", event);
    },
  },
});

@igalshilman
Copy link
Author

You can also publish an event from outside the application, with a plain HTTP request to the ingress:

curl ${INGRESS_URL}/pubsub/emails/publish --json '{ "foo" : "bar" }'

@slinkydeveloper
Copy link

Awesome 😍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment