From 11d6e9122845ab55d4e28d0114c7a24b1b4c0c76 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 6 Feb 2023 14:52:24 -0800 Subject: [PATCH] termio: reader thread is thread-safe for writing to writer --- src/blocking_queue.zig | 17 ++- src/main.zig | 2 + src/termio.zig | 1 + src/termio/Exec.zig | 301 +++++++++++++++++++++-------------------- src/termio/Thread.zig | 4 +- src/termio/message.zig | 4 +- 6 files changed, 168 insertions(+), 161 deletions(-) diff --git a/src/blocking_queue.zig b/src/blocking_queue.zig index b3ef09b8c..1684b4751 100644 --- a/src/blocking_queue.zig +++ b/src/blocking_queue.zig @@ -55,25 +55,24 @@ pub fn BlockingQueue( }; /// Our data. The values are undefined until they are written. - data: [bounds]T, + data: [bounds]T = undefined, /// The next location to write (next empty loc) and next location /// to read (next non-empty loc). The number of written elements. - write: Size, - read: Size, - len: Size, + write: Size = 0, + read: Size = 0, + len: Size = 0, /// The big mutex that must be held to read/write. - mutex: std.Thread.Mutex, + mutex: std.Thread.Mutex = .{}, /// A CV for being notified when the queue is no longer full. This is /// used for writing. Note we DON'T have a CV for waiting on the /// queue not being EMPTY because we use external notifiers for that. - cond_not_full: std.Thread.Condition, - not_full_waiters: usize, + cond_not_full: std.Thread.Condition = .{}, + not_full_waiters: usize = 0, - /// Allocate the blocking queue. Allocation must always happen on - /// the heap due to shared concurrency state. + /// Allocate the blocking queue on the heap. pub fn create(alloc: Allocator) !*Self { const ptr = try alloc.create(Self); errdefer alloc.destroy(ptr); diff --git a/src/main.zig b/src/main.zig index 89f7e9ef4..0fd4b1a7b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -7,6 +7,7 @@ const freetype = @import("freetype"); const harfbuzz = @import("harfbuzz"); const macos = @import("macos"); const tracy = @import("tracy"); +const xev = @import("xev"); const renderer = @import("renderer.zig"); const xdg = @import("xdg.zig"); const internal_os = @import("os/main.zig"); @@ -22,6 +23,7 @@ pub fn main() !void { std.log.info("dependency fontconfig={d}", .{fontconfig.version()}); } std.log.info("renderer={}", .{renderer.Renderer}); + std.log.info("libxev backend={}", .{xev.backend}); // First things first, we fix our file descriptors internal_os.fixMaxFiles(); diff --git a/src/termio.zig b/src/termio.zig index f988ddb5f..55d58ef1c 100644 --- a/src/termio.zig +++ b/src/termio.zig @@ -6,6 +6,7 @@ pub usingnamespace @import("termio/message.zig"); pub const Exec = @import("termio/Exec.zig"); pub const Options = @import("termio/Options.zig"); pub const Thread = @import("termio/Thread.zig"); +pub const Mailbox = Thread.Mailbox; /// The implementation to use for the IO. This is just "exec" for now but /// this is somewhat pluggable so that in the future we can introduce other diff --git a/src/termio/Exec.zig b/src/termio/Exec.zig index b51ab9eb8..50388373e 100644 --- a/src/termio/Exec.zig +++ b/src/termio/Exec.zig @@ -62,7 +62,8 @@ grid_size: renderer.GridSize, /// The data associated with the currently running thread. data: ?*EventData, -io_thread: ?std.Thread, +/// The thread that is started to read the data associated with the pty. +read_thread: ?std.Thread, /// Initialize the exec implementation. This will also start the child /// process. 
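The blocking_queue.zig hunk above gives every field a default value, which lets the queue be initialized with `.{}`. The rest of `create` is truncated in that hunk; a minimal sketch of what the defaults enable (the default-initialization body is an assumption, not necessarily the committed code):

    pub fn create(alloc: Allocator) !*Self {
        const ptr = try alloc.create(Self);
        errdefer alloc.destroy(ptr);

        // Every field has a default, so one assignment initializes the
        // queue: empty ring, zeroed indices, unlocked mutex, no waiters.
        ptr.* = .{};
        return ptr;
    }
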
@@ -153,7 +154,7 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Exec { .window_mailbox = opts.window_mailbox, .grid_size = opts.grid_size, .data = null, - .io_thread = null, + .read_thread = null, }; } @@ -164,6 +165,9 @@ pub fn deinit(self: *Exec) void { _ = self.command.wait(false) catch |err| log.err("error waiting for command to exit: {}", .{err}); + // Wait for our reader thread to end + if (self.read_thread) |thr| thr.join(); + // Clean up our other members self.terminal.deinit(self.alloc); } @@ -204,7 +208,7 @@ fn killCommand(self: *Exec) !void { } } -pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData { +pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData { assert(self.data == null); const alloc = self.alloc; @@ -212,17 +216,23 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData { var ev_data_ptr = try alloc.create(EventData); errdefer alloc.destroy(ev_data_ptr); - // Read data + // Setup our stream so that we can write. var stream = xev.Stream.initFd(self.pty.master); errdefer stream.deinit(); + // Wakeup watcher for the writer thread. + var wakeup = try xev.Async.init(); + errdefer wakeup.deinit(); + // Setup our event data before we start ev_data_ptr.* = .{ + .writer_mailbox = thread.mailbox, + .writer_wakeup = thread.wakeup, .renderer_state = self.renderer_state, .renderer_wakeup = self.renderer_wakeup, .renderer_mailbox = self.renderer_mailbox, .data_stream = stream, - .loop = loop, + .loop = &thread.loop, .terminal_stream = .{ .handler = .{ .alloc = self.alloc, @@ -238,22 +248,14 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData { // Store our data so our callbacks can access it self.data = ev_data_ptr; - self.io_thread = try std.Thread.spawn( + // Start our reader thread + assert(self.read_thread == null); + self.read_thread = try std.Thread.spawn( .{}, - ioMain, + ReadThread.threadMain, .{ self.pty.master, ev_data_ptr }, ); - self.io_thread.?.setName("io-reader") catch {}; - - // Start our stream read - // stream.read( - // loop, - // &ev_data_ptr.data_stream_c_read, - // .{ .slice = &ev_data_ptr.data_stream_buf }, - // EventData, - // ev_data_ptr, - // ttyRead, - // ); + self.read_thread.?.setName("io-reader") catch {}; // Return our thread data return ThreadData{ @@ -262,28 +264,8 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData { }; } -/// The main entrypoint for the thread. -pub fn ioMain(fd: std.os.fd_t, ev: *EventData) void { - while (true) { - const n = std.os.read(fd, &ev.data_stream_buf) catch |err| { - log.err("READ ERROR err={}", .{err}); - return; - }; - - _ = @call(.always_inline, ttyRead, .{ - ev, - undefined, - undefined, - undefined, - .{ .slice = &ev.data_stream_buf }, - n, - }); - } -} - pub fn threadExit(self: *Exec, data: ThreadData) void { _ = data; - self.data = null; } @@ -318,7 +300,28 @@ pub fn resize( } pub inline fn queueWrite(self: *Exec, data: []const u8) !void { - try self.data.?.queueWrite(data); + const ev = self.data.?; + + // We go through and chunk the data if necessary to fit into + // our cached buffers that we can queue to the stream. 
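+ // Each pooled buffer is a fixed size, so a single large write becomes
+ // several queued writes; each chunk takes its own req/buf pair, which
+ // ttyWrite is expected to return to the pools on completion.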
+ var i: usize = 0;
+ while (i < data.len) {
+ const req = try ev.write_req_pool.get();
+ const buf = try ev.write_buf_pool.get();
+ const end = @min(data.len, i + buf.len);
+ fastmem.copy(u8, buf, data[i..end]);
+ ev.data_stream.queueWrite(
+ ev.loop,
+ &ev.write_queue,
+ req,
+ .{ .slice = buf[0..(end - i)] },
+ EventData,
+ ev,
+ ttyWrite,
+ );
+
+ i = end;
+ }
 }

 const ThreadData = struct {
@@ -340,6 +343,10 @@ const EventData = struct {
 // enough to satisfy most write requests. It must be a power of 2.
 const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);

+ /// Mailbox for data to the writer thread.
+ writer_mailbox: *termio.Mailbox,
+ writer_wakeup: xev.Async,
+
 /// The stream parser. This parses the stream of escape codes and so on
 /// from the child process and calls callbacks in the stream handler.
 terminal_stream: terminal.Stream(StreamHandler),
@@ -356,8 +363,6 @@ const EventData = struct {
 /// The data stream is the main IO for the pty.
 data_stream: xev.Stream,
- data_stream_c_read: xev.Completion = .{},
- data_stream_buf: [1024]u8 = undefined,

 /// The event loop.
 loop: *xev.Loop,
@@ -384,7 +389,6 @@ const EventData = struct {
 self.write_buf_pool.deinit(alloc);

 // Stop our data stream
- // TODO: close?
 self.data_stream.deinit();
 }

@@ -394,30 +398,6 @@ const EventData = struct {
 inline fn queueRender(self: *EventData) !void {
 try self.renderer_wakeup.notify();
 }
-
- /// Queue a write to the pty.
- fn queueWrite(self: *EventData, data: []const u8) !void {
- // We go through and chunk the data if necessary to fit into
- // our cached buffers that we can queue to the stream.
- var i: usize = 0;
- while (i < data.len) {
- const req = try self.write_req_pool.get();
- const buf = try self.write_buf_pool.get();
- const end = @min(data.len, i + buf.len);
- fastmem.copy(u8, buf, data[i..end]);
- self.data_stream.queueWrite(
- self.loop,
- &self.write_queue,
- req,
- .{ .slice = buf[0..(end - i)] },
- EventData,
- self,
- ttyWrite,
- );
-
- i = end;
- }
- }
 };

 fn ttyWrite(
@@ -442,92 +422,112 @@
 return .disarm;
 }

-fn ttyRead(
- ev_: ?*EventData,
- _: *xev.Loop,
- _: *xev.Completion,
- _: xev.Stream,
- read_buf: xev.ReadBuffer,
- r: xev.Stream.ReadError!usize,
-) xev.CallbackAction {
- const zone = trace(@src());
- defer zone.end();
+/// The read thread sits in a loop doing the following pseudocode:
+///
+/// while (true) { blocking_read(); exit_if_eof(); process(); }
+///
+/// Almost all terminal-modifying activity is from the pty read, so
+/// putting this on a dedicated thread keeps performance very predictable
+/// while also nearly optimal. "Locking is fast, lock contention is slow,"
+/// and since we rarely have contention, this is fast.
+///
+/// This is also empirically fast compared to putting the read into
+/// an async mechanism like io_uring/epoll because the reads are generally
+/// small.
+const ReadThread = struct {
+ /// The main entrypoint for the thread.
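+ /// Reads from the pty master fd in a blocking loop and hands each
+ /// chunk to process(); any read error is logged and ends the thread.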
+ fn threadMain(fd: std.os.fd_t, ev: *EventData) void {
+ var buf: [1024]u8 = undefined;
+ while (true) {
+ const n = std.os.read(fd, &buf) catch |err| {
+ log.err("read error err={}", .{err});
+ return;
+ };
+ // log.info("DATA: {d}", .{n});

- const ev = ev_.?;
- const n = r catch |err| {
- switch (err) {
- error.EOF => return .disarm,
- else => log.err("read error err={}", .{err}),
+ @call(.always_inline, process, .{ ev, buf[0..n] });
 }
+ }

- return .rearm;
- };
- const buf = read_buf.slice[0..n];
+ fn process(
+ ev: *EventData,
+ buf: []const u8,
+ ) void {
+ const zone = trace(@src());
+ defer zone.end();

- // log.info("DATA: {d}", .{n});
- // log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});
+ // log.info("DATA: {d}", .{n});
+ // log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});

- // Whenever a character is typed, we ensure the cursor is in the
- // non-blink state so it is rendered if visible. If we're under
- // HEAVY read load, we don't want to send a ton of these so we
- // use a timer under the covers
- // TODO
- // const now = t.loop().now();
- // if (now - ev.last_cursor_reset > 500) {
- // ev.last_cursor_reset = now;
- // _ = ev.renderer_mailbox.push(.{
- // .reset_cursor_blink = {},
- // }, .{ .forever = {} });
- // }
+ // Whenever a character is typed, we ensure the cursor is in the
+ // non-blink state so it is rendered if visible. If we're under
+ // HEAVY read load, we don't want to send a ton of these so we
+ // use a timer under the covers
+ // TODO
+ // const now = t.loop().now();
+ // if (now - ev.last_cursor_reset > 500) {
+ // ev.last_cursor_reset = now;
+ // _ = ev.renderer_mailbox.push(.{
+ // .reset_cursor_blink = {},
+ // }, .{ .forever = {} });
+ // }

- // We are modifying terminal state from here on out
- ev.renderer_state.mutex.lock();
- defer ev.renderer_state.mutex.unlock();
+ // We are modifying terminal state from here on out
+ ev.renderer_state.mutex.lock();
+ defer ev.renderer_state.mutex.unlock();

- // Schedule a render
- ev.queueRender() catch unreachable;
+ // Schedule a render
+ ev.queueRender() catch unreachable;

- // Process the terminal data. This is an extremely hot part of the
- // terminal emulator, so we do some abstraction leakage to avoid
- // function calls and unnecessary logic.
- //
- // The ground state is the only state that we can see and print/execute
- // ASCII, so we only execute this hot path if we're already in the ground
- // state.
- //
- // Empirically, this alone improved throughput of large text output by ~20%.
- var i: usize = 0;
- const end = @intCast(usize, n);
- if (ev.terminal_stream.parser.state == .ground) {
- for (buf[i..end]) |ch| {
- switch (terminal.parse_table.table[ch][@enumToInt(terminal.Parser.State.ground)].action) {
- // Print, call directly.
- .print => ev.terminal_stream.handler.print(@intCast(u21, ch)) catch |err|
- log.err("error processing terminal data: {}", .{err}),
+ // Process the terminal data. This is an extremely hot part of the
+ // terminal emulator, so we do some abstraction leakage to avoid
+ // function calls and unnecessary logic.
+ //
+ // The ground state is the only state that we can see and print/execute
+ // ASCII, so we only execute this hot path if we're already in the ground
+ // state.
+ //
+ // Empirically, this alone improved throughput of large text output by ~20%.
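+ //
+ // `i` marks how far the fast path scanned; whatever remains after a
+ // break below falls through to the full parser via nextSlice().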
+ var i: usize = 0;
+ const end = buf.len;
+ if (ev.terminal_stream.parser.state == .ground) {
+ for (buf[i..end]) |ch| {
+ switch (terminal.parse_table.table[ch][@enumToInt(terminal.Parser.State.ground)].action) {
+ // Print, call directly.
+ .print => ev.terminal_stream.handler.print(@intCast(u21, ch)) catch |err|
+ log.err("error processing terminal data: {}", .{err}),

- // C0 execute, let our stream handle this one but otherwise
- // continue since we're guaranteed to be back in ground.
- .execute => ev.terminal_stream.execute(ch) catch |err|
- log.err("error processing terminal data: {}", .{err}),
+ // C0 execute, let our stream handle this one but otherwise
+ // continue since we're guaranteed to be back in ground.
+ .execute => ev.terminal_stream.execute(ch) catch |err|
+ log.err("error processing terminal data: {}", .{err}),

- // Otherwise, break out and go the slow path until we're
- // back in ground. There is a slight optimization here where
- // could try to find the next transition to ground but when
- // I implemented that it didn't materially change performance.
- else => break,
+ // Otherwise, break out and go the slow path until we're
+ // back in ground. There is a slight optimization here where
+ // we could try to find the next transition to ground but when
+ // I implemented that it didn't materially change performance.
+ else => break,
+ }
+
+ i += 1;
+ }
+ }

- i += 1;
+ if (i < end) {
+ ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
+ log.err("error processing terminal data: {}", .{err});
+ }
+
+ // If our stream handling caused messages to be sent to the writer
+ // thread, then we need to wake it up so that it processes them.
+ if (ev.terminal_stream.handler.writer_messaged) {
+ ev.terminal_stream.handler.writer_messaged = false;
+ ev.writer_wakeup.notify() catch |err| {
+ log.warn("failed to wake up writer thread err={}", .{err});
+ };
 }
 }
-
- if (i < end) {
- ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
- log.err("error processing terminal data: {}", .{err});
- }
-
- return .rearm;
-}
+};

 /// This is used as the handler for the terminal.Stream type. This is
 /// stateful and is expected to live for the entire lifetime of the terminal.
@@ -540,12 +540,18 @@ const StreamHandler = struct {
 terminal: *terminal.Terminal,
 window_mailbox: Window.Mailbox,

+ /// This is set to true when a message was written to the writer
+ /// mailbox. This can be used by callers to determine if they need
+ /// to wake up the writer.
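+ /// The reader resets this to false right before notifying the writer
+ /// wakeup (see ReadThread.process above), so each processed chunk
+ /// produces at most one notify.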
+ writer_messaged: bool = false, + inline fn queueRender(self: *StreamHandler) !void { try self.ev.queueRender(); } - inline fn queueWrite(self: *StreamHandler, data: []const u8) !void { - try self.ev.queueWrite(data); + inline fn messageWriter(self: *StreamHandler, msg: termio.Message) void { + _ = self.ev.writer_mailbox.push(msg, .{ .forever = {} }); + self.writer_messaged = true; } pub fn print(self: *StreamHandler, ch: u21) !void { @@ -753,8 +759,7 @@ const StreamHandler = struct { switch (req) { // VT220 - .primary => self.queueWrite("\x1B[?62;c") catch |err| - log.warn("error queueing device attr response: {}", .{err}), + .primary => self.messageWriter(.{ .write_stable = "\x1B[?62;c" }), else => log.warn("unimplemented device attributes req: {}", .{req}), } } @@ -764,8 +769,7 @@ const StreamHandler = struct { req: terminal.DeviceStatusReq, ) !void { switch (req) { - .operating_status => self.queueWrite("\x1B[0n") catch |err| - log.warn("error queueing device attr response: {}", .{err}), + .operating_status => self.messageWriter(.{ .write_stable = "\x1B[0n" }), .cursor_position => { const pos: struct { @@ -783,13 +787,14 @@ const StreamHandler = struct { // Response always is at least 4 chars, so this leaves the // remainder for the row/column as base-10 numbers. This // will support a very large terminal. - var buf: [32]u8 = undefined; - const resp = try std.fmt.bufPrint(&buf, "\x1B[{};{}R", .{ + var msg: termio.Message = .{ .write_small = .{} }; + const resp = try std.fmt.bufPrint(&msg.write_small.data, "\x1B[{};{}R", .{ pos.y + 1, pos.x + 1, }); + msg.write_small.len = @intCast(u8, resp.len); - try self.queueWrite(resp); + self.messageWriter(msg); }, else => log.warn("unimplemented device status req: {}", .{req}), @@ -824,7 +829,7 @@ const StreamHandler = struct { } pub fn enquiry(self: *StreamHandler) !void { - try self.queueWrite(""); + self.messageWriter(.{ .write_stable = "" }); } pub fn scrollDown(self: *StreamHandler, count: usize) !void { diff --git a/src/termio/Thread.zig b/src/termio/Thread.zig index d4140bb82..48dd4a35e 100644 --- a/src/termio/Thread.zig +++ b/src/termio/Thread.zig @@ -16,7 +16,7 @@ const log = std.log.scoped(.io_thread); /// The type used for sending messages to the IO thread. For now this is /// hardcoded with a capacity. We can make this a comptime parameter in /// the future if we want it configurable. -const Mailbox = BlockingQueue(termio.Message, 64); +pub const Mailbox = BlockingQueue(termio.Message, 64); /// Allocator used for some state alloc: std.mem.Allocator, @@ -99,7 +99,7 @@ fn threadMain_(self: *Thread) !void { // Run our thread start/end callbacks. This allows the implementation // to hook into the event loop as needed. - var data = try self.impl.threadEnter(&self.loop); + var data = try self.impl.threadEnter(self); defer data.deinit(); defer self.impl.threadExit(data); diff --git a/src/termio/message.zig b/src/termio/message.zig index 2b1acb4a6..008093da0 100644 --- a/src/termio/message.zig +++ b/src/termio/message.zig @@ -60,8 +60,8 @@ pub fn MessageData(comptime Elem: type, comptime small_size: comptime_int) type pub const Small = struct { pub const Max = small_size; pub const Array = [Max]Elem; - data: Array, - len: u8, + data: Array = undefined, + len: u8 = 0, }; pub const Alloc = struct {
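Taken together, the pieces meet on the IO thread: stream handlers push termio.Message values into the writer mailbox, the reader notifies the thread's wakeup handle, and the thread drains the mailbox into Exec.queueWrite. The drain itself is outside this diff; a minimal sketch of what it presumably looks like, using only names the patch introduces (the write_alloc arm and the exact set of message tags are assumptions):

    fn drainMailbox(self: *Thread) !void {
        while (self.mailbox.pop()) |message| {
            switch (message) {
                .write_small => |v| try self.impl.queueWrite(v.data[0..v.len]),
                .write_stable => |v| try self.impl.queueWrite(v),
                .write_alloc => |v| {
                    // Heap-backed payloads are freed once queued.
                    defer v.alloc.free(v.data);
                    try self.impl.queueWrite(v.data);
                },
                else => {}, // resize and other messages elided
            }
        }
    }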