tcp: break up xev handlers into smaller files

This commit is contained in:
Aarnav Tale
2024-04-19 22:48:52 -04:00
parent 403175bea8
commit b592910dd3
5 changed files with 106 additions and 137 deletions

View File

@ -1,6 +1,8 @@
const std = @import("std");
const log = std.log.scoped(.tcp_thread);
const ping = @import("commands/ping.zig").ping;
pub const Command = enum {
ping,
@ -19,7 +21,7 @@ pub const Command = enum {
var iter = std.mem.splitScalar(u8, trimmed, ' ');
// Not using .first() because it returns the remainder of the slice
// Not using .first() because it returns everything if there is no space
// Instead we're doing what's equivalent to popping the first element
const cmdName = iter.next() orelse return error.InvalidInput;
log.debug("got command name={s}", .{cmdName});
@ -42,9 +44,4 @@ pub const Command = enum {
}
};
// Want to move this to different file, not sure how I want it organized yet
fn ping() []const u8 {
return "PONG\n";
}
// TODO: These need proper testing.

View File

@ -3,6 +3,7 @@ const xev = @import("xev");
const tcp = @import("../tcp.zig");
const reject_client = @import("./handlers/reject.zig").reject_client;
const read_client = @import("./handlers/reader.zig").read_client;
const Allocator = std.mem.Allocator;
const CompletionPool = std.heap.MemoryPool(xev.Completion);
@ -19,6 +20,9 @@ const ACCEPTOR_RATE = 1;
/// A wrapper around the TCP server socket
pub const Server = @This();
/// Used for response formatting
alloc: Allocator,
/// Memory pool that stores the completions required by xev
comp_pool: CompletionPool,
@ -61,6 +65,7 @@ pub fn init(alloc: Allocator, addr: std.net.Address) !Server {
.acceptor = try xev.Timer.init(),
.clients = undefined,
.clients_count = 0,
.alloc = alloc,
};
}
@ -92,7 +97,7 @@ pub fn start(self: *Server) !void {
}
/// Convenience function to destroy a buffer in our pool
fn destroyBuffer(self: *Server, buf: []const u8) void {
pub fn destroyBuffer(self: *Server, buf: []const u8) void {
self.buf_pool.destroy(@alignCast(
@as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
));
@ -132,7 +137,7 @@ fn acceptor(
/// Accepts a new client connection and starts reading from it until EOF.
fn acceptHandler(
self_: ?*Server,
l: *xev.Loop,
_: *xev.Loop,
c: *xev.Completion,
e: xev.TCP.AcceptError!xev.TCP,
) xev.CallbackAction {
@ -148,17 +153,6 @@ fn acceptHandler(
return .disarm;
};
const comp_w = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return .disarm;
};
const buf_w = self.buf_pool.create() catch {
log.err("couldn't allocate buffer in pool", .{});
self.sock_pool.destroy(sock);
return .disarm;
};
if (self.clients_count == MAX_CLIENTS) {
log.warn("max clients reached, rejecting fd={d}", .{sock.fd});
reject_client(self, sock) catch return .rearm;
@ -169,113 +163,10 @@ fn acceptHandler(
self.clients[self.clients_count] = sock;
self.clients_count += 1;
const buf_r = self.buf_pool.create() catch {
log.err("couldn't allocate buffer in pool", .{});
return .disarm;
read_client(self, sock, c) catch {
log.err("couldn't read from client", .{});
};
@memcpy(buf_w.ptr, "GHOSTTY\n");
sock.write(l, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
sock.read(l, c, .{ .slice = buf_r }, Server, self, readHandler);
return .disarm;
}
fn readHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
buf: xev.ReadBuffer,
e: xev.TCP.ReadError!usize,
) xev.CallbackAction {
const self = self_.?;
const l = e catch |err| {
switch (err) {
error.EOF => {
log.info("disconnecting client fd={d}", .{sock.fd});
destroyBuffer(self, buf.slice);
self.clients_count -= 1;
sock.close(loop, comp, Server, self, closeHandler);
return .disarm;
},
else => {
destroyBuffer(self, buf.slice);
log.err("read error", .{});
return .disarm;
},
}
};
const comp_w = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return .rearm;
};
const buf_w = self.buf_pool.create() catch {
log.err("couldn't allocate buffer in pool", .{});
return .rearm;
};
// TODO: What should we do with the rest of the buffer?
var iter = std.mem.splitScalar(u8, buf.slice[0..l], '\n');
while (iter.next()) |item| {
if (item.len == 0) {
continue;
}
const c = tcp.Command.parse(item) catch |err| {
const res = try tcp.Command.handleError(err);
@memcpy(buf_w.ptr, res.ptr[0..res.len]);
sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
return .rearm;
};
const res = try tcp.Command.handle(c);
@memcpy(buf_w.ptr, res.ptr[0..res.len]);
}
sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler);
return .rearm;
}
fn writeHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
buf: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
_ = sock;
_ = comp;
_ = loop;
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, buf.slice);
log.err("write error {any}", .{err});
return .disarm;
};
return .disarm;
}
fn rejectHandler(
self_: ?*Server,
l: *xev.Loop,
c: *xev.Completion,
sock: xev.TCP,
buf: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, buf.slice);
log.err("write error {any}", .{err});
return .disarm;
};
sock.close(l, c, Server, self, closeHandler);
return .disarm;
}
@ -299,14 +190,11 @@ fn shutdownHandler(
pub fn closeHandler(
self_: ?*Server,
loop: *xev.Loop,
_: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
_: xev.TCP,
e: xev.TCP.CloseError!void,
) xev.CallbackAction {
_ = sock;
_ = loop;
e catch {
log.err("couldn't close socket", .{});
};

View File

@ -0,0 +1,6 @@
const std = @import("std");
const build_config = @import("../../build_config.zig");
pub fn ping() []const u8 {
return std.fmt.comptimePrint("PONG v={}\n", .{build_config.version});
}

View File

@ -0,0 +1,84 @@
const xev = @import("xev");
const std = @import("std");
const Server = @import("../Server.zig").Server;
const Command = @import("../Command.zig").Command;
const log = std.log.scoped(.tcp_thread);
pub fn read_client(
self: *Server,
client: *xev.TCP,
c: *xev.Completion,
) !void {
const buf_r = self.buf_pool.create() catch return error.OutOfMemory;
client.read(&self.loop, c, .{ .slice = buf_r }, Server, self, rHandler);
}
fn rHandler(
self_: ?*Server,
l: *xev.Loop,
c: *xev.Completion,
s: xev.TCP,
b: xev.ReadBuffer,
e: xev.TCP.ReadError!usize,
) xev.CallbackAction {
const self = self_.?;
const len = e catch |err| {
Server.destroyBuffer(self, b.slice);
switch (err) {
error.EOF => {
log.info("client disconnected fd={d}", .{s.fd});
self.clients_count -= 1;
s.close(l, c, Server, self, Server.closeHandler);
return .disarm;
},
else => {
log.err("client read error fd={d} err={any}", .{ s.fd, err });
return .disarm;
},
}
};
// Create the completion task and buffer for our command responses
const c_w = self.comp_pool.create() catch return .rearm;
const b_w = self.buf_pool.create() catch return .rearm;
// Split commands by newline
var iter = std.mem.splitScalar(u8, b.slice[0..len], '\n');
while (iter.next()) |line| {
// Skip empty lines
if (line.len == 0) {
continue;
}
const cmd = Command.parse(line) catch |err| {
const res = try Command.handleError(err);
@memcpy(b_w.ptr, res.ptr[0..res.len]);
continue;
};
const res = try Command.handle(cmd);
@memcpy(b_w.ptr, res.ptr[0..res.len]);
}
s.write(l, c_w, .{ .slice = b_w }, Server, self, wHandler);
return .rearm;
}
fn wHandler(
self_: ?*Server,
_: *xev.Loop,
_: *xev.Completion,
s: xev.TCP,
b: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
const self = self_.?;
_ = e catch |err| {
log.err("client write error fd={d} err={any}", .{ s.fd, err });
Server.destroyBuffer(self, b.slice);
};
return .disarm;
}

View File

@ -4,12 +4,6 @@ const Server = @import("../Server.zig").Server;
const log = std.log.scoped(.tcp_thread);
fn destroyBuffer(self: *Server, buf: []const u8) void {
self.buf_pool.destroy(@alignCast(
@as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
));
}
const RejectError = error{
AllocError,
WriteError,
@ -19,7 +13,7 @@ const RejectError = error{
pub fn reject_client(self: *Server, c: *xev.TCP) !void {
const buf_w = self.buf_pool.create() catch return RejectError.AllocError;
const comp_w = self.comp_pool.create() catch {
destroyBuffer(self, buf_w);
Server.destroyBuffer(self, buf_w);
return RejectError.AllocError;
};
@ -37,7 +31,7 @@ fn wHandler(
) xev.CallbackAction {
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, b.slice);
Server.destroyBuffer(self, b.slice);
log.err("write error {any}", .{err});
return .disarm;
};