Skip to content

Instantly share code, notes, and snippets.

@Istar-Eldritch
Created April 26, 2018 23:42
Show Gist options
  • Save Istar-Eldritch/5788f137f541d50e04518d289ae95359 to your computer and use it in GitHub Desktop.
Save Istar-Eldritch/5788f137f541d50e04518d289ae95359 to your computer and use it in GitHub Desktop.
Rxjs node server
import { Client } from './client';
import { createServer, IncomingMessage, ServerResponse, IncomingHttpHeaders } from 'http';
import { Writable, Readable } from 'stream';
import { of, from, fromEvent, Subject, Observer, Observable } from 'rxjs';
import { map, filter, take, skip, tap, buffer, reduce } from 'rxjs/operators';
/**
 * HTTP request methods understood by the request stream.
 *
 * Values are the upper-case method tokens exactly as they appear on the wire,
 * so a member can be compared directly with `IncomingMessage.method`.
 */
enum Method {
  GET = 'GET',
  HEAD = 'HEAD',
  POST = 'POST',
  PUT = 'PUT',
  PATCH = 'PATCH',
  DELETE = 'DELETE',
  OPTIONS = 'OPTIONS'
}
/**
 * A single incoming HTTP request as published on the server's request stream.
 */
interface RxRequest {
  /** HTTP method of the request. */
  method: Method;
  /** Requested path, matched by the routing predicates. */
  path: string;
  /** Headers received with the request. */
  headers: IncomingHttpHeaders;
  /** Request payload as a stream of binary chunks. */
  body: Observable<Buffer>;
  /**
   * Answers the request. Every argument is optional; omitted ones fall back
   * to whatever defaults the implementation provides.
   */
  reply: (body?: Observable<Buffer>, statusCode?: number, headers?: Record<string, string>) => void;
}
/**
 * Pairing of a Node HTTP request listener with the stream of requests it
 * produces.
 */
interface ServerObservable {
  /** Stream on which each handled request is published. */
  requests: Observable<RxRequest>;
  /** Listener with the `(request, response)` shape accepted by `createServer`. */
  serverHandler(request: IncomingMessage, response: ServerResponse): void;
}
/**
 * Description of an HTTP response: status, headers and a streamed body.
 */
interface RxResponse {
  /** Numeric HTTP status code. */
  statusCode: number;
  /** Response headers keyed by header name. */
  headers: Record<string, string>;
  /** Response payload as a stream of binary chunks. */
  body: Observable<Buffer>;
}
/**
 * Creates a Node HTTP request listener together with an observable stream of
 * the requests that listener receives.
 *
 * For every incoming request the listener publishes an `RxRequest` whose
 * `body` relays the request's data chunks and whose `reply` writes the
 * response.
 *
 * @returns The listener (`serverHandler`) and the request stream (`requests`).
 */
function genServer(): ServerObservable {
  const requests = new Subject<RxRequest>();

  function serverHandler(request: IncomingMessage, response: ServerResponse): void {
    // `url` and `method` are optional on IncomingMessage's type; narrow them
    // explicitly instead of casting through `any`.
    const path = request.url || '/';
    const method = request.method as Method;
    const headers = request.headers;

    // Relay the request payload. Subscribers attach synchronously while the
    // request is being published below, before any 'data' event is delivered.
    const body = new Subject<Buffer>();
    request.on('data', (chunk: Buffer) => body.next(chunk));
    request.on('end', () => body.complete());
    request.on('error', (err: Error) => body.error(err));

    // Several subscribers may see the same request; only the first reply is
    // honoured so a later one cannot overwrite the status or write after end.
    let replied = false;

    /**
     * Writes the response for this request.
     *
     * @param replyBody    Chunks to write; the response ends when it completes.
     * @param statusCode   HTTP status code, 200 by default.
     * @param replyHeaders Headers to set before the body is written.
     */
    function reply(
      replyBody: Observable<Buffer> = from([]),
      statusCode: number = 200,
      replyHeaders: {[k: string]: string} = {}
    ): void {
      if (replied) {
        return;
      }
      replied = true;

      response.statusCode = statusCode;
      Object.keys(replyHeaders).forEach(name => {
        response.setHeader(name, replyHeaders[name]);
      });

      replyBody.subscribe({
        next: chunk => {
          response.write(chunk);
        },
        error: () => {
          // The status line can only be changed while nothing has been sent.
          if (!response.headersSent) {
            response.statusCode = 500;
          }
          response.end();
        },
        complete: () => {
          response.end();
        }
      });
    }

    requests.next({path, headers, method, body, reply});
  }

  return {
    serverHandler,
    requests
  };
}
// One shared server wrapper: `serverHandler` is handed to the HTTP server,
// `requests` is the stream the routes subscribe to.
const {serverHandler, requests}: ServerObservable = genServer();
/**
 * Encodes a value as JSON and returns the text as a Buffer.
 *
 * @param o Value to encode with `JSON.stringify`.
 * @returns Buffer holding the JSON text.
 */
function serialise(o: any): Buffer {
  const json = JSON.stringify(o);
  return Buffer.from(json);
}
/**
 * Builds a predicate that accepts requests using the given HTTP method.
 *
 * @param m Method a request must have to be accepted.
 * @returns Function returning `true` when `request.method` equals `m`.
 */
function method(m: Method) {
  const hasMethod = (request: RxRequest): boolean => m === request.method;
  return hasMethod;
}
/**
 * Builds a predicate that accepts requests whose path matches `check`.
 *
 * A string is compared to `request.path` with strict equality; a RegExp is
 * tested against `request.path`.
 *
 * @param check Exact path or pattern to match.
 * @returns Function returning `true` when the request's path matches.
 */
function path(check: RegExp | string): (request: RxRequest) => boolean {
  return function(request: RxRequest): boolean {
    if (check instanceof RegExp) {
      // Global and sticky patterns carry `lastIndex` from one `test` call to
      // the next; reset it so each request is matched from the start.
      check.lastIndex = 0;
      return check.test(request.path);
    }
    return check === request.path;
  };
}
// Routing predicates, shared by the routes and by the 404 fallback so the
// fallback stays in sync with what the routes actually handle.
const isGet = method(Method.GET);
const isPost = method(Method.POST);
const isBookmarks = path('/bookmarks');

/** Every incoming GET request. */
const getRequests = requests.pipe(
  filter(isGet),
);

// Route: GET /bookmarks
getRequests.pipe(
  filter(isBookmarks),
  tap(event => {
    console.log('Bang!');
    event.reply();
  })
)
.subscribe();

// Route: any POST request
requests.pipe(
  filter(isPost),
  tap(event => {
    console.log('Boom!');
    event.reply();
  })
)
.subscribe();

// Fallback: answer 404 only for requests no route above has handled, so a
// request is never replied to twice.
requests.pipe(
  filter(event => !isPost(event) && !(isGet(event) && isBookmarks(event)))
)
.subscribe(event => {
  event.reply(from([]), 404);
});

const server = createServer(serverHandler);
server.listen(3000);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment