path: root/thread.c
Diffstat (limited to 'thread.c')
-rw-r--r--   thread.c   109
1 file changed, 107 insertions(+), 2 deletions(-)
diff --git a/thread.c b/thread.c
index 7ecc535b88..13fef6be9a 100644
--- a/thread.c
+++ b/thread.c
@@ -109,6 +109,8 @@ static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
+static ID id_wait_for_single_fd;
+
enum SLEEP_FLAGS {
SLEEP_DEADLOCKABLE = 0x1,
SLEEP_SPURIOUS_CHECK = 0x2
@@ -708,6 +710,11 @@ thread_do_start(rb_thread_t *th)
else {
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
}
+
+ VALUE scheduler = th->scheduler;
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("run"), 0);
+ }
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
@@ -1471,6 +1478,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_thread_t *th = rb_ec_thread_ptr(ec);
int saved_errno = 0;
VALUE ubf_th = Qfalse;
+ VALUE scheduler = th->scheduler;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
@@ -1485,6 +1493,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
}
}
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0);
+ }
+
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
@@ -1500,6 +1512,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
+ if (scheduler != Qnil) {
+ rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0);
+ }
+
errno = saved_errno;
return val;
@@ -3574,6 +3590,63 @@ rb_thread_variables(VALUE thread)
return ary;
}
+VALUE rb_thread_scheduler_get(VALUE thread)
+{
+ rb_thread_t * th = rb_thread_ptr(thread);
+
+ VM_ASSERT(th);
+
+ return th->scheduler;
+}
+
+VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
+{
+ rb_thread_t * th = rb_thread_ptr(thread);
+
+ VM_ASSERT(th);
+
+ th->scheduler = scheduler;
+
+ return th->scheduler;
+}
+
+/*
+ * call-seq:
+ * Thread.scheduler -> scheduler or nil
+ *
+ * Returns the scheduler of the current thread, or +nil+ if the thread is
+ * inside a blocking region and scheduling operations are not permitted.
+ *
+ */
+
+static VALUE
+rb_thread_scheduler(VALUE klass)
+{
+ return rb_current_thread_scheduler();
+}
+
+VALUE rb_current_thread_scheduler(void)
+{
+ rb_thread_t * th = GET_THREAD();
+
+ VM_ASSERT(th);
+
+ if (th->blocking == 0)
+ return th->scheduler;
+ else
+ return Qnil;
+}
+
+static VALUE
+rb_thread_blocking_p(VALUE thread)
+{
+ unsigned blocking = rb_thread_ptr(thread)->blocking;
+
+ if (blocking == 0)
+ return Qfalse;
+
+ return INT2NUM(blocking);
+}
+
/*
* call-seq:
* thr.thread_variable?(key) -> true or false
@@ -4129,6 +4202,15 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
+static VALUE
+rb_thread_timeout(struct timeval *timeout) {
+ if (timeout) {
+ return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
+ }
+
+ return Qnil;
+}
+
#ifdef USE_POLL
/* The same with linux kernel. TODO: make platform independent definition. */
@@ -4155,6 +4237,14 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct waiting_fd wfd;
int state;
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
+ rb_thread_timeout(timeout)
+ );
+ return RTEST(result);
+ }
+
wfd.th = GET_THREAD();
wfd.fd = fd;
list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@@ -4287,8 +4377,16 @@ select_single_cleanup(VALUE ptr)
}
int
-rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
+rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
+ VALUE scheduler = rb_current_thread_scheduler();
+ if (scheduler != Qnil) {
+ VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
+ rb_thread_timeout(timeout)
+ );
+ return RTEST(result);
+ }
+
rb_fdset_t rfds, wfds, efds;
struct select_args args;
int r;
@@ -4298,7 +4396,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
- args.tv = tv;
+ args.tv = timeout;
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
@@ -5185,6 +5283,8 @@ Init_Thread(void)
sym_immediate = ID2SYM(rb_intern("immediate"));
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
+ id_wait_for_single_fd = rb_intern("wait_for_single_fd");
+
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
@@ -5223,6 +5323,7 @@ Init_Thread(void)
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
+ rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
@@ -5239,6 +5340,10 @@ Init_Thread(void)
rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
+ rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0);
+ rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0);
+ rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1);
+
rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);