Skip to content

Instantly share code, notes, and snippets.

@CodaFi
Last active June 3, 2023 05:32
Show Gist options
  • Save CodaFi/7dbd9be4b2a85f098c78b8ae286aa9fd to your computer and use it in GitHub Desktop.
Save CodaFi/7dbd9be4b2a85f098c78b8ae286aa9fd to your computer and use it in GitHub Desktop.
An async sequence of kevents
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#else
#error("unsupported platform")
#endif
/// Core utilities for interacting with IP-based abstractions.
public struct IP {
  /// An IP address of either the IPv4 or the IPv6 family, stored in its raw
  /// C representation (`in_addr` or `in6_addr`).
  public enum AddressFamily: Sendable {
    case v4(in_addr)
    case v6(in6_addr)

    /// Parses a textual IP address.
    ///
    /// The string is handed to `inet_pton`, first as an `AF_INET` address and,
    /// if that is rejected, as an `AF_INET6` address. The initializer fails
    /// with `nil` when neither attempt reports success.
    ///
    /// - Parameter parsing: The presentation-format address, for example
    ///   `127.0.0.1` or `::1`.
    public init?(parsing: String) {
      var candidate4 = in_addr()
      let status4 = withUnsafeMutableBytes(of: &candidate4) { raw in
        inet_pton(AF_INET, parsing, raw.baseAddress)
      }
      // inet_pton reports a successful conversion with exactly 1.
      if status4 == 1 {
        self = .v4(candidate4)
        return
      }

      var candidate6 = in6_addr()
      let status6 = withUnsafeMutableBytes(of: &candidate6) { raw in
        inet_pton(AF_INET6, parsing, raw.baseAddress)
      }
      if status6 == 1 {
        self = .v6(candidate6)
        return
      }

      return nil
    }
  }

  /// Builds an IPv4 address from its four octets.
  ///
  /// - Parameters:
  ///   - a: The most significant octet.
  ///   - b: The second octet.
  ///   - c: The third octet.
  ///   - d: The least significant octet.
  /// - Returns: The address `a.b.c.d`, stored in network byte order.
  public static func v4(_ a: UInt8, _ b: UInt8, _ c: UInt8, _ d: UInt8) -> IP.AddressFamily {
    // Fold the octets into a host-order integer, most significant first.
    let hostOrder = [a, b, c, d].reduce(UInt32(0)) { partial, octet in
      (partial << 8) | UInt32(octet)
    }
    return .v4(in_addr(s_addr: hostOrder.bigEndian))
  }

  /// Builds an IPv6 address from its eight 16-bit groups.
  ///
  /// - Parameters:
  ///   - a: The first 16 bits.
  ///   - b: The second 16 bits.
  ///   - c: The third 16 bits.
  ///   - d: The fourth 16 bits.
  ///   - e: The fifth 16 bits.
  ///   - f: The sixth 16 bits.
  ///   - g: The seventh 16 bits.
  ///   - h: The eighth 16 bits.
  /// - Returns: The combined 128-bit address.
  public static func v6(_ a: UInt16, _ b: UInt16, _ c: UInt16, _ d: UInt16, _ e: UInt16, _ f: UInt16, _ g: UInt16, _ h: UInt16) -> IP.AddressFamily {
    let raw = in6_addr(a, b, c, d, e, f, g, h)
    return .v6(raw)
  }
}
extension IP.AddressFamily: Equatable {
  /// Two addresses are equal when they belong to the same family and their
  /// raw address bytes are identical. Addresses of different families never
  /// compare equal.
  public static func == (lhs: IP.AddressFamily, rhs: IP.AddressFamily) -> Bool {
    if case .v4(let left) = lhs, case .v4(let right) = rhs {
      return left.s_addr == right.s_addr
    }
    if case .v6(let left) = lhs, case .v6(let right) = rhs {
      // Compare the two 16-byte addresses byte for byte.
      return withUnsafeBytes(of: left) { leftBytes in
        withUnsafeBytes(of: right) { rightBytes in
          leftBytes.elementsEqual(rightBytes)
        }
      }
    }
    return false
  }
}
extension IP.AddressFamily: Hashable {
  /// Feeds a family discriminator followed by the raw address into `hasher`.
  ///
  /// The discriminator keeps a v4 and a v6 address from feeding the hasher
  /// the same sequence.
  public func hash(into hasher: inout Hasher) {
    switch self {
    case .v4(let addr):
      hasher.combine(0)
      hasher.combine(addr.s_addr)
    case .v6(let addr):
      hasher.combine(1)
      // Hash the 16 address bytes directly. Loading them as a pair of
      // `UInt64`s would require 8-byte alignment, which `in6_addr` (whose
      // widest member is 32 bits) does not guarantee.
      withUnsafeBytes(of: addr) { bytes in
        hasher.combine(bytes: bytes)
      }
    }
  }
}
extension IP.AddressFamily: CustomStringConvertible {
  /// The presentation form of the address as produced by `inet_ntop`, or the
  /// empty string when the conversion fails.
  public var description: String {
    switch self {
    case .v4(let v4Addr):
      return Self.presentation(of: v4Addr, family: AF_INET, length: INET_ADDRSTRLEN)
    case .v6(let v6Addr):
      return Self.presentation(of: v6Addr, family: AF_INET6, length: INET6_ADDRSTRLEN)
    }
  }

  /// Renders a raw address structure as text with `inet_ntop`.
  ///
  /// - Parameters:
  ///   - address: The raw `in_addr` or `in6_addr` value.
  ///   - family: The address family constant matching `address`.
  ///   - length: The length limit handed to `inet_ntop`; the scratch buffer
  ///     is one byte larger than this.
  /// - Returns: The rendered text, or `""` when `inet_ntop` returns `nil`.
  private static func presentation<Raw>(of address: Raw, family: Int32, length: Int32) -> String {
    var scratch = [CChar](repeating: 0, count: Int(length) + 1)
    return scratch.withUnsafeMutableBufferPointer { out -> String in
      withUnsafeBytes(of: address) { raw -> String in
        if inet_ntop(family, raw.baseAddress, out.baseAddress, socklen_t(length)) == nil {
          return ""
        }
        return String(cString: out.baseAddress!)
      }
    }
  }
}
extension in6_addr {
  /// Builds an IPv6 address from eight 16-bit groups.
  ///
  /// Each group is stored high byte first, so the resulting 16 bytes are in
  /// network byte order regardless of the host's endianness.
  ///
  /// - Parameters:
  ///   - a: The first group; `b` through `h` follow in address order.
  fileprivate init(_ a: UInt16, _ b: UInt16, _ c: UInt16, _ d: UInt16, _ e: UInt16, _ f: UInt16, _ g: UInt16, _ h: UInt16) {
    self.init()
    let groups = [a, b, c, d, e, f, g, h]
    withUnsafeMutableBytes(of: &self) { raw in
      for (index, group) in groups.enumerated() {
        raw[2 * index] = UInt8(truncatingIfNeeded: group >> 8)
        // The low byte must be truncated explicitly: `UInt8(group)` traps for
        // any group above 0xFF.
        raw[2 * index + 1] = UInt8(truncatingIfNeeded: group)
      }
    }
  }
}
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#else
#error("unsupported platform")
#endif
/// Namespace for I/O related types.
public enum IO {
  /// An error that wraps a POSIX error code.
  public struct Error: Swift.Error, CustomDebugStringConvertible {
    /// The POSIX error code this error carries.
    public var code: POSIXErrorCode

