Created
September 28, 2016 13:55
-
-
Save goloveychuk/8840ef125ec59aa3a57d2b49f18dcf6b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ZeroMQ | |
import Venice | |
import Foundation | |
import C7 | |
/// Error type thrown by the connection classes in this file; it carries no payload.
struct Error: ErrorProtocol {}

/// Module-wide alias for `ProtobufSerializer`.
/// NOTE(review): not referenced by the code visible in this file — confirm where it is used.
public typealias ZmqSerializer = ProtobufSerializer

/// ZeroMQ context from which every socket in this file is created.
/// Creation failure is treated as fatal (`try!`).
let context: ZeroMQ.Context = try! ZeroMQ.Context()
/// Base class wrapping a ZeroMQ socket whose reads are gated by `Venice.poll`
/// on the socket's file descriptor.
public class ZmqConnection {
    let socket: ZeroMQ.Socket
    let address: String

    /// Result of the most recent `Venice.poll`. `nil` means the next `read()`
    /// has to poll the file descriptor again before touching the socket.
    var polled: Venice.PollEvent? = nil

    /// File descriptor of the underlying socket. Traps if it cannot be obtained.
    var fd: Int32 {
        return try! socket.getFileDescriptor()
    }

    /// Creates a socket of `type` on the shared `context`. Traps if the socket
    /// cannot be created.
    ///
    /// - Parameters:
    ///   - address: Endpoint later passed to `connect()`.
    ///   - type: Kind of ZeroMQ socket to create.
    init(address: String, type: SocketType) {
        self.address = address
        socket = try! context.socket(type)
    }

    /// Connects the socket to `address`.
    ///
    /// - Throws: Whatever `socket.connect` throws.
    public func connect() throws {
        try socket.connect(address)
    }

    /// Reads one multipart message if the socket has one pending.
    ///
    /// The file descriptor is only polled when `polled` is `nil`. After a
    /// successful poll the socket keeps being read on subsequent calls until
    /// its event state no longer reports `.In`, at which point `polled` is
    /// reset so the next call polls again.
    ///
    /// - Returns: The frames of one message, or `nil` when nothing was read
    ///   (poll did not report readability, no `.In` event pending, or the
    ///   socket yielded no frame).
    func read() -> [Data]? {
        if polled == nil {
            polled = try! Venice.poll(fd, for: .reading)
            guard let ready = polled else {
                return nil
            }
            guard ready.contains(.reading) else {
                return nil
            }
        }

        // A missing event set is treated like "nothing pending" instead of
        // force-unwrapping and crashing.
        guard let events = try! socket.getEvents() else {
            polled = nil
            return nil
        }
        guard events.contains(.In) else {
            polled = nil
            return nil
        }

        var parts: [Data] = []
        while true {
            guard let part = try! socket.receive() else {
                break
            }
            parts.append(part)
            if !(try! socket.getReceiveMore()) {
                break
            }
        }

        // Never hand callers an empty message: they index into the frames.
        if parts.isEmpty {
            return nil
        }
        return parts
    }
}
/// Connection backed by a `.Sub` socket that forwards every received
/// multipart message to `onReceive`.
public class ZmqConnectionSubscribe: ZmqConnection {
    /// Callback invoked with the frames of each received message.
    /// Messages that arrive while this is `nil` are dropped.
    public var onReceive: ([Data] -> Void)?

    /// Creates a subscriber socket for `address`.
    public init(address: String) {
        super.init(address: address, type: .Sub)
    }

    /// Sets the socket's subscription filter to `subscribe`.
    /// Traps if the socket rejects the option.
    public func subscribe(_ subscribe: Data) {
        try! socket.setSubscribe(subscribe)
    }

    /// Reads messages forever and hands each one to `onReceive`. Never returns.
    public func start_loop() {
        while true {
            guard let frames = read() else {
                continue
            }
            // Optional call: a message arriving before a handler is installed
            // must not crash the loop.
            onReceive?(frames)
        }
    }
}
/// Connection backed by a `.Dealer` socket that correlates replies with
/// requests through a generated message id sent as the first frame.
public class ZmqConnectionRequest: ZmqConnection {
    /// Channels awaiting a reply, keyed by the message id sent with the request.
    var requests: [String: Channel<[Data]>] = [:]

    /// Creates a dealer socket for `address`.
    public convenience init(address: String) {
        self.init(address: address, type: .Dealer)
    }

    /// Sends `data` as one multipart message: `[message id, empty frame, data...]`.
    ///
    /// Every frame is sent with `.DontWait`. The reply channel is registered
    /// only after the whole message was handed to the socket.
    ///
    /// - Parameter data: Payload frames; must contain at least one frame.
    /// - Returns: A channel that receives the reply frames once `start_loop()`
    ///   sees a message carrying the same id.
    /// - Throws: `Error` when `data` is empty or the socket reports a frame as
    ///   not sent; also rethrows errors raised by the socket.
    public func send(_ data: [Data]) throws -> ReceivingChannel<[Data]> {
        // Reject an empty payload up front; slicing/`last!` would trap otherwise.
        guard let finalFrame = data.last else {
            throw Error()
        }

        let msgId = NSUUID().uuidString
        guard try socket.sendString(msgId, mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        guard try socket.send([], mode: [.SendMore, .DontWait]) else {
            throw Error()
        }
        for frame in data.dropLast() {
            guard try socket.send(frame, mode: [.SendMore, .DontWait]) else {
                throw Error()
            }
        }
        guard try socket.send(finalFrame, mode: .DontWait) else {
            throw Error()
        }

        let reply = Channel<[Data]>(bufferSize: 1)
        requests[msgId] = reply
        return reply.receivingChannel
    }

    /// Reads replies forever and routes each one to the channel registered
    /// for its message id. Never returns.
    ///
    /// The first frame is decoded as the id. The second frame is discarded
    /// (presumably the empty delimiter written by `send` — confirm against the
    /// peer). The remaining frames are delivered to the waiting channel.
    public func start_loop() {
        while true {
            guard var frames = read() else {
                continue
            }
            guard !frames.isEmpty else {
                continue
            }

            let idFrame = frames.removeFirst()
            // An undecodable id cannot match any request; skip it instead of trapping.
            guard let requestId = try? String(data: idFrame) else {
                print("malformed request id")
                continue
            }
            guard let waiter = requests.removeValue(forKey: requestId) else {
                print("no request")
                continue
            }

            // Drop the second frame only when the reply actually has one.
            if !frames.isEmpty {
                _ = frames.removeFirst()
            }
            waiter.send(frames)
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment