Last active
September 26, 2016 08:37
-
-
Save estliberitas/6009d30c4a75a5c011a75da2d6896ee1 to your computer and use it in GitHub Desktop.
Asynchronous parser
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const fs = require('fs'); | |
const random = (max) => Math.round(1 + Math.random() * (max - 1)); | |
// Simple frame protocol: | |
// 4 bytes - frame length | |
// then read amount of bytes specified in frame length | |
const ws = fs.createWriteStream('parser.data'); | |
const noOfFrames = random(100); | |
for (let i = 0; i < noOfFrames; i++) { | |
const frameLength = random(28); | |
const bufferSize = 4 + frameLength; | |
const buffer = new Buffer(bufferSize); | |
buffer.writeUInt32BE(frameLength); | |
buffer.fill(255, 4, bufferSize); | |
ws.write(buffer); | |
} | |
ws.end(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
/** | |
* The protocol is simple: | |
* | |
* - there are frames of various length | |
* - first 4 bytes determine length of frame body | |
* - once frame length is determined, next N bytes are read as frame body | |
* - next frame starts right after current frame body reading is complete | |
*/ | |
const EventEmitter = require('events'); | |
const fs = require('fs'); | |
const STATES = { | |
FRAME_START: 'FRAME_START', | |
FRAME_BODY: 'FRAME_BODY' | |
}; | |
const FRAME_HEADER_SIZE = 4; | |
const HANDLERS = { | |
[STATES.FRAME_START]: (parser, buffer) => { | |
if (!parser.currentBuffer) { | |
parser.currentBuffer = new Buffer(FRAME_HEADER_SIZE); | |
parser.currentBufferUsed = 0; | |
} | |
const toRead = Math.min(FRAME_HEADER_SIZE - parser.currentBufferUsed, buffer.length); | |
buffer.copy(parser.currentBuffer, parser.currentBufferUsed, 0, toRead); | |
parser.currentBufferUsed += toRead; | |
if (parser.currentBufferUsed === FRAME_HEADER_SIZE) { | |
// going to the next state | |
parser.state = STATES.FRAME_BODY; | |
parser.currentFrameSize = parser.currentBuffer.readUInt32BE(0); | |
parser.resetCurrentBuffer(); | |
} | |
return toRead; | |
}, | |
[STATES.FRAME_BODY]: (parser, buffer) => { | |
if (!parser.currentBuffer) { | |
parser.currentBuffer = new Buffer(parser.currentFrameSize); | |
parser.currentBufferUsed = 0; | |
} | |
const toRead = Math.min(parser.currentFrameSize - parser.currentBufferUsed, buffer.length); | |
buffer.copy(parser.currentBuffer, parser.currentBufferUsed, 0, toRead); | |
parser.currentBufferUsed += toRead; | |
if (parser.currentBufferUsed === parser.currentFrameSize) { | |
parser.emit('block', parser.currentFrameSize, parser.currentBuffer); | |
parser.state = STATES.FRAME_START; | |
parser.resetCurrentBuffer(); | |
} | |
return toRead; | |
} | |
}; | |
class Parser extends EventEmitter { | |
constructor() { | |
super(); | |
this.state = STATES.FRAME_START; | |
this.parsing = false; | |
// TODO replace with linked list | |
this.pendingBuffers = []; | |
this.resetCurrentBuffer(); | |
} | |
parse() { | |
const handler = HANDLERS[this.state]; | |
const buffer = this.pendingBuffers[0]; | |
const haveRead = handler(this, buffer); | |
if (haveRead < buffer.length) { | |
this.pendingBuffers[0] = buffer.slice(haveRead); | |
} | |
else { | |
this.pendingBuffers.shift(); | |
} | |
if (this.pendingBuffers.length) { | |
setImmediate(() => this.parse()); | |
} | |
else { | |
this.parsing = false; | |
} | |
} | |
push(buffer) { | |
this.pendingBuffers.push(buffer); | |
if (!this.parsing) { | |
this.parsing = true; | |
this.parse(); | |
} | |
} | |
resetCurrentBuffer() { | |
this.currentBuffer = null; | |
this.currentBufferUsed = 0; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment