tcp: abstract connection handlers out

This commit is contained in:
Aarnav Tale
2024-04-20 19:33:33 -04:00
parent 0a7d8eb8d4
commit 545844ecf8
4 changed files with 134 additions and 84 deletions

View File

@ -1,9 +1,7 @@
const std = @import("std"); const std = @import("std");
const xev = @import("xev"); const xev = @import("xev");
const Config = @import("../config/Config.zig"); const Config = @import("../config/Config.zig");
const connections = @import("./handlers/connections.zig");
const reject_client = @import("./handlers/reject.zig").reject_client;
const read_client = @import("./handlers/reader.zig").read_client;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const CompletionPool = std.heap.MemoryPool(xev.Completion); const CompletionPool = std.heap.MemoryPool(xev.Completion);
@ -103,16 +101,11 @@ pub fn deinit(self: *Server) void {
pub fn start(self: *Server) !void { pub fn start(self: *Server) !void {
try self.socket.bind(self.addr); try self.socket.bind(self.addr);
try self.socket.listen(self.max_clients); try self.socket.listen(self.max_clients);
try connections.startAccepting(self);
log.info("bound server to socket={any}", .{self.socket}); log.info("bound server to socket={any}", .{self.socket});
// Each acceptor borrows a completion from the pool // TODO: Stop flag? Only necessary if we support signaling the server
// We do this because the completion is passed to the client TCP handlers // from the main thread on an event, ie. configuration reloading.
const c = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return error.OutOfMemory;
};
self.socket.accept(&self.loop, c, Server, self, acceptHandler);
while (true) { while (true) {
try self.loop.run(.until_done); try self.loop.run(.until_done);
} }
@ -125,81 +118,40 @@ pub fn destroyBuffer(self: *Server, buf: []const u8) void {
)); ));
} }
/// Accepts a new client connection and starts reading from it until EOF. const BindError = error{
/// Once an accept handler enters, it queues for a new client connection. NoAddress,
/// It essentially recursively calls itself until shutdown. InvalidAddress,
fn acceptHandler( };
self_: ?*Server,
_: *xev.Loop, /// Tries to generate a valid address to bind to, expects that the address will
c: *xev.Completion, /// start with tcp:// when binding to an IP and unix:// when binding to a file
e: xev.TCP.AcceptError!xev.TCP, /// based socket.
) xev.CallbackAction { pub fn parseAddress(raw_addr: ?[:0]const u8) BindError!std.net.Address {
const self = self_.?; const addr = raw_addr orelse {
const new_c = self.comp_pool.create() catch { return BindError.NoAddress;
log.err("couldn't allocate completion in pool", .{});
return .disarm;
}; };
// Accept a new client connection now that we have a new completion if (addr.len == 0) {
self.socket.accept(&self.loop, new_c, Server, self, acceptHandler); return BindError.NoAddress;
const sock = self.sock_pool.create() catch {
log.err("couldn't allocate socket in pool", .{});
return .disarm;
};
sock.* = e catch {
log.err("accept error", .{});
self.sock_pool.destroy(sock);
return .disarm;
};
if (self.clients_count == self.max_clients) {
log.warn("max clients reached, rejecting fd={d}", .{sock.fd});
reject_client(self, sock) catch return .rearm;
return .disarm;
} }
log.info("accepted connection fd={d}", .{sock.fd}); const uri = std.Uri.parse(addr) catch return BindError.InvalidAddress;
self.clients_count += 1; if (std.mem.eql(u8, uri.scheme, "tcp")) {
const host = uri.host orelse return BindError.InvalidAddress;
const port = uri.port orelse return BindError.InvalidAddress;
read_client(self, sock, c) catch { return std.net.Address.parseIp4(host.percent_encoded, port) catch {
log.err("couldn't read from client", .{}); return BindError.InvalidAddress;
}; };
}
return .disarm; // TODO: Should we check for valid file paths or just rely on the initUnix
} // function to return an error?
if (std.mem.eql(u8, uri.scheme, "unix")) {
fn shutdownHandler( return std.net.Address.initUnix(uri.path.percent_encoded) catch {
self_: ?*Server, return BindError.InvalidAddress;
loop: *xev.Loop, };
comp: *xev.Completion, }
sock: xev.TCP,
e: xev.TCP.ShutdownError!void, return BindError.InvalidAddress;
) xev.CallbackAction {
e catch {
// Is this even possible?
log.err("couldn't shutdown socket", .{});
};
const self = self_.?;
sock.close(loop, comp, Server, self, closeHandler);
return .disarm;
}
pub fn closeHandler(
self_: ?*Server,
_: *xev.Loop,
comp: *xev.Completion,
_: xev.TCP,
e: xev.TCP.CloseError!void,
) xev.CallbackAction {
e catch {
log.err("couldn't close socket", .{});
};
const self = self_.?;
self.comp_pool.destroy(comp);
return .disarm;
} }