    /// Captures the calling thread's current `errno` as an error value.
    ///
    /// An `errno` value that does not correspond to a `POSIXErrorCode` case
    /// (including 0) is reported as `EIO` instead of trapping.
    public static func currentErrorCode() -> Error {
      return IO.Error(code: POSIXErrorCode(rawValue: errno) ?? .EIO)
    }

    /// Same text as ``debugDescription``.
    public var localizedDescription: String {
      return self.summary
    }

    /// The numeric code together with the message `strerror` gives for it.
    public var debugDescription: String {
      return self.summary
    }

    /// Shared rendering for both description properties.
    private var summary: String {
      // `String(cString:)` repairs invalid UTF-8 instead of yielding nil, so
      // a message in an unexpected encoding cannot crash the description.
      let message = String(cString: strerror(self.code.rawValue))
      return "SocketToMe.IO.Error(code: \(self.code.rawValue) -> \(message))"
    }
  }
}
extension IO.Error: Equatable {
  /// Two errors are equal when they carry the same POSIX error code.
  public static func == (lhs: IO.Error, rhs: IO.Error) -> Bool {
    let (left, right) = (lhs.code, rhs.code)
    return left == right
  }
}
extension IO.Error {
  /// The error carrying `ENOMEM`.
  public static let outOfMemory: IO.Error = .init(code: .ENOMEM)
  /// The error carrying `EWOULDBLOCK`.
  public static let wouldBlock: IO.Error = .init(code: .EWOULDBLOCK)
  /// The error carrying `EAGAIN`.
  public static let tryAgain: IO.Error = .init(code: .EAGAIN)
  /// The error carrying `EIO`.
  public static let genericIO: IO.Error = .init(code: .EIO)
  /// The error carrying `EINTR`.
  public static let interrupted: IO.Error = .init(code: .EINTR)
}
/// A type that exposes the raw socket it is backed by.
///
/// Conforming types gain a `shutdown(_:)` convenience through a protocol
/// extension that forwards to the socket.
public protocol SocketStream {
/// The underlying raw socket value.
var socket: Socket { get }
}
extension SocketStream {
  /// Shuts down the requested ends of the stream by forwarding to
  /// `Socket.shutdown(_:)` on the underlying socket.
  ///
  /// - Parameter ends: The end or ends of the connection to shut down.
  /// - Returns: Whatever the socket's own `shutdown(_:)` returns.
  public func shutdown(_ ends: Socket.Shutdown) -> Result<Void, IO.Error> {
    let underlying = self.socket
    return underlying.shutdown(ends)
  }
}
/// A type that supports reading bytes from an underlying source into a provided
/// bounding buffer.
///
/// Implementing ``ReadableStream`` requires only that synchronous access to the
/// underlying resource be possible. The provided bounding buffer sets the maximum
/// number of bytes the client is willing to accept at a time. A conforming
/// implementation may read any number of bytes up to `buffer.count`.
///
/// Types implementing ``ReadableStream`` compose well to form stacks that can
/// interpret I/O operations in various ways. You might, for example, compose
/// a ``ReadableStream`` that implements a security protocol with one that reads
/// from an underlying socket.
public protocol ReadableStream {
/// Pull some bytes from this source into the given buffer, returning how many
/// bytes were read.
///
/// The details of how the reads are performed are entirely
/// implementation-defined. A conforming implementation may be synchronous or
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly.
///
/// A conforming implementation must likewise not rely on the contents of
/// the buffer having a certain shape or size. A zero-sized buffer is one
/// possible input for which many valid behaviors are acceptable. Similarly,
/// reading a stream that has reached an EOF-like state may either throw
/// or simply return 0 bytes read.
///
/// - Parameter buffer: The destination for the bytes that are read; its
///   `count` is the most the caller will accept.
/// - Returns: The number of bytes read into the given `buffer`.
/// - Throws: An implementation-defined error.
mutating func read(into buffer: UnsafeMutableBufferPointer<UInt8>) throws -> Int
}
/// A type that supports writing bytes from a source buffer into a set
/// destination.
///
/// Implementing ``WritableStream`` requires only that synchronous access to the
/// underlying resource be possible. The provided buffer sets the maximum
/// number of bytes the client is requesting be written at a time. A
/// conforming implementation may write any number of bytes up to
/// `buffer.count`.
///
/// The details of how the writes are performed are entirely
/// implementation-defined. A conforming implementation may be synchronous or
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly.
///
/// A conforming implementation must likewise not rely on the contents of
/// the buffer having a certain shape or size. A zero-sized buffer is one
/// possible input for which many valid behaviors are acceptable. Similarly,
/// writing to a stream that has reached an EOF-like state may either throw
/// or simply return 0 bytes written.
///
/// Types implementing ``WritableStream`` compose well to form stacks that can
/// interpret I/O operations in various ways. You might, for example, compose
/// a ``WritableStream`` that implements an encryption layer with one that
/// writes to an underlying socket.
public protocol WritableStream {
/// Writes some bytes from the given buffer into this destination, returning
/// how many bytes were written.
///
/// The details of how the writes are performed are entirely
/// implementation-defined. A conforming implementation may be synchronous or
/// asynchronous, and may throw an error if blocking I/O occurs unexpectedly.
///
/// A conforming implementation must likewise not rely on the contents of
/// the buffer having a certain shape or size. A zero-sized buffer is one
/// possible input for which many valid behaviors are acceptable. Similarly,
/// writing to a stream that has reached an EOF-like state may either throw
/// or simply return 0 bytes written.
///
/// - Parameter buffer: The bytes to write; its `count` is the most the
///   caller asks to have written.
/// - Returns: The number of bytes written from the given `buffer`.
/// - Throws: An implementation-defined error.
mutating func write(contentsOf buffer: UnsafeBufferPointer<UInt8>) throws -> Int
}
/// Runs a POSIX call and converts its status into a `Result`.
///
/// POSIX calls signal failure by returning -1 and setting `errno`; any other
/// return value (0, or a positive count such as the one `kevent64` or some
/// `fcntl` commands produce) is a success. Only -1 is therefore treated as a
/// failure.
///
/// - Parameter fn: The call to perform; it returns the call's raw status.
/// - Returns: `.success` unless `fn` returned -1, in which case the failure
///   carries the `errno` observed right after the call. An `errno` value with
///   no `POSIXErrorCode` case is reported as `EIO`.
func handlePOSIXError(_ fn: () -> Int32) -> Result<(), IO.Error> {
  let status = fn()
  // Read errno immediately, before any later call can overwrite it.
  let code = errno
  guard status == -1 else {
    return .success(())
  }
  return .failure(IO.Error(code: POSIXErrorCode(rawValue: code) ?? .EIO))
}
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#else
#error("unsupported platform")
#endif
/// A `Socket` represents a UNIX Socket - an abstraction for opening a communication channel across
/// a specified domain.
public struct Socket: Sendable {
  /// A socket address (an IP address plus a port) in its raw `sockaddr_in` or
  /// `sockaddr_in6` form.
  public enum AddressFamily {
    case v4(sockaddr_in)
    case v6(sockaddr_in6)

