Merge pull request #70 from mitchellh/libxev

This completely drops libuv as a dependency and uses [libxev](https://github.com/mitchellh/libxev) instead.

The _primary_ reason to do this is easier building, cross-compiling, and the ability to write a WASM backend in the future. However, we don't want to do this unless it has a positive or null impact on performance. On this, we've succeeded!

The performance impact on Linux is very small. The performance impact on macOS is quite large (I'm seeing almost 40% faster on IO heavy workloads). On both platforms, the stddev of IO heavy workloads is much smaller. This isn't libxev itself, this is primarily do to switching to the dedicated read thread (more next).

This PR also has an architectural change: the TTY reads now happen on a dedicated thread _outside the event loop_. I realized that TTY reads are almost always very small and under heavy IO load, the event loop overhead is very heavy. This is particularly true with io_uring since it uses an kernel threadpool under the covers and the context switch cost was very visible. Another factor is that TTY reads are almost always the only thing impacting terminal state. So if we wrap the terminal state in a lock (we already do), there is almost no overhead to moving the reads to a thread since locks aren't slow, lock contention is slow, and we have almost no lock contention under typical loads.

The main impact of the dedicated TTY reader thread is that there is no event loop overhead and the "jitter" between IO reads is now very small.

🚀
This commit is contained in:
Mitchell Hashimoto
2023-02-06 16:18:19 -08:00
committed by GitHub
33 changed files with 667 additions and 22918 deletions

3
.gitmodules vendored
View File

@ -40,3 +40,6 @@
[submodule "vendor/zig-objc"]
path = vendor/zig-objc
url = https://github.com/mitchellh/zig-objc.git
[submodule "vendor/libxev"]
path = vendor/libxev
url = https://github.com/mitchellh/libxev.git

View File

@ -8,8 +8,8 @@ const freetype = @import("pkg/freetype/build.zig");
const harfbuzz = @import("pkg/harfbuzz/build.zig");
const imgui = @import("pkg/imgui/build.zig");
const js = @import("vendor/zig-js/build.zig");
const libxev = @import("vendor/libxev/build.zig");
const libxml2 = @import("vendor/zig-libxml2/libxml2.zig");
const libuv = @import("pkg/libuv/build.zig");
const libpng = @import("pkg/libpng/build.zig");
const macos = @import("pkg/macos/build.zig");
const objc = @import("vendor/zig-objc/build.zig");
@ -275,7 +275,7 @@ fn addDeps(
step.addPackage(harfbuzz.pkg);
step.addPackage(imgui.pkg);
step.addPackage(glfw.pkg);
step.addPackage(libuv.pkg);
step.addPackage(libxev.pkg);
step.addPackage(pixman.pkg);
step.addPackage(stb_image_resize.pkg);
step.addPackage(utf8proc.pkg);
@ -328,7 +328,6 @@ fn addDeps(
step.linkSystemLibrary("freetype2");
step.linkSystemLibrary("harfbuzz");
step.linkSystemLibrary("libpng");
step.linkSystemLibrary("libuv");
step.linkSystemLibrary("pixman-1");
step.linkSystemLibrary("zlib");
@ -378,10 +377,6 @@ fn addDeps(
const pixman_step = try pixman.link(b, step, .{});
_ = pixman_step;
// Libuv
const libuv_step = try libuv.link(b, step);
system_sdk.include(b, libuv_step, .{});
// Only Linux gets fontconfig
if (enable_fontconfig) {
// Libxml2

View File

@ -1,54 +0,0 @@
//! Async handles allow the user to wakeup the event loop and get a callback
//! called from another thread.
const Async = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
handle: *c.uv_async_t,
pub usingnamespace Handle(Async);
pub fn init(alloc: Allocator, loop: Loop, comptime cb: fn (*Async) void) !Async {
var handle = try alloc.create(c.uv_async_t);
errdefer alloc.destroy(handle);
const Wrapper = struct {
pub fn callback(arg: [*c]c.uv_async_t) callconv(.C) void {
var newSelf: Async = .{ .handle = arg };
@call(.always_inline, cb, .{&newSelf});
}
};
try errors.convertError(c.uv_async_init(loop.loop, handle, Wrapper.callback));
return Async{ .handle = handle };
}
pub fn deinit(self: *Async, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}
/// Wake up the event loop and call the async handles callback.
pub fn send(self: Async) !void {
try errors.convertError(c.uv_async_send(self.handle));
}
test "Async" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var h = try init(testing.allocator, loop, (struct {
fn callback(v: *Async) void {
v.close(null);
}
}).callback);
defer h.deinit(testing.allocator);
try h.send();
_ = try loop.run(.default);
}

View File

@ -1,44 +0,0 @@
//! Condition variables implemented via libuv.
const Cond = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Mutex = @import("Mutex.zig");
cond: *c.uv_cond_t,
pub fn init(alloc: Allocator) !Cond {
const cond = try alloc.create(c.uv_cond_t);
try errors.convertError(c.uv_cond_init(cond));
return Cond{ .cond = cond };
}
pub fn deinit(self: *Cond, alloc: Allocator) void {
c.uv_cond_destroy(self.cond);
alloc.destroy(self.cond);
self.* = undefined;
}
pub fn signal(self: Cond) void {
c.uv_cond_signal(self.cond);
}
pub fn broadcast(self: Cond) void {
c.uv_cond_broadcast(self.cond);
}
pub fn wait(self: Cond, mutex: Mutex) void {
c.uv_cond_wait(self.cond, mutex.mutex);
}
pub fn timedwait(self: Cond, mutex: Mutex, timeout: u64) c_int {
return c.uv_cond_timedwait(self.cond, mutex.mutex, timeout);
}
test {
var cond = try init(testing.allocator);
defer cond.deinit(testing.allocator);
}

View File

@ -1,163 +0,0 @@
//! This has a helper for embedding libuv in another event loop.
//! This is an extension of libuv and not a helper built-in to libuv
//! itself, although it uses official APIs of libuv to enable the
//! functionality.
const Embed = @This();
const std = @import("std");
const builtin = @import("builtin");
const testing = std.testing;
const Allocator = std.mem.Allocator;
const Loop = @import("Loop.zig");
const Thread = @import("Thread.zig");
const Mutex = @import("Mutex.zig");
const Cond = @import("Cond.zig");
const log = std.log.scoped(.libuv_embed);
const BoolAtomic = std.atomic.Atomic(bool);
loop: Loop,
mutex: Mutex,
cond: Cond,
ready: bool = false,
terminate: BoolAtomic,
callback: *const fn () void,
thread: ?Thread,
/// Initialize a new embedder. The callback is called when libuv should
/// tick. The callback should be as fast as possible.
pub fn init(
alloc: Allocator,
loop: Loop,
callback: *const fn () void,
) !Embed {
return Embed{
.loop = loop,
.mutex = try Mutex.init(alloc),
.cond = try Cond.init(alloc),
.terminate = BoolAtomic.init(false),
.callback = callback,
.thread = null,
};
}
/// Deinit the embed struct. This will not automatically terminate
/// the embed thread. You must call stop manually.
pub fn deinit(self: *Embed, alloc: Allocator) void {
self.mutex.deinit(alloc);
self.cond.deinit(alloc);
self.* = undefined;
}
/// Start the thread that runs the embed logic and calls callback
/// when the libuv loop should tick. This must only be called once.
pub fn start(self: *Embed) !void {
self.thread = try Thread.initData(self, Embed.threadMain);
}
/// Stop stops the embed thread and blocks until the thread joins.
pub fn stop(self: *Embed) void {
if (self.thread == null) return;
// Mark that we want to terminate
self.terminate.store(true, .SeqCst);
// Post to the semaphore to ensure that any waits are processed.
self.cond.broadcast();
}
/// Wait for the thread backing the embedding to end.
pub fn join(self: *Embed) !void {
if (self.thread) |*thr| {
try thr.join();
self.thread = null;
}
}
/// loopRun runs the next tick of the libuv event loop. This should be
/// called by the main loop thread as a result of callback making some
/// signal. This should NOT be called from callback.
pub fn loopRun(self: *Embed) !void {
self.mutex.lock();
defer self.mutex.unlock();
if (self.ready) {
self.ready = false;
_ = try self.loop.run(.nowait);
self.cond.broadcast();
}
}
fn threadMain(self: *Embed) void {
while (self.terminate.load(.SeqCst) == false) {
const fd = self.loop.backendFd() catch unreachable;
const timeout = self.loop.backendTimeout();
switch (builtin.os.tag) {
// epoll
.linux => {
var ev: [1]std.os.linux.epoll_event = undefined;
_ = std.os.epoll_wait(fd, &ev, timeout);
},
// kqueue
.macos, .dragonfly, .freebsd, .openbsd, .netbsd => {
var ts: std.os.timespec = .{
.tv_sec = @divTrunc(timeout, 1000),
.tv_nsec = @mod(timeout, 1000) * 1000000,
};
// Important: for kevent to block properly, it needs an
// EMPTY changeset and a NON-EMPTY event set.
var changes: [0]std.os.Kevent = undefined;
var events: [1]std.os.Kevent = undefined;
_ = std.os.kevent(
fd,
&changes,
&events,
if (timeout < 0) null else &ts,
) catch |err| blk: {
log.err("kevent error: {}", .{err});
break :blk 0;
};
},
else => @compileError("unsupported libuv Embed platform"),
}
// Call our trigger
self.callback();
// Wait for libuv to run a tick.
//
// NOTE: we use timedwait because I /believe/ there is a race here
// with gflw post event that sometimes causes it not to receive it.
// Therefore, if too much time passes, we just go back and loop
// through the poller.
//
// TODO: this is suboptimal for performance. There as to be a better
// way to do this.
self.mutex.lock();
defer self.mutex.unlock();
self.ready = true;
_ = self.cond.timedwait(self.mutex, 10 * 1000000);
}
}
test "Embed" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var embed = try init(testing.allocator, loop, (struct {
fn callback() void {}
}).callback);
defer embed.deinit(testing.allocator);
// This just tests that the thread can start and then stop.
// It doesn't do much else at the moment
try embed.start();
embed.stop();
try embed.join();
}

View File

@ -1,66 +0,0 @@
//! Idle handles will run the given callback once per loop iteration, right
//! before the uv_prepare_t handles.
const Idle = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
handle: *c.uv_idle_t,
pub usingnamespace Handle(Idle);
pub fn init(alloc: Allocator, loop: Loop) !Idle {
var handle = try alloc.create(c.uv_idle_t);
errdefer alloc.destroy(handle);
try errors.convertError(c.uv_idle_init(loop.loop, handle));
return Idle{ .handle = handle };
}
pub fn deinit(self: *Idle, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}
/// Start the handle with the given callback. This function always succeeds,
/// except when cb is NULL.
pub fn start(self: Idle, comptime cb: fn (*Idle) void) !void {
const Wrapper = struct {
pub fn callback(arg: [*c]c.uv_idle_t) callconv(.C) void {
var newSelf: Idle = .{ .handle = arg };
@call(.always_inline, cb, .{&newSelf});
}
};
try errors.convertError(c.uv_idle_start(self.handle, Wrapper.callback));
}
/// Stop the handle, the callback will no longer be called.
pub fn stop(self: Idle) !void {
try errors.convertError(c.uv_idle_stop(self.handle));
}
test "Idle" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var h = try init(testing.allocator, loop);
defer h.deinit(testing.allocator);
var called: bool = false;
h.setData(&called);
try h.start((struct {
fn callback(t: *Idle) void {
t.getData(bool).?.* = true;
t.close(null);
}
}).callback);
_ = try loop.run(.default);
try testing.expect(called);
}

View File

@ -1,123 +0,0 @@
const Loop = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
loop: *c.uv_loop_t,
/// Initialize a new uv_loop.
pub fn init(alloc: Allocator) !Loop {
// The uv_loop_t type MUST be heap allocated and must not be copied.
// I can't find a definitive source on this, but the test suite starts
// hanging in weird places and doing bad things when it is copied.
const loop = try alloc.create(c.uv_loop_t);
try errors.convertError(c.uv_loop_init(loop));
return Loop{ .loop = loop };
}
/// Releases all internal loop resources. Call this function only when the
/// loop has finished executing and all open handles and requests have been
/// closed, or this will silently fail (in debug mode it will panic).
pub fn deinit(self: *Loop, alloc: Allocator) void {
// deinit functions idiomatically cannot fail in Zig, so we do the
// next best thing here and assert so that in debug mode you'll get
// a crash.
std.debug.assert(c.uv_loop_close(self.loop) >= 0);
alloc.destroy(self.loop);
self.* = undefined;
}
/// Returns true if the loop is still alive.
pub fn alive(self: Loop) !bool {
const res = c.uv_loop_alive(self.loop);
try errors.convertError(res);
return res > 0;
}
/// This function runs the event loop. See RunMode for mode documentation.
///
/// This is not reentrant. It must not be called from a callback.
pub fn run(self: Loop, mode: RunMode) !u32 {
const res = c.uv_run(self.loop, @enumToInt(mode));
try errors.convertError(res);
return @intCast(u32, res);
}
/// Stop the event loop, causing uv_run() to end as soon as possible. This
/// will happen not sooner than the next loop iteration. If this function was
/// called before blocking for i/o, the loop wont block for i/o on this iteration.
pub fn stop(self: Loop) void {
c.uv_stop(self.loop);
}
/// Get backend file descriptor. Only kqueue, epoll and event ports are supported.
///
/// This can be used in conjunction with uv_run(loop, UV_RUN_NOWAIT) to poll
/// in one thread and run the event loops callbacks in another see
/// test/test-embed.c for an example.
pub fn backendFd(self: Loop) !c_int {
const res = c.uv_backend_fd(self.loop);
try errors.convertError(res);
return res;
}
/// Get the poll timeout. The return value is in milliseconds, or -1 for no
/// timeout.
pub fn backendTimeout(self: Loop) c_int {
return c.uv_backend_timeout(self.loop);
}
/// Return the current timestamp in milliseconds. The timestamp is cached at
/// the start of the event loop tick, see uv_update_time() for details and rationale.
///
/// The timestamp increases monotonically from some arbitrary point in time.
/// Dont make assumptions about the starting point, you will only get disappointed.
pub fn now(self: Loop) u64 {
return c.uv_now(self.loop);
}
/// Update the event loops concept of now. Libuv caches the current time at
/// the start of the event loop tick in order to reduce the number of time-related
/// system calls.
///
/// You wont normally need to call this function unless you have callbacks
/// that block the event loop for longer periods of time, where longer is
/// somewhat subjective but probably on the order of a millisecond or more.
pub fn updateTime(self: Loop) void {
return c.uv_update_time(self.loop);
}
/// Sets loop->data to data.
pub fn setData(self: Loop, pointer: ?*anyopaque) void {
c.uv_loop_set_data(self.loop, pointer);
}
/// Returns loop->data.
pub fn getData(self: Loop, comptime DT: type) ?*DT {
return if (c.uv_loop_get_data(self.loop)) |ptr|
@ptrCast(?*DT, @alignCast(@alignOf(DT), ptr))
else
null;
}
/// Mode used to run the loop with uv_run().
pub const RunMode = enum(c.uv_run_mode) {
default = c.UV_RUN_DEFAULT,
once = c.UV_RUN_ONCE,
nowait = c.UV_RUN_NOWAIT,
};
test {
var loop = try init(testing.allocator);
defer loop.deinit(testing.allocator);
var data: u8 = 42;
loop.setData(&data);
try testing.expect(loop.getData(u8).?.* == 42);
try testing.expect((try loop.backendFd()) > 0);
try testing.expectEqual(@as(u32, 0), try loop.run(.nowait));
}

View File

@ -1,35 +0,0 @@
//! Mutexes implemented via libuv.
const Mutex = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
mutex: *c.uv_mutex_t,
pub fn init(alloc: Allocator) !Mutex {
const mutex = try alloc.create(c.uv_mutex_t);
try errors.convertError(c.uv_mutex_init(mutex));
return Mutex{ .mutex = mutex };
}
pub fn deinit(self: *Mutex, alloc: Allocator) void {
c.uv_mutex_destroy(self.mutex);
alloc.destroy(self.mutex);
self.* = undefined;
}
pub fn lock(self: Mutex) void {
c.uv_mutex_lock(self.mutex);
}
pub fn unlock(self: Mutex) void {
c.uv_mutex_unlock(self.mutex);
}
test {
var mutex = try init(testing.allocator);
defer mutex.deinit(testing.allocator);
}

View File

@ -1,174 +0,0 @@
//! Pipe handles provide an abstraction over streaming files on Unix
//! (including local domain sockets, pipes, and FIFOs) and named pipes on
//! Windows.
const Pipe = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
const stream = @import("stream.zig");
const Stream = stream.Stream;
const WriteReq = stream.WriteReq;
handle: *c.uv_pipe_t,
pub usingnamespace Handle(Pipe);
pub usingnamespace Stream(Pipe);
/// Valid flags for pipe.
pub const Flags = packed struct {
_ignore: u6 = 0,
nonblock: bool = false, // UV_NONBLOCK_PIPE = 0x40
_ignore_high: u1 = 0,
pub inline fn toInt(self: Flags, comptime IntType: type) IntType {
return @intCast(IntType, @bitCast(u8, self));
}
test "Flags: expected value" {
const f: Flags = .{ .nonblock = true };
try testing.expectEqual(c.UV_NONBLOCK_PIPE, f.toInt(c_int));
}
};
/// Pair is a pair of ends to a single pipe.
pub const Pair = struct {
read: c.uv_file,
write: c.uv_file,
};
/// Create a pair of connected pipe handles. Data may be written to fds[1] and
/// read from fds[0]. The resulting handles can be passed to uv_pipe_open,
/// used with uv_spawn, or for any other purpose.
pub fn pipe(read_flags: Flags, write_flags: Flags) !Pair {
var res: [2]c.uv_file = undefined;
try errors.convertError(c.uv_pipe(
&res,
read_flags.toInt(c_int),
write_flags.toInt(c_int),
));
return Pair{ .read = res[0], .write = res[1] };
}
pub fn init(alloc: Allocator, loop: Loop, ipc: bool) !Pipe {
var handle = try alloc.create(c.uv_pipe_t);
errdefer alloc.destroy(handle);
try errors.convertError(c.uv_pipe_init(loop.loop, handle, @boolToInt(ipc)));
return Pipe{ .handle = handle };
}
pub fn deinit(self: *Pipe, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}
/// Open an existing file descriptor or HANDLE as a pipe.
pub fn open(self: Pipe, file: c.uv_file) !void {
try errors.convertError(c.uv_pipe_open(self.handle, file));
}
test {
_ = Flags;
}
test "Pipe" {
const pair = try pipe(.{ .nonblock = true }, .{ .nonblock = true });
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
// Read side
var reader = try init(testing.allocator, loop, false);
defer reader.deinit(testing.allocator);
try reader.open(pair.read);
try testing.expect(try reader.isReadable());
try testing.expect(!try reader.isWritable());
// Write side
var writer = try init(testing.allocator, loop, false);
defer writer.deinit(testing.allocator);
try writer.open(pair.write);
try testing.expect(!try writer.isReadable());
try testing.expect(try writer.isWritable());
// Set our data that we'll use to assert
var data: TestData = .{};
defer data.deinit();
writer.setData(&data);
// Write
var writeReq = try WriteReq.init(testing.allocator);
defer writeReq.deinit(testing.allocator);
try writer.write(
writeReq,
&[_][]const u8{
"hello",
},
TestData.write,
);
// Run write and verify success
_ = try loop.run(.once);
try testing.expectEqual(@as(u8, 1), data.count);
try testing.expectEqual(@as(i32, 0), data.status);
// Read
try reader.readStart(TestData.alloc, TestData.read);
reader.setData(&data);
_ = try loop.run(.once);
// Check our data
try testing.expectEqual(@as(usize, 5), data.data.items.len);
try testing.expectEqualStrings("hello", data.data.items);
data.data.clearRetainingCapacity();
// Try writing directly
_ = try writer.tryWrite(&[_][]const u8{"world"});
_ = try loop.run(.once);
try testing.expectEqual(@as(usize, 5), data.data.items.len);
try testing.expectEqualStrings("world", data.data.items);
// End
reader.readStop();
reader.close(null);
writer.close(null);
_ = try loop.run(.default);
}
/// Logic for testing read/write of pipes.
const TestData = struct {
count: u8 = 0,
status: i32 = 0,
data: std.ArrayListUnmanaged(u8) = .{},
fn deinit(self: *TestData) void {
self.data.deinit(testing.allocator);
self.* = undefined;
}
fn write(req: *WriteReq, status: i32) void {
var data = req.handle(Pipe).?.getData(TestData).?;
data.count += 1;
data.status = status;
}
fn alloc(_: *Pipe, size: usize) ?[]u8 {
return testing.allocator.alloc(u8, size) catch null;
}
fn read(h: *Pipe, n: isize, buf: []const u8) void {
var data = h.getData(TestData).?;
data.data.appendSlice(
testing.allocator,
buf[0..@intCast(usize, n)],
) catch unreachable;
testing.allocator.free(buf);
}
};

View File

@ -1,66 +0,0 @@
//! Prepare handles will run the given callback once per loop iteration, right
//! before polling for i/o.
const Prepare = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
handle: *c.uv_prepare_t,
pub usingnamespace Handle(Prepare);
pub fn init(alloc: Allocator, loop: Loop) !Prepare {
var handle = try alloc.create(c.uv_prepare_t);
errdefer alloc.destroy(handle);
try errors.convertError(c.uv_prepare_init(loop.loop, handle));
return Prepare{ .handle = handle };
}
pub fn deinit(self: *Prepare, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}
/// Start the handle with the given callback. This function always succeeds,
/// except when cb is NULL.
pub fn start(self: Prepare, comptime cb: fn (*Prepare) void) !void {
const Wrapper = struct {
pub fn callback(arg: [*c]c.uv_prepare_t) callconv(.C) void {
var newSelf: Prepare = .{ .handle = arg };
@call(.always_inline, cb, .{&newSelf});
}
};
try errors.convertError(c.uv_prepare_start(self.handle, Wrapper.callback));
}
/// Stop the handle, the callback will no longer be called.
pub fn stop(self: Prepare) !void {
try errors.convertError(c.uv_prepare_stop(self.handle));
}
test "Prepare" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var h = try init(testing.allocator, loop);
defer h.deinit(testing.allocator);
var called: bool = false;
h.setData(&called);
try h.start((struct {
fn callback(t: *Prepare) void {
t.getData(bool).?.* = true;
t.close(null);
}
}).callback);
_ = try loop.run(.default);
try testing.expect(called);
}

View File

@ -1,35 +0,0 @@
//! Semaphores implemented via libuv.
const Sem = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
sem: *c.uv_sem_t,
pub fn init(alloc: Allocator, value: u32) !Sem {
const sem = try alloc.create(c.uv_sem_t);
try errors.convertError(c.uv_sem_init(sem, value));
return Sem{ .sem = sem };
}
pub fn deinit(self: *Sem, alloc: Allocator) void {
c.uv_sem_destroy(self.sem);
alloc.destroy(self.sem);
self.* = undefined;
}
pub fn post(self: Sem) void {
c.uv_sem_post(self.sem);
}
pub fn wait(self: Sem) void {
c.uv_sem_wait(self.sem);
}
test {
var sem = try init(testing.allocator, 0);
defer sem.deinit(testing.allocator);
}

View File

@ -1,87 +0,0 @@
//! Threading implemented by libuv.
const Thread = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
thread: c.uv_thread_t,
/// Get the current thread
pub fn self() Thread {
return .{ .thread = c.uv_thread_self() };
}
/// Initialize a new thread.
pub fn init(
comptime callback: fn () void,
) !Thread {
const CWrapper = struct {
pub fn wrapper(_: ?*const anyopaque) callconv(.C) void {
@call(.always_inline, callback, .{});
}
};
var res = Thread{ .thread = undefined };
try errors.convertError(c.uv_thread_create(&res.thread, CWrapper.wrapper, null));
return res;
}
/// Initialize a new thread with user data attached.
pub fn initData(
data: anytype,
comptime callback: fn (arg: @TypeOf(data)) void,
) !Thread {
// Comptime stuff to learn more about our data parameter. This is used
// to do the proper casts for the callback.
const Data = @TypeOf(data);
const dataInfo = @typeInfo(Data);
if (dataInfo != .Pointer) @compileError("data must be a pointer type");
const CWrapper = struct {
pub fn wrapper(arg: ?*anyopaque) callconv(.C) void {
@call(.always_inline, callback, .{
@ptrCast(Data, @alignCast(@alignOf(dataInfo.Pointer.child), arg)),
});
}
};
var res: Thread = .{ .thread = undefined };
try errors.convertError(c.uv_thread_create(
&res.thread,
CWrapper.wrapper,
data,
));
return res;
}
pub fn join(t: *Thread) !void {
try errors.convertError(c.uv_thread_join(&t.thread));
}
test "Thread: no data argument" {
count = 0;
var thread = try init(incr);
try thread.join();
try testing.expectEqual(@as(u8, 1), count);
}
test "Thread: with data argument" {
count = 0;
var data: u8 = 2;
var thread = try initData(&data, incrBy);
try thread.join();
try testing.expectEqual(@as(u8, 2), count);
}
var count: u8 = 0;
fn incr() void {
count += 1;
}
fn incrBy(v: *u8) void {
count += v.*;
}

View File

@ -1,114 +0,0 @@
//! Timer handles are used to schedule callbacks to be called in the future.
const Timer = @This();
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
handle: *c.uv_timer_t,
pub usingnamespace Handle(Timer);
pub fn init(alloc: Allocator, loop: Loop) !Timer {
var timer = try alloc.create(c.uv_timer_t);
errdefer alloc.destroy(timer);
try errors.convertError(c.uv_timer_init(loop.loop, timer));
return Timer{ .handle = timer };
}
pub fn deinit(self: *Timer, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}
/// Start the timer. timeout and repeat are in milliseconds.
///
/// If timeout is zero, the callback fires on the next event loop iteration.
/// If repeat is non-zero, the callback fires first after timeout milliseconds
/// and then repeatedly after repeat milliseconds.
pub fn start(
self: Timer,
comptime cb: fn (*Timer) void,
timeout: u64,
repeat: u64,
) !void {
const Wrapper = struct {
pub fn callback(handle: [*c]c.uv_timer_t) callconv(.C) void {
var newSelf: Timer = .{ .handle = handle };
@call(.always_inline, cb, .{&newSelf});
}
};
try errors.convertError(c.uv_timer_start(
self.handle,
Wrapper.callback,
timeout,
repeat,
));
}
/// Stop the timer, the callback will not be called anymore.
pub fn stop(self: Timer) !void {
try errors.convertError(c.uv_timer_stop(self.handle));
}
/// Stop the timer, and if it is repeating restart it using the repeat value
/// as the timeout. If the timer has never been started before it returns UV_EINVAL.
pub fn again(self: Timer) !void {
try errors.convertError(c.uv_timer_again(self.handle));
}
/// Get the timer repeat value.
pub fn getRepeat(self: Timer) u64 {
return c.uv_timer_get_repeat(self.handle);
}
/// Set the repeat interval value in milliseconds. The timer will be scheduled
/// to run on the given interval, regardless of the callback execution duration,
/// and will follow normal timer semantics in the case of a time-slice overrun.
pub fn setRepeat(self: Timer, repeat: u64) void {
c.uv_timer_set_repeat(self.handle, repeat);
}
test "Timer" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var timer = try init(testing.allocator, loop);
defer timer.deinit(testing.allocator);
var called: bool = false;
timer.setData(&called);
try timer.start((struct {
fn callback(t: *Timer) void {
t.getData(bool).?.* = true;
t.close(null);
}
}).callback, 10, 1000);
_ = try loop.run(.default);
try testing.expect(called);
}
test "Timer: close callback" {
var loop = try Loop.init(testing.allocator);
defer loop.deinit(testing.allocator);
var timer = try init(testing.allocator, loop);
defer timer.deinit(testing.allocator);
var data: u8 = 42;
timer.setData(&data);
timer.close((struct {
fn callback(v: *Timer) void {
var dataPtr = v.getData(u8).?;
dataPtr.* = 24;
}
}).callback);
_ = try loop.run(.default);
try testing.expectEqual(@as(u8, 24), data);
}

View File

@ -1,29 +0,0 @@
//! Tty handles represent a stream for the console.
const Tty = @This();
const std = @import("std");
const fd_t = std.os.fd_t;
const Allocator = std.mem.Allocator;
const testing = std.testing;
const c = @import("c.zig");
const errors = @import("error.zig");
const Loop = @import("Loop.zig");
const Handle = @import("handle.zig").Handle;
const Stream = @import("stream.zig").Stream;
handle: *c.uv_tty_t,
pub usingnamespace Handle(Tty);
pub usingnamespace Stream(Tty);
pub fn init(alloc: Allocator, loop: Loop, fd: fd_t) !Tty {
var tty = try alloc.create(c.uv_tty_t);
errdefer alloc.destroy(tty);
try errors.convertError(c.uv_tty_init(loop.loop, tty, fd, 0));
return Tty{ .handle = tty };
}
pub fn deinit(self: *Tty, alloc: Allocator) void {
alloc.destroy(self.handle);
self.* = undefined;
}

View File

@ -1,160 +0,0 @@
const std = @import("std");
/// Directories with our includes.
const root = thisDir() ++ "../../../vendor/libuv/";
const include_path = root ++ "include";
pub const pkg = std.build.Pkg{
.name = "libuv",
.source = .{ .path = thisDir() ++ "/main.zig" },
};
fn thisDir() []const u8 {
return std.fs.path.dirname(@src().file) orelse ".";
}
pub fn link(b: *std.build.Builder, step: *std.build.LibExeObjStep) !*std.build.LibExeObjStep {
const libuv = try buildLibuv(b, step);
step.linkLibrary(libuv);
step.addIncludePath(include_path);
return libuv;
}
pub fn buildLibuv(
b: *std.build.Builder,
step: *std.build.LibExeObjStep,
) !*std.build.LibExeObjStep {
const lib = b.addStaticLibrary("uv", null);
lib.setTarget(step.target);
lib.setBuildMode(step.build_mode);
const target = step.target;
// Include dirs
lib.addIncludePath(include_path);
lib.addIncludePath(root ++ "src");
// Links
if (target.isWindows()) {
lib.linkSystemLibrary("psapi");
lib.linkSystemLibrary("user32");
lib.linkSystemLibrary("advapi32");
lib.linkSystemLibrary("iphlpapi");
lib.linkSystemLibrary("userenv");
lib.linkSystemLibrary("ws2_32");
}
if (target.isLinux()) {
lib.linkSystemLibrary("pthread");
}
lib.linkLibC();
// Compilation
var flags = std.ArrayList([]const u8).init(b.allocator);
defer flags.deinit();
// try flags.appendSlice(&.{});
if (!target.isWindows()) {
try flags.appendSlice(&.{
"-D_FILE_OFFSET_BITS=64",
"-D_LARGEFILE_SOURCE",
});
}
if (target.isLinux()) {
try flags.appendSlice(&.{
"-D_GNU_SOURCE",
"-D_POSIX_C_SOURCE=200112",
});
}
if (target.isDarwin()) {
try flags.appendSlice(&.{
"-D_DARWIN_UNLIMITED_SELECT=1",
"-D_DARWIN_USE_64_BIT_INODE=1",
});
}
// C files common to all platforms
lib.addCSourceFiles(&.{
root ++ "src/fs-poll.c",
root ++ "src/idna.c",
root ++ "src/inet.c",
root ++ "src/random.c",
root ++ "src/strscpy.c",
root ++ "src/strtok.c",
root ++ "src/threadpool.c",
root ++ "src/timer.c",
root ++ "src/uv-common.c",
root ++ "src/uv-data-getter-setters.c",
root ++ "src/version.c",
}, flags.items);
if (!target.isWindows()) {
lib.addCSourceFiles(&.{
root ++ "src/unix/async.c",
root ++ "src/unix/core.c",
root ++ "src/unix/dl.c",
root ++ "src/unix/fs.c",
root ++ "src/unix/getaddrinfo.c",
root ++ "src/unix/getnameinfo.c",
root ++ "src/unix/loop-watcher.c",
root ++ "src/unix/loop.c",
root ++ "src/unix/pipe.c",
root ++ "src/unix/poll.c",
root ++ "src/unix/process.c",
root ++ "src/unix/random-devurandom.c",
root ++ "src/unix/signal.c",
root ++ "src/unix/stream.c",
root ++ "src/unix/tcp.c",
root ++ "src/unix/thread.c",
root ++ "src/unix/tty.c",
root ++ "src/unix/udp.c",
}, flags.items);
}
if (target.isLinux() or target.isDarwin()) {
lib.addCSourceFiles(&.{
root ++ "src/unix/proctitle.c",
}, flags.items);
}
if (target.isLinux()) {
lib.addCSourceFiles(&.{
root ++ "src/unix/linux-core.c",
root ++ "src/unix/linux-inotify.c",
root ++ "src/unix/linux-syscalls.c",
root ++ "src/unix/procfs-exepath.c",
root ++ "src/unix/random-getrandom.c",
root ++ "src/unix/random-sysctl-linux.c",
root ++ "src/unix/epoll.c",
}, flags.items);
}
if (target.isDarwin() or
target.isOpenBSD() or
target.isNetBSD() or
target.isFreeBSD() or
target.isDragonFlyBSD())
{
lib.addCSourceFiles(&.{
root ++ "src/unix/bsd-ifaddrs.c",
root ++ "src/unix/kqueue.c",
}, flags.items);
}
if (target.isDarwin() or target.isOpenBSD()) {
lib.addCSourceFiles(&.{
root ++ "src/unix/random-getentropy.c",
}, flags.items);
}
if (target.isDarwin()) {
lib.addCSourceFiles(&.{
root ++ "src/unix/darwin-proctitle.c",
root ++ "src/unix/darwin.c",
root ++ "src/unix/fsevents.c",
}, flags.items);
}
return lib;
}

View File

@ -1,19 +0,0 @@
const builtin = @import("builtin");
pub usingnamespace switch (builtin.zig_backend) {
.stage1 => @cImport({
@cInclude("uv.h");
}),
// Workaround for:
// https://github.com/ziglang/zig/issues/12325
//
// Generated by:
// zig translate-c -target aarch64-macos -lc -Ivendor/libuv/include vendor/libuv/include/uv.h
// (and then manually modified)
else => switch (builtin.os.tag) {
.macos => @import("cimport_macos.zig"),
.linux => @import("cimport_linux.zig"),
else => @compileError("unsupported OS for now, see this line"),
},
};

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,196 +0,0 @@
const std = @import("std");
const testing = std.testing;
const c = @import("c.zig");
/// Enum mapping for errors.
pub const Errno = enum(i32) {
E2BIG = c.UV_E2BIG,
EACCES = c.UV_EACCES,
EADDRINUSE = c.UV_EADDRINUSE,
EADDRNOTAVAIL = c.UV_EADDRNOTAVAIL,
EAFNOSUPPORT = c.UV_EAFNOSUPPORT,
EAGAIN = c.UV_EAGAIN,
EAI_ADDRFAMILY = c.UV_EAI_ADDRFAMILY,
EAI_AGAIN = c.UV_EAI_AGAIN,
EAI_BADFLAGS = c.UV_EAI_BADFLAGS,
EAI_BADHINTS = c.UV_EAI_BADHINTS,
EAI_CANCELED = c.UV_EAI_CANCELED,
EAI_FAIL = c.UV_EAI_FAIL,
EAI_FAMILY = c.UV_EAI_FAMILY,
EAI_MEMORY = c.UV_EAI_MEMORY,
EAI_NODATA = c.UV_EAI_NODATA,
EAI_NONAME = c.UV_EAI_NONAME,
EAI_OVERFLOW = c.UV_EAI_OVERFLOW,
EAI_PROTOCOL = c.UV_EAI_PROTOCOL,
EAI_SERVICE = c.UV_EAI_SERVICE,
EAI_SOCKTYPE = c.UV_EAI_SOCKTYPE,
EALREADY = c.UV_EALREADY,
EBADF = c.UV_EBADF,
EBUSY = c.UV_EBUSY,
ECANCELED = c.UV_ECANCELED,
ECHARSET = c.UV_ECHARSET,
ECONNABORTED = c.UV_ECONNABORTED,
ECONNREFUSED = c.UV_ECONNREFUSED,
ECONNRESET = c.UV_ECONNRESET,
EDESTADDRREQ = c.UV_EDESTADDRREQ,
EEXIST = c.UV_EEXIST,
EFAULT = c.UV_EFAULT,
EFBIG = c.UV_EFBIG,
EHOSTUNREACH = c.UV_EHOSTUNREACH,
EINTR = c.UV_EINTR,
EINVAL = c.UV_EINVAL,
EIO = c.UV_EIO,
EISCONN = c.UV_EISCONN,
EISDIR = c.UV_EISDIR,
ELOOP = c.UV_ELOOP,
EMFILE = c.UV_EMFILE,
EMSGSIZE = c.UV_EMSGSIZE,
ENAMETOOLONG = c.UV_ENAMETOOLONG,
ENETDOWN = c.UV_ENETDOWN,
ENETUNREACH = c.UV_ENETUNREACH,
ENFILE = c.UV_ENFILE,
ENOBUFS = c.UV_ENOBUFS,
ENODEV = c.UV_ENODEV,
ENOENT = c.UV_ENOENT,
ENOMEM = c.UV_ENOMEM,
ENONET = c.UV_ENONET,
ENOPROTOOPT = c.UV_ENOPROTOOPT,
ENOSPC = c.UV_ENOSPC,
ENOSYS = c.UV_ENOSYS,
ENOTCONN = c.UV_ENOTCONN,
ENOTDIR = c.UV_ENOTDIR,
ENOTEMPTY = c.UV_ENOTEMPTY,
ENOTSOCK = c.UV_ENOTSOCK,
ENOTSUP = c.UV_ENOTSUP,
EPERM = c.UV_EPERM,
EPIPE = c.UV_EPIPE,
EPROTO = c.UV_EPROTO,
EPROTONOSUPPORT = c.UV_EPROTONOSUPPORT,
EPROTOTYPE = c.UV_EPROTOTYPE,
ERANGE = c.UV_ERANGE,
EROFS = c.UV_EROFS,
ESHUTDOWN = c.UV_ESHUTDOWN,
ESPIPE = c.UV_ESPIPE,
ESRCH = c.UV_ESRCH,
ETIMEDOUT = c.UV_ETIMEDOUT,
ETXTBSY = c.UV_ETXTBSY,
EXDEV = c.UV_EXDEV,
UNKNOWN = c.UV_UNKNOWN,
EOF = c.UV_EOF,
ENXIO = c.UV_ENXIO,
EHOSTDOWN = c.UV_EHOSTDOWN,
EREMOTEIO = c.UV_EREMOTEIO,
ENOTTY = c.UV_ENOTTY,
EFTYPE = c.UV_EFTYPE,
EILSEQ = c.UV_EILSEQ,
ESOCKTNOSUPPORT = c.UV_ESOCKTNOSUPPORT,
};
/// Errors that libuv can produce.
///
/// http://docs.libuv.org/en/v1.x/errors.html
pub const Error = blk: {
// We produce these from the Errno enum so that we can easily
// keep it synced.
const info = @typeInfo(Errno).Enum;
var errors: [info.fields.len]std.builtin.Type.Error = undefined;
for (info.fields) |field, i| {
errors[i] = .{ .name = field.name };
}
break :blk @Type(.{ .ErrorSet = &errors });
};
/// Convert the result of a libuv API call to an error (or no error).
pub fn convertError(r: i32) !void {
if (r >= 0) return;
return switch (@intToEnum(Errno, r)) {
.E2BIG => Error.E2BIG,
.EACCES => Error.EACCES,
.EADDRINUSE => Error.EADDRINUSE,
.EADDRNOTAVAIL => Error.EADDRNOTAVAIL,
.EAFNOSUPPORT => Error.EAFNOSUPPORT,
.EAGAIN => Error.EAGAIN,
.EAI_ADDRFAMILY => Error.EAI_ADDRFAMILY,
.EAI_AGAIN => Error.EAI_AGAIN,
.EAI_BADFLAGS => Error.EAI_BADFLAGS,
.EAI_BADHINTS => Error.EAI_BADHINTS,
.EAI_CANCELED => Error.EAI_CANCELED,
.EAI_FAIL => Error.EAI_FAIL,
.EAI_FAMILY => Error.EAI_FAMILY,
.EAI_MEMORY => Error.EAI_MEMORY,
.EAI_NODATA => Error.EAI_NODATA,
.EAI_NONAME => Error.EAI_NONAME,
.EAI_OVERFLOW => Error.EAI_OVERFLOW,
.EAI_PROTOCOL => Error.EAI_PROTOCOL,
.EAI_SERVICE => Error.EAI_SERVICE,
.EAI_SOCKTYPE => Error.EAI_SOCKTYPE,
.EALREADY => Error.EALREADY,
.EBADF => Error.EBADF,
.EBUSY => Error.EBUSY,
.ECANCELED => Error.ECANCELED,
.ECHARSET => Error.ECHARSET,
.ECONNABORTED => Error.ECONNABORTED,
.ECONNREFUSED => Error.ECONNREFUSED,
.ECONNRESET => Error.ECONNRESET,
.EDESTADDRREQ => Error.EDESTADDRREQ,
.EEXIST => Error.EEXIST,
.EFAULT => Error.EFAULT,
.EFBIG => Error.EFBIG,
.EHOSTUNREACH => Error.EHOSTUNREACH,
.EINTR => Error.EINTR,
.EINVAL => Error.EINVAL,
.EIO => Error.EIO,
.EISCONN => Error.EISCONN,
.EISDIR => Error.EISDIR,
.ELOOP => Error.ELOOP,
.EMFILE => Error.EMFILE,
.EMSGSIZE => Error.EMSGSIZE,
.ENAMETOOLONG => Error.ENAMETOOLONG,
.ENETDOWN => Error.ENETDOWN,
.ENETUNREACH => Error.ENETUNREACH,
.ENFILE => Error.ENFILE,
.ENOBUFS => Error.ENOBUFS,
.ENODEV => Error.ENODEV,
.ENOENT => Error.ENOENT,
.ENOMEM => Error.ENOMEM,
.ENONET => Error.ENONET,
.ENOPROTOOPT => Error.ENOPROTOOPT,
.ENOSPC => Error.ENOSPC,
.ENOSYS => Error.ENOSYS,
.ENOTCONN => Error.ENOTCONN,
.ENOTDIR => Error.ENOTDIR,
.ENOTEMPTY => Error.ENOTEMPTY,
.ENOTSOCK => Error.ENOTSOCK,
.ENOTSUP => Error.ENOTSUP,
.EPERM => Error.EPERM,
.EPIPE => Error.EPIPE,
.EPROTO => Error.EPROTO,
.EPROTONOSUPPORT => Error.EPROTONOSUPPORT,
.EPROTOTYPE => Error.EPROTOTYPE,
.ERANGE => Error.ERANGE,
.EROFS => Error.EROFS,
.ESHUTDOWN => Error.ESHUTDOWN,
.ESPIPE => Error.ESPIPE,
.ESRCH => Error.ESRCH,
.ETIMEDOUT => Error.ETIMEDOUT,
.ETXTBSY => Error.ETXTBSY,
.EXDEV => Error.EXDEV,
.UNKNOWN => Error.UNKNOWN,
.EOF => Error.EOF,
.ENXIO => Error.ENXIO,
.EHOSTDOWN => Error.EHOSTDOWN,
.EREMOTEIO => Error.EREMOTEIO,
.ENOTTY => Error.ENOTTY,
.EFTYPE => Error.EFTYPE,
.EILSEQ => Error.EILSEQ,
.ESOCKTNOSUPPORT => Error.ESOCKTNOSUPPORT,
};
}
test {
// This is mostly just forcing our error type and function to be
// codegenned and run once to ensure we have all the types.
try testing.expectError(Error.EFTYPE, convertError(c.UV_EFTYPE));
}

View File

@ -1,80 +0,0 @@
const c = @import("c.zig");
const Loop = @import("Loop.zig");
const errors = @import("error.zig");
/// Returns a struct that has all the shared handle functions for the
/// given handle type T. The type T must have a field named "handle".
/// This is expected to be used with usingnamespace to add the shared
/// handler functions to other handle types.
pub fn Handle(comptime T: type) type {
// 1. T should be a struct
// 2. First field should be the handle pointer
return struct {
// note: this has to be here: https://github.com/ziglang/zig/issues/11367
const tInfo = @typeInfo(T).Struct;
const HandleType = tInfo.fields[0].type;
// Request handle to be closed. close_cb will be called asynchronously
// after this call. This MUST be called on each handle before memory
// is released. Moreover, the memory can only be released in close_cb
// or after it has returned.
//
// Handles that wrap file descriptors are closed immediately but
// close_cb will still be deferred to the next iteration of the event
// loop. It gives you a chance to free up any resources associated with
// the handle.
//
// In-progress requests, like uv_connect_t or uv_write_t, are cancelled
// and have their callbacks called asynchronously with status=UV_ECANCELED.
pub fn close(self: T, comptime cb: ?fn (*T) void) void {
const cbParam = if (cb) |f|
(struct {
pub fn callback(handle: [*c]c.uv_handle_t) callconv(.C) void {
// We get the raw handle, so we need to reconstruct
// the T. This is mutable because a lot of the libuv APIs
// are non-const but modifying it makes no sense.
var param: T = .{ .handle = @ptrCast(HandleType, handle) };
@call(.always_inline, f, .{&param});
}
}).callback
else
null;
c.uv_close(@ptrCast(*c.uv_handle_t, self.handle), cbParam);
}
/// Loop returns the loop that this handle is a part of.
pub fn loop(self: T) Loop {
const handle = @ptrCast(*c.uv_handle_t, self.handle);
return .{ .loop = c.uv_handle_get_loop(handle) };
}
/// Returns non-zero if the handle is active, zero if its inactive.
/// Rule of thumb: if a handle of type uv_foo_t has a uv_foo_start()
/// function, then its active from the moment that function is called.
/// Likewise, uv_foo_stop() deactivates the handle again.
pub fn isActive(self: T) !bool {
const res = c.uv_is_active(@ptrCast(*c.uv_handle_t, self.handle));
try errors.convertError(res);
return res > 0;
}
/// Sets handle->data to data.
pub fn setData(self: T, pointer: ?*anyopaque) void {
c.uv_handle_set_data(
@ptrCast(*c.uv_handle_t, self.handle),
pointer,
);
}
/// Returns handle->data.
pub fn getData(self: T, comptime DT: type) ?*DT {
return if (c.uv_handle_get_data(@ptrCast(*c.uv_handle_t, self.handle))) |ptr|
@ptrCast(?*DT, @alignCast(@alignOf(DT), ptr))
else
null;
}
};
}

View File

@ -1,44 +0,0 @@
const std = @import("std");
const stream = @import("stream.zig");
pub const c = @import("c.zig");
pub const Loop = @import("Loop.zig");
pub const Async = @import("Async.zig");
pub const Idle = @import("Idle.zig");
pub const Pipe = @import("Pipe.zig");
pub const Prepare = @import("Prepare.zig");
pub const Timer = @import("Timer.zig");
pub const Tty = @import("Tty.zig");
pub const Cond = @import("Cond.zig");
pub const Mutex = @import("Mutex.zig");
pub const Sem = @import("Sem.zig");
pub const Thread = @import("Thread.zig");
pub const WriteReq = stream.WriteReq;
pub const Embed = @import("Embed.zig");
pub usingnamespace @import("error.zig");
test {
// Leak a loop... I don't know why but this fixes CI failures. Probably
// a miscompilation or something. TODO: double check this once self-hosted
// lands to see if we need this.
_ = try Loop.init(std.heap.page_allocator);
_ = @import("tests.zig");
_ = stream;
_ = Loop;
_ = Async;
_ = Idle;
_ = Prepare;
_ = Pipe;
_ = Timer;
_ = Tty;
_ = Cond;
_ = Mutex;
_ = Sem;
_ = Thread;
_ = Embed;
}

View File

@ -1,184 +0,0 @@
const c = @import("c.zig");
const std = @import("std");
const Allocator = std.mem.Allocator;
const testing = std.testing;
const Loop = @import("Loop.zig");
const errors = @import("error.zig");
const Error = errors.Error;
/// Returns a struct that has all the shared stream functions for the
/// given stream type T. The type T must have a field named "handle".
/// This is expected to be used with usingnamespace to add the shared
/// stream functions to other handle types.
pub fn Stream(comptime T: type) type {
// 1. T should be a struct
// 2. First field should be the handle pointer
return struct {
// note: this has to be here: https://github.com/ziglang/zig/issues/11367
const tInfo = @typeInfo(T).Struct;
const HandleType = tInfo.fields[0].type;
/// Returns 1 if the stream is readable, 0 otherwise.
pub fn isReadable(self: T) !bool {
const res = c.uv_is_readable(@ptrCast(*c.uv_stream_t, self.handle));
try errors.convertError(res);
return res > 0;
}
/// Returns 1 if the stream is writable, 0 otherwise.
pub fn isWritable(self: T) !bool {
const res = c.uv_is_writable(@ptrCast(*c.uv_stream_t, self.handle));
try errors.convertError(res);
return res > 0;
}
/// Write data to stream. Buffers are written in order.
pub fn write(
self: T,
req: WriteReq,
bufs: []const []const u8,
comptime cb: fn (req: *WriteReq, status: i32) void,
) !void {
const Wrapper = struct {
fn callback(cbreq: [*c]c.uv_write_t, status: c_int) callconv(.C) void {
var newreq: WriteReq = .{ .req = cbreq };
@call(.always_inline, cb, .{
&newreq,
@intCast(i32, status),
});
}
};
// We can directly ptrCast bufs.ptr to a C pointer of uv_buf_t
// because they have the exact same layout in memory. We have a
// unit test below that keeps this true.
try errors.convertError(c.uv_write(
req.req,
@ptrCast(*c.uv_stream_t, self.handle),
@ptrCast([*c]const c.uv_buf_t, bufs.ptr),
@intCast(c_uint, bufs.len),
Wrapper.callback,
));
}
/// Same as uv_write(), but wont queue a write request if it cant
/// be completed immediately.
pub fn tryWrite(self: T, bufs: []const []const u8) !usize {
const res = c.uv_try_write(
@ptrCast(*c.uv_stream_t, self.handle),
@ptrCast([*c]const c.uv_buf_t, bufs.ptr),
@intCast(c_uint, bufs.len),
);
try errors.convertError(res);
return @intCast(usize, res);
}
/// Read data from an incoming stream. The uv_read_cb callback will
/// be made several times until there is no more data to read or
/// uv_read_stop() is called.
pub fn readStart(
self: T,
comptime alloc_cb: fn (self: *T, size: usize) ?[]u8,
comptime read_cb: fn (self: *T, nread: isize, buf: []const u8) void,
) !void {
const Wrapper = struct {
fn alloc(
cbhandle: [*c]c.uv_handle_t,
cbsize: usize,
buf: [*c]c.uv_buf_t,
) callconv(.C) void {
var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) };
const result = @call(.always_inline, alloc_cb, .{
&param,
cbsize,
});
if (result) |slice| {
buf.* = .{
.base = slice.ptr,
.len = slice.len,
};
} else {
buf.* = .{ .base = null, .len = 0 };
}
}
fn read(
cbhandle: [*c]c.uv_stream_t,
cbnread: isize,
cbbuf: [*c]const c.uv_buf_t,
) callconv(.C) void {
var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) };
@call(.always_inline, read_cb, .{
&param,
cbnread,
cbbuf.*.base[0..cbbuf.*.len],
});
}
};
try errors.convertError(c.uv_read_start(
@ptrCast(*c.uv_stream_t, self.handle),
Wrapper.alloc,
Wrapper.read,
));
}
/// Stop reading data from the stream. The uv_read_cb callback will
/// no longer be called.
///
/// This function is idempotent and may be safely called on a stopped
/// stream.
pub fn readStop(self: T) void {
// Docs say we can ignore this result.
_ = c.uv_read_stop(@ptrCast(*c.uv_stream_t, self.handle));
}
};
}
/// Write request type. Careful attention must be paid when reusing objects
/// of this type. When a stream is in non-blocking mode, write requests sent
/// with uv_write will be queued. Reusing objects at this point is undefined
/// behaviour. It is safe to reuse the uv_write_t object only after the
/// callback passed to uv_write is fired.
pub const WriteReq = struct {
/// This is the underlying type that WriteReq wraps. This is exposed
/// so that you can pre-allocate the type and wrap it in a WrapReq.
pub const T = c.uv_write_t;
req: *T,
pub fn init(alloc: Allocator) !WriteReq {
var req = try alloc.create(c.uv_write_t);
errdefer alloc.destroy(req);
return WriteReq{ .req = req };
}
pub fn deinit(self: *WriteReq, alloc: Allocator) void {
alloc.destroy(self.req);
self.* = undefined;
}
/// Pointer to the stream where this write request is running.
/// T should be a high-level handle type such as "Pipe".
pub fn handle(self: WriteReq, comptime HT: type) ?HT {
const tInfo = @typeInfo(HT).Struct;
const HandleType = tInfo.fields[0].type;
return if (self.req.handle) |ptr|
return HT{ .handle = @ptrCast(HandleType, ptr) }
else
null;
}
test "Write: create and destroy" {
var h = try init(testing.allocator);
defer h.deinit(testing.allocator);
}
};
test {
_ = WriteReq;
}

