From 2f457bbd97f403ca70cff818ed9f770d0c968f8f Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Mon, 25 Apr 2022 17:23:29 -0700 Subject: [PATCH] libuv: stream writes --- src/libuv/Pipe.zig | 73 +++++++++++++++++++++++++++++++++------ src/libuv/main.zig | 1 + src/libuv/stream.zig | 82 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 11 deletions(-) diff --git a/src/libuv/Pipe.zig b/src/libuv/Pipe.zig index 63d71b849..24feb9a74 100644 --- a/src/libuv/Pipe.zig +++ b/src/libuv/Pipe.zig @@ -10,7 +10,9 @@ const c = @import("c.zig"); const errors = @import("error.zig"); const Loop = @import("Loop.zig"); const Handle = @import("handle.zig").Handle; -const Stream = @import("stream.zig").Stream; +const stream = @import("stream.zig"); +const Stream = stream.Stream; +const WriteReq = stream.WriteReq; handle: *c.uv_pipe_t, @@ -33,17 +35,23 @@ pub const Flags = packed struct { } }; +/// Pair is a pair of ends to a single pipe. +pub const Pair = struct { + read: c.uv_file, + write: c.uv_file, +}; + /// Create a pair of connected pipe handles. Data may be written to fds[1] and /// read from fds[0]. The resulting handles can be passed to uv_pipe_open, /// used with uv_spawn, or for any other purpose. -pub fn pipe(read_flags: Flags, write_flags: Flags) ![2]c.uv_file { +pub fn pipe(read_flags: Flags, write_flags: Flags) !Pair { var res: [2]c.uv_file = undefined; try errors.convertError(c.uv_pipe( &res, read_flags.toInt(c_int), write_flags.toInt(c_int), )); - return res; + return Pair{ .read = res[0], .write = res[1] }; } pub fn init(alloc: Allocator, loop: Loop, ipc: bool) !Pipe { @@ -68,18 +76,61 @@ test { } test "Pipe" { - const pipes = try pipe(.{ .nonblock = true }, .{ .nonblock = true }); - defer std.os.close(pipes[1]); + const pair = try pipe(.{ .nonblock = true }, .{ .nonblock = true }); var loop = try Loop.init(testing.allocator); defer loop.deinit(testing.allocator); - var h = try init(testing.allocator, loop, false); - defer h.deinit(testing.allocator); - try h.open(pipes[0]); - try testing.expect(try h.isReadable()); - try testing.expect(!try h.isWritable()); + // Read side + var reader = try init(testing.allocator, loop, false); + defer reader.deinit(testing.allocator); - h.close(null); + try reader.open(pair.read); + try testing.expect(try reader.isReadable()); + try testing.expect(!try reader.isWritable()); + + // Write side + var writer = try init(testing.allocator, loop, false); + defer writer.deinit(testing.allocator); + + try writer.open(pair.write); + try testing.expect(!try writer.isReadable()); + try testing.expect(try writer.isWritable()); + + // Set our data that we'll use to assert + var data: TestData = .{}; + writer.setData(&data); + + // Write + var writeReq = try WriteReq.init(testing.allocator); + defer writeReq.deinit(testing.allocator); + + try writer.write( + writeReq, + &[_][]const u8{ + "hello", + }, + callback, + ); + + // Run write and verify success + _ = try loop.run(.once); + try testing.expectEqual(@as(u8, 1), data.count); + try testing.expectEqual(@as(i32, 0), data.status); + + // End + reader.close(null); + writer.close(null); _ = try loop.run(.default); } + +const TestData = struct { + count: u8 = 0, + status: i32 = 0, +}; + +fn callback(req: *WriteReq, status: i32) void { + var data = req.handle(Pipe).?.getData(TestData).?; + data.count += 1; + data.status = status; +} diff --git a/src/libuv/main.zig b/src/libuv/main.zig index 076d88a1e..40fa90ed5 100644 --- a/src/libuv/main.zig +++ b/src/libuv/main.zig @@ -10,6 +10,7 @@ pub const Error = @import("error.zig").Error; pub const Embed = @import("Embed.zig"); test { + _ = @import("stream.zig"); _ = @import("tests.zig"); _ = Loop; diff --git a/src/libuv/stream.zig b/src/libuv/stream.zig index 63203e327..d71e82fe1 100644 --- a/src/libuv/stream.zig +++ b/src/libuv/stream.zig @@ -1,7 +1,11 @@ const c = @import("c.zig"); +const std = @import("std"); +const Allocator = std.mem.Allocator; +const testing = std.testing; const Loop = @import("Loop.zig"); const errors = @import("error.zig"); +const Error = errors.Error; /// Returns a struct that has all the shared stream functions for the /// given stream type T. The type T must have a field named "handle". @@ -29,5 +33,83 @@ pub fn Stream(comptime T: type) type { try errors.convertError(res); return res > 0; } + + /// Write data to stream. Buffers are written in order. + pub fn write( + self: T, + req: WriteReq, + bufs: []const []const u8, + comptime cb: fn (req: *WriteReq, status: i32) void, + ) !void { + const Wrapper = struct { + fn callback(cbreq: [*c]c.uv_write_t, status: c_int) callconv(.C) void { + var newreq: WriteReq = .{ .req = cbreq }; + @call(.{ .modifier = .always_inline }, cb, .{ + &newreq, + @intCast(i32, status), + }); + } + }; + + // We can directly ptrCast bufs.ptr to a C pointer of uv_buf_t + // because they have the exact same layout in memory. We have a + // unit test below that keeps this true. + try errors.convertError(c.uv_write( + req.req, + @ptrCast(*c.uv_stream_t, self.handle), + @ptrCast([*c]const c.uv_buf_t, bufs.ptr), + @intCast(c_uint, bufs.len), + Wrapper.callback, + )); + } }; } + +/// Write request type. Careful attention must be paid when reusing objects +/// of this type. When a stream is in non-blocking mode, write requests sent +/// with uv_write will be queued. Reusing objects at this point is undefined +/// behaviour. It is safe to reuse the uv_write_t object only after the +/// callback passed to uv_write is fired. +pub const WriteReq = struct { + req: *c.uv_write_t, + + pub fn init(alloc: Allocator) !WriteReq { + var req = try alloc.create(c.uv_write_t); + errdefer alloc.destroy(req); + return WriteReq{ .req = req }; + } + + pub fn deinit(self: *WriteReq, alloc: Allocator) void { + alloc.destroy(self.req); + self.* = undefined; + } + + /// Pointer to the stream where this write request is running. + /// T should be a high-level handle type such as "Pipe". + pub fn handle(self: WriteReq, comptime T: type) ?T { + const tInfo = @typeInfo(T).Struct; + const HandleType = tInfo.fields[0].field_type; + + return if (self.req.handle) |ptr| + return T{ .handle = @ptrCast(HandleType, ptr) } + else + null; + } + + test "Write: create and destroy" { + var h = try init(testing.allocator); + defer h.deinit(testing.allocator); + } +}; + +test { + _ = WriteReq; +} + +test "uv_buf_t and slices are the same" { + // Verify that the fields are also the same + var slice: []const u8 = &[_]u8{ 1, 2, 3 }; + var buf = @bitCast(c.uv_buf_t, slice); + try testing.expectEqual(slice.ptr, buf.base); + try testing.expectEqual(slice.len, buf.len); +}