diff --git a/src/Window.zig b/src/Window.zig index b847c5b8d..89ab690a1 100644 --- a/src/Window.zig +++ b/src/Window.zig @@ -467,7 +467,7 @@ pub fn destroy(self: *Window) void { { // Stop our IO thread - self.io_thread.stop.send() catch |err| + self.io_thread.stop.notify() catch |err| log.err("error notifying io thread to stop, may stall err={}", .{err}); self.io_thr.join(); self.io_thread.deinit(); @@ -582,7 +582,7 @@ fn clipboardRead(self: *const Window, kind: u8) !void { self.alloc, buf, ), .{ .forever = {} }); - self.io_thread.wakeup.send() catch {}; + self.io_thread.wakeup.notify() catch {}; } fn clipboardWrite(self: *const Window, data: []const u8) !void { @@ -629,7 +629,7 @@ fn setCellSize(self: *Window, size: renderer.CellSize) !void { .padding = self.padding, }, }, .{ .forever = {} }); - self.io_thread.wakeup.send() catch {}; + self.io_thread.wakeup.notify() catch {}; } /// Change the font size. @@ -696,7 +696,7 @@ pub fn sizeCallback(self: *Window, size: apprt.WindowSize) !void { .padding = self.padding, }, }, .{ .forever = {} }); - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); } pub fn charCallback(self: *Window, codepoint: u21) !void { @@ -746,7 +746,7 @@ pub fn charCallback(self: *Window, codepoint: u21) !void { }, .{ .forever = {} }); // After sending all our messages we have to notify our IO thread - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); } pub fn keyCallback( @@ -793,7 +793,7 @@ pub fn keyCallback( _ = self.io_thread.mailbox.push(.{ .write_stable = data, }, .{ .forever = {} }); - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); }, .cursor_key => |ck| { @@ -816,7 +816,7 @@ pub fn keyCallback( }, .{ .forever = {} }); } - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); }, .copy_to_clipboard => { @@ -870,7 +870,7 @@ pub fn keyCallback( }, .{ .forever = {} }); } - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); } }, @@ -1008,7 +1008,7 @@ pub fn keyCallback( }, .{ .forever = {} }); // After sending all our messages we have to notify our IO thread - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); } } } @@ -1264,7 +1264,7 @@ fn mouseReport( } // After sending all our messages we have to notify our IO thread - try self.io_thread.wakeup.send(); + try self.io_thread.wakeup.notify(); } pub fn mouseButtonCallback( diff --git a/src/termio/Exec.zig b/src/termio/Exec.zig index b54048b26..167508af1 100644 --- a/src/termio/Exec.zig +++ b/src/termio/Exec.zig @@ -202,30 +202,25 @@ fn killCommand(self: *Exec) !void { } } -pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData { +pub fn threadEnter(self: *Exec, loop: *xev.Loop) !ThreadData { assert(self.data == null); - - // Get a copy to our allocator - const alloc_ptr = loop.getData(Allocator).?; - const alloc = alloc_ptr.*; + const alloc = self.alloc; // Setup our data that is used for callbacks var ev_data_ptr = try alloc.create(EventData); errdefer alloc.destroy(ev_data_ptr); // Read data - var stream = try libuv.Tty.init(alloc, loop, self.pty.master); - errdefer stream.deinit(alloc); - stream.setData(ev_data_ptr); - try stream.readStart(ttyReadAlloc, ttyRead); + var stream = xev.Stream.initFd(self.pty.master); + errdefer stream.deinit(); // Setup our event data before we start ev_data_ptr.* = .{ - .read_arena = std.heap.ArenaAllocator.init(alloc), .renderer_state = self.renderer_state, .renderer_wakeup = self.renderer_wakeup, .renderer_mailbox = self.renderer_mailbox, .data_stream = stream, + .loop = loop, .terminal_stream = .{ .handler = .{ .alloc = self.alloc, @@ -241,6 +236,16 @@ pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData { // Store our data so our callbacks can access it self.data = ev_data_ptr; + // Start our stream read + stream.read( + loop, + &ev_data_ptr.data_stream_c_read, + .{ .slice = &ev_data_ptr.data_stream_buf }, + EventData, + ev_data_ptr, + ttyRead, + ); + // Return our thread data return ThreadData{ .alloc = alloc, @@ -307,11 +312,6 @@ const EventData = struct { // enough to satisfy most write requests. It must be a power of 2. const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5); - /// This is the arena allocator used for IO read buffers. Since we use - /// libuv under the covers, this lets us rarely heap allocate since we're - /// usually just reusing buffers from this. - read_arena: std.heap.ArenaAllocator, - /// The stream parser. This parses the stream of escape codes and so on /// from the child process and calls callbacks in the stream handler. terminal_stream: terminal.Stream(StreamHandler), @@ -327,11 +327,19 @@ const EventData = struct { renderer_mailbox: *renderer.Thread.Mailbox, /// The data stream is the main IO for the pty. - data_stream: libuv.Tty, + data_stream: xev.Stream, + data_stream_c_read: xev.Completion = .{}, + data_stream_buf: [1024]u8 = undefined, + + /// The event loop, + loop: *xev.Loop, + + /// The write queue for the data stream. + write_queue: xev.Stream.WriteQueue = .{}, /// This is the pool of available (unused) write requests. If you grab /// one from the pool, you must put it back when you're done! - write_req_pool: SegmentedPool(libuv.WriteReq.T, WRITE_REQ_PREALLOC) = .{}, + write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{}, /// The pool of available buffers for writing to the pty. write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{}, @@ -341,8 +349,6 @@ const EventData = struct { last_cursor_reset: u64 = 0, pub fn deinit(self: *EventData, alloc: Allocator) void { - self.read_arena.deinit(); - // Clear our write pools. We know we aren't ever going to do // any more IO since we stop our data stream below so we can just // drop this. @@ -350,13 +356,8 @@ const EventData = struct { self.write_buf_pool.deinit(alloc); // Stop our data stream - self.data_stream.readStop(); - self.data_stream.close((struct { - fn callback(h: *libuv.Tty) void { - const handle_alloc = h.loop().getData(Allocator).?.*; - h.deinit(handle_alloc); - } - }).callback); + // TODO: close? + self.data_stream.deinit(); } /// This queues a render operation with the renderer thread. The render @@ -376,9 +377,13 @@ const EventData = struct { const buf = try self.write_buf_pool.get(); const end = @min(data.len, i + buf.len); fastmem.copy(u8, buf, data[i..end]); - try self.data_stream.write( - .{ .req = req }, - &[1][]u8{buf[0..(end - i)]}, + self.data_stream.queueWrite( + self.loop, + &self.write_queue, + req, + .{ .slice = buf[0..(end - i)] }, + EventData, + self, ttyWrite, ); @@ -387,62 +392,65 @@ const EventData = struct { } }; -fn ttyWrite(req: *libuv.WriteReq, status: i32) void { - const tty = req.handle(libuv.Tty).?; - const ev = tty.getData(EventData).?; +fn ttyWrite( + ev_: ?*EventData, + _: *xev.Loop, + _: *xev.Completion, + _: xev.Stream, + _: xev.WriteBuffer, + r: xev.Stream.WriteError!usize, +) xev.CallbackAction { + const ev = ev_.?; ev.write_req_pool.put(); ev.write_buf_pool.put(); - libuv.convertError(status) catch |err| + const d = r catch |err| { log.err("write error: {}", .{err}); - + return .disarm; + }; + _ = d; //log.info("WROTE: {d}", .{status}); + + return .disarm; } -fn ttyReadAlloc(t: *libuv.Tty, size: usize) ?[]u8 { +fn ttyRead( + ev_: ?*EventData, + _: *xev.Loop, + _: *xev.Completion, + _: xev.Stream, + read_buf: xev.ReadBuffer, + r: xev.Stream.ReadError!usize, +) xev.CallbackAction { const zone = trace(@src()); defer zone.end(); - const ev = t.getData(EventData) orelse return null; - const alloc = ev.read_arena.allocator(); - return alloc.alloc(u8, size) catch null; -} + const ev = ev_.?; + const n = r catch |err| { + switch (err) { + error.EOF => return .disarm, + else => log.err("read error err={}", .{err}), + } -fn ttyRead(t: *libuv.Tty, n: isize, buf: []const u8) void { - const zone = trace(@src()); - defer zone.end(); - - const ev = t.getData(EventData).?; - defer { - const alloc = ev.read_arena.allocator(); - alloc.free(buf); - } + return .rearm; + }; + const buf = read_buf.slice[0..n]; // log.info("DATA: {d}", .{n}); // log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]}); - // First check for errors in the case n is less than 0. - libuv.convertError(@intCast(i32, n)) catch |err| { - switch (err) { - // ignore EOF because it should end the process. - libuv.Error.EOF => {}, - else => log.err("read error: {}", .{err}), - } - - return; - }; - // Whenever a character is typed, we ensure the cursor is in the // non-blink state so it is rendered if visible. If we're under // HEAVY read load, we don't want to send a ton of these so we // use a timer under the covers - const now = t.loop().now(); - if (now - ev.last_cursor_reset > 500) { - ev.last_cursor_reset = now; - _ = ev.renderer_mailbox.push(.{ - .reset_cursor_blink = {}, - }, .{ .forever = {} }); - } + // TODO + // const now = t.loop().now(); + // if (now - ev.last_cursor_reset > 500) { + // ev.last_cursor_reset = now; + // _ = ev.renderer_mailbox.push(.{ + // .reset_cursor_blink = {}, + // }, .{ .forever = {} }); + // } // We are modifying terminal state from here on out ev.renderer_state.mutex.lock(); @@ -489,6 +497,8 @@ fn ttyRead(t: *libuv.Tty, n: isize, buf: []const u8) void { ev.terminal_stream.nextSlice(buf[i..end]) catch |err| log.err("error processing terminal data: {}", .{err}); } + + return .rearm; } /// This is used as the handler for the terminal.Stream type. This is diff --git a/src/termio/Thread.zig b/src/termio/Thread.zig index c086ef51f..8dc350bd6 100644 --- a/src/termio/Thread.zig +++ b/src/termio/Thread.zig @@ -4,6 +4,7 @@ pub const Thread = @This(); const std = @import("std"); const builtin = @import("builtin"); +const xev = @import("xev"); const libuv = @import("libuv"); const termio = @import("../termio.zig"); const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue; @@ -18,16 +19,21 @@ const log = std.log.scoped(.io_thread); /// the future if we want it configurable. const Mailbox = BlockingQueue(termio.Message, 64); +/// Allocator used for some state +alloc: std.mem.Allocator, + /// The main event loop for the thread. The user data of this loop /// is always the allocator used to create the loop. This is a convenience /// so that users of the loop always have an allocator. -loop: libuv.Loop, +loop: xev.Loop, /// This can be used to wake up the thread. -wakeup: libuv.Async, +wakeup: xev.Async, +wakeup_c: xev.Completion = .{}, /// This can be used to stop the thread on the next loop iteration. -stop: libuv.Async, +stop: xev.Async, +stop_c: xev.Completion = .{}, /// The underlying IO implementation. impl: *termio.Impl, @@ -43,44 +49,24 @@ pub fn init( alloc: Allocator, impl: *termio.Impl, ) !Thread { - // We always store allocator pointer on the loop data so that - // handles can use our global allocator. - const allocPtr = try alloc.create(Allocator); - errdefer alloc.destroy(allocPtr); - allocPtr.* = alloc; - // Create our event loop. - var loop = try libuv.Loop.init(alloc); - errdefer { - // Run the loop once to close any of our handles - _ = loop.run(.nowait) catch 0; - loop.deinit(alloc); - } - loop.setData(allocPtr); + var loop = try xev.Loop.init(.{}); + errdefer loop.deinit(); // This async handle is used to "wake up" the renderer and force a render. - var wakeup_h = try libuv.Async.init(alloc, loop, wakeupCallback); - errdefer wakeup_h.close((struct { - fn callback(h: *libuv.Async) void { - const loop_alloc = h.loop().getData(Allocator).?.*; - h.deinit(loop_alloc); - } - }).callback); + var wakeup_h = try xev.Async.init(); + errdefer wakeup_h.deinit(); // This async handle is used to stop the loop and force the thread to end. - var stop_h = try libuv.Async.init(alloc, loop, stopCallback); - errdefer stop_h.close((struct { - fn callback(h: *libuv.Async) void { - const loop_alloc = h.loop().getData(Allocator).?.*; - h.deinit(loop_alloc); - } - }).callback); + var stop_h = try xev.Async.init(); + errdefer stop_h.deinit(); // The mailbox for messaging this thread var mailbox = try Mailbox.create(alloc); errdefer mailbox.destroy(alloc); return Thread{ + .alloc = alloc, .loop = loop, .wakeup = wakeup_h, .stop = stop_h, @@ -92,37 +78,12 @@ pub fn init( /// Clean up the thread. This is only safe to call once the thread /// completes executing; the caller must join prior to this. pub fn deinit(self: *Thread) void { - // Get a copy to our allocator - const alloc_ptr = self.loop.getData(Allocator).?; - const alloc = alloc_ptr.*; - - // Schedule our handles to close - self.stop.close((struct { - fn callback(h: *libuv.Async) void { - const handle_alloc = h.loop().getData(Allocator).?.*; - h.deinit(handle_alloc); - } - }).callback); - self.wakeup.close((struct { - fn callback(h: *libuv.Async) void { - const handle_alloc = h.loop().getData(Allocator).?.*; - h.deinit(handle_alloc); - } - }).callback); - - // Run the loop one more time, because destroying our other things - // like windows usually cancel all our event loop stuff and we need - // one more run through to finalize all the closes. - _ = self.loop.run(.default) catch |err| - log.err("error finalizing event loop: {}", .{err}); + self.stop.deinit(); + self.wakeup.deinit(); + self.loop.deinit(); // Nothing can possibly access the mailbox anymore, destroy it. - self.mailbox.destroy(alloc); - - // Dealloc our allocator copy - alloc.destroy(alloc_ptr); - - self.loop.deinit(alloc); + self.mailbox.destroy(self.alloc); } /// The main entrypoint for the thread. @@ -139,18 +100,18 @@ fn threadMain_(self: *Thread) !void { // Run our thread start/end callbacks. This allows the implementation // to hook into the event loop as needed. - var data = try self.impl.threadEnter(self.loop); + var data = try self.impl.threadEnter(&self.loop); defer data.deinit(); defer self.impl.threadExit(data); - // Set up our async handler to support rendering - self.wakeup.setData(self); - defer self.wakeup.setData(null); + // Start the async handlers + self.wakeup.wait(&self.loop, &self.wakeup_c, Thread, self, wakeupCallback); + self.stop.wait(&self.loop, &self.stop_c, Thread, self, stopCallback); // Run log.debug("starting IO thread", .{}); defer log.debug("exiting IO thread", .{}); - _ = try self.loop.run(.default); + try self.loop.run(.until_done); } /// Drain the mailbox, handling all the messages in our terminal implementation. @@ -185,22 +146,37 @@ fn drainMailbox(self: *Thread) !void { } } -fn wakeupCallback(h: *libuv.Async) void { +fn wakeupCallback( + self_: ?*Thread, + _: *xev.Loop, + _: *xev.Completion, + r: xev.Async.WaitError!void, +) xev.CallbackAction { + _ = r catch |err| { + log.err("error in wakeup err={}", .{err}); + return .rearm; + }; + const zone = trace(@src()); defer zone.end(); - const t = h.getData(Thread) orelse { - // This shouldn't happen so we log it. - log.warn("wakeup callback fired without data set", .{}); - return; - }; + const t = self_.?; // When we wake up, we check the mailbox. Mailbox producers should // wake up our thread after publishing. t.drainMailbox() catch |err| log.err("error draining mailbox err={}", .{err}); + + return .rearm; } -fn stopCallback(h: *libuv.Async) void { - h.loop().stop(); +fn stopCallback( + self_: ?*Thread, + _: *xev.Loop, + _: *xev.Completion, + r: xev.Async.WaitError!void, +) xev.CallbackAction { + _ = r catch unreachable; + self_.?.loop.stop(); + return .disarm; }