summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-12-08 09:29:09 +1300
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-12-09 08:55:35 +1300
commit2553c5f94a5d51c2c5876b31e4c1521ad9be12f6 (patch)
treefc7b8fe6e578424b15dea0f8b94caa7a72b5c0a1
parenta4a92ae6d99a75e11165ca09c44ccf47cf342047 (diff)
Add support for non-blocking `Process.wait`.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3853
-rw-r--r--common.mk1
-rw-r--r--doc/scheduler.md11
-rw-r--r--include/ruby/internal/intern/process.h4
-rw-r--r--internal/scheduler.h3
-rw-r--r--io.c10
-rw-r--r--process.c212
-rw-r--r--scheduler.c14
-rw-r--r--test/fiber/scheduler.rb7
-rw-r--r--test/fiber/test_process.rb36
-rw-r--r--test/ruby/test_process.rb6
10 files changed, 260 insertions, 44 deletions
diff --git a/common.mk b/common.mk
index 158828a11c..c2d6a7235a 100644
--- a/common.mk
+++ b/common.mk
@@ -10002,6 +10002,7 @@ process.$(OBJEXT): $(top_srcdir)/internal/thread.h
process.$(OBJEXT): $(top_srcdir)/internal/variable.h
process.$(OBJEXT): $(top_srcdir)/internal/vm.h
process.$(OBJEXT): $(top_srcdir)/internal/warnings.h
+process.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
process.$(OBJEXT): {$(VPATH)}assert.h
process.$(OBJEXT): {$(VPATH)}backward/2/assume.h
process.$(OBJEXT): {$(VPATH)}backward/2/attributes.h
diff --git a/doc/scheduler.md b/doc/scheduler.md
index 9994831663..a6e2d78224 100644
--- a/doc/scheduler.md
+++ b/doc/scheduler.md
@@ -12,6 +12,17 @@ This is the interface you need to implement.
~~~ ruby
class Scheduler
+ # Wait for the specified process ID to exit.
+ # This hook is optional.
+ # @parameter pid [Integer] The process ID to wait for.
+ # @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
+ # @returns [Process::Status] A process status instance.
+ def process_wait(pid, flags)
+ Thread.new do
+ Process::Status.wait(pid, flags)
+ end.value
+ end
+
# Wait for the given file descriptor to match the specified events within
# the specified timeout.
# @parameter event [Integer] A bit mask of `IO::READABLE`,
diff --git a/include/ruby/internal/intern/process.h b/include/ruby/internal/intern/process.h
index 2b1005a205..bcb5a7e364 100644
--- a/include/ruby/internal/intern/process.h
+++ b/include/ruby/internal/intern/process.h
@@ -28,7 +28,9 @@
RBIMPL_SYMBOL_EXPORT_BEGIN()
/* process.c */
-void rb_last_status_set(int status, rb_pid_t pid);
+RUBY_EXTERN void (* rb_socket_before_fork_func)();
+
+void rb_last_status_set(rb_pid_t pid, int status, int error);
VALUE rb_last_status_get(void);
int rb_proc_exec(const char*);
diff --git a/internal/scheduler.h b/internal/scheduler.h
index 472edc6368..8314020220 100644
--- a/internal/scheduler.h
+++ b/internal/scheduler.h
@@ -25,6 +25,9 @@ VALUE rb_scheduler_close(VALUE scheduler);
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
+int rb_scheduler_supports_process_wait(VALUE scheduler);
+VALUE rb_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags);
+
VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber);
diff --git a/io.c b/io.c
index 0182042b39..b5553b5783 100644
--- a/io.c
+++ b/io.c
@@ -4913,9 +4913,9 @@ fptr_waitpid(rb_io_t *fptr, int nohang)
{
int status;
if (fptr->pid) {
- rb_last_status_clear();
- rb_waitpid(fptr->pid, &status, nohang ? WNOHANG : 0);
- fptr->pid = 0;
+ rb_last_status_clear();
+ rb_waitpid(fptr->pid, &status, nohang ? WNOHANG : 0);
+ fptr->pid = 0;
}
}
@@ -6433,11 +6433,11 @@ pipe_finalize(rb_io_t *fptr, int noraise)
#if !defined(HAVE_WORKING_FORK) && !defined(_WIN32)
int status = 0;
if (fptr->stdio_file) {
- status = pclose(fptr->stdio_file);
+ status = pclose(fptr->stdio_file);
}
fptr->fd = -1;
fptr->stdio_file = 0;
- rb_last_status_set(status, fptr->pid);
+ rb_last_status_set(fptr->pid, status, 0);
#else
fptr_finalize(fptr, noraise);
#endif
diff --git a/process.c b/process.c
index 8ad555f8f6..acc9de1fd0 100644
--- a/process.c
+++ b/process.c
@@ -14,6 +14,7 @@
#include "ruby/internal/config.h"
#include "internal/scheduler.h"
+#include "coroutine/Stack.h"
#include <ctype.h>
#include <errno.h>
@@ -568,6 +569,27 @@ proc_get_ppid(VALUE _)
static VALUE rb_cProcessStatus;
+struct rb_process_status {
+ rb_pid_t pid;
+ int status;
+ int error;
+};
+
+static const rb_data_type_t rb_process_status_type = {
+ .wrap_struct_name = "Process::Status",
+ .function = {
+ .dfree = RUBY_DEFAULT_FREE,
+ },
+ .data = NULL,
+ .flags = RUBY_TYPED_FREE_IMMEDIATELY,
+};
+
+static VALUE rb_process_status_allocate(VALUE klass) {
+ struct rb_process_status *data = NULL;
+
+ return TypedData_Make_Struct(klass, struct rb_process_status, &rb_process_status_type, data);
+}
+
VALUE
rb_last_status_get(void)
{
@@ -596,13 +618,20 @@ proc_s_last_status(VALUE mod)
}
void
-rb_last_status_set(int status, rb_pid_t pid)
+rb_last_status_set(rb_pid_t pid, int status, int error)
{
rb_thread_t *th = GET_THREAD();
- th->last_status = rb_obj_alloc(rb_cProcessStatus);
- rb_ivar_set(th->last_status, id_status, INT2FIX(status));
- rb_ivar_set(th->last_status, id_pid, PIDT2NUM(pid));
- rb_obj_freeze(th->last_status);
+
+ VALUE last_status = rb_process_status_allocate(rb_cProcessStatus);
+
+ struct rb_process_status *data = RTYPEDDATA_DATA(last_status);
+ data->pid = pid;
+ data->status = status;
+ data->error = error;
+
+ rb_obj_freeze(last_status);
+
+ th->last_status = last_status;
}
void
@@ -624,9 +653,11 @@ rb_last_status_clear(void)
*/
static VALUE
-pst_to_i(VALUE st)
+pst_to_i(VALUE self)
{
- return rb_ivar_get(st, id_status);
+ struct rb_process_status *data = RTYPEDDATA_DATA(self);
+
+ return RB_INT2NUM(data->status);
}
#define PST2INT(st) NUM2INT(pst_to_i(st))
@@ -643,9 +674,11 @@ pst_to_i(VALUE st)
*/
static VALUE
-pst_pid(VALUE st)
+pst_pid(VALUE self)
{
- return rb_attr_get(st, id_pid);
+ struct rb_process_status *data = RTYPEDDATA_DATA(self);
+
+ return PIDT2NUM(data->pid);
}
static VALUE pst_message_status(VALUE str, int status);
@@ -1104,6 +1137,7 @@ waitpid_state_init(struct waitpid_state *w, rb_pid_t pid, int options)
w->ret = 0;
w->pid = pid;
w->options = options;
+ w->errnum = 0;
}
static const rb_hrtime_t *
@@ -1214,8 +1248,10 @@ waitpid_wait(struct waitpid_state *w)
*/
rb_native_mutex_lock(&vm->waitpid_lock);
- if (w->pid > 0 || list_empty(&vm->waiting_pids))
+ if (w->pid > 0 || list_empty(&vm->waiting_pids)) {
w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
+ }
+
if (w->ret) {
if (w->ret == -1) w->errnum = errno;
}
@@ -1264,35 +1300,125 @@ waitpid_no_SIGCHLD(struct waitpid_state *w)
w->errnum = errno;
}
-rb_pid_t
-rb_waitpid(rb_pid_t pid, int *st, int flags)
+/*
+ * call-seq:
+ * Process::Status.wait(pid=-1, flags=0) -> Process::Status
+ *
+ * Waits for a child process to exit and returns a Process::Status object
+ * containing information on that process. Which child it waits on
+ * depends on the value of _pid_:
+ *
+ * > 0:: Waits for the child whose process ID equals _pid_.
+ *
+ * 0:: Waits for any child whose process group ID equals that of the
+ * calling process.
+ *
+ * -1:: Waits for any child process (the default if no _pid_ is
+ * given).
+ *
+ * < -1:: Waits for any child whose process group ID equals the absolute
+ * value of _pid_.
+ *
+ * The _flags_ argument may be a logical or of the flag values
+ * Process::WNOHANG (do not block if no child available)
+ * or Process::WUNTRACED (return stopped children that
+ * haven't been reported). Not all flags are available on all
+ * platforms, but a flag value of zero will work on all platforms.
+ *
+ * Calling this method raises a SystemCallError if there are no child
+ * processes. Not available on all platforms.
+ *
+ * May invoke the scheduler hook _process_wait_.
+ *
+ * fork { exit 99 } #=> 27429
+ * Process::Status.wait #=> pid 27429 exit 99
+ * $? #=> nil
+ *
+ * pid = fork { sleep 3 } #=> 27440
+ * Time.now #=> 2008-03-08 19:56:16 +0900
+ * Process::Status.wait(pid, Process::WNOHANG) #=> nil
+ * Time.now #=> 2008-03-08 19:56:16 +0900
+ * Process::Status.wait(pid, 0) #=> pid 27440 exit 99
+ * Time.now #=> 2008-03-08 19:56:19 +0900
+ */
+VALUE rb_process_status_wait(rb_pid_t pid, int flags)
{
- struct waitpid_state w;
+ // We only enter the scheduler if we are "blocking":
+ if (!(flags & WNOHANG)) {
+ VALUE scheduler = rb_scheduler_current();
+ if (rb_scheduler_supports_process_wait(scheduler)) {
+ return rb_scheduler_process_wait(scheduler, pid, flags);
+ }
+ }
- waitpid_state_init(&w, pid, flags);
- w.ec = GET_EC();
+ COROUTINE_STACK_LOCAL(struct waitpid_state, w);
+
+ waitpid_state_init(w, pid, flags);
+ w->ec = GET_EC();
if (WAITPID_USE_SIGCHLD) {
- waitpid_wait(&w);
+ waitpid_wait(w);
}
else {
- waitpid_no_SIGCHLD(&w);
+ waitpid_no_SIGCHLD(w);
}
- if (st) *st = w.status;
- if (w.ret == -1) {
- errno = w.errnum;
- }
- else if (w.ret > 0) {
+ if (w->ret > 0) {
if (ruby_nocldwait) {
- w.ret = -1;
- errno = ECHILD;
- }
- else {
- rb_last_status_set(w.status, w.ret);
+ w->ret = -1;
+ w->errnum = ECHILD;
}
}
- return w.ret;
+
+ VALUE status = rb_process_status_allocate(rb_cProcessStatus);
+
+ struct rb_process_status *data = RTYPEDDATA_DATA(status);
+ data->pid = w->ret;
+ data->status = w->status;
+ data->error = w->errnum;
+
+ COROUTINE_STACK_FREE(w);
+
+ return status;
+}
+
+VALUE
+rb_process_status_waitv(int argc, VALUE *argv, VALUE _)
+{
+ rb_check_arity(argc, 0, 2);
+
+ rb_pid_t pid = -1;
+ int flags = 0;
+
+ if (argc >= 1) {
+ pid = NUM2PIDT(argv[0]);
+ }
+
+ if (argc >= 2) {
+ flags = RB_NUM2INT(argv[1]);
+ }
+
+ return rb_process_status_wait(pid, flags);
+}
+
+rb_pid_t
+rb_waitpid(rb_pid_t pid, int *st, int flags)
+{
+ VALUE status = rb_process_status_wait(pid, flags);
+ struct rb_process_status *data = RTYPEDDATA_DATA(status);
+
+ if (st) *st = data->status;
+
+ if (data->pid == -1) {
+ errno = data->error;
+ }
+ else {
+ rb_obj_freeze(status);
+ GET_THREAD()->last_status = status;
+ }
+
+ RB_GC_GUARD(status);
+ return data->pid;
}
static VALUE
@@ -1312,12 +1438,15 @@ proc_wait(int argc, VALUE *argv)
flags = NUM2UINT(vflags);
}
}
+
if ((pid = rb_waitpid(pid, &status, flags)) < 0)
rb_sys_fail(0);
+
if (pid == 0) {
rb_last_status_clear();
return Qnil;
}
+
return PIDT2NUM(pid);
}
@@ -4411,8 +4540,7 @@ rb_spawn_process(struct rb_execarg *eargp, char *errmsg, size_t errmsg_buflen)
#endif
#if defined HAVE_WORKING_FORK && !USE_SPAWNV
- pid = fork_check_err(0, rb_exec_atfork, eargp, eargp->redirect_fds,
- errmsg, errmsg_buflen, eargp);
+ pid = fork_check_err(0, rb_exec_atfork, eargp, eargp->redirect_fds, errmsg, errmsg_buflen, eargp);
#else
prog = eargp->use_shell ? eargp->invoke.sh.shell_script : eargp->invoke.cmd.command_name;
@@ -4426,32 +4554,37 @@ rb_spawn_process(struct rb_execarg *eargp, char *errmsg, size_t errmsg_buflen)
}
# if defined HAVE_SPAWNV
if (eargp->use_shell) {
- pid = proc_spawn_sh(RSTRING_PTR(prog));
+ pid = proc_spawn_sh(RSTRING_PTR(prog));
}
else {
char **argv = ARGVSTR2ARGV(eargp->invoke.cmd.argv_str);
- pid = proc_spawn_cmd(argv, prog, eargp);
+ pid = proc_spawn_cmd(argv, prog, eargp);
+ }
+
+ if (pid == -1) {
+ rb_last_status_set(pid, 0x7f << 8, 0);
}
- if (pid == -1)
- rb_last_status_set(0x7f << 8, 0);
# else
status = system(rb_execarg_commandline(eargp, &prog));
- rb_last_status_set((status & 0xff) << 8, 0);
pid = 1; /* dummy */
+ rb_last_status_set(pid, (status & 0xff) << 8, 0);
# endif
+
if (eargp->waitpid_state && eargp->waitpid_state != WAITPID_LOCK_ONLY) {
eargp->waitpid_state->pid = pid;
}
+
rb_execarg_run_options(&sarg, NULL, errmsg, errmsg_buflen);
#endif
+
return pid;
}
struct spawn_args {
VALUE execarg;
struct {
- char *ptr;
- size_t buflen;
+ char *ptr;
+ size_t buflen;
} errmsg;
};
@@ -4587,7 +4720,7 @@ rb_f_system(int argc, VALUE *argv, VALUE _)
else {
waitpid_no_SIGCHLD(w);
}
- rb_last_status_set(w->status, w->ret);
+ rb_last_status_set(w->ret, w->status, 0);
}
#endif
if (w->pid < 0 /* fork failure */ || pid < 0 /* exec failure */) {
@@ -8502,8 +8635,11 @@ InitVM_process(void)
rb_define_method(rb_cWaiter, "pid", detach_process_pid, 0);
rb_cProcessStatus = rb_define_class_under(rb_mProcess, "Status", rb_cObject);
+ rb_define_alloc_func(rb_cProcessStatus, rb_process_status_allocate);
rb_undef_method(CLASS_OF(rb_cProcessStatus), "new");
+ rb_define_singleton_method(rb_cProcessStatus, "wait", rb_process_status_waitv, -1);
+
rb_define_method(rb_cProcessStatus, "==", pst_equal, 1);
rb_define_method(rb_cProcessStatus, "&", pst_bitand, 1);
rb_define_method(rb_cProcessStatus, ">>", pst_rshift, 1);
diff --git a/scheduler.c b/scheduler.c
index 9ecb25f188..88db433f1e 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -18,6 +18,7 @@ static ID id_block;
static ID id_unblock;
static ID id_kernel_sleep;
+static ID id_process_wait;
static ID id_io_read;
static ID id_io_write;
@@ -32,6 +33,7 @@ Init_Scheduler(void)
id_unblock = rb_intern_const("unblock");
id_kernel_sleep = rb_intern_const("kernel_sleep");
+ id_process_wait = rb_intern_const("process_wait");
id_io_read = rb_intern_const("io_read");
id_io_write = rb_intern_const("io_write");
@@ -118,6 +120,18 @@ rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
+int
+rb_scheduler_supports_process_wait(VALUE scheduler)
+{
+ return rb_respond_to(scheduler, id_process_wait);
+}
+
+VALUE
+rb_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
+{
+ return rb_funcall(scheduler, id_process_wait, 2, PIDT2NUM(pid), RB_INT2NUM(flags));
+}
+
VALUE
rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{
diff --git a/test/fiber/scheduler.rb b/test/fiber/scheduler.rb
index 7cf0c26459..b3c3eaff59 100644
--- a/test/fiber/scheduler.rb
+++ b/test/fiber/scheduler.rb
@@ -117,6 +117,13 @@ class Scheduler
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
+ def process_wait(pid, flags)
+ # This is a very simple way to implement a non-blocking wait:
+ Thread.new do
+ Process::Status.wait(pid, flags)
+ end.value
+ end
+
def io_wait(io, events, duration)
unless (events & IO::READABLE).zero?
@readable[io] = Fiber.current
diff --git a/test/fiber/test_process.rb b/test/fiber/test_process.rb
new file mode 100644
index 0000000000..c6583cac9b
--- /dev/null
+++ b/test/fiber/test_process.rb
@@ -0,0 +1,36 @@
+# frozen_string_literal: true
+require 'test/unit'
+require_relative 'scheduler'
+
+class TestFiberProcess < Test::Unit::TestCase
+ def test_process_wait
+ Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ pid = Process.spawn("true")
+ Process.wait(pid)
+
+ # TODO test that scheduler was invoked.
+
+ assert_predicate $?, :success?
+ end
+ end.join
+ end
+
+ def test_system
+ Thread.new do
+ scheduler = Scheduler.new
+ Fiber.set_scheduler scheduler
+
+ Fiber.schedule do
+ system("true")
+
+ # TODO test that scheduler was invoked (currently it's not).
+
+ assert_predicate $?, :success?
+ end
+ end.join
+ end
+end
diff --git a/test/ruby/test_process.rb b/test/ruby/test_process.rb
index 65af494bd6..4f96a623e0 100644
--- a/test/ruby/test_process.rb
+++ b/test/ruby/test_process.rb
@@ -2492,6 +2492,12 @@ EOS
assert_same(Process.last_status, $?)
end
+ def test_last_status_failure
+ assert_nil system("sad")
+ assert_not_predicate $?, :success?
+ assert_equal $?.exitstatus, 127
+ end
+
def test_exec_failure_leaves_no_child
assert_raise(Errno::ENOENT) do
spawn('inexistent_command')