View File

@ -0,0 +1,96 @@
const xev = @import("xev");
const std = @import("std");
const Server = @import("../Server.zig").Server;
const Command = @import("../Command.zig").Command;
const reject_client = @import("./reject.zig").reject_client;
const read_client = @import("./reader.zig").read_client;
const log = std.log.scoped(.tcp_thread);
/// Starts accepting client connections on the server's socket.
/// Note: This first xev.Completion is not destroyed here because it gets used
/// for an entire client connection lifecycle.
pub fn startAccepting(self: *Server) !void {
const c = try self.comp_pool.create();
self.socket.accept(&self.loop, c, Server, self, aHandler);
}
/// Accepts a new client connection and starts reading from it until EOF.
/// Once an accept handler enters, it queues for a new client connection.
/// It essentially recursively calls itself until shutdown.
fn aHandler(
self_: ?*Server,
_: *xev.Loop,
c: *xev.Completion,
e: xev.TCP.AcceptError!xev.TCP,
) xev.CallbackAction {
const self = self_.?;
const new_c = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return .disarm;
};
// Accept a new client connection now that we have a new completion
self.socket.accept(&self.loop, new_c, Server, self, aHandler);
const sock = self.sock_pool.create() catch {
log.err("couldn't allocate socket in pool", .{});
return .disarm;
};
sock.* = e catch {
log.err("accept error", .{});
self.sock_pool.destroy(sock);
return .disarm;
};
if (self.clients_count == self.max_clients) {
log.warn("max clients reached, rejecting fd={d}", .{sock.fd});
reject_client(self, sock) catch return .rearm;
return .disarm;
}
log.info("accepted connection fd={d}", .{sock.fd});
self.clients_count += 1;
read_client(self, sock, c) catch {
log.err("couldn't read from client", .{});
};
return .disarm;
}
fn sHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
e: xev.TCP.ShutdownError!void,
) xev.CallbackAction {
e catch {
// Is this even possible?
log.err("couldn't shutdown socket", .{});
};
const self = self_.?;
sock.close(loop, comp, Server, self, cHandler);
return .disarm;
}
/// Closes the client connection and cleans up the completion.
pub fn cHandler(
self_: ?*Server,
_: *xev.Loop,
comp: *xev.Completion,
_: xev.TCP,
e: xev.TCP.CloseError!void,
) xev.CallbackAction {
e catch {
log.err("couldn't close socket", .{});
};
const self = self_.?;
self.comp_pool.destroy(comp);
return .disarm;
}

View File

@ -2,6 +2,7 @@ const xev = @import("xev");
const std = @import("std"); const std = @import("std");
const Server = @import("../Server.zig").Server; const Server = @import("../Server.zig").Server;
const Command = @import("../Command.zig").Command; const Command = @import("../Command.zig").Command;
const connections = @import("./connections.zig");
const log = std.log.scoped(.tcp_thread); const log = std.log.scoped(.tcp_thread);
@ -29,7 +30,7 @@ fn rHandler(
error.EOF => { error.EOF => {
log.info("client disconnected fd={d}", .{s.fd}); log.info("client disconnected fd={d}", .{s.fd});
self.clients_count -= 1; self.clients_count -= 1;
s.close(l, c, Server, self, Server.closeHandler); s.close(l, c, Server, self, connections.cHandler);
return .disarm; return .disarm;
}, },

View File

@ -1,6 +1,7 @@
const xev = @import("xev"); const xev = @import("xev");
const std = @import("std"); const std = @import("std");
const Server = @import("../Server.zig").Server; const Server = @import("../Server.zig").Server;
const connections = @import("./connections.zig");
const log = std.log.scoped(.tcp_thread); const log = std.log.scoped(.tcp_thread);
@ -36,6 +37,6 @@ fn wHandler(
return .disarm; return .disarm;
}; };
client.close(l, c, Server, self, Server.closeHandler); client.close(l, c, Server, self, connections.cHandler);
return .disarm; return .disarm;
} }