From 31144da8456cdb4a1d1d7765a02f38ac814765a1 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 14 Jul 2024 10:27:58 -0700 Subject: [PATCH] termio: Thread doesn't need to hold termio pointer --- src/Surface.zig | 4 +- src/termio/Thread.zig | 100 +++++++++++++++++++++++------------------- 2 files changed, 57 insertions(+), 47 deletions(-) diff --git a/src/Surface.zig b/src/Surface.zig index 94bb058b3..68ab9214a 100644 --- a/src/Surface.zig +++ b/src/Surface.zig @@ -422,7 +422,7 @@ pub fn init( errdefer io.deinit(); // Create the IO thread - var io_thread = try termio.Thread.init(alloc, &self.io); + var io_thread = try termio.Thread.init(alloc); errdefer io_thread.deinit(); self.* = .{ @@ -483,7 +483,7 @@ pub fn init( self.io_thr = try std.Thread.spawn( .{}, termio.Thread.threadMain, - .{&self.io_thread}, + .{ &self.io_thread, &self.io }, ); self.io_thr.setName("io") catch {}; diff --git a/src/termio/Thread.zig b/src/termio/Thread.zig index 97acb2acf..44d851998 100644 --- a/src/termio/Thread.zig +++ b/src/termio/Thread.zig @@ -1,5 +1,14 @@ -//! Represents the IO thread logic. The IO thread is responsible for -//! the child process and pty management. +//! Represents the "writer" thread for terminal IO. The reader side is +//! handled by the Termio struct itself and dependent on the underlying +//! implementation (i.e. if its a pty, manual, etc.). +//! +//! The writer thread does handle writing bytes to the pty but also handles +//! different events such as starting synchronized output, changing some +//! modes (like linefeed), etc. The goal is to offload as much from the +//! reader thread as possible since it is the hot path in parsing VT +//! sequences and updating terminal state. +//! +//! This thread state can only be used by one thread at a time. pub const Thread = @This(); const std = @import("std"); @@ -58,9 +67,6 @@ sync_reset: xev.Timer, sync_reset_c: xev.Completion = .{}, sync_reset_cancel_c: xev.Completion = .{}, -/// The main termio state. -termio: *termio.Termio, - /// The mailbox that can be used to send this thread messages. Note /// this is a blocking queue so if it is full you will get errors (or block). mailbox: *Mailbox, @@ -83,7 +89,6 @@ flags: packed struct { /// is up to the caller to start the thread with the threadMain entrypoint. pub fn init( alloc: Allocator, - t: *termio.Termio, ) !Thread { // Create our event loop. var loop = try xev.Loop.init(.{}); @@ -116,7 +121,6 @@ pub fn init( .stop = stop_h, .coalesce = coalesce_h, .sync_reset = sync_reset_h, - .termio = t, .mailbox = mailbox, }; } @@ -135,9 +139,9 @@ pub fn deinit(self: *Thread) void { } /// The main entrypoint for the thread. -pub fn threadMain(self: *Thread) void { +pub fn threadMain(self: *Thread, io: *termio.Termio) void { // Call child function so we can use errors... - self.threadMain_() catch |err| { + self.threadMain_(io) catch |err| { log.warn("error in io thread err={}", .{err}); // Use an arena to simplify memory management below @@ -150,9 +154,9 @@ pub fn threadMain(self: *Thread) void { // the error to the surface thread and let the apprt deal with it // in some way but this works for now. Without this, the user would // just see a blank terminal window. - self.termio.renderer_state.mutex.lock(); - defer self.termio.renderer_state.mutex.unlock(); - const t = self.termio.renderer_state.terminal; + io.renderer_state.mutex.lock(); + defer io.renderer_state.mutex.unlock(); + const t = io.renderer_state.terminal; // Hide the cursor t.modes.set(.cursor_visible, false); @@ -216,20 +220,20 @@ pub fn threadMain(self: *Thread) void { } } -fn threadMain_(self: *Thread) !void { +fn threadMain_(self: *Thread, io: *termio.Termio) !void { defer log.debug("IO thread exited", .{}); // This is the data sent to xev callbacks. We want a pointer to both // ourselves and the thread data so we can thread that through (pun intended). - var cb: CallbackData = .{ .self = self }; + var cb: CallbackData = .{ .self = self, .io = io }; // Run our thread start/end callbacks. This allows the implementation // to hook into the event loop as needed. The thread data is created // on the stack here so that it has a stable pointer throughout the // lifetime of the thread. - try self.termio.threadEnter(self, &cb.data); + try io.threadEnter(self, &cb.data); defer cb.data.deinit(); - defer self.termio.threadExit(&cb.data); + defer io.threadExit(&cb.data); // Start the async handlers. self.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback); @@ -244,17 +248,24 @@ fn threadMain_(self: *Thread) !void { /// This is the data passed to xev callbacks on the thread. const CallbackData = struct { self: *Thread, + io: *termio.Termio, data: termio.Termio.ThreadData = undefined, }; /// Drain the mailbox, handling all the messages in our terminal implementation. -fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void { +fn drainMailbox( + self: *Thread, + cb: *CallbackData, +) !void { // If we're draining, we just drain the mailbox and return. if (self.flags.drain) { while (self.mailbox.pop()) |_| {} return; } + const io = cb.io; + const data = &cb.data; + // This holds the mailbox lock for the duration of the drain. The // expectation is that all our message handlers will be non-blocking // ENOUGH to not mess up throughput on producers. @@ -267,29 +278,29 @@ fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void { switch (message) { .change_config => |config| { defer config.alloc.destroy(config.ptr); - try self.termio.changeConfig(data, config.ptr); + try io.changeConfig(data, config.ptr); }, .inspector => |v| self.flags.has_inspector = v, - .resize => |v| self.handleResize(v), - .clear_screen => |v| try self.termio.clearScreen(data, v.history), - .scroll_viewport => |v| try self.termio.scrollViewport(v), - .jump_to_prompt => |v| try self.termio.jumpToPrompt(v), - .start_synchronized_output => self.startSynchronizedOutput(), + .resize => |v| self.handleResize(cb, v), + .clear_screen => |v| try io.clearScreen(data, v.history), + .scroll_viewport => |v| try io.scrollViewport(v), + .jump_to_prompt => |v| try io.jumpToPrompt(v), + .start_synchronized_output => self.startSynchronizedOutput(cb), .linefeed_mode => |v| self.flags.linefeed_mode = v, - .child_exited_abnormally => |v| try self.termio.childExitedAbnormally(v.exit_code, v.runtime_ms), - .write_small => |v| try self.termio.queueWrite( + .child_exited_abnormally => |v| try io.childExitedAbnormally(v.exit_code, v.runtime_ms), + .write_small => |v| try io.queueWrite( data, v.data[0..v.len], self.flags.linefeed_mode, ), - .write_stable => |v| try self.termio.queueWrite( + .write_stable => |v| try io.queueWrite( data, v, self.flags.linefeed_mode, ), .write_alloc => |v| { defer v.alloc.free(v.data); - try self.termio.queueWrite( + try io.queueWrite( data, v.data, self.flags.linefeed_mode, @@ -301,23 +312,23 @@ fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void { // Trigger a redraw after we've drained so we don't waste cyces // messaging a redraw. if (redraw) { - try self.termio.renderer_wakeup.notify(); + try io.renderer_wakeup.notify(); } } -fn startSynchronizedOutput(self: *Thread) void { +fn startSynchronizedOutput(self: *Thread, cb: *CallbackData) void { self.sync_reset.reset( &self.loop, &self.sync_reset_c, &self.sync_reset_cancel_c, sync_reset_ms, - Thread, - self, + CallbackData, + cb, syncResetCallback, ); } -fn handleResize(self: *Thread, resize: termio.Message.Resize) void { +fn handleResize(self: *Thread, cb: *CallbackData, resize: termio.Message.Resize) void { self.coalesce_data.resize = resize; // If the timer is already active we just return. In the future we want @@ -330,14 +341,14 @@ fn handleResize(self: *Thread, resize: termio.Message.Resize) void { &self.coalesce_c, &self.coalesce_cancel_c, Coalesce.min_ms, - Thread, - self, + CallbackData, + cb, coalesceCallback, ); } fn syncResetCallback( - self_: ?*Thread, + cb_: ?*CallbackData, _: *xev.Loop, _: *xev.Completion, r: xev.Timer.RunError!void, @@ -350,13 +361,13 @@ fn syncResetCallback( }, }; - const self = self_ orelse return .disarm; - self.termio.resetSynchronizedOutput(); + const cb = cb_ orelse return .disarm; + cb.io.resetSynchronizedOutput(); return .disarm; } fn coalesceCallback( - self_: ?*Thread, + cb_: ?*CallbackData, _: *xev.Loop, _: *xev.Completion, r: xev.Timer.RunError!void, @@ -369,11 +380,11 @@ fn coalesceCallback( }, }; - const self = self_ orelse return .disarm; + const cb = cb_ orelse return .disarm; - if (self.coalesce_data.resize) |v| { - self.coalesce_data.resize = null; - self.termio.resize(v.grid_size, v.screen_size, v.padding) catch |err| { + if (cb.self.coalesce_data.resize) |v| { + cb.self.coalesce_data.resize = null; + cb.io.resize(v.grid_size, v.screen_size, v.padding) catch |err| { log.warn("error during resize err={}", .{err}); }; } @@ -392,11 +403,10 @@ fn wakeupCallback( return .rearm; }; - const cb = cb_ orelse return .rearm; - // When we wake up, we check the mailbox. Mailbox producers should // wake up our thread after publishing. - cb.self.drainMailbox(&cb.data) catch |err| + const cb = cb_ orelse return .rearm; + cb.self.drainMailbox(cb) catch |err| log.err("error draining mailbox err={}", .{err}); return .rearm;