os: FlatpakHostCommand uses thread with its own event loop

This commit is contained in:
Mitchell Hashimoto
2023-02-27 10:18:56 -08:00
parent f89d202b0d
commit f64d871847
3 changed files with 233 additions and 77 deletions

View File

@ -28,12 +28,13 @@ const TIOCSWINSZ = if (builtin.os.tag == .macos) 2148037735 else c.TIOCSWINSZ;
const TIOCGWINSZ = if (builtin.os.tag == .macos) 1074295912 else c.TIOCGWINSZ;
/// Redeclare this winsize struct so we can just use a Zig struct. This
/// layout should be correct on all tested platforms.
/// layout should be correct on all tested platforms. The defaults on this
/// are some reasonable screen size but you should probably not use them.
const winsize = extern struct {
ws_row: u16,
ws_col: u16,
ws_xpixel: u16,
ws_ypixel: u16,
ws_row: u16 = 100,
ws_col: u16 = 80,
ws_xpixel: u16 = 800,
ws_ypixel: u16 = 600,
};
pub extern "c" fn setsid() std.c.pid_t;

View File

@ -12,7 +12,14 @@ pub fn isFlatpak() bool {
}
/// A struct to help execute commands on the host via the
/// org.freedesktop.Flatpak.Development DBus module.
/// org.freedesktop.Flatpak.Development DBus module. This uses GIO/GLib
/// under the hood.
///
/// This always spawns its own thread and maintains its own GLib event loop.
/// This makes it easy for the command to behave synchronously similar to
/// std.process.ChildProcess.
///
/// Requires GIO, GLib to be available and linked.
pub const FlatpakHostCommand = struct {
const Allocator = std.mem.Allocator;
const fd_t = std.os.fd_t;
@ -34,77 +41,169 @@ pub const FlatpakHostCommand = struct {
/// does not send any environment variables.
env: ?*const EnvMap = null,
/// File descriptors to send to the child process.
stdin: StdIo = .{ .devnull = null },
stdout: StdIo = .{ .devnull = null },
stderr: StdIo = .{ .devnull = null },
/// File descriptors to send to the child process. It is up to the
/// caller to create the file descriptors and set them up.
stdin: fd_t,
stdout: fd_t,
stderr: fd_t,
/// Process ID is set after spawn is called.
pid: ?c_int = null,
/// State of the process. This is updated by the dedicated thread it
/// runs in and is protected by the given lock and condition variable.
state: State = .{ .init = {} },
state_mutex: std.Thread.Mutex = .{},
state_cv: std.Thread.Condition = .{},
pub const StdIo = union(enum) {
// Drop the input/output to /dev/null. The value should be NULL
// and the spawn functil will take care of initializing and closing.
devnull: ?fd_t,
/// State the process is in. This can't be inspected directly, you
/// must use getters on the struct to get access.
const State = union(enum) {
/// Initial state
init: void,
/// Setup the stdio to be a pipe. The value should be set to NULL
/// to start and the spawn function will take care of initializing
/// the pipe.
pipe: ?fd_t,
/// Error starting. The error message is only available via logs.
/// (This isn't a fundamental limitation, just didn't need the
/// error message yet)
err: void,
fn setup(self: *StdIo) !fd_t {
switch (self.*) {
.devnull => |*v| {
assert(v.* == null);
/// Process started with the given pid on the host.
started: struct {
pid: c_int,
subscription: c.guint,
loop: *c.GMainLoop,
},
// Slight optimization potential: we can open /dev/null
// exactly once but its so rare that we use it that I
// didn't care to optimize this at this time.
const fd = std.os.openZ("/dev/null", std.os.O.RDWR, 0) catch |err| switch (err) {
error.PathAlreadyExists => unreachable,
error.NoSpaceLeft => unreachable,
error.FileTooBig => unreachable,
error.DeviceBusy => unreachable,
error.FileLocksNotSupported => unreachable,
error.BadPathName => unreachable, // Windows-only
error.InvalidHandle => unreachable, // WASI-only
error.WouldBlock => unreachable,
else => |e| return e,
};
v.* = fd;
return fd;
},
.pipe => unreachable,
}
}
/// Process exited
exited: struct {
pid: c_int,
status: u8,
},
};
/// Spawn the command. This will start the host command and set the
/// Execute the command and wait for it to finish. This will automatically
/// read all the data from the provided stdout/stderr fds and return them
/// in the result.
///
/// This runs the exec in a dedicated thread with a dedicated GLib
/// event loop so that it can run synchronously.
pub fn exec(self: *FlatpakHostCommand, alloc: Allocator) !void {
const thread = try std.Thread.spawn(.{}, threadMain, .{ self, alloc });
thread.join();
}
/// Spawn the command. This will start the host command. On return,
/// the pid will be available. This must only be called with the
/// state in "init".
///
/// Precondition: The self pointer MUST be stable.
pub fn spawn(self: *FlatpakHostCommand, alloc: Allocator) !c_int {
const thread = try std.Thread.spawn(.{}, threadMain, .{ self, alloc });
thread.setName("flatpak-host-command") catch {};
// Wait for the process to start or error.
self.state_mutex.lock();
defer self.state_mutex.unlock();
while (self.state == .init) self.state_cv.wait(&self.state_mutex);
return switch (self.state) {
.init => unreachable,
.err => error.FlatpakSpawnFail,
.started => |v| v.pid,
.exited => |v| v.pid,
};
}
/// Wait for the process to end and return the exit status. This
/// can only be called ONCE. Once this returns, the state is reset.
pub fn wait(self: *FlatpakHostCommand) !u8 {
self.state_mutex.lock();
defer self.state_mutex.unlock();
while (true) {
switch (self.state) {
.init => return error.FlatpakCommandNotStarted,
.err => return error.FlatpakSpawnFail,
.started => {},
.exited => |v| {
self.state = .{ .init = {} };
self.state_cv.broadcast();
return v.status;
},
}
self.state_cv.wait(&self.state_mutex);
}
}
fn threadMain(self: *FlatpakHostCommand, alloc: Allocator) void {
// Create a new thread-local context so that all our sources go
// to this context and we can run our loop correctly.
const ctx = c.g_main_context_new();
defer c.g_main_context_unref(ctx);
c.g_main_context_push_thread_default(ctx);
defer c.g_main_context_pop_thread_default(ctx);
// Get our loop for the current thread
const loop = c.g_main_loop_new(ctx, 1).?;
defer c.g_main_loop_unref(loop);
// Get our bus connection. This has to remain active until we exit
// the thread otherwise our signals won't be called.
var g_err: [*c]c.GError = null;
const bus = c.g_bus_get_sync(c.G_BUS_TYPE_SESSION, null, &g_err) orelse {
log.warn("spawn error getting bus: {s}", .{g_err.*.message});
self.updateState(.{ .err = {} });
return;
};
defer c.g_object_unref(bus);
// Spawn the command first. This will setup all our IO.
self.start(alloc, bus, loop) catch |err| {
log.warn("error starting host command: {}", .{err});
self.updateState(.{ .err = {} });
return;
};
// Run the event loop. It quits in the exit callback.
c.g_main_loop_run(loop);
}
/// Start the command. This will start the host command and set the
/// pid field on success. This will not wait for completion.
pub fn spawn(self: *FlatpakHostCommand, alloc: Allocator) !void {
///
/// Once this is called, the self pointer MUST remain stable. This
/// requirement is due to using GLib under the covers with callbacks.
fn start(
self: *FlatpakHostCommand,
alloc: Allocator,
bus: *c.GDBusConnection,
loop: *c.GMainLoop,
) !void {
var err: [*c]c.GError = null;
var arena_allocator = std.heap.ArenaAllocator.init(alloc);
defer arena_allocator.deinit();
const arena = arena_allocator.allocator();
var err: [*c]c.GError = null;
const bus = c.g_bus_get_sync(c.G_BUS_TYPE_SESSION, null, &err) orelse {
log.warn("spawn error getting bus: {s}", .{err.*.message});
return error.FlatpakDbusFailed;
};
defer c.g_object_unref(bus);
// Our list of file descriptors that we need to send to the process.
const fd_list = c.g_unix_fd_list_new();
defer c.g_object_unref(fd_list);
if (c.g_unix_fd_list_append(fd_list, self.stdin, &err) < 0) {
log.warn("error adding fd: {s}", .{err.*.message});
return error.FlatpakFdFailed;
}
if (c.g_unix_fd_list_append(fd_list, self.stdout, &err) < 0) {
log.warn("error adding fd: {s}", .{err.*.message});
return error.FlatpakFdFailed;
}
if (c.g_unix_fd_list_append(fd_list, self.stderr, &err) < 0) {
log.warn("error adding fd: {s}", .{err.*.message});
return error.FlatpakFdFailed;
}
// Build our arguments for the file descriptors.
const fd_builder = c.g_variant_builder_new(c.G_VARIANT_TYPE("a{uh}"));
defer c.g_variant_builder_unref(fd_builder);
try setupFd(&self.stdin, 0, fd_list, fd_builder);
try setupFd(&self.stdout, 1, fd_list, fd_builder);
try setupFd(&self.stderr, 2, fd_list, fd_builder);
c.g_variant_builder_add(fd_builder, "{uh}", @as(c_int, 0), self.stdin);
c.g_variant_builder_add(fd_builder, "{uh}", @as(c_int, 1), self.stdout);
c.g_variant_builder_add(fd_builder, "{uh}", @as(c_int, 2), self.stderr);
// Build our env vars
const env_builder = c.g_variant_builder_new(c.G_VARIANT_TYPE("a{ss}"));
@ -148,6 +247,21 @@ pub const FlatpakHostCommand = struct {
_ = c.g_variant_ref_sink(params); // take ownership
defer c.g_variant_unref(params);
// Subscribe to exit notifications
const subscription_id = c.g_dbus_connection_signal_subscribe(
bus,
"org.freedesktop.Flatpak",
"org.freedesktop.Flatpak.Development",
"HostCommandExited",
"/org/freedesktop/Flatpak/Development",
null,
0,
onExit,
self,
null,
);
errdefer c.g_dbus_connection_signal_unsubscribe(bus, subscription_id);
// Go!
const reply = c.g_dbus_connection_call_with_unix_fd_list_sync(
bus,
@ -171,28 +285,61 @@ pub const FlatpakHostCommand = struct {
var pid: c_int = 0;
c.g_variant_get(reply, "(u)", &pid);
log.debug("HostCommand started pid={}", .{pid});
log.debug("HostCommand started pid={} subscription={}", .{
pid,
subscription_id,
});
self.pid = pid;
self.updateState(.{
.started = .{
.pid = pid,
.subscription = subscription_id,
.loop = loop,
},
});
}
/// Helper to setup our io fd and add it to the necessary fd
/// list for sending to the child and parameter list for calling our
/// API.
fn setupFd(
stdio: *StdIo,
child_fd: fd_t,
list: *c.GUnixFDList,
builder: *c.GVariantBuilder,
) !void {
const fd = try stdio.setup();
/// Helper to update the state and notify waiters via the cv.
fn updateState(self: *FlatpakHostCommand, state: State) void {
self.state_mutex.lock();
defer self.state_mutex.unlock();
defer self.state_cv.broadcast();
self.state = state;
}
var err: [*c]c.GError = null;
if (c.g_unix_fd_list_append(list, fd, &err) < 0) {
log.warn("error adding fd: {s}", .{err.*.message});
return error.FlatpakFdFailed;
}
fn onExit(
bus: ?*c.GDBusConnection,
_: [*c]const u8,
_: [*c]const u8,
_: [*c]const u8,
_: [*c]const u8,
params: ?*c.GVariant,
ud: ?*anyopaque,
) callconv(.C) void {
const self = @ptrCast(*FlatpakHostCommand, @alignCast(@alignOf(FlatpakHostCommand), ud));
const state = state: {
self.state_mutex.lock();
defer self.state_mutex.unlock();
break :state self.state.started;
};
c.g_variant_builder_add(builder, "{uh}", child_fd, fd);
var pid: c_int = 0;
var exit_status: c_int = 0;
c.g_variant_get(params.?, "(uu)", &pid, &exit_status);
if (state.pid != pid) return;
// Update our state
self.updateState(.{
.exited = .{
.pid = pid,
.status = std.math.cast(u8, exit_status) orelse 255,
},
});
// We're done now, so we can unsubscribe
c.g_dbus_connection_signal_unsubscribe(bus.?, state.subscription);
// We are also done with our loop so we can exit.
c.g_main_loop_quit(state.loop);
}
};

View File

@ -55,6 +55,10 @@ pub fn get(alloc: Allocator) !Entry {
// utilities properly until we get a login shell.
if (internal_os.isFlatpak()) {
log.info("flatpak detected, will use host-spawn to get our entry", .{});
const Pty = @import("Pty.zig");
var pty = try Pty.open(.{});
defer pty.deinit();
var cmd: internal_os.FlatpakHostCommand = .{
.argv = &[_][]const u8{
"/bin/sh",
@ -66,8 +70,12 @@ pub fn get(alloc: Allocator) !Entry {
.{std.mem.sliceTo(pw.pw_name, 0)},
),
},
.stdin = pty.slave,
.stdout = pty.slave,
.stderr = pty.slave,
};
try cmd.spawn(alloc);
_ = try cmd.spawn(alloc);
_ = try cmd.wait();
if (true) @panic("END");
const exec = try std.ChildProcess.exec(.{