Created
April 26, 2018 23:42
-
-
Save Istar-Eldritch/5788f137f541d50e04518d289ae95359 to your computer and use it in GitHub Desktop.
Rxjs node server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Client } from './client'; | |
import { createServer, IncomingMessage, ServerResponse, IncomingHttpHeaders } from 'http'; | |
import { Writable, Readable } from 'stream'; | |
import { of, from, fromEvent, Subject, Observer, Observable } from 'rxjs'; | |
import { map, filter, take, skip, tap, buffer, reduce } from 'rxjs/operators'; | |
/**
 * HTTP verbs a request can be routed on.
 *
 * String-valued so a member compares directly against the raw
 * `IncomingMessage.method` string. DELETE and HEAD are included so that
 * requests using those verbs have a matching member instead of carrying a
 * value outside the enum.
 */
enum Method {
  GET = 'GET',
  POST = 'POST',
  PUT = 'PUT',
  PATCH = 'PATCH',
  OPTIONS = 'OPTIONS',
  DELETE = 'DELETE',
  HEAD = 'HEAD'
}
/**
 * One incoming HTTP request as it is published on the request stream.
 */
interface RxRequest {
  /** Request target, taken from `IncomingMessage.url`. */
  path: string;
  /** Headers exactly as Node parsed them. */
  headers: IncomingHttpHeaders;
  /** HTTP verb of the request. */
  method: Method;
  /** Stream intended to carry the request body chunks. */
  body: Observable<Buffer>;
  /**
   * Sends the response for this request.
   *
   * Every argument is optional; the defaults are chosen by the
   * implementation that builds the request object.
   */
  reply: (
    body?: Observable<Buffer>,
    statusCode?: number,
    headers?: Record<string, string>
  ) => void;
}
/**
 * Pairing of a Node request listener with the stream it feeds.
 */
interface ServerObservable {
  /** Stream on which each incoming request is published. */
  requests: Observable<RxRequest>;
  /** Listener with the signature expected by `http.createServer`. */
  serverHandler(request: IncomingMessage, response: ServerResponse): void;
}
/**
 * Description of an HTTP response: status, headers and a streamed body.
 *
 * NOTE(review): nothing in the visible part of this file references this
 * type — confirm whether it is still needed.
 */
interface RxResponse {
  /** Header name to header value. */
  headers: Record<string, string>;
  /** HTTP status code. */
  statusCode: number;
  /** Response payload as a stream of chunks. */
  body: Observable<Buffer>;
}
/**
 * Bridges Node's callback-style HTTP listener to an RxJS stream.
 *
 * @returns `serverHandler`, to be passed to `http.createServer`, and
 *          `requests`, a hot stream that emits one `RxRequest` per incoming
 *          request. Subscribers attached after a request was emitted do not
 *          see it.
 */
function genServer(): ServerObservable {
  const requests = new Subject<RxRequest>();

  function serverHandler(request: IncomingMessage, response: ServerResponse): void {
    // `url` and `method` are typed as optional on IncomingMessage; fall back
    // to the root path. The verb is cast without validation, so it may hold a
    // value that is not a declared `Method` member.
    const path = request.url || '/';
    const method = request.method as Method;
    const requestHeaders = request.headers;

    // Forward the request stream into the body subject so that handlers
    // reading `body` receive the chunks and a completion signal.
    const requestBody = new Subject<Buffer>();
    request.on('data', (chunk: Buffer) => requestBody.next(chunk));
    request.on('end', () => requestBody.complete());
    request.on('error', (err: Error) => requestBody.error(err));

    // Every subscriber of `requests` sees every request, so more than one
    // handler may call `reply`. Only the first call is honoured; later calls
    // are ignored instead of overwriting the status or writing after `end`.
    let replied = false;

    /**
     * Writes the response for this request.
     *
     * @param body       chunks to write; the response ends when it completes
     * @param statusCode HTTP status, 200 by default
     * @param headers    response headers to set before the body is written
     */
    function reply(body: Observable<Buffer> = from([]), statusCode: number = 200, headers: {[k: string]: string} = {}): void {
      if (replied) {
        return;
      }
      replied = true;

      response.statusCode = statusCode;
      Object.keys(headers).forEach(name => {
        response.setHeader(name, headers[name]);
      });

      body.subscribe({
        next: chunk => {
          response.write(chunk);
        },
        error: () => {
          // A failing body must not leave the connection hanging. Report a
          // 500 if nothing has been sent yet, then close the response.
          if (!response.headersSent) {
            response.statusCode = 500;
          }
          response.end();
        },
        complete: () => {
          response.end();
        }
      });
    }

    requests.next({path, headers: requestHeaders, method, body: requestBody, reply});
  }

  return {
    serverHandler,
    requests
  };
}
// Single server bridge for this module: the listener is handed to
// `createServer`, the stream is what the routes subscribe to.
const rxServer = genServer();
const requests = rxServer.requests;
const serverHandler = rxServer.serverHandler;
/**
 * Encodes a value as JSON and returns the resulting text as a Buffer.
 *
 * @param o value to pass to `JSON.stringify`
 * @returns Buffer holding the JSON text
 */
function serialise(o: any): Buffer {
  const json = JSON.stringify(o);
  return Buffer.from(json);
}
/**
 * Builds a predicate that accepts requests using the given HTTP verb.
 *
 * @param m verb to match
 * @returns function returning true when `request.method` is strictly equal to `m`
 */
function method(m: Method) {
  return (request: RxRequest): boolean => m === request.method;
}
/**
 * Builds a predicate that accepts requests whose path matches `check`.
 *
 * @param check exact path to compare against, or a pattern to test the path with
 * @returns function returning true when `request.path` equals the string or
 *          matches the pattern
 */
function path(check: RegExp | string) {
  return function(request: RxRequest): boolean {
    if (typeof check === 'string') {
      return check === request.path;
    }
    // `test` on a global or sticky pattern resumes from `lastIndex`, which
    // would make the same predicate alternate between true and false on
    // consecutive matching requests. Start every check from the beginning.
    check.lastIndex = 0;
    return check.test(request.path);
  }
}
// Route predicates, built once so the routes and the fallback share them.
const isGet = method(Method.GET);
const isPost = method(Method.POST);
const isBookmarksPath = path('/bookmarks');

// True when one of the routes below answers the request. The fallback uses
// this so that it no longer replies 404 to requests a route already handles.
const isRouted = (request: RxRequest): boolean =>
  isPost(request) || (isGet(request) && isBookmarksPath(request));

// All GET requests.
const getRequests = requests.pipe(
  filter(isGet),
);

// GET /bookmarks: log and answer with the default empty 200 response.
getRequests.pipe(
  filter(isBookmarksPath),
  tap(event => {
    console.log('Bang!');
    event.reply();
  })
)
.subscribe();

// Any POST: log and answer with the default empty 200 response.
requests.pipe(
  filter(isPost),
  tap(event => {
    console.log('Boom!');
    event.reply();
  })
)
.subscribe();

// Fallback: only requests that no route claimed get an empty 404.
requests.pipe(
  filter(request => !isRouted(request))
)
.subscribe((event) => {
  event.reply(from([]), 404);
});

const server = createServer(serverHandler);
server.listen(3000);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment