// Source: ghostty/src/blocking_queue.zig
// Snapshot: 2022-11-14 10:46:40 -08:00
// 252 lines, 8.8 KiB, Zig

//! Blocking queue implementation aimed primarily for message passing
//! between threads.
const std = @import("std");
const builtin = @import("builtin");
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
/// Returns a blocking queue implementation for type T.
///
/// This is tailor made for ghostty usage so it isn't meant to be maximally
/// generic, but I'm happy to make it more generic over time. Traits of this
/// queue that are specific to our usage:
///
/// - Fixed size. We expect our queue to quickly drain and also not be
/// too large so we prefer a fixed size queue for now.
/// - No blocking pop. We use an external event loop mechanism such as
/// eventfd to notify our waiter that there is no data available so
/// we don't need to implement a blocking pop.
/// - Drain function. Most queues usually pop one at a time. We have
/// a mechanism for draining since on every IO loop our TTY drains
/// the full queue so we can get rid of the overhead of a ton of
/// locks and bounds checking and do a one-time drain.
///
/// One key usage pattern is that our blocking queues are single producer
/// single consumer (SPSC). This should let us do some interesting optimizations
/// in the future. At the time of writing this, the blocking queue implementation
/// is purposely naive to build something quickly, but we should benchmark
/// and make this more optimized as necessary.
pub fn BlockingQueue(
    comptime T: type,
    comptime capacity: usize,
) type {
    return struct {
        const Self = @This();

        /// The type we use for queue size values. We can optimize this
        /// in the future to be the correct bit-size for our preallocated
        /// size for this queue.
        pub const Size = u32;

        /// The bounds of this queue. We recast this to Size so we can do math.
        const bounds = @intCast(Size, capacity);

        /// Specifies the timeout for an operation.
        pub const Timeout = union(enum) {
            /// Fail instantly (non-blocking).
            instant: void,

            /// Run forever or until interrupted
            forever: void,

            /// Nanoseconds
            ns: u64,
        };

        /// Our data. The values are undefined until they are written.
        data: [bounds]T,

        /// The next location to write (next empty loc).
        write: Size,

        /// The next location to read (next non-empty loc).
        read: Size,

        /// The number of written (unread) elements.
        len: Size,

        /// The big mutex that must be held to read/write.
        mutex: std.Thread.Mutex,

        /// A CV for being notified when the queue is no longer full. This is
        /// used for writing. Note we DON'T have a CV for waiting on the
        /// queue not being EMPTY because we use external notifiers for that.
        cond_not_full: std.Thread.Condition,

        /// Number of producers currently blocked in `push` waiting for
        /// space. Used so `pop`/`drain` only signal when someone is waiting.
        not_full_waiters: usize,

        /// Allocate the blocking queue. Allocation must always happen on
        /// the heap due to shared concurrency state.
        pub fn create(alloc: Allocator) !*Self {
            const ptr = try alloc.create(Self);
            errdefer alloc.destroy(ptr);

            ptr.* = .{
                .data = undefined,
                .len = 0,
                .write = 0,
                .read = 0,
                .mutex = .{},
                .cond_not_full = .{},
                .not_full_waiters = 0,
            };

            return ptr;
        }

        /// Free all the resources for this queue. This should only be
        /// called once all producers and consumers have quit.
        pub fn destroy(self: *Self, alloc: Allocator) void {
            self.* = undefined;
            alloc.destroy(self);
        }

        /// Push a value to the queue. This returns the total size of the
        /// queue (unread items) after the push. A return value of zero
        /// means that the push failed.
        pub fn push(self: *Self, value: T, timeout: Timeout) Size {
            self.mutex.lock();
            defer self.mutex.unlock();

            // If the queue is full we must wait for space (or fail fast).
            if (self.full()) {
                switch (timeout) {
                    // If we're not waiting, then we failed to write.
                    .instant => return 0,

                    .forever => {
                        self.not_full_waiters += 1;
                        defer self.not_full_waiters -= 1;

                        // Wait in a loop: condition variables are subject to
                        // spurious wakeups, and "forever" means we must not
                        // give up until there is actually space.
                        while (self.full()) self.cond_not_full.wait(&self.mutex);
                    },

                    .ns => |ns| {
                        self.not_full_waiters += 1;
                        defer self.not_full_waiters -= 1;
                        self.cond_not_full.timedWait(&self.mutex, ns) catch return 0;
                    },
                }

                // If we're still full, then we failed to write. This can
                // happen when the timed wait expires or wakes spuriously
                // without space having become available.
                if (self.full()) return 0;
            }

            // Add our data and update our accounting
            self.data[self.write] = value;
            self.write += 1;
            if (self.write >= bounds) self.write -= bounds;
            self.len += 1;

            return self.len;
        }

        /// Pop a value from the queue without blocking.
        pub fn pop(self: *Self) ?T {
            self.mutex.lock();
            defer self.mutex.unlock();

            // If we're empty we have nothing
            if (self.len == 0) return null;

            // Get the index we're going to read data from and do some
            // accounting. We don't copy the value here to avoid copying twice.
            const n = self.read;
            self.read += 1;
            if (self.read >= bounds) self.read -= bounds;
            self.len -= 1;

            // If we have producers waiting on a full queue, notify.
            if (self.not_full_waiters > 0) self.cond_not_full.signal();

            return self.data[n];
        }

        /// Pop all values from the queue. This will hold the big mutex
        /// until `deinit` is called on the return value. This is used if
        /// you know you're going to "pop" and utilize all the values
        /// quickly to avoid many locks, bounds checks, and cv signals.
        pub fn drain(self: *Self) DrainIterator {
            self.mutex.lock();
            return .{ .queue = self };
        }

        pub const DrainIterator = struct {
            queue: *Self,

            pub fn next(self: *DrainIterator) ?T {
                if (self.queue.len == 0) return null;

                // Read and account
                const n = self.queue.read;
                self.queue.read += 1;
                if (self.queue.read >= bounds) self.queue.read -= bounds;
                self.queue.len -= 1;

                return self.queue.data[n];
            }

            pub fn deinit(self: *DrainIterator) void {
                // If we have producers waiting on a full queue, notify.
                if (self.queue.not_full_waiters > 0) self.queue.cond_not_full.signal();

                // Unlock
                self.queue.mutex.unlock();
            }
        };

        /// Returns true if the queue is full. This is not public because
        /// it requires the lock to be held.
        inline fn full(self: *Self) bool {
            return self.len == bounds;
        }
    };
}
test "basic push and pop" {
    const testing = std.testing;
    const alloc = testing.allocator;

    const Queue = BlockingQueue(u64, 4);
    const queue = try Queue.create(alloc);
    defer queue.destroy(alloc);

    // An empty queue pops nothing.
    try testing.expect(queue.pop() == null);

    // Fill to capacity; each push reports the new queue length.
    var i: u64 = 1;
    while (i <= 4) : (i += 1) {
        try testing.expectEqual(@intCast(Queue.Size, i), queue.push(i, .{ .instant = {} }));
    }

    // A full queue rejects an instant push with zero.
    try testing.expectEqual(@as(Queue.Size, 0), queue.push(5, .{ .instant = {} }));

    // Values come back in FIFO order, then the queue is empty again.
    i = 1;
    while (i <= 4) : (i += 1) try testing.expect(queue.pop().? == i);
    try testing.expect(queue.pop() == null);

    // Draining an empty queue yields nothing...
    var iter = queue.drain();
    try testing.expect(iter.next() == null);
    iter.deinit();

    // ...and leaves the queue usable afterwards.
    try testing.expectEqual(@as(Queue.Size, 1), queue.push(1, .{ .instant = {} }));
}
test "timed push" {
    const testing = std.testing;
    const alloc = testing.allocator;

    const Queue = BlockingQueue(u64, 1);
    const queue = try Queue.create(alloc);
    defer queue.destroy(alloc);

    // Fill the single slot; a second instant push must fail.
    try testing.expectEqual(@as(Queue.Size, 1), queue.push(1, .{ .instant = {} }));
    try testing.expectEqual(@as(Queue.Size, 0), queue.push(2, .{ .instant = {} }));

    // A timed push on a full queue times out and reports failure (0).
    try testing.expectEqual(@as(Queue.Size, 0), queue.push(2, .{ .ns = 1000 }));
}