Created
May 23, 2020 08:44
-
-
Save simolus3/3c042445dfd12bdd6eff01a2f68aaeed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:math'; | |
import 'dart:async'; | |
import 'dart:typed_data'; | |
/// Number of bytes occupied by the 32-bit length prefix of a message.
const _int32Length = Uint32List.bytesPerElement;

/// Scratch buffer, exactly one length prefix wide, for converting between a
/// 32-bit integer and its byte representation.
final ByteData _encodeInts = ByteData(_int32Length);
/// Decodes a raw byte stream into length-prefixed messages.
extension ReadLengthPrefixed on Stream<List<int>> {
  /// Splits this byte stream into individual messages.
  ///
  /// Each message on the wire is a 4-byte unsigned big-endian length followed
  /// by that many payload bytes. Packet boundaries carry no meaning: both the
  /// length prefix and the payload may be split across any number of incoming
  /// packets, and one packet may contain several messages.
  ///
  /// Every emitted [Uint8List] is a freshly allocated buffer holding exactly
  /// one payload. Zero-length messages are emitted as empty lists as soon as
  /// their prefix has been read. If the source stream ends in the middle of a
  /// prefix or payload, that incomplete message is dropped.
  Stream<Uint8List> transformLengthPrefixed() async* {
    // Per-invocation header buffer: prefix bytes may trickle in over several
    // packets, so they are accumulated here rather than in shared state.
    final header = ByteData(Uint32List.bytesPerElement);
    final headerBytes = header.buffer.asUint8List();
    var headerUsed = 0;

    // An explicit flag (instead of a null sentinel) tracks whether we are
    // currently collecting a prefix or a payload.
    var readingHeader = true;
    var payload = Uint8List(0);
    var payloadUsed = 0;

    await for (final packet in this) {
      var offset = 0;

      // Each iteration consumes at least one byte of the packet.
      while (offset < packet.length) {
        final remainingInPacket = packet.length - offset;

        if (readingHeader) {
          final take = min(headerBytes.length - headerUsed, remainingInPacket);
          headerBytes.setRange(headerUsed, headerUsed + take, packet, offset);
          headerUsed += take;
          offset += take;

          if (headerUsed == headerBytes.length) {
            // Prefix complete: allocate the payload buffer it announces.
            // NOTE(review): the length comes straight from the wire; callers
            // reading untrusted input may want to cap it before allocating.
            payload = Uint8List(header.getUint32(0));
            payloadUsed = 0;
            headerUsed = 0;
            readingHeader = false;
          }
        } else {
          final take = min(payload.length - payloadUsed, remainingInPacket);
          payload.setRange(payloadUsed, payloadUsed + take, packet, offset);
          payloadUsed += take;
          offset += take;
        }

        // Checked after both branches so that an empty message is emitted
        // right after its prefix, even if the packet ends there.
        if (!readingHeader && payloadUsed == payload.length) {
          yield payload;
          readingHeader = true;
        }
      }
    }
  }
}
/// Writes length-prefixed messages to a byte sink.
extension SendLengthPrefixed on StreamSink<List<int>> {
  /// Adds [data] to this sink, preceded by its length as a 4-byte unsigned
  /// big-endian integer.
  ///
  /// The prefix and the payload are added as two separate events. A new
  /// prefix buffer is allocated for every call, so a sink that keeps a
  /// reference to added lists never sees an earlier prefix overwritten by a
  /// later call.
  ///
  /// Throws an [ArgumentError] if the length of [data] does not fit into an
  /// unsigned 32-bit integer.
  void addWithLength(List<int> data) {
    final length = data.length;
    if (length > 0xFFFFFFFF) {
      throw ArgumentError.value(
          length, 'data', 'Length does not fit into a 32-bit prefix');
    }

    final header = ByteData(Uint32List.bytesPerElement)..setUint32(0, length);
    add(header.buffer.asUint8List());
    add(data);
  }
}
/// Demo: decodes three length-prefixed messages spread over two packets and
/// prints each decoded message.
void main() {
  const firstPacket = <int>[
    0, 0, 0, 0, // first message (empty)
    0, 0, 0, 1, 64, // second message ([64])
    0, 0, 0, 3, // length prefix of the third message (length = 3)
  ];
  const secondPacket = <int>[1, 2, 3]; // content of the third message

  final packets = Stream<List<int>>.fromIterable([firstPacket, secondPacket]);
  packets.transformLengthPrefixed().forEach(print);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment