Skip to content

Instantly share code, notes, and snippets.

@ezzabuzaid
Created October 2, 2024 20:54
Show Gist options
  • Save ezzabuzaid/083aee9ec78e5f5e86c079962605c290 to your computer and use it in GitHub Desktop.
Save ezzabuzaid/083aee9ec78e5f5e86c079962605c290 to your computer and use it in GitHub Desktop.
rxjs-server.mjs
import { Console } from 'node:console';
import { createReadStream, createWriteStream } from 'node:fs';
import {
type IncomingMessage,
type ServerResponse,
createServer,
} from 'node:http';
import { Readable } from 'node:stream';
import ollama from 'ollama';
import {
type Observable,
type ObservableInput,
Subject,
from,
merge,
of,
} from 'rxjs';
import { concatAll, concatMap, delay, map, switchMap } from 'rxjs/operators';
/**
 * Per-request state passed through the router to a route handler: the
 * incoming HTTP request and the response the handler's output is written to.
 */
type Context = { request: IncomingMessage; response: ServerResponse };
/**
 * Creates a minimal exact-path HTTP router whose route handlers return an
 * RxJS-compatible source that is streamed into the response.
 *
 * @returns An object exposing:
 * - `handle`: a request listener (usable with `createServer`) that dispatches
 *   the request to the route registered for the URL's pathname, answering
 *   404 when no route matches and 400 when the request URL cannot be parsed.
 * - `route`: registers `handler` for an exact pathname; every value the
 *   handler's source emits is piped into the response.
 */
function rr() {
  // Registered routes keyed by exact pathname.
  const routes = new Map<
    string,
    { controller: Subject<Context>; source: Observable<Context> }
  >();

  return {
    handle: (request: IncomingMessage, response: ServerResponse) => {
      let path: string;
      try {
        // `request.url` is only the request target, so prefix an origin to
        // obtain something `URL` can parse.
        path = new URL(
          `http://${process.env.HOST ?? 'localhost'}${request.url}`,
        ).pathname;
      } catch {
        // A malformed request target must not take the whole server down.
        response.writeHead(400);
        response.end();
        return;
      }

      const route = routes.get(path);
      if (!route) {
        response.writeHead(404);
        response.end();
        return;
      }
      route.controller.next({ request, response });
    },
    route: (
      path: string,
      handler: (
        context: Context,
      ) => ObservableInput<
        string | Buffer | Readable | ReadableStream | Blob | ArrayBuffer
      >,
    ) => {
      // Replacing a route: complete the old subject so its subscriber is released.
      routes.get(path)?.controller.complete();

      const controller = new Subject<Context>();
      const source = controller.asObservable();
      routes.set(path, { controller, source });

      source.subscribe((context) => {
        const { response } = context;

        let body: Readable;
        try {
          // NOTE(review): emitted values are written to the response as-is;
          // confirm that non-string/Buffer values are converted upstream.
          body = Readable.from(from(handler(context)));
        } catch (error: unknown) {
          // The handler threw synchronously or returned an unstreamable value.
          console.error(`Route ${path} failed before streaming`, error);
          response.writeHead(500);
          response.end();
          return;
        }

        // Without this listener a failing source raises an unhandled 'error'
        // event on the readable, which would crash the process.
        body.on('error', (error: Error) => {
          console.error(`Route ${path} failed while streaming`, error);
          if (response.headersSent) {
            // Part of the body is already out; drop the connection so the
            // client can tell the response was cut short.
            response.destroy();
          } else {
            response.writeHead(500);
            response.end();
          }
        });

        // `pipe` does not tear down the source when the destination closes
        // early (e.g. client disconnect), so stop the producer explicitly.
        response.on('close', () => body.destroy());

        body.pipe(response);
      });
    },
  };
}
const { route, handle } = rr();

// `/` streams 100 UUIDs, one per second, each prefixed with its index.
// The UUIDs are generated once, when this module is evaluated.
const source$ = of(Array.from({ length: 100 }, () => crypto.randomUUID()));
route('/', (_context) => {
  // Flatten the single emitted array into individual ids.
  const ids$ = source$.pipe(concatAll());
  // Hold each id back for a second before letting the next one through.
  const paced$ = ids$.pipe(concatMap((id) => of(id).pipe(delay(1000))));
  return paced$.pipe(map((id, index) => `${index}: ${id}\n`));
});
// File-backed console: `log` output goes to log.txt, error output to error.txt.
const logger = new Console(
  createWriteStream('log.txt'),
  createWriteStream('error.txt'),
);

// `/logs` reads both log files and emits each chunk as a JSON object tagged
// with the file it came from; the two files are interleaved as chunks arrive.
route('/logs', (_context) => {
  const debug$ = from(createReadStream('log.txt', 'utf-8')).pipe(
    map((entry) => JSON.stringify({ type: 'debug', entry })),
  );
  const error$ = from(createReadStream('error.txt', 'utf-8')).pipe(
    map((entry) => JSON.stringify({ type: 'error', entry })),
  );
  return merge(debug$, error$);
});

// Write a log line every second so `/logs` has content to serve.
setInterval(() => logger.log('Hello, world!'), 1000);
// `/fight-bear` streams the text of a chat completion as it is generated.
route('/fight-bear', (_context) => {
  // The streaming chat request is started as soon as the handler runs.
  const chat$ = from(
    ollama.chat({
      model: 'llama3.1',
      stream: true,
      messages: [
        {
          role: 'user',
          content: 'Tips & tricks to fight a bear',
        },
      ],
    }),
  );

  return chat$.pipe(
    // Switch from the resolved chat result to the parts it yields.
    switchMap((parts) => parts),
    map((part) => part.message.content),
  );
});
// Serve every incoming request through the router on port 3000.
const server = createServer(handle);
server.listen(3000, () => console.log('Listening on http://localhost:3000/'));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment