termio: writer => mailbox

This commit is contained in:
Mitchell Hashimoto
2024-07-15 10:23:09 -07:00
parent 001a6d2624
commit 835d622baa
8 changed files with 65 additions and 66 deletions

View File

@ -450,9 +450,9 @@ pub fn init(
});
errdefer io_exec.deinit();
// Initialize our IO writer
var io_writer = try termio.Writer.initMailbox(alloc);
errdefer io_writer.deinit(alloc);
// Initialize our IO mailbox
var io_mailbox = try termio.Mailbox.initSPSC(alloc);
errdefer io_mailbox.deinit(alloc);
try termio.Termio.init(&self.io, alloc, .{
.grid_size = grid_size,
@ -461,7 +461,7 @@ pub fn init(
.full_config = config,
.config = try termio.Termio.DerivedConfig.init(alloc, config),
.backend = .{ .exec = io_exec },
.writer = io_writer,
.mailbox = io_mailbox,
.renderer_state = &self.renderer_state,
.renderer_wakeup = render_thread.wakeup,
.renderer_mailbox = render_thread.mailbox,

View File

@ -6,16 +6,15 @@ const stream_handler = @import("termio/stream_handler.zig");
pub usingnamespace @import("termio/message.zig");
pub const backend = @import("termio/backend.zig");
pub const writer = @import("termio/writer.zig");
pub const mailbox = @import("termio/mailbox.zig");
pub const Exec = @import("termio/Exec.zig");
pub const Options = @import("termio/Options.zig");
pub const Termio = @import("termio/Termio.zig");
pub const Thread = @import("termio/Thread.zig");
pub const Backend = backend.Backend;
pub const DerivedConfig = Termio.DerivedConfig;
pub const Mailbox = writer.Mailbox;
pub const Mailbox = mailbox.Mailbox;
pub const StreamHandler = stream_handler.StreamHandler;
pub const Writer = writer.Writer;
test {
@import("std").testing.refAllDecls(@This());

View File

@ -319,13 +319,13 @@ fn processExit(
// Notify our main writer thread which has access to more
// information so it can show a better error message.
td.writer.send(.{
td.mailbox.send(.{
.child_exited_abnormally = .{
.exit_code = exit_code,
.runtime_ms = runtime,
},
}, null);
td.writer.notify();
td.mailbox.notify();
return .disarm;
}

View File

@ -28,9 +28,9 @@ config: termio.Termio.DerivedConfig,
/// The backend for termio that implements where reads/writes are sourced.
backend: termio.Backend,
/// The writer for the terminal. This is how messages are delivered.
/// The mailbox for the terminal. This is how messages are delivered.
/// If you're using termio.Thread this MUST be "mailbox".
writer: termio.Writer,
mailbox: termio.Mailbox,
/// The render state. The IO implementation can modify anything here. The
/// surface thread will setup the initial "terminal" pointer but the IO impl

View File

@ -60,8 +60,8 @@ surface_mailbox: apprt.surface.Mailbox,
/// The cached grid size whenever a resize is called.
grid_size: renderer.GridSize,
/// The writer implementation to use.
writer: termio.Writer,
/// The mailbox implementation to use.
mailbox: termio.Mailbox,
/// The stream parser. This parses the stream of escape codes and so on
/// from the child process and calls callbacks in the stream handler.
@ -187,7 +187,7 @@ pub fn init(self: *Termio, alloc: Allocator, opts: termio.Options) !void {
break :handler .{
.alloc = alloc,
.writer = &self.writer,
.termio_mailbox = &self.mailbox,
.surface_mailbox = opts.surface_mailbox,
.renderer_state = opts.renderer_state,
.renderer_wakeup = opts.renderer_wakeup,
@ -217,7 +217,7 @@ pub fn init(self: *Termio, alloc: Allocator, opts: termio.Options) !void {
.surface_mailbox = opts.surface_mailbox,
.grid_size = opts.grid_size,
.backend = opts.backend,
.writer = opts.writer,
.mailbox = opts.mailbox,
.terminal_stream = .{
.handler = handler,
.parser = .{
@ -235,7 +235,7 @@ pub fn deinit(self: *Termio) void {
self.backend.deinit();
self.terminal.deinit(self.alloc);
self.config.deinit();
self.writer.deinit(self.alloc);
self.mailbox.deinit(self.alloc);
// Clear any StreamHandler state
self.terminal_stream.handler.deinit();
@ -255,7 +255,7 @@ pub fn threadEnter(self: *Termio, thread: *termio.Thread, data: *ThreadData) !vo
.loop = &thread.loop,
.renderer_state = self.renderer_state,
.surface_mailbox = self.surface_mailbox,
.writer = &self.writer,
.mailbox = &self.mailbox,
// Placeholder until setup below
.backend = .{ .manual = {} },
@ -269,29 +269,29 @@ pub fn threadExit(self: *Termio, data: *ThreadData) void {
self.backend.threadExit(data);
}
/// Send a message using the writer. Depending on the writer type in
/// Send a message to the the mailbox. Depending on the mailbox type in
/// use this may process now or it may just enqueue and process later.
///
/// This will also notify the writer thread to process the message. If
/// This will also notify the mailbox thread to process the message. If
/// you're sending a lot of messages, it may be more efficient to use
/// the writer directly and then call notify separately.
/// the mailbox directly and then call notify separately.
pub fn queueMessage(
self: *Termio,
msg: termio.Message,
mutex: enum { locked, unlocked },
) void {
self.writer.send(msg, switch (mutex) {
self.mailbox.send(msg, switch (mutex) {
.locked => self.renderer_state.mutex,
.unlocked => null,
});
self.writer.notify();
self.mailbox.notify();
}
/// Queue a write directly to the pty.
///
/// If you're using termio.Thread, this must ONLY be called from the
/// writer thread. If you're not on the thread, use queueMessage with
/// writer messages instead.
/// mailbox thread. If you're not on the thread, use queueMessage with
/// mailbox messages instead.
///
/// If you're not using termio.Thread, this is not threadsafe.
pub inline fn queueWrite(
@ -522,11 +522,11 @@ fn processOutputLocked(self: *Termio, buf: []const u8) void {
log.err("error processing terminal data: {}", .{err});
}
// If our stream handling caused messages to be sent to the writer
// If our stream handling caused messages to be sent to the mailbox
// thread, then we need to wake it up so that it processes them.
if (self.terminal_stream.handler.writer_messaged) {
self.terminal_stream.handler.writer_messaged = false;
self.writer.notify();
if (self.terminal_stream.handler.termio_messaged) {
self.terminal_stream.handler.termio_messaged = false;
self.mailbox.notify();
}
}
@ -552,7 +552,7 @@ pub const ThreadData = struct {
/// Data associated with the backend implementation (i.e. pty/exec state)
backend: termio.backend.ThreadData,
writer: *termio.Writer,
mailbox: *termio.Mailbox,
pub fn deinit(self: *ThreadData) void {
self.backend.deinit(self.alloc);

View File

@ -200,10 +200,10 @@ pub fn threadMain(self: *Thread, io: *termio.Termio) void {
fn threadMain_(self: *Thread, io: *termio.Termio) !void {
defer log.debug("IO thread exited", .{});
// Get the writer. This must be a mailbox writer for threading.
const writer = switch (io.writer) {
.mailbox => |*v| v,
// else => return error.TermioUnsupportedWriter,
// Get the mailbox. This must be an SPSC mailbox for threading.
const mailbox = switch (io.mailbox) {
.spsc => |*v| v,
// else => return error.TermioUnsupportedMailbox,
};
// This is the data sent to xev callbacks. We want a pointer to both
@ -219,7 +219,7 @@ fn threadMain_(self: *Thread, io: *termio.Termio) !void {
defer io.threadExit(&cb.data);
// Start the async handlers.
writer.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback);
mailbox.wakeup.wait(&self.loop, &self.wakeup_c, CallbackData, &cb, wakeupCallback);
self.stop.wait(&self.loop, &self.stop_c, CallbackData, &cb, stopCallback);
// Run
@ -241,7 +241,7 @@ fn drainMailbox(
cb: *CallbackData,
) !void {
// We assert when starting the thread that this is the state
const mailbox = cb.io.writer.mailbox.mailbox;
const mailbox = cb.io.mailbox.spsc.queue;
const io = cb.io;
const data = &cb.data;

View File

@ -9,14 +9,14 @@ const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue;
const log = std.log.scoped(.io_writer);
/// A mailbox used for storing messages that is periodically drained.
/// A queue used for storing messages that is periodically drained.
/// Typically used by a multi-threaded application. The capacity is
/// hardcoded to a value that empirically has made sense for Ghostty usage
/// but I'm open to changing it with good arguments.
pub const Mailbox = BlockingQueue(termio.Message, 64);
const Queue = BlockingQueue(termio.Message, 64);
/// The location to where write-related messages are sent.
pub const Writer = union(enum) {
pub const Mailbox = union(enum) {
// /// Write messages to an unbounded list backed by an allocator.
// /// This is useful for single-threaded applications where you're not
// /// afraid of running out of memory. You should be careful that you're
@ -28,27 +28,27 @@ pub const Writer = union(enum) {
// /// in libghostty eventually.
// unbounded: std.ArrayList(termio.Message),
/// Write messages to a SPSC mailbox for multi-threaded applications.
mailbox: struct {
mailbox: *Mailbox,
/// Write messages to a SPSC queue for multi-threaded applications.
spsc: struct {
queue: *Queue,
wakeup: xev.Async,
},
/// Init the mailbox writer.
pub fn initMailbox(alloc: Allocator) !Writer {
var mailbox = try Mailbox.create(alloc);
errdefer mailbox.destroy(alloc);
/// Init the SPSC writer.
pub fn initSPSC(alloc: Allocator) !Mailbox {
var queue = try Queue.create(alloc);
errdefer queue.destroy(alloc);
var wakeup = try xev.Async.init();
errdefer wakeup.deinit();
return .{ .mailbox = .{ .mailbox = mailbox, .wakeup = wakeup } };
return .{ .spsc = .{ .queue = queue, .wakeup = wakeup } };
}
pub fn deinit(self: *Writer, alloc: Allocator) void {
pub fn deinit(self: *Mailbox, alloc: Allocator) void {
switch (self.*) {
.mailbox => |*v| {
v.mailbox.destroy(alloc);
.spsc => |*v| {
v.queue.destroy(alloc);
v.wakeup.deinit();
},
}
@ -58,20 +58,20 @@ pub const Writer = union(enum) {
///
/// If the optional mutex is given, it must already be LOCKED. If the
/// send would block, we'll unlock this mutex, resend the message, and
/// lock it again. This handles an edge case where mailboxes are full.
/// lock it again. This handles an edge case where queues are full.
/// This may not apply to all writer types.
pub fn send(
self: *Writer,
self: *Mailbox,
msg: termio.Message,
mutex: ?*std.Thread.Mutex,
) void {
switch (self.*) {
.mailbox => |*mb| send: {
// Try to write to the mailbox with an instant timeout. This is the
.spsc => |*mb| send: {
// Try to write to the queue with an instant timeout. This is the
// fast path because we can queue without a lock.
if (mb.mailbox.push(msg, .{ .instant = {} }) > 0) break :send;
if (mb.queue.push(msg, .{ .instant = {} }) > 0) break :send;
// If we enter this conditional, the mailbox is full. We wake up
// If we enter this conditional, the queue is full. We wake up
// the writer thread so that it can process messages to clear up
// space. However, the writer thread may require the renderer
// lock so we need to unlock.
@ -86,21 +86,21 @@ pub const Writer = union(enum) {
// But this only gets triggered in certain pathological cases.
//
// Note that writes themselves don't require a lock, but there
// are other messages in the writer mailbox (resize, focus) that
// are other messages in the writer queue (resize, focus) that
// could acquire the lock. This is why we have to release our lock
// here.
if (mutex) |m| m.unlock();
defer if (mutex) |m| m.lock();
_ = mb.mailbox.push(msg, .{ .forever = {} });
_ = mb.queue.push(msg, .{ .forever = {} });
},
}
}
/// Notify that there are new messages. This may be a noop depending
/// on the writer type.
pub fn notify(self: *Writer) void {
pub fn notify(self: *Mailbox) void {
switch (self.*) {
.mailbox => |*v| v.wakeup.notify() catch |err| {
.spsc => |*v| v.wakeup.notify() catch |err| {
log.warn("failed to notify writer, data will be dropped err={}", .{err});
},
}

View File

@ -27,8 +27,8 @@ pub const StreamHandler = struct {
grid_size: *renderer.GridSize,
terminal: *terminal.Terminal,
/// Mailbox for data to the writer thread.
writer: *termio.Writer,
/// Mailbox for data to the termio thread.
termio_mailbox: *termio.Mailbox,
/// Mailbox for the surface.
surface_mailbox: apprt.surface.Mailbox,
@ -86,10 +86,10 @@ pub const StreamHandler = struct {
/// such as XTGETTCAP.
dcs: terminal.dcs.Handler = .{},
/// This is set to true when a message was written to the writer
/// This is set to true when a message was written to the termio
/// mailbox. This can be used by callers to determine if they need
/// to wake up the writer.
writer_messaged: bool = false,
/// to wake up the termio thread.
termio_messaged: bool = false,
/// This is set to true when we've seen a title escape sequence. We use
/// this to determine if we need to default the window title.
@ -140,8 +140,8 @@ pub const StreamHandler = struct {
}
inline fn messageWriter(self: *StreamHandler, msg: termio.Message) void {
self.writer.send(msg, self.renderer_state.mutex);
self.writer_messaged = true;
self.termio_mailbox.send(msg, self.renderer_state.mutex);
self.termio_messaged = true;
}
pub fn dcsHook(self: *StreamHandler, dcs: terminal.DCS) !void {