    /// Combines an IP address and a port into a socket address.
    ///
    /// - Parameters:
    ///   - address: The IP address.
    ///   - port: The port, in host byte order.
    public init(address: IP.AddressFamily, port: UInt16) {
      switch address {
      case .v4(let a):
        self = .v4(sockaddr_in(address: a, port: port))
      case .v6(let a):
        self = .v6(sockaddr_in6(address: a, port: port, flowInfo: 0, scopeID: 0))
      }
    }

    /// Builds an address by letting `block` fill in raw `sockaddr` storage.
    ///
    /// `block` receives a pointer to zeroed `sockaddr_storage` and its size;
    /// it is expected to write an `AF_INET` or `AF_INET6` address and update
    /// the length. Any other address family is a fatal error.
    public init(unsafeUninitializedAddress block: (UnsafeMutablePointer<sockaddr>?, inout socklen_t) -> Void) {
      var storage = sockaddr_storage()
      var len = socklen_t(MemoryLayout<sockaddr_storage>.size)
      storage.withRawAddress { addr, _ in
        block(addr, &len)
      }
      let family = storage.ss_family
      self = storage.withRawAddress { addr, _ in
        switch family {
        case sa_family_t(AF_INET):
          assert(Int(len) >= MemoryLayout<sockaddr_in>.size)
          return addr!.withMemoryRebound(to: sockaddr_in.self, capacity: 1) { ptr in
            return .v4(ptr.pointee)
          }
        case sa_family_t(AF_INET6):
          assert(Int(len) >= MemoryLayout<sockaddr_in6>.size)
          return addr!.withMemoryRebound(to: sockaddr_in6.self, capacity: 1) { ptr in
            return .v6(ptr.pointee)
          }
        default:
          fatalError("Unsupported socket address family: \(family)")
        }
      }
    }

    /// Calls `block` with this address viewed as a generic `sockaddr` pointer
    /// and the byte length of the concrete address structure.
    func withRawAddress<T>(_ block: (UnsafePointer<sockaddr>?, inout socklen_t) throws -> T) rethrows -> T {
      switch self {
      case .v4(let rawAddr):
        return try withUnsafeBytes(of: rawAddr) { (bvptr) -> T in
          var len = socklen_t(MemoryLayout<sockaddr_in>.size(ofValue: rawAddr))
          return try block(bvptr.bindMemory(to: sockaddr.self).baseAddress, &len)
        }
      case .v6(let rawAddr):
        return try withUnsafeBytes(of: rawAddr) { (bvptr) -> T in
          var len = socklen_t(MemoryLayout<sockaddr_in6>.size(ofValue: rawAddr))
          return try block(bvptr.bindMemory(to: sockaddr.self).baseAddress, &len)
        }
      }
    }

    /// The IP address portion of this socket address.
    public var address: IP.AddressFamily {
      switch self {
      case .v4(let rawAddr):
        return IP.AddressFamily.v4(rawAddr.sin_addr)
      case .v6(let rawAddr):
        return IP.AddressFamily.v6(rawAddr.sin6_addr)
      }
    }

    /// The port portion of this socket address, in host byte order.
    public var port: UInt16 {
      // The raw field is stored big-endian (see the `port.bigEndian` stores in
      // the sockaddr initializers). Converting with `UInt16(bigEndian:)` is
      // correct on every host, whereas an unconditional byte swap is wrong on
      // big-endian machines.
      switch self {
      case .v4(let rawAddr):
        return UInt16(bigEndian: rawAddr.sin_port)
      case .v6(let rawAddr):
        return UInt16(bigEndian: rawAddr.sin6_port)
      }
    }
  }

  /// The file descriptor backing this socket.
  public var fd: UnsafeFileDescriptor

  private init(fd: UnsafeFileDescriptor) {
    self.fd = fd
  }

  /// Opens a new socket and marks it close-on-exec.
  ///
  /// On Darwin the socket is additionally configured with `SO_NOSIGPIPE`.
  ///
  /// - Parameters:
  ///   - family: The address family passed to `socket(2)`.
  ///   - type: The socket type passed to `socket(2)`.
  /// - Throws: `IO.Error` when the socket cannot be created or configured.
  ///   If configuration fails the freshly opened descriptor is closed first.
  init(family: CInt, type: CInt) throws {
    let rawFD = socket(family, type, 0)
    guard rawFD != -1 else {
      throw IO.Error(code: POSIXErrorCode(rawValue: errno) ?? .EIO)
    }
    self.fd = UnsafeFileDescriptor(fd: rawFD)
    do {
      _ = try self.fd.setCloseOnExec().get()
      #if canImport(Darwin)
      // The option value must be a C int to match the length passed below.
      var enable: CInt = 1
      _ = try handlePOSIXError({
        setsockopt(rawFD, SOL_SOCKET, SO_NOSIGPIPE, &enable, socklen_t(MemoryLayout<CInt>.size))
      }).get()
      #endif
    } catch {
      // Do not leak the descriptor when setup fails part-way.
      _ = self.fd.close()
      throw error
    }
  }