View File

@ -1,37 +0,0 @@
//! This file contains other behavior tests for the libuv integration.
//! We trust that libuv works, but still test some behaviors to ensure
//! that our wrappers around libuv are working as expected.
const std = @import("std");
const testing = std.testing;
const libuv = @import("main.zig");
test "Async: cancel timer" {
const alloc = testing.allocator;
var loop = try libuv.Loop.init(alloc);
defer loop.deinit(alloc);
var timer = try libuv.Timer.init(alloc, loop);
defer timer.deinit(alloc);
// Start a timer with a long timeout. This will block our loop.
try timer.start((struct {
fn callback(_: *libuv.Timer) void {}
}).callback, 5000, 5000);
var async_handle = try libuv.Async.init(testing.allocator, loop, (struct {
fn callback(v: *libuv.Async) void {
v.loop().stop();
v.close(null);
}
}).callback);
defer async_handle.deinit(testing.allocator);
try async_handle.send();
// This run through the loop should exit because we called loop stop.
_ = try loop.run(.default);
// We need to run the loop one more time to handle all our close callbacks.
timer.close(null);
_ = try loop.run(.default);
}

View File

@ -439,15 +439,14 @@ pub fn create(alloc: Allocator, app: *App, config: *const Config) !*Window {
}
pub fn destroy(self: *Window) void {
// Stop rendering thread
{
// Stop rendering thread
self.renderer_thread.stop.send() catch |err|
self.renderer_thread.stop.notify() catch |err|
log.err("error notifying renderer thread to stop, may stall err={}", .{err});
self.renderer_thr.join();
// We need to become the active rendering thread again
self.renderer.threadEnter(self.window) catch unreachable;
self.renderer_thread.deinit();
// If we are devmode-owning, clean that up.
if (DevMode.enabled and DevMode.instance.window == self) {
@ -460,22 +459,22 @@ pub fn destroy(self: *Window) void {
// Uninitialize imgui
self.imgui_ctx.destroy();
}
// Deinit our renderer
self.renderer.deinit();
}
// Stop our IO thread
{
// Stop our IO thread
self.io_thread.stop.send() catch |err|
self.io_thread.stop.notify() catch |err|
log.err("error notifying io thread to stop, may stall err={}", .{err});
self.io_thr.join();
self.io_thread.deinit();
// Deinitialize our terminal IO
self.io.deinit();
}
// We need to deinit AFTER everything is stopped, since there are
// shared values between the two threads.
self.renderer_thread.deinit();
self.renderer.deinit();
self.io_thread.deinit();
self.io.deinit();
self.window.deinit();
self.font_group.deinit(self.alloc);
@ -582,7 +581,7 @@ fn clipboardRead(self: *const Window, kind: u8) !void {
self.alloc,
buf,
), .{ .forever = {} });
self.io_thread.wakeup.send() catch {};
self.io_thread.wakeup.notify() catch {};
}
fn clipboardWrite(self: *const Window, data: []const u8) !void {
@ -629,7 +628,7 @@ fn setCellSize(self: *Window, size: renderer.CellSize) !void {
.padding = self.padding,
},
}, .{ .forever = {} });
self.io_thread.wakeup.send() catch {};
self.io_thread.wakeup.notify() catch {};
}
/// Change the font size.
@ -652,7 +651,7 @@ pub fn setFontSize(self: *Window, size: font.face.DesiredSize) void {
/// isn't guaranteed to happen immediately but it will happen as soon as
/// practical.
fn queueRender(self: *const Window) !void {
try self.renderer_thread.wakeup.send();
try self.renderer_thread.wakeup.notify();
}
pub fn sizeCallback(self: *Window, size: apprt.WindowSize) !void {
@ -696,7 +695,7 @@ pub fn sizeCallback(self: *Window, size: apprt.WindowSize) !void {
.padding = self.padding,
},
}, .{ .forever = {} });
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn charCallback(self: *Window, codepoint: u21) !void {
@ -746,7 +745,7 @@ pub fn charCallback(self: *Window, codepoint: u21) !void {
}, .{ .forever = {} });
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn keyCallback(
@ -793,7 +792,7 @@ pub fn keyCallback(
_ = self.io_thread.mailbox.push(.{
.write_stable = data,
}, .{ .forever = {} });
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
},
.cursor_key => |ck| {
@ -816,7 +815,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
}
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
},
.copy_to_clipboard => {
@ -870,7 +869,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
}
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
},
@ -1008,7 +1007,7 @@ pub fn keyCallback(
}, .{ .forever = {} });
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
}
}
@ -1264,7 +1263,7 @@ fn mouseReport(
}
// After sending all our messages we have to notify our IO thread
try self.io_thread.wakeup.send();
try self.io_thread.wakeup.notify();
}
pub fn mouseButtonCallback(

View File

@ -55,25 +55,24 @@ pub fn BlockingQueue(
};
/// Our data. The values are undefined until they are written.
data: [bounds]T,
data: [bounds]T = undefined,
/// The next location to write (next empty loc) and next location
/// to read (next non-empty loc). The number of written elements.
write: Size,
read: Size,
len: Size,
write: Size = 0,
read: Size = 0,
len: Size = 0,
/// The big mutex that must be held to read/write.
mutex: std.Thread.Mutex,
mutex: std.Thread.Mutex = .{},
/// A CV for being notified when the queue is no longer full. This is
/// used for writing. Note we DON'T have a CV for waiting on the
/// queue not being EMPTY because we use external notifiers for that.
cond_not_full: std.Thread.Condition,
not_full_waiters: usize,
cond_not_full: std.Thread.Condition = .{},
not_full_waiters: usize = 0,
/// Allocate the blocking queue. Allocation must always happen on
/// the heap due to shared concurrency state.
/// Allocate the blocking queue on the heap.
pub fn create(alloc: Allocator) !*Self {
const ptr = try alloc.create(Self);
errdefer alloc.destroy(ptr);

View File

@ -7,6 +7,7 @@ const freetype = @import("freetype");
const harfbuzz = @import("harfbuzz");
const macos = @import("macos");
const tracy = @import("tracy");
const xev = @import("xev");
const renderer = @import("renderer.zig");
const xdg = @import("xdg.zig");
const internal_os = @import("os/main.zig");
@ -22,6 +23,7 @@ pub fn main() !void {
std.log.info("dependency fontconfig={d}", .{fontconfig.version()});
}
std.log.info("renderer={}", .{renderer.Renderer});
std.log.info("libxev backend={}", .{xev.backend});
// First things first, we fix our file descriptors
internal_os.fixMaxFiles();

View File

@ -4,7 +4,7 @@ pub const Thread = @This();
const std = @import("std");
const builtin = @import("builtin");
const libuv = @import("libuv");
const xev = @import("xev");
const renderer = @import("../renderer.zig");
const apprt = @import("../apprt.zig");
const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue;
@ -14,28 +14,38 @@ const trace = tracy.trace;
const Allocator = std.mem.Allocator;
const log = std.log.scoped(.renderer_thread);
const CURSOR_BLINK_INTERVAL = 600;
/// The type used for sending messages to the IO thread. For now this is
/// hardcoded with a capacity. We can make this a comptime parameter in
/// the future if we want it configurable.
pub const Mailbox = BlockingQueue(renderer.Message, 64);
/// Allocator used for some state
alloc: std.mem.Allocator,
/// The main event loop for the application. The user data of this loop
/// is always the allocator used to create the loop. This is a convenience
/// so that users of the loop always have an allocator.
loop: libuv.Loop,
loop: xev.Loop,
/// This can be used to wake up the renderer and force a render safely from
/// any thread.
wakeup: libuv.Async,
wakeup: xev.Async,
wakeup_c: xev.Completion = .{},
/// This can be used to stop the renderer on the next loop iteration.
stop: libuv.Async,
stop: xev.Async,
stop_c: xev.Completion = .{},
/// The timer used for rendering
render_h: libuv.Timer,
render_h: xev.Timer,
render_c: xev.Completion = .{},
/// The timer used for cursor blinking
cursor_h: libuv.Timer,
cursor_h: xev.Timer,
cursor_c: xev.Completion = .{},
cursor_c_cancel: xev.Completion = .{},
/// The window we're rendering to.
window: apprt.runtime.Window,
@ -59,62 +69,32 @@ pub fn init(
renderer_impl: *renderer.Renderer,
state: *renderer.State,
) !Thread {
// We always store allocator pointer on the loop data so that
// handles can use our global allocator.
const allocPtr = try alloc.create(Allocator);
errdefer alloc.destroy(allocPtr);
allocPtr.* = alloc;
// Create our event loop.
var loop = try libuv.Loop.init(alloc);
errdefer {
// Run the loop once to close any of our handles
_ = loop.run(.nowait) catch 0;
loop.deinit(alloc);
}
loop.setData(allocPtr);
var loop = try xev.Loop.init(.{});
errdefer loop.deinit();
// This async handle is used to "wake up" the renderer and force a render.
var wakeup_h = try libuv.Async.init(alloc, loop, wakeupCallback);
errdefer wakeup_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var wakeup_h = try xev.Async.init();
errdefer wakeup_h.deinit();
// This async handle is used to stop the loop and force the thread to end.
var stop_h = try libuv.Async.init(alloc, loop, stopCallback);
errdefer stop_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var stop_h = try xev.Async.init();
errdefer stop_h.deinit();
// The primary timer for rendering.
var render_h = try libuv.Timer.init(alloc, loop);
errdefer render_h.close((struct {
fn callback(h: *libuv.Timer) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var render_h = try xev.Timer.init();
errdefer render_h.deinit();
// Setup a timer for blinking the cursor
var cursor_timer = try libuv.Timer.init(alloc, loop);
errdefer cursor_timer.close((struct {
fn callback(t: *libuv.Timer) void {
const alloc_h = t.loop().getData(Allocator).?.*;
t.deinit(alloc_h);
}
}).callback);
var cursor_timer = try xev.Timer.init();
errdefer cursor_timer.deinit();
// The mailbox for messaging this thread
var mailbox = try Mailbox.create(alloc);
errdefer mailbox.destroy(alloc);
return Thread{
.alloc = alloc,
.loop = loop,
.wakeup = wakeup_h,
.stop = stop_h,
@ -130,49 +110,14 @@ pub fn init(
/// Clean up the thread. This is only safe to call once the thread
/// completes executing; the caller must join prior to this.
pub fn deinit(self: *Thread) void {
// Get a copy to our allocator
const alloc_ptr = self.loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
// Schedule our handles to close
self.stop.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.wakeup.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.render_h.close((struct {
fn callback(h: *libuv.Timer) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.cursor_h.close((struct {
fn callback(h: *libuv.Timer) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
// Run the loop one more time, because destroying our other things
// like windows usually cancel all our event loop stuff and we need
// one more run through to finalize all the closes.
_ = self.loop.run(.default) catch |err|
log.err("error finalizing event loop: {}", .{err});
self.stop.deinit();
self.wakeup.deinit();
self.render_h.deinit();
self.cursor_h.deinit();
self.loop.deinit();
// Nothing can possibly access the mailbox anymore, destroy it.
self.mailbox.destroy(alloc);
// Dealloc our allocator copy
alloc.destroy(alloc_ptr);
self.loop.deinit(alloc);
self.mailbox.destroy(self.alloc);
}
/// The main entrypoint for the thread.
@ -193,44 +138,49 @@ fn threadMain_(self: *Thread) !void {
try self.renderer.threadEnter(self.window);
defer self.renderer.threadExit();
// Set up our async handler to support rendering
self.wakeup.setData(self);
defer self.wakeup.setData(null);
// Start the async handlers
self.wakeup.wait(&self.loop, &self.wakeup_c, Thread, self, wakeupCallback);
self.stop.wait(&self.loop, &self.stop_c, Thread, self, stopCallback);
// Set up our timer and start it for rendering
self.render_h.setData(self);
defer self.render_h.setData(null);
try self.wakeup.send();
// Send an initial wakeup message so that we render right away.
try self.wakeup.notify();
// Setup a timer for blinking the cursor
self.cursor_h.setData(self);
try self.cursor_h.start(cursorTimerCallback, 600, 600);
// Start blinking the cursor.
self.cursor_h.run(
&self.loop,
&self.cursor_c,
CURSOR_BLINK_INTERVAL,
Thread,
self,
cursorTimerCallback,
);
// If we are using tracy, then we setup a prepare handle so that
// we can mark the frame.
var frame_h: libuv.Prepare = if (!tracy.enabled) undefined else frame_h: {
const alloc_ptr = self.loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
const h = try libuv.Prepare.init(alloc, self.loop);
h.setData(self);
try h.start(prepFrameCallback);
break :frame_h h;
};
defer if (tracy.enabled) {
frame_h.close((struct {
fn callback(h: *libuv.Prepare) void {
const alloc_h = h.loop().getData(Allocator).?.*;
h.deinit(alloc_h);
}
}).callback);
_ = self.loop.run(.nowait) catch {};
};
// TODO
// var frame_h: libuv.Prepare = if (!tracy.enabled) undefined else frame_h: {
// const alloc_ptr = self.loop.getData(Allocator).?;
// const alloc = alloc_ptr.*;
// const h = try libuv.Prepare.init(alloc, self.loop);
// h.setData(self);
// try h.start(prepFrameCallback);
//
// break :frame_h h;
// };
// defer if (tracy.enabled) {
// frame_h.close((struct {
// fn callback(h: *libuv.Prepare) void {
// const alloc_h = h.loop().getData(Allocator).?.*;
// h.deinit(alloc_h);
// }
// }).callback);
// _ = self.loop.run(.nowait) catch {};
// };
// Run
log.debug("starting renderer thread", .{});
defer log.debug("exiting renderer thread", .{});
_ = try self.loop.run(.default);
_ = try self.loop.run(.until_done);
}
/// Drain the mailbox.
@ -247,16 +197,30 @@ fn drainMailbox(self: *Thread) !void {
if (!v) {
// If we're not focused, then we stop the cursor blink
try self.cursor_h.stop();
if (self.cursor_c.state() == .active and
self.cursor_c_cancel.state() == .dead)
{
self.cursor_h.cancel(
&self.loop,
&self.cursor_c,
&self.cursor_c_cancel,
void,
null,
cursorCancelCallback,
);
}
} else {
// If we're focused, we immediately show the cursor again
// and then restart the timer.
if (!try self.cursor_h.isActive()) {
if (self.cursor_c.state() != .active) {
self.renderer.blinkCursor(true);
try self.cursor_h.start(
self.cursor_h.run(
&self.loop,
&self.cursor_c,
CURSOR_BLINK_INTERVAL,
Thread,
self,
cursorTimerCallback,
self.cursor_h.getRepeat(),
self.cursor_h.getRepeat(),
);
}
}
@ -264,8 +228,16 @@ fn drainMailbox(self: *Thread) !void {
.reset_cursor_blink => {
self.renderer.blinkCursor(true);
if (try self.cursor_h.isActive()) {
_ = try self.cursor_h.again();
if (self.cursor_c.state() == .active) {
self.cursor_h.reset(
&self.loop,
&self.cursor_c,
&self.cursor_c_cancel,
CURSOR_BLINK_INTERVAL,
Thread,
self,
cursorTimerCallback,
);
}
},
@ -280,15 +252,21 @@ fn drainMailbox(self: *Thread) !void {
}
}
fn wakeupCallback(h: *libuv.Async) void {
fn wakeupCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch |err| {
log.err("error in wakeup err={}", .{err});
return .rearm;
};
const zone = trace(@src());
defer zone.end();
const t = h.getData(Thread) orelse {
// This shouldn't happen so we log it.
log.warn("render callback fired without data set", .{});
return;
};
const t = self_.?;
// When we wake up, we check the mailbox. Mailbox producers should
// wake up our thread after publishing.
@ -296,48 +274,104 @@ fn wakeupCallback(h: *libuv.Async) void {
log.err("error draining mailbox err={}", .{err});
// If the timer is already active then we don't have to do anything.
const active = t.render_h.isActive() catch true;
if (active) return;
if (t.render_c.state() == .active) return .rearm;
// Timer is not active, let's start it
t.render_h.start(renderCallback, 10, 0) catch |err|
log.warn("render timer failed to start err={}", .{err});
t.render_h.run(
&t.loop,
&t.render_c,
10,
Thread,
t,
renderCallback,
);
return .rearm;
}
fn renderCallback(h: *libuv.Timer) void {
fn renderCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Timer.RunError!void,
) xev.CallbackAction {
const zone = trace(@src());
defer zone.end();
const t = h.getData(Thread) orelse {
_ = r catch unreachable;
const t = self_ orelse {
// This shouldn't happen so we log it.
log.warn("render callback fired without data set", .{});
return;
return .disarm;
};
t.renderer.render(t.window, t.state) catch |err|
log.warn("error rendering err={}", .{err});
return .disarm;
}
fn cursorTimerCallback(h: *libuv.Timer) void {
fn cursorTimerCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Timer.RunError!void,
) xev.CallbackAction {
const zone = trace(@src());
defer zone.end();
const t = h.getData(Thread) orelse {
_ = r catch |err| switch (err) {
// This is sent when our timer is canceled. That's fine.
error.Canceled => return .disarm,
else => {
log.warn("error in cursor timer callback err={}", .{err});
unreachable;
},
};
const t = self_ orelse {
// This shouldn't happen so we log it.
log.warn("render callback fired without data set", .{});
return;
return .disarm;
};
t.renderer.blinkCursor(false);
t.wakeup.send() catch {};
t.wakeup.notify() catch {};
t.cursor_h.run(&t.loop, &t.cursor_c, CURSOR_BLINK_INTERVAL, Thread, t, cursorTimerCallback);
return .disarm;
}
fn prepFrameCallback(h: *libuv.Prepare) void {
_ = h;
fn cursorCancelCallback(
_: ?*void,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Timer.CancelError!void,
) xev.CallbackAction {
_ = r catch |err| switch (err) {
error.NotFound => {},
else => {
log.warn("error in cursor cancel callback err={}", .{err});
unreachable;
},
};
tracy.frameMark();
return .disarm;
}
fn stopCallback(h: *libuv.Async) void {
h.loop().stop();
// fn prepFrameCallback(h: *libuv.Prepare) void {
// _ = h;
//
// tracy.frameMark();
// }
fn stopCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch unreachable;
self_.?.loop.stop();
return .disarm;
}

View File

@ -6,6 +6,7 @@ pub usingnamespace @import("termio/message.zig");
pub const Exec = @import("termio/Exec.zig");
pub const Options = @import("termio/Options.zig");
pub const Thread = @import("termio/Thread.zig");
pub const Mailbox = Thread.Mailbox;
/// The implementation to use for the IO. This is just "exec" for now but
/// this is somewhat pluggable so that in the future we can introduce other

View File

@ -11,7 +11,7 @@ const Window = @import("../Window.zig");
const Pty = @import("../Pty.zig");
const SegmentedPool = @import("../segmented_pool.zig").SegmentedPool;
const terminal = @import("../terminal/main.zig");
const libuv = @import("libuv");
const xev = @import("xev");
const renderer = @import("../renderer.zig");
const tracy = @import("tracy");
const trace = tracy.trace;
@ -29,10 +29,7 @@ const c = @cImport({
alloc: Allocator,
/// This is the pty fd created for the subcommand.
pty: Pty,
/// This is the container for the subcommand.
command: Command,
subprocess: Subprocess,
/// The terminal emulator internal state. This is the abstract "terminal"
/// that manages input, grid updating, etc. and is renderer-agnostic. It
@ -48,7 +45,7 @@ renderer_state: *renderer.State,
/// A handle to wake up the renderer. This hints to the renderer that that
/// a repaint should happen.
renderer_wakeup: libuv.Async,
renderer_wakeup: xev.Async,
/// The mailbox for notifying the renderer of things.
renderer_mailbox: *renderer.Thread.Mailbox,
@ -65,71 +62,6 @@ data: ?*EventData,
/// Initialize the exec implementation. This will also start the child
/// process.
pub fn init(alloc: Allocator, opts: termio.Options) !Exec {
// Create our pty
var pty = try Pty.open(.{
.ws_row = @intCast(u16, opts.grid_size.rows),
.ws_col = @intCast(u16, opts.grid_size.columns),
.ws_xpixel = @intCast(u16, opts.screen_size.width),
.ws_ypixel = @intCast(u16, opts.screen_size.height),
});
errdefer pty.deinit();
// Determine the path to the binary we're executing
const path = (try Command.expandPath(alloc, opts.config.command orelse "sh")) orelse
return error.CommandNotFound;
defer alloc.free(path);
// Set our env vars
var env = try std.process.getEnvMap(alloc);
defer env.deinit();
try env.put("TERM", "xterm-256color");
try env.put("COLORTERM", "truecolor");
// On macOS, we launch the program as a login shell. This is a Mac-specific
// behavior (see other terminals). Terminals in general should NOT be
// spawning login shells because well... we're not "logging in." The solution
// is to put dotfiles in "rc" variants rather than "_login" variants. But,
// history!
var argv0_buf: []u8 = undefined;
const args: []const []const u8 = if (comptime builtin.target.isDarwin()) args: {
// Get rid of the path
const argv0 = if (std.mem.lastIndexOf(u8, path, "/")) |idx|
path[idx + 1 ..]
else
path;
// Copy it with a hyphen so its a login shell
argv0_buf = try alloc.alloc(u8, argv0.len + 1);
argv0_buf[0] = '-';
std.mem.copy(u8, argv0_buf[1..], argv0);
break :args &[_][]const u8{argv0_buf};
} else &[_][]const u8{path};
// We can free the args buffer since it is only used for start.
// cmd retains a dangling pointer but we're not supposed to access it.
defer if (comptime builtin.target.isDarwin()) alloc.free(argv0_buf);
// Build our subcommand
var cmd: Command = .{
.path = path,
.args = args,
.env = &env,
.cwd = opts.config.@"working-directory",
.stdin = .{ .handle = pty.slave },
.stdout = .{ .handle = pty.slave },
.stderr = .{ .handle = pty.slave },
.pre_exec = (struct {
fn callback(cmd: *Command) void {
const p = cmd.getData(Pty) orelse unreachable;
p.childPreExec() catch |err|
log.err("error initializing child: {}", .{err});
}
}).callback,
.data = &pty,
};
try cmd.start(alloc);
log.info("started subcommand path={s} pid={?}", .{ path, cmd.pid });
// Create our terminal
var term = try terminal.Terminal.init(
alloc,
@ -139,12 +71,14 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Exec {
errdefer term.deinit(alloc);
term.color_palette = opts.config.palette.value;
var subprocess = try Subprocess.init(alloc, opts);
errdefer subprocess.deinit();
return Exec{
.alloc = alloc,
.pty = pty,
.command = cmd,
.terminal = term,
.terminal_stream = undefined,
.subprocess = subprocess,
.renderer_state = opts.renderer_state,
.renderer_wakeup = opts.renderer_wakeup,
.renderer_mailbox = opts.renderer_mailbox,
@ -155,76 +89,41 @@ pub fn init(alloc: Allocator, opts: termio.Options) !Exec {
}
pub fn deinit(self: *Exec) void {
// Kill our command
self.killCommand() catch |err|
log.err("error sending SIGHUP to command, may hang: {}", .{err});
_ = self.command.wait(false) catch |err|
log.err("error waiting for command to exit: {}", .{err});
self.subprocess.deinit(self.alloc);
// Clean up our other members
self.terminal.deinit(self.alloc);
}
/// Kill the underlying subprocess. This closes the pty file handle and
/// sends a SIGHUP to the child process. This doesn't wait for the child
/// process to be exited.
fn killCommand(self: *Exec) !void {
// Close our PTY
self.pty.deinit();
// We need to get our process group ID and send a SIGHUP to it.
if (self.command.pid) |pid| {
const pgid_: ?c.pid_t = pgid: {
const pgid = c.getpgid(pid);
// Don't know why it would be zero but its not a valid pid
if (pgid == 0) break :pgid null;
// If the pid doesn't exist then... okay.
if (pgid == c.ESRCH) break :pgid null;
// If we have an error...
if (pgid < 0) {
log.warn("error getting pgid for kill", .{});
break :pgid null;
}
break :pgid pgid;
};
if (pgid_) |pgid| {
if (c.killpg(pgid, c.SIGHUP) < 0) {
log.warn("error killing process group pgid={}", .{pgid});
return error.KillFailed;
}
}
}
}
pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData {
pub fn threadEnter(self: *Exec, thread: *termio.Thread) !ThreadData {
assert(self.data == null);
const alloc = self.alloc;
// Get a copy to our allocator
const alloc_ptr = loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
// Start our subprocess
const master_fd = try self.subprocess.start(alloc);
errdefer self.subprocess.stop();
// Setup our data that is used for callbacks
var ev_data_ptr = try alloc.create(EventData);
errdefer alloc.destroy(ev_data_ptr);
// Read data
var stream = try libuv.Tty.init(alloc, loop, self.pty.master);
errdefer stream.deinit(alloc);
stream.setData(ev_data_ptr);
try stream.readStart(ttyReadAlloc, ttyRead);
// Setup our stream so that we can write.
var stream = xev.Stream.initFd(master_fd);
errdefer stream.deinit();
// Wakeup watcher for the writer thread.
var wakeup = try xev.Async.init();
errdefer wakeup.deinit();
// Setup our event data before we start
ev_data_ptr.* = .{
.read_arena = std.heap.ArenaAllocator.init(alloc),
.writer_mailbox = thread.mailbox,
.writer_wakeup = thread.wakeup,
.renderer_state = self.renderer_state,
.renderer_wakeup = self.renderer_wakeup,
.renderer_mailbox = self.renderer_mailbox,
.data_stream = stream,
.loop = &thread.loop,
.terminal_stream = .{
.handler = .{
.alloc = self.alloc,
@ -235,22 +134,36 @@ pub fn threadEnter(self: *Exec, loop: libuv.Loop) !ThreadData {
},
},
};
errdefer ev_data_ptr.deinit();
errdefer ev_data_ptr.deinit(self.alloc);
// Store our data so our callbacks can access it
self.data = ev_data_ptr;
// Start our reader thread
const read_thread = try std.Thread.spawn(
.{},
ReadThread.threadMain,
.{ master_fd, ev_data_ptr },
);
read_thread.setName("io-reader") catch {};
// Return our thread data
return ThreadData{
.alloc = alloc,
.ev = ev_data_ptr,
.read_thread = read_thread,
};
}
pub fn threadExit(self: *Exec, data: ThreadData) void {
_ = data;
// Clear out our data since we're not active anymore.
self.data = null;
// Stop our subprocess
self.subprocess.stop();
// Wait for our reader thread to end
data.read_thread.join();
}
/// Resize the terminal.
@ -260,15 +173,9 @@ pub fn resize(
screen_size: renderer.ScreenSize,
padding: renderer.Padding,
) !void {
const padded_size = screen_size.subPadding(padding);
// Update the size of our pty
try self.pty.setSize(.{
.ws_row = @intCast(u16, grid_size.rows),
.ws_col = @intCast(u16, grid_size.columns),
.ws_xpixel = @intCast(u16, padded_size.width),
.ws_ypixel = @intCast(u16, padded_size.height),
});
const padded_size = screen_size.subPadding(padding);
try self.subprocess.resize(grid_size, padded_size);
// Update our cached grid size
self.grid_size = grid_size;
@ -284,7 +191,28 @@ pub fn resize(
}
pub inline fn queueWrite(self: *Exec, data: []const u8) !void {
try self.data.?.queueWrite(data);
const ev = self.data.?;
// We go through and chunk the data if necessary to fit into
// our cached buffers that we can queue to the stream.
var i: usize = 0;
while (i < data.len) {
const req = try ev.write_req_pool.get();
const buf = try ev.write_buf_pool.get();
const end = @min(data.len, i + buf.len);
fastmem.copy(u8, buf, data[i..end]);
ev.data_stream.queueWrite(
ev.loop,
&ev.write_queue,
req,
.{ .slice = buf[0..(end - i)] },
EventData,
ev,
ttyWrite,
);
i = end;
}
}
const ThreadData = struct {
@ -294,6 +222,9 @@ const ThreadData = struct {
/// The data that is attached to the callbacks.
ev: *EventData,
/// Our read thread
read_thread: std.Thread,
pub fn deinit(self: *ThreadData) void {
self.ev.deinit(self.alloc);
self.alloc.destroy(self.ev);
@ -306,10 +237,9 @@ const EventData = struct {
// enough to satisfy most write requests. It must be a power of 2.
const WRITE_REQ_PREALLOC = std.math.pow(usize, 2, 5);
/// This is the arena allocator used for IO read buffers. Since we use
/// libuv under the covers, this lets us rarely heap allocate since we're
/// usually just reusing buffers from this.
read_arena: std.heap.ArenaAllocator,
/// Mailbox for data to the writer thread.
writer_mailbox: *termio.Mailbox,
writer_wakeup: xev.Async,
/// The stream parser. This parses the stream of escape codes and so on
/// from the child process and calls callbacks in the stream handler.
@ -320,28 +250,32 @@ const EventData = struct {
/// A handle to wake up the renderer. This hints to the renderer that that
/// a repaint should happen.
renderer_wakeup: libuv.Async,
renderer_wakeup: xev.Async,
/// The mailbox for notifying the renderer of things.
renderer_mailbox: *renderer.Thread.Mailbox,
/// The data stream is the main IO for the pty.
data_stream: libuv.Tty,
data_stream: xev.Stream,
/// The event loop,
loop: *xev.Loop,
/// The write queue for the data stream.
write_queue: xev.Stream.WriteQueue = .{},
/// This is the pool of available (unused) write requests. If you grab
/// one from the pool, you must put it back when you're done!
write_req_pool: SegmentedPool(libuv.WriteReq.T, WRITE_REQ_PREALLOC) = .{},
write_req_pool: SegmentedPool(xev.Stream.WriteRequest, WRITE_REQ_PREALLOC) = .{},
/// The pool of available buffers for writing to the pty.
write_buf_pool: SegmentedPool([64]u8, WRITE_REQ_PREALLOC) = .{},
/// Last time the cursor was reset. This is used to prevent message
/// flooding with cursor resets.
last_cursor_reset: u64 = 0,
last_cursor_reset: i64 = 0,
pub fn deinit(self: *EventData, alloc: Allocator) void {
self.read_arena.deinit();
// Clear our write pools. We know we aren't ever going to do
// any more IO since we stop our data stream below so we can just
// drop this.
@ -349,146 +283,335 @@ const EventData = struct {
self.write_buf_pool.deinit(alloc);
// Stop our data stream
self.data_stream.readStop();
self.data_stream.close((struct {
fn callback(h: *libuv.Tty) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.data_stream.deinit();
}
/// This queues a render operation with the renderer thread. The render
/// isn't guaranteed to happen immediately but it will happen as soon as
/// practical.
inline fn queueRender(self: *EventData) !void {
try self.renderer_wakeup.send();
try self.renderer_wakeup.notify();
}
};
fn ttyWrite(
ev_: ?*EventData,
_: *xev.Loop,
_: *xev.Completion,
_: xev.Stream,
_: xev.WriteBuffer,
r: xev.Stream.WriteError!usize,
) xev.CallbackAction {
const ev = ev_.?;
ev.write_req_pool.put();
ev.write_buf_pool.put();
const d = r catch |err| {
log.err("write error: {}", .{err});
return .disarm;
};
_ = d;
//log.info("WROTE: {d}", .{status});
return .disarm;
}
/// Subprocess manages the lifecycle of the shell subprocess.
const Subprocess = struct {
cwd: ?[]const u8,
env: std.process.EnvMap,
path: []const u8,
argv0_override: ?[]const u8,
grid_size: renderer.GridSize,
screen_size: renderer.ScreenSize,
pty: ?Pty = null,
command: ?Command = null,
/// Initialize the subprocess. This will NOT start it, this only sets
/// up the internal state necessary to start it later.
pub fn init(alloc: Allocator, opts: termio.Options) !Subprocess {
// Determine the path to the binary we're executing
const path = (try Command.expandPath(alloc, opts.config.command orelse "sh")) orelse
return error.CommandNotFound;
errdefer alloc.free(path);
// On macOS, we launch the program as a login shell. This is a Mac-specific
// behavior (see other terminals). Terminals in general should NOT be
// spawning login shells because well... we're not "logging in." The solution
// is to put dotfiles in "rc" variants rather than "_login" variants. But,
// history!
const argv0_override: ?[]const u8 = if (comptime builtin.target.isDarwin()) argv0: {
// Get rid of the path
const argv0 = if (std.mem.lastIndexOf(u8, path, "/")) |idx|
path[idx + 1 ..]
else
path;
// Copy it with a hyphen so its a login shell
const argv0_buf = try alloc.alloc(u8, argv0.len + 1);
argv0_buf[0] = '-';
std.mem.copy(u8, argv0_buf[1..], argv0);
break :argv0 argv0_buf;
} else null;
errdefer if (argv0_override) |buf| alloc.free(buf);
// Set our env vars
var env = try std.process.getEnvMap(alloc);
errdefer env.deinit();
try env.put("TERM", "xterm-256color");
try env.put("COLORTERM", "truecolor");
return .{
.env = env,
.cwd = opts.config.@"working-directory",
.path = path,
.argv0_override = argv0_override,
.grid_size = opts.grid_size,
.screen_size = opts.screen_size,
};
}
/// Queue a write to the pty.
fn queueWrite(self: *EventData, data: []const u8) !void {
// We go through and chunk the data if necessary to fit into
// our cached buffers that we can queue to the stream.
var i: usize = 0;
while (i < data.len) {
const req = try self.write_req_pool.get();
const buf = try self.write_buf_pool.get();
const end = @min(data.len, i + buf.len);
fastmem.copy(u8, buf, data[i..end]);
try self.data_stream.write(
.{ .req = req },
&[1][]u8{buf[0..(end - i)]},
ttyWrite,
);
/// Clean up the subprocess. This will stop the subprocess if it is started.
pub fn deinit(self: *Subprocess, alloc: Allocator) void {
self.stop();
self.env.deinit();
alloc.free(self.path);
if (self.argv0_override) |v| alloc.free(v);
self.* = undefined;
}
i = end;
/// Start the subprocess. If the subprocess is already started this
/// will crash.
pub fn start(self: *Subprocess, alloc: Allocator) !std.os.fd_t {
assert(self.pty == null and self.command == null);
// Create our pty
var pty = try Pty.open(.{
.ws_row = @intCast(u16, self.grid_size.rows),
.ws_col = @intCast(u16, self.grid_size.columns),
.ws_xpixel = @intCast(u16, self.screen_size.width),
.ws_ypixel = @intCast(u16, self.screen_size.height),
});
self.pty = pty;
errdefer {
pty.deinit();
self.pty = null;
}
const args = &[_][]const u8{self.argv0_override orelse self.path};
// Build our subcommand
var cmd: Command = .{
.path = self.path,
.args = args,
.env = &self.env,
.cwd = self.cwd,
.stdin = .{ .handle = pty.slave },
.stdout = .{ .handle = pty.slave },
.stderr = .{ .handle = pty.slave },
.pre_exec = (struct {
fn callback(cmd: *Command) void {
const p = cmd.getData(Pty) orelse unreachable;
p.childPreExec() catch |err|
log.err("error initializing child: {}", .{err});
}
}).callback,
.data = &self.pty.?,
};
try cmd.start(alloc);
errdefer killCommand(cmd);
log.info("started subcommand path={s} pid={?}", .{ self.path, cmd.pid });
self.command = cmd;
return pty.master;
}
/// Stop the subprocess. This is safe to call anytime. This will wait
/// for the subprocess to end so it will block.
pub fn stop(self: *Subprocess) void {
// Kill our command
if (self.command) |*cmd| {
killCommand(cmd) catch |err|
log.err("error sending SIGHUP to command, may hang: {}", .{err});
_ = cmd.wait(false) catch |err|
log.err("error waiting for command to exit: {}", .{err});
self.command = null;
}
// Close our PTY. We do this after killing our command because on
// macOS, close will block until all blocking operations read/write
// are done with it and our reader thread is probably still alive.
if (self.pty) |*pty| {
pty.deinit();
self.pty = null;
}
}
/// Resize the pty subprocess. This is safe to call anytime.
pub fn resize(
self: *Subprocess,
grid_size: renderer.GridSize,
screen_size: renderer.ScreenSize,
) !void {
self.grid_size = grid_size;
self.screen_size = screen_size;
if (self.pty) |pty| {
try pty.setSize(.{
.ws_row = @intCast(u16, grid_size.rows),
.ws_col = @intCast(u16, grid_size.columns),
.ws_xpixel = @intCast(u16, screen_size.width),
.ws_ypixel = @intCast(u16, screen_size.height),
});
}
}
/// Kill the underlying subprocess. This sends a SIGHUP to the child
/// process. This doesn't wait for the child process to be exited.
fn killCommand(command: *Command) !void {
if (command.pid) |pid| {
const pgid_: ?c.pid_t = pgid: {
const pgid = c.getpgid(pid);
// Don't know why it would be zero but its not a valid pid
if (pgid == 0) break :pgid null;
// If the pid doesn't exist then... okay.
if (pgid == c.ESRCH) break :pgid null;
// If we have an error...
if (pgid < 0) {
log.warn("error getting pgid for kill", .{});
break :pgid null;
}
break :pgid pgid;
};
if (pgid_) |pgid| {
if (c.killpg(pgid, c.SIGHUP) < 0) {
log.warn("error killing process group pgid={}", .{pgid});
return error.KillFailed;
}
}
}
}
};
fn ttyWrite(req: *libuv.WriteReq, status: i32) void {
const tty = req.handle(libuv.Tty).?;
const ev = tty.getData(EventData).?;
ev.write_req_pool.put();
ev.write_buf_pool.put();
/// The read thread sits in a loop doing the following pseudo code:
///
/// while (true) { blocking_read(); exit_if_eof(); process(); }
///
/// Almost all terminal-modifying activity is from the pty read, so
/// putting this on a dedicated thread keeps performance very predictable
/// while also almost optimal. "Locking is fast, lock contention is slow."
/// and since we rarely have contention, this is fast.
///
/// This is also empirically fast compared to putting the read into
/// an async mechanism like io_uring/epoll because the reads are generally
/// small.
const ReadThread = struct {
/// The main entrypoint for the thread.
fn threadMain(fd: std.os.fd_t, ev: *EventData) void {
var buf: [1024]u8 = undefined;
while (true) {
const n = std.os.read(fd, &buf) catch |err| {
switch (err) {
// This means our pty is closed. We're probably
// gracefully shutting down.
error.NotOpenForReading => log.info("io reader exiting", .{}),
libuv.convertError(status) catch |err|
log.err("write error: {}", .{err});
else => {
log.err("io reader error err={}", .{err});
unreachable;
},
}
return;
};
//log.info("WROTE: {d}", .{status});
}
fn ttyReadAlloc(t: *libuv.Tty, size: usize) ?[]u8 {
const zone = trace(@src());
defer zone.end();
const ev = t.getData(EventData) orelse return null;
const alloc = ev.read_arena.allocator();
return alloc.alloc(u8, size) catch null;
}
fn ttyRead(t: *libuv.Tty, n: isize, buf: []const u8) void {
const zone = trace(@src());
defer zone.end();
const ev = t.getData(EventData).?;
defer {
const alloc = ev.read_arena.allocator();
alloc.free(buf);
// log.info("DATA: {d}", .{n});
@call(.always_inline, process, .{ ev, buf[0..n] });
}
}
// log.info("DATA: {d}", .{n});
// log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});
fn process(
ev: *EventData,
buf: []const u8,
) void {
const zone = trace(@src());
defer zone.end();
// First check for errors in the case n is less than 0.
libuv.convertError(@intCast(i32, n)) catch |err| {
switch (err) {
// ignore EOF because it should end the process.
libuv.Error.EOF => {},
else => log.err("read error: {}", .{err}),
// log.info("DATA: {d}", .{n});
// log.info("DATA: {any}", .{buf[0..@intCast(usize, n)]});
// Whenever a character is typed, we ensure the cursor is in the
// non-blink state so it is rendered if visible. If we're under
// HEAVY read load, we don't want to send a ton of these so we
// use a timer under the covers
const now = ev.loop.now();
if (now - ev.last_cursor_reset > 500) {
ev.last_cursor_reset = now;
_ = ev.renderer_mailbox.push(.{
.reset_cursor_blink = {},
}, .{ .forever = {} });
}
return;
};
// We are modifying terminal state from here on out
ev.renderer_state.mutex.lock();
defer ev.renderer_state.mutex.unlock();
// Whenever a character is typed, we ensure the cursor is in the
// non-blink state so it is rendered if visible. If we're under
// HEAVY read load, we don't want to send a ton of these so we
// use a timer under the covers
const now = t.loop().now();
if (now - ev.last_cursor_reset > 500) {
ev.last_cursor_reset = now;
_ = ev.renderer_mailbox.push(.{
.reset_cursor_blink = {},
}, .{ .forever = {} });
}
// Schedule a render
ev.queueRender() catch unreachable;
// We are modifying terminal state from here on out
ev.renderer_state.mutex.lock();
defer ev.renderer_state.mutex.unlock();
// Process the terminal data. This is an extremely hot part of the
// terminal emulator, so we do some abstraction leakage to avoid
// function calls and unnecessary logic.
//
// The ground state is the only state that we can see and print/execute
// ASCII, so we only execute this hot path if we're already in the ground
// state.
//
// Empirically, this alone improved throughput of large text output by ~20%.
var i: usize = 0;
const end = buf.len;
if (ev.terminal_stream.parser.state == .ground) {
for (buf[i..end]) |ch| {
switch (terminal.parse_table.table[ch][@enumToInt(terminal.Parser.State.ground)].action) {
// Print, call directly.
.print => ev.terminal_stream.handler.print(@intCast(u21, ch)) catch |err|
log.err("error processing terminal data: {}", .{err}),
// Schedule a render
ev.queueRender() catch unreachable;
// C0 execute, let our stream handle this one but otherwise
// continue since we're guaranteed to be back in ground.
.execute => ev.terminal_stream.execute(ch) catch |err|
log.err("error processing terminal data: {}", .{err}),
// Process the terminal data. This is an extremely hot part of the
// terminal emulator, so we do some abstraction leakage to avoid
// function calls and unnecessary logic.
//
// The ground state is the only state that we can see and print/execute
// ASCII, so we only execute this hot path if we're already in the ground
// state.
//
// Empirically, this alone improved throughput of large text output by ~20%.
var i: usize = 0;
const end = @intCast(usize, n);
if (ev.terminal_stream.parser.state == .ground) {
for (buf[i..end]) |ch| {
switch (terminal.parse_table.table[ch][@enumToInt(terminal.Parser.State.ground)].action) {
// Print, call directly.
.print => ev.terminal_stream.handler.print(@intCast(u21, ch)) catch |err|
log.err("error processing terminal data: {}", .{err}),
// Otherwise, break out and go the slow path until we're
// back in ground. There is a slight optimization here where
// could try to find the next transition to ground but when
// I implemented that it didn't materially change performance.
else => break,
}
// C0 execute, let our stream handle this one but otherwise
// continue since we're guaranteed to be back in ground.
.execute => ev.terminal_stream.execute(ch) catch |err|
log.err("error processing terminal data: {}", .{err}),
// Otherwise, break out and go the slow path until we're
// back in ground. There is a slight optimization here where
// could try to find the next transition to ground but when
// I implemented that it didn't materially change performance.
else => break,
i += 1;
}
}
i += 1;
if (i < end) {
ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
log.err("error processing terminal data: {}", .{err});
}
// If our stream handling caused messages to be sent to the writer
// thread, then we need to wake it up so that it processes them.
if (ev.terminal_stream.handler.writer_messaged) {
ev.terminal_stream.handler.writer_messaged = false;
ev.writer_wakeup.notify() catch |err| {
log.warn("failed to wake up writer thread err={}", .{err});
};
}
}
if (i < end) {
ev.terminal_stream.nextSlice(buf[i..end]) catch |err|
log.err("error processing terminal data: {}", .{err});
}
}
};
/// This is used as the handler for the terminal.Stream type. This is
/// stateful and is expected to live for the entire lifetime of the terminal.
@ -501,12 +624,18 @@ const StreamHandler = struct {
terminal: *terminal.Terminal,
window_mailbox: Window.Mailbox,
/// This is set to true when a message was written to the writer
/// mailbox. This can be used by callers to determine if they need
/// to wake up the writer.
writer_messaged: bool = false,
inline fn queueRender(self: *StreamHandler) !void {
try self.ev.queueRender();
}
inline fn queueWrite(self: *StreamHandler, data: []const u8) !void {
try self.ev.queueWrite(data);
inline fn messageWriter(self: *StreamHandler, msg: termio.Message) void {
_ = self.ev.writer_mailbox.push(msg, .{ .forever = {} });
self.writer_messaged = true;
}
pub fn print(self: *StreamHandler, ch: u21) !void {
@ -714,8 +843,7 @@ const StreamHandler = struct {
switch (req) {
// VT220
.primary => self.queueWrite("\x1B[?62;c") catch |err|
log.warn("error queueing device attr response: {}", .{err}),
.primary => self.messageWriter(.{ .write_stable = "\x1B[?62;c" }),
else => log.warn("unimplemented device attributes req: {}", .{req}),
}
}
@ -725,8 +853,7 @@ const StreamHandler = struct {
req: terminal.DeviceStatusReq,
) !void {
switch (req) {
.operating_status => self.queueWrite("\x1B[0n") catch |err|
log.warn("error queueing device attr response: {}", .{err}),
.operating_status => self.messageWriter(.{ .write_stable = "\x1B[0n" }),
.cursor_position => {
const pos: struct {
@ -744,13 +871,14 @@ const StreamHandler = struct {
// Response always is at least 4 chars, so this leaves the
// remainder for the row/column as base-10 numbers. This
// will support a very large terminal.
var buf: [32]u8 = undefined;
const resp = try std.fmt.bufPrint(&buf, "\x1B[{};{}R", .{
var msg: termio.Message = .{ .write_small = .{} };
const resp = try std.fmt.bufPrint(&msg.write_small.data, "\x1B[{};{}R", .{
pos.y + 1,
pos.x + 1,
});
msg.write_small.len = @intCast(u8, resp.len);
try self.queueWrite(resp);
self.messageWriter(msg);
},
else => log.warn("unimplemented device status req: {}", .{req}),
@ -785,7 +913,7 @@ const StreamHandler = struct {
}
pub fn enquiry(self: *StreamHandler) !void {
try self.queueWrite("");
self.messageWriter(.{ .write_stable = "" });
}
pub fn scrollDown(self: *StreamHandler, count: usize) !void {

View File

@ -1,6 +1,6 @@
//! The options that are used to configure a terminal IO implementation.
const libuv = @import("libuv");
const xev = @import("xev");
const renderer = @import("../renderer.zig");
const Config = @import("../config.zig").Config;
const Window = @import("../Window.zig");
@ -22,7 +22,7 @@ renderer_state: *renderer.State,
/// A handle to wake up the renderer. This hints to the renderer that that
/// a repaint should happen.
renderer_wakeup: libuv.Async,
renderer_wakeup: xev.Async,
/// The mailbox for renderer messages.
renderer_mailbox: *renderer.Thread.Mailbox,

View File

@ -4,7 +4,7 @@ pub const Thread = @This();
const std = @import("std");
const builtin = @import("builtin");
const libuv = @import("libuv");
const xev = @import("xev");
const termio = @import("../termio.zig");
const BlockingQueue = @import("../blocking_queue.zig").BlockingQueue;
const tracy = @import("tracy");
@ -16,18 +16,23 @@ const log = std.log.scoped(.io_thread);
/// The type used for sending messages to the IO thread. For now this is
/// hardcoded with a capacity. We can make this a comptime parameter in
/// the future if we want it configurable.
const Mailbox = BlockingQueue(termio.Message, 64);
pub const Mailbox = BlockingQueue(termio.Message, 64);
/// Allocator used for some state
alloc: std.mem.Allocator,
/// The main event loop for the thread. The user data of this loop
/// is always the allocator used to create the loop. This is a convenience
/// so that users of the loop always have an allocator.
loop: libuv.Loop,
loop: xev.Loop,
/// This can be used to wake up the thread.
wakeup: libuv.Async,
wakeup: xev.Async,
wakeup_c: xev.Completion = .{},
/// This can be used to stop the thread on the next loop iteration.
stop: libuv.Async,
stop: xev.Async,
stop_c: xev.Completion = .{},
/// The underlying IO implementation.
impl: *termio.Impl,
@ -43,44 +48,24 @@ pub fn init(
alloc: Allocator,
impl: *termio.Impl,
) !Thread {
// We always store allocator pointer on the loop data so that
// handles can use our global allocator.
const allocPtr = try alloc.create(Allocator);
errdefer alloc.destroy(allocPtr);
allocPtr.* = alloc;
// Create our event loop.
var loop = try libuv.Loop.init(alloc);
errdefer {
// Run the loop once to close any of our handles
_ = loop.run(.nowait) catch 0;
loop.deinit(alloc);
}
loop.setData(allocPtr);
var loop = try xev.Loop.init(.{});
errdefer loop.deinit();
// This async handle is used to "wake up" the renderer and force a render.
var wakeup_h = try libuv.Async.init(alloc, loop, wakeupCallback);
errdefer wakeup_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var wakeup_h = try xev.Async.init();
errdefer wakeup_h.deinit();
// This async handle is used to stop the loop and force the thread to end.
var stop_h = try libuv.Async.init(alloc, loop, stopCallback);
errdefer stop_h.close((struct {
fn callback(h: *libuv.Async) void {
const loop_alloc = h.loop().getData(Allocator).?.*;
h.deinit(loop_alloc);
}
}).callback);
var stop_h = try xev.Async.init();
errdefer stop_h.deinit();
// The mailbox for messaging this thread
var mailbox = try Mailbox.create(alloc);
errdefer mailbox.destroy(alloc);
return Thread{
.alloc = alloc,
.loop = loop,
.wakeup = wakeup_h,
.stop = stop_h,
@ -92,37 +77,12 @@ pub fn init(
/// Clean up the thread. This is only safe to call once the thread
/// completes executing; the caller must join prior to this.
pub fn deinit(self: *Thread) void {
// Get a copy to our allocator
const alloc_ptr = self.loop.getData(Allocator).?;
const alloc = alloc_ptr.*;
// Schedule our handles to close
self.stop.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
self.wakeup.close((struct {
fn callback(h: *libuv.Async) void {
const handle_alloc = h.loop().getData(Allocator).?.*;
h.deinit(handle_alloc);
}
}).callback);
// Run the loop one more time, because destroying our other things
// like windows usually cancel all our event loop stuff and we need
// one more run through to finalize all the closes.
_ = self.loop.run(.default) catch |err|
log.err("error finalizing event loop: {}", .{err});
self.stop.deinit();
self.wakeup.deinit();
self.loop.deinit();
// Nothing can possibly access the mailbox anymore, destroy it.
self.mailbox.destroy(alloc);
// Dealloc our allocator copy
alloc.destroy(alloc_ptr);
self.loop.deinit(alloc);
self.mailbox.destroy(self.alloc);
}
/// The main entrypoint for the thread.
@ -139,18 +99,18 @@ fn threadMain_(self: *Thread) !void {
// Run our thread start/end callbacks. This allows the implementation
// to hook into the event loop as needed.
var data = try self.impl.threadEnter(self.loop);
var data = try self.impl.threadEnter(self);
defer data.deinit();
defer self.impl.threadExit(data);
// Set up our async handler to support rendering
self.wakeup.setData(self);
defer self.wakeup.setData(null);
// Start the async handlers
self.wakeup.wait(&self.loop, &self.wakeup_c, Thread, self, wakeupCallback);
self.stop.wait(&self.loop, &self.stop_c, Thread, self, stopCallback);
// Run
log.debug("starting IO thread", .{});
defer log.debug("exiting IO thread", .{});
_ = try self.loop.run(.default);
try self.loop.run(.until_done);
}
/// Drain the mailbox, handling all the messages in our terminal implementation.
@ -181,26 +141,41 @@ fn drainMailbox(self: *Thread) !void {
// Trigger a redraw after we've drained so we don't waste cyces
// messaging a redraw.
if (redraw) {
try self.impl.renderer_wakeup.send();
try self.impl.renderer_wakeup.notify();
}
}
fn wakeupCallback(h: *libuv.Async) void {
fn wakeupCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch |err| {
log.err("error in wakeup err={}", .{err});
return .rearm;
};
const zone = trace(@src());
defer zone.end();
const t = h.getData(Thread) orelse {
// This shouldn't happen so we log it.
log.warn("wakeup callback fired without data set", .{});
return;
};
const t = self_.?;
// When we wake up, we check the mailbox. Mailbox producers should
// wake up our thread after publishing.
t.drainMailbox() catch |err|
log.err("error draining mailbox err={}", .{err});
return .rearm;
}
fn stopCallback(h: *libuv.Async) void {
h.loop().stop();
fn stopCallback(
self_: ?*Thread,
_: *xev.Loop,
_: *xev.Completion,
r: xev.Async.WaitError!void,
) xev.CallbackAction {
_ = r catch unreachable;
self_.?.loop.stop();
return .disarm;
}

View File

@ -60,8 +60,8 @@ pub fn MessageData(comptime Elem: type, comptime small_size: comptime_int) type
pub const Small = struct {
pub const Max = small_size;
pub const Array = [Max]Elem;
data: Array,
len: u8,
data: Array = undefined,
len: u8 = 0,
};
pub const Alloc = struct {

1
vendor/libxev vendored Submodule

@ -0,0 +1 @@
Subproject commit f578d81ebe60d3afe6593771f10cf650e4d891cb