termio: Thread doesn't need to hold termio pointer

Mitchell Hashimoto
2024-07-14 10:27:58 -07:00
parent f50c15c350
commit 31144da845
2 changed files with 57 additions and 47 deletions

@@ -422,7 +422,7 @@ pub fn init(
     errdefer io.deinit();
 
     // Create the IO thread
-    var io_thread = try termio.Thread.init(alloc, &self.io);
+    var io_thread = try termio.Thread.init(alloc);
     errdefer io_thread.deinit();
 
     self.* = .{
@@ -483,7 +483,7 @@ pub fn init(
     self.io_thr = try std.Thread.spawn(
         .{},
         termio.Thread.threadMain,
-        .{&self.io_thread},
+        .{ &self.io_thread, &self.io },
     );
     self.io_thr.setName("io") catch {};
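
The effect of this hunk: Thread.init no longer captures a pointer to the not-yet-fully-initialized Termio; the Surface passes it at spawn time instead. A minimal, self-contained sketch of that wiring with stand-in types (Termio, Thread, and their fields here are illustrative, not Ghostty's real definitions):

const std = @import("std");

// Stand-ins for termio.Termio and termio.Thread; the fields are made up.
const Termio = struct { bytes_written: usize = 0 };

const Thread = struct {
    running: bool = false,

    // init no longer takes (or stores) a *Termio.
    pub fn init(alloc: std.mem.Allocator) !Thread {
        _ = alloc; // the real init allocates the mailbox, event loop, etc.
        return .{};
    }

    // The Termio pointer arrives as a spawn argument instead.
    pub fn threadMain(self: *Thread, io: *Termio) void {
        self.running = true;
        io.bytes_written += 1;
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var io: Termio = .{};
    var io_thread = try Thread.init(gpa.allocator());

    // Mirrors the updated Surface.init: both pointers go in the spawn
    // argument tuple rather than one living inside the other.
    const thr = try std.Thread.spawn(.{}, Thread.threadMain, .{ &io_thread, &io });
    thr.join();
}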

@@ -1,5 +1,14 @@
-//! Represents the IO thread logic. The IO thread is responsible for
-//! the child process and pty management.
+//! Represents the "writer" thread for terminal IO. The reader side is
+//! handled by the Termio struct itself and dependent on the underlying
+//! implementation (i.e. if it's a pty, manual, etc.).
+//!
+//! The writer thread does handle writing bytes to the pty but also handles
+//! different events such as starting synchronized output, changing some
+//! modes (like linefeed), etc. The goal is to offload as much from the
+//! reader thread as possible since it is the hot path in parsing VT
+//! sequences and updating terminal state.
+//!
+//! This thread state can only be used by one thread at a time.
 pub const Thread = @This();
 
 const std = @import("std");
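
A minimal sketch of the writer-thread pattern this doc comment describes, using std.Thread primitives in place of libxev and Ghostty's mailbox type (Writer, send, and run are illustrative names): producers enqueue work and signal a wakeup, and the writer drains everything in one pass so the reader thread never blocks on the pty.

const std = @import("std");

const Writer = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    queue: std.ArrayList([]const u8),
    done: bool = false,

    // The writer loop: drain everything queued, then sleep until woken.
    fn run(self: *Writer) void {
        self.mutex.lock();
        defer self.mutex.unlock();
        while (true) {
            while (self.queue.popOrNull()) |bytes| {
                std.debug.print("pty write: {s}", .{bytes});
            }
            if (self.done) return;
            self.cond.wait(&self.mutex);
        }
    }

    // Producer side: enqueue and notify, analogous to mailbox + wakeup.
    fn send(self: *Writer, bytes: []const u8) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.queue.append(bytes);
        self.cond.signal();
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var w: Writer = .{ .queue = std.ArrayList([]const u8).init(gpa.allocator()) };
    defer w.queue.deinit();

    const thr = try std.Thread.spawn(.{}, Writer.run, .{&w});
    try w.send("echo hi\n");

    w.mutex.lock();
    w.done = true;
    w.cond.signal();
    w.mutex.unlock();
    thr.join();
}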
@@ -58,9 +67,6 @@ sync_reset: xev.Timer,
 sync_reset_c: xev.Completion = .{},
 sync_reset_cancel_c: xev.Completion = .{},
 
-/// The main termio state.
-termio: *termio.Termio,
-
 /// The mailbox that can be used to send this thread messages. Note
 /// this is a blocking queue so if it is full you will get errors (or block).
 mailbox: *Mailbox,
@@ -83,7 +89,6 @@ flags: packed struct {
 /// is up to the caller to start the thread with the threadMain entrypoint.
 pub fn init(
     alloc: Allocator,
-    t: *termio.Termio,
 ) !Thread {
     // Create our event loop.
     var loop = try xev.Loop.init(.{});
@@ -116,7 +121,6 @@ pub fn init(
         .stop = stop_h,
         .coalesce = coalesce_h,
         .sync_reset = sync_reset_h,
-        .termio = t,
         .mailbox = mailbox,
     };
 }
@@ -135,9 +139,9 @@ pub fn deinit(self: *Thread) void {
 }
 
 /// The main entrypoint for the thread.
-pub fn threadMain(self: *Thread) void {
+pub fn threadMain(self: *Thread, io: *termio.Termio) void {
     // Call child function so we can use errors...
-    self.threadMain_() catch |err| {
+    self.threadMain_(io) catch |err| {
         log.warn("error in io thread err={}", .{err});
 
         // Use an arena to simplify memory management below
@@ -150,9 +154,9 @@ pub fn threadMain(self: *Thread) void {
         // the error to the surface thread and let the apprt deal with it
         // in some way but this works for now. Without this, the user would
         // just see a blank terminal window.
-        self.termio.renderer_state.mutex.lock();
-        defer self.termio.renderer_state.mutex.unlock();
-        const t = self.termio.renderer_state.terminal;
+        io.renderer_state.mutex.lock();
+        defer io.renderer_state.mutex.unlock();
+        const t = io.renderer_state.terminal;
 
         // Hide the cursor
         t.modes.set(.cursor_visible, false);
@@ -216,20 +220,20 @@ pub fn threadMain(self: *Thread) void {
     }
 }
 
-fn threadMain_(self: *Thread) !void {
+fn threadMain_(self: *Thread, io: *termio.Termio) !void {
     defer log.debug("IO thread exited", .{});
 
     // This is the data sent to xev callbacks. We want a pointer to both
     // ourselves and the thread data so we can thread that through (pun intended).
-    var cb: CallbackData = .{ .self = self };
+    var cb: CallbackData = .{ .self = self, .io = io };
 
     // Run our thread start/end callbacks. This allows the implementation
     // to hook into the event loop as needed. The thread data is created
     // on the stack here so that it has a stable pointer throughout the
     // lifetime of the thread.
-    try self.termio.threadEnter(self, &cb.data);
+    try io.threadEnter(self, &cb.data);
     defer cb.data.deinit();
-    defer self.termio.threadExit(&cb.data);
+    defer io.threadExit(&cb.data);
 
     // Start the async handlers.
     self.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback);
@@ -244,17 +248,24 @@ fn threadMain_(self: *Thread) !void {
 /// This is the data passed to xev callbacks on the thread.
 const CallbackData = struct {
     self: *Thread,
+    io: *termio.Termio,
     data: termio.Termio.ThreadData = undefined,
 };
 
 /// Drain the mailbox, handling all the messages in our terminal implementation.
-fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void {
+fn drainMailbox(
+    self: *Thread,
+    cb: *CallbackData,
+) !void {
     // If we're draining, we just drain the mailbox and return.
     if (self.flags.drain) {
         while (self.mailbox.pop()) |_| {}
         return;
     }
 
+    const io = cb.io;
+    const data = &cb.data;
+
     // This holds the mailbox lock for the duration of the drain. The
     // expectation is that all our message handlers will be non-blocking
     // ENOUGH to not mess up throughput on producers.
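
CallbackData is what replaces the stored termio field: one stack-allocated struct carries both pointers through every xev callback as typed userdata. A sketch of that pattern with stand-in types (the fields and the simplified callback signature are illustrative; real libxev callbacks also receive the loop, completion, and result):

const std = @import("std");

// Stand-ins; Ghostty's real types are termio.Thread and termio.Termio.
const Thread = struct { drained: u32 = 0 };
const Termio = struct { resets: u32 = 0 };

// One struct threads both pointers through every callback, instead of
// the Thread carrying a *Termio field of its own.
const CallbackData = struct {
    self: *Thread,
    io: *Termio,
};

// Mimics libxev's userdata convention: the callback receives an
// optional pointer of whatever type was registered with the watcher.
fn syncResetCallback(cb_: ?*CallbackData) void {
    const cb = cb_ orelse return;
    cb.self.drained += 1;
    cb.io.resets += 1; // reach Termio through the callback data
}

pub fn main() void {
    var thread: Thread = .{};
    var io: Termio = .{};
    var cb: CallbackData = .{ .self = &thread, .io = &io };
    syncResetCallback(&cb);
    std.debug.print("drained={d} resets={d}\n", .{ thread.drained, io.resets });
}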
@@ -267,29 +278,29 @@ fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void {
         switch (message) {
             .change_config => |config| {
                 defer config.alloc.destroy(config.ptr);
-                try self.termio.changeConfig(data, config.ptr);
+                try io.changeConfig(data, config.ptr);
             },
             .inspector => |v| self.flags.has_inspector = v,
-            .resize => |v| self.handleResize(v),
-            .clear_screen => |v| try self.termio.clearScreen(data, v.history),
-            .scroll_viewport => |v| try self.termio.scrollViewport(v),
-            .jump_to_prompt => |v| try self.termio.jumpToPrompt(v),
-            .start_synchronized_output => self.startSynchronizedOutput(),
+            .resize => |v| self.handleResize(cb, v),
+            .clear_screen => |v| try io.clearScreen(data, v.history),
+            .scroll_viewport => |v| try io.scrollViewport(v),
+            .jump_to_prompt => |v| try io.jumpToPrompt(v),
+            .start_synchronized_output => self.startSynchronizedOutput(cb),
             .linefeed_mode => |v| self.flags.linefeed_mode = v,
-            .child_exited_abnormally => |v| try self.termio.childExitedAbnormally(v.exit_code, v.runtime_ms),
-            .write_small => |v| try self.termio.queueWrite(
+            .child_exited_abnormally => |v| try io.childExitedAbnormally(v.exit_code, v.runtime_ms),
+            .write_small => |v| try io.queueWrite(
                 data,
                 v.data[0..v.len],
                 self.flags.linefeed_mode,
             ),
-            .write_stable => |v| try self.termio.queueWrite(
+            .write_stable => |v| try io.queueWrite(
                 data,
                 v,
                 self.flags.linefeed_mode,
             ),
             .write_alloc => |v| {
                 defer v.alloc.free(v.data);
-                try self.termio.queueWrite(
+                try io.queueWrite(
                     data,
                     v.data,
                     self.flags.linefeed_mode,
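
The three write variants dispatched above trade copying against lifetime management: small writes are copied inline into the message, stable writes borrow memory that outlives the write, and allocated writes carry their allocator so the drain loop can free them. An illustrative mirror of that shape (the union layout and inline buffer size are assumptions, not Ghostty's actual termio.Message):

const std = @import("std");

const small_size = 32; // made-up inline capacity

const WriteMsg = union(enum) {
    // Small writes: bytes copied into the message itself.
    write_small: struct { data: [small_size]u8, len: u8 },
    // Stable writes: the pointer outlives the write, no copy needed.
    write_stable: []const u8,
    // Large writes: heap-allocated, freed once queued.
    write_alloc: struct { alloc: std.mem.Allocator, data: []u8 },
};

fn queueWrite(bytes: []const u8) void {
    std.debug.print("pty write: {s}", .{bytes});
}

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const alloc = gpa.allocator();

    var small: WriteMsg = .{ .write_small = .{ .data = undefined, .len = 3 } };
    @memcpy(small.write_small.data[0..3], "hi\n");

    const msgs = [_]WriteMsg{
        small,
        // String literals have static lifetime, so they are "stable".
        .{ .write_stable = "ls -la\n" },
        .{ .write_alloc = .{ .alloc = alloc, .data = try alloc.dupe(u8, "a big paste\n") } },
    };

    for (msgs) |msg| switch (msg) {
        .write_small => |v| queueWrite(v.data[0..v.len]),
        .write_stable => |v| queueWrite(v),
        .write_alloc => |v| {
            defer v.alloc.free(v.data);
            queueWrite(v.data);
        },
    };
}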
@@ -301,23 +312,23 @@ fn drainMailbox(self: *Thread, data: *termio.Termio.ThreadData) !void {
     // Trigger a redraw after we've drained so we don't waste cycles
     // messaging a redraw.
     if (redraw) {
-        try self.termio.renderer_wakeup.notify();
+        try io.renderer_wakeup.notify();
     }
 }
 
-fn startSynchronizedOutput(self: *Thread) void {
+fn startSynchronizedOutput(self: *Thread, cb: *CallbackData) void {
     self.sync_reset.reset(
         &self.loop,
         &self.sync_reset_c,
         &self.sync_reset_cancel_c,
         sync_reset_ms,
-        Thread,
-        self,
+        CallbackData,
+        cb,
         syncResetCallback,
     );
 }
 
-fn handleResize(self: *Thread, resize: termio.Message.Resize) void {
+fn handleResize(self: *Thread, cb: *CallbackData, resize: termio.Message.Resize) void {
     self.coalesce_data.resize = resize;
 
     // If the timer is already active we just return. In the future we want
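
handleResize only records the latest size and arms a timer, so a window drag that produces dozens of resize events costs one actual pty/terminal resize. A sketch of the coalescing idea without the timer machinery (types and names are illustrative):

const std = @import("std");

const Resize = struct { cols: u16, rows: u16 };

var pending: ?Resize = null;

fn handleResize(v: Resize) void {
    // Later events simply overwrite earlier ones; the real code also
    // arms an xev timer for Coalesce.min_ms if one isn't running yet.
    pending = v;
}

fn coalesceFired() void {
    // When the quiet period elapses, apply only the newest size.
    if (pending) |v| {
        pending = null;
        std.debug.print("resize to {d}x{d}\n", .{ v.cols, v.rows });
    }
}

pub fn main() void {
    handleResize(.{ .cols = 80, .rows = 24 });
    handleResize(.{ .cols = 120, .rows = 40 }); // overwrites the first
    coalesceFired(); // only 120x40 is applied
}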
@@ -330,14 +341,14 @@ fn handleResize(self: *Thread, resize: termio.Message.Resize) void {
         &self.coalesce_c,
         &self.coalesce_cancel_c,
         Coalesce.min_ms,
-        Thread,
-        self,
+        CallbackData,
+        cb,
         coalesceCallback,
     );
 }
 
 fn syncResetCallback(
-    self_: ?*Thread,
+    cb_: ?*CallbackData,
     _: *xev.Loop,
     _: *xev.Completion,
     r: xev.Timer.RunError!void,
@@ -350,13 +361,13 @@ fn syncResetCallback(
         },
     };
 
-    const self = self_ orelse return .disarm;
-    self.termio.resetSynchronizedOutput();
+    const cb = cb_ orelse return .disarm;
+    cb.io.resetSynchronizedOutput();
     return .disarm;
 }
 
 fn coalesceCallback(
-    self_: ?*Thread,
+    cb_: ?*CallbackData,
     _: *xev.Loop,
     _: *xev.Completion,
     r: xev.Timer.RunError!void,
@@ -369,11 +380,11 @@ fn coalesceCallback(
         },
     };
 
-    const self = self_ orelse return .disarm;
-    if (self.coalesce_data.resize) |v| {
-        self.coalesce_data.resize = null;
-        self.termio.resize(v.grid_size, v.screen_size, v.padding) catch |err| {
+    const cb = cb_ orelse return .disarm;
+    if (cb.self.coalesce_data.resize) |v| {
+        cb.self.coalesce_data.resize = null;
+        cb.io.resize(v.grid_size, v.screen_size, v.padding) catch |err| {
             log.warn("error during resize err={}", .{err});
         };
     }
@@ -392,11 +403,10 @@ fn wakeupCallback(
         return .rearm;
     };
 
-    const cb = cb_ orelse return .rearm;
-
     // When we wake up, we check the mailbox. Mailbox producers should
     // wake up our thread after publishing.
-    cb.self.drainMailbox(&cb.data) catch |err|
+    const cb = cb_ orelse return .rearm;
+    cb.self.drainMailbox(cb) catch |err|
         log.err("error draining mailbox err={}", .{err});
 
     return .rearm;