diff --git a/src/termio/Termio.zig b/src/termio/Termio.zig index 0481a3126..f2a501b64 100644 --- a/src/termio/Termio.zig +++ b/src/termio/Termio.zig @@ -290,7 +290,6 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo .renderer_wakeup = self.renderer_wakeup, .renderer_mailbox = self.renderer_mailbox, .process = process, - .data_stream = stream, .loop = &thread.loop, .terminal_stream = .{ .handler = handler, @@ -326,10 +325,13 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo data.* = .{ .alloc = alloc, .ev = ev_data_ptr, + .loop = &thread.loop, + .surface_mailbox = self.surface_mailbox, .reader = .{ .exec = .{ .start = process_start, .abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms, .wait_after_command = self.config.wait_after_command, + .write_stream = stream, } }, .read_thread = read_thread, .read_thread_pipe = pipe[1], @@ -627,28 +629,39 @@ pub inline fn queueWrite( data: []const u8, linefeed: bool, ) !void { - const ev = td.ev; + switch (td.reader) { + .manual => {}, + .exec => try self.queueWriteExec( + td, + data, + linefeed, + ), + } +} + +fn queueWriteExec( + self: *Termio, + td: *ThreadData, + data: []const u8, + linefeed: bool, +) !void { + const exec = &td.reader.exec; // If our process is exited then we send our surface a message // about it but we don't queue any more writes. - switch (td.reader) { - .manual => {}, - .exec => |exec| { - if (exec.exited) { - _ = ev.surface_mailbox.push(.{ - .child_exited = {}, - }, .{ .forever = {} }); - return; - } - }, + if (exec.exited) { + _ = td.surface_mailbox.push(.{ + .child_exited = {}, + }, .{ .forever = {} }); + return; } // We go through and chunk the data if necessary to fit into // our cached buffers that we can queue to the stream. var i: usize = 0; while (i < data.len) { - const req = try ev.write_req_pool.getGrow(self.alloc); - const buf = try ev.write_buf_pool.getGrow(self.alloc); + const req = try exec.write_req_pool.getGrow(self.alloc); + const buf = try exec.write_buf_pool.getGrow(self.alloc); const slice = slice: { // The maximum end index is either the end of our data or // the end of our buffer, whichever is smaller. @@ -685,13 +698,13 @@ pub inline fn queueWrite( //for (slice) |b| log.warn("write: {x}", .{b}); - ev.data_stream.queueWrite( - ev.loop, - &ev.write_queue, + exec.write_stream.queueWrite( + td.loop, + &exec.write_queue, req, .{ .slice = slice }, - EventData, - ev, + termio.reader.ThreadData.Exec, + exec, ttyWrite, ); } @@ -764,6 +777,13 @@ pub const ThreadData = struct { /// The data that is attached to the callbacks. ev: *EventData, + /// The event loop associated with this thread. This is owned by + /// the Thread but we have a pointer so we can queue new work to it. + loop: *xev.Loop, + + /// Mailboxes for different threads + surface_mailbox: apprt.surface.Mailbox, + /// Data associated with the reader implementation (i.e. pty/exec state) reader: termio.reader.ThreadData, @@ -775,6 +795,7 @@ pub const ThreadData = struct { pub fn deinit(self: *ThreadData) void { posix.close(self.read_thread_pipe); self.ev.deinit(self.alloc); + self.reader.deinit(self.alloc); self.alloc.destroy(self.ev); self.* = undefined; } @@ -813,35 +834,15 @@ pub const EventData = struct { /// subsequently to wait for the data_stream to close. process_wait_c: xev.Completion = .{}, - /// The data stream is the main IO for the pty. - data_stream: xev.Stream, - /// The event loop, loop: *xev.Loop, - /// The write queue for the data stream. - write_queue: xev.Stream.WriteQueue = .{}, - - /// This is the pool of available (unused) write requests. If you grab - /// one from the pool, you must put it back when you're done! - write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{}, - - /// The pool of available buffers for writing to the pty. - write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{}, - /// Last time the cursor was reset. This is used to prevent message /// flooding with cursor resets. last_cursor_reset: i64 = 0, pub fn deinit(self: *EventData, alloc: Allocator) void { - // Clear our write pools. We know we aren't ever going to do - // any more IO since we stop our data stream below so we can just - // drop this. - self.write_req_pool.deinit(alloc); - self.write_buf_pool.deinit(alloc); - - // Stop our data stream - self.data_stream.deinit(); + _ = alloc; // Stop our process watcher self.process.deinit(); @@ -946,16 +947,16 @@ fn processExit( } fn ttyWrite( - ev_: ?*EventData, + td_: ?*termio.reader.ThreadData.Exec, _: *xev.Loop, _: *xev.Completion, _: xev.Stream, _: xev.WriteBuffer, r: xev.Stream.WriteError!usize, ) xev.CallbackAction { - const ev = ev_.?; - ev.write_req_pool.put(); - ev.write_buf_pool.put(); + const td = td_.?; + td.write_req_pool.put(); + td.write_buf_pool.put(); const d = r catch |err| { log.err("write error: {}", .{err}); diff --git a/src/termio/reader.zig b/src/termio/reader.zig index 99f78b561..10a6e9980 100644 --- a/src/termio/reader.zig +++ b/src/termio/reader.zig @@ -1,7 +1,14 @@ const std = @import("std"); +const Allocator = std.mem.Allocator; +const xev = @import("xev"); const configpkg = @import("../config.zig"); const termio = @import("../termio.zig"); const Command = @import("../Command.zig"); +const SegmentedPool = @import("../segmented_pool.zig").SegmentedPool; + +// The preallocation size for the write request pool. This should be big +// enough to satisfy most write requests. It must be a power of 2. +const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5); /// The kinds of readers. pub const Kind = std.meta.Tag(Config); @@ -29,8 +36,9 @@ pub const Config = union(enum) { /// Termio thread data. See termio.ThreadData for docs. pub const ThreadData = union(Kind) { manual: void, + exec: Exec, - exec: struct { + pub const Exec = struct { /// Process start time and boolean of whether its already exited. start: std.time.Instant, exited: bool = false, @@ -45,7 +53,36 @@ pub const ThreadData = union(Kind) { /// false we'll show a process exited message and wait for user input /// to close the surface. wait_after_command: bool, - }, + + /// The data stream is the main IO for the pty. + write_stream: xev.Stream, + + /// This is the pool of available (unused) write requests. If you grab + /// one from the pool, you must put it back when you're done! + write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{}, + + /// The pool of available buffers for writing to the pty. + write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{}, + + /// The write queue for the data stream. + write_queue: xev.Stream.WriteQueue = .{}, + }; + + pub fn deinit(self: *ThreadData, alloc: Allocator) void { + switch (self.*) { + .manual => {}, + .exec => |*exec| { + // Clear our write pools. We know we aren't ever going to do + // any more IO since we stop our data stream below so we can just + // drop this. + exec.write_req_pool.deinit(alloc); + exec.write_buf_pool.deinit(alloc); + + // Stop our write stream + exec.write_stream.deinit(); + }, + } + } pub fn changeConfig(self: *ThreadData, config: *termio.DerivedConfig) void { switch (self.*) {