libuv: pipe reading

This commit is contained in:
Mitchell Hashimoto
2022-04-25 18:58:32 -07:00
parent 2f457bbd97
commit a0424d3a86
2 changed files with 97 additions and 7 deletions

View File

@ -99,6 +99,7 @@ test "Pipe" {
// Set our data that we'll use to assert
var data: TestData = .{};
defer data.deinit();
writer.setData(&data);
// Write
@ -110,7 +111,7 @@ test "Pipe" {
&[_][]const u8{
"hello",
},
callback,
TestData.write,
);
// Run write and verify success
@ -118,19 +119,49 @@ test "Pipe" {
try testing.expectEqual(@as(u8, 1), data.count);
try testing.expectEqual(@as(i32, 0), data.status);
// Read
try reader.readStart(TestData.alloc, TestData.read);
reader.setData(&data);
_ = try loop.run(.once);
// Check our data
try testing.expectEqual(@as(usize, 5), data.data.items.len);
try testing.expectEqualStrings("hello", data.data.items);
// End
reader.readStop();
reader.close(null);
writer.close(null);
_ = try loop.run(.default);
}
/// Logic for testing read/write of pipes.
const TestData = struct {
count: u8 = 0,
status: i32 = 0,
};
data: std.ArrayListUnmanaged(u8) = .{},
fn callback(req: *WriteReq, status: i32) void {
fn deinit(self: *TestData) void {
self.data.deinit(testing.allocator);
self.* = undefined;
}
fn write(req: *WriteReq, status: i32) void {
var data = req.handle(Pipe).?.getData(TestData).?;
data.count += 1;
data.status = status;
}
fn alloc(_: *Pipe, size: usize) ?[]u8 {
return testing.allocator.alloc(u8, size) catch null;
}
fn read(h: *Pipe, n: isize, buf: []const u8) void {
var data = h.getData(TestData).?;
data.data.appendSlice(
testing.allocator,
buf[0..@intCast(usize, n)],
) catch unreachable;
testing.allocator.free(buf);
}
};

View File

@ -62,6 +62,65 @@ pub fn Stream(comptime T: type) type {
Wrapper.callback,
));
}
/// Read data from an incoming stream. The uv_read_cb callback will
/// be made several times until there is no more data to read or
/// uv_read_stop() is called.
pub fn readStart(
self: T,
comptime alloc_cb: fn (self: *T, size: usize) ?[]u8,
comptime read_cb: fn (self: *T, nread: isize, buf: []const u8) void,
) !void {
const Wrapper = struct {
fn alloc(
cbhandle: [*c]c.uv_handle_t,
cbsize: usize,
buf: [*c]c.uv_buf_t,
) callconv(.C) void {
var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) };
const result = @call(.{ .modifier = .always_inline }, alloc_cb, .{
&param,
cbsize,
});
if (result) |slice| {
buf.* = @bitCast(c.uv_buf_t, slice);
} else {
buf.*.base = null;
buf.*.len = 0;
}
}
fn read(
cbhandle: [*c]c.uv_stream_t,
cbnread: isize,
cbbuf: [*c]const c.uv_buf_t,
) callconv(.C) void {
var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) };
@call(.{ .modifier = .always_inline }, read_cb, .{
&param,
cbnread,
@bitCast([]const u8, cbbuf.*),
});
}
};
try errors.convertError(c.uv_read_start(
@ptrCast(*c.uv_stream_t, self.handle),
Wrapper.alloc,
Wrapper.read,
));
}
/// Stop reading data from the stream. The uv_read_cb callback will
/// no longer be called.
///
/// This function is idempotent and may be safely called on a stopped
/// stream.
pub fn readStop(self: T) void {
// Docs say we can ignore this result.
_ = c.uv_read_stop(@ptrCast(*c.uv_stream_t, self.handle));
}
};
}