termio: move all subprocess logic to termio.Exec

This commit is contained in:
Mitchell Hashimoto
2024-07-13 19:23:45 -07:00
parent 7c23d61379
commit e30e635bed
3 changed files with 1228 additions and 1146 deletions

View File

@ -7,13 +7,16 @@ const std = @import("std");
const builtin = @import("builtin"); const builtin = @import("builtin");
const assert = std.debug.assert; const assert = std.debug.assert;
const Allocator = std.mem.Allocator; const Allocator = std.mem.Allocator;
const ArenaAllocator = std.heap.ArenaAllocator;
const posix = std.posix; const posix = std.posix;
const xev = @import("xev"); const xev = @import("xev");
const build_config = @import("../build_config.zig"); const build_config = @import("../build_config.zig");
const configpkg = @import("../config.zig"); const configpkg = @import("../config.zig");
const fastmem = @import("../fastmem.zig");
const internal_os = @import("../os/main.zig"); const internal_os = @import("../os/main.zig");
const renderer = @import("../renderer.zig"); const renderer = @import("../renderer.zig");
const shell_integration = @import("shell_integration.zig"); const shell_integration = @import("shell_integration.zig");
const terminal = @import("../terminal/main.zig");
const termio = @import("../termio.zig"); const termio = @import("../termio.zig");
const Command = @import("../Command.zig"); const Command = @import("../Command.zig");
const SegmentedPool = @import("../segmented_pool.zig").SegmentedPool; const SegmentedPool = @import("../segmented_pool.zig").SegmentedPool;
@ -23,6 +26,486 @@ const windows = internal_os.windows;
const log = std.log.scoped(.io_exec); const log = std.log.scoped(.io_exec);
/// The subprocess state for our exec reader.
subprocess: Subprocess,
/// Initialize the exec state. This will NOT start it, this only sets
/// up the internal state necessary to start it later.
pub fn init(
alloc: Allocator,
opts: termio.Options,
term: *terminal.Terminal,
) !Exec {
var subprocess = try Subprocess.init(alloc, opts);
errdefer subprocess.deinit();
// If we have an initial pwd requested by the subprocess, then we
// set that on the terminal now. This allows rapidly initializing
// new surfaces to use the proper pwd.
if (subprocess.cwd) |cwd| term.setPwd(cwd) catch |err| {
log.warn("error setting initial pwd err={}", .{err});
};
// Initial width/height based on subprocess
term.width_px = subprocess.screen_size.width;
term.height_px = subprocess.screen_size.height;
return .{ .subprocess = subprocess };
}
pub fn deinit(self: *Exec) void {
self.subprocess.deinit();
}
pub fn threadEnter(
self: *Exec,
alloc: Allocator,
io: *termio.Termio,
td: *termio.Termio.ThreadData,
) !void {
// Start our subprocess
const pty_fds = self.subprocess.start(alloc) catch |err| {
// If we specifically got this error then we are in the forked
// process and our child failed to execute. In that case
if (err != error.Termio) return err;
// Output an error message about the exec faililng and exit.
// This generally should NOT happen because we always wrap
// our command execution either in login (macOS) or /bin/sh
// (Linux) which are usually guaranteed to exist. Still, we
// want to handle this scenario.
execFailedInChild() catch {};
posix.exit(1);
};
errdefer self.subprocess.stop();
// Get the pid from the subprocess
const pid = pid: {
const command = self.subprocess.command orelse return error.ProcessNotStarted;
break :pid command.pid orelse return error.ProcessNoPid;
};
// Track our process start time for abnormal exits
const process_start = try std.time.Instant.now();
// Create our pipe that we'll use to kill our read thread.
// pipe[0] is the read end, pipe[1] is the write end.
const pipe = try internal_os.pipe();
errdefer posix.close(pipe[0]);
errdefer posix.close(pipe[1]);
// Setup our stream so that we can write.
var stream = xev.Stream.initFd(pty_fds.write);
errdefer stream.deinit();
// Watcher to detect subprocess exit
var process = try xev.Process.init(pid);
errdefer process.deinit();
// Start our read thread
const read_thread = try std.Thread.spawn(
.{},
if (builtin.os.tag == .windows) ReadThread.threadMainWindows else ReadThread.threadMainPosix,
.{ pty_fds.read, td.read_data, pipe[0] },
);
read_thread.setName("io-reader") catch {};
// Setup our threadata reader state to be our own
td.reader = .{ .exec = .{
.start = process_start,
.abnormal_runtime_threshold_ms = io.config.abnormal_runtime_threshold_ms,
.wait_after_command = io.config.wait_after_command,
.write_stream = stream,
.process = process,
.read_thread = read_thread,
.read_thread_pipe = pipe[1],
.read_thread_fd = if (builtin.os.tag == .windows) pty_fds.read else {},
} };
// Start our process watcher
process.wait(
td.loop,
&td.reader.exec.process_wait_c,
termio.Termio.ThreadData,
td,
processExit,
);
}
pub fn threadExit(self: *Exec, td: *termio.Termio.ThreadData) void {
assert(td.reader == .exec);
const exec = &td.reader.exec;
if (exec.exited) self.subprocess.externalExit();
self.subprocess.stop();
// Quit our read thread after exiting the subprocess so that
// we don't get stuck waiting for data to stop flowing if it is
// a particularly noisy process.
_ = posix.write(exec.read_thread_pipe, "x") catch |err|
log.warn("error writing to read thread quit pipe err={}", .{err});
if (comptime builtin.os.tag == .windows) {
// Interrupt the blocking read so the thread can see the quit message
if (windows.kernel32.CancelIoEx(exec.read_thread_fd, null) == 0) {
switch (windows.kernel32.GetLastError()) {
.NOT_FOUND => {},
else => |err| log.warn("error interrupting read thread err={}", .{err}),
}
}
}
exec.read_thread.join();
}
pub fn resize(
self: *Exec,
grid_size: renderer.GridSize,
screen_size: renderer.ScreenSize,
) !void {
return try self.subprocess.resize(grid_size, screen_size);
}
/// Called when the child process exited abnormally but before the surface
/// is notified.
pub fn childExitedAbnormally(
self: *Exec,
gpa: Allocator,
t: *terminal.Terminal,
exit_code: u32,
runtime_ms: u64,
) !void {
var arena = ArenaAllocator.init(gpa);
defer arena.deinit();
const alloc = arena.allocator();
// Build up our command for the error message
const command = try std.mem.join(alloc, " ", self.subprocess.args);
const runtime_str = try std.fmt.allocPrint(alloc, "{d} ms", .{runtime_ms});
// No matter what move the cursor back to the column 0.
t.carriageReturn();
// Reset styles
try t.setAttribute(.{ .unset = {} });
// If there is data in the viewport, we want to scroll down
// a little bit and write a horizontal rule before writing
// our message. This lets the use see the error message the
// command may have output.
const viewport_str = try t.plainString(alloc);
if (viewport_str.len > 0) {
try t.linefeed();
for (0..t.cols) |_| try t.print(0x2501);
t.carriageReturn();
try t.linefeed();
try t.linefeed();
}
// Output our error message
try t.setAttribute(.{ .@"8_fg" = .bright_red });
try t.setAttribute(.{ .bold = {} });
try t.printString("Ghostty failed to launch the requested command:");
try t.setAttribute(.{ .unset = {} });
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString(command);
try t.setAttribute(.{ .unset = {} });
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString("Runtime: ");
try t.setAttribute(.{ .@"8_fg" = .red });
try t.printString(runtime_str);
try t.setAttribute(.{ .unset = {} });
// We don't print this on macOS because the exit code is always 0
// due to the way we launch the process.
if (comptime !builtin.target.isDarwin()) {
const exit_code_str = try std.fmt.allocPrint(alloc, "{d}", .{exit_code});
t.carriageReturn();
try t.linefeed();
try t.printString("Exit Code: ");
try t.setAttribute(.{ .@"8_fg" = .red });
try t.printString(exit_code_str);
try t.setAttribute(.{ .unset = {} });
}
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString("Press any key to close the window.");
// Hide the cursor
t.modes.set(.cursor_visible, false);
}
/// This outputs an error message when exec failed and we are the
/// child process. This returns so the caller should probably exit
/// after calling this.
///
/// Note that this usually is only called under very very rare
/// circumstances because we wrap our command execution in login
/// (macOS) or /bin/sh (Linux). So this output can be pretty crude
/// because it should never happen. Notably, this is not the error
/// users see when `command` is invalid.
fn execFailedInChild() !void {
const stderr = std.io.getStdErr().writer();
try stderr.writeAll("exec failed\n");
try stderr.writeAll("press any key to exit\n");
var buf: [1]u8 = undefined;
var reader = std.io.getStdIn().reader();
_ = try reader.read(&buf);
}
fn processExit(
td_: ?*termio.Termio.ThreadData,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Process.WaitError!u32,
) xev.CallbackAction {
const exit_code = r catch unreachable;
const td = td_.?;
assert(td.reader == .exec);
const execdata = &td.reader.exec;
execdata.exited = true;
// Determine how long the process was running for.
const runtime_ms: ?u64 = runtime: {
const process_end = std.time.Instant.now() catch break :runtime null;
const runtime_ns = process_end.since(execdata.start);
const runtime_ms = runtime_ns / std.time.ns_per_ms;
break :runtime runtime_ms;
};
log.debug(
"child process exited status={} runtime={}ms",
.{ exit_code, runtime_ms orelse 0 },
);
// If our runtime was below some threshold then we assume that this
// was an abnormal exit and we show an error message.
if (runtime_ms) |runtime| runtime: {
// On macOS, our exit code detection doesn't work, possibly
// because of our `login` wrapper. More investigation required.
if (comptime !builtin.target.isDarwin()) {
// If our exit code is zero, then the command was successful
// and we don't ever consider it abnormal.
if (exit_code == 0) break :runtime;
}
// Our runtime always has to be under the threshold to be
// considered abnormal. This is because a user can always
// manually do something like `exit 1` in their shell to
// force the exit code to be non-zero. We only want to detect
// abnormal exits that happen so quickly the user can't react.
if (runtime > execdata.abnormal_runtime_threshold_ms) break :runtime;
log.warn("abnormal process exit detected, showing error message", .{});
// Notify our main writer thread which has access to more
// information so it can show a better error message.
_ = td.writer_mailbox.push(.{
.child_exited_abnormally = .{
.exit_code = exit_code,
.runtime_ms = runtime,
},
}, .{ .forever = {} });
td.writer_wakeup.notify() catch break :runtime;
return .disarm;
}
// If we're purposely waiting then we just return since the process
// exited flag is set to true. This allows the terminal window to remain
// open.
if (execdata.wait_after_command) {
// We output a message so that the user knows whats going on and
// doesn't think their terminal just froze.
terminal: {
td.renderer_state.mutex.lock();
defer td.renderer_state.mutex.unlock();
const t = td.renderer_state.terminal;
t.carriageReturn();
t.linefeed() catch break :terminal;
t.printString("Process exited. Press any key to close the terminal.") catch
break :terminal;
t.modes.set(.cursor_visible, false);
}
return .disarm;
}
// Notify our surface we want to close
_ = td.surface_mailbox.push(.{
.child_exited = {},
}, .{ .forever = {} });
return .disarm;
}
pub fn queueWrite(
self: *Exec,
alloc: Allocator,
td: *termio.Termio.ThreadData,
data: []const u8,
linefeed: bool,
) !void {
_ = self;
const exec = &td.reader.exec;
// If our process is exited then we send our surface a message
// about it but we don't queue any more writes.
if (exec.exited) {
_ = td.surface_mailbox.push(.{
.child_exited = {},
}, .{ .forever = {} });
return;
}
// We go through and chunk the data if necessary to fit into
// our cached buffers that we can queue to the stream.
var i: usize = 0;
while (i < data.len) {
const req = try exec.write_req_pool.getGrow(alloc);
const buf = try exec.write_buf_pool.getGrow(alloc);
const slice = slice: {
// The maximum end index is either the end of our data or
// the end of our buffer, whichever is smaller.
const max = @min(data.len, i + buf.len);
// Fast
if (!linefeed) {
fastmem.copy(u8, buf, data[i..max]);
const len = max - i;
i = max;
break :slice buf[0..len];
}
// Slow, have to replace \r with \r\n
var buf_i: usize = 0;
while (i < data.len and buf_i < buf.len - 1) {
const ch = data[i];
i += 1;
if (ch != '\r') {
buf[buf_i] = ch;
buf_i += 1;
continue;
}
// CRLF
buf[buf_i] = '\r';
buf[buf_i + 1] = '\n';
buf_i += 2;
}
break :slice buf[0..buf_i];
};
//for (slice) |b| log.warn("write: {x}", .{b});
exec.write_stream.queueWrite(
td.loop,
&exec.write_queue,
req,
.{ .slice = slice },
termio.Exec.ThreadData,
exec,
ttyWrite,
);
}
}
fn ttyWrite(
td_: ?*ThreadData,
_: *xev.Loop,
_: *xev.Completion,
_: xev.Stream,
_: xev.WriteBuffer,
r: xev.Stream.WriteError!usize,
) xev.CallbackAction {
const td = td_.?;
td.write_req_pool.put();
td.write_buf_pool.put();
const d = r catch |err| {
log.err("write error: {}", .{err});
return .disarm;
};
_ = d;
//log.info("WROTE: {d}", .{d});
return .disarm;
}
/// The thread local data for the exec implementation.
pub const ThreadData = struct {
// The preallocation size for the write request pool. This should be big
// enough to satisfy most write requests. It must be a power of 2.
const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);
/// Process start time and boolean of whether its already exited.
start: std.time.Instant,
exited: bool = false,
/// The number of milliseconds below which we consider a process
/// exit to be abnormal. This is used to show an error message
/// when the process exits too quickly.
abnormal_runtime_threshold_ms: u32,
/// If true, do not immediately send a child exited message to the
/// surface to close the surface when the command exits. If this is
/// false we'll show a process exited message and wait for user input
/// to close the surface.
wait_after_command: bool,
/// The data stream is the main IO for the pty.
write_stream: xev.Stream,
/// The process watcher
process: xev.Process,
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
/// This is used for both waiting for the process to exit and then
/// subsequently to wait for the data_stream to close.
process_wait_c: xev.Completion = .{},
/// Reader thread state
read_thread: std.Thread,
read_thread_pipe: posix.fd_t,
read_thread_fd: if (builtin.os.tag == .windows) posix.fd_t else void,
pub fn deinit(self: *ThreadData, alloc: Allocator) void {
posix.close(self.read_thread_pipe);
// Clear our write pools. We know we aren't ever going to do
// any more IO since we stop our data stream below so we can just
// drop this.
self.write_req_pool.deinit(alloc);
self.write_buf_pool.deinit(alloc);
// Stop our process watcher
self.process.deinit();
// Stop our write stream
self.write_stream.deinit();
}
};
const Subprocess = struct {
/// If we build with flatpak support then we have to keep track of /// If we build with flatpak support then we have to keep track of
/// a potential execution on the host. /// a potential execution on the host.
const FlatpakHostCommand = if (build_config.flatpak) internal_os.FlatpakHostCommand else void; const FlatpakHostCommand = if (build_config.flatpak) internal_os.FlatpakHostCommand else void;
@ -46,7 +529,7 @@ linux_cgroup: Command.LinuxCgroup = Command.linux_cgroup_default,
/// Initialize the subprocess. This will NOT start it, this only sets /// Initialize the subprocess. This will NOT start it, this only sets
/// up the internal state necessary to start it later. /// up the internal state necessary to start it later.
pub fn init(gpa: Allocator, opts: termio.Options) !Exec { pub fn init(gpa: Allocator, opts: termio.Options) !Subprocess {
// We have a lot of maybe-allocations that all share the same lifetime // We have a lot of maybe-allocations that all share the same lifetime
// so use an arena so we don't end up in an accounting nightmare. // so use an arena so we don't end up in an accounting nightmare.
var arena = std.heap.ArenaAllocator.init(gpa); var arena = std.heap.ArenaAllocator.init(gpa);
@ -391,7 +874,7 @@ pub fn init(gpa: Allocator, opts: termio.Options) !Exec {
} }
/// Clean up the subprocess. This will stop the subprocess if it is started. /// Clean up the subprocess. This will stop the subprocess if it is started.
pub fn deinit(self: *Exec) void { pub fn deinit(self: *Subprocess) void {
self.stop(); self.stop();
if (self.pty) |*pty| pty.deinit(); if (self.pty) |*pty| pty.deinit();
self.arena.deinit(); self.arena.deinit();
@ -400,7 +883,7 @@ pub fn deinit(self: *Exec) void {
/// Start the subprocess. If the subprocess is already started this /// Start the subprocess. If the subprocess is already started this
/// will crash. /// will crash.
pub fn start(self: *Exec, alloc: Allocator) !struct { pub fn start(self: *Subprocess, alloc: Allocator) !struct {
read: Pty.Fd, read: Pty.Fd,
write: Pty.Fd, write: Pty.Fd,
} { } {
@ -480,7 +963,7 @@ pub fn start(self: *Exec, alloc: Allocator) !struct {
.pseudo_console = if (builtin.os.tag == .windows) pty.pseudo_console else {}, .pseudo_console = if (builtin.os.tag == .windows) pty.pseudo_console else {},
.pre_exec = if (builtin.os.tag == .windows) null else (struct { .pre_exec = if (builtin.os.tag == .windows) null else (struct {
fn callback(cmd: *Command) void { fn callback(cmd: *Command) void {
const sp = cmd.getData(Exec) orelse unreachable; const sp = cmd.getData(Subprocess) orelse unreachable;
sp.childPreExec() catch |err| log.err( sp.childPreExec() catch |err| log.err(
"error initializing child: {}", "error initializing child: {}",
.{err}, .{err},
@ -516,14 +999,14 @@ pub fn start(self: *Exec, alloc: Allocator) !struct {
/// This should be called after fork but before exec in the child process. /// This should be called after fork but before exec in the child process.
/// To repeat: this function RUNS IN THE FORKED CHILD PROCESS before /// To repeat: this function RUNS IN THE FORKED CHILD PROCESS before
/// exec is called; it does NOT run in the main Ghostty process. /// exec is called; it does NOT run in the main Ghostty process.
fn childPreExec(self: *Exec) !void { fn childPreExec(self: *Subprocess) !void {
// Setup our pty // Setup our pty
try self.pty.?.childPreExec(); try self.pty.?.childPreExec();
} }
/// Called to notify that we exited externally so we can unset our /// Called to notify that we exited externally so we can unset our
/// running state. /// running state.
pub fn externalExit(self: *Exec) void { pub fn externalExit(self: *Subprocess) void {
self.command = null; self.command = null;
} }
@ -531,7 +1014,7 @@ pub fn externalExit(self: *Exec) void {
/// for the subprocess to register that it has been signalled, but not /// for the subprocess to register that it has been signalled, but not
/// for it to terminate, so it will not block. /// for it to terminate, so it will not block.
/// This does not close the pty. /// This does not close the pty.
pub fn stop(self: *Exec) void { pub fn stop(self: *Subprocess) void {
// Kill our command // Kill our command
if (self.command) |*cmd| { if (self.command) |*cmd| {
// Note: this will also wait for the command to exit, so // Note: this will also wait for the command to exit, so
@ -555,7 +1038,7 @@ pub fn stop(self: *Exec) void {
/// Resize the pty subprocess. This is safe to call anytime. /// Resize the pty subprocess. This is safe to call anytime.
pub fn resize( pub fn resize(
self: *Exec, self: *Subprocess,
grid_size: renderer.GridSize, grid_size: renderer.GridSize,
screen_size: renderer.ScreenSize, screen_size: renderer.ScreenSize,
) !void { ) !void {
@ -654,3 +1137,141 @@ fn getpgid(pid: c.pid_t) ?c.pid_t {
fn killCommandFlatpak(command: *FlatpakHostCommand) !void { fn killCommandFlatpak(command: *FlatpakHostCommand) !void {
try command.signal(c.SIGHUP, true); try command.signal(c.SIGHUP, true);
} }
};
/// The read thread sits in a loop doing the following pseudo code:
///
/// while (true) { blocking_read(); exit_if_eof(); process(); }
///
/// Almost all terminal-modifying activity is from the pty read, so
/// putting this on a dedicated thread keeps performance very predictable
/// while also almost optimal. "Locking is fast, lock contention is slow."
/// and since we rarely have contention, this is fast.
///
/// This is also empirically fast compared to putting the read into
/// an async mechanism like io_uring/epoll because the reads are generally
/// small.
///
/// We use a basic poll syscall here because we are only monitoring two
/// fds and this is still much faster and lower overhead than any async
/// mechanism.
pub const ReadThread = struct {
fn threadMainPosix(fd: posix.fd_t, ev: *termio.Termio.ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit.
defer posix.close(quit);
// First thing, we want to set the fd to non-blocking. We do this
// so that we can try to read from the fd in a tight loop and only
// check the quit fd occasionally.
if (posix.fcntl(fd, posix.F.GETFL, 0)) |flags| {
_ = posix.fcntl(
fd,
posix.F.SETFL,
flags | @as(u32, @bitCast(posix.O{ .NONBLOCK = true })),
) catch |err| {
log.warn("read thread failed to set flags err={}", .{err});
log.warn("this isn't a fatal error, but may cause performance issues", .{});
};
} else |err| {
log.warn("read thread failed to get flags err={}", .{err});
log.warn("this isn't a fatal error, but may cause performance issues", .{});
}
// Build up the list of fds we're going to poll. We are looking
// for data on the pty and our quit notification.
var pollfds: [2]posix.pollfd = .{
.{ .fd = fd, .events = posix.POLL.IN, .revents = undefined },
.{ .fd = quit, .events = posix.POLL.IN, .revents = undefined },
};
var buf: [1024]u8 = undefined;
while (true) {
// We try to read from the file descriptor as long as possible
// to maximize performance. We only check the quit fd if the
// main fd blocks. This optimizes for the realistic scenario that
// the data will eventually stop while we're trying to quit. This
// is always true because we kill the process.
while (true) {
const n = posix.read(fd, &buf) catch |err| {
switch (err) {
// This means our pty is closed. We're probably
// gracefully shutting down.
error.NotOpenForReading,
error.InputOutput,
=> {
log.info("io reader exiting", .{});
return;
},
// No more data, fall back to poll and check for
// exit conditions.
error.WouldBlock => break,
else => {
log.err("io reader error err={}", .{err});
unreachable;
},
}
};
// This happens on macOS instead of WouldBlock when the
// child process dies. To be safe, we just break the loop
// and let our poll happen.
if (n == 0) break;
// log.info("DATA: {d}", .{n});
@call(.always_inline, termio.Termio.processOutputReadData, .{ ev, buf[0..n] });
}
// Wait for data.
_ = posix.poll(&pollfds, -1) catch |err| {
log.warn("poll failed on read thread, exiting early err={}", .{err});
return;
};
// If our quit fd is set, we're done.
if (pollfds[1].revents & posix.POLL.IN != 0) {
log.info("read thread got quit signal", .{});
return;
}
}
}
fn threadMainWindows(fd: posix.fd_t, ev: *termio.Termio.ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit.
defer posix.close(quit);
var buf: [1024]u8 = undefined;
while (true) {
while (true) {
var n: windows.DWORD = 0;
if (windows.kernel32.ReadFile(fd, &buf, buf.len, &n, null) == 0) {
const err = windows.kernel32.GetLastError();
switch (err) {
// Check for a quit signal
.OPERATION_ABORTED => break,
else => {
log.err("io reader error err={}", .{err});
unreachable;
},
}
}
@call(.always_inline, termio.Termio.processOutputReadData, .{ ev, buf[0..n] });
}
var quit_bytes: windows.DWORD = 0;
if (windows.exp.kernel32.PeekNamedPipe(quit, null, 0, null, &quit_bytes, null) == 0) {
const err = windows.kernel32.GetLastError();
log.err("quit pipe reader error err={}", .{err});
unreachable;
}
if (quit_bytes > 0) {
log.info("read thread got quit signal", .{});
return;
}
}
}
};

View File

@ -172,20 +172,11 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Termio {
// Set our default cursor style // Set our default cursor style
term.screen.cursor.cursor_style = opts.config.cursor_style; term.screen.cursor.cursor_style = opts.config.cursor_style;
var subprocess = try termio.Exec.init(alloc, opts); // Setup our reader.
// TODO: for manual, we need to set the terminal width/height
var subprocess = try termio.Exec.init(alloc, opts, &term);
errdefer subprocess.deinit(); errdefer subprocess.deinit();
// If we have an initial pwd requested by the subprocess, then we
// set that on the terminal now. This allows rapidly initializing
// new surfaces to use the proper pwd.
if (subprocess.cwd) |cwd| term.setPwd(cwd) catch |err| {
log.warn("error setting initial pwd err={}", .{err});
};
// Initial width/height based on subprocess
term.width_px = subprocess.screen_size.width;
term.height_px = subprocess.screen_size.height;
return .{ return .{
.alloc = alloc, .alloc = alloc,
.terminal = term, .terminal = term,
@ -208,52 +199,14 @@ pub fn deinit(self: *Termio) void {
pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !void { pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !void {
const alloc = self.alloc; const alloc = self.alloc;
// Start our subprocess
const pty_fds = self.subprocess.start(alloc) catch |err| {
// If we specifically got this error then we are in the forked
// process and our child failed to execute. In that case
if (err != error.Termio) return err;
// Output an error message about the exec faililng and exit.
// This generally should NOT happen because we always wrap
// our command execution either in login (macOS) or /bin/sh
// (Linux) which are usually guaranteed to exist. Still, we
// want to handle this scenario.
self.execFailedInChild() catch {};
posix.exit(1);
};
errdefer self.subprocess.stop();
const pid = pid: {
const command = self.subprocess.command orelse return error.ProcessNotStarted;
break :pid command.pid orelse return error.ProcessNoPid;
};
// Track our process start time so we know how long it was
// running for.
const process_start = try std.time.Instant.now();
// Create our pipe that we'll use to kill our read thread.
// pipe[0] is the read end, pipe[1] is the write end.
const pipe = try internal_os.pipe();
errdefer posix.close(pipe[0]);
errdefer posix.close(pipe[1]);
// Setup our data that is used for callbacks // Setup our data that is used for callbacks
var read_data_ptr = try alloc.create(ReadData); var read_data_ptr = try alloc.create(ReadData);
errdefer alloc.destroy(read_data_ptr); errdefer alloc.destroy(read_data_ptr);
// Setup our stream so that we can write.
var stream = xev.Stream.initFd(pty_fds.write);
errdefer stream.deinit();
// Wakeup watcher for the writer thread. // Wakeup watcher for the writer thread.
var wakeup = try xev.Async.init(); var wakeup = try xev.Async.init();
errdefer wakeup.deinit(); errdefer wakeup.deinit();
// Watcher to detect subprocess exit
var process = try xev.Process.init(pid);
errdefer process.deinit();
// Create our stream handler // Create our stream handler
const handler: StreamHandler = handler: { const handler: StreamHandler = handler: {
const default_cursor_color = if (self.config.cursor_color) |col| const default_cursor_color = if (self.config.cursor_color) |col|
@ -303,15 +256,7 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
}; };
errdefer read_data_ptr.deinit(); errdefer read_data_ptr.deinit();
// Start our reader thread // Setup our thread data
const read_thread = try std.Thread.spawn(
.{},
if (builtin.os.tag == .windows) ReadThread.threadMainWindows else ReadThread.threadMainPosix,
.{ pty_fds.read, read_data_ptr, pipe[0] },
);
read_thread.setName("io-reader") catch {};
// Return our thread data
data.* = .{ data.* = .{
.alloc = alloc, .alloc = alloc,
.loop = &thread.loop, .loop = &thread.loop,
@ -319,27 +264,14 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
.surface_mailbox = self.surface_mailbox, .surface_mailbox = self.surface_mailbox,
.writer_mailbox = thread.mailbox, .writer_mailbox = thread.mailbox,
.writer_wakeup = thread.wakeup, .writer_wakeup = thread.wakeup,
.reader = .{ .exec = .{ .read_data = read_data_ptr,
.start = process_start,
.abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms, // Placeholder until setup below
.wait_after_command = self.config.wait_after_command, .reader = .{ .manual = {} },
.write_stream = stream,
.process = process,
} },
.read_thread = read_thread,
.read_thread_pipe = pipe[1],
.read_thread_fd = if (builtin.os.tag == .windows) pty_fds.read else {},
.read_thread_data = read_data_ptr,
}; };
// Start our process watcher // Setup our reader
process.wait( try self.subprocess.threadEnter(alloc, self, data);
&thread.loop,
&data.reader.exec.process_wait_c,
ThreadData,
data,
processExit,
);
} }
/// This outputs an error message when exec failed and we are the /// This outputs an error message when exec failed and we are the
@ -363,33 +295,7 @@ fn execFailedInChild(self: *Termio) !void {
} }
pub fn threadExit(self: *Termio, data: *ThreadData) void { pub fn threadExit(self: *Termio, data: *ThreadData) void {
// Stop our reader self.subprocess.threadExit(data);
switch (data.reader) {
.manual => {},
.exec => |exec| {
if (exec.exited) self.subprocess.externalExit();
self.subprocess.stop();
// Quit our read thread after exiting the subprocess so that
// we don't get stuck waiting for data to stop flowing if it is
// a particularly noisy process.
_ = posix.write(data.read_thread_pipe, "x") catch |err|
log.warn("error writing to read thread quit pipe err={}", .{err});
if (comptime builtin.os.tag == .windows) {
// Interrupt the blocking read so the thread can see the quit message
if (windows.kernel32.CancelIoEx(data.read_thread_fd, null) == 0) {
switch (windows.kernel32.GetLastError()) {
.NOT_FOUND => {},
else => |err| log.warn("error interrupting read thread err={}", .{err}),
}
}
}
data.read_thread.join();
},
}
} }
/// Update the configuration. /// Update the configuration.
@ -408,7 +314,7 @@ pub fn changeConfig(self: *Termio, td: *ThreadData, config: *DerivedConfig) !voi
// Update our stream handler. The stream handler uses the same // Update our stream handler. The stream handler uses the same
// renderer mutex so this is safe to do despite being executed // renderer mutex so this is safe to do despite being executed
// from another thread. // from another thread.
td.read_thread_data.terminal_stream.handler.changeConfig(&self.config); td.read_data.terminal_stream.handler.changeConfig(&self.config);
td.reader.changeConfig(&self.config); td.reader.changeConfig(&self.config);
// Update the configuration that we know about. // Update the configuration that we know about.
@ -552,78 +458,10 @@ pub fn jumpToPrompt(self: *Termio, delta: isize) !void {
/// Called when the child process exited abnormally but before /// Called when the child process exited abnormally but before
/// the surface is notified. /// the surface is notified.
pub fn childExitedAbnormally(self: *Termio, exit_code: u32, runtime_ms: u64) !void { pub fn childExitedAbnormally(self: *Termio, exit_code: u32, runtime_ms: u64) !void {
var arena = ArenaAllocator.init(self.alloc);
defer arena.deinit();
const alloc = arena.allocator();
// Build up our command for the error message
const command = try std.mem.join(alloc, " ", self.subprocess.args);
const runtime_str = try std.fmt.allocPrint(alloc, "{d} ms", .{runtime_ms});
// Modify the terminal to show our error message. This
// requires grabbing the renderer state lock.
self.renderer_state.mutex.lock(); self.renderer_state.mutex.lock();
defer self.renderer_state.mutex.unlock(); defer self.renderer_state.mutex.unlock();
const t = self.renderer_state.terminal; const t = self.renderer_state.terminal;
try self.subprocess.childExitedAbnormally(self.alloc, t, exit_code, runtime_ms);
// No matter what move the cursor back to the column 0.
t.carriageReturn();
// Reset styles
try t.setAttribute(.{ .unset = {} });
// If there is data in the viewport, we want to scroll down
// a little bit and write a horizontal rule before writing
// our message. This lets the use see the error message the
// command may have output.
const viewport_str = try t.plainString(alloc);
if (viewport_str.len > 0) {
try t.linefeed();
for (0..t.cols) |_| try t.print(0x2501);
t.carriageReturn();
try t.linefeed();
try t.linefeed();
}
// Output our error message
try t.setAttribute(.{ .@"8_fg" = .bright_red });
try t.setAttribute(.{ .bold = {} });
try t.printString("Ghostty failed to launch the requested command:");
try t.setAttribute(.{ .unset = {} });
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString(command);
try t.setAttribute(.{ .unset = {} });
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString("Runtime: ");
try t.setAttribute(.{ .@"8_fg" = .red });
try t.printString(runtime_str);
try t.setAttribute(.{ .unset = {} });
// We don't print this on macOS because the exit code is always 0
// due to the way we launch the process.
if (comptime !builtin.target.isDarwin()) {
const exit_code_str = try std.fmt.allocPrint(alloc, "{d}", .{exit_code});
t.carriageReturn();
try t.linefeed();
try t.printString("Exit Code: ");
try t.setAttribute(.{ .@"8_fg" = .red });
try t.printString(exit_code_str);
try t.setAttribute(.{ .unset = {} });
}
t.carriageReturn();
try t.linefeed();
try t.linefeed();
try t.printString("Press any key to close the window.");
// Hide the cursor
t.modes.set(.cursor_visible, false);
} }
pub inline fn queueWrite( pub inline fn queueWrite(
@ -632,85 +470,7 @@ pub inline fn queueWrite(
data: []const u8, data: []const u8,
linefeed: bool, linefeed: bool,
) !void { ) !void {
switch (td.reader) { try self.subprocess.queueWrite(self.alloc, td, data, linefeed);
.manual => {},
.exec => try self.queueWriteExec(
td,
data,
linefeed,
),
}
}
fn queueWriteExec(
self: *Termio,
td: *ThreadData,
data: []const u8,
linefeed: bool,
) !void {
const exec = &td.reader.exec;
// If our process is exited then we send our surface a message
// about it but we don't queue any more writes.
if (exec.exited) {
_ = td.surface_mailbox.push(.{
.child_exited = {},
}, .{ .forever = {} });
return;
}
// We go through and chunk the data if necessary to fit into
// our cached buffers that we can queue to the stream.
var i: usize = 0;
while (i < data.len) {
const req = try exec.write_req_pool.getGrow(self.alloc);
const buf = try exec.write_buf_pool.getGrow(self.alloc);
const slice = slice: {
// The maximum end index is either the end of our data or
// the end of our buffer, whichever is smaller.
const max = @min(data.len, i + buf.len);
// Fast
if (!linefeed) {
fastmem.copy(u8, buf, data[i..max]);
const len = max - i;
i = max;
break :slice buf[0..len];
}
// Slow, have to replace \r with \r\n
var buf_i: usize = 0;
while (i < data.len and buf_i < buf.len - 1) {
const ch = data[i];
i += 1;
if (ch != '\r') {
buf[buf_i] = ch;
buf_i += 1;
continue;
}
// CRLF
buf[buf_i] = '\r';
buf[buf_i + 1] = '\n';
buf_i += 2;
}
break :slice buf[0..buf_i];
};
//for (slice) |b| log.warn("write: {x}", .{b});
exec.write_stream.queueWrite(
td.loop,
&exec.write_queue,
req,
.{ .slice = slice },
termio.reader.ThreadData.Exec,
exec,
ttyWrite,
);
}
} }
/// Process output from the pty. This is the manual API that users can /// Process output from the pty. This is the manual API that users can
@ -804,18 +564,12 @@ pub const ThreadData = struct {
/// Data associated with the reader implementation (i.e. pty/exec state) /// Data associated with the reader implementation (i.e. pty/exec state)
reader: termio.reader.ThreadData, reader: termio.reader.ThreadData,
read_data: *ReadData,
/// Our read thread
read_thread: std.Thread,
read_thread_pipe: posix.fd_t,
read_thread_fd: if (builtin.os.tag == .windows) posix.fd_t else void,
read_thread_data: *ReadData,
pub fn deinit(self: *ThreadData) void { pub fn deinit(self: *ThreadData) void {
posix.close(self.read_thread_pipe);
self.read_thread_data.deinit();
self.reader.deinit(self.alloc); self.reader.deinit(self.alloc);
self.alloc.destroy(self.read_thread_data); self.read_data.deinit();
self.alloc.destroy(self.read_data);
self.* = undefined; self.* = undefined;
} }
}; };
@ -849,247 +603,3 @@ pub const ReadData = struct {
self.terminal_stream.deinit(); self.terminal_stream.deinit();
} }
}; };
fn processExit(
td_: ?*ThreadData,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Process.WaitError!u32,
) xev.CallbackAction {
const exit_code = r catch unreachable;
const td = td_.?;
assert(td.reader == .exec);
const execdata = &td.reader.exec;
execdata.exited = true;
// Determine how long the process was running for.
const runtime_ms: ?u64 = runtime: {
const process_end = std.time.Instant.now() catch break :runtime null;
const runtime_ns = process_end.since(execdata.start);
const runtime_ms = runtime_ns / std.time.ns_per_ms;
break :runtime runtime_ms;
};
log.debug(
"child process exited status={} runtime={}ms",
.{ exit_code, runtime_ms orelse 0 },
);
// If our runtime was below some threshold then we assume that this
// was an abnormal exit and we show an error message.
if (runtime_ms) |runtime| runtime: {
// On macOS, our exit code detection doesn't work, possibly
// because of our `login` wrapper. More investigation required.
if (comptime !builtin.target.isDarwin()) {
// If our exit code is zero, then the command was successful
// and we don't ever consider it abnormal.
if (exit_code == 0) break :runtime;
}
// Our runtime always has to be under the threshold to be
// considered abnormal. This is because a user can always
// manually do something like `exit 1` in their shell to
// force the exit code to be non-zero. We only want to detect
// abnormal exits that happen so quickly the user can't react.
if (runtime > execdata.abnormal_runtime_threshold_ms) break :runtime;
log.warn("abnormal process exit detected, showing error message", .{});
// Notify our main writer thread which has access to more
// information so it can show a better error message.
_ = td.writer_mailbox.push(.{
.child_exited_abnormally = .{
.exit_code = exit_code,
.runtime_ms = runtime,
},
}, .{ .forever = {} });
td.writer_wakeup.notify() catch break :runtime;
return .disarm;
}
// If we're purposely waiting then we just return since the process
// exited flag is set to true. This allows the terminal window to remain
// open.
if (execdata.wait_after_command) {
// We output a message so that the user knows whats going on and
// doesn't think their terminal just froze.
terminal: {
td.renderer_state.mutex.lock();
defer td.renderer_state.mutex.unlock();
const t = td.renderer_state.terminal;
t.carriageReturn();
t.linefeed() catch break :terminal;
t.printString("Process exited. Press any key to close the terminal.") catch
break :terminal;
t.modes.set(.cursor_visible, false);
}
return .disarm;
}
// Notify our surface we want to close
_ = td.surface_mailbox.push(.{
.child_exited = {},
}, .{ .forever = {} });
return .disarm;
}
fn ttyWrite(
td_: ?*termio.reader.ThreadData.Exec,
_: *xev.Loop,
_: *xev.Completion,
_: xev.Stream,
_: xev.WriteBuffer,
r: xev.Stream.WriteError!usize,
) xev.CallbackAction {
const td = td_.?;
td.write_req_pool.put();
td.write_buf_pool.put();
const d = r catch |err| {
log.err("write error: {}", .{err});
return .disarm;
};
_ = d;
//log.info("WROTE: {d}", .{d});
return .disarm;
}
/// The read thread sits in a loop doing the following pseudo code:
///
/// while (true) { blocking_read(); exit_if_eof(); process(); }
///
/// Almost all terminal-modifying activity is from the pty read, so
/// putting this on a dedicated thread keeps performance very predictable
/// while also almost optimal. "Locking is fast, lock contention is slow."
/// and since we rarely have contention, this is fast.
///
/// This is also empirically fast compared to putting the read into
/// an async mechanism like io_uring/epoll because the reads are generally
/// small.
///
/// We use a basic poll syscall here because we are only monitoring two
/// fds and this is still much faster and lower overhead than any async
/// mechanism.
const ReadThread = struct {
fn threadMainPosix(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit.
defer posix.close(quit);
// First thing, we want to set the fd to non-blocking. We do this
// so that we can try to read from the fd in a tight loop and only
// check the quit fd occasionally.
if (posix.fcntl(fd, posix.F.GETFL, 0)) |flags| {
_ = posix.fcntl(
fd,
posix.F.SETFL,
flags | @as(u32, @bitCast(posix.O{ .NONBLOCK = true })),
) catch |err| {
log.warn("read thread failed to set flags err={}", .{err});
log.warn("this isn't a fatal error, but may cause performance issues", .{});
};
} else |err| {
log.warn("read thread failed to get flags err={}", .{err});
log.warn("this isn't a fatal error, but may cause performance issues", .{});
}
// Build up the list of fds we're going to poll. We are looking
// for data on the pty and our quit notification.
var pollfds: [2]posix.pollfd = .{
.{ .fd = fd, .events = posix.POLL.IN, .revents = undefined },
.{ .fd = quit, .events = posix.POLL.IN, .revents = undefined },
};
var buf: [1024]u8 = undefined;
while (true) {
// We try to read from the file descriptor as long as possible
// to maximize performance. We only check the quit fd if the
// main fd blocks. This optimizes for the realistic scenario that
// the data will eventually stop while we're trying to quit. This
// is always true because we kill the process.
while (true) {
const n = posix.read(fd, &buf) catch |err| {
switch (err) {
// This means our pty is closed. We're probably
// gracefully shutting down.
error.NotOpenForReading,
error.InputOutput,
=> {
log.info("io reader exiting", .{});
return;
},
// No more data, fall back to poll and check for
// exit conditions.
error.WouldBlock => break,
else => {
log.err("io reader error err={}", .{err});
unreachable;
},
}
};
// This happens on macOS instead of WouldBlock when the
// child process dies. To be safe, we just break the loop
// and let our poll happen.
if (n == 0) break;
// log.info("DATA: {d}", .{n});
@call(.always_inline, processOutputReadData, .{ ev, buf[0..n] });
}
// Wait for data.
_ = posix.poll(&pollfds, -1) catch |err| {
log.warn("poll failed on read thread, exiting early err={}", .{err});
return;
};
// If our quit fd is set, we're done.
if (pollfds[1].revents & posix.POLL.IN != 0) {
log.info("read thread got quit signal", .{});
return;
}
}
}
fn threadMainWindows(fd: posix.fd_t, ev: *ReadData, quit: posix.fd_t) void {
// Always close our end of the pipe when we exit.
defer posix.close(quit);
var buf: [1024]u8 = undefined;
while (true) {
while (true) {
var n: windows.DWORD = 0;
if (windows.kernel32.ReadFile(fd, &buf, buf.len, &n, null) == 0) {
const err = windows.kernel32.GetLastError();
switch (err) {
// Check for a quit signal
.OPERATION_ABORTED => break,
else => {
log.err("io reader error err={}", .{err});
unreachable;
},
}
}
@call(.always_inline, processOutputReadData, .{ ev, buf[0..n] });
}
var quit_bytes: windows.DWORD = 0;
if (windows.exp.kernel32.PeekNamedPipe(quit, null, 0, null, &quit_bytes, null) == 0) {
const err = windows.kernel32.GetLastError();
log.err("quit pipe reader error err={}", .{err});
unreachable;
}
if (quit_bytes > 0) {
log.info("read thread got quit signal", .{});
return;
}
}
}
};

View File

@ -44,61 +44,12 @@ pub const Config = union(enum) {
/// Termio thread data. See termio.ThreadData for docs. /// Termio thread data. See termio.ThreadData for docs.
pub const ThreadData = union(Kind) { pub const ThreadData = union(Kind) {
manual: void, manual: void,
exec: ThreadData.Exec, exec: termio.Exec.ThreadData,
pub const Exec = struct {
/// Process start time and boolean of whether its already exited.
start: std.time.Instant,
exited: bool = false,
/// The number of milliseconds below which we consider a process
/// exit to be abnormal. This is used to show an error message
/// when the process exits too quickly.
abnormal_runtime_threshold_ms: u32,
/// If true, do not immediately send a child exited message to the
/// surface to close the surface when the command exits. If this is
/// false we'll show a process exited message and wait for user input
/// to close the surface.
wait_after_command: bool,
/// The data stream is the main IO for the pty.
write_stream: xev.Stream,
/// The process watcher
process: xev.Process,
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
/// This is used for both waiting for the process to exit and then
/// subsequently to wait for the data_stream to close.
process_wait_c: xev.Completion = .{},
};
pub fn deinit(self: *ThreadData, alloc: Allocator) void { pub fn deinit(self: *ThreadData, alloc: Allocator) void {
switch (self.*) { switch (self.*) {
.manual => {}, .manual => {},
.exec => |*exec| { .exec => |*exec| exec.deinit(alloc),
// Clear our write pools. We know we aren't ever going to do
// any more IO since we stop our data stream below so we can just
// drop this.
exec.write_req_pool.deinit(alloc);
exec.write_buf_pool.deinit(alloc);
// Stop our process watcher
exec.process.deinit();
// Stop our write stream
exec.write_stream.deinit();
},
} }
} }