diff options
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 204 |
1 files changed, 120 insertions, 84 deletions
diff --git a/thread_sync.c b/thread_sync.c index 74a26bd0ac..ae69cb4a6e 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -41,6 +41,8 @@ struct queue_sleep_arg { static void sync_wakeup(struct ccan_list_head *head, long max) { + RUBY_DEBUG_LOG("max:%ld", max); + struct sync_waiter *cur = 0, *next; ccan_list_for_each_safe(head, cur, next, node) { @@ -51,6 +53,7 @@ sync_wakeup(struct ccan_list_head *head, long max) rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); } else { + RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th)); rb_threadptr_interrupt(cur->th); cur->th->status = THREAD_RUNNABLE; } @@ -251,6 +254,8 @@ rb_mutex_trylock(VALUE self) rb_mutex_t *mutex = mutex_ptr(self); if (mutex->fiber == 0) { + RUBY_DEBUG_LOG("%p ok", mutex); + rb_fiber_t *fiber = GET_EC()->fiber_ptr; rb_thread_t *th = GET_THREAD(); mutex->fiber = fiber; @@ -258,17 +263,12 @@ rb_mutex_trylock(VALUE self) mutex_locked(th, self); return Qtrue; } - - return Qfalse; + else { + RUBY_DEBUG_LOG("%p ng", mutex); + return Qfalse; + } } -/* - * At maximum, only one thread can use cond_timedwait and watch deadlock - * periodically. Multiple polling thread (i.e. concurrent deadlock check) - * introduces new race conditions. [Bug #6278] [ruby-core:44275] - */ -static const rb_thread_t *patrol_thread = NULL; - static VALUE mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) { @@ -290,6 +290,8 @@ delete_from_waitq(VALUE value) return Qnil; } +static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th); + static VALUE do_mutex_lock(VALUE self, int interruptible_p) { @@ -297,6 +299,7 @@ do_mutex_lock(VALUE self, int interruptible_p) rb_thread_t *th = ec->thread_ptr; rb_fiber_t *fiber = ec->fiber_ptr; rb_mutex_t *mutex = mutex_ptr(self); + rb_atomic_t saved_ints = 0; /* When running trap handler */ if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) && @@ -310,6 +313,8 @@ do_mutex_lock(VALUE self, int interruptible_p) } while (mutex->fiber != fiber) { + VM_ASSERT(mutex->fiber != NULL); + VALUE scheduler = rb_fiber_scheduler_current(); if (scheduler != Qnil) { struct sync_waiter sync_waiter = { @@ -331,51 +336,47 @@ do_mutex_lock(VALUE self, int interruptible_p) rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread"); } - enum rb_thread_status prev_status = th->status; - rb_hrtime_t *timeout = 0; - rb_hrtime_t rel = rb_msec2hrtime(100); - - th->status = THREAD_STOPPED_FOREVER; - th->locking_mutex = self; - rb_ractor_sleeper_threads_inc(th->ractor); - /* - * Carefully! while some contended threads are in native_sleep(), - * ractor->sleeper is unstable value. we have to avoid both deadlock - * and busy loop. - */ - if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) && - !patrol_thread) { - timeout = &rel; - patrol_thread = th; - } - struct sync_waiter sync_waiter = { .self = self, .th = th, - .fiber = nonblocking_fiber(fiber) + .fiber = nonblocking_fiber(fiber), }; - ccan_list_add_tail(&mutex->waitq, &sync_waiter.node); + RUBY_DEBUG_LOG("%p wait", mutex); + + // similar code with `sleep_forever`, but + // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception. + // Ensure clause is needed like but `rb_ensure` a bit slow. + // + // begin + // sleep_forever(th, SLEEP_DEADLOCKABLE); + // ensure + // ccan_list_del(&sync_waiter.node); + // end + enum rb_thread_status prev_status = th->status; + th->status = THREAD_STOPPED_FOREVER; + rb_ractor_sleeper_threads_inc(th->ractor); + rb_check_deadlock(th->ractor); - native_sleep(th, timeout); /* release GVL */ + th->locking_mutex = self; + ccan_list_add_tail(&mutex->waitq, &sync_waiter.node); + { + native_sleep(th, NULL); + } ccan_list_del(&sync_waiter.node); + // unlocked by another thread while sleeping if (!mutex->fiber) { mutex->fiber = fiber; } - if (patrol_thread == th) - patrol_thread = NULL; - - th->locking_mutex = Qfalse; - if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { - rb_check_deadlock(th->ractor); - } - if (th->status == THREAD_STOPPED_FOREVER) { - th->status = prev_status; - } rb_ractor_sleeper_threads_dec(th->ractor); + th->status = prev_status; + th->locking_mutex = Qfalse; + th->locking_mutex = Qfalse; + + RUBY_DEBUG_LOG("%p wakeup", mutex); } if (interruptible_p) { @@ -387,11 +388,27 @@ do_mutex_lock(VALUE self, int interruptible_p) mutex->fiber = fiber; } } + else { + // clear interrupt information + if (RUBY_VM_INTERRUPTED(th->ec)) { + // reset interrupts + if (saved_ints == 0) { + saved_ints = threadptr_get_interrupts(th); + } + else { + // ignore additional interrupts + threadptr_get_interrupts(th); + } + } + } } + if (saved_ints) th->ec->interrupt_flag = saved_ints; if (mutex->fiber == fiber) mutex_locked(th, self); } + RUBY_DEBUG_LOG("%p locked", mutex); + // assertion if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned."); @@ -435,46 +452,46 @@ rb_mutex_owned_p(VALUE self) static const char * rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) { - const char *err = NULL; + RUBY_DEBUG_LOG("%p", mutex); if (mutex->fiber == 0) { - err = "Attempt to unlock a mutex which is not locked"; + return "Attempt to unlock a mutex which is not locked"; } else if (mutex->fiber != fiber) { - err = "Attempt to unlock a mutex which is locked by another thread/fiber"; + return "Attempt to unlock a mutex which is locked by another thread/fiber"; } - else { - struct sync_waiter *cur = 0, *next; - mutex->fiber = 0; - ccan_list_for_each_safe(&mutex->waitq, cur, next, node) { - ccan_list_del_init(&cur->node); + struct sync_waiter *cur = 0, *next; - if (cur->th->scheduler != Qnil && cur->fiber) { - rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); - goto found; - } - else { - switch (cur->th->status) { - case THREAD_RUNNABLE: /* from someone else calling Thread#run */ - case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ - rb_threadptr_interrupt(cur->th); - goto found; - case THREAD_STOPPED: /* probably impossible */ - rb_bug("unexpected THREAD_STOPPED"); - case THREAD_KILLED: - /* not sure about this, possible in exit GC? */ - rb_bug("unexpected THREAD_KILLED"); - continue; - } + mutex->fiber = 0; + thread_mutex_remove(th, mutex); + + ccan_list_for_each_safe(&mutex->waitq, cur, next, node) { + ccan_list_del_init(&cur->node); + + if (cur->th->scheduler != Qnil && cur->fiber) { + rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); + return NULL; + } + else { + switch (cur->th->status) { + case THREAD_RUNNABLE: /* from someone else calling Thread#run */ + case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ + RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th)); + rb_threadptr_interrupt(cur->th); + return NULL; + case THREAD_STOPPED: /* probably impossible */ + rb_bug("unexpected THREAD_STOPPED"); + case THREAD_KILLED: + /* not sure about this, possible in exit GC? */ + rb_bug("unexpected THREAD_KILLED"); + continue; } } - - found: - thread_mutex_remove(th, mutex); } - return err; + // We did not find any threads to wake up, so we can just return with no error: + return NULL; } /* @@ -652,22 +669,24 @@ rb_mutex_allow_trap(VALUE self, int val) #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq) #define queue_list(q) UNALIGNED_MEMBER_PTR(q, que) -PACKED_STRUCT_UNALIGNED(struct rb_queue { +RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN() +struct rb_queue { struct ccan_list_head waitq; rb_serial_t fork_gen; const VALUE que; int num_waiting; -}); +} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END(); #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq) #define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que) #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq) -PACKED_STRUCT_UNALIGNED(struct rb_szqueue { +RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN() +struct rb_szqueue { struct rb_queue q; int num_waiting_push; struct ccan_list_head pushq; long max; -}); +} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END(); static void queue_mark(void *ptr) @@ -835,7 +854,7 @@ raise_closed_queue_error(VALUE self) static VALUE queue_closed_result(VALUE self, struct rb_queue *q) { - assert(queue_length(self, q) == 0); + RUBY_ASSERT(queue_length(self, q) == 0); return Qnil; } @@ -847,8 +866,8 @@ queue_closed_result(VALUE self, struct rb_queue *q) * information must be exchanged safely between multiple threads. The * Thread::Queue class implements all the required locking semantics. * - * The class implements FIFO type of queue. In a FIFO queue, the first - * tasks added are the first retrieved. + * The class implements FIFO (first in, first out) type of queue. + * In a FIFO queue, the first tasks added are the first retrieved. * * Example: * @@ -856,17 +875,17 @@ queue_closed_result(VALUE self, struct rb_queue *q) * * producer = Thread.new do * 5.times do |i| - * sleep rand(i) # simulate expense - * queue << i - * puts "#{i} produced" + * sleep rand(i) # simulate expense + * queue << i + * puts "#{i} produced" * end * end * * consumer = Thread.new do * 5.times do |i| - * value = queue.pop - * sleep rand(i/2) # simulate expense - * puts "consumed #{value}" + * value = queue.pop + * sleep rand(i/2) # simulate expense + * puts "consumed #{value}" * end * end * @@ -1062,8 +1081,8 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout) else { rb_execution_context_t *ec = GET_EC(); - assert(RARRAY_LEN(q->que) == 0); - assert(queue_closed_p(self) == 0); + RUBY_ASSERT(RARRAY_LEN(q->que) == 0); + RUBY_ASSERT(queue_closed_p(self) == 0); struct queue_waiter queue_waiter = { .w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)}, @@ -1139,6 +1158,22 @@ rb_queue_length(VALUE self) return LONG2NUM(queue_length(self, queue_ptr(self))); } +NORETURN(static VALUE rb_queue_freeze(VALUE self)); +/* + * call-seq: + * freeze + * + * The queue can't be frozen, so this method raises an exception: + * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>) + * + */ +static VALUE +rb_queue_freeze(VALUE self) +{ + rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self); + UNREACHABLE_RETURN(self); +} + /* * Document-method: Thread::Queue#num_waiting * @@ -1600,6 +1635,7 @@ Init_thread_sync(void) rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0); rb_define_alias(rb_cQueue, "enq", "push"); rb_define_alias(rb_cQueue, "<<", "push"); |