diff --git a/src/termio/Termio.zig b/src/termio/Termio.zig index f2a501b64..fb6c53823 100644 --- a/src/termio/Termio.zig +++ b/src/termio/Termio.zig @@ -236,8 +236,8 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo errdefer posix.close(pipe[1]); // Setup our data that is used for callbacks - var ev_data_ptr = try alloc.create(EventData); - errdefer alloc.destroy(ev_data_ptr); + var read_data_ptr = try alloc.create(ReadData); + errdefer alloc.destroy(read_data_ptr); // Setup our stream so that we can write. var stream = xev.Stream.initFd(pty_fds.write); @@ -282,14 +282,10 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo }; // Setup our event data before we start - ev_data_ptr.* = .{ - .writer_mailbox = thread.mailbox, - .writer_wakeup = thread.wakeup, - .surface_mailbox = self.surface_mailbox, + read_data_ptr.* = .{ .renderer_state = self.renderer_state, .renderer_wakeup = self.renderer_wakeup, .renderer_mailbox = self.renderer_mailbox, - .process = process, .loop = &thread.loop, .terminal_stream = .{ .handler = handler, @@ -302,41 +298,45 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo }, }, }; - errdefer ev_data_ptr.deinit(self.alloc); - - // Start our process watcher - process.wait( - ev_data_ptr.loop, - &ev_data_ptr.process_wait_c, - ThreadData, - data, - processExit, - ); + errdefer read_data_ptr.deinit(); // Start our reader thread const read_thread = try std.Thread.spawn( .{}, if (builtin.os.tag == .windows) ReadThread.threadMainWindows else ReadThread.threadMainPosix, - .{ pty_fds.read, ev_data_ptr, pipe[0] }, + .{ pty_fds.read, read_data_ptr, pipe[0] }, ); read_thread.setName("io-reader") catch {}; // Return our thread data data.* = .{ .alloc = alloc, - .ev = ev_data_ptr, .loop = &thread.loop, + .renderer_state = self.renderer_state, .surface_mailbox = self.surface_mailbox, + .writer_mailbox = thread.mailbox, + .writer_wakeup = thread.wakeup, .reader = .{ .exec = .{ .start = process_start, .abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms, .wait_after_command = self.config.wait_after_command, .write_stream = stream, + .process = process, } }, .read_thread = read_thread, .read_thread_pipe = pipe[1], .read_thread_fd = if (builtin.os.tag == .windows) pty_fds.read else {}, + .read_thread_data = read_data_ptr, }; + + // Start our process watcher + process.wait( + &thread.loop, + &data.reader.exec.process_wait_c, + ThreadData, + data, + processExit, + ); } /// This outputs an error message when exec failed and we are the @@ -405,7 +405,7 @@ pub fn changeConfig(self: *Termio, td: *ThreadData, config: *DerivedConfig) !voi // Update our stream handler. The stream handler uses the same // renderer mutex so this is safe to do despite being executed // from another thread. - td.ev.terminal_stream.handler.changeConfig(&self.config); + td.read_thread_data.terminal_stream.handler.changeConfig(&self.config); td.reader.changeConfig(&self.config); // Update the configuration that we know about. @@ -711,7 +711,7 @@ fn queueWriteExec( } fn readInternal( - ev: *EventData, + ev: *ReadData, buf: []const u8, ) void { // log.info("DATA: {d}", .{n}); @@ -722,7 +722,7 @@ fn readInternal( defer ev.renderer_state.mutex.unlock(); // Schedule a render. We can call this first because we have the lock. - ev.queueRender() catch unreachable; + ev.terminal_stream.handler.queueRender() catch unreachable; // Whenever a character is typed, we ensure the cursor is in the // non-blink state so it is rendered if visible. If we're under @@ -758,9 +758,10 @@ fn readInternal( // thread, then we need to wake it up so that it processes them. if (ev.terminal_stream.handler.writer_messaged) { ev.terminal_stream.handler.writer_messaged = false; - ev.writer_wakeup.notify() catch |err| { - log.warn("failed to wake up writer thread err={}", .{err}); - }; + // TODO + // ev.writer_wakeup.notify() catch |err| { + // log.warn("failed to wake up writer thread err={}", .{err}); + // }; } } @@ -774,15 +775,17 @@ pub const ThreadData = struct { /// Allocator used for the event data alloc: Allocator, - /// The data that is attached to the callbacks. - ev: *EventData, - /// The event loop associated with this thread. This is owned by /// the Thread but we have a pointer so we can queue new work to it. loop: *xev.Loop, + /// The shared render state + renderer_state: *renderer.State, + /// Mailboxes for different threads surface_mailbox: apprt.surface.Mailbox, + writer_mailbox: *termio.Mailbox, + writer_wakeup: xev.Async, /// Data associated with the reader implementation (i.e. pty/exec state) reader: termio.reader.ThreadData, @@ -791,28 +794,19 @@ pub const ThreadData = struct { read_thread: std.Thread, read_thread_pipe: posix.fd_t, read_thread_fd: if (builtin.os.tag == .windows) posix.fd_t else void, + read_thread_data: *ReadData, pub fn deinit(self: *ThreadData) void { posix.close(self.read_thread_pipe); - self.ev.deinit(self.alloc); + self.read_thread_data.deinit(); self.reader.deinit(self.alloc); - self.alloc.destroy(self.ev); + self.alloc.destroy(self.read_thread_data); self.* = undefined; } }; -pub const EventData = struct { - // The preallocation size for the write request pool. This should be big - // enough to satisfy most write requests. It must be a power of 2. - const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5); - - /// Mailbox for data to the writer thread. - writer_mailbox: *termio.Mailbox, - writer_wakeup: xev.Async, - - /// Mailbox for the surface. - surface_mailbox: apprt.surface.Mailbox, - +/// Thread local data for the reader thread. +pub const ReadData = struct { /// The stream parser. This parses the stream of escape codes and so on /// from the child process and calls callbacks in the stream handler. terminal_stream: terminal.Stream(StreamHandler), @@ -827,13 +821,6 @@ pub const EventData = struct { /// The mailbox for notifying the renderer of things. renderer_mailbox: *renderer.Thread.Mailbox, - /// The process watcher - process: xev.Process, - - /// This is used for both waiting for the process to exit and then - /// subsequently to wait for the data_stream to close. - process_wait_c: xev.Completion = .{}, - /// The event loop, loop: *xev.Loop, @@ -841,23 +828,11 @@ pub const EventData = struct { /// flooding with cursor resets. last_cursor_reset: i64 = 0, - pub fn deinit(self: *EventData, alloc: Allocator) void { - _ = alloc; - - // Stop our process watcher - self.process.deinit(); - + pub fn deinit(self: *ReadData) void { // Clear any StreamHandler state self.terminal_stream.handler.deinit(); self.terminal_stream.deinit(); } - - /// This queues a render operation with the renderer thread. The render - /// isn't guaranteed to happen immediately but it will happen as soon as - /// practical. - pub inline fn queueRender(self: *EventData) !void { - try self.renderer_wakeup.notify(); - } }; fn processExit( @@ -870,7 +845,6 @@ fn processExit( const td = td_.?; assert(td.reader == .exec); - const ev = td.ev; const execdata = &td.reader.exec; execdata.exited = true; @@ -907,13 +881,13 @@ fn processExit( // Notify our main writer thread which has access to more // information so it can show a better error message. - _ = ev.writer_mailbox.push(.{ + _ = td.writer_mailbox.push(.{ .child_exited_abnormally = .{ .exit_code = exit_code, .runtime_ms = runtime, }, }, .{ .forever = {} }); - ev.writer_wakeup.notify() catch break :runtime; + td.writer_wakeup.notify() catch break :runtime; return .disarm; } @@ -925,9 +899,9 @@ fn processExit( // We output a message so that the user knows whats going on and // doesn't think their terminal just froze. terminal: { - ev.renderer_state.mutex.lock(); - defer ev.renderer_state.mutex.unlock(); - const t = ev.renderer_state.terminal; + td.renderer_state.mutex.lock(); + defer td.renderer_state.mutex.unlock(); + const t = td.renderer_state.terminal; t.carriageReturn(); t.linefeed() catch break :terminal; t.printString("Process exited. Press any key to close the terminal.") catch @@ -939,7 +913,7 @@ fn processExit( } // Notify our surface we want to close - _ = ev.surface_mailbox.push(.{ + _ = td.surface_mailbox.push(.{ .child_exited = {}, }, .{ .forever = {} }); @@ -1614,7 +1588,7 @@ const Subprocess = struct { /// fds and this is still much faster and lower overhead than any async /// mechanism. const ReadThread = struct { - fn threadMainPosix(fd: posix.fd_t, ev: *EventData, quit: posix.fd_t) void { + fn threadMainPosix(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void { // Always close our end of the pipe when we exit. defer posix.close(quit); @@ -1695,7 +1669,7 @@ const ReadThread = struct { } } - fn threadMainWindows(fd: posix.fd_t, ev: *EventData, quit: posix.fd_t) void { + fn threadMainWindows(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void { // Always close our end of the pipe when we exit. defer posix.close(quit); diff --git a/src/termio/reader.zig b/src/termio/reader.zig index 10a6e9980..31d51498f 100644 --- a/src/termio/reader.zig +++ b/src/termio/reader.zig @@ -57,6 +57,9 @@ pub const ThreadData = union(Kind) { /// The data stream is the main IO for the pty. write_stream: xev.Stream, + /// The process watcher + process: xev.Process, + /// This is the pool of available (unused) write requests. If you grab /// one from the pool, you must put it back when you're done! write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{}, @@ -66,6 +69,10 @@ pub const ThreadData = union(Kind) { /// The write queue for the data stream. write_queue: xev.Stream.WriteQueue = .{}, + + /// This is used for both waiting for the process to exit and then + /// subsequently to wait for the data_stream to close. + process_wait_c: xev.Completion = .{}, }; pub fn deinit(self: *ThreadData, alloc: Allocator) void { @@ -78,6 +85,9 @@ pub const ThreadData = union(Kind) { exec.write_req_pool.deinit(alloc); exec.write_buf_pool.deinit(alloc); + // Stop our process watcher + exec.process.deinit(); + // Stop our write stream exec.write_stream.deinit(); }, diff --git a/src/termio/stream_handler.zig b/src/termio/stream_handler.zig index f88aeca97..b7774250f 100644 --- a/src/termio/stream_handler.zig +++ b/src/termio/stream_handler.zig @@ -104,7 +104,7 @@ pub const StreamHandler = struct { /// This queues a render operation with the renderer thread. The render /// isn't guaranteed to happen immediately but it will happen as soon as /// practical. - inline fn queueRender(self: *StreamHandler) !void { + pub inline fn queueRender(self: *StreamHandler) !void { try self.renderer_wakeup.notify(); }