diff --git a/src/libuv/Pipe.zig b/src/libuv/Pipe.zig index 24feb9a74..30f916013 100644 --- a/src/libuv/Pipe.zig +++ b/src/libuv/Pipe.zig @@ -99,6 +99,7 @@ test "Pipe" { // Set our data that we'll use to assert var data: TestData = .{}; + defer data.deinit(); writer.setData(&data); // Write @@ -110,7 +111,7 @@ test "Pipe" { &[_][]const u8{ "hello", }, - callback, + TestData.write, ); // Run write and verify success @@ -118,19 +119,49 @@ test "Pipe" { try testing.expectEqual(@as(u8, 1), data.count); try testing.expectEqual(@as(i32, 0), data.status); + // Read + try reader.readStart(TestData.alloc, TestData.read); + reader.setData(&data); + _ = try loop.run(.once); + + // Check our data + try testing.expectEqual(@as(usize, 5), data.data.items.len); + try testing.expectEqualStrings("hello", data.data.items); + // End + reader.readStop(); reader.close(null); writer.close(null); _ = try loop.run(.default); } +/// Logic for testing read/write of pipes. const TestData = struct { count: u8 = 0, status: i32 = 0, -}; + data: std.ArrayListUnmanaged(u8) = .{}, -fn callback(req: *WriteReq, status: i32) void { - var data = req.handle(Pipe).?.getData(TestData).?; - data.count += 1; - data.status = status; -} + fn deinit(self: *TestData) void { + self.data.deinit(testing.allocator); + self.* = undefined; + } + + fn write(req: *WriteReq, status: i32) void { + var data = req.handle(Pipe).?.getData(TestData).?; + data.count += 1; + data.status = status; + } + + fn alloc(_: *Pipe, size: usize) ?[]u8 { + return testing.allocator.alloc(u8, size) catch null; + } + + fn read(h: *Pipe, n: isize, buf: []const u8) void { + var data = h.getData(TestData).?; + data.data.appendSlice( + testing.allocator, + buf[0..@intCast(usize, n)], + ) catch unreachable; + testing.allocator.free(buf); + } +}; diff --git a/src/libuv/stream.zig b/src/libuv/stream.zig index d71e82fe1..59518e3c2 100644 --- a/src/libuv/stream.zig +++ b/src/libuv/stream.zig @@ -62,6 +62,65 @@ pub fn Stream(comptime T: type) type { 
Wrapper.callback, )); } + + /// Read data from an incoming stream. The uv_read_cb callback will + /// be made several times until there is no more data to read or + /// uv_read_stop() is called. + pub fn readStart( + self: T, + comptime alloc_cb: fn (self: *T, size: usize) ?[]u8, + comptime read_cb: fn (self: *T, nread: isize, buf: []const u8) void, + ) !void { + const Wrapper = struct { + fn alloc( + cbhandle: [*c]c.uv_handle_t, + cbsize: usize, + buf: [*c]c.uv_buf_t, + ) callconv(.C) void { + var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) }; + const result = @call(.{ .modifier = .always_inline }, alloc_cb, .{ + &param, + cbsize, + }); + + if (result) |slice| { + buf.* = @bitCast(c.uv_buf_t, slice); + } else { + buf.*.base = null; + buf.*.len = 0; + } + } + + fn read( + cbhandle: [*c]c.uv_stream_t, + cbnread: isize, + cbbuf: [*c]const c.uv_buf_t, + ) callconv(.C) void { + var param: T = .{ .handle = @ptrCast(HandleType, cbhandle) }; + @call(.{ .modifier = .always_inline }, read_cb, .{ + &param, + cbnread, + @bitCast([]const u8, cbbuf.*), + }); + } + }; + + try errors.convertError(c.uv_read_start( + @ptrCast(*c.uv_stream_t, self.handle), + Wrapper.alloc, + Wrapper.read, + )); + } + + /// Stop reading data from the stream. The uv_read_cb callback will + /// no longer be called. + /// + /// This function is idempotent and may be safely called on a stopped + /// stream. + pub fn readStop(self: T) void { + // Docs say we can ignore this result. + _ = c.uv_read_stop(@ptrCast(*c.uv_stream_t, self.handle)); + } }; }