Skip to content

Instantly share code, notes, and snippets.

@st3fan
Created January 17, 2021 16:38
Show Gist options
  • Save st3fan/6237df263dcaa5f7111fbb8d3fb3f83b to your computer and use it in GitHub Desktop.
Save st3fan/6237df263dcaa5f7111fbb8d3fb3f83b to your computer and use it in GitHub Desktop.
import Foundation
import NIO
/// ASCII line feed (`\n`, byte value 10) used as the frame delimiter.
/// Built with `UInt8(ascii:)` so no string view or force-unwrap is needed.
private let newLine = UInt8(ascii: "\n")
/// Minimal example codec that holds inbound bytes until a `\n` is seen.
///
/// Every complete line, including its trailing `\n`, is forwarded to the
/// next inbound handler as its own `ByteBuffer`.
final class LineDelimiterCodec: ByteToMessageDecoder {
    public typealias InboundIn = ByteBuffer
    public typealias InboundOut = ByteBuffer

    public var cumulationBuffer: ByteBuffer?

    /// Forwards one line if the readable bytes contain a newline.
    ///
    /// - Parameters:
    ///   - context: Context used to fire the decoded line down the pipeline.
    ///   - buffer: Accumulated inbound bytes; the emitted line is consumed from it.
    /// - Returns: `.continue` after a line was forwarded, `.needMoreData`
    ///   when no newline has been buffered yet.
    public func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
        // Offset of the first newline, counted from the start of the readable bytes.
        let newlineOffset = buffer.withUnsafeReadableBytes { bytes in
            bytes.firstIndex(of: newLine)
        }
        guard let offset = newlineOffset else {
            return .needMoreData
        }
        // `offset + 1` so the newline itself is consumed together with the line.
        let line = buffer.readSlice(length: offset + 1)!
        context.fireChannelRead(self.wrapInboundOut(line))
        return .continue
    }
}
/// A single client request in the line-based cache protocol.
enum Command {
    /// `QUIT` — ask the server to end the session.
    case quit
    /// `GET <key>` — look up the value stored under `key`.
    case get(key: String)
    /// `SET <key> <value>` — store `value` under `key`.
    case set(key: String, value: String)

    /// Parses one line of text into a command.
    ///
    /// Tokens are separated by one or more whitespace characters, so
    /// repeated, leading and trailing separators are ignored. Verbs are
    /// case-sensitive and must be upper-case.
    ///
    /// - Parameter str: The command line, without its line terminator.
    /// - Returns: The parsed command, or `nil` when the line is empty, the
    ///   verb is unknown, or `GET`/`SET` has the wrong number of arguments.
    ///   `QUIT` is accepted regardless of any trailing arguments.
    static func parse(_ str: String) -> Command? {
        // `components(separatedBy:)` yields empty strings for adjacent
        // separators (and `[""]` for an empty input), so drop those first.
        let tokens = str.components(separatedBy: .whitespaces).filter { !$0.isEmpty }
        guard let verb = tokens.first else {
            return nil
        }
        let arguments = Array(tokens.dropFirst())

        switch (verb, arguments.count) {
        case ("GET", 1):
            return .get(key: arguments[0])
        case ("SET", 2):
            return .set(key: arguments[0], value: arguments[1])
        case ("QUIT", _):
            return .quit
        default:
            return nil
        }
    }
}
/// Process-wide in-memory key/value store; starts out empty.
var database: [String: String] = [:]
/// Executes cache commands against the global `database`.
///
/// Each inbound `ByteBuffer` is read as one command line. Lines that cannot
/// be parsed close the connection.
class CacheHandler: ChannelInboundHandler {
    public typealias InboundIn = ByteBuffer
    public typealias OutboundOut = ByteBuffer

    /// Reads one command line from `data` and executes it.
    ///
    /// - `GET` replies with the stored value followed by `\r\n`. An unknown
    ///   key produces an empty line instead of crashing the server.
    /// - `SET` stores the value and sends no reply.
    /// - `QUIT`, or an unparseable line, closes the channel.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        var buffer = self.unwrapInboundIn(data)
        let str = buffer.readString(length: buffer.readableBytes)?.trimmingCharacters(in: .whitespacesAndNewlines)
        print("[D] Received command: \(String(describing: str))")

        // Treat an unreadable line the same way as an unparseable one.
        guard let line = str, let command = Command.parse(line) else {
            context.channel.close(mode: .all, promise: nil)
            return
        }

        switch command {
        case .get(let key):
            print("Getting \(key)")
            // A missing key is a normal client request, not a programmer
            // error, so answer with an empty value rather than trapping.
            // NOTE(review): this is unambiguous only while the parser never
            // yields an empty SET value — confirm if the protocol grows.
            let value = database[key] ?? ""
            var outgoing = context.channel.allocator.buffer(capacity: value.utf8.count + 2)
            outgoing.writeString(value)
            outgoing.writeString("\r\n")
            context.channel.writeAndFlush(outgoing, promise: nil)
        case .set(let key, let value):
            print("Setting \(key) to \(value)")
            database[key] = value
        case .quit:
            print("Quitting")
            context.channel.close(mode: .all, promise: nil)
        }
    }

    /// Logs the error and closes the connection.
    ///
    /// The channel is closed so that a failed connection does not linger.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        print("error: ", error)
        context.close(promise: nil)
    }
}
/// A TCP server that speaks the line-based cache protocol.
class CacheServer {
    private let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
    private let host: String
    private let port: Int

    /// Creates a server for the given listen address.
    ///
    /// - Parameters:
    ///   - host: Address to bind to. Defaults to `127.0.0.1`.
    ///   - port: TCP port to listen on. Defaults to `17625`.
    init(host: String = "127.0.0.1", port: Int = 17625) {
        self.host = host
        self.port = port
    }

    /// Binds the listening socket and blocks until the server channel closes.
    ///
    /// - Throws: Any error raised while binding or while waiting for the
    ///   channel to close.
    func start() throws {
        print("[*] Starting server on \(host):\(port)")
        let channel = try serverBootstrap.bind(host: host, port: port).wait()
        try channel.closeFuture.wait()
    }

    /// Shuts down the event loop group and releases its thread.
    ///
    /// NOTE(review): shutting the group down is expected to close the
    /// channels running on it, which would unblock a pending `start()` —
    /// confirm against the SwiftNIO version in use.
    ///
    /// - Throws: Any error reported by the group while shutting down.
    func stop() throws {
        try group.syncShutdownGracefully()
    }

    /// Bootstrap for the listening socket and the per-connection pipeline.
    private var serverBootstrap: ServerBootstrap {
        return ServerBootstrap(group: group)
            .serverChannelOption(ChannelOptions.backlog, value: 32)
            .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
            .childChannelInitializer { (channel) -> EventLoopFuture<Void> in
                // Frame bytes into lines first, then hand each line to a
                // handler created for this connection only.
                channel.pipeline.addHandler(ByteToMessageHandler(LineDelimiterCodec())).flatMap { _ in
                    channel.pipeline.addHandler(CacheHandler())
                }
            }
    }
}
// Entry point: run the server until its channel closes, then release its
// resources whether or not start-up succeeded.
let server = CacheServer(/* host: "127.0.0.1", port: 17625 */)
do {
    try server.start()
} catch {
    // Print the error value itself: `localizedDescription` on an error that
    // is not a `LocalizedError` yields only a generic message.
    print("Could not start the server: \(error)")
}
do {
    try server.stop()
} catch {
    print("Could not stop the server: \(error)")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment