diff --git a/src/termio/Exec.zig b/src/termio/Exec.zig
index 08808e3f8..89ecf726c 100644
--- a/src/termio/Exec.zig
+++ b/src/termio/Exec.zig
@@ -139,6 +139,12 @@ pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData {
         break :pid command.pid orelse return error.ProcessNoPid;
     };
 
+    // Create our pipe that we'll use to kill our read thread.
+    // pipe[0] is the read end, pipe[1] is the write end.
+    const pipe = try std.os.pipe();
+    errdefer std.os.close(pipe[0]);
+    errdefer std.os.close(pipe[1]);
+
     // Setup our data that is used for callbacks
     var ev_data_ptr = try alloc.create(EventData);
     errdefer alloc.destroy(ev_data_ptr);
@@ -194,7 +200,7 @@ pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData {
     const read_thread = try std.Thread.spawn(
         .{},
         ReadThread.threadMain,
-        .{ master_fd, ev_data_ptr },
+        .{ master_fd, ev_data_ptr, pipe[0] },
     );
     read_thread.setName("io-reader") catch {};
 
@@ -203,6 +209,7 @@ pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData {
         .alloc = alloc,
         .ev = ev_data_ptr,
         .read_thread = read_thread,
+        .read_thread_pipe = pipe[1],
     };
 }
 
@@ -210,12 +217,15 @@ pub fn threadExit(self: *Exec, data: ThreadData) void {
     // Clear out our data since we're not active anymore.
     self.data = null;
 
+    // Quit our read thread first so that we aren't simultaneously
+    // performing a read/close on the pty fd.
+    _ = std.os.write(data.read_thread_pipe, "x") catch |err|
+        log.warn("error writing to read thread quit pipe err={}", .{err});
+    data.read_thread.join();
+
     // Stop our subprocess
     if (data.ev.process_exited) self.subprocess.externalExit();
     self.subprocess.stop();
-
-    // Wait for our reader thread to end
-    data.read_thread.join();
 }
 
 /// Update the configuration.
@@ -338,8 +348,10 @@ const ThreadData = struct {
 
     /// Our read thread
     read_thread: std.Thread,
+    read_thread_pipe: std.os.fd_t,
 
     pub fn deinit(self: *ThreadData) void {
+        std.os.close(self.read_thread_pipe);
         self.ev.deinit(self.alloc);
         self.alloc.destroy(self.ev);
         self.* = undefined;
@@ -934,11 +946,39 @@ const Subprocess = struct {
 /// This is also empirically fast compared to putting the read into
 /// an async mechanism like io_uring/epoll because the reads are generally
 /// small.
+///
+/// We use a basic poll syscall here because we are only monitoring two
+/// fds and this is still much faster and lower overhead than any async
+/// mechanism.
 const ReadThread = struct {
     /// The main entrypoint for the thread.
-    fn threadMain(fd: std.os.fd_t, ev: *EventData) void {
+    fn threadMain(fd: std.os.fd_t, ev: *EventData, quit: std.os.fd_t) void {
+        // Always close our end of the pipe when we exit.
+        defer std.os.close(quit);
+
+        // Build up the list of fds we're going to poll. We are looking
+        // for data on the pty and our quit notification.
+        var pollfds: [2]std.os.pollfd = .{
+            .{ .fd = fd, .events = std.os.POLL.IN, .revents = undefined },
+            .{ .fd = quit, .events = std.os.POLL.IN, .revents = undefined },
+        };
+
         var buf: [1024]u8 = undefined;
         while (true) {
+            // Wait for data. A timeout of -1 blocks until an fd is ready;
+            // a zero timeout here would spin the thread at full CPU.
+            _ = std.os.poll(&pollfds, -1) catch |err| {
+                log.warn("poll failed on read thread, exiting early err={}", .{err});
+                return;
+            };
+
+            // If our quit fd is set, we're done.
+            if (pollfds[1].revents & std.os.POLL.IN != 0) {
+                log.info("read thread got quit signal", .{});
+                return;
+            }
+
+            // Ensure our pty has data.
+            if (pollfds[0].revents & std.os.POLL.IN == 0) continue;
             const n = std.os.read(fd, &buf) catch |err| {
                 switch (err) {
                     // This means our pty is closed. We're probably
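
Note: below is a minimal, self-contained sketch of the quit-pipe pattern this patch uses, written against the same Zig 0.11-era std.os API that appears in the diff. The readerMain function and the pipe names are illustrative, not part of the patch, and a second pipe stands in for the pty master fd. The point it demonstrates is the one the commit comments call out: the owning thread signals the reader and joins it before anyone closes the data fd, so a read() and a close() on the same fd can never race.

const std = @import("std");

// Sketch: a reader thread polls a data fd plus the read end of a quit
// pipe. One byte written to the pipe's write end wakes poll() and asks
// the thread to exit cleanly.
fn readerMain(data_fd: std.os.fd_t, quit_fd: std.os.fd_t) void {
    // Always close our end of the quit pipe when we exit.
    defer std.os.close(quit_fd);

    var pollfds: [2]std.os.pollfd = .{
        .{ .fd = data_fd, .events = std.os.POLL.IN, .revents = undefined },
        .{ .fd = quit_fd, .events = std.os.POLL.IN, .revents = undefined },
    };

    var buf: [1024]u8 = undefined;
    while (true) {
        // Block until either fd is readable; -1 means no timeout.
        _ = std.os.poll(&pollfds, -1) catch return;

        // Quit is checked first, so the thread may exit without draining
        // data still buffered on data_fd -- acceptable at shutdown.
        if (pollfds[1].revents & std.os.POLL.IN != 0) return;
        if (pollfds[0].revents & std.os.POLL.IN == 0) continue;

        const n = std.os.read(data_fd, &buf) catch return;
        if (n == 0) return; // EOF: the write side closed.
        std.debug.print("read {d} bytes\n", .{n});
    }
}

pub fn main() !void {
    // A second pipe stands in for the pty master fd.
    const data_pipe = try std.os.pipe();
    const quit_pipe = try std.os.pipe();

    const reader = try std.Thread.spawn(.{}, readerMain, .{
        data_pipe[0], quit_pipe[0],
    });

    _ = try std.os.write(data_pipe[1], "hello");

    // Shutdown order mirrors threadExit above: signal, join, and only
    // then close the data fd, so no read/close race is possible.
    _ = try std.os.write(quit_pipe[1], "x");
    reader.join();

    std.os.close(data_pipe[0]);
    std.os.close(data_pipe[1]);
    std.os.close(quit_pipe[1]);
}

On Linux an eventfd could serve as the wakeup channel instead, but a plain pipe is portable to macOS and the BSDs, which matters for a cross-platform terminal emulator.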