Last active
May 7, 2025 13:32
-
-
Save pmbanugo/4408bfc8b8a3e3be78e031510a2456b6 to your computer and use it in GitHub Desktop.
non-blocking HTTP with libxev event-loop / async I/O
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! By convention, root.zig is the root source file when making a library. If | |
//! you are making an executable, the convention is to delete this file and | |
//! start with main.zig instead. | |
const std = @import("std"); | |
const xev = @import("xev"); | |
const testing = std.testing; | |
/// Close-completion callback for a client connection.
///
/// Logs that the connection was closed (printing the type of the TCP
/// handle, not the handle itself) and queues no further work. The close
/// result is ignored.
fn onConnectionClosed(
    _: ?*void,
    _: *xev.Loop,
    _: *xev.Completion,
    tcp: xev.TCP,
    _: xev.CloseError!void,
) xev.CallbackAction {
    const TcpType = @TypeOf(tcp);
    std.debug.print("connection closed! {any} \n", .{TcpType});
    return .disarm;
}
/// Write-completion callback: reports the outcome of the response write and
/// then closes the client connection.
///
/// A failed write is logged instead of being asserted impossible; socket
/// writes can legitimately fail (for example when the peer disconnects), so
/// `catch unreachable` is not appropriate here. The connection is closed in
/// both the success and the failure case.
///
/// The completion `c` that just finished is reused for the close operation.
fn onWriteComplete(
    _: ?*void,
    loop: *xev.Loop,
    c: *xev.Completion,
    tcp: xev.TCP,
    _: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    if (result) |_| {
        std.debug.print("Write complete!\n", .{});
    } else |err| {
        std.debug.print("Write failed: {}\n", .{err});
    }

    // Close regardless of the write outcome so the socket is not leaked.
    tcp.close(loop, c, void, null, onConnectionClosed);
    return .disarm;
}
/// Accept-completion callback: sends a fixed HTTP response to the accepted
/// client.
///
/// On accept failure the error is logged and the callback disarms. On
/// success the write is queued on `completion_ref` (the accept completion,
/// which has finished by the time this callback runs) and `onWriteComplete`
/// is invoked when it completes.
///
/// NOTE: this callback does not re-arm the listener. The previous code tried
/// to do so by calling `accept` on the *accepted connection* with a
/// completion that lived on this function's stack; that completion pointer
/// dangles as soon as the callback returns, and a connected socket is not a
/// listening socket. Accepting further connections requires the listener
/// handle and a long-lived completion, neither of which is reachable from
/// here because `userdata` is unused (`void`).
fn onAcceptConnection(userdata: ?*void, ev_loop: *xev.Loop, completion_ref: *xev.Completion, result: xev.AcceptError!xev.TCP) xev.CallbackAction {
    _ = userdata;

    std.debug.print("Accepted connection!\n", .{});
    std.debug.print("Completion state: {any}\n", .{completion_ref.state()});

    const conn = result catch |err| {
        std.debug.print("Error accepting connection: {}\n", .{err});
        return .disarm;
    };

    // Simple HTTP response. The literal has static lifetime, so the slice
    // stays valid while the asynchronous write is in flight.
    const response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!";
    conn.write(ev_loop, completion_ref, .{ .slice = response }, void, null, onWriteComplete);

    return .disarm;
}
/// Starts a TCP listener on 127.0.0.1:4221 and runs the event loop until it
/// has no more work.
///
/// Errors from loop initialisation, address resolution, socket setup or the
/// loop run are propagated to the caller. The listener socket is not
/// explicitly closed by this function.
pub fn server() !void {
    var event_loop = try xev.Loop.init(.{});
    defer event_loop.deinit();

    const listen_addr = try std.net.Address.resolveIp("127.0.0.1", 4221);
    const listener = try xev.TCP.init(listen_addr);
    try listener.bind(listen_addr);
    try listener.listen(128); // backlog; should be configurable

    const port = listen_addr.getPort();
    std.debug.print("Listening on {d}\n", .{port});

    // The completion lives in this frame, which stays alive for as long as
    // the loop runs below.
    var accept_completion: xev.Completion = undefined;
    listener.accept(&event_loop, &accept_completion, void, null, onAcceptConnection);
    std.debug.print("Accepting connections...\n", .{});

    try event_loop.run(.until_done);
}
/// Close-completion callback intended for shutting down the listener.
///
/// Logs either a shutdown message or the close error. It always disarms and
/// uses none of its other arguments.
fn stopServer(
    userdata: ?*void,
    loop: *xev.Loop,
    c: *xev.Completion,
    tcp_stream: xev.TCP,
    result: xev.CloseError!void,
) xev.CallbackAction {
    _ = userdata;
    _ = loop;
    _ = c;
    _ = tcp_stream;

    if (result) |_| {
        std.debug.print("Server shutdown!\n", .{});
    } else |err| {
        std.debug.print("Error closing server: {}\n", .{err});
    }
    return .disarm;
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! By convention, main.zig is where your main function lives in the case that | |
//! you are building an executable. If you are making a library, the convention | |
//! is to delete this file and start with root.zig instead. | |
const xev = @import("xev"); | |
const pita = @import("pita_lib"); | |
const std = @import("std"); | |
const lib = @import("pita_lib"); | |
/// Program entry point: announces startup and then runs the server.
///
/// The startup message is printed *before* calling `pita.server()`.
/// NOTE(review): this assumes `pita.server()` blocks while the event loop
/// runs, in which case printing "started" after it returned would only
/// happen once the server had already stopped. Confirm against `pita_lib`.
///
/// Any error returned by `pita.server()` is propagated out of `main`.
pub fn main() !void {
    std.debug.print("Server started.... MAIN!.\n", .{});
    try pita.server();
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const std = @import("std"); | |
const xev = @import("xev"); | |
const testing = std.testing; // Not used, but fine to keep | |
// Process-wide allocator, kept global to keep this example short.
// A real application should choose the allocator at its entry point and
// pass it explicitly to the code that needs it.
var gpa: std.heap.GeneralPurposeAllocator(.{}) = .{};
const allocator: std.mem.Allocator = gpa.allocator();
/// State shared by the listening side of the server.
const ServerContext = struct {
    /// The listening socket on which accepts are queued.
    listener_tcp: xev.TCP,
    /// Completion used for the listener's accept operations. It is stored
    /// inside the context so that it lives as long as the context does.
    accept_completion: xev.Completion = undefined,
    /// The event loop this server runs on.
    loop: *xev.Loop,
};
/// Per-connection state, allocated with `create` and released with `destroy`.
///
/// It owns the completions used for this client's write and close
/// operations, so their storage lives as long as the context itself.
/// The response is not stored here; callers use a static constant.
const ClientContext = struct {
    /// The accepted client socket.
    client_tcp: xev.TCP,
    /// Completion for the response write.
    write_completion: xev.Completion = undefined,
    /// Completion for closing the client socket.
    close_completion: xev.Completion = undefined,
    /// Back-pointer to the owning server context.
    server_ctx_ptr: *ServerContext,

    /// Allocates a context with `alloc` and initialises it for `client_tcp`.
    /// Returns the allocation error if memory cannot be obtained. The caller
    /// owns the result and must release it with `destroy`, passing the same
    /// allocator.
    pub fn create(
        alloc: std.mem.Allocator,
        client_tcp: xev.TCP,
        server_ctx_ptr: *ServerContext,
    ) !*ClientContext {
        const ctx = try alloc.create(ClientContext);
        ctx.* = .{
            .client_tcp = client_tcp,
            .server_ctx_ptr = server_ctx_ptr,
        };
        return ctx;
    }

    /// Frees a context previously returned by `create`.
    pub fn destroy(self: *ClientContext, alloc: std.mem.Allocator) void {
        alloc.destroy(self);
    }
};
/// Close callback that does nothing: it ignores every argument, including
/// the close result, and disarms. Intended for fire-and-forget closes on
/// error paths.
fn fnNoopClose(
    _: ?*void,
    _: *xev.Loop,
    _: *xev.Completion,
    _: xev.TCP,
    _: xev.CloseError!void,
) xev.CallbackAction {
    return .disarm;
}
/// Close-completion callback for a client connection.
///
/// `userdata` must be the `ClientContext` of the connection being closed
/// (it is unwrapped with `.?`). A close error is logged, and the context is
/// then freed with the global allocator in either case.
fn onConnectionClosed(
    userdata: ?*ClientContext,
    _: *xev.Loop,
    _: *xev.Completion,
    closed_tcp: xev.TCP,
    result: xev.CloseError!void,
) xev.CallbackAction {
    _ = closed_tcp;
    const ctx = userdata.?;

    if (result) |_| {
        // Closed cleanly; nothing to report.
    } else |err| {
        std.debug.print("Error during close for {any}: {any}\n", .{ ctx.client_tcp, err });
    }

    // The connection is finished either way, so release its context.
    ctx.destroy(allocator);
    return .disarm;
}
/// Write-completion callback for a client connection.
///
/// `userdata` must be the connection's `ClientContext` (it is unwrapped with
/// `.?`). A write error is logged. Whether or not the write succeeded, the
/// client socket is then closed using the context's own close completion,
/// with `onConnectionClosed` as the follow-up callback.
fn onWriteComplete(
    userdata: ?*ClientContext,
    loop: *xev.Loop,
    _: *xev.Completion,
    tcp_stream: xev.TCP,
    _: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    _ = tcp_stream;
    const ctx = userdata.?;

    if (result) |_| {
        // Nothing to report on success.
    } else |err| {
        std.debug.print("Write error for {any}: {any}. Closing connection.\n", .{ ctx.client_tcp, err });
    }

    ctx.client_tcp.close(
        loop,
        &ctx.close_completion,
        ClientContext,
        ctx,
        onConnectionClosed,
    );
    return .disarm;
}
/// Accept-completion callback for the listening socket.
///
/// `userdata` must be the `ServerContext` (it is unwrapped with `.?`).
///
/// Steps:
///   1. Queue the next accept on the listener immediately, reusing the
///      listener's completion, so the server keeps accepting regardless of
///      how this connection turns out.
///   2. On accept failure, log the error and return.
///   3. Allocate a `ClientContext` for the new client. If allocation fails,
///      the accepted socket is closed synchronously and the callback returns.
///   4. Queue a write of a fixed HTTP response using the context's write
///      completion; `onWriteComplete` continues from there.
///
/// Always returns `.disarm`, because the listener is re-armed explicitly in
/// step 1.
fn onAcceptConnection(
    userdata: ?*ServerContext,
    ev_loop: *xev.Loop,
    listener_completion_ref: *xev.Completion,
    result: xev.AcceptError!xev.TCP,
) xev.CallbackAction {
    const server_ctx: *ServerContext = userdata.?;

    // Re-arm accept for the next connection before handling this one.
    server_ctx.listener_tcp.accept(ev_loop, listener_completion_ref, ServerContext, server_ctx, onAcceptConnection);

    const new_client_stream = result catch |err| {
        std.debug.print("Error accepting connection: {}. Listener remains active.\n", .{err});
        return .disarm;
    };

    const client_ctx = ClientContext.create(allocator, new_client_stream, server_ctx) catch |err| {
        std.debug.print("Failed to create client context: {any}. Closing accepted socket.\n", .{err});
        // There is no ClientContext to host a completion, and a completion
        // declared on this stack frame would dangle once the callback
        // returns while the asynchronous close is still queued. Close the
        // descriptor synchronously instead.
        std.posix.close(new_client_stream.fd);
        return .disarm;
    };

    // Simple HTTP response. The literal has static lifetime, so the slice
    // stays valid while the asynchronous write is in flight.
    const response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!";

    client_ctx.client_tcp.write(
        ev_loop,
        &client_ctx.write_completion, // completion owned by the ClientContext
        .{ .slice = response },
        ClientContext,
        client_ctx, // handed to onWriteComplete as userdata
        onWriteComplete,
    );

    return .disarm;
}
/// Runs the HTTP example server on 127.0.0.1:4221.
///
/// Sets up a thread pool and an event loop that uses it, binds and listens
/// on the address, queues the first accept and then runs the loop until it
/// is done. Errors from loop initialisation, address resolution, socket
/// setup or the loop run are propagated to the caller.
///
/// Cleanup on return happens in reverse order of the `defer`s below: the
/// global allocator is deinitialised first, then the loop, then the thread
/// pool. The listener socket is not explicitly closed.
pub fn server() !void {
    var pool = xev.ThreadPool.init(.{ .max_threads = 6 }); // should be configurable
    defer {
        pool.shutdown();
        pool.deinit();
    }

    var event_loop = try xev.Loop.init(.{ .thread_pool = &pool });
    defer event_loop.deinit();

    // Runs only once this function returns, i.e. after the loop has
    // finished or a setup step has failed. The deinit result is discarded.
    defer {
        std.debug.print("Deinitializing GPA.\n", .{});
        _ = gpa.deinit();
    }

    const listen_addr = try std.net.Address.resolveIp("127.0.0.1", 4221);
    const listener = try xev.TCP.init(listen_addr);
    try listener.bind(listen_addr);
    try listener.listen(180); // backlog; should be configurable
    std.debug.print("Listening on {d}\n", .{listen_addr.getPort()});

    // The context lives in this frame, which stays alive until the loop
    // run below returns.
    var ctx: ServerContext = .{
        .listener_tcp = listener,
        .loop = &event_loop,
    };

    // First accept; the context carries both the completion and the userdata.
    ctx.listener_tcp.accept(
        &event_loop,
        &ctx.accept_completion,
        ServerContext,
        &ctx,
        onAcceptConnection,
    );
    std.debug.print("Accepting connections...\n", .{});

    try event_loop.run(.until_done);
    std.debug.print("Server loop finished.\n", .{});
}
// The stopServer function from original code is not directly used in this flow. | |
// A graceful shutdown mechanism would involve stopping the accept loop, | |
// waiting for active connections to close, then closing the listener, | |
// and finally stopping the event loop. | |
// fn stopServer( | |
// userdata: ?*void, | |
// loop: *xev.Loop, | |
// c: *xev.Completion, | |
// tcp_stream: xev.TCP, | |
// result: xev.CloseError!void, | |
// ) xev.CallbackAction { | |
// _ = userdata; | |
// _ = loop; | |
// _ = c; | |
// _ = tcp_stream; | |
// _ = result catch |err| { | |
// std.debug.print("Error closing server: {}\n", .{err}); | |
// return .disarm; | |
// }; | |
// std.debug.print("Server shutdown!\n", .{}); | |
// // To actually stop the loop, you might need to call loop.stop() or similar | |
// // if xev provides such a mechanism, or ensure all completions are disarmed. | |
// return .disarm; | |
// } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I made a revision: working-lib.zig contains the code I got working (with help from an LLM, Gemini 2.5 Pro I/O edition). The old, broken code is in lib.zig.