  /// Opens a new socket whose address family matches `address`.
  ///
  /// - Parameters:
  ///   - address: Only the family (v4 or v6) of this address is used.
  ///   - type: The socket type passed to `socket(2)`.
  /// - Throws: `IO.Error` when the socket cannot be created or configured.
  public init(address: Socket.AddressFamily, type: CInt) throws {
    switch address {
    case .v4(_):
      self = try Socket(family: AF_INET, type: type)
    case .v6(_):
      self = try Socket(family: AF_INET6, type: type)
    }
  }
}
extension Socket {
  /// Connect this socket to the given address.
  ///
  /// The connect call is issued through `withAsyncIO`, i.e. with
  /// non-blocking I/O enabled on the descriptor for the duration of the call.
  ///
  /// - Returns: `.success` when the connection was established or is still in
  ///   progress (`EINPROGRESS`); `.failure` for any other connect error or
  ///   when toggling non-blocking mode fails.
  public mutating func connect(to address: Socket.AddressFamily) -> Result<Void, IO.Error> {
    // FIXME: Handle EINPROGRESS (currently reported as success without
    // waiting for the connection to complete).
    let attempt = self.fd.withAsyncIO { rawFD -> Int32 in
      let status = address.withRawAddress { (addrp, len) -> Int32 in
        #if canImport(Darwin)
        return Darwin.connect(rawFD, addrp, len)
        #else
        return Glibc.connect(rawFD, addrp, len)
        #endif
      }
      // Capture errno here, before withAsyncIO's own fcntl calls can
      // overwrite it.
      return status == -1 ? errno : 0
    }
    return attempt.flatMap { code -> Result<Void, IO.Error> in
      guard code != 0, code != EINPROGRESS else {
        return .success(())
      }
      return .failure(IO.Error(code: POSIXErrorCode(rawValue: code) ?? .EIO))
    }
  }

  /// Accepts a pending connection and marks the new descriptor close-on-exec.
  ///
  /// - Parameters:
  ///   - storage: Where `accept(2)` writes the peer address, or `nil`.
  ///   - length: On input the size of `storage`; updated by `accept(2)`.
  /// - Returns: The connected socket, or the error that occurred. If the
  ///   close-on-exec flag cannot be set, the accepted descriptor is closed.
  mutating func accept(storage: UnsafeMutablePointer<sockaddr>?, length: inout socklen_t) -> Result<Socket, IO.Error> {
    #if canImport(Darwin)
    let rawFD = Darwin.accept(self.fd.fd, storage, &length)
    #else
    let rawFD = Glibc.accept(self.fd.fd, storage, &length)
    #endif
    guard rawFD != -1 else {
      return .failure(IO.Error(code: POSIXErrorCode(rawValue: errno) ?? .EIO))
    }
    var accepted = UnsafeFileDescriptor(fd: rawFD)
    if case .failure(let error) = accepted.setCloseOnExec() {
      _ = accepted.close()
      return .failure(error)
    }
    return .success(Socket(fd: accepted))
  }

  /// Duplicates this socket.
  public func duplicate() -> Result<Socket, IO.Error> {
    return self.fd.duplicate().map(Socket.init(fd:))
  }

  /// Receives bytes with `recv(2)` into `buffer` using the given flags.
  ///
  /// - Returns: The number of bytes received, or the error reported by
  ///   `recv` (which returns -1 on failure).
  func recieve(buffer: UnsafeMutableBufferPointer<UInt8>, flags: CInt) -> Result<Int, IO.Error> {
    // Passing the optional base address lets an empty buffer through without
    // trapping.
    let received = recv(self.fd.fd, UnsafeMutableRawPointer(buffer.baseAddress), buffer.count, flags)
    guard received != -1 else {
      return .failure(IO.Error(code: POSIXErrorCode(rawValue: errno) ?? .EIO))
    }
    return .success(received)
  }

