diff --git a/src/main_ghostty.zig b/src/main_ghostty.zig index 6354214d0..6089371c2 100644 --- a/src/main_ghostty.zig +++ b/src/main_ghostty.zig @@ -18,6 +18,7 @@ const fontconfig = @import("fontconfig"); const harfbuzz = @import("harfbuzz"); const renderer = @import("renderer.zig"); const apprt = @import("apprt.zig"); +const tcp = @import("tcp.zig"); const App = @import("App.zig"); const Ghostty = @import("main_c.zig").Ghostty; @@ -112,6 +113,18 @@ pub fn main() !MainReturn { // when the first surface is created. if (@hasDecl(apprt.App, "startQuitTimer")) app_runtime.startQuitTimer(); + // Not sure where this should go tbh + var tcp_thread = try tcp.Thread.init(alloc); + defer tcp_thread.deinit(); + + var tcp_thr = try std.Thread.spawn( + .{}, + tcp.Thread.threadMain, + .{&tcp_thread}, + ); + + tcp_thr.setName("tcp") catch {}; + // Run the GUI event loop try app_runtime.run(); } diff --git a/src/tcp.zig b/src/tcp.zig new file mode 100644 index 000000000..68450920b --- /dev/null +++ b/src/tcp.zig @@ -0,0 +1,9 @@ +//! TCP implementation. The TCP implementation is responsible for +//! responding to TCP requests and dispatching them to the app's Mailbox. +pub const Thread = @import("tcp/Thread.zig"); +pub const Server = @import("tcp/Server.zig"); +pub const Command = @import("tcp/Command.zig").Command; + +test { + @import("std").testing.refAllDecls(@This()); +} diff --git a/src/tcp/Command.zig b/src/tcp/Command.zig new file mode 100644 index 000000000..65a9a92de --- /dev/null +++ b/src/tcp/Command.zig @@ -0,0 +1,50 @@ +const std = @import("std"); +const log = std.log.scoped(.tcp_thread); + +pub const Command = enum { + ping, + + pub const Error = error{ + InvalidInput, + InvalidCommand, + }; + + /// Takes the bytes from our TCP handle and parses them into a Command. + pub fn parse(raw_read: []const u8) !Command { + const trimmed = std.mem.trim(u8, raw_read, "\t\n\r"); + + if (trimmed.len == 0) { + return error.InvalidInput; + } + + var iter = std.mem.splitScalar(u8, trimmed, ' '); + + // Not using .first() because it returns the remainder of the slice + // Instead we're doing what's equivalent to popping the first element + const cmdName = iter.next() orelse return error.InvalidInput; + log.debug("got command name={s}", .{cmdName}); + // TODO: Handle/support arguments + + return std.meta.stringToEnum(Command, cmdName) orelse return error.InvalidCommand; + } + + pub fn handle(self: Command) ![]const u8 { + switch (self) { + .ping => return ping(), + } + } + + pub fn handleError(err: Command.Error) ![]const u8 { + switch (err) { + error.InvalidInput => return "INVALID INPUT\n", + error.InvalidCommand => return "INVALID COMMAND\n", + } + } +}; + +// Want to move this to different file, not sure how I want it organized yet +fn ping() []const u8 { + return "PONG\n"; +} + +// TODO: These need proper testing. diff --git a/src/tcp/Server.zig b/src/tcp/Server.zig new file mode 100644 index 000000000..0568ed11d --- /dev/null +++ b/src/tcp/Server.zig @@ -0,0 +1,317 @@ +const std = @import("std"); +const xev = @import("xev"); +const tcp = @import("../tcp.zig"); + +const reject_client = @import("./handlers/reject.zig").reject_client; + +const Allocator = std.mem.Allocator; +const CompletionPool = std.heap.MemoryPool(xev.Completion); +const SocketPool = std.heap.MemoryPool(xev.TCP); +const BufferPool = std.heap.MemoryPool([1024]u8); +const log = std.log.scoped(.tcp_thread); + +/// Maximum connections we allow at once +const MAX_CLIENTS = 2; + +/// Acceptor polling rate in milliseconds +const ACCEPTOR_RATE = 1; + +/// A wrapper around the TCP server socket +pub const Server = @This(); + +/// Memory pool that stores the completions required by xev +comp_pool: CompletionPool, + +/// Memory pool that stores client sockets +sock_pool: SocketPool, + +/// Memory pool that stores buffers for reading and writing +buf_pool: BufferPool, + +/// Stop flag +stop: bool, + +/// Event loop +loop: xev.Loop, + +/// Server socket +socket: xev.TCP, + +/// Timer for accepting connections +acceptor: xev.Timer, + +/// Array of client sockets +clients: [MAX_CLIENTS]*xev.TCP, + +/// Number of clients connected +clients_count: usize, + +/// Initializes the server with the given allocator and address +pub fn init(alloc: Allocator, addr: std.net.Address) !Server { + const socket = try xev.TCP.init(addr); + try socket.bind(addr); + + return Server{ + .comp_pool = CompletionPool.init(alloc), + .sock_pool = SocketPool.init(alloc), + .buf_pool = BufferPool.init(alloc), + .stop = false, + .loop = try xev.Loop.init(.{}), + .socket = socket, + .acceptor = try xev.Timer.init(), + .clients = undefined, + .clients_count = 0, + }; +} + +/// Deinitializes the server +pub fn deinit(self: *Server) void { + self.buf_pool.deinit(); + self.comp_pool.deinit(); + self.sock_pool.deinit(); + self.loop.deinit(); + self.acceptor.deinit(); +} + +/// Starts the timer which tries to accept connections +pub fn start(self: *Server) !void { + try self.socket.listen(MAX_CLIENTS); + log.info("bound server to socket={any}", .{self.socket}); + + // Each acceptor borrows a completion from the pool + // We do this because the completion is passed to the client TCP handlers + const c = self.comp_pool.create() catch { + log.err("couldn't allocate completion in pool", .{}); + return error.OutOfMemory; + }; + + self.acceptor.run(&self.loop, c, ACCEPTOR_RATE, Server, self, acceptor); + while (!self.stop) { + try self.loop.run(.until_done); + } +} + +/// Convenience function to destroy a buffer in our pool +fn destroyBuffer(self: *Server, buf: []const u8) void { + self.buf_pool.destroy(@alignCast( + @as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))), + )); +} + +/// This runs on a loop and attempts to accept a connection to pass into the +/// handlers that manage client communication. It is very important to note +/// that the xev.Completion is shared between the acceptor and other handlers. +/// When a client disconnects, only then the completion is freed. +fn acceptor( + self_: ?*Server, + loop: *xev.Loop, + c: *xev.Completion, + e: xev.Timer.RunError!void, +) xev.CallbackAction { + const self = self_.?; + e catch { + log.err("timer error", .{}); + return .disarm; + }; + + self.socket.accept(loop, c, Server, self, acceptHandler); + + // We need to create a new completion for the next acceptor since each + // TCP connection will need its own if it successfully accepts. + const accept_recomp = self.comp_pool.create() catch { + log.err("couldn't allocate completion in pool", .{}); + return .disarm; + }; + + // We can't rearm because it'll repeat *this* instance of the acceptor + // So if the socket fails to accept it won't actually accept anything new + self.acceptor.run(loop, accept_recomp, ACCEPTOR_RATE, Server, self, acceptor); + return .disarm; +} + +/// Accepts a new client connection and starts reading from it until EOF. +fn acceptHandler( + self_: ?*Server, + l: *xev.Loop, + c: *xev.Completion, + e: xev.TCP.AcceptError!xev.TCP, +) xev.CallbackAction { + const self = self_.?; + const sock = self.sock_pool.create() catch { + log.err("couldn't allocate socket in pool", .{}); + return .disarm; + }; + + sock.* = e catch { + log.err("accept error", .{}); + self.sock_pool.destroy(sock); + return .disarm; + }; + + const comp_w = self.comp_pool.create() catch { + log.err("couldn't allocate completion in pool", .{}); + return .disarm; + }; + + const buf_w = self.buf_pool.create() catch { + log.err("couldn't allocate buffer in pool", .{}); + self.sock_pool.destroy(sock); + return .disarm; + }; + + if (self.clients_count == MAX_CLIENTS) { + log.warn("max clients reached, rejecting fd={d}", .{sock.fd}); + reject_client(self, sock) catch return .rearm; + return .disarm; + } + + log.info("accepted connection fd={d}", .{sock.fd}); + self.clients[self.clients_count] = sock; + self.clients_count += 1; + + const buf_r = self.buf_pool.create() catch { + log.err("couldn't allocate buffer in pool", .{}); + return .disarm; + }; + + @memcpy(buf_w.ptr, "GHOSTTY\n"); + sock.write(l, comp_w, .{ .slice = buf_w }, Server, self, writeHandler); + sock.read(l, c, .{ .slice = buf_r }, Server, self, readHandler); + return .disarm; +} + +fn readHandler( + self_: ?*Server, + loop: *xev.Loop, + comp: *xev.Completion, + sock: xev.TCP, + buf: xev.ReadBuffer, + e: xev.TCP.ReadError!usize, +) xev.CallbackAction { + const self = self_.?; + const l = e catch |err| { + switch (err) { + error.EOF => { + log.info("disconnecting client fd={d}", .{sock.fd}); + destroyBuffer(self, buf.slice); + self.clients_count -= 1; + sock.close(loop, comp, Server, self, closeHandler); + return .disarm; + }, + + else => { + destroyBuffer(self, buf.slice); + log.err("read error", .{}); + return .disarm; + }, + } + }; + + const comp_w = self.comp_pool.create() catch { + log.err("couldn't allocate completion in pool", .{}); + return .rearm; + }; + + const buf_w = self.buf_pool.create() catch { + log.err("couldn't allocate buffer in pool", .{}); + return .rearm; + }; + + // TODO: What should we do with the rest of the buffer? + var iter = std.mem.splitScalar(u8, buf.slice[0..l], '\n'); + while (iter.next()) |item| { + if (item.len == 0) { + continue; + } + + const c = tcp.Command.parse(item) catch |err| { + const res = try tcp.Command.handleError(err); + @memcpy(buf_w.ptr, res.ptr[0..res.len]); + sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler); + return .rearm; + }; + + const res = try tcp.Command.handle(c); + @memcpy(buf_w.ptr, res.ptr[0..res.len]); + } + + sock.write(loop, comp_w, .{ .slice = buf_w }, Server, self, writeHandler); + return .rearm; +} + +fn writeHandler( + self_: ?*Server, + loop: *xev.Loop, + comp: *xev.Completion, + sock: xev.TCP, + buf: xev.WriteBuffer, + e: xev.TCP.WriteError!usize, +) xev.CallbackAction { + _ = sock; + _ = comp; + _ = loop; + const self = self_.?; + _ = e catch |err| { + destroyBuffer(self, buf.slice); + log.err("write error {any}", .{err}); + return .disarm; + }; + + return .disarm; +} + +fn rejectHandler( + self_: ?*Server, + l: *xev.Loop, + c: *xev.Completion, + sock: xev.TCP, + buf: xev.WriteBuffer, + e: xev.TCP.WriteError!usize, +) xev.CallbackAction { + const self = self_.?; + _ = e catch |err| { + destroyBuffer(self, buf.slice); + log.err("write error {any}", .{err}); + return .disarm; + }; + + sock.close(l, c, Server, self, closeHandler); + return .disarm; +} + +fn shutdownHandler( + self_: ?*Server, + loop: *xev.Loop, + comp: *xev.Completion, + sock: xev.TCP, + e: xev.TCP.ShutdownError!void, +) xev.CallbackAction { + e catch { + // Is this even possible? + log.err("couldn't shutdown socket", .{}); + }; + + const self = self_.?; + + sock.close(loop, comp, Server, self, closeHandler); + return .disarm; +} + +pub fn closeHandler( + self_: ?*Server, + loop: *xev.Loop, + comp: *xev.Completion, + sock: xev.TCP, + e: xev.TCP.CloseError!void, +) xev.CallbackAction { + _ = sock; + _ = loop; + + e catch { + log.err("couldn't close socket", .{}); + }; + + const self = self_.?; + self.comp_pool.destroy(comp); + return .disarm; +} diff --git a/src/tcp/Thread.zig b/src/tcp/Thread.zig new file mode 100644 index 000000000..3e0f86a9a --- /dev/null +++ b/src/tcp/Thread.zig @@ -0,0 +1,50 @@ +//! Represents the libxev thread for handling TCP connections. +pub const Thread = @This(); + +const std = @import("std"); +const xev = @import("xev"); +const tcp = @import("../tcp.zig"); +const App = @import("../App.zig"); + +const Allocator = std.mem.Allocator; +const log = std.log.scoped(.tcp_thread); + +/// Allocator used for xev allocations. +alloc: std.mem.Allocator, + +/// The TCP server for handling incoming connections. +server: tcp.Server, + +/// Initialize the thread. This does not START the thread. This only sets +/// up all the internal state necessary prior to starting the thread. It +/// is up to the caller to start the thread with the threadMain entrypoint. +pub fn init(alloc: Allocator) !Thread { + // TODO: Configurable addresses and socket paths + const addr = try std.net.Address.parseIp4("127.0.0.1", 9090); + var server = try tcp.Server.init(alloc, addr); + errdefer server.deinit(); + + return Thread{ + .alloc = alloc, + .server = server, + }; +} + +/// Clean up the thread. This is only safe to call once the thread +/// completes executing; the caller must join prior to this. +pub fn deinit(self: *Thread) void { + self.server.deinit(); +} + +/// The main entrypoint for the thread. +pub fn threadMain(self: *Thread) void { + self.threadMain_() catch |err| { + log.warn("error in tcp thread err={any}", .{err}); + }; +} + +fn threadMain_(self: *Thread) !void { + log.debug("starting tcp thread", .{}); + defer log.debug("tcp thread exited", .{}); + try self.server.start(); +} diff --git a/src/tcp/commands/ping.zig b/src/tcp/commands/ping.zig new file mode 100644 index 000000000..e69de29bb diff --git a/src/tcp/handlers/reject.zig b/src/tcp/handlers/reject.zig new file mode 100644 index 000000000..85283f802 --- /dev/null +++ b/src/tcp/handlers/reject.zig @@ -0,0 +1,47 @@ +const xev = @import("xev"); +const std = @import("std"); +const Server = @import("../Server.zig").Server; + +const log = std.log.scoped(.tcp_thread); + +fn destroyBuffer(self: *Server, buf: []const u8) void { + self.buf_pool.destroy(@alignCast( + @as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))), + )); +} + +const RejectError = error{ + AllocError, + WriteError, +}; + +// TODO: Support client rejection reasons. +pub fn reject_client(self: *Server, c: *xev.TCP) !void { + const buf_w = self.buf_pool.create() catch return RejectError.AllocError; + const comp_w = self.comp_pool.create() catch { + destroyBuffer(self, buf_w); + return RejectError.AllocError; + }; + + @memcpy(buf_w.ptr, "ERR: Max connections reached\n"); + c.write(&self.loop, comp_w, .{ .slice = buf_w }, Server, self, wHandler); +} + +fn wHandler( + self_: ?*Server, + l: *xev.Loop, + c: *xev.Completion, + client: xev.TCP, + b: xev.WriteBuffer, + e: xev.TCP.WriteError!usize, +) xev.CallbackAction { + const self = self_.?; + _ = e catch |err| { + destroyBuffer(self, b.slice); + log.err("write error {any}", .{err}); + return .disarm; + }; + + client.close(l, c, Server, self, Server.closeHandler); + return .disarm; +}