From b592910dd3a4f76a99685ae27585988584d53379 Mon Sep 17 00:00:00 2001
From: Aarnav Tale
Date: Fri, 19 Apr 2024 22:48:52 -0400
Subject: [PATCH] tcp: break up xev handlers into smaller files

---
 src/tcp/Command.zig         |   9 +--
 src/tcp/Server.zig          | 134 +++---------------------
 src/tcp/commands/ping.zig   |   6 ++
 src/tcp/handlers/reader.zig |  84 ++++++++++++++++++++++++
 src/tcp/handlers/reject.zig |  10 +--
 5 files changed, 106 insertions(+), 137 deletions(-)
 create mode 100644 src/tcp/handlers/reader.zig

diff --git a/src/tcp/Command.zig b/src/tcp/Command.zig
index 65a9a92de..11f341f36 100644
--- a/src/tcp/Command.zig
+++ b/src/tcp/Command.zig
@@ -1,6 +1,8 @@
 const std = @import("std");
 const log = std.log.scoped(.tcp_thread);
 
+const ping = @import("commands/ping.zig").ping;
+
 pub const Command = enum {
     ping,
 
@@ -19,7 +21,7 @@
 
         var iter = std.mem.splitScalar(u8, trimmed, ' ');
 
-        // Not using .first() because it returns the remainder of the slice
+        // Not using .first() because it returns everything if there is no space
         // Instead we're doing what's equivalent to popping the first element
         const cmdName = iter.next() orelse return error.InvalidInput;
         log.debug("got command name={s}", .{cmdName});
@@ -42,9 +44,4 @@
     }
 };
 
-// Want to move this to different file, not sure how I want it organized yet
-fn ping() []const u8 {
-    return "PONG\n";
-}
-
 // TODO: These need proper testing.
diff --git a/src/tcp/Server.zig b/src/tcp/Server.zig
index 0568ed11d..406e0eecd 100644
--- a/src/tcp/Server.zig
+++ b/src/tcp/Server.zig
@@ -3,6 +3,7 @@
 const xev = @import("xev");
 const tcp = @import("../tcp.zig");
 const reject_client = @import("./handlers/reject.zig").reject_client;
+const read_client = @import("./handlers/reader.zig").read_client;
 
 const Allocator = std.mem.Allocator;
 const CompletionPool = std.heap.MemoryPool(xev.Completion);
@@ -19,6 +20,9 @@ const ACCEPTOR_RATE = 1;
 
 /// A wrapper around the TCP server socket
 pub const Server = @This();
 
+/// Used for response formatting
+alloc: Allocator,
+
 /// Memory pool that stores the completions required by xev
 comp_pool: CompletionPool,
@@ -61,6 +65,7 @@
         .acceptor = try xev.Timer.init(),
         .clients = undefined,
         .clients_count = 0,
+        .alloc = alloc,
     };
 }
 
@@ -92,7 +97,7 @@
 }
 
 /// Convenience function to destroy a buffer in our pool
-fn destroyBuffer(self: *Server, buf: []const u8) void {
+pub fn destroyBuffer(self: *Server, buf: []const u8) void {
     self.buf_pool.destroy(@alignCast(
         @as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
     ));
@@ -132,7 +137,7 @@ fn acceptor(
 /// Accepts a new client connection and starts reading from it until EOF.
 fn acceptHandler(
     self_: ?*Server,
-    l: *xev.Loop,
+    _: *xev.Loop,
     c: *xev.Completion,
     e: xev.TCP.AcceptError!xev.TCP,
 ) xev.CallbackAction {
@@ -148,17 +153,6 @@ fn acceptHandler(
         return .disarm;
     };
 
-    const comp_w = self.comp_pool.create() catch {
-        log.err("couldn't allocate completion in pool", .{});
-        return .disarm;
-    };
-
-    const buf_w = self.buf_pool.create() catch {
-        log.err("couldn't allocate buffer in pool", .{});
-        self.sock_pool.destroy(sock);
-        return .disarm;
-    };
-
     if (self.clients_count == MAX_CLIENTS) {
         log.warn("max clients reached, rejecting fd={d}", .{sock.fd});
         reject_client(self, sock) catch return .rearm;
@@ -169,113 +163,10 @@
     self.clients[self.clients_count] = sock;
     self.clients_count += 1;
 
-    const buf_r = self.buf_pool.create() catch {
-        log.err("couldn't allocate buffer in pool", .{});
-        return .disarm;
+    read_client(self, sock, c) catch {
+        log.err("couldn't read from client", .{});
     };
 
-    @memcpy(buf_w.ptr, "GHOSTTY\n");
-    sock.write(l, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
-    sock.read(l, c, .{ .slice = buf_r }, Server, self, readHandler);
-    return .disarm;
-}
-
-fn readHandler(
-    self_: ?*Server,
-    loop: *xev.Loop,
-    comp: *xev.Completion,
-    sock: xev.TCP,
-    buf: xev.ReadBuffer,
-    e: xev.TCP.ReadError!usize,
-) xev.CallbackAction {
-    const self = self_.?;
-    const l = e catch |err| {
-        switch (err) {
-            error.EOF => {
-                log.info("disconnecting client fd={d}", .{sock.fd});
-                destroyBuffer(self, buf.slice);
-                self.clients_count -= 1;
-                sock.close(loop, comp, Server, self, closeHandler);
-                return .disarm;
-            },
-
-            else => {
-                destroyBuffer(self, buf.slice);
-                log.err("read error", .{});
-                return .disarm;
-            },
-        }
-    };
-
-    const comp_w = self.comp_pool.create() catch {
-        log.err("couldn't allocate completion in pool", .{});
-        return .rearm;
-    };
-
-    const buf_w = self.buf_pool.create() catch {
-        log.err("couldn't allocate buffer in pool", .{});
-        return .rearm;
-    };
-
-    // TODO: What should we do with the rest of the buffer?
-    var iter = std.mem.splitScalar(u8, buf.slice[0..l], '\n');
-    while (iter.next()) |item| {
-        if (item.len == 0) {
-            continue;
-        }
-
-        const c = tcp.Command.parse(item) catch |err| {
-            const res = try tcp.Command.handleError(err);
-            @memcpy(buf_w.ptr, res.ptr[0..res.len]);
-            sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
-            return .rearm;
-        };
-
-        const res = try tcp.Command.handle(c);
-        @memcpy(buf_w.ptr, res.ptr[0..res.len]);
-    }
-
-    sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
-    return .rearm;
-}
-
-fn writeHandler(
-    self_: ?*Server,
-    loop: *xev.Loop,
-    comp: *xev.Completion,
-    sock: xev.TCP,
-    buf: xev.WriteBuffer,
-    e: xev.TCP.WriteError!usize,
-) xev.CallbackAction {
-    _ = sock;
-    _ = comp;
-    _ = loop;
-    const self = self_.?;
-    _ = e catch |err| {
-        destroyBuffer(self, buf.slice);
-        log.err("write error {any}", .{err});
-        return .disarm;
-    };
-
-    return .disarm;
-}
-
-fn rejectHandler(
-    self_: ?*Server,
-    l: *xev.Loop,
-    c: *xev.Completion,
-    sock: xev.TCP,
-    buf: xev.WriteBuffer,
-    e: xev.TCP.WriteError!usize,
-) xev.CallbackAction {
-    const self = self_.?;
-    _ = e catch |err| {
-        destroyBuffer(self, buf.slice);
-        log.err("write error {any}", .{err});
-        return .disarm;
-    };
-
-    sock.close(l, c, Server, self, closeHandler);
     return .disarm;
 }
 
@@ -299,14 +190,11 @@ fn shutdownHandler(
 
 pub fn closeHandler(
     self_: ?*Server,
-    loop: *xev.Loop,
+    _: *xev.Loop,
     comp: *xev.Completion,
-    sock: xev.TCP,
+    _: xev.TCP,
     e: xev.TCP.CloseError!void,
 ) xev.CallbackAction {
-    _ = sock;
-    _ = loop;
-
     e catch {
         log.err("couldn't close socket", .{});
     };
diff --git a/src/tcp/commands/ping.zig b/src/tcp/commands/ping.zig
index e69de29bb..d2a6b10e1 100644
--- a/src/tcp/commands/ping.zig
+++ b/src/tcp/commands/ping.zig
@@ -0,0 +1,6 @@
+const std = @import("std");
+const build_config = @import("../../build_config.zig");
+
+pub fn ping() []const u8 {
+    return std.fmt.comptimePrint("PONG v={}\n", .{build_config.version});
+}
diff --git a/src/tcp/handlers/reader.zig b/src/tcp/handlers/reader.zig
new file mode 100644
index 000000000..8d79f33a1
--- /dev/null
+++ b/src/tcp/handlers/reader.zig
@@ -0,0 +1,84 @@
+const xev = @import("xev");
+const std = @import("std");
+const Server = @import("../Server.zig").Server;
+const Command = @import("../Command.zig").Command;
+
+const log = std.log.scoped(.tcp_thread);
+
+pub fn read_client(
+    self: *Server,
+    client: *xev.TCP,
+    c: *xev.Completion,
+) !void {
+    const buf_r = self.buf_pool.create() catch return error.OutOfMemory;
+    client.read(&self.loop, c, .{ .slice = buf_r }, Server, self, rHandler);
+}
+
+fn rHandler(
+    self_: ?*Server,
+    l: *xev.Loop,
+    c: *xev.Completion,
+    s: xev.TCP,
+    b: xev.ReadBuffer,
+    e: xev.TCP.ReadError!usize,
+) xev.CallbackAction {
+    const self = self_.?;
+    const len = e catch |err| {
+        Server.destroyBuffer(self, b.slice);
+        switch (err) {
+            error.EOF => {
+                log.info("client disconnected fd={d}", .{s.fd});
+                self.clients_count -= 1;
+                s.close(l, c, Server, self, Server.closeHandler);
+                return .disarm;
+            },
+
+            else => {
+                log.err("client read error fd={d} err={any}", .{ s.fd, err });
+                return .disarm;
+            },
+        }
+    };
+
+    // Create the completion task and buffer for our command responses
+    const c_w = self.comp_pool.create() catch return .rearm;
+    const b_w = self.buf_pool.create() catch return .rearm;
+
+    // Split commands by newline
+    var iter = std.mem.splitScalar(u8, b.slice[0..len], '\n');
+    while (iter.next()) |line| {
+        // Skip empty lines
+        if (line.len == 0) {
+            continue;
+        }
+
+        const cmd = Command.parse(line) catch |err| {
+            const res = try Command.handleError(err);
+            @memcpy(b_w.ptr, res.ptr[0..res.len]);
+            continue;
+        };
+
+        const res = try Command.handle(cmd);
+        @memcpy(b_w.ptr, res.ptr[0..res.len]);
+    }
+
+    s.write(l, c_w, .{ .slice = b_w }, Server, self, wHandler);
+    return .rearm;
+}
+
+fn wHandler(
+    self_: ?*Server,
+    _: *xev.Loop,
+    _: *xev.Completion,
+    s: xev.TCP,
+    b: xev.WriteBuffer,
+    e: xev.TCP.WriteError!usize,
+) xev.CallbackAction {
+    const self = self_.?;
+    _ = e catch |err| {
+        log.err("client write error fd={d} err={any}", .{ s.fd, err });
+        Server.destroyBuffer(self, b.slice);
+    };
+
+    return .disarm;
+}
diff --git a/src/tcp/handlers/reject.zig b/src/tcp/handlers/reject.zig
index 85283f802..07c698f86 100644
--- a/src/tcp/handlers/reject.zig
+++ b/src/tcp/handlers/reject.zig
@@ -4,12 +4,6 @@ const Server = @import("../Server.zig").Server;
 
 const log = std.log.scoped(.tcp_thread);
 
-fn destroyBuffer(self: *Server, buf: []const u8) void {
-    self.buf_pool.destroy(@alignCast(
-        @as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
-    ));
-}
-
 const RejectError = error{
     AllocError,
     WriteError,
@@ -19,7 +13,7 @@
 pub fn reject_client(self: *Server, c: *xev.TCP) !void {
     const buf_w = self.buf_pool.create() catch return RejectError.AllocError;
     const comp_w = self.comp_pool.create() catch {
-        destroyBuffer(self, buf_w);
+        Server.destroyBuffer(self, buf_w);
         return RejectError.AllocError;
     };
 
@@ -37,7 +31,7 @@ fn wHandler(
 ) xev.CallbackAction {
     const self = self_.?;
     _ = e catch |err| {
-        destroyBuffer(self, b.slice);
+        Server.destroyBuffer(self, b.slice);
         log.err("write error {any}", .{err});
         return .disarm;
     };
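
Note, not part of the patch: a minimal standalone sketch of the buffer round-trip that the
now-public Server.destroyBuffer performs, written against the same std.heap.MemoryPool API
the server uses. The allocator setup and main() wrapper here are illustrative only.

    const std = @import("std");

    pub fn main() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        defer _ = gpa.deinit();

        // Same pool shape as Server.buf_pool: fixed 1024-byte buffers.
        var buf_pool = std.heap.MemoryPool([1024]u8).init(gpa.allocator());
        defer buf_pool.deinit();

        // The pool hands out a *[1024]u8; the read/write calls take it as a slice.
        const item = try buf_pool.create();
        const slice: []u8 = item;

        // ... the slice would be used as a socket read/write buffer here ...

        // To return the buffer, recover the array pointer from the slice,
        // which is what Server.destroyBuffer does for the handlers.
        buf_pool.destroy(@alignCast(@as(*[1024]u8, @ptrFromInt(@intFromPtr(slice.ptr)))));
    }

The @alignCast is needed because the pool stores items at a higher alignment than the
1-byte natural alignment of [1024]u8, so the pointer recovered from the slice has to be
cast back before it can be handed to destroy.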