diff options
author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2020-05-14 22:10:55 +1200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-14 22:10:55 +1200 |
commit | 0e3b0fcdba70cf96a8e0654eb8f50aacb8024bd4 (patch) | |
tree | 74d381412dfd8ff49dd3039f8aeae09ad9e4e6e3 /thread.c | |
parent | 336119dfc5e6baae0a936d6feae780a61975479c (diff) |
Thread scheduler for light weight concurrency.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/3032
Merged-By: ioquatix <samuel@codeotaku.com>
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 109 |
1 files changed, 107 insertions, 2 deletions
@@ -109,6 +109,8 @@ static VALUE sym_immediate; static VALUE sym_on_blocking; static VALUE sym_never; +static ID id_wait_for_single_fd; + enum SLEEP_FLAGS { SLEEP_DEADLOCKABLE = 0x1, SLEEP_SPURIOUS_CHECK = 0x2 @@ -708,6 +710,11 @@ thread_do_start(rb_thread_t *th) else { th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); } + + VALUE scheduler = th->scheduler; + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("run"), 0); + } } void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); @@ -1471,6 +1478,7 @@ rb_nogvl(void *(*func)(void *), void *data1, rb_thread_t *th = rb_ec_thread_ptr(ec); int saved_errno = 0; VALUE ubf_th = Qfalse; + VALUE scheduler = th->scheduler; if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { ubf = ubf_select; @@ -1485,6 +1493,10 @@ rb_nogvl(void *(*func)(void *), void *data1, } } + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0); + } + BLOCKING_REGION(th, { val = func(data1); saved_errno = errno; @@ -1500,6 +1512,10 @@ rb_nogvl(void *(*func)(void *), void *data1, thread_value(rb_thread_kill(ubf_th)); } + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0); + } + errno = saved_errno; return val; @@ -3574,6 +3590,63 @@ rb_thread_variables(VALUE thread) return ary; } +VALUE rb_thread_scheduler_get(VALUE thread) +{ + rb_thread_t * th = rb_thread_ptr(thread); + + VM_ASSERT(th); + + return th->scheduler; +} + +VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler) +{ + rb_thread_t * th = rb_thread_ptr(thread); + + VM_ASSERT(th); + + th->scheduler = scheduler; + + return th->scheduler; +} + +/* + * call-seq: + * Thread.scheduler -> scheduler or nil + * + * Returns the current scheduler if scheduling operations are permitted. + * + */ + +static VALUE +rb_thread_scheduler(VALUE klass) +{ + return rb_current_thread_scheduler(); +} + +VALUE rb_current_thread_scheduler(void) +{ + rb_thread_t * th = GET_THREAD(); + + VM_ASSERT(th); + + if (th->blocking == 0) + return th->scheduler; + else + return Qnil; +} + +static VALUE +rb_thread_blocking_p(VALUE thread) +{ + unsigned blocking = rb_thread_ptr(thread)->blocking; + + if (blocking == 0) + return Qfalse; + + return INT2NUM(blocking); +} + /* * call-seq: * thr.thread_variable?(key) -> true or false @@ -4129,6 +4202,15 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); } +static VALUE +rb_thread_timeout(struct timeval *timeout) { + if (timeout) { + return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec)); + } + + return Qnil; +} + #ifdef USE_POLL /* The same with linux kernel. TODO: make platform independent definition. */ @@ -4155,6 +4237,14 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct waiting_fd wfd; int state; + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events), + rb_thread_timeout(timeout) + ); + return RTEST(result); + } + wfd.th = GET_THREAD(); wfd.fd = fd; list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); @@ -4287,8 +4377,16 @@ select_single_cleanup(VALUE ptr) } int -rb_wait_for_single_fd(int fd, int events, struct timeval *tv) +rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events), + rb_thread_timeout(timeout) + ); + return RTEST(result); + } + rb_fdset_t rfds, wfds, efds; struct select_args args; int r; @@ -4298,7 +4396,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; - args.tv = tv; + args.tv = timeout; args.wfd.fd = fd; args.wfd.th = GET_THREAD(); @@ -5185,6 +5283,8 @@ Init_Thread(void) sym_immediate = ID2SYM(rb_intern("immediate")); sym_on_blocking = ID2SYM(rb_intern("on_blocking")); + id_wait_for_single_fd = rb_intern("wait_for_single_fd"); + rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); rb_define_singleton_method(rb_cThread, "start", thread_start, -2); rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); @@ -5223,6 +5323,7 @@ Init_Thread(void) rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); + rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0); rb_define_method(rb_cThread, "status", rb_thread_status, 0); rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1); rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2); @@ -5239,6 +5340,10 @@ Init_Thread(void) rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); + rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0); + rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0); + rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1); + rb_define_method(rb_cThread, "name", rb_thread_getname, 0); rb_define_method(rb_cThread, "name=", rb_thread_setname, 1); rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0); |