termio: queueWrite no longer uses EventData

This commit is contained in:
Mitchell Hashimoto
2024-07-13 14:44:44 -07:00
parent ffaf020576
commit bfbbe1485e
2 changed files with 84 additions and 46 deletions

View File

@ -290,7 +290,6 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
.renderer_wakeup = self.renderer_wakeup, .renderer_wakeup = self.renderer_wakeup,
.renderer_mailbox = self.renderer_mailbox, .renderer_mailbox = self.renderer_mailbox,
.process = process, .process = process,
.data_stream = stream,
.loop = &thread.loop, .loop = &thread.loop,
.terminal_stream = .{ .terminal_stream = .{
.handler = handler, .handler = handler,
@ -326,10 +325,13 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
data.* = .{ data.* = .{
.alloc = alloc, .alloc = alloc,
.ev = ev_data_ptr, .ev = ev_data_ptr,
.loop = &thread.loop,
.surface_mailbox = self.surface_mailbox,
.reader = .{ .exec = .{ .reader = .{ .exec = .{
.start = process_start, .start = process_start,
.abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms, .abnormal_runtime_threshold_ms = self.config.abnormal_runtime_threshold_ms,
.wait_after_command = self.config.wait_after_command, .wait_after_command = self.config.wait_after_command,
.write_stream = stream,
} }, } },
.read_thread = read_thread, .read_thread = read_thread,
.read_thread_pipe = pipe[1], .read_thread_pipe = pipe[1],
@ -627,28 +629,39 @@ pub inline fn queueWrite(
data: []const u8, data: []const u8,
linefeed: bool, linefeed: bool,
) !void { ) !void {
const ev = td.ev; switch (td.reader) {
.manual => {},
.exec => try self.queueWriteExec(
td,
data,
linefeed,
),
}
}
fn queueWriteExec(
self: *Termio,
td: *ThreadData,
data: []const u8,
linefeed: bool,
) !void {
const exec = &td.reader.exec;
// If our process is exited then we send our surface a message // If our process is exited then we send our surface a message
// about it but we don't queue any more writes. // about it but we don't queue any more writes.
switch (td.reader) { if (exec.exited) {
.manual => {}, _ = td.surface_mailbox.push(.{
.exec => |exec| { .child_exited = {},
if (exec.exited) { }, .{ .forever = {} });
_ = ev.surface_mailbox.push(.{ return;
.child_exited = {},
}, .{ .forever = {} });
return;
}
},
} }
// We go through and chunk the data if necessary to fit into // We go through and chunk the data if necessary to fit into
// our cached buffers that we can queue to the stream. // our cached buffers that we can queue to the stream.
var i: usize = 0; var i: usize = 0;
while (i < data.len) { while (i < data.len) {
const req = try ev.write_req_pool.getGrow(self.alloc); const req = try exec.write_req_pool.getGrow(self.alloc);
const buf = try ev.write_buf_pool.getGrow(self.alloc); const buf = try exec.write_buf_pool.getGrow(self.alloc);
const slice = slice: { const slice = slice: {
// The maximum end index is either the end of our data or // The maximum end index is either the end of our data or
// the end of our buffer, whichever is smaller. // the end of our buffer, whichever is smaller.
@ -685,13 +698,13 @@ pub inline fn queueWrite(
//for (slice) |b| log.warn("write: {x}", .{b}); //for (slice) |b| log.warn("write: {x}", .{b});
ev.data_stream.queueWrite( exec.write_stream.queueWrite(
ev.loop, td.loop,
&ev.write_queue, &exec.write_queue,
req, req,
.{ .slice = slice }, .{ .slice = slice },
EventData, termio.reader.ThreadData.Exec,
ev, exec,
ttyWrite, ttyWrite,
); );
} }
@ -764,6 +777,13 @@ pub const ThreadData = struct {
/// The data that is attached to the callbacks. /// The data that is attached to the callbacks.
ev: *EventData, ev: *EventData,
/// The event loop associated with this thread. This is owned by
/// the Thread but we have a pointer so we can queue new work to it.
loop: *xev.Loop,
/// Mailboxes for different threads
surface_mailbox: apprt.surface.Mailbox,
/// Data associated with the reader implementation (i.e. pty/exec state) /// Data associated with the reader implementation (i.e. pty/exec state)
reader: termio.reader.ThreadData, reader: termio.reader.ThreadData,
@ -775,6 +795,7 @@ pub const ThreadData = struct {
pub fn deinit(self: *ThreadData) void { pub fn deinit(self: *ThreadData) void {
posix.close(self.read_thread_pipe); posix.close(self.read_thread_pipe);
self.ev.deinit(self.alloc); self.ev.deinit(self.alloc);
self.reader.deinit(self.alloc);
self.alloc.destroy(self.ev); self.alloc.destroy(self.ev);
self.* = undefined; self.* = undefined;
} }
@ -813,35 +834,15 @@ pub const EventData = struct {
/// subsequently to wait for the data_stream to close. /// subsequently to wait for the data_stream to close.
process_wait_c: xev.Completion = .{}, process_wait_c: xev.Completion = .{},
/// The data stream is the main IO for the pty.
data_stream: xev.Stream,
/// The event loop, /// The event loop,
loop: *xev.Loop, loop: *xev.Loop,
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
/// Last time the cursor was reset. This is used to prevent message /// Last time the cursor was reset. This is used to prevent message
/// flooding with cursor resets. /// flooding with cursor resets.
last_cursor_reset: i64 = 0, last_cursor_reset: i64 = 0,
pub fn deinit(self: *EventData, alloc: Allocator) void { pub fn deinit(self: *EventData, alloc: Allocator) void {
// Clear our write pools. We know we aren't ever going to do _ = alloc;
// any more IO since we stop our data stream below so we can just
// drop this.
self.write_req_pool.deinit(alloc);
self.write_buf_pool.deinit(alloc);
// Stop our data stream
self.data_stream.deinit();
// Stop our process watcher // Stop our process watcher
self.process.deinit(); self.process.deinit();
@ -946,16 +947,16 @@ fn processExit(
} }
fn ttyWrite( fn ttyWrite(
ev_: ?*EventData, td_: ?*termio.reader.ThreadData.Exec,
_: *xev.Loop, _: *xev.Loop,
_: *xev.Completion, _: *xev.Completion,
_: xev.Stream, _: xev.Stream,
_: xev.WriteBuffer, _: xev.WriteBuffer,
r: xev.Stream.WriteError!usize, r: xev.Stream.WriteError!usize,
) xev.CallbackAction { ) xev.CallbackAction {
const ev = ev_.?; const td = td_.?;
ev.write_req_pool.put(); td.write_req_pool.put();
ev.write_buf_pool.put(); td.write_buf_pool.put();
const d = r catch |err| { const d = r catch |err| {
log.err("write error: {}", .{err}); log.err("write error: {}", .{err});

View File

@ -1,7 +1,14 @@
const std = @import("std"); const std = @import("std");
const Allocator = std.mem.Allocator;
const xev = @import("xev");
const configpkg = @import("../config.zig"); const configpkg = @import("../config.zig");
const termio = @import("../termio.zig"); const termio = @import("../termio.zig");
const Command = @import("../Command.zig"); const Command = @import("../Command.zig");
const SegmentedPool = @import("../segmented_pool.zig").SegmentedPool;
// The preallocation size for the write request pool. This should be big
// enough to satisfy most write requests. It must be a power of 2.
const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);
/// The kinds of readers. /// The kinds of readers.
pub const Kind = std.meta.Tag(Config); pub const Kind = std.meta.Tag(Config);
@ -29,8 +36,9 @@ pub const Config = union(enum) {
/// Termio thread data. See termio.ThreadData for docs. /// Termio thread data. See termio.ThreadData for docs.
pub const ThreadData = union(Kind) { pub const ThreadData = union(Kind) {
manual: void, manual: void,
exec: Exec,
exec: struct { pub const Exec = struct {
/// Process start time and boolean of whether its already exited. /// Process start time and boolean of whether its already exited.
start: std.time.Instant, start: std.time.Instant,
exited: bool = false, exited: bool = false,
@ -45,7 +53,36 @@ pub const ThreadData = union(Kind) {
/// false we'll show a process exited message and wait for user input /// false we'll show a process exited message and wait for user input
/// to close the surface. /// to close the surface.
wait_after_command: bool, wait_after_command: bool,
},
/// The data stream is the main IO for the pty.
write_stream: xev.Stream,
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
};
pub fn deinit(self: *ThreadData, alloc: Allocator) void {
switch (self.*) {
.manual => {},
.exec => |*exec| {
// Clear our write pools. We know we aren't ever going to do
// any more IO since we stop our data stream below so we can just
// drop this.
exec.write_req_pool.deinit(alloc);
exec.write_buf_pool.deinit(alloc);
// Stop our write stream
exec.write_stream.deinit();
},
}
}
pub fn changeConfig(self: *ThreadData, config: *termio.DerivedConfig) void { pub fn changeConfig(self: *ThreadData, config: *termio.DerivedConfig) void {
switch (self.*) { switch (self.*) {