Skip to content

Instantly share code, notes, and snippets.

@pmbanugo
Last active May 7, 2025 13:32
Show Gist options
  • Save pmbanugo/4408bfc8b8a3e3be78e031510a2456b6 to your computer and use it in GitHub Desktop.
Save pmbanugo/4408bfc8b8a3e3be78e031510a2456b6 to your computer and use it in GitHub Desktop.
non-blocking HTTP with libxev event-loop / async I/O
//! By convention, root.zig is the root source file when making a library. If
//! you are making an executable, the convention is to delete this file and
//! start with main.zig instead.
const std = @import("std");
const xev = @import("xev");
const testing = std.testing;
/// Close callback for a client connection (old, single-shot variant).
/// Logs that the connection was closed; nothing further is scheduled.
fn onConnectionClosed(
    _: ?*void,
    _: *xev.Loop,
    _: *xev.Completion,
    tcp: xev.TCP,
    _: xev.CloseError!void,
) xev.CallbackAction {
    const Handle = @TypeOf(tcp);
    std.debug.print("connection closed! {any} \n", .{Handle});
    return .disarm;
}
/// Write-completion callback (old, single-shot variant).
/// Logs completion, then reuses the same completion `c` to submit an async
/// close of the stream (the write has finished, so `c` is free to rearm).
fn onWriteComplete(
    _: ?*void,
    loop: *xev.Loop,
    c: *xev.Completion,
    tcp: xev.TCP,
    _: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    std.debug.print("Write complete!\n", .{});
    // FIX: a failed write is a real runtime possibility (peer reset, broken
    // pipe, ...). `catch unreachable` turned that into UB in release builds;
    // log the error instead, and still close the connection below.
    if (result) |_| {} else |err| {
        std.debug.print("Write error: {any}\n", .{err});
    }
    tcp.close(loop, c, void, null, onConnectionClosed);
    return .disarm;
}
/// Accept callback (old, single-shot variant): replies with a canned HTTP
/// response on the accepted socket, then lets onWriteComplete close it.
fn onAcceptConnection(userdata: ?*void, ev_loop: *xev.Loop, completion_ref: *xev.Completion, result: xev.AcceptError!xev.TCP) xev.CallbackAction {
    _ = userdata;
    std.debug.print("Accepted connection!\n", .{});
    std.debug.print("Completion state: {any}\n", .{completion_ref.state()});
    const conn = result catch |err| {
        std.debug.print("Error accepting connection: {}\n", .{err});
        return .disarm;
    };
    // Simple HTTP response; the literal is static, so the slice stays valid
    // for the duration of the async write.
    const response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!";
    // The accept operation has finished, so completion_ref may be rearmed
    // for the write on the new connection.
    conn.write(ev_loop, completion_ref, .{ .slice = response }, void, null, onWriteComplete);
    // FIX: the original rearmed accept with
    //     var next_completion: xev.Completion = undefined;
    //     conn.accept(ev_loop, &next_completion, ...);
    // which is broken twice over: `next_completion` is stack memory that
    // dangles the moment this callback returns (use-after-return once the
    // loop touches it), and accept() was issued on the *accepted client*
    // socket rather than the listening socket. A correct rearm needs the
    // listener handle plus a long-lived completion delivered through
    // userdata, which this void-userdata callback does not have — so no
    // rearm is attempted here.
    return .disarm;
}
/// Old single-shot server: binds 127.0.0.1:4221, queues one accept, and
/// drives the event loop until every completion has finished.
pub fn server() !void {
    var event_loop = try xev.Loop.init(.{});
    defer event_loop.deinit();

    const address = try std.net.Address.resolveIp("127.0.0.1", 4221);
    const listener = try xev.TCP.init(address);
    try listener.bind(address);
    try listener.listen(128); // backlog; should be configurable
    std.debug.print("Listening on {d}\n", .{address.getPort()});

    // Lives on this stack frame, which outlives the loop run below.
    var accept_completion: xev.Completion = undefined;
    listener.accept(&event_loop, &accept_completion, void, null, onAcceptConnection);
    std.debug.print("Accepting connections...\n", .{});
    try event_loop.run(.until_done);
}
/// Close callback intended for shutting the listener down.
/// Reports the outcome; nothing else is scheduled either way.
fn stopServer(
    userdata: ?*void,
    loop: *xev.Loop,
    c: *xev.Completion,
    tcp_stream: xev.TCP,
    result: xev.CloseError!void,
) xev.CallbackAction {
    _ = userdata;
    _ = loop;
    _ = c;
    _ = tcp_stream;
    if (result) |_| {
        std.debug.print("Server shutdown!\n", .{});
    } else |err| {
        std.debug.print("Error closing server: {}\n", .{err});
    }
    return .disarm;
}
//! By convention, main.zig is where your main function lives in the case that
//! you are building an executable. If you are making a library, the convention
//! is to delete this file and start with root.zig instead.
const xev = @import("xev");
const pita = @import("pita_lib");
const std = @import("std");
const lib = @import("pita_lib");
/// Entry point: delegates to the library's event-loop server.
/// NOTE(review): server() blocks until its event loop finishes, so the print
/// below runs only after the server has already stopped; the "Server
/// started" wording is misleading — consider printing before the call.
pub fn main() !void {
try pita.server();
std.debug.print("Server started.... MAIN!.\n", .{});
}
const std = @import("std");
const xev = @import("xev");
const testing = std.testing; // Not used, but fine to keep
// Global allocator for simplicity in this example.
// In a real app, manage this more carefully (e.g., pass it around).
// NOTE(review): library code would normally accept an Allocator parameter
// rather than reaching for a global; kept as-is for the demo.
var gpa = std.heap.GeneralPurposeAllocator(.{}){};
// Container-level const works here: the Allocator interface captures &gpa,
// and the address of a global is comptime-known.
const allocator = gpa.allocator();
// Context for the listening server. A single instance lives on the stack of
// server() and is passed as userdata to every accept callback.
const ServerContext = struct {
listener_tcp: xev.TCP,
// Completion for the listener's accept operations.
// Its memory is stable as part of ServerContext on server()'s stack, so the
// accept callback can keep rearming it for as long as the loop runs.
accept_completion: xev.Completion = undefined,
loop: *xev.Loop,
};
/// Per-connection state. One ClientContext is heap-allocated per accepted
/// client and freed in the close callback, so the embedded completions stay
/// valid for exactly as long as that connection's async operations are live.
const ClientContext = struct {
    client_tcp: xev.TCP,
    write_completion: xev.Completion = undefined,
    close_completion: xev.Completion = undefined,
    // Gives callbacks a way back to the loop/listener if ever needed
    // (not consulted for the listener in the current flow).
    server_ctx_ptr: *ServerContext,

    /// Heap-allocates a context for a freshly accepted client.
    /// Caller owns the result; release it with destroy().
    pub fn create(
        alloc: std.mem.Allocator,
        client_tcp: xev.TCP,
        server_ctx_ptr: *ServerContext,
    ) !*ClientContext {
        const ctx = try alloc.create(ClientContext);
        ctx.* = .{
            .server_ctx_ptr = server_ctx_ptr,
            .client_tcp = client_tcp,
        };
        return ctx;
    }

    /// Frees a context previously obtained from create().
    pub fn destroy(self: *ClientContext, alloc: std.mem.Allocator) void {
        alloc.destroy(self);
    }
};
// Dummy callback for fire-and-forget close attempts on error paths.
// Deliberately ignores the result: by this point the connection is being
// abandoned anyway, so there is nothing useful to do with a close error.
fn fnNoopClose(_: ?*void, _: *xev.Loop, _: *xev.Completion, _: xev.TCP, _: xev.CloseError!void) xev.CallbackAction {
return .disarm;
}
/// Final callback in a client's lifecycle: fires once the close submitted by
/// onWriteComplete finishes. Frees the ClientContext, which also releases
/// the close_completion this callback was invoked with.
fn onConnectionClosed(
    userdata: ?*ClientContext,
    _: *xev.Loop,
    _: *xev.Completion, // &close_completion — owned by the ClientContext
    closed_tcp: xev.TCP,
    result: xev.CloseError!void,
) xev.CallbackAction {
    _ = closed_tcp;
    const ctx = userdata.?;
    if (result) |_| {} else |err| {
        std.debug.print("Error during close for {any}: {any}\n", .{ ctx.client_tcp, err });
    }
    // Close finished (successfully or not); the context is no longer needed.
    ctx.destroy(allocator);
    return .disarm;
}
/// Runs when the response write finishes (or fails). Either way the
/// connection is done: submit an async close using the completion owned by
/// the ClientContext and let onConnectionClosed free everything.
fn onWriteComplete(
    userdata: ?*ClientContext,
    loop: *xev.Loop,
    _: *xev.Completion, // &write_completion — owned by the ClientContext
    tcp_stream: xev.TCP,
    _: xev.WriteBuffer,
    result: xev.WriteError!usize,
) xev.CallbackAction {
    _ = tcp_stream;
    const ctx = userdata.?;
    if (result) |_| {} else |err| {
        std.debug.print("Write error for {any}: {any}. Closing connection.\n", .{ ctx.client_tcp, err });
    }
    // NOTE(review): a successful-but-short write is not detected here; the
    // connection is closed regardless — TODO confirm that is acceptable.
    ctx.client_tcp.close(loop, &ctx.close_completion, ClientContext, ctx, onConnectionClosed);
    return .disarm;
}
/// Accept callback for the listener. Immediately rearms accept (so the
/// server is always ready for new clients), then allocates a per-client
/// context and kicks off the async response write for the connection just
/// accepted.
fn onAcceptConnection(
    userdata: ?*ServerContext,
    ev_loop: *xev.Loop,
    // &server_ctx.accept_completion — lives in server()'s stack frame for
    // the whole lifetime of the loop, so rearming it here is safe.
    listener_completion_ref: *xev.Completion,
    result: xev.AcceptError!xev.TCP,
) xev.CallbackAction {
    const server_ctx: *ServerContext = userdata.?;
    // Rearm accept for the *next* connection first, regardless of how
    // handling this one goes. The long-lived listener completion is reused.
    server_ctx.listener_tcp.accept(ev_loop, listener_completion_ref, ServerContext, server_ctx, onAcceptConnection);
    const new_client_stream = result catch |err| {
        std.debug.print("Error accepting connection: {}. Listener remains active.\n", .{err});
        return .disarm; // This failed accept is done; the listener was rearmed above.
    };
    // FIX (idiom): was `var client_ctx: *ClientContext = undefined;` followed
    // by a separate assignment — a needless mutable/undefined dance;
    // const-initialize directly from the catch expression instead.
    const client_ctx = ClientContext.create(allocator, new_client_stream, server_ctx) catch |err| {
        std.debug.print("Failed to create client context: {any}. Closing accepted socket.\n", .{err});
        // We accepted a client but cannot serve it (e.g. OOM); close it.
        // FIX: the original used `var temp_close_completion` — stack memory
        // that dangles once this callback returns while the close is still
        // pending (the same use-after-return bug as the old lib.zig). Use
        // static storage so the completion outlives the callback. Note:
        // concurrent trips through this rare error path would share this one
        // completion; acceptable for an example's best-effort cleanup.
        const Static = struct {
            var oom_close_completion: xev.Completion = undefined;
        };
        new_client_stream.close(ev_loop, &Static.oom_close_completion, void, null, fnNoopClose);
        return .disarm; // This accept instance is done; the listener was rearmed above.
    };
    // Simple HTTP response (static literal, so the slice outlives the write).
    const response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!";
    client_ctx.client_tcp.write(
        ev_loop,
        &client_ctx.write_completion, // completion owned by the ClientContext
        .{ .slice = response }, // buffer to write
        ClientContext, // comptime userdata type
        client_ctx, // delivered to onWriteComplete
        onWriteComplete,
    );
    return .disarm; // Disarm *this instance*; the listener was rearmed above.
}
/// Event-loop HTTP server: listens on 127.0.0.1:4221 and answers every
/// connection with a static response. Blocks until the loop drains.
pub fn server() !void {
    var thread_pool = xev.ThreadPool.init(.{ .max_threads = 6 }); // should be configurable
    defer {
        thread_pool.shutdown();
        thread_pool.deinit();
    }
    var loop_val = try xev.Loop.init(.{ .thread_pool = &thread_pool });
    defer loop_val.deinit();
    // Ensure the GPA is deinitialized when server() returns (normally or via
    // error). Defers run LIFO, so this executes before loop_val.deinit().
    defer {
        std.debug.print("Deinitializing GPA.\n", .{});
        // In a real server you'd gracefully shut down connections first; if
        // the loop exits while clients are live, their ClientContexts leak.
        // FIX: report the GPA leak check instead of silently discarding it
        // with `_ = gpa.deinit();`.
        if (gpa.deinit() == .leak) {
            std.debug.print("GPA detected leaked allocations (likely live ClientContexts).\n", .{});
        }
    }
    const address = try std.net.Address.resolveIp("127.0.0.1", 4221);
    const tcp_listener = try xev.TCP.init(address);
    // Proper shutdown of the listener itself would also be an async
    // operation; omitted for simplicity — the OS cleans up on process exit.
    try tcp_listener.bind(address);
    try tcp_listener.listen(180); //number should be configurable
    std.debug.print("Listening on {d}\n", .{address.getPort()});
    // server_ctx lives on this stack frame, which is valid for the whole run
    // of the loop — server() doesn't return until loop_val.run() finishes.
    var server_ctx = ServerContext{
        .listener_tcp = tcp_listener,
        .loop = &loop_val,
        // .accept_completion stays 'undefined' until accept() fills it in.
    };
    // Initial accept: uses the long-lived completion owned by server_ctx and
    // passes server_ctx itself as userdata for onAcceptConnection.
    server_ctx.listener_tcp.accept(&loop_val, &server_ctx.accept_completion, ServerContext, &server_ctx, onAcceptConnection);
    std.debug.print("Accepting connections...\n", .{});
    try loop_val.run(.until_done);
    std.debug.print("Server loop finished.\n", .{});
}
// The stopServer function from original code is not directly used in this flow.
// A graceful shutdown mechanism would involve stopping the accept loop,
// waiting for active connections to close, then closing the listener,
// and finally stopping the event loop.
// fn stopServer(
// userdata: ?*void,
// loop: *xev.Loop,
// c: *xev.Completion,
// tcp_stream: xev.TCP,
// result: xev.CloseError!void,
// ) xev.CallbackAction {
// _ = userdata;
// _ = loop;
// _ = c;
// _ = tcp_stream;
// _ = result catch |err| {
// std.debug.print("Error closing server: {}\n", .{err});
// return .disarm;
// };
// std.debug.print("Server shutdown!\n", .{});
// // To actually stop the loop, you might need to call loop.stop() or similar
// // if xev provides such a mechanism, or ensure all completions are disarmed.
// return .disarm;
// }
@pmbanugo
Copy link
Author

pmbanugo commented May 7, 2025

I made a revision: working-lib.zig has the code I got working (with help from an LLM — Gemini 2.5 Pro I/O edition). The bad/old code is in lib.zig.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment