termio: cleanup more state

This commit is contained in:
Mitchell Hashimoto
2024-07-13 14:59:25 -07:00
parent bfbbe1485e
commit 2e62e3354b
3 changed files with 56 additions and 72 deletions

View File

@ -236,8 +236,8 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
errdefer posix.close(pipe[1]); errdefer posix.close(pipe[1]);
// Setup our data that is used for callbacks // Setup our data that is used for callbacks
var ev_data_ptr = try alloc.create(EventData); var read_data_ptr = try alloc.create(ReadData);
errdefer alloc.destroy(ev_data_ptr); errdefer alloc.destroy(read_data_ptr);
// Setup our stream so that we can write. // Setup our stream so that we can write.
var stream = xev.Stream.initFd(pty_fds.write); var stream = xev.Stream.initFd(pty_fds.write);
@ -282,14 +282,10 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
}; };
// Setup our event data before we start // Setup our event data before we start
ev_data_ptr.* = .{ read_data_ptr.* = .{
.writer_mailbox = thread.mailbox,
.writer_wakeup = thread.wakeup,
.surface_mailbox = self.surface_mailbox,
.renderer_state = self.renderer_state, .renderer_state = self.renderer_state,
.renderer_wakeup = self.renderer_wakeup, .renderer_wakeup = self.renderer_wakeup,
.renderer_mailbox = self.renderer_mailbox, .renderer_mailbox = self.renderer_mailbox,
.process = process,
.loop = &thread.loop, .loop = &thread.loop,
.terminal_stream = .{ .terminal_stream = .{
.handler = handler, .handler = handler,
@ -302,41 +298,45 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
}, },
}, },
}; };
errdefer ev_data_ptr.deinit(self.alloc); errdefer read_data_ptr.deinit();
// Start our process watcher
process.wait(
ev_data_ptr.loop,
&ev_data_ptr.process_wait_c,
ThreadData,
data,
processExit,
);
// Start our reader thread // Start our reader thread
const read_thread = try std.Thread.spawn( const read_thread = try std.Thread.spawn(
.{}, .{},
if (builtin.os.tag == .windows) ReadThread.threadMainWindows else ReadThread.threadMainPosix, if (builtin.os.tag == .windows) ReadThread.threadMainWindows else ReadThread.threadMainPosix,
.{ pty_fds.read, ev_data_ptr, pipe[0] }, .{ pty_fds.read, read_data_ptr, pipe[0] },
); );
read_thread.setName("io-reader") catch {}; read_thread.setName("io-reader") catch {};
// Return our thread data // Return our thread data
data.* = .{ data.* = .{
.alloc = alloc, .alloc = alloc,
.ev = ev_data_ptr,
.loop = &thread.loop, .loop = &thread.loop,
.renderer_state = self.renderer_state,
.surface_mailbox = self.surface_mailbox, .surface_mailbox = self.surface_mailbox,
.writer_mailbox = thread.mailbox,
.writer_wakeup = thread.wakeup,
.reader = .{ .exec = .{ .reader = .{ .exec = .{
.start = process_start, .start = process_start,
.abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms, .abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms,
.wait_after_command = self.config.wait_after_command, .wait_after_command = self.config.wait_after_command,
.write_stream = stream, .write_stream = stream,
.process = process,
} }, } },
.read_thread = read_thread, .read_thread = read_thread,
.read_thread_pipe = pipe[1], .read_thread_pipe = pipe[1],
.read_thread_fd = if (builtin.os.tag == .windows) pty_fds.read else {}, .read_thread_fd = if (builtin.os.tag == .windows) pty_fds.read else {},
.read_thread_data = read_data_ptr,
}; };
// Start our process watcher
process.wait(
&thread.loop,
&data.reader.exec.process_wait_c,
ThreadData,
data,
processExit,
);
} }
/// This outputs an error message when exec failed and we are the /// This outputs an error message when exec failed and we are the
@ -405,7 +405,7 @@ pub fn changeConfig(self: *Termio, td: *ThreadData, config: *DerivedConfig) !voi
// Update our stream handler. The stream handler uses the same // Update our stream handler. The stream handler uses the same
// renderer mutex so this is safe to do despite being executed // renderer mutex so this is safe to do despite being executed
// from another thread. // from another thread.
td.ev.terminal_stream.handler.changeConfig(&self.config); td.read_thread_data.terminal_stream.handler.changeConfig(&self.config);
td.reader.changeConfig(&self.config); td.reader.changeConfig(&self.config);
// Update the configuration that we know about. // Update the configuration that we know about.
@ -711,7 +711,7 @@ fn queueWriteExec(
} }
fn readInternal( fn readInternal(
ev: *EventData, ev: *ReadData,
buf: []const u8, buf: []const u8,
) void { ) void {
// log.info("DATA: {d}", .{n}); // log.info("DATA: {d}", .{n});
@ -722,7 +722,7 @@ fn readInternal(
defer ev.renderer_state.mutex.unlock(); defer ev.renderer_state.mutex.unlock();
// Schedule a render. We can call this first because we have the lock. // Schedule a render. We can call this first because we have the lock.
ev.queueRender() catch unreachable; ev.terminal_stream.handler.queueRender() catch unreachable;
// Whenever a character is typed, we ensure the cursor is in the // Whenever a character is typed, we ensure the cursor is in the
// non-blink state so it is rendered if visible. If we're under // non-blink state so it is rendered if visible. If we're under
@ -758,9 +758,10 @@ fn readInternal(
// thread, then we need to wake it up so that it processes them. // thread, then we need to wake it up so that it processes them.
if (ev.terminal_stream.handler.writer_messaged) { if (ev.terminal_stream.handler.writer_messaged) {
ev.terminal_stream.handler.writer_messaged = false; ev.terminal_stream.handler.writer_messaged = false;
ev.writer_wakeup.notify() catch |err| { // TODO
log.warn("failed to wake up writer thread err={}", .{err}); // ev.writer_wakeup.notify() catch |err| {
}; // log.warn("failed to wake up writer thread err={}", .{err});
// };
} }
} }
@ -774,15 +775,17 @@ pub const ThreadData = struct {
/// Allocator used for the event data /// Allocator used for the event data
alloc: Allocator, alloc: Allocator,
/// The data that is attached to the callbacks.
ev: *EventData,
/// The event loop associated with this thread. This is owned by /// The event loop associated with this thread. This is owned by
/// the Thread but we have a pointer so we can queue new work to it. /// the Thread but we have a pointer so we can queue new work to it.
loop: *xev.Loop, loop: *xev.Loop,
/// The shared render state
renderer_state: *renderer.State,
/// Mailboxes for different threads /// Mailboxes for different threads
surface_mailbox: apprt.surface.Mailbox, surface_mailbox: apprt.surface.Mailbox,
writer_mailbox: *termio.Mailbox,
writer_wakeup: xev.Async,
/// Data associated with the reader implementation (i.e. pty/exec state) /// Data associated with the reader implementation (i.e. pty/exec state)
reader: termio.reader.ThreadData, reader: termio.reader.ThreadData,
@ -791,28 +794,19 @@ pub const ThreadData = struct {
read_thread: std.Thread, read_thread: std.Thread,
read_thread_pipe: posix.fd_t, read_thread_pipe: posix.fd_t,
read_thread_fd: if (builtin.os.tag == .windows) posix.fd_t else void, read_thread_fd: if (builtin.os.tag == .windows) posix.fd_t else void,
read_thread_data: *ReadData,
pub fn deinit(self: *ThreadData) void { pub fn deinit(self: *ThreadData) void {
posix.close(self.read_thread_pipe); posix.close(self.read_thread_pipe);
self.ev.deinit(self.alloc); self.read_thread_data.deinit();
self.reader.deinit(self.alloc); self.reader.deinit(self.alloc);
self.alloc.destroy(self.ev); self.alloc.destroy(self.read_thread_data);
self.* = undefined; self.* = undefined;
} }
}; };
pub const EventData = struct { /// Thread local data for the reader thread.
// The preallocation size for the write request pool. This should be big pub const ReadData = struct {
// enough to satisfy most write requests. It must be a power of 2.
const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);
/// Mailbox for data to the writer thread.
writer_mailbox: *termio.Mailbox,
writer_wakeup: xev.Async,
/// Mailbox for the surface.
surface_mailbox: apprt.surface.Mailbox,
/// The stream parser. This parses the stream of escape codes and so on /// The stream parser. This parses the stream of escape codes and so on
/// from the child process and calls callbacks in the stream handler. /// from the child process and calls callbacks in the stream handler.
terminal_stream: terminal.Stream(StreamHandler), terminal_stream: terminal.Stream(StreamHandler),
@ -827,13 +821,6 @@ pub const EventData = struct {
/// The mailbox for notifying the renderer of things. /// The mailbox for notifying the renderer of things.
renderer_mailbox: *renderer.Thread.Mailbox, renderer_mailbox: *renderer.Thread.Mailbox,
/// The process watcher
process: xev.Process,
/// This is used for both waiting for the process to exit and then
/// subsequently to wait for the data_stream to close.
process_wait_c: xev.Completion = .{},
/// The event loop, /// The event loop,
loop: *xev.Loop, loop: *xev.Loop,
@ -841,23 +828,11 @@ pub const EventData = struct {
/// flooding with cursor resets. /// flooding with cursor resets.
last_cursor_reset: i64 = 0, last_cursor_reset: i64 = 0,
pub fn deinit(self: *EventData, alloc: Allocator) void { pub fn deinit(self: *ReadData) void {
_ = alloc;
// Stop our process watcher
self.process.deinit();
// Clear any StreamHandler state // Clear any StreamHandler state
self.terminal_stream.handler.deinit(); self.terminal_stream.handler.deinit();
self.terminal_stream.deinit(); self.terminal_stream.deinit();
} }
/// This queues a render operation with the renderer thread. The render
/// isn't guaranteed to happen immediately but it will happen as soon as
/// practical.
pub inline fn queueRender(self: *EventData) !void {
try self.renderer_wakeup.notify();
}
}; };
fn processExit( fn processExit(
@ -870,7 +845,6 @@ fn processExit(
const td = td_.?; const td = td_.?;
assert(td.reader == .exec); assert(td.reader == .exec);
const ev = td.ev;
const execdata = &td.reader.exec; const execdata = &td.reader.exec;
execdata.exited = true; execdata.exited = true;
@ -907,13 +881,13 @@ fn processExit(
// Notify our main writer thread which has access to more // Notify our main writer thread which has access to more
// information so it can show a better error message. // information so it can show a better error message.
_ = ev.writer_mailbox.push(.{ _ = td.writer_mailbox.push(.{
.child_exited_abnormally = .{ .child_exited_abnormally = .{
.exit_code = exit_code, .exit_code = exit_code,
.runtime_ms = runtime, .runtime_ms = runtime,
}, },
}, .{ .forever = {} }); }, .{ .forever = {} });
ev.writer_wakeup.notify() catch break :runtime; td.writer_wakeup.notify() catch break :runtime;
return .disarm; return .disarm;
} }
@ -925,9 +899,9 @@ fn processExit(
// We output a message so that the user knows whats going on and // We output a message so that the user knows whats going on and
// doesn't think their terminal just froze. // doesn't think their terminal just froze.
terminal: { terminal: {
ev.renderer_state.mutex.lock(); td.renderer_state.mutex.lock();
defer ev.renderer_state.mutex.unlock(); defer td.renderer_state.mutex.unlock();
const t = ev.renderer_state.terminal; const t = td.renderer_state.terminal;
t.carriageReturn(); t.carriageReturn();
t.linefeed() catch break :terminal; t.linefeed() catch break :terminal;
t.printString("Process exited. Press any key to close the terminal.") catch t.printString("Process exited. Press any key to close the terminal.") catch
@ -939,7 +913,7 @@ fn processExit(
} }
// Notify our surface we want to close // Notify our surface we want to close
_ = ev.surface_mailbox.push(.{ _ = td.surface_mailbox.push(.{
.child_exited = {}, .child_exited = {},
}, .{ .forever = {} }); }, .{ .forever = {} });
@ -1614,7 +1588,7 @@ const Subprocess = struct {
/// fds and this is still much faster and lower overhead than any async /// fds and this is still much faster and lower overhead than any async
/// mechanism. /// mechanism.
const ReadThread = struct { const ReadThread = struct {
fn threadMainPosix(fd: posix.fd_t, ev: *EventData, quit: posix.fd_t) void { fn threadMainPosix(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit. // Always close our end of the pipe when we exit.
defer posix.close(quit); defer posix.close(quit);
@ -1695,7 +1669,7 @@ const ReadThread = struct {
} }
} }
fn threadMainWindows(fd: posix.fd_t, ev: *EventData, quit: posix.fd_t) void { fn threadMainWindows(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit. // Always close our end of the pipe when we exit.
defer posix.close(quit); defer posix.close(quit);

View File

@ -57,6 +57,9 @@ pub const ThreadData = union(Kind) {
/// The data stream is the main IO for the pty. /// The data stream is the main IO for the pty.
write_stream: xev.Stream, write_stream: xev.Stream,
/// The process watcher
process: xev.Process,
/// This is the pool of available (unused) write requests. If you grab /// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done! /// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{}, write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
@ -66,6 +69,10 @@ pub const ThreadData = union(Kind) {
/// The write queue for the data stream. /// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{}, write_queue: xev.Stream.WriteQueue = .{},
/// This is used for both waiting for the process to exit and then
/// subsequently to wait for the data_stream to close.
process_wait_c: xev.Completion = .{},
}; };
pub fn deinit(self: *ThreadData, alloc: Allocator) void { pub fn deinit(self: *ThreadData, alloc: Allocator) void {
@ -78,6 +85,9 @@ pub const ThreadData = union(Kind) {
exec.write_req_pool.deinit(alloc); exec.write_req_pool.deinit(alloc);
exec.write_buf_pool.deinit(alloc); exec.write_buf_pool.deinit(alloc);
// Stop our process watcher
exec.process.deinit();
// Stop our write stream // Stop our write stream
exec.write_stream.deinit(); exec.write_stream.deinit();
}, },

View File

@ -104,7 +104,7 @@ pub const StreamHandler = struct {
/// This queues a render operation with the renderer thread. The render /// This queues a render operation with the renderer thread. The render
/// isn't guaranteed to happen immediately but it will happen as soon as /// isn't guaranteed to happen immediately but it will happen as soon as
/// practical. /// practical.
inline fn queueRender(self: *StreamHandler) !void { pub inline fn queueRender(self: *StreamHandler) !void {
try self.renderer_wakeup.notify(); try self.renderer_wakeup.notify();
} }