termio: reader thread is thread-safe for writing to writer

Mitchell Hashimoto
2023-02-06 14:52:24 -08:00
parent 18f20add34
commit 11d6e91228
6 changed files with 168 additions and 161 deletions
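
The gist of the change, before the per-file hunks: the pty reader moves off the libxev loop onto a dedicated blocking thread, and because that thread must never touch the xev.Stream writer directly, every response it generates is pushed onto the writer thread's mailbox followed by a wakeup notification. A minimal sketch of that handoff, using only std.Thread primitives in place of the real BlockingQueue and xev.Async (all types here are illustrative stand-ins, not the actual API):

const std = @import("std");

/// Illustrative stand-in for the BlockingQueue + xev.Async pair.
const Mailbox = struct {
    mutex: std.Thread.Mutex = .{},
    cond: std.Thread.Condition = .{},
    queue: std.ArrayList([]const u8),
    done: bool = false,

    fn push(self: *Mailbox, msg: []const u8) !void {
        self.mutex.lock();
        defer self.mutex.unlock();
        try self.queue.append(msg);
        self.cond.signal(); // the "wakeup" half of the pattern
    }

    fn writerMain(self: *Mailbox) void {
        while (true) {
            self.mutex.lock();
            while (self.queue.items.len == 0 and !self.done)
                self.cond.wait(&self.mutex);
            if (self.queue.items.len == 0) {
                self.mutex.unlock();
                return; // done and fully drained
            }
            const msg = self.queue.orderedRemove(0);
            self.mutex.unlock();

            // The real writer queues this onto the pty via xev.Stream.
            std.debug.print("writer: {s}\n", .{msg});
        }
    }
};

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();

    var mb = Mailbox{ .queue = std.ArrayList([]const u8).init(gpa.allocator()) };
    defer mb.queue.deinit();

    const writer = try std.Thread.spawn(.{}, Mailbox.writerMain, .{&mb});

    // The "reader thread" side: it never writes to the pty itself; it
    // only pushes messages and notifies. That is the thread-safety fix.
    try mb.push("\x1B[?62;c"); // primary device attributes response
    try mb.push("\x1B[0n"); // operating status report

    mb.mutex.lock();
    mb.done = true;
    mb.cond.signal();
    mb.mutex.unlock();
    writer.join();
}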

src/blocking_queue.zig

@@ -55,25 +55,24 @@ pub fn BlockingQueue(
         };

         /// Our data. The values are undefined until they are written.
-        data: [bounds]T,
+        data: [bounds]T = undefined,

         /// The next location to write (next empty loc) and next location
         /// to read (next non-empty loc). The number of written elements.
-        write: Size,
-        read: Size,
-        len: Size,
+        write: Size = 0,
+        read: Size = 0,
+        len: Size = 0,

         /// The big mutex that must be held to read/write.
-        mutex: std.Thread.Mutex,
+        mutex: std.Thread.Mutex = .{},

         /// A CV for being notified when the queue is no longer full. This is
         /// used for writing. Note we DON'T have a CV for waiting on the
         /// queue not being EMPTY because we use external notifiers for that.
-        cond_not_full: std.Thread.Condition,
-        not_full_waiters: usize,
+        cond_not_full: std.Thread.Condition = .{},
+        not_full_waiters: usize = 0,

-        /// Allocate the blocking queue. Allocation must always happen on
-        /// the heap due to shared concurrency state.
+        /// Allocate the blocking queue on the heap.
        pub fn create(alloc: Allocator) !*Self {
            const ptr = try alloc.create(Self);
            errdefer alloc.destroy(ptr);

src/main.zig

@@ -7,6 +7,7 @@ const freetype = @import("freetype");
 const harfbuzz = @import("harfbuzz");
 const macos = @import("macos");
 const tracy = @import("tracy");
+const xev = @import("xev");
 const renderer = @import("renderer.zig");
 const xdg = @import("xdg.zig");
 const internal_os = @import("os/main.zig");
@@ -22,6 +23,7 @@ pub fn main() !void {
         std.log.info("dependency fontconfig={d}", .{fontconfig.version()});
     }
     std.log.info("renderer={}", .{renderer.Renderer});
+    std.log.info("libxev backend={}", .{xev.backend});

     // First things first, we fix our file descriptors
     internal_os.fixMaxFiles();

src/termio.zig

@@ -6,6 +6,7 @@ pub usingnamespace @import("termio/message.zig");
 pub const Exec = @import("termio/Exec.zig");
 pub const Options = @import("termio/Options.zig");
 pub const Thread = @import("termio/Thread.zig");
+pub const Mailbox = Thread.Mailbox;

 /// The implementation to use for the IO. This is just "exec" for now but
 /// this is somewhat pluggable so that in the future we can introduce other

src/termio/Exec.zig

@@ -62,7 +62,8 @@ grid_size: renderer.GridSize,
 /// The data associated with the currently running thread.
 data: ?*EventData,

-io_thread: ?std.Thread,
+/// The thread that is started to read the data associated with the pty.
+read_thread: ?std.Thread,

 /// Initialize the exec implementation. This will also start the child
 /// process.
@@ -153,7 +154,7 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Exec {
         .window_mailbox = opts.window_mailbox,
         .grid_size = opts.grid_size,
         .data = null,
-        .io_thread = null,
+        .read_thread = null,
    };
}
@@ -164,6 +165,9 @@ pub fn deinit(self: *Exec) void {
     _ = self.command.wait(false) catch |err|
         log.err("error waiting for command to exit: {}", .{err});

+    // Wait for our reader thread to end
+    if (self.read_thread) |thr| thr.join();
+
     // Clean up our other members
     self.terminal.deinit(self.alloc);
 }
@@ -204,7 +208,7 @@ fn killCommand(self: *Exec) !void {
     }
 }

-pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData {
+pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData {
     assert(self.data == null);
     const alloc = self.alloc;
@@ -212,17 +216,23 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData {
     var ev_data_ptr = try alloc.create(EventData);
     errdefer alloc.destroy(ev_data_ptr);

-    // Read data
+    // Setup our stream so that we can write.
     var stream = xev.Stream.initFd(self.pty.master);
     errdefer stream.deinit();

+    // Wakeup watcher for the writer thread.
+    var wakeup = try xev.Async.init();
+    errdefer wakeup.deinit();
+
     // Setup our event data before we start
     ev_data_ptr.* = .{
+        .writer_mailbox = thread.mailbox,
+        .writer_wakeup = thread.wakeup,
         .renderer_state = self.renderer_state,
         .renderer_wakeup = self.renderer_wakeup,
         .renderer_mailbox = self.renderer_mailbox,
         .data_stream = stream,
-        .loop = loop,
+        .loop = &thread.loop,
         .terminal_stream = .{
             .handler = .{
                 .alloc = self.alloc,
@@ -238,22 +248,14 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData {
     // Store our data so our callbacks can access it
     self.data = ev_data_ptr;

-    self.io_thread = try std.Thread.spawn(
+    // Start our reader thread
+    assert(self.read_thread == null);
+    self.read_thread = try std.Thread.spawn(
         .{},
-        ioMain,
+        ReadThread.threadMain,
         .{ self.pty.master, ev_data_ptr },
     );
-    self.io_thread.?.setName("io-reader") catch {};
-
-    // Start our stream read
-    // stream.read(
-    //     loop,
-    //     &ev_data_ptr.data_stream_c_read,
-    //     .{ .slice = &ev_data_ptr.data_stream_buf },
-    //     EventData,
-    //     ev_data_ptr,
-    //     ttyRead,
-    // );
+    self.read_thread.?.setName("io-reader") catch {};

     // Return our thread data
     return ThreadData{
@@ -262,28 +264,8 @@ pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData {
     };
 }

-/// The main entrypoint for the thread.
-pub fn ioMain(fd: std.os.fd_t, ev: *EventData) void {
-    while (true) {
-        const n = std.os.read(fd, &ev.data_stream_buf) catch |err| {
-            log.err("READ ERROR err={}", .{err});
-            return;
-        };
-
-        _ = @call(.always_inline, ttyRead, .{
-            ev,
-            undefined,
-            undefined,
-            undefined,
-            .{ .slice = &ev.data_stream_buf },
-            n,
-        });
-    }
-}
-
 pub fn threadExit(self: *Exec, data: ThreadData) void {
     _ = data;
     self.data = null;
 }
@@ -318,7 +300,28 @@ pub fn resize(
 }

 pub inline fn queueWrite(self: *Exec, data: []const u8) !void {
-    try self.data.?.queueWrite(data);
+    const ev = self.data.?;
+
+    // We go through and chunk the data if necessary to fit into
+    // our cached buffers that we can queue to the stream.
+    var i: usize = 0;
+    while (i < data.len) {
+        const req = try ev.write_req_pool.get();
+        const buf = try ev.write_buf_pool.get();
+        const end = @min(data.len, i + buf.len);
+        fastmem.copy(u8, buf, data[i..end]);
+
+        ev.data_stream.queueWrite(
+            ev.loop,
+            &ev.write_queue,
+            req,
+            .{ .slice = buf[0..(end - i)] },
+            EventData,
+            ev,
+            ttyWrite,
+        );
+
+        i = end;
+    }
 }

 const ThreadData = struct {
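
queueWrite's chunking loop moved here verbatim from EventData. The slice arithmetic is the subtle part: buf[0..(end - i)] must match data[i..end] in length. A self-contained check of just that arithmetic (the chunks helper and its test harness are illustrative, not part of the codebase):

const std = @import("std");

/// The chunking loop from queueWrite, extracted: split `data` into
/// pieces of at most `chunk_len` bytes. The real code copies each piece
/// into a pooled buffer and queues it on the xev.Stream; here we only
/// collect the slices to validate the bounds math.
fn chunks(data: []const u8, chunk_len: usize, out: *std.BoundedArray([]const u8, 16)) !void {
    var i: usize = 0;
    while (i < data.len) {
        const end = @min(data.len, i + chunk_len);
        try out.append(data[i..end]); // same length as buf[0..(end - i)]
        i = end;
    }
}

test "chunking covers the input exactly" {
    var out = try std.BoundedArray([]const u8, 16).init(0);
    try chunks("hello world", 4, &out);
    try std.testing.expectEqual(@as(usize, 3), out.len);
    try std.testing.expectEqualStrings("hell", out.get(0));
    try std.testing.expectEqualStrings("o wo", out.get(1));
    try std.testing.expectEqualStrings("rld", out.get(2));
}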
@@ -340,6 +343,10 @@ const EventData = struct {
     // enough to satisfy most write requests. It must be a power of 2.
     const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);

+    /// Mailbox for data to the writer thread.
+    writer_mailbox: *termio.Mailbox,
+    writer_wakeup: xev.Async,
+
     /// The stream parser. This parses the stream of escape codes and so on
     /// from the child process and calls callbacks in the stream handler.
     terminal_stream: terminal.Stream(StreamHandler),
@@ -356,8 +363,6 @@ const EventData = struct {
     /// The data stream is the main IO for the pty.
     data_stream: xev.Stream,
-    data_stream_c_read: xev.Completion = .{},
-    data_stream_buf: [1024]u8 = undefined,

     /// The event loop,
     loop: *xev.Loop,
@@ -384,7 +389,6 @@ const EventData = struct {
         self.write_buf_pool.deinit(alloc);

         // Stop our data stream
-        // TODO: close?
         self.data_stream.deinit();
     }
@@ -394,30 +398,6 @@ const EventData = struct {
     inline fn queueRender(self: *EventData) !void {
         try self.renderer_wakeup.notify();
     }
-
-    /// Queue a write to the pty.
-    fn queueWrite(self: *EventData, data: []const u8) !void {
-        // We go through and chunk the data if necessary to fit into
-        // our cached buffers that we can queue to the stream.
-        var i: usize = 0;
-        while (i < data.len) {
-            const req = try self.write_req_pool.get();
-            const buf = try self.write_buf_pool.get();
-            const end = @min(data.len, i + buf.len);
-            fastmem.copy(u8, buf, data[i..end]);
-
-            self.data_stream.queueWrite(
-                self.loop,
-                &self.write_queue,
-                req,
-                .{ .slice = buf[0..(end - i)] },
-                EventData,
-                self,
-                ttyWrite,
-            );
-
-            i = end;
-        }
-    }
 };

 fn ttyWrite(
@@ -442,92 +422,112 @@ fn ttyWrite(
     return .disarm;
 }

-fn ttyRead(
-    ev_: ?*EventData,
-    _: *xev.Loop,
-    _: *xev.Completion,
-    _: xev.Stream,
-    read_buf: xev.ReadBuffer,
-    r: xev.Stream.ReadError!usize,
-) xev.CallbackAction {
-    const zone = trace(@src());
-    defer zone.end();
-
-    const ev = ev_.?;
-    const n = r catch |err| {
-        switch (err) {
-            error.EOF => return .disarm,
-            else => log.err("read error err={}", .{err}),
-        }
-
-        return .rearm;
-    };
-
-    const buf = read_buf.slice[0..n];
+/// The read thread sits in a loop doing the following pseudo code:
+///
+///   while (true) { blocking_read(); exit_if_eof(); process(); }
+///
+/// Almost all terminal-modifying activity is from the pty read, so
+/// putting this on a dedicated thread keeps performance very predictable
+/// while also almost optimal. "Locking is fast, lock contention is slow."
+/// and since we rarely have contention, this is fast.
+///
+/// This is also empirically fast compared to putting the read into
+/// an async mechanism like io_uring/epoll because the reads are generally
+/// small.
+const ReadThread = struct {
+    /// The main entrypoint for the thread.
+    fn threadMain(fd: std.os.fd_t, ev: *EventData) void {
+        var buf: [1024]u8 = undefined;
+        while (true) {
+            const n = std.os.read(fd, &buf) catch |err| {
+                log.err("READ ERROR err={}", .{err});
+                return;
+            };
+            log.info("DATA: {d}", .{n});
+
+            @call(.always_inline, process, .{ ev, buf[0..n] });
+        }
+    }
+
+    fn process(
+        ev: *EventData,
+        buf: []const u8,
+    ) void {
+        const zone = trace(@src());
+        defer zone.end();

         // log.info("DATA: {d}", .{n});
         // log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});

         // Whenever a character is typed, we ensure the cursor is in the
         // non-blink state so it is rendered if visible. If we're under
         // HEAVY read load, we don't want to send a ton of these so we
         // use a timer under the covers
         // TODO
         // const now = t.loop().now();
         // if (now - ev.last_cursor_reset > 500) {
         //     ev.last_cursor_reset = now;
         //     _ = ev.renderer_mailbox.push(.{
         //         .reset_cursor_blink = {},
         //     }, .{ .forever = {} });
         // }

         // We are modifying terminal state from here on out
         ev.renderer_state.mutex.lock();
         defer ev.renderer_state.mutex.unlock();

         // Schedule a render
         ev.queueRender() catch unreachable;

         // Process the terminal data. This is an extremely hot part of the
         // terminal emulator, so we do some abstraction leakage to avoid
         // function calls and unnecessary logic.
         //
         // The ground state is the only state that we can see and print/execute
         // ASCII, so we only execute this hot path if we're already in the ground
         // state.
         //
         // Empirically, this alone improved throughput of large text output by ~20%.
         var i: usize = 0;
-        const end = @intCast(usize, n);
+        const end = buf.len;
         if (ev.terminal_stream.parser.state == .ground) {
             for (buf[i..end]) |ch| {
                 switch (terminal.parse_table.table[ch][@enumToInt(terminal.Parser.State.ground)].action) {
                     // Print, call directly.
                     .print => ev.terminal_stream.handler.print(@intCast(u21, ch)) catch |err|
                         log.err("error processing terminal data: {}", .{err}),

                     // C0 execute, let our stream handle this one but otherwise
                     // continue since we're guaranteed to be back in ground.
                     .execute => ev.terminal_stream.execute(ch) catch |err|
                         log.err("error processing terminal data: {}", .{err}),

                     // Otherwise, break out and go the slow path until we're
                     // back in ground. There is a slight optimization here where
                     // could try to find the next transition to ground but when
                     // I implemented that it didn't materially change performance.
                     else => break,
                 }

                 i += 1;
             }
         }

         if (i < end) {
             ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
                 log.err("error processing terminal data: {}", .{err});
         }

-    return .rearm;
-}
+        // If our stream handling caused messages to be sent to the writer
+        // thread, then we need to wake it up so that it processes them.
+        if (ev.terminal_stream.handler.writer_messaged) {
+            ev.terminal_stream.handler.writer_messaged = false;
+            ev.writer_wakeup.notify() catch |err| {
+                log.warn("failed to wake up writer thread err={}", .{err});
+            };
+        }
+    }
+};

 /// This is used as the handler for the terminal.Stream type. This is
 /// stateful and is expected to live for the entire lifetime of the terminal.
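
The doc comment above is the design rationale; the mechanical consequence is that process() takes the single renderer mutex once per chunk of pty data, not once per byte or per message. A toy model of that locking pattern (State and its counters are illustrative stand-ins):

const std = @import("std");

// Minimal model of the locking in ReadThread.process: the pty reader
// takes one big mutex for the duration of one chunk, mutates terminal
// state, and schedules a render. Contention is rare, so the blocking
// read loop stays fast.
const State = struct {
    mutex: std.Thread.Mutex = .{},
    cells_written: usize = 0,
    render_queued: bool = false,
};

fn process(state: *State, buf: []const u8) void {
    state.mutex.lock();
    defer state.mutex.unlock();

    state.render_queued = true; // stands in for queueRender()
    for (buf) |_| state.cells_written += 1; // stands in for the parser
}

test "one lock per chunk" {
    var state = State{};
    process(&state, "abc");
    try std.testing.expectEqual(@as(usize, 3), state.cells_written);
    try std.testing.expect(state.render_queued);
}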
@@ -540,12 +540,18 @@ const StreamHandler = struct {
     terminal: *terminal.Terminal,
     window_mailbox: Window.Mailbox,

+    /// This is set to true when a message was written to the writer
+    /// mailbox. This can be used by callers to determine if they need
+    /// to wake up the writer.
+    writer_messaged: bool = false,
+
     inline fn queueRender(self: *StreamHandler) !void {
         try self.ev.queueRender();
     }

-    inline fn queueWrite(self: *StreamHandler, data: []const u8) !void {
-        try self.ev.queueWrite(data);
+    inline fn messageWriter(self: *StreamHandler, msg: termio.Message) void {
+        _ = self.ev.writer_mailbox.push(msg, .{ .forever = {} });
+        self.writer_messaged = true;
     }

     pub fn print(self: *StreamHandler, ch: u21) !void {
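
messageWriter and the writer_messaged flag work as a pair: handlers may push any number of messages while a chunk is being parsed, but ReadThread.process checks the flag once at the end and sends at most one wakeup. A small model of that coalescing (the counters stand in for the mailbox and xev.Async.notify):

const std = @import("std");

const Handler = struct {
    writer_messaged: bool = false,
    queued: usize = 0,

    fn messageWriter(self: *Handler) void {
        self.queued += 1; // stands in for writer_mailbox.push(...)
        self.writer_messaged = true;
    }
};

test "many messages, one wakeup" {
    var notifications: usize = 0;
    var h = Handler{};

    // Processing one chunk of pty data may trigger several responses...
    h.messageWriter();
    h.messageWriter();
    h.messageWriter();

    // ...but the read loop checks the flag once, after the chunk.
    if (h.writer_messaged) {
        h.writer_messaged = false;
        notifications += 1; // stands in for writer_wakeup.notify()
    }

    try std.testing.expectEqual(@as(usize, 3), h.queued);
    try std.testing.expectEqual(@as(usize, 1), notifications);
}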
@@ -753,8 +759,7 @@ const StreamHandler = struct {
         switch (req) {
             // VT220
-            .primary => self.queueWrite("\x1B[?62;c") catch |err|
-                log.warn("error queueing device attr response: {}", .{err}),
+            .primary => self.messageWriter(.{ .write_stable = "\x1B[?62;c" }),
             else => log.warn("unimplemented device attributes req: {}", .{req}),
         }
     }
@@ -764,8 +769,7 @@ const StreamHandler = struct {
         req: terminal.DeviceStatusReq,
     ) !void {
         switch (req) {
-            .operating_status => self.queueWrite("\x1B[0n") catch |err|
-                log.warn("error queueing device attr response: {}", .{err}),
+            .operating_status => self.messageWriter(.{ .write_stable = "\x1B[0n" }),

             .cursor_position => {
                 const pos: struct {
@@ -783,13 +787,14 @@ const StreamHandler = struct {
                 // Response always is at least 4 chars, so this leaves the
                 // remainder for the row/column as base-10 numbers. This
                 // will support a very large terminal.
-                var buf: [32]u8 = undefined;
-                const resp = try std.fmt.bufPrint(&buf, "\x1B[{};{}R", .{
+                var msg: termio.Message = .{ .write_small = .{} };
+                const resp = try std.fmt.bufPrint(&msg.write_small.data, "\x1B[{};{}R", .{
                     pos.y + 1,
                     pos.x + 1,
                 });
+                msg.write_small.len = @intCast(u8, resp.len);

-                try self.queueWrite(resp);
+                self.messageWriter(msg);
             },

             else => log.warn("unimplemented device status req: {}", .{req}),
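
The cursor-position report demonstrates the write_small path: format straight into the message's inline array, then record the length. A compact sketch of that flow, with a hypothetical 32-byte Small mirroring the message.zig definition:

const std = @import("std");

const Small = struct {
    data: [32]u8 = undefined,
    len: u8 = 0,
};

test "format a DSR response into an inline message" {
    var msg = Small{};
    const resp = try std.fmt.bufPrint(&msg.data, "\x1B[{};{}R", .{ 5, 10 });
    msg.len = @intCast(u8, resp.len);

    try std.testing.expectEqualStrings("\x1B[5;10R", msg.data[0..msg.len]);
}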
@@ -824,7 +829,7 @@ const StreamHandler = struct {
     }

     pub fn enquiry(self: *StreamHandler) !void {
-        try self.queueWrite("");
+        self.messageWriter(.{ .write_stable = "" });
     }

     pub fn scrollDown(self: *StreamHandler, count: usize) !void {

src/termio/Thread.zig

@@ -16,7 +16,7 @@ const log = std.log.scoped(.io_thread);
 /// The type used for sending messages to the IO thread. For now this is
 /// hardcoded with a capacity. We can make this a comptime parameter in
 /// the future if we want it configurable.
-const Mailbox = BlockingQueue(termio.Message, 64);
+pub const Mailbox = BlockingQueue(termio.Message, 64);

 /// Allocator used for some state
 alloc: std.mem.Allocator,
@@ -99,7 +99,7 @@ fn threadMain_(self: *Thread) !void {

     // Run our thread start/end callbacks. This allows the implementation
     // to hook into the event loop as needed.
-    var data = try self.impl.threadEnter(&self.loop);
+    var data = try self.impl.threadEnter(self);
     defer data.deinit();
     defer self.impl.threadExit(data);

src/termio/message.zig

@@ -60,8 +60,8 @@ pub fn MessageData(comptime Elem: type, comptime small_size: comptime_int) type
     pub const Small = struct {
         pub const Max = small_size;
         pub const Array = [Max]Elem;
-        data: Array,
-        len: u8,
+        data: Array = undefined,
+        len: u8 = 0,
     };

     pub const Alloc = struct {
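
For context on why those defaults matter downstream: a message is either a pointer to stable bytes or a small payload copied by value into the queue slot, and the writer thread recovers a byte slice from either. A hedged sketch of both variants (this Message is illustrative; the real termio.Message has more variants, including an allocated one):

const std = @import("std");

const Message = union(enum) {
    pub const Small = struct {
        data: [16]u8 = undefined,
        len: u8 = 0,
    };

    /// Static/long-lived bytes; nothing is copied.
    write_stable: []const u8,
    /// Short responses copied inline, by value, into the queue slot.
    write_small: Small,

    /// What the writer thread does with either variant: recover the
    /// byte slice to queue onto the pty.
    fn slice(self: *const Message) []const u8 {
        return switch (self.*) {
            .write_stable => |v| v,
            .write_small => |*v| v.data[0..v.len],
        };
    }
};

test "both variants yield their bytes" {
    const a = Message{ .write_stable = "\x1B[0n" };
    try std.testing.expectEqualStrings("\x1B[0n", a.slice());

    var s = Message.Small{};
    s.data[0] = '!';
    s.len = 1;
    const b = Message{ .write_small = s };
    try std.testing.expectEqualStrings("!", b.slice());
}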