Created
June 28, 2023 12:18
-
-
Save willtemperley/0dc0f24adbd98b061d6d1aaffafa515d to your computer and use it in GitHub Desktop.
Iterator over binary delimited protobuf input stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//
//  ProtoIter.swift
//
//  Created by Will Temperley on 28/06/2023.
//  Varint decoding adapted from SwiftProtobuf
//
import Foundation | |
import SwiftProtobuf | |
extension InputStream {

    /// Performs a single `read(_:maxLength:)` call and returns the bytes obtained.
    ///
    /// - Parameter length: Upper bound on the number of bytes to read.
    /// - Returns: The bytes actually read; this may be shorter than `length`,
    ///   and is empty when the read reports zero bytes.
    /// - Throws: The stream's `streamError` when the read reports a negative
    ///   result, or `POSIXError(.EIO)` if the stream provides no error.
    func readData(maxLength length: Int) throws -> Data {
        var scratch = [UInt8](repeating: 0, count: length)
        let bytesRead = read(&scratch, maxLength: scratch.count)

        // A negative count is the stream's way of signalling failure.
        guard bytesRead >= 0 else {
            throw streamError ?? POSIXError(.EIO)
        }
        return Data(scratch.prefix(bytesRead))
    }
}
/**
 Iterates over a binary-delimited protobuf input stream.

 Each message in the stream is expected to be preceded by its byte length
 encoded as a varint. Bytes are pulled from `inputStream` in chunks of at most
 `bufferLength` bytes and messages are decoded out of that buffer.

 `next()` returns `nil` both at a clean end of stream and after a failure;
 inspect `lastError` to tell the two apart. Once `nil` has been returned the
 iterator stays finished.

 The iterator does not open or close `inputStream`.
 */
class MessageIterator<M: Message>: IteratorProtocol {

    let inputStream: InputStream

    /// Offset of the next unread byte, relative to `buffer.startIndex`.
    var pointer: Int = 0

    /// Maximum number of bytes requested from the stream per read.
    let bufferLength: Int

    /// The most recently loaded chunk of stream data.
    var buffer: Data

    /// Set once the stream is exhausted or a decoding/read error has occurred.
    var endOfStream = false

    /// The error that terminated iteration, or `nil` if none has occurred.
    private(set) var lastError: Error?

    /// Creates an iterator and loads the first chunk from `inputStream`.
    ///
    /// - Parameters:
    ///   - inputStream: The stream to read from.
    ///   - bufferLength: Maximum number of bytes requested per read.
    /// - Throws: Any error produced by the initial read.
    init(inputStream: InputStream, bufferLength: Int = 32768) throws {
        self.inputStream = inputStream
        self.bufferLength = bufferLength
        buffer = try inputStream.readData(maxLength: bufferLength)
    }

    /// Decodes and returns the next message.
    ///
    /// - Returns: The next message, or `nil` when the stream has ended or an
    ///   error occurred (in which case `lastError` is set and the error is
    ///   printed).
    func next() -> M? {
        if endOfStream {
            return nil
        }
        do {
            // A stream that ends exactly on a message boundary is a clean
            // end of iteration, not a truncation error.
            guard try fillBufferIfDrained() else {
                return nil
            }
            let rawLength = try decodeVarint()
            // A length prefix that does not fit in Int cannot describe a real
            // message; reject it instead of trapping in the conversion.
            guard let messageLength = Int(exactly: rawLength) else {
                throw BinaryDecodingError.malformedProtobuf
            }
            let data = try read(for: messageLength)
            var message = M()
            try message.merge(serializedData: data)
            return message
        } catch {
            // After a failure the read position can no longer be trusted, so
            // iteration is terminated rather than resumed mid-stream.
            lastError = error
            endOfStream = true
            print(error)
            return nil
        }
    }

    /**
     Read the required data quantity from the buffer,
     loading more from the stream if necessary.

     - Parameter nBytes: The exact number of bytes to return.
     - Returns: `nBytes` bytes starting at the current read position.
     - Throws: `BinaryDecodingError.truncated` if the stream ends before
       `nBytes` bytes are available, `BinaryDecodingError.malformedProtobuf`
       if `nBytes` is negative, or any error produced by the stream read.
     */
    func read(for nBytes: Int) throws -> Data {
        guard nBytes >= 0 else {
            throw BinaryDecodingError.malformedProtobuf
        }

        let start = buffer.startIndex + pointer
        let unread = buffer.endIndex - start

        // Fast path: the request is satisfied by what is already buffered.
        if nBytes <= unread {
            let messageData = buffer[start..<(start + nBytes)]
            pointer += nBytes
            return messageData
        }

        // Slow path: take what is left, then keep reading chunks until the
        // request is complete. Reading chunk by chunk tolerates short reads
        // and avoids sizing an allocation from an untrusted length prefix.
        var assembled = Data(buffer[start..<buffer.endIndex])
        buffer = Data()
        pointer = 0

        while assembled.count < nBytes {
            let chunk = try inputStream.readData(maxLength: bufferLength)
            if chunk.isEmpty {
                endOfStream = true
                throw BinaryDecodingError.truncated
            }
            let stillNeeded = nBytes - assembled.count
            if chunk.count <= stillNeeded {
                assembled.append(chunk)
            } else {
                // The chunk holds more than was asked for: keep it as the new
                // buffer and leave the read position just past the part used.
                assembled.append(chunk.prefix(stillNeeded))
                buffer = chunk
                pointer = stillNeeded
            }
        }
        return assembled
    }

    //Adapted from SwiftProtobuf BinaryDecoder
    /// Decodes one varint from the current read position.
    ///
    /// - Returns: The decoded value.
    /// - Throws: `BinaryDecodingError.malformedProtobuf` if the varint runs
    ///   past 64 bits, or any error thrown by `read(for:)`.
    func decodeVarint() throws -> UInt64 {
        let slice: Data = try read(for: 1)
        var c = slice[slice.startIndex]
        // Single-byte varint: the continuation bit is clear.
        if c & 0x80 == 0 {
            return UInt64(c)
        }
        var value = UInt64(c & 0x7f)
        var shift = UInt64(7)
        while true {
            if shift > 63 {
                throw BinaryDecodingError.malformedProtobuf
            }
            let slice: Data = try read(for: 1)
            c = slice[slice.startIndex]
            value |= UInt64(c & 0x7f) << shift
            if c & 0x80 == 0 {
                return value
            }
            shift += 7
        }
    }

    /// Ensures at least one unread byte is buffered before a new message.
    ///
    /// - Returns: `true` if unread data is available; `false` if the stream
    ///   has ended, in which case `endOfStream` is set.
    private func fillBufferIfDrained() throws -> Bool {
        if pointer < buffer.count {
            return true
        }
        let chunk = try inputStream.readData(maxLength: bufferLength)
        if chunk.isEmpty {
            endOfStream = true
            return false
        }
        buffer = chunk
        pointer = 0
        return true
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment