Last active
June 3, 2023 05:32
-
-
Save CodaFi/7dbd9be4b2a85f098c78b8ae286aa9fd to your computer and use it in GitHub Desktop.
An async sequence of kevents
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if canImport(Darwin) | |
import Darwin | |
#elseif canImport(Glibc) | |
import Glibc | |
#else | |
#error("unsupported platform") | |
#endif | |
/// Core utilities for interacting with IP-based abstractions. | |
public struct IP { | |
/// Represents a family of either IPv4 or IPv6 addresses in a single unified | |
/// type. | |
public enum AddressFamily: Sendable { | |
case v4(in_addr) | |
case v6(in6_addr) | |
/// Construct an IP address by parsing the given string. On failure, this | |
/// function returns `nil`. | |
/// | |
/// - IPv4 addresses are of the form `ddd.ddd.ddd.ddd` where `d` is in the range [0...255] | |
/// - IPv6 addresses come in many forms: | |
/// - `x:x:x:x:x:x:x:x` where `x` is up to four hexidecimal digits | |
/// - An abbreviated format that drops any leading zeroes `::x` where `x` is a hexadecimal digit | |
/// - A mixed IPv4/IPv6 address of the form `x:x:x:x:x:x:d.d.d.d`. | |
public init?(parsing: String) { | |
do { | |
var a4 = in_addr() | |
let result = withUnsafeMutableBytes(of: &a4) { buf in | |
return inet_pton(AF_INET, parsing, buf.baseAddress) | |
} | |
guard result != 1 else { | |
self = .v4(a4) | |
return | |
} | |
} | |
do { | |
var a6 = in6_addr() | |
let result = withUnsafeMutableBytes(of: &a6) { buf in | |
return inet_pton(AF_INET6, parsing, buf.baseAddress) | |
} | |
guard result != 1 else { | |
self = .v6(a6) | |
return | |
} | |
} | |
return nil | |
} | |
} | |
/// Constructs an IPv4 address from the given integral address parts. | |
/// | |
/// - Parameters: | |
/// - a: The first 8 bits. | |
/// - b: The second 8 bits. | |
/// - c: The third 8 bits. | |
/// - d: The fourth 8 bits. | |
/// - Returns: An IPv4 address value that represents the combined 32-bit address. | |
public static func v4(_ a: UInt8, _ b: UInt8, _ c: UInt8, _ d: UInt8) -> IP.AddressFamily { | |
let addr = (UInt32(a) << 24) | (UInt32(b) << 16) | (UInt32(c) << 8) | UInt32(d) | |
return .v4(in_addr(s_addr: addr.bigEndian)) | |
} | |
/// Constructs an IPv6 address from the given integral address parts. | |
/// | |
/// - Parameters: | |
/// - a: The first 16 bits. | |
/// - b: The second 16 bits. | |
/// - c: The third 16 bits. | |
/// - d: The fourth 16 bits. | |
/// - e: The fifth 16 bits. | |
/// - f: The sixth 16 bits. | |
/// - g: The seventh 16 bits. | |
/// - h: The eighth 16 bits. | |
/// - Returns: An IPv6 address value that represents the combined 128-bit address. | |
public static func v6(_ a: UInt16, _ b: UInt16, _ c: UInt16, _ d: UInt16, _ e: UInt16, _ f: UInt16, _ g: UInt16, _ h: UInt16) -> IP.AddressFamily { | |
return .v6(in6_addr(a, b, c, d, e, f, g, h)) | |
} | |
} | |
extension IP.AddressFamily: Equatable { | |
public static func == (lhs: IP.AddressFamily, rhs: IP.AddressFamily) -> Bool { | |
switch (lhs, rhs) { | |
case let (.v4(lv4), .v4(rv4)): | |
return lv4.s_addr == rv4.s_addr | |
case let (.v6(lv6), .v6(rv6)): | |
return withUnsafeBytes(of: lv6) { lbuf in | |
return withUnsafeBytes(of: rv6) { rbuf in | |
return memcmp(lbuf.baseAddress!, rbuf.baseAddress!, MemoryLayout<in6_addr>.size) == 0 | |
} | |
} | |
default: | |
return false | |
} | |
} | |
} | |
extension IP.AddressFamily: Hashable { | |
public func hash(into hasher: inout Hasher) { | |
switch self { | |
case .v4(let addr): | |
hasher.combine(0) | |
addr.s_addr.hash(into: &hasher) | |
case .v6(let addr): | |
hasher.combine(1) | |
return withUnsafeBytes(of: addr) { rbuf in | |
let (lo, hi) = rbuf.baseAddress!.load(as: (UInt64, UInt64).self) | |
lo.hash(into: &hasher) | |
hi.hash(into: &hasher) | |
} | |
} | |
} | |
} | |
extension IP.AddressFamily: CustomStringConvertible { | |
public var description: String { | |
switch self { | |
case .v4(let v4Addr): | |
var buffer = [CChar](repeating: 0, count: Int(INET_ADDRSTRLEN) + 1) | |
return buffer.withUnsafeMutableBufferPointer { descBuf in | |
return withUnsafeBytes(of: v4Addr) { (addrBuf: UnsafeRawBufferPointer) -> String in | |
guard inet_ntop(AF_INET, addrBuf.baseAddress, descBuf.baseAddress, socklen_t(INET_ADDRSTRLEN)) != nil else { | |
return "" | |
} | |
return String(cString: descBuf.baseAddress!) | |
} | |
} | |
case .v6(let v6Addr): | |
var buffer = [CChar](repeating: 0, count: Int(INET6_ADDRSTRLEN) + 1) | |
return buffer.withUnsafeMutableBufferPointer { descBuf in | |
return withUnsafeBytes(of: v6Addr) { (addrBuf: UnsafeRawBufferPointer) -> String in | |
guard inet_ntop(AF_INET6, addrBuf.baseAddress, descBuf.baseAddress, socklen_t(INET6_ADDRSTRLEN)) != nil else { | |
return "" | |
} | |
return String(cString: descBuf.baseAddress!) | |
} | |
} | |
} | |
} | |
} | |
extension in6_addr { | |
fileprivate init(_ a: UInt16, _ b: UInt16, _ c: UInt16, _ d: UInt16, _ e: UInt16, _ f: UInt16, _ g: UInt16, _ h: UInt16) { | |
self.init() | |
memcpy(&self, [ | |
UInt8(a >> 8), | |
UInt8(a), | |
UInt8(b >> 8), | |
UInt8(b), | |
UInt8(c >> 8), | |
UInt8(c), | |
UInt8(d >> 8), | |
UInt8(d), | |
UInt8(e >> 8), | |
UInt8(e), | |
UInt8(f >> 8), | |
UInt8(f), | |
UInt8(g >> 8), | |
UInt8(g), | |
UInt8(h >> 8), | |
UInt8(h), | |
] as [UInt8], 16) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if canImport(Darwin) | |
import Darwin | |
#elseif canImport(Glibc) | |
import Glibc | |
#else | |
#error("unsupported platform") | |
#endif | |
public enum IO { | |
public struct Error: Swift.Error, CustomDebugStringConvertible { | |
public var code: POSIXErrorCode | |
public static func currentErrorCode() -> Error { | |
return IO.Error(code: POSIXErrorCode(rawValue: errno)!) | |
} | |
public var localizedDescription: String { | |
return "SocketToMe.IO.Error(code: \(self.code.rawValue) -> \(String(validatingUTF8: strerror(self.code.rawValue))!))" | |
} | |
public var debugDescription: String { | |
return "SocketToMe.IO.Error(code: \(self.code.rawValue) -> \(String(validatingUTF8: strerror(self.code.rawValue))!))" | |
} | |
} | |
} | |
extension IO.Error: Equatable { | |
public static func == (lhs: IO.Error, rhs: IO.Error) -> Bool { | |
return lhs.code == rhs.code | |
} | |
} | |
extension IO.Error { | |
public static let outOfMemory = IO.Error(code: .ENOMEM) | |
public static let wouldBlock = IO.Error(code: .EWOULDBLOCK) | |
public static let tryAgain = IO.Error(code: .EAGAIN) | |
public static let genericIO = IO.Error(code: .EIO) | |
public static let interrupted = IO.Error(code: .EINTR) | |
} | |
/// A type that supports extracting a raw socket value. | |
public protocol SocketStream { | |
/// The underlying raw socket value. | |
var socket: Socket { get } | |
} | |
extension SocketStream { | |
public func shutdown(_ ends: Socket.Shutdown) -> Result<Void, IO.Error> { | |
return self.socket.shutdown(ends) | |
} | |
} | |
/// A type that supports reading bytes from an underlying source into a provided | |
/// bounding buffer. | |
/// | |
/// Implementing ``ReadableStream`` requires only that synchronous access to the | |
/// underlying resource be possible. The provided bounding buffer provides a maximum for | |
/// the number of bytes the client is willing to accept at a time. A conforming | |
/// implementation may read any number of bytes less than `buffer.count`. | |
/// | |
/// Types implementing ``ReadableStream`` compose well to form stacks that can | |
/// interpret I/O operations in various ways. You might, for example, compose | |
/// a ``ReadableStream`` that implements a security protocol with one that reads | |
/// from an underlying socket. | |
public protocol ReadableStream { | |
/// Pull some bytes from this source into the given buffer, returning how many | |
/// bytes were read. | |
/// | |
/// The details of how the reads are performed are entirely | |
/// implementation-defined. A conforming implementation may be synchronous or | |
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly. | |
/// | |
/// A conforming implementation may similarly not rely on the contents of | |
/// the buffer having a certain shape or size. A zero-sized buffer is one | |
/// possible input for which many valid behaviors are acceptable. Similarly, | |
/// reading a stream that has reached an EOF-like state may or may not throw | |
/// or simply return 0 bytes read. | |
/// | |
/// - Returns: The number of bytes read into the given `buffer`. | |
mutating func read(into buffer: UnsafeMutableBufferPointer<UInt8>) throws -> Int | |
} | |
/// A type that supports writing bytes from a source buffer into a set | |
/// destination. | |
/// | |
/// Implementing ``WritableStream`` requires only that synchronous access to the | |
/// underlying resource be possible. The provided buffer provides a maximum | |
/// for the number of bytes the client is requesting be written at a time. A | |
/// conforming implementation may write any number of bytes less than | |
/// `buffer.count`. | |
/// | |
/// The details of how the writes are performed are entirely | |
/// implementation-defined. A conforming implementation may be synchronous or | |
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly. | |
/// | |
/// A conforming implementation may similarly not rely on the contents of | |
/// the buffer having a certain shape or size. A zero-sized buffer is one | |
/// possible input for which many valid behaviors are acceptable. Similarly, | |
/// writing to a stream that has reached an EOF-like state may or may not throw | |
/// or simply return 0 bytes read. | |
/// | |
/// Types implementing ``WritableStream`` compose well to form stacks that can | |
/// interpret I/O operations in various ways. You might, for example, compose | |
/// a ``WritableStream`` that implements an encryption layer with one that | |
/// writes to an underlying socket. | |
public protocol WritableStream { | |
/// Writes some bytes from this source into the given buffer, returning how many | |
/// bytes were written. | |
/// | |
/// The details of how the writes are performed are entirely | |
/// implementation-defined. A conforming implementation may be synchronous or | |
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly. | |
/// | |
/// A conforming implementation may similarly not rely on the contents of | |
/// the buffer having a certain shape or size. A zero-sized buffer is one | |
/// possible input for which many valid behaviors are acceptable. Similarly, | |
/// writing to a stream that has reached an EOF-like state may or may not throw | |
/// or simply return 0 bytes written. | |
/// | |
/// - Returns: The number of bytes written from the given `buffer`. | |
mutating func write(contentsOf buffer: UnsafeBufferPointer<UInt8>) throws -> Int | |
} | |
func handlePOSIXError(_ fn: () -> Int32) -> Result<(), IO.Error> { | |
let res = fn() | |
guard res != 0 else { | |
return .success(()) | |
} | |
let err = POSIXErrorCode(rawValue: errno)! | |
return .failure(IO.Error(code: err)) | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if canImport(Darwin) | |
import Darwin | |
#elseif canImport(Glibc) | |
import Glibc | |
#else | |
#error("unsupported platform") | |
#endif | |
/// A `Socket` represents a UNIX Socket - an abstraction for opening a communication channel across | |
/// a specified domain. | |
public struct Socket: Sendable { | |
public enum AddressFamily { | |
case v4(sockaddr_in) | |
case v6(sockaddr_in6) | |
public init(address: IP.AddressFamily, port: UInt16) { | |
switch address { | |
case .v4(let a): | |
self = .v4(sockaddr_in(address: a, port: port)) | |
case .v6(let a): | |
self = .v6(sockaddr_in6(address: a, port: port, flowInfo: 0, scopeID: 0)) | |
} | |
} | |
public init(unsafeUninitializedAddress block: (UnsafeMutablePointer<sockaddr>?, inout socklen_t) -> Void) { | |
var storage = sockaddr_storage() | |
var len = socklen_t(MemoryLayout<sockaddr_storage>.size) | |
storage.withRawAddress { addr, _ in | |
block(addr, &len) | |
} | |
let family = storage.ss_family | |
self = storage.withRawAddress { addr, _ in | |
switch family { | |
case sa_family_t(AF_INET): | |
assert(Int(len) >= MemoryLayout<sockaddr_in>.size) | |
return addr!.withMemoryRebound(to: sockaddr_in.self, capacity: 1) { ptr in | |
return .v4(ptr.pointee) | |
} | |
case sa_family_t(AF_INET6): | |
assert(Int(len) >= MemoryLayout<sockaddr_in6>.size) | |
return addr!.withMemoryRebound(to: sockaddr_in6.self, capacity: 1) { ptr in | |
return .v6(ptr.pointee) | |
} | |
default: | |
fatalError() | |
} | |
} | |
} | |
func withRawAddress<T>(_ block: (UnsafePointer<sockaddr>?, inout socklen_t) throws -> T) rethrows -> T { | |
switch self { | |
case .v4(let rawAddr): | |
return try withUnsafeBytes(of: rawAddr) { (bvptr) -> T in | |
var len = socklen_t(MemoryLayout<sockaddr_in>.size(ofValue: rawAddr)) | |
return try block(bvptr.bindMemory(to: sockaddr.self).baseAddress, &len) | |
} | |
case .v6(let rawAddr): | |
return try withUnsafeBytes(of: rawAddr) { (bvptr) -> T in | |
var len = socklen_t(MemoryLayout<sockaddr_in6>.size(ofValue: rawAddr)) | |
return try block(bvptr.bindMemory(to: sockaddr.self).baseAddress, &len) | |
} | |
} | |
} | |
public var address: IP.AddressFamily { | |
switch self { | |
case .v4(let rawAddr): | |
return IP.AddressFamily.v4(rawAddr.sin_addr) | |
case .v6(let rawAddr): | |
return IP.AddressFamily.v6(rawAddr.sin6_addr) | |
} | |
} | |
public var port: UInt16 { | |
switch self { | |
case .v4(let rawAddr): | |
return rawAddr.sin_port.byteSwapped | |
case .v6(let rawAddr): | |
return rawAddr.sin6_port.byteSwapped | |
} | |
} | |
} | |
public var fd: UnsafeFileDescriptor | |
private init(fd: UnsafeFileDescriptor) { | |
self.fd = fd | |
} | |
init(family: CInt, type: CInt) throws { | |
self.fd = UnsafeFileDescriptor(fd: socket(family, type, 0)) | |
_ = try self.fd.setCloseOnExec().get() | |
#if canImport(Darwin) | |
var enable = 1 | |
_ = try handlePOSIXError({ | |
setsockopt(self.fd.fd, SOL_SOCKET, SO_NOSIGPIPE, &enable, socklen_t(MemoryLayout<CInt>.size)) | |
}).get() | |
#endif | |
} | |
public init(address: Socket.AddressFamily, type: CInt) throws { | |
switch address { | |
case .v4(_): | |
self = try Socket(family: AF_INET, type: type) | |
case .v6(_): | |
self = try Socket(family: AF_INET6, type: type) | |
} | |
} | |
} | |
extension Socket { | |
/// Connect this socket to the given address. | |
public mutating func connect(to address: Socket.AddressFamily) -> Result<Void, IO.Error> { | |
// FIXME: Handle EINPROGRESS | |
return self.fd.withAsyncIO { rawFD in | |
return address.withRawAddress { (addrp, len) -> Int32 in | |
#if canImport(Darwin) | |
return Darwin.connect(rawFD, addrp, len) | |
#else | |
return Glibc.connect(rawFD, addrp, len) | |
#endif | |
} | |
}.map({ _ in () }) | |
} | |
mutating func accept(storage: UnsafeMutablePointer<sockaddr>?, length: inout socklen_t) -> Result<Socket, IO.Error> { | |
#if canImport(Darwin) | |
let rawFD = Darwin.accept(self.fd.fd, storage, &length) | |
#else | |
let rawFD = Glibc.accept(self.fd.fd, storage, &length) | |
#endif | |
guard rawFD != -1 else { | |
let err = POSIXErrorCode(rawValue: errno)! | |
return .failure(IO.Error(code: err)) | |
} | |
var fd = UnsafeFileDescriptor(fd: rawFD) | |
switch fd.setCloseOnExec() { | |
case .failure(let e): | |
return .failure(e) | |
default: | |
return .success(Socket(fd: fd)) | |
} | |
} | |
/// Duplicates this socket. | |
public func duplicate() -> Result<Socket, IO.Error> { | |
return self.fd.duplicate().map(Socket.init(fd:)) | |
} | |
func recieve(buffer: UnsafeMutableBufferPointer<UInt8>, flags: CInt) -> Result<Int, IO.Error> { | |
let result = recv(self.fd.fd, UnsafeMutableRawPointer(buffer.baseAddress!), buffer.count, flags) | |
return .success(result) | |
} | |
/// Reads data from the socket into the given buffer. | |
public func read(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> { | |
return self.recieve(buffer: buffer, flags: 0) | |
} | |
/// Writes data from the given buffer into the socket. | |
public func write(buffer: UnsafeBufferPointer<UInt8>) -> Result<Int, IO.Error> { | |
return self.fd.write(buffer: buffer) | |
} | |
/// Peeks at data from the socket, writing it into the given buffer without actually removing the data from | |
/// the underlying queue maintained by the operating system. This ensures that subsequent peeks and | |
/// reads will recieve the same data. | |
public func peek(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> { | |
return self.recieve(buffer: buffer, flags: MSG_PEEK | MSG_DONTWAIT) | |
} | |
/// Close the socket connection. | |
public func close() -> Result<Void, IO.Error> { | |
return self.fd.close() | |
} | |
public struct Shutdown: OptionSet, Sendable { | |
public let rawValue: UInt8 | |
public init(rawValue: UInt8) { self.rawValue = rawValue } | |
public static let read = Shutdown(rawValue: 1 << 0) | |
public static let write = Shutdown(rawValue: 1 << 1) | |
} | |
public func shutdown(_ ends: Shutdown) -> Result<Void, IO.Error> { | |
precondition(ends.rawValue != 0) | |
let how: Int32 | |
switch ends { | |
case [.read, .write]: | |
how = SHUT_RDWR | |
case .read: | |
how = SHUT_RD | |
case .write: | |
how = SHUT_WR | |
default: | |
fatalError() | |
} | |
return handlePOSIXError { | |
#if canImport(Darwin) | |
return Darwin.shutdown(self.fd.fd, how) | |
#else | |
return Glibc.shutdown(self.fd.fd, how) | |
#endif | |
} | |
} | |
} | |
extension sockaddr_in { | |
init(address: in_addr, port: UInt16) { | |
self = sockaddr_in() | |
bzero(&self, MemoryLayout<sockaddr_in>.size) | |
self.sin_family = sa_family_t(AF_INET) | |
self.sin_addr = address | |
self.sin_port = port.bigEndian | |
} | |
} | |
extension sockaddr_in6 { | |
init(address: in6_addr, port: UInt16, flowInfo: UInt32, scopeID: UInt32) { | |
self = sockaddr_in6() | |
bzero(&self, MemoryLayout<sockaddr_in6>.size) | |
self.sin6_family = sa_family_t(AF_INET6) | |
self.sin6_addr = address | |
self.sin6_port = port.bigEndian | |
self.sin6_flowinfo = flowInfo | |
self.sin6_scope_id = scopeID | |
} | |
} | |
extension sockaddr_un { | |
static func atPath<T>(_ bound: String, _ f: (inout sockaddr_un, socklen_t) throws -> T) throws -> T { | |
var addr = sockaddr_un() | |
addr.sun_family = sa_family_t(AF_UNIX) | |
guard (bound.count + 1) < sockaddr_un.sunPathLength else { | |
throw IO.Error(code: .ENAMETOOLONG) | |
} | |
withUnsafeMutablePointer(to: &addr.sun_path) { pathPointer -> Void in | |
return pathPointer.withMemoryRebound(to: CChar.self, capacity: sockaddr_un.sunPathLength) { path in | |
bound.withCString { byteBuf -> Void in | |
memcpy(path, byteBuf, bound.count) | |
} | |
assert(path[bound.count] == 0) | |
} | |
} | |
var len = socklen_t(sockaddr_un.offsetOfSunPath + bound.count) | |
if !bound.isEmpty { | |
len += 1 | |
} | |
return try f(&addr, len) | |
} | |
mutating func withRawAddress<T>(_ block: (UnsafeMutablePointer<sockaddr>?, socklen_t) throws -> T) rethrows -> T { | |
return try withUnsafeMutablePointer(to: &self) { (bvptr) -> T in | |
return try bvptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { bvptr in | |
let len = socklen_t(MemoryLayout<sockaddr_un>.size) | |
return try block(bvptr, len) | |
} | |
} | |
} | |
} | |
private protocol _SockSunKeyPathHelper { | |
associatedtype SunPathType | |
var sun_path: SunPathType { get } | |
} | |
extension sockaddr_un: _SockSunKeyPathHelper { | |
static var offsetOfSunPath: Int { | |
return MemoryLayout<sockaddr_un>.offset(of: \.sun_path as KeyPath<sockaddr_un, sockaddr_un.SunPathType>).unsafelyUnwrapped | |
} | |
static var sunPathLength: Int { | |
return MemoryLayout<sockaddr_un.SunPathType>.size / MemoryLayout<CChar>.size | |
} | |
} | |
extension sockaddr_storage { | |
mutating func withRawAddress<T>(_ block: (UnsafeMutablePointer<sockaddr>?, socklen_t) throws -> T) rethrows -> T { | |
return try withUnsafeMutablePointer(to: &self) { (bvptr) -> T in | |
return try bvptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { bvptr in | |
let len = socklen_t(MemoryLayout<sockaddr_storage>.size) | |
return try block(bvptr, len) | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Darwin | |
/// A socket source represents a stream of events about the readability or | |
/// writability of a socket. | |
/// | |
/// These sources can be plugged into a ``KEvent/Registry`` to enable the | |
/// aggregate delivery of events back to your application. | |
/// | |
/// Tags | |
/// ==== | |
/// | |
/// When multiple sources are plugged into the same registry, tags are used to | |
/// disambiguate the source of the event. To create a tag, use a raw | |
/// representable enum. For example, a socket source that observes the read and | |
/// write end of a single socket looks like: | |
/// | |
/// enum Tag: UInt64 { | |
/// case read | |
/// case write | |
/// } | |
/// | |
/// Then you can create a registry and an async event consumer | |
/// | |
/// var registry = KEvent.Registry<Tag>() | |
/// var readSource = SocketSource<Tag>(read: stream) | |
/// var writeSource = SocketSource<Tag>(write: stream) | |
/// | |
/// registry.register(&self.readSource, tag: .read) | |
/// registry.register(&self.writeSource, tag: .write) | |
/// | |
/// let registryTask = Task { [registry] in | |
/// for try await (tag, _) in registry { | |
/// switch tag { | |
/// case .read: | |
/// try await flushRead() | |
/// case .write: | |
/// try await flushWrite() | |
/// } | |
/// } | |
/// } | |
/// | |
/// When you're done with the event loop, cancel the registry task | |
/// | |
/// registryTask.cancel() | |
/// | |
/// And finally cancel the registry to clean up the underlying kqueue. | |
/// | |
/// registry.cancel() | |
public struct SocketSource<Tag: RawRepresentable> | |
where | |
Tag.RawValue == UInt64 | |
{ | |
private enum Interest { | |
case read | |
case write | |
var asFilter: KEvent.Filter { | |
switch self { | |
case .read: | |
return .read | |
case .write: | |
return .write | |
} | |
} | |
} | |
var socket: Socket | |
private var registeredQueue: UnsafeFileDescriptor? | |
private var registeredToken: Tag? | |
private var interest: Interest | |
private var resumeCount: Int | |
public init(read stream: ReadableStream & SocketStream) { | |
self.socket = stream.socket | |
self.registeredQueue = nil | |
self.registeredToken = nil | |
self.interest = .read | |
self.resumeCount = 1 | |
} | |
public init(write stream: WritableStream & SocketStream) { | |
self.socket = stream.socket | |
self.registeredQueue = nil | |
self.registeredToken = nil | |
self.interest = .write | |
self.resumeCount = 1 | |
} | |
/// Suspend the socket source. | |
/// | |
// By suspending a socket source, your application can temporarily prevent | |
// the execution of any tasks associated with that object. The suspension | |
// occurs after completion of any tasks running at the time of the call. | |
// | |
// Calling this function increments the suspension count of the source, and | |
// calling ``SocketSource/resume()`` decrements it. While the count is | |
// greater than zero, the socket source remains suspended, so you must | |
// balance each ``SocketSource/suspend()`` call with a matching | |
// ``SocketSource/resume()`` call. | |
public mutating func suspend() -> Result<(), IO.Error> { | |
guard let kq = self.registeredQueue else { | |
return .success(()) | |
} | |
self.resumeCount -= 1 | |
guard self.resumeCount == 0 else { | |
return .success(()) | |
} | |
let flags: KEvent.Flags = [ .delete, .receipt ] | |
var changes = [ | |
kevent64_s(id: socket.fd.fd, filter: self.interest.asFilter, flags: flags, token: 0), | |
] | |
return keventRegister(kq, changes: &changes) | |
} | |
/// Resume the socket source. | |
/// | |
/// Calling this function decrements the suspension count of a suspended | |
/// socket source object. While the count is greater than zero, the object | |
/// remains suspended. When the suspension count returns to zero, any tasks | |
/// submitted to the socket source while suspended are delivered. | |
public mutating func resume() -> Result<(), IO.Error> { | |
guard let kq = self.registeredQueue, let tag = self.registeredToken else { | |
return .success(()) | |
} | |
self.resumeCount += 1 | |
guard self.resumeCount == 1 else { | |
return .success(()) | |
} | |
let flags: KEvent.Flags = [ .clear, .receipt ] | |
var changes = [ | |
kevent64_s(id: socket.fd.fd, | |
filter: self.interest.asFilter, | |
flags: flags, | |
token: tag.rawValue), | |
] | |
return keventRegister(kq, changes: &changes) | |
} | |
fileprivate mutating func register(in queue: UnsafeFileDescriptor, tag: Tag) { | |
self.registeredQueue = queue | |
self.registeredToken = tag | |
switch interest { | |
case .read: | |
var changes = [ | |
kevent64_s(id: self.socket.fd.fd, | |
filter: .read, | |
flags: [ .clear, .receipt, .add ], | |
token: tag.rawValue) | |
] | |
_ = keventRegister(queue, changes: &changes) | |
case .write: | |
var changes = [ | |
kevent64_s(id: self.socket.fd.fd, | |
filter: .write, | |
flags: [ .clear, .receipt, .add ], | |
token: tag.rawValue) | |
] | |
_ = keventRegister(queue, changes: &changes) | |
} | |
} | |
} | |
private func keventRegister(_ kq: UnsafeFileDescriptor, changes: inout [kevent64_s]) -> Result<(), IO.Error> { | |
return handlePOSIXError { | |
return changes.withUnsafeMutableBufferPointer { buf in | |
return kevent64(kq.fd, | |
UnsafePointer(buf.baseAddress!), Int32(buf.count), | |
buf.baseAddress!, Int32(buf.count), | |
0, nil) | |
} | |
} | |
} | |
extension kevent64_s { | |
fileprivate init(id: CInt, filter: KEvent.Filter, flags: KEvent.Flags, token: UInt64) { | |
self = kevent64_s(ident: UInt64(id), filter: filter.rawValue, flags: flags.rawValue, fflags: 0, data: 0, udata: token, ext: (0, 0)) | |
} | |
} | |
public enum KEvent { | |
public struct Registry<Tag: RawRepresentable> | |
where | |
Tag.RawValue == UInt64 | |
{ | |
private var queue: UnsafeFileDescriptor | |
private var eventBuffer: [kevent64_s] | |
private var cursor: Int | |
public init() { | |
self.queue = UnsafeFileDescriptor(fd: kqueue()) | |
_ = self.queue.setCloseOnExec() | |
self.cursor = 0 | |
self.eventBuffer = [] | |
self.eventBuffer.reserveCapacity(128) | |
} | |
public mutating func register(_ source: inout SocketSource<Tag>, tag: Tag) { | |
source.register(in: queue, tag: tag) | |
} | |
} | |
} | |
extension KEvent { | |
public struct Filter: RawRepresentable, Sendable { | |
public var rawValue: Int16 | |
public init(rawValue: Int16) { | |
self.rawValue = rawValue | |
} | |
public static let zero = Filter(rawValue: 0) | |
public static let read = Filter(rawValue: -1) | |
public static let write = Filter(rawValue: -2) | |
} | |
public struct Flags: OptionSet, Sendable { | |
public var rawValue: UInt16 | |
public init(rawValue: UInt16) { | |
self.rawValue = rawValue | |
} | |
/// Add event to kq (implies enable) | |
public static let add = Flags(rawValue: 0x0001) | |
/// Delete event from kq | |
public static let delete = Flags(rawValue: 0x0002) | |
/// Enable event | |
public static let enable = Flags(rawValue: 0x0004) | |
/// Disable event (not reported) | |
public static let disable = Flags(rawValue: 0x0008) | |
/// Only report one occurrence | |
public static let oneshot = Flags(rawValue: 0x0010) | |
/// Clear event state after reporting | |
public static let clear = Flags(rawValue: 0x0020) | |
/// Force immediate event output | |
public static let receipt = Flags(rawValue: 0x0040) | |
} | |
} | |
extension KEvent.Registry { | |
/// Close the registry for event processing and deallocate | |
/// its resources. | |
/// | |
/// Once the registry has been cancelled, it is considered in | |
/// an invalid state. Any further attempts to register socket | |
/// sources will result in an error. Instead, a new registry | |
/// should be created. | |
public __consuming func cancel() { | |
if case let .failure(err) = self.queue.close() { | |
fatalError("Could not close kqueue for registry: \(err)") | |
} | |
} | |
} | |
extension KEvent.Registry: Sendable where Tag: Sendable {} | |
extension KEvent.Registry: AsyncIteratorProtocol, AsyncSequence { | |
public func makeAsyncIterator() -> Self { | |
return self | |
} | |
public mutating func next() async throws -> (Tag, kevent64_s)? { | |
if self.cursor < self.eventBuffer.count { | |
let event = self.eventBuffer[self.cursor] | |
self.cursor += 1 | |
return (Tag(rawValue: event.udata)!, event) | |
} | |
return try await reloadEvents() | |
} | |
private mutating func reloadEvents() async throws -> (Tag, kevent64_s)? { | |
try Task.checkCancellation() | |
self.eventBuffer.removeAll(keepingCapacity: true) | |
self.cursor = 0 | |
// Opportunistically try to fill the event buffer. | |
kevent64(self.queue.fd, | |
nil, 0, | |
&self.eventBuffer, Int32(self.eventBuffer.capacity), | |
0, nil) | |
// So there weren't any events. | |
while self.eventBuffer.isEmpty { | |
// Let's see if another task can make progress instead. | |
await Task.yield() | |
// Make sure we weren't cancelled in the mean time. | |
try Task.checkCancellation() | |
// Opportunistically try to fill the event buffer. | |
kevent64(self.queue.fd, | |
nil, 0, | |
&self.eventBuffer, Int32(self.eventBuffer.capacity), | |
0, nil) | |
} | |
// We're full up again. | |
return try await self.next() | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#if canImport(Darwin) | |
import Darwin | |
#elseif canImport(Glibc) | |
import Glibc | |
#else | |
#error("unsupported platform") | |
#endif | |
/// ``UnsafeFileDescriptor`` is an extremely low-level wrapper around a raw POSIX file | |
/// descriptor. It provides a number of higher-level operations that abstract the corresponding POSIX routines | |
/// but it makes no attempt to try to guard against misuse of the API surface. In particular, this abstraction | |
/// does not support automatically closing the underlying file descriptor when it is no longer in use. | |
/// In general, the validity of the underlying file descriptor is not guaranteed, i.e. it is possible for | |
/// other low-level routines and system calls to interfere with the state of the underlying file descriptor, | |
/// especially when the underlying file descriptor is escaped to routines that are outside of the control of | |
/// the SocketToMe library. | |
/// | |
/// A file descriptor represents a low-level connection between a file controlled by the operating system and | |
/// a userspace application. POSIX affords three file descriptors often collectively referred to as the | |
/// "standard streams":`.standardInput`, `.standardOutput`, and `.standardError`. | |
public struct UnsafeFileDescriptor: Sendable { | |
public let fd: CInt | |
public init(fd: CInt) { | |
self.fd = fd | |
} | |
public static let standardInput = UnsafeFileDescriptor(fd: STDIN_FILENO) | |
public static let standardOutput = UnsafeFileDescriptor(fd: STDOUT_FILENO) | |
public static let standardError = UnsafeFileDescriptor(fd: STDERR_FILENO) | |
/// Sets the "close on exec" bit on the underlying file descriptor. | |
/// | |
/// When a userspace process forks, the child inherits all open file descriptors from its parent. When a | |
/// function in the `exec` family is called, those file descriptors can potentially leak if the child forgets to | |
/// close them. | |
mutating func setCloseOnExec() -> Result<(), IO.Error> { | |
let previousSetting = fcntl(self.fd, F_GETFD) | |
let newSetting = previousSetting | FD_CLOEXEC | |
if previousSetting != newSetting { | |
return handlePOSIXError { fcntl(self.fd, F_SETFD, newSetting) } | |
} | |
return .success(()) | |
} | |
/// Executes a user provided function by first enabling non-blocking I/O on the underlying file descriptor. | |
/// When the user-provided function returns, this functions takes care to return the file descriptor to | |
/// its original state. | |
/// | |
/// This function should only be used when a file descriptor is first opened, such as calling `connect` | |
/// on a socket. Otherwise, it will usually have no practical effect. | |
mutating func withAsyncIO<T>(_ block: (CInt) -> T) -> Result<T, IO.Error> { | |
let previousSetting = fcntl(self.fd, F_GETFL) | |
let newSetting = previousSetting | O_NONBLOCK | |
if previousSetting != newSetting { | |
switch handlePOSIXError({ fcntl(self.fd, F_SETFL, newSetting) }) { | |
case .failure(let e): | |
return .failure(e) | |
default: | |
break | |
} | |
} | |
let value = block(self.fd) | |
let restoreSetting = previousSetting & ~O_NONBLOCK | |
if previousSetting != restoreSetting { | |
switch handlePOSIXError({ fcntl(self.fd, F_SETFL, restoreSetting) }) { | |
case .failure(let e): | |
return .failure(e) | |
default: | |
break | |
} | |
} | |
return .success(value) | |
} | |
/// Duplicates the underlying file descriptor. | |
public func duplicate() -> Result<UnsafeFileDescriptor, IO.Error> { | |
let rawFD = fcntl(self.fd, F_DUPFD, 0) | |
guard rawFD != -1 else { | |
let err = POSIXErrorCode(rawValue: errno)! | |
return .failure(IO.Error(code: err)) | |
} | |
var fd = UnsafeFileDescriptor(fd: rawFD) | |
switch fd.setCloseOnExec() { | |
case .failure(let e): | |
return .failure(e) | |
default: | |
return .success(fd) | |
} | |
} | |
/// Writes the bytes from the given buffer to the file descriptor, returning the number of bytes that | |
/// were successfully written. | |
public func write(buffer: UnsafeBufferPointer<UInt8>) -> Result<Int, IO.Error> { | |
#if canImport(Darwin) | |
let len = Darwin.write(self.fd, buffer.baseAddress.map(UnsafeRawPointer.init), buffer.count) | |
#else | |
let len = Glibc.write(self.fd, buffer.baseAddress.map(UnsafeRawPointer.init), buffer.count) | |
#endif | |
guard len != -1 else { | |
let err = POSIXErrorCode(rawValue: errno)! | |
return .failure(IO.Error(code: err)) | |
} | |
return .success(len) | |
} | |
/// Reads bytes from the given file into the provided buffer, returning the number of bytes that | |
/// were successfully read. | |
public func read(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> { | |
#if canImport(Darwin) | |
let result = Darwin.read(self.fd, UnsafeMutableRawPointer(buffer.baseAddress!), buffer.count) | |
#else | |
let result = Glibc.read(self.fd, UnsafeMutableRawPointer(buffer.baseAddress!), buffer.count) | |
#endif | |
guard result != -1 else { | |
let err = POSIXErrorCode(rawValue: errno)! | |
return .failure(IO.Error(code: err)) | |
} | |
return .success(result) | |
} | |
/// Closes the underlying file descriptor. | |
/// | |
/// Once the file descriptor has been closed, it is the responsibility of the caller to ensure that no further | |
/// operations on the file descriptor occur. | |
/// | |
/// Note that closing a file descriptor *can* fail. POSIX requires that no further use of the file descriptor | |
/// occur after *any* call to `close`, so it is not clear what the correct course of action to take is in that | |
/// situation. | |
public func close() -> Result<Void, IO.Error> { | |
return handlePOSIXError { | |
#if canImport(Darwin) | |
return Darwin.close(self.fd) | |
#else | |
return Glibc.close(self.fd) | |
#endif | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment