diff --git a/src/blocking_queue.zig b/src/blocking_queue.zig new file mode 100644 index 000000000..8014c37a1 --- /dev/null +++ b/src/blocking_queue.zig @@ -0,0 +1,251 @@ +//! Blocking queue implementation aimed primarily for message passing +//! between threads. + +const std = @import("std"); +const builtin = @import("builtin"); +const assert = std.debug.assert; +const Allocator = std.mem.Allocator; + +/// Returns a blocking queue implementation for type T. +/// +/// This is tailor made for ghostty usage so it isn't meant to be maximally +/// generic, but I'm happy to make it more generic over time. Traits of this +/// queue that are specific to our usage: +/// +/// - Fixed size. We expect our queue to quickly drain and also not be +/// too large so we prefer a fixed size queue for now. +/// - No blocking pop. We use an external event loop mechanism such as +/// eventfd to notify our waiter that there is no data available so +/// we don't need to implement a blocking pop. +/// - Drain function. Most queues usually pop one at a time. We have +/// a mechanism for draining since on every IO loop our TTY drains +/// the full queue so we can get rid of the overhead of a ton of +/// locks and bounds checking and do a one-time drain. +/// +/// One key usage pattern is that our blocking queues are single producer +/// single consumer (SPSC). This should let us do some interesting optimizations +/// in the future. At the time of writing this, the blocking queue implementation +/// is purposely naive to build something quickly, but we should benchmark +/// and make this more optimized as necessary. +pub fn BlockingQueue( + comptime T: type, + comptime capacity: usize, +) type { + return struct { + const Self = @This(); + + // The type we use for queue size types. We can optimize this + // in the future to be the correct bit-size for our preallocated + // size for this queue. + const Size = u32; + + // The bounds of this queue. We recast this to Size so we can do math. + const bounds = @intCast(Size, capacity); + + /// Specifies the timeout for an operation. + pub const Timeout = union(enum) { + /// Fail instantly (non-blocking). + instant: void, + + /// Run forever or until interrupted + forever: void, + + /// Nanoseconds + ns: u64, + }; + + /// Our data. The values are undefined until they are written. + data: [bounds]T, + + /// The next location to write (next empty loc) and next location + /// to read (next non-empty loc). The number of written elements. + write: Size, + read: Size, + len: Size, + + /// The big mutex that must be held to read/write. + mutex: std.Thread.Mutex, + + /// A CV for being notified when the queue is no longer full. This is + /// used for writing. Note we DON'T have a CV for waiting on the + /// queue not being EMPTY because we use external notifiers for that. + cond_not_full: std.Thread.Condition, + not_full_waiters: usize, + + /// Allocate the blocking queue. Allocation must always happen on + /// the heap due to shared concurrency state. + pub fn create(alloc: Allocator) !*Self { + const ptr = try alloc.create(Self); + errdefer alloc.destroy(ptr); + + ptr.* = .{ + .data = undefined, + .len = 0, + .write = 0, + .read = 0, + .mutex = .{}, + .cond_not_full = .{}, + .not_full_waiters = 0, + }; + + return ptr; + } + + /// Free all the resources for this queue. This should only be + /// called once all producers and consumers have quit. + pub fn destroy(self: *Self, alloc: Allocator) void { + self.* = undefined; + alloc.destroy(self); + } + + /// Push a value to the queue. This returns the total size of the + /// queue (unread items) after the push. A return value of zero + /// means that the push failed. + pub fn push(self: *Self, value: T, timeout: Timeout) Size { + self.mutex.lock(); + defer self.mutex.unlock(); + + // The + if (self.full()) { + switch (timeout) { + // If we're not waiting, then we failed to write. + .instant => return 0, + + .forever => { + self.not_full_waiters += 1; + defer self.not_full_waiters -= 1; + self.cond_not_full.wait(&self.mutex); + }, + + .ns => |ns| { + self.not_full_waiters += 1; + defer self.not_full_waiters -= 1; + self.cond_not_full.timedWait(&self.mutex, ns) catch return 0; + }, + } + + // If we're still full, then we failed to write. This can + // happen in situations where we are interrupted. + if (self.full()) return 0; + } + + // Add our data and update our accounting + self.data[self.write] = value; + self.write += 1; + if (self.write >= bounds) self.write -= bounds; + self.len += 1; + + return self.len; + } + + /// Pop a value from the queue without blocking. + pub fn pop(self: *Self) ?T { + self.mutex.lock(); + defer self.mutex.unlock(); + + // If we're empty we have nothing + if (self.len == 0) return null; + + // Get the index we're going to read data from and do some + // accounting. We don't copy the value here to avoid copying twice. + const n = self.read; + self.read += 1; + if (self.read >= bounds) self.read -= bounds; + self.len -= 1; + + // If we have consumers waiting on a full queue, notify. + if (self.not_full_waiters > 0) self.cond_not_full.signal(); + + return self.data[n]; + } + + /// Pop all values from the queue. This will hold the big mutex + /// until `deinit` is called on the return value. This is used if + /// you know you're going to "pop" and utilize all the values + /// quickly to avoid many locks, bounds checks, and cv signals. + pub fn drain(self: *Self) DrainIterator { + self.mutex.lock(); + return .{ .queue = self }; + } + + pub const DrainIterator = struct { + queue: *Self, + + pub fn next(self: *DrainIterator) ?T { + if (self.queue.len == 0) return null; + + // Read and account + const n = self.queue.read; + self.queue.read += 1; + if (self.queue.read >= bounds) self.queue.read -= bounds; + self.queue.len -= 1; + + return self.queue.data[n]; + } + + pub fn deinit(self: *DrainIterator) void { + // If we have consumers waiting on a full queue, notify. + if (self.queue.not_full_waiters > 0) self.queue.cond_not_full.signal(); + + // Unlock + self.queue.mutex.unlock(); + } + }; + + /// Returns true if the queue is full. This is not public because + /// it requires the lock to be held. + inline fn full(self: *Self) bool { + return self.len == bounds; + } + }; +} + +test "basic push and pop" { + const testing = std.testing; + const alloc = testing.allocator; + + const Q = BlockingQueue(u64, 4); + const q = try Q.create(alloc); + defer q.destroy(alloc); + + // Should have no values + try testing.expect(q.pop() == null); + + // Push until we're full + try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} })); + try testing.expectEqual(@as(Q.Size, 2), q.push(2, .{ .instant = {} })); + try testing.expectEqual(@as(Q.Size, 3), q.push(3, .{ .instant = {} })); + try testing.expectEqual(@as(Q.Size, 4), q.push(4, .{ .instant = {} })); + try testing.expectEqual(@as(Q.Size, 0), q.push(5, .{ .instant = {} })); + + // Pop! + try testing.expect(q.pop().? == 1); + try testing.expect(q.pop().? == 2); + try testing.expect(q.pop().? == 3); + try testing.expect(q.pop().? == 4); + try testing.expect(q.pop() == null); + + // Drain does nothing + var it = q.drain(); + try testing.expect(it.next() == null); + it.deinit(); + + // Verify we can still push + try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} })); +} + +test "timed push" { + const testing = std.testing; + const alloc = testing.allocator; + + const Q = BlockingQueue(u64, 1); + const q = try Q.create(alloc); + defer q.destroy(alloc); + + // Push + try testing.expectEqual(@as(Q.Size, 1), q.push(1, .{ .instant = {} })); + try testing.expectEqual(@as(Q.Size, 0), q.push(2, .{ .instant = {} })); + + // Timed push should fail + try testing.expectEqual(@as(Q.Size, 0), q.push(2, .{ .ns = 1000 })); +} diff --git a/src/main.zig b/src/main.zig index cd92fd761..1f92efbe1 100644 --- a/src/main.zig +++ b/src/main.zig @@ -203,6 +203,7 @@ test { _ = @import("terminal/main.zig"); // TODO + _ = @import("blocking_queue.zig"); _ = @import("config.zig"); _ = @import("homedir.zig"); _ = @import("passwd.zig");