@dimfeld
Created December 9, 2024 19:09
Restate TS Client Code

OK, as promised, here's my solution. It doesn't handle the "all the services in an object" case, but it does remove the need for client code to import types from the server packages.

So the first thing is that yes, the types are kind of weird. The ServiceDefinition type and its related types take both the name and the handler parameters, but as you noted, it just drops the parameters. The version of ServiceDefinition returned by the client factory functions does hold on to the handler function info, though; I'm guessing it does this with some infer clauses in the client functions themselves.

It would be really convenient if the exposed types actually contained the relevant handler definitions as well, so that we can pass the types around more easily. But that said, here's what I'm doing right now.

I'm using a monorepo with a package named @repo/restate which contains definitions of all the functions and also the factory functions for each service's client.

// Context types used by the handler signatures below; the payload types
// (TokenBucketLimiterOptions, RateLimiterConsumeResult, BucketCount) are our own.
import type { ObjectContext, ObjectSharedContext } from '@restatedev/restate-sdk';

export interface TokenBucketLimiterFunctions {
  /** Try to consume the specified amount of tokens. */
  consume: (
    ctx: ObjectContext,
    opts: TokenBucketLimiterOptions
  ) => Promise<RateLimiterConsumeResult>;
  /** Manually refill tokens. */
  refill: (ctx: ObjectContext, tokens: number) => Promise<void>;
  /** Clear all data for this rate limiter. */
  clear: (ctx: ObjectContext) => Promise<void>;
  /** Inspect the current state of the rate limiter. */
  status: (ctx: ObjectSharedContext) => Promise<BucketCount>;
  /** An internal call for checking if the rate limiter is stale and can be cleared. */
  checkStale: (ctx: ObjectContext) => Promise<void>;
}

export const TokenBucketLimiterName = 'TokenBucketLimiter';

export function tokenBucketLimiterClient(
  client: Ingress,
  key: string
): IngressClient<TokenBucketLimiterFunctions>;
export function tokenBucketLimiterClient(
  client: Context,
  key: string
): Client<TokenBucketLimiterFunctions>;
export function tokenBucketLimiterClient(client: ClientFactory, key: string) {
  return client.objectClient<
    VirtualObjectDefinition<typeof TokenBucketLimiterName, TokenBucketLimiterFunctions>
  >({ name: TokenBucketLimiterName }, key);
}

The multiple definitions of the client function allow me to call it with either an Ingress client or a Context, and get proper TypeScript typing for either one, since the return values are slightly different.

Because of how the client functions work, you have to write it this way: if you define the VirtualObjectDefinition with its generics as a separate named type, it loses the handler info.
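
Usage then looks like this. This is a sketch: the `connect` call is from @restatedev/restate-sdk-clients, and `opts` is a placeholder for whatever TokenBucketLimiterOptions actually contains.

```typescript
import * as clients from '@restatedev/restate-sdk-clients';

// Placeholder value; the real shape is whatever TokenBucketLimiterOptions defines.
declare const opts: TokenBucketLimiterOptions;

// From outside Restate, through the ingress API:
const ingress = clients.connect({ url: 'http://localhost:8080' });
const limiter = tokenBucketLimiterClient(ingress, 'user-123'); // IngressClient<TokenBucketLimiterFunctions>
const result = await limiter.consume(opts);

// From inside another Restate handler, with the handler's context.
// Same call shape, but typed as Client<TokenBucketLimiterFunctions>;
// in both cases the ctx parameter from the handler signatures is stripped
// off by the client type, so callers only pass the payload.
// const limiter = tokenBucketLimiterClient(ctx, 'user-123');
// const result = await limiter.consume(opts);
```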

There's a similar function for creating a sendClient.
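
That factory might look like the following; I'm assuming the SDK's objectSendClient method and its SendClient/IngressSendClient return types here, so treat the exact type names as unverified:

```typescript
export function tokenBucketLimiterSendClient(
  client: Ingress,
  key: string
): IngressSendClient<TokenBucketLimiterFunctions>;
export function tokenBucketLimiterSendClient(
  client: Context,
  key: string
): SendClient<TokenBucketLimiterFunctions>;
export function tokenBucketLimiterSendClient(client: ClientFactory, key: string) {
  return client.objectSendClient<
    VirtualObjectDefinition<typeof TokenBucketLimiterName, TokenBucketLimiterFunctions>
  >({ name: TokenBucketLimiterName }, key);
}
```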

So that handles the client side. Then in the server itself I use the same types to ensure that the prototypes match.

restate.object({
  name: TokenBucketLimiterName,
  handlers: {
    // the code here
  } satisfies TokenBucketLimiterFunctions,
});

The main downside of this is that you have to define some of your types (data payloads, return values) in a different place from where they're actually used in the server, so there's more jumping around. The upside of course is that you don't have to import the application server packages in your client packages, and it avoids any potential for circular dependencies as well.

This separation of types also makes it much easier to create a restate service with certain mocked dependencies, because then you don't have to try to extract the type from a factory function for the object.
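
Here is a self-contained sketch of that pattern. Database, CounterFunctions, and the context shape are illustrative stand-ins, not the real Restate types; the point is that the handlers come from a plain factory that takes its dependencies, so a test can hand it a stub directly.

```typescript
// Illustrative stand-ins; in the real code the ctx argument would be a
// Restate ObjectContext and Database would be your actual database type.
interface Database {
  getCount(key: string): Promise<number>;
}

interface CounterFunctions {
  current: (ctx: { key: string }) => Promise<number>;
}

// The factory takes its dependencies as arguments, so nothing here needs to
// import the application server packages or extract types from an object factory.
function makeCounterHandlers(db: Database) {
  return {
    current: (ctx) => db.getCount(ctx.key),
  } satisfies CounterFunctions;
}

// In a test, swap in a mock database without touching any Restate factories:
const handlers = makeCounterHandlers({ getCount: async () => 42 });
```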

I do a lot of this type of thing, with some special glue code to also register multiple different objects with different service names.

export function createTestService(db: Database) {
  return restate.object({
    // In tests, endpointId adds a unique suffix to the service name
    name: endpointId(RateLimiterName),
    handlers: {
      // ...
    },
  });
}

All this is to keep tests fast: they run in parallel against a single existing Restate instance, since Testcontainers are slow to start up, and each test gets its own Postgres database. At some point I might be able to share more about how I'm doing that too.

The endpointId function looks like this, and then most of the code is in the attached restate_test_environment.ts file.

const uniqueId = makeUuidv7();
export function endpointId<T extends string>(name: T): T {
  if (process.env.VITEST) {
    return `${name}_test_${uniqueId}` as T;
  }

  return name;
}
restate_test_environment.ts:

import * as restate from '@restatedev/restate-sdk';
import { execa } from 'execa';
import which from 'which';
import { GenericContainer, StartedTestContainer, TestContainers, Wait } from 'testcontainers';
import * as http2 from 'http2';
import * as net from 'net';

// Prepare the restate server
class ReassignableRestateEndpointServer {
  currentHandler!: (req: http2.Http2ServerRequest, res: http2.Http2ServerResponse) => void;
  restateServer!: http2.Http2Server;
  endpointPort!: number;
  serverUrl!: string;
  deploymentId?: string;
  restateCliPath: string | null | undefined;
  registered = false;

  async init() {
    // Start an HTTP2 server on a random port
    this.restateServer = http2.createServer((req, res) => this.currentHandler(req, res));
    await new Promise((resolve, reject) => {
      this.restateServer.listen(0).once('listening', resolve).once('error', reject);
    });
    this.endpointPort = (this.restateServer.address() as net.AddressInfo).port;
    this.restateCliPath = await which('restate', { nothrow: true });
  }

  setServerUrl(serverUrl: string) {
    this.serverUrl = serverUrl;
  }

  /** Replace the current set of endpoints with a new one. */
  async registerEndpoints(
    mountServicesFn: (server: restate.RestateEndpoint) => restate.RestateEndpoint,
    reregister = false
  ) {
    const restateEndpoint = mountServicesFn(restate.endpoint());
    this.currentHandler = restateEndpoint.http2Handler();

    if (reregister || !this.registered) {
      if (this.deploymentId) {
        await this.deleteEndpointWithRestate();
      }
      this.deploymentId = await registerEndpointWithRestate(this.serverUrl, this.endpointPort);
      this.registered = true;
    }
  }

  async stop() {
    if (this.deploymentId) {
      await this.deleteEndpointWithRestate();
    }
    this.restateServer?.close();
  }

  static async start(): Promise<ReassignableRestateEndpointServer> {
    const server = new ReassignableRestateEndpointServer();
    await server.init();
    console.info(`Restate endpoint listening on port ${server.endpointPort}`);
    return server;
  }

  async cleanUpInvocations(serviceNames: string[]) {
    const cliPath = this.restateCliPath;
    if (!cliPath) {
      return;
    }

    await Promise.all(
      serviceNames.map(async (serviceName) => {
        // Remove all invocations for the service.
        // This can be done through the postgres interface as well but it's easier to
        // use the CLI.
        // This doesn't always find the invocations for reasons that aren't yet clear.
        // You can use `just cleanup-restate` after the test to remove anything left over.
        await execa({
          reject: false,
          all: true,
          env: {
            RESTATE_ADMIN_URL: this.serverUrl,
          },
        })`${cliPath} --yes invocations cancel --kill ${serviceName}`;
      })
    );
  }
  async deleteEndpointWithRestate() {
    const serviceDesc = await fetch(`${this.serverUrl}/deployments/${this.deploymentId}`).then(
      (r) => r.json()
    );
    // eslint-disable-next-line @typescript-eslint/no-unsafe-call
    const serviceNames: string[] = serviceDesc.services.map((s: { name: string }) => s.name);

    const res = await fetch(`${this.serverUrl}/deployments/${this.deploymentId}?force=true`, {
      method: 'DELETE',
    });
    if (!res.ok) {
      const badResponse = await res.text();
      throw new Error(`Error ${res.status} while deleting deployment: ${badResponse}`);
    }

    await this.cleanUpInvocations(serviceNames);
  }
}
async function registerEndpointWithRestate(serverUrl: string, endpointPort: number) {
  const endpointHost = startedAdminUrl ? 'host.docker.internal' : 'host.testcontainers.internal';

  // Register this service endpoint
  const res = await fetch(`${serverUrl}/deployments`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      // See https://node.testcontainers.org/features/networking/#expose-host-ports-to-container
      uri: `http://${endpointHost}:${endpointPort}`,
      // Set force for testing
      force: true,
    }),
  });
  if (!res.ok) {
    const badResponse = await res.text();
    throw new Error(`Error ${res.status} during registration: ${badResponse}`);
  }

  const result = await res.json();
  console.info('Registered', result.id);
  return result.id;
}
// Prepare the restate testcontainer
async function prepareRestateTestContainer(
  restateServerPort: number
): Promise<StartedTestContainer> {
  const restateContainer = new GenericContainer('docker.io/restatedev/restate:1.1.2')
    .withEnvironment({})
    // Just 1 partition for testing to start up faster
    .withCommand(['--bootstrap-num-partitions', '1'])
    // Expose ports
    .withExposedPorts(8080, 9070)
    // Wait for start on health checks
    .withWaitStrategy(Wait.forAll([Wait.forHttp('/health', 9070)]));

  // This MUST be executed before starting the restate container
  // Expose host port to access the restate server
  await TestContainers.exposeHostPorts(restateServerPort);

  // Start restate container
  return await restateContainer.start();
}

function adminBaseUrl(startedRestateContainer: StartedTestContainer) {
  return `http://${startedRestateContainer.getHost()}:${startedRestateContainer.getMappedPort(9070)}`;
}

// See libs/dev-services/restate.sh for these ports
const startedAdminUrl = process.env.USE_TEST_CONTAINERS
  ? ''
  : process.env.TEST_RESTATE_ADMIN_URL || 'http://localhost:9080';
const startedClientUrl = process.env.USE_TEST_CONTAINERS
  ? ''
  : process.env.TEST_RESTATE_CLIENT_URL || 'http://localhost:8099';
export class RestateTestEnvironment {
  logs = '';

  constructor(
    readonly startedRestateHttpServer: ReassignableRestateEndpointServer,
    readonly startedRestateContainer?: StartedTestContainer
  ) {}

  public baseUrl(): string {
    if (this.startedRestateContainer) {
      return `http://${this.startedRestateContainer.getHost()}:${this.startedRestateContainer.getMappedPort(8080)}`;
    } else if (startedClientUrl) {
      return startedClientUrl;
    } else {
      throw new Error('Restate container not started, but we also had no client url');
    }
  }

  public adminAPIBaseUrl(): string {
    if (this.startedRestateContainer) {
      return adminBaseUrl(this.startedRestateContainer);
    } else if (startedAdminUrl) {
      return startedAdminUrl;
    } else {
      throw new Error('Restate container not started, but we also had no admin url');
    }
  }

  public async stop() {
    await this.startedRestateContainer?.stop();
    await this.startedRestateHttpServer.stop();
  }

  public static async start(): Promise<RestateTestEnvironment> {
    const startedRestateHttpServer = await ReassignableRestateEndpointServer.start();

    if ((startedAdminUrl && !startedClientUrl) || (!startedAdminUrl && startedClientUrl)) {
      throw new Error('Saw TEST_RESTATE_ADMIN_URL or TEST_RESTATE_CLIENT_URL, but not both');
    }

    if (!startedAdminUrl) {
      const startedRestateContainer = await prepareRestateTestContainer(
        startedRestateHttpServer.endpointPort
      );
      const env = new RestateTestEnvironment(startedRestateHttpServer, startedRestateContainer);

      const logStream = await startedRestateContainer.logs();
      logStream.on('data', (data: string | Buffer) => (env.logs += data.toString()));
      logStream.on('error', (err) => (env.logs += err.toString()));

      startedRestateHttpServer.setServerUrl(adminBaseUrl(startedRestateContainer));
      return env;
    } else {
      startedRestateHttpServer.setServerUrl(startedAdminUrl);
      return new RestateTestEnvironment(startedRestateHttpServer, undefined);
    }
  }

  /** Replace the current set of endpoints with a new one. This should be run before every test
   * with the appropriate endpoints if you are using a new database for each test. */
  async registerEndpoints(
    mountServicesFn: (server: restate.RestateEndpoint) => restate.RestateEndpoint,
    reregister = false
  ) {
    await this.startedRestateHttpServer.registerEndpoints(mountServicesFn, reregister);
  }
}
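
For completeness, here's a sketch of how this environment can be wired into a vitest suite. createTestService and db are placeholders from earlier in this comment, and clients.connect comes from @restatedev/restate-sdk-clients.

```typescript
import { afterAll, beforeAll, test } from 'vitest';
import * as clients from '@restatedev/restate-sdk-clients';
import { RestateTestEnvironment } from './restate_test_environment';

let env: RestateTestEnvironment;

beforeAll(async () => {
  env = await RestateTestEnvironment.start();
});

afterAll(async () => {
  await env.stop();
});

test('rate limiter', async () => {
  // Register this test's services, e.g. against a per-test database.
  // `db` is a placeholder for however you create that database.
  await env.registerEndpoints((server) => server.bind(createTestService(db)), true);

  // Talk to the services through the ingress URL.
  const ingress = clients.connect({ url: env.baseUrl() });
  // ... use clients created with tokenBucketLimiterClient(ingress, key)
});
```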