From b0aa222e58d12bca6c5d6f51cf138fd82773d90b Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Fri, 29 Apr 2022 19:20:47 -0700 Subject: [PATCH] libuv: Embed now uses a mutex + cond var --- src/libuv/Embed.zig | 44 +++++++++++++++++++++++++++++++++----------- 1 file changed, 33 insertions(+), 11 deletions(-) diff --git a/src/libuv/Embed.zig b/src/libuv/Embed.zig index 75d2f172e..48ec09eac 100644 --- a/src/libuv/Embed.zig +++ b/src/libuv/Embed.zig @@ -10,15 +10,18 @@ const builtin = @import("builtin"); const testing = std.testing; const Allocator = std.mem.Allocator; const Loop = @import("Loop.zig"); -const Sem = @import("Sem.zig"); const Thread = @import("Thread.zig"); +const Mutex = @import("Mutex.zig"); +const Cond = @import("Cond.zig"); const log = std.log.scoped(.libuv_embed); const BoolAtomic = std.atomic.Atomic(bool); loop: Loop, -sem: Sem, +mutex: Mutex, +cond: Cond, +ready: bool = false, terminate: BoolAtomic, sleeping: BoolAtomic, callback: fn () void, @@ -29,7 +32,8 @@ thread: ?Thread, pub fn init(alloc: Allocator, loop: Loop, callback: fn () void) !Embed { return Embed{ .loop = loop, - .sem = try Sem.init(alloc, 0), + .mutex = try Mutex.init(alloc), + .cond = try Cond.init(alloc), .terminate = BoolAtomic.init(false), .sleeping = BoolAtomic.init(false), .callback = callback, @@ -40,7 +44,8 @@ pub fn init(alloc: Allocator, loop: Loop, callback: fn () void) !Embed { /// Deinit the embed struct. This will not automatically terminate /// the embed thread. You must call stop manually. pub fn deinit(self: *Embed, alloc: Allocator) void { - self.sem.deinit(alloc); + self.mutex.deinit(alloc); + self.cond.deinit(alloc); self.* = undefined; } @@ -58,7 +63,7 @@ pub fn stop(self: *Embed) void { self.terminate.store(true, .SeqCst); // Post to the semaphore to ensure that any waits are processed. - self.sem.post(); + self.cond.broadcast(); } /// Wait for the thread backing the embedding to end. @@ -72,9 +77,15 @@ pub fn join(self: *Embed) !void { /// loopRun runs the next tick of the libuv event loop. This should be /// called by the main loop thread as a result of callback making some /// signal. This should NOT be called from callback. -pub fn loopRun(self: Embed) !void { - _ = try self.loop.run(.nowait); - self.sem.post(); +pub fn loopRun(self: *Embed) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + if (self.ready) { + self.ready = false; + _ = try self.loop.run(.nowait); + self.cond.broadcast(); + } } fn threadMain(self: *Embed) void { @@ -98,7 +109,7 @@ fn threadMain(self: *Embed) void { // epoll .linux => { var ev: [1]std.os.linux.epoll_event = undefined; - while (std.os.epoll_wait(fd, &ev, timeout) == -1) {} + _ = std.os.epoll_wait(fd, &ev, timeout); }, // kqueue @@ -129,8 +140,19 @@ fn threadMain(self: *Embed) void { // Call our trigger self.callback(); - // Wait for libuv to run a tick - self.sem.wait(); + // Wait for libuv to run a tick. + // + // NOTE: we use timedwait because I /believe/ there is a race here + // with gflw post event that sometimes causes it not to receive it. + // Therefore, if too much time passes, we just go back and loop + // through the poller. + // + // TODO: this is suboptimal for performance. There as to be a better + // way to do this. + self.mutex.lock(); + defer self.mutex.unlock(); + self.ready = true; + _ = self.cond.timedwait(self.mutex, 10 * 1000000); } }