  /// Reads data from the socket into the given buffer.
  public func read(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> {
    return self.recieve(buffer: buffer, flags: 0)
  }

  /// Writes data from the given buffer into the socket.
  public func write(buffer: UnsafeBufferPointer<UInt8>) -> Result<Int, IO.Error> {
    return self.fd.write(buffer: buffer)
  }

  /// Peeks at data from the socket, writing it into the given buffer without actually removing the data from
  /// the underlying queue maintained by the operating system. This ensures that subsequent peeks and
  /// reads will receive the same data. The call uses `MSG_DONTWAIT`, so it does not block.
  public func peek(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> {
    return self.recieve(buffer: buffer, flags: MSG_PEEK | MSG_DONTWAIT)
  }

  /// Close the socket connection.
  public func close() -> Result<Void, IO.Error> {
    return self.fd.close()
  }

  /// The end or ends of a connection to shut down.
  public struct Shutdown: OptionSet, Sendable {
    public let rawValue: UInt8
    public init(rawValue: UInt8) { self.rawValue = rawValue }
    /// The receiving end.
    public static let read = Shutdown(rawValue: 1 << 0)
    /// The sending end.
    public static let write = Shutdown(rawValue: 1 << 1)
  }

  /// Shuts down one or both ends of the connection with `shutdown(2)`.
  ///
  /// - Parameter ends: Must be non-empty and contain only `.read` and/or
  ///   `.write`; anything else is a programmer error and traps.
  public func shutdown(_ ends: Shutdown) -> Result<Void, IO.Error> {
    precondition(ends.rawValue != 0)
    let how: Int32
    switch ends {
    case [.read, .write]:
      how = SHUT_RDWR
    case .read:
      how = SHUT_RD
    case .write:
      how = SHUT_WR
    default:
      fatalError("Unknown shutdown options: \(ends.rawValue)")
    }
    return handlePOSIXError {
      #if canImport(Darwin)
      return Darwin.shutdown(self.fd.fd, how)
      #else
      return Glibc.shutdown(self.fd.fd, how)
      #endif
    }
  }
}
extension sockaddr_in {
  /// Builds an IPv4 socket address.
  ///
  /// - Parameters:
  ///   - address: The raw IPv4 address, stored as given.
  ///   - port: The port in host byte order; it is stored big-endian.
  init(address: in_addr, port: UInt16) {
    // Start from fully zeroed storage, then fill in the meaningful fields.
    var blank = sockaddr_in()
    memset(&blank, 0, MemoryLayout<sockaddr_in>.size)
    blank.sin_family = sa_family_t(AF_INET)
    blank.sin_port = port.bigEndian
    blank.sin_addr = address
    self = blank
  }
}
extension sockaddr_in6 {
  /// Builds an IPv6 socket address.
  ///
  /// - Parameters:
  ///   - address: The raw IPv6 address, stored as given.
  ///   - port: The port in host byte order; it is stored big-endian.
  ///   - flowInfo: Stored unchanged in `sin6_flowinfo`.
  ///   - scopeID: Stored unchanged in `sin6_scope_id`.
  init(address: in6_addr, port: UInt16, flowInfo: UInt32, scopeID: UInt32) {
    // Start from fully zeroed storage, then fill in the meaningful fields.
    var blank = sockaddr_in6()
    memset(&blank, 0, MemoryLayout<sockaddr_in6>.size)
    blank.sin6_family = sa_family_t(AF_INET6)
    blank.sin6_port = port.bigEndian
    blank.sin6_flowinfo = flowInfo
    blank.sin6_addr = address
    blank.sin6_scope_id = scopeID
    self = blank
  }
}
extension sockaddr_un {
  /// Builds a UNIX-domain socket address for `bound` and passes it to `f`.
  ///
  /// - Parameters:
  ///   - bound: The socket path. Its UTF-8 encoding plus a terminating NUL
  ///     must fit in `sun_path` with room to spare.
  ///   - f: Receives the address and the length to use with it: the offset
  ///     of `sun_path` plus the path's byte length, plus one for the NUL
  ///     terminator when the path is non-empty.
  /// - Returns: Whatever `f` returns.
  /// - Throws: `IO.Error` with `ENAMETOOLONG` when the path does not fit, or
  ///   anything `f` throws.
  static func atPath<T>(_ bound: String, _ f: (inout sockaddr_un, socklen_t) throws -> T) throws -> T {
    var addr = sockaddr_un()
    addr.sun_family = sa_family_t(AF_UNIX)
    // Lengths must be measured in bytes. `String.count` counts Characters,
    // which under-counts any path containing non-ASCII text and would
    // truncate the copied path.
    let byteCount = bound.utf8.count
    guard (byteCount + 1) < sockaddr_un.sunPathLength else {
      throw IO.Error(code: .ENAMETOOLONG)
    }
    withUnsafeMutableBytes(of: &addr.sun_path) { rawPath in
      rawPath.copyBytes(from: bound.utf8)
      // The storage was zero-initialized, so the path is NUL-terminated.
      assert(rawPath[byteCount] == 0)
    }
    var len = socklen_t(sockaddr_un.offsetOfSunPath + byteCount)
    if !bound.isEmpty {
      len += 1
    }
    return try f(&addr, len)
  }

  /// Calls `block` with this address viewed as a generic `sockaddr` pointer
  /// and the full size of `sockaddr_un`.
  mutating func withRawAddress<T>(_ block: (UnsafeMutablePointer<sockaddr>?, socklen_t) throws -> T) rethrows -> T {
    return try withUnsafeMutablePointer(to: &self) { (bvptr) -> T in
      return try bvptr.withMemoryRebound(to: sockaddr.self, capacity: 1) { bvptr in
        let len = socklen_t(MemoryLayout<sockaddr_un>.size)
        return try block(bvptr, len)
      }
    }
  }
}
/// Helper protocol whose associated type gives a name (`SunPathType`) to the
/// type of `sockaddr_un.sun_path`, so that the type can be referred to when
/// computing the field's offset and size.
private protocol _SockSunKeyPathHelper {
/// The type of the `sun_path` field.
associatedtype SunPathType
/// The path storage of the socket address.
var sun_path: SunPathType { get }
}
extension sockaddr_un: _SockSunKeyPathHelper {
  /// The byte offset of `sun_path` within `sockaddr_un`.
  static var offsetOfSunPath: Int {
    let pathField: KeyPath<sockaddr_un, sockaddr_un.SunPathType> = \.sun_path
    let offset = MemoryLayout<sockaddr_un>.offset(of: pathField)
    return offset.unsafelyUnwrapped
  }

  /// The number of `CChar` elements that fit in `sun_path`.
  static var sunPathLength: Int {
    let fieldBytes = MemoryLayout<sockaddr_un.SunPathType>.size
    let elementBytes = MemoryLayout<CChar>.size
    return fieldBytes / elementBytes
  }
}
extension sockaddr_storage {
  /// Calls `block` with this storage viewed as a generic `sockaddr` pointer
  /// and the full size of `sockaddr_storage`.
  ///
  /// - Returns: Whatever `block` returns; errors thrown by `block` are
  ///   rethrown.
  mutating func withRawAddress<T>(_ block: (UnsafeMutablePointer<sockaddr>?, socklen_t) throws -> T) rethrows -> T {
    let capacityInBytes = socklen_t(MemoryLayout<sockaddr_storage>.size)
    return try withUnsafeMutablePointer(to: &self) { storagePointer -> T in
      try storagePointer.withMemoryRebound(to: sockaddr.self, capacity: 1) { genericPointer -> T in
        try block(genericPointer, capacityInBytes)
      }
    }
  }
}
import Darwin
/// A socket source represents a stream of events about the readability or
/// writability of a socket.
///
/// These sources can be plugged into a ``KEvent/Registry`` to enable the
/// aggregate delivery of events back to your application.
///
/// Tags
/// ====
///
/// When multiple sources are plugged into the same registry, tags are used to
/// disambiguate the source of the event. To create a tag, use a raw
/// representable enum. For example, a socket source that observes the read and
/// write end of a single socket looks like:
///
/// enum Tag: UInt64 {
/// case read
/// case write
/// }
///
/// Then you can create a registry and an async event consumer
///
/// var registry = KEvent.Registry<Tag>()
/// var readSource = SocketSource<Tag>(read: stream)
/// var writeSource = SocketSource<Tag>(write: stream)
///
/// registry.register(&readSource, tag: .read)
/// registry.register(&writeSource, tag: .write)
///
/// let registryTask = Task { [registry] in
/// for try await (tag, _) in registry {
/// switch tag {
/// case .read:
/// try await flushRead()
/// case .write:
/// try await flushWrite()
/// }
/// }
/// }
///
/// When you're done with the event loop, cancel the registry task
///
/// registryTask.cancel()
///
/// And finally cancel the registry to clean up the underlying kqueue.
///
/// registry.cancel()
public struct SocketSource<Tag: RawRepresentable>
where
  Tag.RawValue == UInt64
{
  /// Which kind of readiness this source observes.
  private enum Interest {
    case read
    case write

    /// The kevent filter corresponding to this interest.
    var asFilter: KEvent.Filter {
      switch self {
      case .read:
        return .read
      case .write:
        return .write
      }
    }
  }

  /// The socket whose readiness is observed.
  var socket: Socket
  /// The kqueue this source is registered in, if any.
  private var registeredQueue: UnsafeFileDescriptor?
  /// The tag this source was registered with, if any.
  private var registeredToken: Tag?
  private var interest: Interest
  /// Starts at 1 (active). `suspend()` decrements it and `resume()`
  /// increments it; the event is removed when it reaches 0 and re-added when
  /// it returns to 1.
  private var resumeCount: Int

  /// Creates a source that observes the readability of `stream`'s socket.
  public init(read stream: ReadableStream & SocketStream) {
    self.socket = stream.socket
    self.registeredQueue = nil
    self.registeredToken = nil
    self.interest = .read
    self.resumeCount = 1
  }

  /// Creates a source that observes the writability of `stream`'s socket.
  public init(write stream: WritableStream & SocketStream) {
    self.socket = stream.socket
    self.registeredQueue = nil
    self.registeredToken = nil
    self.interest = .write
    self.resumeCount = 1
  }

  /// Suspend the socket source.
  ///
  /// Each call decrements the source's resume count. When the count drops to
  /// zero the source's event is deleted from the kqueue it is registered in,
  /// so no further events are delivered for it. Balance each call with a
  /// matching ``SocketSource/resume()`` call.
  ///
  /// Calling this on a source that has not been registered does nothing.
  ///
  /// - Returns: The outcome of removing the event, or `.success` when no
  ///   kqueue change was needed.
  public mutating func suspend() -> Result<(), IO.Error> {
    guard let kq = self.registeredQueue else {
      return .success(())
    }
    self.resumeCount -= 1
    guard self.resumeCount == 0 else {
      return .success(())
    }
    var changes = [
      kevent64_s(id: self.socket.fd.fd,
                 filter: self.interest.asFilter,
                 flags: [ .delete, .receipt ],
                 token: 0),
    ]
    return keventRegister(kq, changes: &changes)
  }

  /// Resume the socket source.
  ///
  /// Each call increments the source's resume count. When the count returns
  /// to one, the source's event is added back to the kqueue it was registered
  /// in, using the tag it was registered with.
  ///
  /// Calling this on a source that has not been registered does nothing.
  ///
  /// - Returns: The outcome of re-adding the event, or `.success` when no
  ///   kqueue change was needed.
  public mutating func resume() -> Result<(), IO.Error> {
    guard let kq = self.registeredQueue, let tag = self.registeredToken else {
      return .success(())
    }
    self.resumeCount += 1
    guard self.resumeCount == 1 else {
      return .success(())
    }
    // `suspend()` deleted the event, so it has to be added again. Without
    // `.add` the kernel has no existing event to modify and rejects the
    // change.
    var changes = [
      kevent64_s(id: self.socket.fd.fd,
                 filter: self.interest.asFilter,
                 flags: [ .clear, .receipt, .add ],
                 token: tag.rawValue),
    ]
    return keventRegister(kq, changes: &changes)
  }

  /// Records the queue and tag, then adds this source's event to `queue`.
  ///
  /// The outcome of the kqueue change is not reported to the caller.
  fileprivate mutating func register(in queue: UnsafeFileDescriptor, tag: Tag) {
    self.registeredQueue = queue
    self.registeredToken = tag
    var changes = [
      kevent64_s(id: self.socket.fd.fd,
                 filter: self.interest.asFilter,
                 flags: [ .clear, .receipt, .add ],
                 token: tag.rawValue),
    ]
    _ = keventRegister(queue, changes: &changes)
  }
}
/// Applies `changes` to the kqueue `kq`.
///
/// The same array is passed as both the change list and the event list, so
/// on return `changes` holds whatever the kernel reported back (the receipts,
/// for changes submitted with the receipt flag).
///
/// - Parameters:
///   - kq: The kqueue descriptor.
///   - changes: The changes to apply; overwritten with the kernel's reply.
/// - Returns: `.failure` when `kevent64` itself fails (returns -1) or when a
///   returned entry is flagged `EV_ERROR` with a non-zero error code in its
///   `data` field; `.success` otherwise.
private func keventRegister(_ kq: UnsafeFileDescriptor, changes: inout [kevent64_s]) -> Result<(), IO.Error> {
  // Nothing to submit; also avoids unwrapping the base address of an empty array.
  guard !changes.isEmpty else {
    return .success(())
  }
  let (reported, failureCode) = changes.withUnsafeMutableBufferPointer { buf -> (Int32, Int32) in
    let count = kevent64(kq.fd,
                         UnsafePointer(buf.baseAddress!), Int32(buf.count),
                         buf.baseAddress!, Int32(buf.count),
                         0, nil)
    // Capture errno right away so nothing can overwrite it.
    return (count, count == -1 ? errno : 0)
  }
  // A positive return value is the number of entries written back, not an error.
  guard reported != -1 else {
    return .failure(IO.Error(code: POSIXErrorCode(rawValue: failureCode) ?? .EIO))
  }
  // Per-change errors are delivered in the returned entries: flagged
  // EV_ERROR with the error number in `data` (0 meaning the change succeeded).
  let rejected = changes.prefix(Int(reported)).first { entry in
    (entry.flags & UInt16(EV_ERROR)) != 0 && entry.data != 0
  }
  if let rejected = rejected {
    let code = POSIXErrorCode(rawValue: Int32(truncatingIfNeeded: rejected.data)) ?? .EIO
    return .failure(IO.Error(code: code))
  }
  return .success(())
}
extension kevent64_s {
  /// Builds a kevent for the descriptor `id`.
  ///
  /// - Parameters:
  ///   - id: The file descriptor, stored as the event's `ident`.
  ///   - filter: The filter to apply.
  ///   - flags: The action flags.
  ///   - token: Opaque value stored in `udata`.
  fileprivate init(id: CInt, filter: KEvent.Filter, flags: KEvent.Flags, token: UInt64) {
    // Zero everything first; fflags, data and ext stay at zero.
    self.init()
    self.ident = UInt64(id)
    self.filter = filter.rawValue
    self.flags = flags.rawValue
    self.udata = token
  }
}
/// Namespace for kqueue-based event delivery.
public enum KEvent {
  /// Owns a kqueue and hands socket sources the queue to register in.
  public struct Registry<Tag: RawRepresentable>
  where
    Tag.RawValue == UInt64
  {
    /// The kqueue descriptor.
    private var queue: UnsafeFileDescriptor
    /// Storage for fetched events.
    private var eventBuffer: [kevent64_s]
    /// Position within `eventBuffer`.
    private var cursor: Int

    /// Creates a kqueue, requests close-on-exec for it (the outcome of that
    /// request is not checked), and reserves room for 128 events.
    public init() {
      var descriptor = UnsafeFileDescriptor(fd: kqueue())
      _ = descriptor.setCloseOnExec()

      var storage: [kevent64_s] = []
      storage.reserveCapacity(128)

      self.queue = descriptor
      self.cursor = 0
      self.eventBuffer = storage
    }

    /// Registers `source` in this registry's kqueue under `tag`.
    public mutating func register(_ source: inout SocketSource<Tag>, tag: Tag) {
      let kq = self.queue
      source.register(in: kq, tag: tag)
    }
  }
}
extension KEvent {
  /// A kevent filter identifier.
  public struct Filter: RawRepresentable, Sendable {
    public var rawValue: Int16

    public init(rawValue: Int16) {
      self.rawValue = rawValue
    }

    /// The filter with raw value 0.
    public static let zero: Filter = .init(rawValue: 0)
    /// The read filter (raw value -1).
    public static let read: Filter = .init(rawValue: -1)
    /// The write filter (raw value -2).
    public static let write: Filter = .init(rawValue: -2)
  }

  /// kevent action flags.
  public struct Flags: OptionSet, Sendable {
    public var rawValue: UInt16

    public init(rawValue: UInt16) {
      self.rawValue = rawValue
    }

    /// Add event to kq (implies enable)
    public static let add: Flags = .init(rawValue: 1 << 0)
    /// Delete event from kq
    public static let delete: Flags = .init(rawValue: 1 << 1)
    /// Enable event
    public static let enable: Flags = .init(rawValue: 1 << 2)
    /// Disable event (not reported)
    public static let disable: Flags = .init(rawValue: 1 << 3)
    /// Only report one occurrence
    public static let oneshot: Flags = .init(rawValue: 1 << 4)
    /// Clear event state after reporting
    public static let clear: Flags = .init(rawValue: 1 << 5)
    /// Force immediate event output
    public static let receipt: Flags = .init(rawValue: 1 << 6)
  }
}
extension KEvent.Registry {
  /// Closes the registry's kqueue.
  ///
  /// After this call the registry's descriptor is no longer valid and the
  /// registry should not be used again; create a new registry instead.
  /// Failing to close the kqueue is a fatal error.
  public __consuming func cancel() {
    switch self.queue.close() {
    case .success:
      return
    case .failure(let err):
      fatalError("Could not close kqueue for registry: \(err)")
    }
  }
}
/// A registry is `Sendable` whenever its tag type is.
extension KEvent.Registry: Sendable where Tag: Sendable {}
extension KEvent.Registry: AsyncIteratorProtocol, AsyncSequence {
  /// The registry acts as its own iterator.
  public func makeAsyncIterator() -> Self {
    return self
  }

  /// Returns the next event together with the tag it was registered under.
  ///
  /// Events are served from an internal buffer; when the buffer is exhausted
  /// the kqueue is polled until at least one event is available.
  ///
  /// - Throws: `CancellationError` when the task is cancelled while waiting,
  ///   or `IO.Error` when polling the kqueue fails.
  public mutating func next() async throws -> (Tag, kevent64_s)? {
    if self.cursor >= self.eventBuffer.count {
      try await self.reloadEvents()
    }
    let event = self.eventBuffer[self.cursor]
    self.cursor += 1
    guard let tag = Tag(rawValue: event.udata) else {
      preconditionFailure("kevent carries a token that is not a valid tag: \(event.udata)")
    }
    return (tag, event)
  }

  /// Empties the buffer and refills it with at least one event.
  ///
  /// The kqueue is polled without blocking; between unsuccessful polls the
  /// task yields and checks for cancellation.
  private mutating func reloadEvents() async throws {
    try Task.checkCancellation()
    self.eventBuffer.removeAll(keepingCapacity: true)
    self.cursor = 0

    let kq = self.queue.fd
    // Events are fetched into initialized scratch storage and then appended.
    // Handing kevent64 the reserved-but-unused capacity of an empty array
    // never changes the array's `count`, so fetched events would be invisible.
    let batchSize = Swift.max(self.eventBuffer.capacity, 128)
    var scratch = [kevent64_s](repeating: kevent64_s(), count: batchSize)
    // A zero timeout makes kevent64 return immediately; a nil timeout would
    // block the calling thread until an event arrives.
    var noWait = timespec(tv_sec: 0, tv_nsec: 0)

    while true {
      // Opportunistically try to fill the event buffer.
      let (fetched, failureCode) = scratch.withUnsafeMutableBufferPointer { buf -> (Int32, Int32) in
        let count = kevent64(kq,
                             nil, 0,
                             buf.baseAddress, Int32(buf.count),
                             0, &noWait)
        return (count, count == -1 ? errno : 0)
      }
      if fetched > 0 {
        // We're full up again.
        self.eventBuffer.append(contentsOf: scratch[..<Int(fetched)])
        return
      }
      // An interrupted call is simply retried; anything else is reported.
      if fetched == -1 && failureCode != EINTR {
        throw IO.Error(code: POSIXErrorCode(rawValue: failureCode) ?? .EIO)
      }
      // So there weren't any events. Let's see if another task can make
      // progress instead.
      await Task.yield()
      // Make sure we weren't cancelled in the mean time.
      try Task.checkCancellation()
    }
  }
}
#if canImport(Darwin)
import Darwin
#elseif canImport(Glibc)
import Glibc
#else
#error("unsupported platform")
#endif
/// ``UnsafeFileDescriptor`` is an extremely low-level wrapper around a raw POSIX file
/// descriptor. It provides a number of higher-level operations that abstract the corresponding POSIX routines
/// but it makes no attempt to try to guard against misuse of the API surface. In particular, this abstraction
/// does not support automatically closing the underlying file descriptor when it is no longer in use.
/// In general, the validity of the underlying file descriptor is not guaranteed, i.e. it is possible for
/// other low-level routines and system calls to interfere with the state of the underlying file descriptor,
/// especially when the underlying file descriptor is escaped to routines that are outside of the control of
/// the SocketToMe library.
///
/// A file descriptor represents a low-level connection between a file controlled by the operating system and
/// a userspace application. POSIX affords three file descriptors often collectively referred to as the
/// "standard streams":`.standardInput`, `.standardOutput`, and `.standardError`.
public struct UnsafeFileDescriptor: Sendable {
  /// The raw POSIX file descriptor.
  public let fd: CInt

  public init(fd: CInt) {
    self.fd = fd
  }

  public static let standardInput = UnsafeFileDescriptor(fd: STDIN_FILENO)
  public static let standardOutput = UnsafeFileDescriptor(fd: STDOUT_FILENO)
  public static let standardError = UnsafeFileDescriptor(fd: STDERR_FILENO)

  /// Wraps the calling thread's current `errno` in an `IO.Error`. A value
  /// with no `POSIXErrorCode` case is reported as `EIO`.
  private static func lastError() -> IO.Error {
    return IO.Error(code: POSIXErrorCode(rawValue: errno) ?? .EIO)
  }

  /// Sets the "close on exec" bit on the underlying file descriptor.
  ///
  /// When a userspace process forks, the child inherits all open file descriptors from its parent. When a
  /// function in the `exec` family is called, those file descriptors can potentially leak if the child forgets to
  /// close them.
  ///
  /// - Returns: `.success` when the bit is set (or already was), otherwise
  ///   the error reported by `fcntl`.
  mutating func setCloseOnExec() -> Result<(), IO.Error> {
    let previousSetting = fcntl(self.fd, F_GETFD)
    // -1 is an error, not a set of flags; without this check it would look
    // like "every bit already set" and be reported as success.
    guard previousSetting != -1 else {
      return .failure(Self.lastError())
    }
    let newSetting = previousSetting | FD_CLOEXEC
    guard previousSetting != newSetting else {
      return .success(())
    }
    guard fcntl(self.fd, F_SETFD, newSetting) != -1 else {
      return .failure(Self.lastError())
    }
    return .success(())
  }

  /// Executes a user provided function by first enabling non-blocking I/O on the underlying file descriptor.
  /// When the user-provided function returns, this function takes care to return the file descriptor to
  /// its original state.
  ///
  /// This function should only be used when a file descriptor is first opened, such as calling `connect`
  /// on a socket. Otherwise, it will usually have no practical effect.
  ///
  /// - Parameter block: Receives the raw descriptor while it is non-blocking.
  /// - Returns: The value produced by `block`, or the error from reading,
  ///   setting, or restoring the descriptor's status flags.
  mutating func withAsyncIO<T>(_ block: (CInt) -> T) -> Result<T, IO.Error> {
    let previousSetting = fcntl(self.fd, F_GETFL)
    guard previousSetting != -1 else {
      return .failure(Self.lastError())
    }
    let newSetting = previousSetting | O_NONBLOCK
    // Only touch the flags when the descriptor was not already non-blocking.
    let enabledNonBlocking = previousSetting != newSetting
    if enabledNonBlocking {
      guard fcntl(self.fd, F_SETFL, newSetting) != -1 else {
        return .failure(Self.lastError())
      }
    }
    let value = block(self.fd)
    // Put back exactly the flags that were there before, and only if they
    // were changed above.
    if enabledNonBlocking {
      guard fcntl(self.fd, F_SETFL, previousSetting) != -1 else {
        return .failure(Self.lastError())
      }
    }
    return .success(value)
  }

  /// Duplicates the underlying file descriptor.
  ///
  /// The duplicate is marked close-on-exec. If that cannot be done the
  /// duplicate is closed again and the error is returned.
  public func duplicate() -> Result<UnsafeFileDescriptor, IO.Error> {
    let rawFD = fcntl(self.fd, F_DUPFD, 0)
    guard rawFD != -1 else {
      return .failure(Self.lastError())
    }
    var copy = UnsafeFileDescriptor(fd: rawFD)
    if case .failure(let error) = copy.setCloseOnExec() {
      _ = copy.close()
      return .failure(error)
    }
    return .success(copy)
  }

  /// Writes the bytes from the given buffer to the file descriptor, returning the number of bytes that
  /// were successfully written.
  public func write(buffer: UnsafeBufferPointer<UInt8>) -> Result<Int, IO.Error> {
    #if canImport(Darwin)
    let len = Darwin.write(self.fd, UnsafeRawPointer(buffer.baseAddress), buffer.count)
    #else
    let len = Glibc.write(self.fd, UnsafeRawPointer(buffer.baseAddress), buffer.count)
    #endif
    guard len != -1 else {
      return .failure(Self.lastError())
    }
    return .success(len)
  }

  /// Reads bytes from the given file into the provided buffer, returning the number of bytes that
  /// were successfully read.
  public func read(buffer: UnsafeMutableBufferPointer<UInt8>) -> Result<Int, IO.Error> {
    // The optional base address is passed through so that an empty buffer
    // does not trap.
    #if canImport(Darwin)
    let result = Darwin.read(self.fd, UnsafeMutableRawPointer(buffer.baseAddress), buffer.count)
    #else
    let result = Glibc.read(self.fd, UnsafeMutableRawPointer(buffer.baseAddress), buffer.count)
    #endif
    guard result != -1 else {
      return .failure(Self.lastError())
    }
    return .success(result)
  }

  /// Closes the underlying file descriptor.
  ///
  /// Once the file descriptor has been closed, it is the responsibility of the caller to ensure that no further
  /// operations on the file descriptor occur.
  ///
  /// Note that closing a file descriptor *can* fail. POSIX requires that no further use of the file descriptor
  /// occur after *any* call to `close`, so it is not clear what the correct course of action to take is in that
  /// situation.
  public func close() -> Result<Void, IO.Error> {
    #if canImport(Darwin)
    let status = Darwin.close(self.fd)
    #else
    let status = Glibc.close(self.fd)
    #endif
    guard status != -1 else {
      return .failure(Self.lastError())
    }
    return .success(())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment