Last active
August 2, 2016 01:11
-
-
Save lxcid/eba382c6936c66e514c6858025c9278a to your computer and use it in GitHub Desktop.
DataBuffer + DataPipe
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// DataBuffer.swift | |
// WebSocket | |
// | |
// Created by Stan Chang Khin Boon on 30/7/16. | |
// Copyright © 2016 Trifia. All rights reserved. | |
// | |
/// DataBuffer uses a serial dispatch queue to manage its in and out operations asynchornously. | |
/// The use of dispatch queue ensure that in/out operations are thread safe. | |
/// But order of in/out operations are often important and care must taken when | |
/// scheduling in/out operations concurrently so as to ensure a deterministic behavior. | |
final class DataBuffer { | |
enum ReadResult { | |
case NoOperation | |
case Consume(bytes: Int) | |
} | |
typealias ReadHandler = (data: Data) -> ReadResult | |
typealias CompletionHandler = () -> Void | |
let serialQueue: DispatchQueue | |
var data: Data | |
init(serialQueue optSerialQueue: DispatchQueue? = nil) { | |
self.serialQueue = optSerialQueue ?? DispatchQueue(label: "com.trifia.networker.databuffer", attributes: [ .serial ], target: nil) | |
self.data = Data() | |
} | |
func asyncIn(data: Data, completionHandler: CompletionHandler? = nil) { | |
self.serialQueue.async { | |
self.in(data: data) | |
completionHandler?() | |
} | |
} | |
func asyncOut(handler: ReadHandler, completionHandler: CompletionHandler? = nil) { | |
self.serialQueue.async { | |
self.out(handler: handler) | |
completionHandler?() | |
} | |
} | |
func `in`(data: Data) { | |
dispatchPrecondition(condition: .onQueue(self.serialQueue)) | |
self.data.append(data) | |
} | |
func out(handler: ReadHandler) { | |
dispatchPrecondition(condition: .onQueue(self.serialQueue)) | |
let result = handler(data: self.data) | |
switch result { | |
case .NoOperation: | |
break // noop | |
case .Consume(bytes: let bytes): | |
self.data = self.data.subdata(in: Range(uncheckedBounds: (0, bytes))) | |
} | |
} | |
} | |
/// DataPipe wraps a DataBuffer, turning in operations into out events. | |
/// NOTE: Dispatch source was considered for coalescing out events, but because | |
/// it only promise that at least an out event is enqueued but not ensuring all in | |
/// operations are flush when the last out event is executed. This behavior might be | |
/// surprising for developer. We might consider other way to coalescing out events | |
/// in the future… e.g. Counting the in and scheduling out when its not 0… | |
final class DataPipe { | |
let dataBuffer: DataBuffer | |
var readHandler: DataBuffer.ReadHandler? | |
var serialQueue: DispatchQueue { | |
return self.dataBuffer.serialQueue | |
} | |
init(serialQueue optSerialQueue: DispatchQueue? = nil) { | |
let serialQueue = optSerialQueue ?? DispatchQueue(label: "com.trifia.networker.datapipe", attributes: [ .serial ], target: nil) | |
let dataBuffer = DataBuffer(serialQueue: serialQueue) | |
self.dataBuffer = dataBuffer | |
} | |
func asyncIn(data: Data) { | |
self.dataBuffer.asyncIn(data: data) { | |
self.flush() | |
} | |
} | |
func `in`(data: Data, flush: Bool = true) { | |
self.dataBuffer.in(data: data) | |
if flush { | |
self.flush() | |
} | |
} | |
func flush() { | |
guard let readHandler = self.readHandler else { | |
return | |
} | |
self.dataBuffer.out(handler: readHandler) | |
} | |
func asyncFlush() { | |
self.serialQueue.async { | |
self.flush() | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// | |
// TCPSession.swift | |
// WebSocket | |
// | |
// Created by Stan Chang Khin Boon on 29/7/16. | |
// Copyright © 2016 Trifia. All rights reserved. | |
// | |
import CoreFoundation | |
import Foundation | |
protocol TCPSessionDelegate : class { | |
func TCPSession(session: TCPSession, didReceiveData data: Data) -> DataBuffer.ReadResult | |
} | |
/// TCP Session represents a complete life cycle of a TCP socket. It is stateful. But it is not meant to be recycled. | |
class TCPSession { | |
let host: String | |
let port: UInt32 | |
let readStream: CFReadStream | |
let writeStream: CFWriteStream | |
let readQueue: DispatchQueue | |
let writeQueue: DispatchQueue | |
var readPipe: DataPipe | |
var writePipe: DataPipe | |
weak var delegate: TCPSessionDelegate? | |
init(host: String, port: UInt32, secure: Bool) throws { | |
guard !secure else { | |
throw Error.NotImplemented | |
} | |
var unmanagedReadStream: Unmanaged<CFReadStream>? | |
var unmanagedWriteStream: Unmanaged<CFWriteStream>? | |
CFStreamCreatePairWithSocketToHost(kCFAllocatorDefault, host, UInt32(port), &unmanagedReadStream, &unmanagedWriteStream) | |
guard let readStream = unmanagedReadStream?.takeRetainedValue() else { | |
throw Error.NoReadStream | |
} | |
guard let writeStream = unmanagedWriteStream?.takeRetainedValue() else { | |
throw Error.NoWriteStream | |
} | |
self.host = host | |
self.port = port | |
self.readStream = readStream | |
self.writeStream = writeStream | |
self.readQueue = DispatchQueue(label: "com.trifia.websocket.connection.read", attributes: [ .serial ], target: nil) | |
self.writeQueue = DispatchQueue(label: "com.trifia.websocket.connection.write", attributes: [ .serial ], target: nil) | |
self.readPipe = DataPipe(serialQueue: self.readQueue) | |
self.writePipe = DataPipe(serialQueue: self.writeQueue) | |
let commonStreamEvents: CFStreamEventType = [ | |
.openCompleted, | |
.errorOccurred, | |
.endEncountered | |
] | |
var context = CFStreamClientContext(version: CFIndex(0), info: Unmanaged.passUnretained(self).toOpaque(), retain: nil, release: nil, copyDescription: nil) | |
guard CFReadStreamSetClient(self.readStream, commonStreamEvents.union(.hasBytesAvailable).rawValue, read, &context) else { | |
throw Error.NoReadStream | |
} | |
guard CFWriteStreamSetClient(self.writeStream, commonStreamEvents.union(.canAcceptBytes).rawValue, write, &context) else { | |
throw Error.NoWriteStream | |
} | |
CFReadStreamSetDispatchQueue(self.readStream, self.readQueue) | |
CFWriteStreamSetDispatchQueue(self.writeStream, self.writeQueue) | |
self.writePipe.readHandler = { [weak self] (data: Data) -> DataBuffer.ReadResult in | |
return self?._writeHandler(data: data) ?? .NoOperation | |
} | |
self.readPipe.readHandler = { [weak self] (data: Data) -> DataBuffer.ReadResult in | |
guard let strongSelf = self, let delegate = strongSelf.delegate else { | |
return .NoOperation | |
} | |
return delegate.TCPSession(session: strongSelf, didReceiveData: data) | |
} | |
} | |
func connect() throws { | |
guard CFReadStreamOpen(readStream) else { | |
throw Error.NoReadStream | |
} | |
guard CFWriteStreamOpen(writeStream) else { | |
throw Error.NoWriteStream | |
} | |
} | |
func asyncSend(data: Data) { | |
self.writePipe.asyncIn(data: data) | |
} | |
func flush() { | |
self.writePipe.flush() | |
} | |
func _writeHandler(data: Data) -> DataBuffer.ReadResult { | |
dispatchPrecondition(condition: .onQueue(self.writeQueue)) | |
var totalNumberOfBytesWritten = 0 | |
while (CFWriteStreamCanAcceptBytes(self.writeStream) && (totalNumberOfBytesWritten < data.count)) { | |
let numberOfBytesWritten = data.withUnsafeBytes { (bufferPtr: UnsafeMutablePointer<UInt8>) -> CFIndex in | |
return CFWriteStreamWrite(self.writeStream, bufferPtr.advanced(by: totalNumberOfBytesWritten), CFIndex(data.count - totalNumberOfBytesWritten)) | |
} | |
if numberOfBytesWritten > 0 { | |
totalNumberOfBytesWritten += numberOfBytesWritten | |
} else if numberOfBytesWritten < 0 { | |
// TODO: ([email protected]) Encountered error. We should log… | |
} else { | |
// Noop | |
} | |
} | |
if totalNumberOfBytesWritten > 0 { | |
return .Consume(bytes: totalNumberOfBytesWritten) | |
} else { | |
return .NoOperation | |
} | |
} | |
} | |
extension TCPSession { | |
enum Error : ErrorProtocol { | |
case NoReadStream | |
case NoWriteStream | |
case NotImplemented | |
} | |
} | |
func read(_ readStream: CFReadStream?, _ event: CFStreamEventType, _ optContext: UnsafeMutablePointer<Void>?) { | |
guard let context = optContext else { | |
return | |
} | |
let session = Unmanaged<TCPSession>.fromOpaque(context).takeUnretainedValue() | |
if event.contains(.hasBytesAvailable) { | |
var readCount = 0 | |
while (CFReadStreamHasBytesAvailable(session.readStream)) { | |
let bufferCount = 1024 | |
guard var buffer = Data(count: bufferCount) else { | |
return | |
} | |
let optData = buffer.withUnsafeMutableBytes { (bufferPtr: UnsafeMutablePointer<UInt8>) -> Data? in | |
let numberOfBytesRead = CFReadStreamRead(readStream, bufferPtr, bufferCount) | |
if numberOfBytesRead > 0 { | |
let range = Range(uncheckedBounds: (0, numberOfBytesRead)) | |
let subdata = buffer.subdata(in: range) | |
buffer.resetBytes(in: range) | |
return subdata | |
} else if numberOfBytesRead < 0 { | |
// TODO: ([email protected]) Encountered error. We should log… | |
return nil | |
} else { | |
return nil | |
} | |
} | |
if let data = optData { | |
session.readPipe.in(data: data, flush: false) | |
readCount += 1 | |
} | |
} | |
if readCount > 0 { | |
session.readPipe.flush() | |
} | |
} else if event.contains(.openCompleted) { | |
} else if event.contains(.errorOccurred) { | |
} else if event.contains(.endEncountered) { | |
} | |
} | |
func write(_ writeStream: CFWriteStream?, _ event: CFStreamEventType, _ optContext: UnsafeMutablePointer<Void>?) { | |
guard let context = optContext else { | |
return | |
} | |
let session = Unmanaged<TCPSession>.fromOpaque(context).takeUnretainedValue() | |
if event.contains(.canAcceptBytes) { | |
session.flush() | |
} else if event.contains(.openCompleted) { | |
} else if event.contains(.errorOccurred) { | |
} else if event.contains(.endEncountered) { | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment