tcp: add basic tcp server for client communication

Aarnav Tale
2024-04-19 21:22:44 -04:00
parent 3e0a5d3a73
commit 403175bea8
7 changed files with 486 additions and 0 deletions
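
The server binds to 127.0.0.1:9090, greets each client with "GHOSTTY\n", and
answers the one supported command, ping, with "PONG\n". At most two clients
are served at once; additional connections receive an error line and are
closed. A quick manual check is nc 127.0.0.1 9090.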

src/main.zig

@@ -18,6 +18,7 @@ const fontconfig = @import("fontconfig");
const harfbuzz = @import("harfbuzz");
const renderer = @import("renderer.zig");
const apprt = @import("apprt.zig");
const tcp = @import("tcp.zig");
const App = @import("App.zig");
const Ghostty = @import("main_c.zig").Ghostty;
@@ -112,6 +113,18 @@ pub fn main() !MainReturn {
    // when the first surface is created.
    if (@hasDecl(apprt.App, "startQuitTimer")) app_runtime.startQuitTimer();
// Not sure where this should go tbh
var tcp_thread = try tcp.Thread.init(alloc);
defer tcp_thread.deinit();
var tcp_thr = try std.Thread.spawn(
.{},
tcp.Thread.threadMain,
.{&tcp_thread},
);
tcp_thr.setName("tcp") catch {};
    // Run the GUI event loop
    try app_runtime.run();
}

src/tcp.zig (new file)

@@ -0,0 +1,9 @@
//! TCP implementation. The TCP implementation is responsible for
//! responding to TCP requests and dispatching them to the app's Mailbox.
pub const Thread = @import("tcp/Thread.zig");
pub const Server = @import("tcp/Server.zig");
pub const Command = @import("tcp/Command.zig").Command;
test {
@import("std").testing.refAllDecls(@This());
}

src/tcp/Command.zig (new file)

@@ -0,0 +1,50 @@
const std = @import("std");
const log = std.log.scoped(.tcp_thread);
pub const Command = enum {
ping,
pub const Error = error{
InvalidInput,
InvalidCommand,
};
/// Takes the bytes from our TCP handle and parses them into a Command.
pub fn parse(raw_read: []const u8) !Command {
        const trimmed = std.mem.trim(u8, raw_read, " \t\n\r");
if (trimmed.len == 0) {
return error.InvalidInput;
}
        var iter = std.mem.splitScalar(u8, trimmed, ' ');

        // .next() consumes just the first token and leaves the iterator
        // positioned at the remainder for future argument parsing.
        const cmd_name = iter.next() orelse return error.InvalidInput;
        log.debug("got command name={s}", .{cmd_name});

        // TODO: Handle/support arguments
        return std.meta.stringToEnum(Command, cmd_name) orelse error.InvalidCommand;
}
    /// These handlers never fail, so they return the response bytes
    /// directly; callers don't need a try.
    pub fn handle(self: Command) []const u8 {
        switch (self) {
            .ping => return ping(),
        }
    }

    pub fn handleError(err: Command.Error) []const u8 {
        switch (err) {
            error.InvalidInput => return "INVALID INPUT\n",
            error.InvalidCommand => return "INVALID COMMAND\n",
        }
    }
};
// Want to move this to a different file; not sure how I want it organized yet.
fn ping() []const u8 {
return "PONG\n";
}
// TODO: These need proper testing.
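
A minimal sketch of the coverage that TODO calls for, exercising the happy
path and both error paths of parse (illustrative only, not part of this
commit):

test "Command.parse basics" {
    const testing = std.testing;
    try testing.expectEqual(Command.ping, try Command.parse("ping\n"));
    try testing.expectError(error.InvalidCommand, Command.parse("pong"));
    try testing.expectError(error.InvalidInput, Command.parse("\r\n"));
}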

src/tcp/Server.zig (new file)

@@ -0,0 +1,317 @@
const std = @import("std");
const xev = @import("xev");
const tcp = @import("../tcp.zig");
const reject_client = @import("./handlers/reject.zig").reject_client;
const Allocator = std.mem.Allocator;
const CompletionPool = std.heap.MemoryPool(xev.Completion);
const SocketPool = std.heap.MemoryPool(xev.TCP);
const BufferPool = std.heap.MemoryPool([1024]u8);
const log = std.log.scoped(.tcp_thread);
/// Maximum connections we allow at once
const MAX_CLIENTS = 2;
/// Acceptor polling rate in milliseconds
const ACCEPTOR_RATE = 1;
/// A wrapper around the TCP server socket
pub const Server = @This();
/// Memory pool that stores the completions required by xev
comp_pool: CompletionPool,
/// Memory pool that stores client sockets
sock_pool: SocketPool,
/// Memory pool that stores buffers for reading and writing
buf_pool: BufferPool,
/// Stop flag
stop: bool,
/// Event loop
loop: xev.Loop,
/// Server socket
socket: xev.TCP,
/// Timer for accepting connections
acceptor: xev.Timer,
/// Array of client sockets
clients: [MAX_CLIENTS]*xev.TCP,
/// Number of clients connected
clients_count: usize,
/// Initializes the server with the given allocator and address
pub fn init(alloc: Allocator, addr: std.net.Address) !Server {
const socket = try xev.TCP.init(addr);
try socket.bind(addr);
return Server{
.comp_pool = CompletionPool.init(alloc),
.sock_pool = SocketPool.init(alloc),
.buf_pool = BufferPool.init(alloc),
.stop = false,
.loop = try xev.Loop.init(.{}),
.socket = socket,
.acceptor = try xev.Timer.init(),
.clients = undefined,
.clients_count = 0,
};
}
/// Deinitializes the server
pub fn deinit(self: *Server) void {
self.buf_pool.deinit();
self.comp_pool.deinit();
self.sock_pool.deinit();
self.loop.deinit();
self.acceptor.deinit();
}
/// Starts the timer which tries to accept connections
pub fn start(self: *Server) !void {
try self.socket.listen(MAX_CLIENTS);
    log.info("listening on socket={any}", .{self.socket});
// Each acceptor borrows a completion from the pool
// We do this because the completion is passed to the client TCP handlers
const c = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return error.OutOfMemory;
};
    self.acceptor.run(&self.loop, c, ACCEPTOR_RATE, Server, self, acceptorTick);
while (!self.stop) {
try self.loop.run(.until_done);
}
}
/// Convenience function to destroy a buffer in our pool
fn destroyBuffer(self: *Server, buf: []const u8) void {
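    // The pool hands out *[1024]u8 blocks; the slice we get back may be
    // shorter, so recover the block pointer from the slice's base address.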
self.buf_pool.destroy(@alignCast(
@as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
));
}
/// This runs on a loop and attempts to accept a connection to pass into the
/// handlers that manage client communication. It is very important to note
/// that the xev.Completion is shared between the acceptor and other handlers.
/// When a client disconnects, only then the completion is freed.
// Named acceptorTick so it doesn't collide with the acceptor field above.
fn acceptorTick(
self_: ?*Server,
loop: *xev.Loop,
c: *xev.Completion,
e: xev.Timer.RunError!void,
) xev.CallbackAction {
const self = self_.?;
e catch {
log.err("timer error", .{});
return .disarm;
};
self.socket.accept(loop, c, Server, self, acceptHandler);
// We need to create a new completion for the next acceptor since each
// TCP connection will need its own if it successfully accepts.
const accept_recomp = self.comp_pool.create() catch {
log.err("couldn't allocate completion in pool", .{});
return .disarm;
};
// We can't rearm because it'll repeat *this* instance of the acceptor
// So if the socket fails to accept it won't actually accept anything new
    self.acceptor.run(loop, accept_recomp, ACCEPTOR_RATE, Server, self, acceptorTick);
return .disarm;
}
/// Accepts a new client connection and starts reading from it until EOF.
fn acceptHandler(
self_: ?*Server,
l: *xev.Loop,
c: *xev.Completion,
e: xev.TCP.AcceptError!xev.TCP,
) xev.CallbackAction {
    const self = self_.?;
    const sock = self.sock_pool.create() catch {
        log.err("couldn't allocate socket in pool", .{});
        return .disarm;
    };
    sock.* = e catch {
        log.err("accept error", .{});
        self.sock_pool.destroy(sock);
        return .disarm;
    };

    // Check capacity before allocating any per-client resources so a
    // rejected connection can't leak completions or buffers.
    if (self.clients_count == MAX_CLIENTS) {
        log.warn("max clients reached, rejecting fd={d}", .{sock.fd});
        reject_client(self, sock) catch {
            log.err("couldn't send rejection to fd={d}", .{sock.fd});
        };
        return .disarm;
    }

    const comp_w = self.comp_pool.create() catch {
        log.err("couldn't allocate completion in pool", .{});
        self.sock_pool.destroy(sock);
        return .disarm;
    };
    const buf_w = self.buf_pool.create() catch {
        log.err("couldn't allocate buffer in pool", .{});
        self.comp_pool.destroy(comp_w);
        self.sock_pool.destroy(sock);
        return .disarm;
    };
    const buf_r = self.buf_pool.create() catch {
        log.err("couldn't allocate buffer in pool", .{});
        self.buf_pool.destroy(buf_w);
        self.comp_pool.destroy(comp_w);
        self.sock_pool.destroy(sock);
        return .disarm;
    };

    log.info("accepted connection fd={d}", .{sock.fd});
    self.clients[self.clients_count] = sock;
    self.clients_count += 1;

    // Send the greeting and start reading. Write only the greeting bytes,
    // not the whole 1024-byte pool block.
    const greeting = "GHOSTTY\n";
    @memcpy(buf_w[0..greeting.len], greeting);
    sock.write(l, comp_w, .{ .slice = buf_w[0..greeting.len] }, Server, self, writeHandler);
    sock.read(l, c, .{ .slice = buf_r }, Server, self, readHandler);
    return .disarm;
}
fn readHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
buf: xev.ReadBuffer,
e: xev.TCP.ReadError!usize,
) xev.CallbackAction {
    const self = self_.?;
    const n = e catch |err| switch (err) {
        error.EOF => {
            log.info("disconnecting client fd={d}", .{sock.fd});
            destroyBuffer(self, buf.slice);
            // TODO: Also remove the socket from self.clients; today only
            // the count shrinks.
            self.clients_count -= 1;
            sock.close(loop, comp, Server, self, closeHandler);
            return .disarm;
        },
        else => {
            destroyBuffer(self, buf.slice);
            log.err("read error", .{});
            return .disarm;
        },
    };

    const comp_w = self.comp_pool.create() catch {
        log.err("couldn't allocate completion in pool", .{});
        return .rearm;
    };
    const buf_w = self.buf_pool.create() catch {
        log.err("couldn't allocate buffer in pool", .{});
        self.comp_pool.destroy(comp_w);
        return .rearm;
    };

    // TODO: What should we do with the rest of the buffer? Only the
    // response to the last complete command survives in buf_w.
    var res: []const u8 = "";
    var iter = std.mem.splitScalar(u8, buf.slice[0..n], '\n');
    while (iter.next()) |item| {
        if (item.len == 0) continue;
        const cmd = tcp.Command.parse(item) catch |err| {
            res = tcp.Command.handleError(err);
            @memcpy(buf_w[0..res.len], res);
            sock.write(loop, comp_w, .{ .slice = buf_w[0..res.len] }, Server, self, writeHandler);
            return .rearm;
        };
        res = tcp.Command.handle(cmd);
        @memcpy(buf_w[0..res.len], res);
    }

    if (res.len == 0) {
        // Nothing but empty lines; return the write resources unused.
        self.comp_pool.destroy(comp_w);
        self.buf_pool.destroy(buf_w);
        return .rearm;
    }
    sock.write(loop, comp_w, .{ .slice = buf_w[0..res.len] }, Server, self, writeHandler);
    return .rearm;
}
fn writeHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
buf: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
_ = sock;
_ = comp;
_ = loop;
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, buf.slice);
log.err("write error {any}", .{err});
return .disarm;
    };
    // A successful write must also return its buffer to the pool.
    destroyBuffer(self, buf.slice);
    return .disarm;
}
fn rejectHandler(
self_: ?*Server,
l: *xev.Loop,
c: *xev.Completion,
sock: xev.TCP,
buf: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, buf.slice);
log.err("write error {any}", .{err});
return .disarm;
    };
    destroyBuffer(self, buf.slice);
    sock.close(l, c, Server, self, closeHandler);
return .disarm;
}
fn shutdownHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
e: xev.TCP.ShutdownError!void,
) xev.CallbackAction {
e catch {
// Is this even possible?
log.err("couldn't shutdown socket", .{});
};
const self = self_.?;
sock.close(loop, comp, Server, self, closeHandler);
return .disarm;
}
pub fn closeHandler(
self_: ?*Server,
loop: *xev.Loop,
comp: *xev.Completion,
sock: xev.TCP,
e: xev.TCP.CloseError!void,
) xev.CallbackAction {
_ = sock;
_ = loop;
e catch {
log.err("couldn't close socket", .{});
};
const self = self_.?;
self.comp_pool.destroy(comp);
return .disarm;
}
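
The three MemoryPool fields at the top of this file exist because every
accept, read, and write needs a stable fixed-size allocation (a completion,
a socket, or a 1024-byte buffer) whose lifetime ends in some later callback;
pooling makes those create/destroy cycles cheap and reuses the blocks. A
minimal sketch of the std.heap.MemoryPool API as used here (assuming the
Zig std of this commit's era):

test "memory pool round trip" {
    var pool = std.heap.MemoryPool([1024]u8).init(std.testing.allocator);
    defer pool.deinit();
    const buf = try pool.create(); // hands out a *[1024]u8
    pool.destroy(buf); // the block returns to the pool for reuse
}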

src/tcp/Thread.zig (new file)

@@ -0,0 +1,50 @@
//! Represents the libxev thread for handling TCP connections.
pub const Thread = @This();
const std = @import("std");
const xev = @import("xev");
const tcp = @import("../tcp.zig");
const App = @import("../App.zig");
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.tcp_thread);
/// Allocator used for xev allocations.
alloc: Allocator,
/// The TCP server for handling incoming connections.
server: tcp.Server,
/// Initialize the thread. This does not START the thread. This only sets
/// up all the internal state necessary prior to starting the thread. It
/// is up to the caller to start the thread with the threadMain entrypoint.
pub fn init(alloc: Allocator) !Thread {
// TODO: Configurable addresses and socket paths
const addr = try std.net.Address.parseIp4("127.0.0.1", 9090);
var server = try tcp.Server.init(alloc, addr);
errdefer server.deinit();
return Thread{
.alloc = alloc,
.server = server,
};
}
/// Clean up the thread. This is only safe to call once the thread
/// completes executing; the caller must join prior to this.
pub fn deinit(self: *Thread) void {
self.server.deinit();
}
/// The main entrypoint for the thread.
pub fn threadMain(self: *Thread) void {
self.threadMain_() catch |err| {
log.warn("error in tcp thread err={any}", .{err});
};
}
fn threadMain_(self: *Thread) !void {
log.debug("starting tcp thread", .{});
defer log.debug("tcp thread exited", .{});
try self.server.start();
}
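
For an end-to-end check of the thread and server together, a small
standalone client does the job. This is a hypothetical sketch, not part of
the commit, assuming the hard-coded 127.0.0.1:9090 from init above:

const std = @import("std");

pub fn main() !void {
    const addr = try std.net.Address.parseIp4("127.0.0.1", 9090);
    const stream = try std.net.tcpConnectToAddress(addr);
    defer stream.close();

    var buf: [64]u8 = undefined;
    var n = try stream.read(&buf); // the server greets with "GHOSTTY\n"
    std.debug.print("greeting: {s}", .{buf[0..n]});

    try stream.writeAll("ping\n");
    n = try stream.read(&buf); // expect "PONG\n"
    std.debug.print("response: {s}", .{buf[0..n]});
}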

src/tcp/handlers/reject.zig (new file)

@@ -0,0 +1,47 @@
const xev = @import("xev");
const std = @import("std");
const Server = @import("../Server.zig").Server;
const log = std.log.scoped(.tcp_thread);
fn destroyBuffer(self: *Server, buf: []const u8) void {
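    // Same pointer-recovery trick as Server.destroyBuffer: the pool block
    // is always 1024 bytes even when the slice handed back is shorter.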
self.buf_pool.destroy(@alignCast(
@as(*[1024]u8, @ptrFromInt(@intFromPtr(buf.ptr))),
));
}
const RejectError = error{
AllocError,
WriteError,
};
// TODO: Support client rejection reasons.
pub fn reject_client(self: *Server, c: *xev.TCP) !void {
const buf_w = self.buf_pool.create() catch return RejectError.AllocError;
const comp_w = self.comp_pool.create() catch {
destroyBuffer(self, buf_w);
return RejectError.AllocError;
};
    const msg = "ERR: Max connections reached\n";
    @memcpy(buf_w[0..msg.len], msg);
    // Write only the message bytes, not the whole 1024-byte pool block.
    c.write(&self.loop, comp_w, .{ .slice = buf_w[0..msg.len] }, Server, self, wHandler);
}
fn wHandler(
self_: ?*Server,
l: *xev.Loop,
c: *xev.Completion,
client: xev.TCP,
b: xev.WriteBuffer,
e: xev.TCP.WriteError!usize,
) xev.CallbackAction {
const self = self_.?;
_ = e catch |err| {
destroyBuffer(self, b.slice);
log.err("write error {any}", .{err});
return .disarm;
    };
    destroyBuffer(self, b.slice);
    client.close(l, c, Server, self, Server.closeHandler);
return .disarm;
}