diff options
author | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2020-09-21 09:54:08 +1200 |
---|---|---|
committer | Samuel Williams <samuel.williams@oriontransfer.co.nz> | 2020-09-21 11:48:44 +1200 |
commit | 70f08f1eed1df4579fef047d28fc3c807183fcfa (patch) | |
tree | 75b19e3db5bcb6367f000764bf7f5096fc28e0ed /thread.c | |
parent | 596173155a15b6d4a7b04bdaf9218b3e756a0683 (diff) |
Make `Thread#join` non-blocking.
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/3558
Diffstat (limited to 'thread.c')
-rw-r--r-- | thread.c | 249 |
1 files changed, 137 insertions, 112 deletions
@@ -544,6 +544,32 @@ terminate_all(rb_ractor_t *r, const rb_thread_t *main_thread) } } +static void +rb_threadptr_join_list_wakeup(rb_thread_t *thread) +{ + struct rb_waiting_list *join_list = thread->join_list; + + while (join_list) { + rb_thread_t *target_thread = join_list->thread; + + if (target_thread->scheduler != Qnil) { + rb_scheduler_unblock(target_thread->scheduler, target_thread->self, rb_fiberptr_self(join_list->fiber)); + } else { + rb_threadptr_interrupt(target_thread); + + switch (target_thread->status) { + case THREAD_STOPPED: + case THREAD_STOPPED_FOREVER: + target_thread->status = THREAD_RUNNABLE; + default: + break; + } + } + + join_list = join_list->next; + } +} + void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { @@ -758,7 +784,6 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) { STACK_GROW_DIR_DETECTION; enum ruby_tag_type state; - rb_thread_list_t *join_list; VALUE errinfo = Qnil; size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); rb_thread_t *ractor_main_th = th->ractor->threads.main; @@ -860,20 +885,9 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start) rb_threadptr_interrupt(ractor_main_th); } - /* wake up joining threads */ - join_list = th->join_list; - while (join_list) { - rb_threadptr_interrupt(join_list->th); - switch (join_list->th->status) { - case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: - join_list->th->status = THREAD_RUNNABLE; - default: break; - } - join_list = join_list->next; - } - - rb_threadptr_unlock_all_locking_mutexes(th); - rb_check_deadlock(th->ractor); + rb_threadptr_join_list_wakeup(th); + rb_threadptr_unlock_all_locking_mutexes(th); + rb_check_deadlock(th->ractor); rb_fiber_close(th->ec->fiber_ptr); } @@ -1105,129 +1119,152 @@ rb_thread_create_ractor(rb_ractor_t *g, VALUE args, VALUE proc) struct join_arg { - rb_thread_t *target, *waiting; - rb_hrtime_t *limit; + struct rb_waiting_list *waiting_list; + rb_thread_t *target; + VALUE timeout; }; static VALUE remove_from_join_list(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; + rb_thread_t *target_thread = p->target; - if (target_th->status != THREAD_KILLED) { - rb_thread_list_t **p = &target_th->join_list; + if (target_thread->status != THREAD_KILLED) { + struct rb_waiting_list **join_list = &target_thread->join_list; - while (*p) { - if ((*p)->th == th) { - *p = (*p)->next; - break; - } - p = &(*p)->next; - } + while (*join_list) { + if (*join_list == p->waiting_list) { + *join_list = (*join_list)->next; + break; + } + + join_list = &(*join_list)->next; + } } return Qnil; } +static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); + static VALUE thread_join_sleep(VALUE arg) { struct join_arg *p = (struct join_arg *)arg; - rb_thread_t *target_th = p->target, *th = p->waiting; - rb_hrtime_t end = 0; + rb_thread_t *target_th = p->target, *th = p->waiting_list->thread; + rb_hrtime_t end = 0, rel = 0, *limit = 0; - if (p->limit) { - end = rb_hrtime_add(*p->limit, rb_hrtime_now()); + /* + * This supports INFINITY and negative values, so we can't use + * rb_time_interval right now... + */ + if (p->timeout == Qnil) { + /* unlimited */ + } + else if (FIXNUM_P(p->timeout)) { + rel = rb_sec2hrtime(NUM2TIMET(p->timeout)); + limit = &rel; + } + else { + limit = double2hrtime(&rel, rb_num2dbl(p->timeout)); + } + + if (limit) { + end = rb_hrtime_add(*limit, rb_hrtime_now()); } while (target_th->status != THREAD_KILLED) { - if (!p->limit) { - th->status = THREAD_STOPPED_FOREVER; + if (th->scheduler != Qnil) { + rb_scheduler_block(th->scheduler, target_th->self, p->timeout); + } else if (!limit) { + th->status = THREAD_STOPPED_FOREVER; rb_ractor_sleeper_threads_inc(th->ractor); - rb_check_deadlock(th->ractor); - native_sleep(th, 0); + rb_check_deadlock(th->ractor); + native_sleep(th, 0); rb_ractor_sleeper_threads_dec(th->ractor); - } - else { - if (hrtime_update_expire(p->limit, end)) { - thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", - thread_id_str(target_th)); - return Qfalse; - } - th->status = THREAD_STOPPED; - native_sleep(th, p->limit); - } - RUBY_VM_CHECK_INTS_BLOCKING(th->ec); - th->status = THREAD_RUNNABLE; - thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + } + else { + if (hrtime_update_expire(limit, end)) { + thread_debug("thread_join: timeout (thid: %"PRI_THREAD_ID")\n", + thread_id_str(target_th)); + return Qfalse; + } + th->status = THREAD_STOPPED; + native_sleep(th, limit); + } + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); + th->status = THREAD_RUNNABLE; + thread_debug("thread_join: interrupted (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); } return Qtrue; } static VALUE -thread_join(rb_thread_t *target_th, rb_hrtime_t *rel) +thread_join(rb_thread_t *target_th, VALUE timeout) { - rb_thread_t *th = GET_THREAD(); - struct join_arg arg; + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; + rb_fiber_t *fiber = ec->fiber_ptr; if (th == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be current thread"); + rb_raise(rb_eThreadError, "Target thread must not be current thread"); } + if (th->ractor->threads.main == target_th) { - rb_raise(rb_eThreadError, "Target thread must not be main thread"); + rb_raise(rb_eThreadError, "Target thread must not be main thread"); } - arg.target = target_th; - arg.waiting = th; - arg.limit = rel; - thread_debug("thread_join (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + thread_id_str(target_th), thread_status_name(target_th, TRUE)); if (target_th->status != THREAD_KILLED) { - rb_thread_list_t list; - list.next = target_th->join_list; - list.th = th; - target_th->join_list = &list; - if (!rb_ensure(thread_join_sleep, (VALUE)&arg, - remove_from_join_list, (VALUE)&arg)) { - return Qnil; - } + struct rb_waiting_list waiting_list; + waiting_list.next = target_th->join_list; + waiting_list.thread = th; + waiting_list.fiber = fiber; + target_th->join_list = &waiting_list; + + struct join_arg arg; + arg.waiting_list = &waiting_list; + arg.target = target_th; + arg.timeout = timeout; + + if (!rb_ensure(thread_join_sleep, (VALUE)&arg, remove_from_join_list, (VALUE)&arg)) { + return Qnil; + } } thread_debug("thread_join: success (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); + thread_id_str(target_th), thread_status_name(target_th, TRUE)); if (target_th->ec->errinfo != Qnil) { - VALUE err = target_th->ec->errinfo; - - if (FIXNUM_P(err)) { - switch (err) { - case INT2FIX(TAG_FATAL): - thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n", - thread_id_str(target_th), thread_status_name(target_th, TRUE)); - - /* OK. killed. */ - break; - default: - rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); - } - } - else if (THROW_DATA_P(target_th->ec->errinfo)) { - rb_bug("thread_join: THROW_DATA should not reach here."); - } - else { - /* normal exception */ - rb_exc_raise(err); - } + VALUE err = target_th->ec->errinfo; + + if (FIXNUM_P(err)) { + switch (err) { + case INT2FIX(TAG_FATAL): + thread_debug("thread_join: terminated (thid: %"PRI_THREAD_ID", status: %s)\n", + thread_id_str(target_th), thread_status_name(target_th, TRUE)); + + /* OK. killed. */ + break; + default: + rb_bug("thread_join: Fixnum (%d) should not reach here.", FIX2INT(err)); + } + } + else if (THROW_DATA_P(target_th->ec->errinfo)) { + rb_bug("thread_join: THROW_DATA should not reach here."); + } + else { + /* normal exception */ + rb_exc_raise(err); + } } return target_th->self; } -static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); - /* * call-seq: * thr.join -> thr @@ -1270,25 +1307,13 @@ static rb_hrtime_t *double2hrtime(rb_hrtime_t *, double); static VALUE thread_join_m(int argc, VALUE *argv, VALUE self) { - VALUE limit; - rb_hrtime_t rel, *to = 0; - - /* - * This supports INFINITY and negative values, so we can't use - * rb_time_interval right now... - */ - if (!rb_check_arity(argc, 0, 1) || NIL_P(argv[0])) { - /* unlimited */ - } - else if (FIXNUM_P(limit = argv[0])) { - rel = rb_sec2hrtime(NUM2TIMET(limit)); - to = &rel; - } - else { - to = double2hrtime(&rel, rb_num2dbl(limit)); + VALUE timeout = Qnil; + + if (rb_check_arity(argc, 0, 1)) { + timeout = argv[0]; } - return thread_join(rb_thread_ptr(self), to); + return thread_join(rb_thread_ptr(self), timeout); } /* @@ -1309,7 +1334,7 @@ static VALUE thread_value(VALUE self) { rb_thread_t *th = rb_thread_ptr(self); - thread_join(th, 0); + thread_join(th, Qnil); return th->value; } @@ -1486,7 +1511,7 @@ rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker) { VALUE scheduler = rb_thread_current_scheduler(); if (scheduler != Qnil) { - rb_scheduler_block(scheduler, blocker); + rb_scheduler_block(scheduler, blocker, Qnil); } else { thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n"); sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE); @@ -5559,9 +5584,9 @@ debug_deadlock_check(rb_ractor_t *r, VALUE msg) } { - rb_thread_list_t *list = th->join_list; + struct rb_waiting_list *list = th->join_list; while (list) { - rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->th); + rb_str_catf(msg, "\n depended by: tb_thread_id:%p", (void *)list->thread); list = list->next; } } |