termio: use libxev (with TODOs)

This commit is contained in:
Mitchell Hashimoto
2023-02-04 11:47:51 -08:00
parent ad10b2cf0b
commit 7e6a86f065
3 changed files with 134 additions and 148 deletions

View File

@ -467,7 +467,7 @@ pub fn destroy(self: *Window) void {
{
// Stop our IO thread
self.io_thread.stop.send() catch |err|
self.io_thread.stop.notify() catch |err|
log.err("error notifying io thread to stop, may stall err={}", .{err});
self.io_thr.join();
self.io_thread.deinit();
@ -582,7 +582,7 @@ fn clipboardRead(self: *const Window, kind: u8) !void {
self.alloc,
buf,
), .{ .forever = {} });
self.io_thread.wakeup.send() catch {};
self.io_thread.wakeup.notify() catch {};
}
fn clipboardWrite(self: *const Window, data: []const u8) !void {
@ -629,7 +629,7 @@ fn setCellSize(self: *Window, size: renderer.CellSize) !void {
.padding = self.padding,
},
}, .{ .forever = {} });
self.io_thread.wakeup.send() catch {};
self.io_thread.wakeup.notify() catch {};
}
/// Change the font size.
@ -696,7 +696,7 @@ pub fn sizeCallback(self: *Window, size: apprt.WindowSize) !void {
.padding = self.padding,
},
}, .{ .forever = {} });
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn charCallback(self: *Window, codepoint: u21) !void {
@ -746,7 +746,7 @@ pub fn charCallback(self: *Window, codepoint: u21) !void {
}, .{ .forever = {} });
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn keyCallback(
@ -793,7 +793,7 @@ pub fn keyCallback(
_ = self.io_thread.mailbox.push(.{
.write_stable = data,
}, .{ .forever = {} });
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
},
.cursor_key => |ck| {
@ -816,7 +816,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
}
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
},
.copy_to_clipboard => {
@ -870,7 +870,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
}
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
},
@ -1008,7 +1008,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
}
}
@ -1264,7 +1264,7 @@ fn mouseReport(
}
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn mouseButtonCallback(

View File

@ -202,30 +202,25 @@ fn killCommand(self: *Exec) !void {
}
}
pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData {
pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData {
assert(self.data == null);
// Get a copy to our allocator
const alloc_ptr = loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
const alloc = self.alloc;
// Setup our data that is used for callbacks
var ev_data_ptr = try alloc.create(EventData);
errdefer alloc.destroy(ev_data_ptr);
// Read data
var stream = try libuv.Tty.init(alloc, loop, self.pty.master);
errdefer stream.deinit(alloc);
stream.setData(ev_data_ptr);
try stream.readStart(ttyReadAlloc, ttyRead);
var stream = xev.Stream.initFd(self.pty.master);
errdefer stream.deinit();
// Setup our event data before we start
ev_data_ptr.* = .{
.read_arena = std.heap.ArenaAllocator.init(alloc),
.renderer_state = self.renderer_state,
.renderer_wakeup = self.renderer_wakeup,
.renderer_mailbox = self.renderer_mailbox,
.data_stream = stream,
.loop = loop,
.terminal_stream = .{
.handler = .{
.alloc = self.alloc,
@ -241,6 +236,16 @@ pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData {
// Store our data so our callbacks can access it
self.data = ev_data_ptr;
// Start our stream read
stream.read(
loop,
&ev_data_ptr.data_stream_c_read,
.{ .slice = &ev_data_ptr.data_stream_buf },
EventData,
ev_data_ptr,
ttyRead,
);
// Return our thread data
return ThreadData{
.alloc = alloc,
@ -307,11 +312,6 @@ const EventData = struct {
// enough to satisfy most write requests. It must be a power of 2.
const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);
/// This is the arena allocator used for IO read buffers. Since we use
/// libuv under the covers, this lets us rarely heap allocate since we're
/// usually just reusing buffers from this.
read_arena: std.heap.ArenaAllocator,
/// The stream parser. This parses the stream of escape codes and so on
/// from the child process and calls callbacks in the stream handler.
terminal_stream: terminal.Stream(StreamHandler),
@ -327,11 +327,19 @@ const EventData = struct {
renderer_mailbox: *renderer.Thread.Mailbox,
/// The data stream is the main IO for the pty.
data_stream: libuv.Tty,
data_stream: xev.Stream,
data_stream_c_read: xev.Completion = .{},
data_stream_buf: [1024]u8 = undefined,
/// The event loop,
loop: *xev.Loop,
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(libuv.WriteReq.T, WRITE_REQ_PREALLOC) = .{},
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
@ -341,8 +349,6 @@ const EventData = struct {
last_cursor_reset: u64 = 0,
pub fn deinit(self: *EventData, alloc: Allocator) void {
self.read_arena.deinit();
// Clear our write pools. We know we aren't ever going to do
// any more IO since we stop our data stream below so we can just
// drop this.
@ -350,13 +356,8 @@ const EventData = struct {
self.write_buf_pool.deinit(alloc);
// Stop our data stream
self.data_stream.readStop();
self.data_stream.close((struct {
fn callback(h: *libuv.Tty) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
// TODO: close?
self.data_stream.deinit();
}
/// This queues a render operation with the renderer thread. The render
@ -376,9 +377,13 @@ const EventData = struct {
const buf = try self.write_buf_pool.get();
const end = @min(data.len, i + buf.len);
fastmem.copy(u8, buf, data[i..end]);
try self.data_stream.write(
.{ .req = req },
&[1][]u8{buf[0..(end - i)]},
self.data_stream.queueWrite(
self.loop,
&self.write_queue,
req,
.{ .slice = buf[0..(end - i)] },
EventData,
self,
ttyWrite,
);
@ -387,62 +392,65 @@ const EventData = struct {
}
};
fn ttyWrite(req: *libuv.WriteReq, status: i32) void {
const tty = req.handle(libuv.Tty).?;
const ev = tty.getData(EventData).?;
fn ttyWrite(
ev_: ?*EventData,
_: *xev.Loop,
_: *xev.Completion,
_: xev.Stream,
_: xev.WriteBuffer,
r: xev.Stream.WriteError!usize,
) xev.CallbackAction {
const ev = ev_.?;
ev.write_req_pool.put();
ev.write_buf_pool.put();
libuv.convertError(status) catch |err|
const d = r catch |err| {
log.err("write error: {}", .{err});
return .disarm;
};
_ = d;
//log.info("WROTE: {d}", .{status});
return .disarm;
}
fn ttyReadAlloc(t: *libuv.Tty, size: usize) ?[]u8 {
fn ttyRead(
ev_: ?*EventData,
_: *xev.Loop,
_: *xev.Completion,
_: xev.Stream,
read_buf: xev.ReadBuffer,
r: xev.Stream.ReadError!usize,
) xev.CallbackAction {
const zone = trace(@src());
defer zone.end();
const ev = t.getData(EventData) orelse return null;
const alloc = ev.read_arena.allocator();
return alloc.alloc(u8, size) catch null;
}
const ev = ev_.?;
const n = r catch |err| {
switch (err) {
error.EOF => return .disarm,
else => log.err("read error err={}", .{err}),
}
fn ttyRead(t: *libuv.Tty, n: isize, buf: []const u8) void {
const zone = trace(@src());
defer zone.end();
const ev = t.getData(EventData).?;
defer {
const alloc = ev.read_arena.allocator();
alloc.free(buf);
}
return .rearm;
};
const buf = read_buf.slice[0..n];
// log.info("DATA: {d}", .{n});
// log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});
// First check for errors in the case n is less than 0.
libuv.convertError(@intCast(i32, n)) catch |err| {
switch (err) {
// ignore EOF because it should end the process.
libuv.Error.EOF => {},
else => log.err("read error: {}", .{err}),
}
return;
};
// Whenever a character is typed, we ensure the cursor is in the
// non-blink state so it is rendered if visible. If we're under
// HEAVY read load, we don't want to send a ton of these so we
// use a timer under the covers
const now = t.loop().now();
if (now - ev.last_cursor_reset > 500) {
ev.last_cursor_reset = now;
_ = ev.renderer_mailbox.push(.{
.reset_cursor_blink = {},
}, .{ .forever = {} });
}
// TODO
// const now = t.loop().now();
// if (now - ev.last_cursor_reset > 500) {
// ev.last_cursor_reset = now;
// _ = ev.renderer_mailbox.push(.{
// .reset_cursor_blink = {},
// }, .{ .forever = {} });
// }
// We are modifying terminal state from here on out
ev.renderer_state.mutex.lock();
@ -489,6 +497,8 @@ fn ttyRead(t: *libuv.Tty, n: isize, buf: []const u8) void {
ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
log.err("error processing terminal data: {}", .{err});
}
return .rearm;
}
/// This is used as the handler for the terminal.Stream type. This is

View File

@ -4,6 +4,7 @@ pub const Thread = @This();
const std = @import("std");
const builtin = @import("builtin");
const xev = @import("xev");
const libuv = @import("libuv");
const termio = @import("../termio.zig");
const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue;
@ -18,16 +19,21 @@ const log = std.log.scoped(.io_thread);
/// the future if we want it configurable.
const Mailbox = BlockingQueue(termio.Message, 64);
/// Allocator used for some state
alloc: std.mem.Allocator,
/// The main event loop for the thread. The user data of this loop
/// is always the allocator used to create the loop. This is a convenience
/// so that users of the loop always have an allocator.
loop: libuv.Loop,
loop: xev.Loop,
/// This can be used to wake up the thread.
wakeup: libuv.Async,
wakeup: xev.Async,
wakeup_c: xev.Completion = .{},
/// This can be used to stop the thread on the next loop iteration.
stop: libuv.Async,
stop: xev.Async,
stop_c: xev.Completion = .{},
/// The underlying IO implementation.
impl: *termio.Impl,
@ -43,44 +49,24 @@ pub fn init(
alloc: Allocator,
impl: *termio.Impl,
) !Thread {
// We always store allocator pointer on the loop data so that
// handles can use our global allocator.
const allocPtr = try alloc.create(Allocator);
errdefer alloc.destroy(allocPtr);
allocPtr.* = alloc;
// Create our event loop.
var loop = try libuv.Loop.init(alloc);
errdefer {
// Run the loop once to close any of our handles
_ = loop.run(.nowait) catch 0;
loop.deinit(alloc);
}
loop.setData(allocPtr);
var loop = try xev.Loop.init(.{});
errdefer loop.deinit();
// This async handle is used to "wake up" the renderer and force a render.
var wakeup_h = try libuv.Async.init(alloc, loop, wakeupCallback);
errdefer wakeup_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var wakeup_h = try xev.Async.init();
errdefer wakeup_h.deinit();
// This async handle is used to stop the loop and force the thread to end.
var stop_h = try libuv.Async.init(alloc, loop, stopCallback);
errdefer stop_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var stop_h = try xev.Async.init();
errdefer stop_h.deinit();
// The mailbox for messaging this thread
var mailbox = try Mailbox.create(alloc);
errdefer mailbox.destroy(alloc);
return Thread{
.alloc = alloc,
.loop = loop,
.wakeup = wakeup_h,
.stop = stop_h,
@ -92,37 +78,12 @@ pub fn init(
/// Clean up the thread. This is only safe to call once the thread
/// completes executing; the caller must join prior to this.
pub fn deinit(self: *Thread) void {
// Get a copy to our allocator
const alloc_ptr = self.loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
// Schedule our handles to close
self.stop.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.wakeup.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
// Run the loop one more time, because destroying our other things
// like windows usually cancel all our event loop stuff and we need
// one more run through to finalize all the closes.
_ = self.loop.run(.default) catch |err|
log.err("error finalizing event loop: {}", .{err});
self.stop.deinit();
self.wakeup.deinit();
self.loop.deinit();
// Nothing can possibly access the mailbox anymore, destroy it.
self.mailbox.destroy(alloc);
// Dealloc our allocator copy
alloc.destroy(alloc_ptr);
self.loop.deinit(alloc);
self.mailbox.destroy(self.alloc);
}
/// The main entrypoint for the thread.
@ -139,18 +100,18 @@ fn threadMain_(self: *Thread) !void {
// Run our thread start/end callbacks. This allows the implementation
// to hook into the event loop as needed.
var data = try self.impl.threadEnter(self.loop);
var data = try self.impl.threadEnter(&self.loop);
defer data.deinit();
defer self.impl.threadExit(data);
// Set up our async handler to support rendering
self.wakeup.setData(self);
defer self.wakeup.setData(null);
// Start the async handlers
self.wakeup.wait(&self.loop, &self.wakeup_c, Thread, self, wakeupCallback);
self.stop.wait(&self.loop, &self.stop_c, Thread, self, stopCallback);
// Run
log.debug("starting IO thread", .{});
defer log.debug("exiting IO thread", .{});
_ = try self.loop.run(.default);
try self.loop.run(.until_done);
}
/// Drain the mailbox, handling all the messages in our terminal implementation.
@ -185,22 +146,37 @@ fn drainMailbox(self: *Thread) !void {
}
}
fn wakeupCallback(h: *libuv.Async) void {
fn wakeupCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch |err| {
log.err("error in wakeup err={}", .{err});
return .rearm;
};
const zone = trace(@src());
defer zone.end();
const t = h.getData(Thread) orelse {
// This shouldn't happen so we log it.
log.warn("wakeup callback fired without data set", .{});
return;
};
const t = self_.?;
// When we wake up, we check the mailbox. Mailbox producers should
// wake up our thread after publishing.
t.drainMailbox() catch |err|
log.err("error draining mailbox err={}", .{err});
return .rearm;
}
fn stopCallback(h: *libuv.Async) void {
h.loop().stop();
fn stopCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch unreachable;
self_.?.loop.stop();
return .disarm;
}