Created
March 2, 2024 21:16
-
-
Save karlbohlmark/0fac5f74ee9de47026a079682946c0ca to your computer and use it in GitHub Desktop.
Zig io_uring multishot recv
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const os = std.os; | |
const socket_t = std.os.socket_t; | |
const io_uring = std.os.linux.IO_Uring; | |
const Allocator = std.mem.Allocator; | |
const fs = std.fs; | |
/// Queue a multishot receive on `fd` whose buffers are picked by the kernel
/// from the provided-buffer group `group_id`.
///
/// The SQE is only queued here; the caller still has to submit the ring.
fn armMultishotRecv(ring: *io_uring, user_data: u64, fd: socket_t, group_id: u16) !void {
    // A multishot IORING_OP_RECV must be queued with length 0 (the kernel
    // rejects a non-zero length with EINVAL); the size of each receive is
    // taken from the selected provided buffer instead.
    const sqe = try ring.recv(user_data, fd, .{ .buffer_selection = .{ .group_id = group_id, .len = 0 } }, 0);
    sqe.ioprio |= std.os.linux.IORING_RECV_MULTISHOT;
}

/// Demo: receive UDP datagrams on port 12345 with a single multishot
/// io_uring recv that draws from a pool of kernel-provided buffers.
///
/// Every buffer the kernel hands out is given back to the pool once its
/// completion has been handled, and the receive is re-armed whenever the
/// kernel reports (by clearing IORING_CQE_F_MORE) that it has been terminated.
pub fn main() !void {
    const MAX_BUFFERS = 10;
    const BUFFER_SIZE = 2048;
    const GROUP_ID = 1;
    const INITIAL_BUFFER_ID = 0;

    // Backing memory for the provided buffers. It is created before the ring
    // so that (defers run in reverse order) the ring is torn down first and
    // the kernel never holds buffers that have already been freed.
    var arena = std.heap.ArenaAllocator.init(std.heap.page_allocator);
    defer arena.deinit();
    const allocator = arena.allocator();
    const buffer_mem = try allocator.alloc(u8, BUFFER_SIZE * MAX_BUFFERS);

    var ring = io_uring.init(256, 0) catch |err| {
        std.debug.print("Failed to initialize io_uring: {s}\n", .{@errorName(err)});
        return;
    };
    // Registered immediately so every later error path releases the ring.
    defer ring.deinit();

    const UserData = enum(u64) {
        ProvideBuffers = 1,
        Recv = 2,
        // Add more as needed
    };
    const provide_buffers_userdata: u64 = @intFromEnum(UserData.ProvideBuffers);
    const recv_userdata: u64 = @intFromEnum(UserData.Recv);

    // Hand the whole pool to the kernel as buffer group GROUP_ID and wait for
    // the acknowledgement before anything can try to select from it.
    _ = try ring.provide_buffers(provide_buffers_userdata, buffer_mem.ptr, BUFFER_SIZE, MAX_BUFFERS, GROUP_ID, INITIAL_BUFFER_ID);
    const submitted_buffers = try ring.submit_and_wait(1);
    std.debug.print("submitted_buffers {} \n", .{submitted_buffers});
    const buffer_cqe = try ring.copy_cqe();
    if (buffer_cqe.res < 0) {
        std.debug.print("buffer_cqe.res = {}\n", .{buffer_cqe.res});
        // Return instead of calling exit() so the defers above still run.
        return error.ProvideBuffersFailed;
    }
    std.debug.print("Registered buffer group {}\n", .{GROUP_ID});

    // Make sure SIGINT keeps its default disposition.
    const act = std.os.Sigaction{
        .handler = .{ .sigaction = @ptrCast(@alignCast(std.os.SIG.DFL)) },
        .mask = std.os.empty_sigset,
        .flags = 0,
    };
    std.os.sigaction(@intCast(std.os.SIG.INT), &act, null) catch |err| {
        std.debug.print("Failed to set SIGINT disposition: {s}\n", .{@errorName(err)});
    };

    const socket = try bindUdpSocket(12345);
    defer std.os.close(@intCast(socket));

    try armMultishotRecv(&ring, recv_userdata, @intCast(socket), GROUP_ID);

    while (true) {
        // Submit whatever is queued and wait for at least one completion.
        const num_submitted = try ring.submit_and_wait(1);
        std.debug.print("submitted: {}\n", .{num_submitted});

        while (ring.cq_ready() > 0) {
            const cqe = try ring.copy_cqe();
            switch (cqe.user_data) {
                @intFromEnum(UserData.ProvideBuffers) => {
                    // Acknowledgement for a buffer that was returned to the pool.
                    if (cqe.res < 0) {
                        std.debug.print("Failed to re-provide buffer: {}\n", .{cqe.err()});
                    }
                },
                @intFromEnum(UserData.Recv) => {
                    std.debug.print("Received Recv CQE - flags: {}\n", .{cqe.flags});
                    const recv_failed = cqe.res < 0;
                    if (recv_failed) {
                        std.debug.print("Error receiving packet: {}\n", .{cqe.err()});
                    } else {
                        const bytesRead = cqe.res;
                        std.debug.print("Received {} bytes. error: {}\n", .{ bytesRead, cqe.err() });
                    }

                    // When a buffer was selected, its id sits in the upper
                    // 16 bits of the flags. Give it back to the group so the
                    // pool does not run dry after MAX_BUFFERS datagrams. At
                    // most MAX_BUFFERS of these can be pending per batch,
                    // far below the submission queue size.
                    if ((cqe.flags & std.os.linux.IORING_CQE_F_BUFFER) != 0) {
                        const buffer_id: usize = cqe.flags >> std.os.linux.IORING_CQE_BUFFER_SHIFT;
                        const offset = buffer_id * BUFFER_SIZE;
                        _ = try ring.provide_buffers(
                            provide_buffers_userdata,
                            buffer_mem[offset .. offset + BUFFER_SIZE].ptr,
                            BUFFER_SIZE,
                            1,
                            GROUP_ID,
                            buffer_id,
                        );
                    }

                    // Without IORING_CQE_F_MORE the multishot request is
                    // finished and will post no further completions.
                    if ((cqe.flags & std.os.linux.IORING_CQE_F_MORE) == 0) {
                        // Running out of buffers is recoverable (they are
                        // recycled above); any other failure would just
                        // repeat, so stop instead of spinning.
                        if (recv_failed and cqe.err() != .NOBUFS) {
                            return error.RecvFailed;
                        }
                        try armMultishotRecv(&ring, recv_userdata, @intCast(socket), GROUP_ID);
                    }
                },
                else => {
                    std.debug.print("Unknown user_data: {} res {}. error {}\n", .{ cqe.user_data, cqe.res, cqe.err() });
                },
            }
        }
    }
}
/// Create an IPv4 UDP socket bound to `port` on all local interfaces.
///
/// Returns the socket descriptor widened to `usize`. The caller owns the
/// descriptor and is responsible for closing it. Failures to create or bind
/// the socket are returned as errors; the descriptor is closed again if the
/// bind fails.
pub fn bindUdpSocket(port: u16) !usize {
    const fd = try std.os.socket(std.os.AF.INET, std.os.SOCK.DGRAM, 0);
    errdefer std.os.close(fd);

    // 0.0.0.0 binds to any address; initIp4 stores the port in network byte order.
    const address = std.net.Address.initIp4(.{ 0, 0, 0, 0 }, port);
    try std.os.bind(fd, &address.any, address.getOsSockLen());

    std.debug.print("UDP Socket created and bound to port {}.\n", .{port});
    return @intCast(fd);
}
/// Sleep for `ns` nanoseconds using the raw Linux `nanosleep` syscall.
///
/// A negative duration returns 0 without making the syscall. Otherwise the
/// raw syscall return value is passed back unchanged.
pub fn nanosleep(ns: isize) usize {
    if (ns < 0) return 0;

    const ns_per_s = 1_000_000_000;
    // For durations under one second the quotient is 0 and the remainder is
    // `ns` itself, so one computation covers both short and long sleeps.
    const request = std.os.linux.timespec{
        .tv_sec = @divFloor(ns, ns_per_s),
        .tv_nsec = @rem(ns, ns_per_s),
    };
    return std.os.linux.nanosleep(&request, null);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment