Last active
January 7, 2023 19:26
-
-
Save stefandanaita/88c4d8b187400d5b07524cd0a12843b2 to your computer and use it in GitHub Desktop.
Code for a worker that can receive logpush http requests and parse the contents using streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SPDX-License-Identifier: MIT-0 | |
export interface Env { | |
} | |
export default { | |
async fetch( | |
request: Request, | |
env: Env, | |
ctx: ExecutionContext | |
): Promise<Response> { | |
if (!request.body) { | |
return new Response("Oops", { status: 500 }); | |
} | |
const events = request.body | |
.pipeThrough(new DecompressionStream("gzip")) | |
.pipeThrough(new TextDecoderStream()) | |
.pipeThrough(readlineStream()); | |
for await (const event of streamAsyncIterator(events)) { | |
// Do stuff with the event | |
const parsedEvent = JSON.parse(event); | |
console.log(parsedEvent.Event.RayID); | |
} | |
return new Response("Hello World!"); | |
}, | |
}; | |
async function* streamAsyncIterator(stream: ReadableStream) { | |
// Get a lock on the stream | |
const reader = stream.getReader(); | |
try { | |
while (true) { | |
// Read from the stream | |
const { done, value } = await reader.read(); | |
// Exit if we're done | |
if (done) { | |
return; | |
} | |
// Else yield the chunk | |
yield value; | |
} | |
} | |
finally { | |
reader.releaseLock(); | |
} | |
} | |
interface ReadlineTransformerOptions { | |
skipEmpty: boolean; | |
} | |
const defaultOptions: ReadlineTransformerOptions = { | |
skipEmpty: true, | |
}; | |
export class ReadlineTransformer implements Transformer { | |
options: ReadlineTransformerOptions; | |
lastString: string; | |
separator: RegExp; | |
public constructor(options?: ReadlineTransformerOptions) { | |
this.options = { ...defaultOptions, ...options }; | |
this.lastString = ''; | |
this.separator = /[\r\n]+/; | |
} | |
public transform(chunk: string, controller: TransformStreamDefaultController<string>) { | |
// prepend with previous string (empty if none) | |
const str = `${this.lastString}${chunk}`; | |
// Extract lines from chunk | |
const lines = str.split(this.separator); | |
// Save last line as it might be incomplete | |
this.lastString = (lines.pop() || '').trim(); | |
// eslint-disable-next-line no-restricted-syntax | |
for (const line of lines) { | |
const d = this.options.skipEmpty ? line.trim() : line; | |
if (d.length > 0) controller.enqueue(d); | |
} | |
} | |
public flush(controller: TransformStreamDefaultController<string>) { | |
if (this.lastString.length > 0) controller.enqueue(this.lastString); | |
} | |
} | |
export const readlineStream = () => new TransformStream(new ReadlineTransformer()); |
Nice. Thanks (: In case it is helpful, I wrote up a javascript impl based on this that can either filter on the encoded LogPush stream (u8) as-is or filter on its decoded stream (str): https://github.com/serverless-dns/serverless-dns/blob/7a6657afb8177bd7368b8ea5195fc974fafe69e5/src/commons/lf-transformer.js#L14-L175
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
✔️