diff --git a/src/Surface.zig b/src/Surface.zig index 68ab9214a..d0d320511 100644 --- a/src/Surface.zig +++ b/src/Surface.zig @@ -397,12 +397,15 @@ pub fn init( errdefer render_thread.deinit(); // Start our IO implementation + var io_writer = try termio.Writer.initMailbox(alloc); + errdefer io_writer.deinit(alloc); var io = try termio.Termio.init(alloc, .{ .grid_size = grid_size, .screen_size = screen_size, .padding = padding, .full_config = config, .config = try termio.Termio.DerivedConfig.init(alloc, config), + .writer = io_writer, .resources_dir = main.state.resources_dir, .renderer_state = &self.renderer_state, .renderer_wakeup = render_thread.wakeup, @@ -616,7 +619,7 @@ pub fn activateInspector(self: *Surface) !void { // Notify our components we have an inspector active _ = self.renderer_thread.mailbox.push(.{ .inspector = true }, .{ .forever = {} }); - _ = self.io_thread.mailbox.push(.{ .inspector = true }, .{ .forever = {} }); + self.io.queueMessage(.{ .inspector = true }, .unlocked); } /// Deactivate the inspector and stop collecting any information. @@ -633,7 +636,7 @@ pub fn deactivateInspector(self: *Surface) void { // Notify our components we have deactivated inspector _ = self.renderer_thread.mailbox.push(.{ .inspector = false }, .{ .forever = {} }); - _ = self.io_thread.mailbox.push(.{ .inspector = false }, .{ .forever = {} }); + self.io.queueMessage(.{ .inspector = false }, .unlocked); // Deinit the inspector insp.deinit(); @@ -733,8 +736,7 @@ fn reportColorScheme(self: *Surface) !void { .dark => "\x1B[?997;1n", }; - _ = self.io_thread.mailbox.push(.{ .write_stable = output }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + self.io.queueMessage(.{ .write_stable = output }, .unlocked); } /// Call this when modifiers change. This is safe to call even if modifiers @@ -815,20 +817,17 @@ fn changeConfig(self: *Surface, config: *const configpkg.Config) !void { errdefer termio_config_ptr.deinit(); _ = self.renderer_thread.mailbox.push(renderer_message, .{ .forever = {} }); - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .change_config = .{ .alloc = self.alloc, .ptr = termio_config_ptr, }, - }, .{ .forever = {} }); + }, .unlocked); // With mailbox messages sent, we have to wake them up so they process it. self.queueRender() catch |err| { log.warn("failed to notify renderer of config change err={}", .{err}); }; - self.io_thread.wakeup.notify() catch |err| { - log.warn("failed to notify io thread of config change err={}", .{err}); - }; } /// Returns true if the terminal has a selection. @@ -1066,14 +1065,13 @@ fn setCellSize(self: *Surface, size: renderer.CellSize) !void { ); // Notify the terminal - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .resize = .{ .grid_size = self.grid_size, .screen_size = self.screen_size, .padding = self.padding, }, - }, .{ .forever = {} }); - self.io_thread.wakeup.notify() catch {}; + }, .unlocked); // Notify the window try self.rt_surface.setCellSize(size.width, size.height); @@ -1169,14 +1167,13 @@ fn resize(self: *Surface, size: renderer.ScreenSize) !void { } // Mail the IO thread - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .resize = .{ .grid_size = self.grid_size, .screen_size = self.screen_size, .padding = self.padding, }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); } /// Called to set the preedit state for character input. Preedit is used @@ -1542,12 +1539,11 @@ pub fn keyCallback( ev.pty = copy; } - _ = self.io_thread.mailbox.push(switch (write_req) { + self.io.queueMessage(switch (write_req) { .small => |v| .{ .write_small = v }, .stable => |v| .{ .write_stable = v }, .alloc => |v| .{ .write_alloc = v }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); // If our event is any keypress that isn't a modifier and we generated // some data to send to the pty, then we move the viewport down to the @@ -1647,11 +1643,7 @@ pub fn focusCallback(self: *Surface, focused: bool) !void { if (focus_event) { const seq = if (focused) "\x1b[I" else "\x1b[O"; - _ = self.io_thread.mailbox.push(.{ - .write_stable = seq, - }, .{ .forever = {} }); - - try self.io_thread.wakeup.notify(); + self.io.queueMessage(.{ .write_stable = seq }, .unlocked); } } } @@ -1786,14 +1778,10 @@ pub fn scrollCallback( break :seq if (y.delta < 0) "\x1b[A" else "\x1b[B"; }; for (0..y.delta_unsigned) |_| { - _ = self.io_thread.mailbox.push(.{ - .write_stable = seq, - }, .{ .instant = {} }); + self.io.queueMessage(.{ .write_stable = seq }, .locked); } } - // After sending all our messages we have to notify our IO thread - try self.io_thread.wakeup.notify(); return; } @@ -1995,12 +1983,10 @@ fn mouseReport( data[5] = 32 + @as(u8, @intCast(viewport_point.y)) + 1; // Ask our IO thread to write the data - _ = self.io_thread.mailbox.push(.{ - .write_small = .{ - .data = data, - .len = 6, - }, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_small = .{ + .data = data, + .len = 6, + } }, .locked); }, .utf8 => { @@ -2020,12 +2006,10 @@ fn mouseReport( i += try std.unicode.utf8Encode(@intCast(32 + viewport_point.y + 1), data[i..]); // Ask our IO thread to write the data - _ = self.io_thread.mailbox.push(.{ - .write_small = .{ - .data = data, - .len = @intCast(i), - }, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_small = .{ + .data = data, + .len = @intCast(i), + } }, .locked); }, .sgr => { @@ -2043,12 +2027,10 @@ fn mouseReport( }); // Ask our IO thread to write the data - _ = self.io_thread.mailbox.push(.{ - .write_small = .{ - .data = data, - .len = @intCast(resp.len), - }, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_small = .{ + .data = data, + .len = @intCast(resp.len), + } }, .locked); }, .urxvt => { @@ -2062,12 +2044,10 @@ fn mouseReport( }); // Ask our IO thread to write the data - _ = self.io_thread.mailbox.push(.{ - .write_small = .{ - .data = data, - .len = @intCast(resp.len), - }, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_small = .{ + .data = data, + .len = @intCast(resp.len), + } }, .locked); }, .sgr_pixels => { @@ -2085,17 +2065,12 @@ fn mouseReport( }); // Ask our IO thread to write the data - _ = self.io_thread.mailbox.push(.{ - .write_small = .{ - .data = data, - .len = @intCast(resp.len), - }, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_small = .{ + .data = data, + .len = @intCast(resp.len), + } }, .locked); }, } - - // After sending all our messages we have to notify our IO thread - try self.io_thread.wakeup.notify(); } /// Returns true if the shift modifier is allowed to be captured by modifier @@ -2496,9 +2471,7 @@ fn clickMoveCursor(self: *Surface, to: terminal.Pin) !void { break :arrow if (t.modes.get(.cursor_keys)) "\x1bOB" else "\x1b[B"; }; for (0..@abs(path.y)) |_| { - _ = self.io_thread.mailbox.push(.{ - .write_stable = arrow, - }, .{ .instant = {} }); + self.io.queueMessage(.{ .write_stable = arrow }, .locked); } } if (path.x != 0) { @@ -2508,13 +2481,9 @@ fn clickMoveCursor(self: *Surface, to: terminal.Pin) !void { break :arrow if (t.modes.get(.cursor_keys)) "\x1bOC" else "\x1b[C"; }; for (0..@abs(path.x)) |_| { - _ = self.io_thread.mailbox.push(.{ - .write_stable = arrow, - }, .{ .instant = {} }); + self.io.queueMessage(.{ .write_stable = arrow }, .locked); } } - - try self.io_thread.wakeup.notify(); } /// Returns the link at the given cursor position, if any. @@ -3188,11 +3157,10 @@ pub fn performBindingAction(self: *Surface, action: input.Binding.Action) !bool .esc => try std.fmt.bufPrint(&buf, "\x1b{s}", .{data}), else => unreachable, }; - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + self.io.queueMessage(try termio.Message.writeReq( self.alloc, full_data, - ), .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + ), .unlocked); // CSI/ESC triggers a scroll. { @@ -3216,11 +3184,10 @@ pub fn performBindingAction(self: *Surface, action: input.Binding.Action) !bool ); return true; }; - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + self.io.queueMessage(try termio.Message.writeReq( self.alloc, text, - ), .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + ), .unlocked); // Text triggers a scroll. { @@ -3250,16 +3217,10 @@ pub fn performBindingAction(self: *Surface, action: input.Binding.Action) !bool }; if (normal) { - _ = self.io_thread.mailbox.push(.{ - .write_stable = ck.normal, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_stable = ck.normal }, .unlocked); } else { - _ = self.io_thread.mailbox.push(.{ - .write_stable = ck.application, - }, .{ .forever = {} }); + self.io.queueMessage(.{ .write_stable = ck.application }, .unlocked); } - - try self.io_thread.wakeup.notify(); }, .reset => { @@ -3341,63 +3302,55 @@ pub fn performBindingAction(self: *Surface, action: input.Binding.Action) !bool if (self.io.terminal.active_screen == .alternate) return false; } - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .clear_screen = .{ .history = true }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_to_top => { - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .top = {} }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_to_bottom => { - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .bottom = {} }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_page_up => { const rows: isize = @intCast(self.grid_size.rows); - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .delta = -1 * rows }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_page_down => { const rows: isize = @intCast(self.grid_size.rows); - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .delta = rows }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_page_fractional => |fraction| { const rows: f32 = @floatFromInt(self.grid_size.rows); const delta: isize = @intFromFloat(@floor(fraction * rows)); - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .delta = delta }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .scroll_page_lines => |lines| { - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .scroll_viewport = .{ .delta = lines }, - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .jump_to_prompt => |delta| { - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .jump_to_prompt = @intCast(delta), - }, .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + }, .unlocked); }, .write_scrollback_file => write_scrollback_file: { @@ -3441,11 +3394,10 @@ pub fn performBindingAction(self: *Surface, action: input.Binding.Action) !bool var path_buf: [std.fs.MAX_PATH_BYTES]u8 = undefined; const path = try tmp_dir.dir.realpath("scrollback", &path_buf); - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + self.io.queueMessage(try termio.Message.writeReq( self.alloc, path, - ), .{ .forever = {} }); - try self.io_thread.wakeup.notify(); + ), .unlocked); }, .new_window => try self.app.newWindow(self.rt_app, .{ .parent = self }), @@ -3700,16 +3652,16 @@ fn completeClipboardPaste( if (critical.bracketed) { // If we're bracketd we write the data as-is to the terminal with // the bracketed paste escape codes around it. - _ = self.io_thread.mailbox.push(.{ + self.io.queueMessage(.{ .write_stable = "\x1B[200~", - }, .{ .forever = {} }); - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + }, .unlocked); + self.io.queueMessage(try termio.Message.writeReq( self.alloc, data, - ), .{ .forever = {} }); - _ = self.io_thread.mailbox.push(.{ + ), .unlocked); + self.io.queueMessage(.{ .write_stable = "\x1B[201~", - }, .{ .forever = {} }); + }, .unlocked); } else { // If its not bracketed the input bytes are indistinguishable from // keystrokes, so we must be careful. For example, we must replace @@ -3736,13 +3688,11 @@ fn completeClipboardPaste( len += 1; } - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + self.io.queueMessage(try termio.Message.writeReq( self.alloc, buf[0..len], - ), .{ .forever = {} }); + ), .unlocked); } - - try self.io_thread.wakeup.notify(); } fn completeClipboardReadOSC52( @@ -3784,11 +3734,10 @@ fn completeClipboardReadOSC52( const encoded = enc.encode(buf[prefix.len..], data); assert(encoded.len == size); - _ = self.io_thread.mailbox.push(try termio.Message.writeReq( + self.io.queueMessage(try termio.Message.writeReq( self.alloc, buf, - ), .{ .forever = {} }); - self.io_thread.wakeup.notify() catch {}; + ), .unlocked); } fn showDesktopNotification(self: *Surface, title: [:0]const u8, body: [:0]const u8) !void { diff --git a/src/termio.zig b/src/termio.zig index d868dfd6d..4fce4df18 100644 --- a/src/termio.zig +++ b/src/termio.zig @@ -6,13 +6,15 @@ const stream_handler = @import("termio/stream_handler.zig"); pub usingnamespace @import("termio/message.zig"); pub const reader = @import("termio/reader.zig"); +pub const writer = @import("termio/writer.zig"); pub const Exec = @import("termio/Exec.zig"); pub const Options = @import("termio/Options.zig"); pub const Termio = @import("termio/Termio.zig"); pub const Thread = @import("termio/Thread.zig"); pub const DerivedConfig = Termio.DerivedConfig; -pub const Mailbox = Thread.Mailbox; +pub const Mailbox = writer.Mailbox; pub const StreamHandler = stream_handler.StreamHandler; +pub const Writer = writer.Writer; test { @import("std").testing.refAllDecls(@This()); diff --git a/src/termio/Exec.zig b/src/termio/Exec.zig index 199eb8a9e..91f44a69b 100644 --- a/src/termio/Exec.zig +++ b/src/termio/Exec.zig @@ -308,13 +308,13 @@ fn processExit( // Notify our main writer thread which has access to more // information so it can show a better error message. - _ = td.writer_mailbox.push(.{ + td.writer.send(.{ .child_exited_abnormally = .{ .exit_code = exit_code, .runtime_ms = runtime, }, - }, .{ .forever = {} }); - td.writer_wakeup.notify() catch break :runtime; + }, null); + td.writer.notify(); return .disarm; } diff --git a/src/termio/Options.zig b/src/termio/Options.zig index ac12c31eb..2cb636d60 100644 --- a/src/termio/Options.zig +++ b/src/termio/Options.zig @@ -25,6 +25,10 @@ full_config: *const Config, /// The derived configuration for this termio implementation. config: termio.Termio.DerivedConfig, +/// The writer for the terminal. This is how messages are delivered. +/// If you're using termio.Thread this MUST be "mailbox". +writer: termio.Writer, + /// The application resources directory. resources_dir: ?[]const u8, diff --git a/src/termio/Termio.zig b/src/termio/Termio.zig index 5eebb3c69..a329e2880 100644 --- a/src/termio/Termio.zig +++ b/src/termio/Termio.zig @@ -60,6 +60,9 @@ surface_mailbox: apprt.surface.Mailbox, /// The cached grid size whenever a resize is called. grid_size: renderer.GridSize, +/// The writer implementation to use. +writer: termio.Writer, + /// The pointer to the read data. This is only valid while the termio thread /// is alive. This is protected by the renderer state lock. read_data: ?*ReadData = null, @@ -176,6 +179,7 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Termio { .renderer_mailbox = opts.renderer_mailbox, .surface_mailbox = opts.surface_mailbox, .grid_size = opts.grid_size, + .writer = opts.writer, }; } @@ -183,6 +187,7 @@ pub fn deinit(self: *Termio) void { self.subprocess.deinit(); self.terminal.deinit(self.alloc); self.config.deinit(); + self.writer.deinit(self.alloc); } pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !void { @@ -205,8 +210,7 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo break :handler .{ .alloc = self.alloc, - .writer_mailbox = thread.mailbox, - .writer_wakeup = thread.wakeup, + .writer = &self.writer, .surface_mailbox = self.surface_mailbox, .renderer_state = self.renderer_state, .renderer_wakeup = self.renderer_wakeup, @@ -250,9 +254,8 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo .loop = &thread.loop, .renderer_state = self.renderer_state, .surface_mailbox = self.surface_mailbox, - .writer_mailbox = thread.mailbox, - .writer_wakeup = thread.wakeup, .read_data = read_data_ptr, + .writer = &self.writer, // Placeholder until setup below .reader = .{ .manual = {} }, @@ -276,6 +279,40 @@ pub fn threadExit(self: *Termio, data: *ThreadData) void { self.read_data = null; } +/// Send a message using the writer. Depending on the writer type in +/// use this may process now or it may just enqueue and process later. +/// +/// This will also notify the writer thread to process the message. If +/// you're sending a lot of messages, it may be more efficient to use +/// the writer directly and then call notify separately. +pub fn queueMessage( + self: *Termio, + msg: termio.Message, + mutex: enum { locked, unlocked }, +) void { + self.writer.send(msg, switch (mutex) { + .locked => self.renderer_state.mutex, + .unlocked => null, + }); + self.writer.notify(); +} + +/// Queue a write directly to the pty. +/// +/// If you're using termio.Thread, this must ONLY be called from the +/// writer thread. If you're not on the thread, use queueMessage with +/// writer messages instead. +/// +/// If you're not using termio.Thread, this is not threadsafe. +pub inline fn queueWrite( + self: *Termio, + td: *ThreadData, + data: []const u8, + linefeed: bool, +) !void { + try self.subprocess.queueWrite(self.alloc, td, data, linefeed); +} + /// Update the configuration. pub fn changeConfig(self: *Termio, td: *ThreadData, config: *DerivedConfig) !void { // The remainder of this function is modifying terminal state or @@ -442,15 +479,6 @@ pub fn childExitedAbnormally(self: *Termio, exit_code: u32, runtime_ms: u64) !vo try self.subprocess.childExitedAbnormally(self.alloc, t, exit_code, runtime_ms); } -pub inline fn queueWrite( - self: *Termio, - td: *ThreadData, - data: []const u8, - linefeed: bool, -) !void { - try self.subprocess.queueWrite(self.alloc, td, data, linefeed); -} - /// Process output from the pty. This is the manual API that users can /// call with pty data but it is also called by the read thread when using /// an exec subprocess. @@ -544,12 +572,11 @@ pub const ThreadData = struct { /// Mailboxes for different threads surface_mailbox: apprt.surface.Mailbox, - writer_mailbox: *termio.Mailbox, - writer_wakeup: xev.Async, /// Data associated with the reader implementation (i.e. pty/exec state) reader: termio.reader.ThreadData, read_data: *ReadData, + writer: *termio.Writer, pub fn deinit(self: *ThreadData) void { self.reader.deinit(self.alloc); diff --git a/src/termio/Thread.zig b/src/termio/Thread.zig index 44d851998..f24fcf0df 100644 --- a/src/termio/Thread.zig +++ b/src/termio/Thread.zig @@ -21,11 +21,6 @@ const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue; const Allocator = std.mem.Allocator; const log = std.log.scoped(.io_thread); -/// The type used for sending messages to the IO thread. For now this is -/// hardcoded with a capacity. We can make this a comptime parameter in -/// the future if we want it configurable. -pub const Mailbox = BlockingQueue(termio.Message, 64); - /// This stores the information that is coalesced. const Coalesce = struct { /// The number of milliseconds to coalesce certain messages like resize for. @@ -47,8 +42,8 @@ alloc: std.mem.Allocator, /// so that users of the loop always have an allocator. loop: xev.Loop, -/// This can be used to wake up the thread. -wakeup: xev.Async, +/// The completion to use for the wakeup async handle that is present +/// on the termio.Writer. wakeup_c: xev.Completion = .{}, /// This can be used to stop the thread on the next loop iteration. @@ -67,10 +62,6 @@ sync_reset: xev.Timer, sync_reset_c: xev.Completion = .{}, sync_reset_cancel_c: xev.Completion = .{}, -/// The mailbox that can be used to send this thread messages. Note -/// this is a blocking queue so if it is full you will get errors (or block). -mailbox: *Mailbox, - flags: packed struct { /// This is set to true only when an abnormal exit is detected. It /// tells our mailbox system to drain and ignore all messages. @@ -94,10 +85,6 @@ pub fn init( var loop = try xev.Loop.init(.{}); errdefer loop.deinit(); - // This async handle is used to "wake up" the renderer and force a render. - var wakeup_h = try xev.Async.init(); - errdefer wakeup_h.deinit(); - // This async handle is used to stop the loop and force the thread to end. var stop_h = try xev.Async.init(); errdefer stop_h.deinit(); @@ -110,18 +97,12 @@ pub fn init( var sync_reset_h = try xev.Timer.init(); errdefer sync_reset_h.deinit(); - // The mailbox for messaging this thread - var mailbox = try Mailbox.create(alloc); - errdefer mailbox.destroy(alloc); - return Thread{ .alloc = alloc, .loop = loop, - .wakeup = wakeup_h, .stop = stop_h, .coalesce = coalesce_h, .sync_reset = sync_reset_h, - .mailbox = mailbox, }; } @@ -131,11 +112,7 @@ pub fn deinit(self: *Thread) void { self.coalesce.deinit(); self.sync_reset.deinit(); self.stop.deinit(); - self.wakeup.deinit(); self.loop.deinit(); - - // Nothing can possibly access the mailbox anymore, destroy it. - self.mailbox.destroy(self.alloc); } /// The main entrypoint for the thread. @@ -223,6 +200,12 @@ pub fn threadMain(self: *Thread, io: *termio.Termio) void { fn threadMain_(self: *Thread, io: *termio.Termio) !void { defer log.debug("IO thread exited", .{}); + // Get the writer. This must be a mailbox writer for threading. + const writer = switch (io.writer) { + .mailbox => |v| v, + // else => return error.TermioUnsupportedWriter, + }; + // This is the data sent to xev callbacks. We want a pointer to both // ourselves and the thread data so we can thread that through (pun intended). var cb: CallbackData = .{ .self = self, .io = io }; @@ -236,7 +219,7 @@ fn threadMain_(self: *Thread, io: *termio.Termio) !void { defer io.threadExit(&cb.data); // Start the async handlers. - self.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback); + writer.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback); self.stop.wait(&self.loop, &self.stop_c, CallbackData, &cb, stopCallback); // Run @@ -257,20 +240,22 @@ fn drainMailbox( self: *Thread, cb: *CallbackData, ) !void { - // If we're draining, we just drain the mailbox and return. - if (self.flags.drain) { - while (self.mailbox.pop()) |_| {} - return; - } - + // We assert when starting the thread that this is the state + const mailbox = cb.io.writer.mailbox.mailbox; const io = cb.io; const data = &cb.data; + // If we're draining, we just drain the mailbox and return. + if (self.flags.drain) { + while (mailbox.pop()) |_| {} + return; + } + // This holds the mailbox lock for the duration of the drain. The // expectation is that all our message handlers will be non-blocking // ENOUGH to not mess up throughput on producers. var redraw: bool = false; - while (self.mailbox.pop()) |message| { + while (mailbox.pop()) |message| { // If we have a message we always redraw redraw = true; diff --git a/src/termio/stream_handler.zig b/src/termio/stream_handler.zig index 14081746d..9a047bcfc 100644 --- a/src/termio/stream_handler.zig +++ b/src/termio/stream_handler.zig @@ -28,8 +28,7 @@ pub const StreamHandler = struct { terminal: *terminal.Terminal, /// Mailbox for data to the writer thread. - writer_mailbox: *termio.Mailbox, - writer_wakeup: xev.Async, + writer: *termio.Writer, /// Mailbox for the surface. surface_mailbox: apprt.surface.Mailbox, @@ -141,34 +140,7 @@ pub const StreamHandler = struct { } inline fn messageWriter(self: *StreamHandler, msg: termio.Message) void { - // Try to write to the mailbox with an instant timeout. This is the - // fast path because we can queue without a lock. - if (self.writer_mailbox.push(msg, .{ .instant = {} }) == 0) { - // If we enter this conditional, the mailbox is full. We wake up - // the writer thread so that it can process messages to clear up - // space. However, the writer thread may require the renderer - // lock so we need to unlock. - self.writer_wakeup.notify() catch |err| { - log.warn("failed to wake up writer, data will be dropped err={}", .{err}); - return; - }; - - // Unlock the renderer state so the writer thread can acquire it. - // Then try to queue our message before continuing. This is a very - // slow path because we are having a lot of contention for data. - // But this only gets triggered in certain pathological cases. - // - // Note that writes themselves don't require a lock, but there - // are other messages in the writer mailbox (resize, focus) that - // could acquire the lock. This is why we have to release our lock - // here. - self.renderer_state.mutex.unlock(); - defer self.renderer_state.mutex.lock(); - _ = self.writer_mailbox.push(msg, .{ .forever = {} }); - } - - // Normally, we just flag this true to wake up the writer thread - // once per batch of data. + self.writer.send(msg, self.renderer_state.mutex); self.writer_messaged = true; } diff --git a/src/termio/writer.zig b/src/termio/writer.zig new file mode 100644 index 000000000..a82169230 --- /dev/null +++ b/src/termio/writer.zig @@ -0,0 +1,108 @@ +const std = @import("std"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const Allocator = std.mem.Allocator; +const xev = @import("xev"); +const renderer = @import("../renderer.zig"); +const termio = @import("../termio.zig"); +const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue; + +const log = std.log.scoped(.io_writer); + +/// A mailbox used for storing messages that is periodically drained. +/// Typically used by a multi-threaded application. The capacity is +/// hardcoded to a value that empirically has made sense for Ghostty usage +/// but I'm open to changing it with good arguments. +pub const Mailbox = BlockingQueue(termio.Message, 64); + +/// The location to where write-related messages are sent. +pub const Writer = union(enum) { + // /// Write messages to an unbounded list backed by an allocator. + // /// This is useful for single-threaded applications where you're not + // /// afraid of running out of memory. You should be careful that you're + // /// processing this in a timely manner though since some heavy workloads + // /// will produce a LOT of messages. + // /// + // /// At the time of authoring this, the primary use case for this is + // /// testing more than anything, but it probably will have a use case + // /// in libghostty eventually. + // unbounded: std.ArrayList(termio.Message), + + /// Write messages to a SPSC mailbox for multi-threaded applications. + mailbox: struct { + mailbox: *Mailbox, + wakeup: xev.Async, + }, + + /// Init the mailbox writer. + pub fn initMailbox(alloc: Allocator) !Writer { + var mailbox = try Mailbox.create(alloc); + errdefer mailbox.destroy(alloc); + + var wakeup = try xev.Async.init(); + errdefer wakeup.deinit(); + + return .{ .mailbox = .{ .mailbox = mailbox, .wakeup = wakeup } }; + } + + pub fn deinit(self: *Writer, alloc: Allocator) void { + switch (self.*) { + .mailbox => |*v| { + v.mailbox.destroy(alloc); + v.wakeup.deinit(); + }, + } + } + + /// Sends the given message without notifying there are messages. + /// + /// If the optional mutex is given, it must already be LOCKED. If the + /// send would block, we'll unlock this mutex, resend the message, and + /// lock it again. This handles an edge case where mailboxes are full. + /// This may not apply to all writer types. + pub fn send( + self: *Writer, + msg: termio.Message, + mutex: ?*std.Thread.Mutex, + ) void { + switch (self.*) { + .mailbox => |mb| send: { + // Try to write to the mailbox with an instant timeout. This is the + // fast path because we can queue without a lock. + if (mb.mailbox.push(msg, .{ .instant = {} }) > 0) break :send; + + // If we enter this conditional, the mailbox is full. We wake up + // the writer thread so that it can process messages to clear up + // space. However, the writer thread may require the renderer + // lock so we need to unlock. + mb.wakeup.notify() catch |err| { + log.warn("failed to wake up writer, data will be dropped err={}", .{err}); + return; + }; + + // Unlock the renderer state so the writer thread can acquire it. + // Then try to queue our message before continuing. This is a very + // slow path because we are having a lot of contention for data. + // But this only gets triggered in certain pathological cases. + // + // Note that writes themselves don't require a lock, but there + // are other messages in the writer mailbox (resize, focus) that + // could acquire the lock. This is why we have to release our lock + // here. + if (mutex) |m| m.unlock(); + defer if (mutex) |m| m.lock(); + _ = mb.mailbox.push(msg, .{ .forever = {} }); + }, + } + } + + /// Notify that there are new messages. This may be a noop depending + /// on the writer type. + pub fn notify(self: *Writer) void { + switch (self.*) { + .mailbox => |v| v.wakeup.notify() catch |err| { + log.warn("failed to notify writer, data will be dropped err={}", .{err}); + }, + } + } +};