diff options
Diffstat (limited to 'thread_sync.c')
| -rw-r--r-- | thread_sync.c | 1269 |
1 files changed, 797 insertions, 472 deletions
diff --git a/thread_sync.c b/thread_sync.c index 152995fac0..27b617c907 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1,36 +1,77 @@ /* included by thread.c */ +#include "ccan/list/list.h" static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; /* Mutex */ - typedef struct rb_mutex_struct { - rb_nativethread_lock_t lock; - rb_nativethread_cond_t cond; - struct rb_thread_struct volatile *th; + rb_fiber_t *fiber; struct rb_mutex_struct *next_mutex; - int cond_waiting; - int allow_trap; + struct list_head waitq; /* protected by GVL */ } rb_mutex_t; +/* sync_waiter is always on-stack */ +struct sync_waiter { + VALUE self; + rb_thread_t *th; + rb_fiber_t *fiber; + struct list_node node; +}; + +#define MUTEX_ALLOW_TRAP FL_USER1 + +static void +sync_wakeup(struct list_head *head, long max) +{ + struct sync_waiter *cur = 0, *next; + + list_for_each_safe(head, cur, next, node) { + list_del_init(&cur->node); + + if (cur->th->status != THREAD_KILLED) { + + if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) { + rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); + } + else { + rb_threadptr_interrupt(cur->th); + cur->th->status = THREAD_RUNNABLE; + } + + if (--max == 0) return; + } + } +} + +static void +wakeup_one(struct list_head *head) +{ + sync_wakeup(head, 1); +} + +static void +wakeup_all(struct list_head *head) +{ + sync_wakeup(head, LONG_MAX); +} + #if defined(HAVE_WORKING_FORK) static void rb_mutex_abandon_all(rb_mutex_t *mutexes); static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); #endif -static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); +static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber); /* - * Document-class: Mutex + * Document-class: Thread::Mutex * - * Mutex implements a simple semaphore that can be used to coordinate access to - * shared data from multiple concurrent threads. + * Thread::Mutex implements a simple semaphore that can be used to + * coordinate access to shared data from multiple concurrent threads. * * Example: * - * require 'thread' - * semaphore = Mutex.new + * semaphore = Thread::Mutex.new * * a = Thread.new { * semaphore.synchronize { @@ -46,23 +87,31 @@ static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *t * */ -#define GetMutexPtr(obj, tobj) \ - TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) +#define mutex_mark ((void(*)(void*))0) + +static size_t +rb_mutex_num_waiting(rb_mutex_t *mutex) +{ + struct sync_waiter *w = 0; + size_t n = 0; + + list_for_each(&mutex->waitq, w, node) { + n++; + } + + return n; +} -#define mutex_mark NULL +rb_thread_t* rb_fiber_threadptr(const rb_fiber_t *fiber); static void mutex_free(void *ptr) { - if (ptr) { - rb_mutex_t *mutex = ptr; - if (mutex->th) { - /* rb_warn("free locked mutex"); */ - const char *err = rb_mutex_unlock_th(mutex, mutex->th); - if (err) rb_bug("%s", err); - } - native_mutex_destroy(&mutex->lock); - native_cond_destroy(&mutex->cond); + rb_mutex_t *mutex = ptr; + if (mutex->fiber) { + /* rb_warn("free locked mutex"); */ + const char *err = rb_mutex_unlock_th(mutex, rb_fiber_threadptr(mutex->fiber), mutex->fiber); + if (err) rb_bug("%s", err); } ruby_xfree(ptr); } @@ -76,18 +125,23 @@ mutex_memsize(const void *ptr) static const rb_data_type_t mutex_data_type = { "mutex", {mutex_mark, mutex_free, mutex_memsize,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY + 0, 0, RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_FREE_IMMEDIATELY }; +static rb_mutex_t * +mutex_ptr(VALUE obj) +{ + rb_mutex_t *mutex; + + TypedData_Get_Struct(obj, rb_mutex_t, &mutex_data_type, mutex); + + return mutex; +} + VALUE rb_obj_is_mutex(VALUE obj) { - if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { - return Qtrue; - } - else { - return Qfalse; - } + return RBOOL(rb_typeddata_is_kind_of(obj, &mutex_data_type)); } static VALUE @@ -97,14 +151,14 @@ mutex_alloc(VALUE klass) rb_mutex_t *mutex; obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); - native_mutex_initialize(&mutex->lock); - native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); + + list_head_init(&mutex->waitq); return obj; } /* * call-seq: - * Mutex.new -> mutex + * Thread::Mutex.new -> mutex * * Creates a new Mutex */ @@ -129,21 +183,43 @@ rb_mutex_new(void) VALUE rb_mutex_locked_p(VALUE self) { - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - return mutex->th ? Qtrue : Qfalse; + rb_mutex_t *mutex = mutex_ptr(self); + + return RBOOL(mutex->fiber); } static void -mutex_locked(rb_thread_t *th, VALUE self) +thread_mutex_insert(rb_thread_t *thread, rb_mutex_t *mutex) { - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); + if (thread->keeping_mutexes) { + mutex->next_mutex = thread->keeping_mutexes; + } + + thread->keeping_mutexes = mutex; +} + +static void +thread_mutex_remove(rb_thread_t *thread, rb_mutex_t *mutex) +{ + rb_mutex_t **keeping_mutexes = &thread->keeping_mutexes; - if (th->keeping_mutexes) { - mutex->next_mutex = th->keeping_mutexes; + while (*keeping_mutexes && *keeping_mutexes != mutex) { + // Move to the next mutex in the list: + keeping_mutexes = &(*keeping_mutexes)->next_mutex; + } + + if (*keeping_mutexes) { + *keeping_mutexes = mutex->next_mutex; + mutex->next_mutex = NULL; } - th->keeping_mutexes = mutex; +} + +static void +mutex_locked(rb_thread_t *th, VALUE self) +{ + rb_mutex_t *mutex = mutex_ptr(self); + + thread_mutex_insert(th, mutex); } /* @@ -156,71 +232,18 @@ mutex_locked(rb_thread_t *th, VALUE self) VALUE rb_mutex_trylock(VALUE self) { - rb_mutex_t *mutex; - VALUE locked = Qfalse; - GetMutexPtr(self, mutex); + rb_mutex_t *mutex = mutex_ptr(self); - native_mutex_lock(&mutex->lock); - if (mutex->th == 0) { + if (mutex->fiber == 0) { + rb_fiber_t *fiber = GET_EC()->fiber_ptr; rb_thread_t *th = GET_THREAD(); - mutex->th = th; - locked = Qtrue; + mutex->fiber = fiber; mutex_locked(th, self); + return Qtrue; } - native_mutex_unlock(&mutex->lock); - - return locked; -} - -static int -lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) -{ - int interrupted = 0; - int err = 0; - - mutex->cond_waiting++; - for (;;) { - if (!mutex->th) { - mutex->th = th; - break; - } - if (RUBY_VM_INTERRUPTED(th)) { - interrupted = 1; - break; - } - if (err == ETIMEDOUT) { - interrupted = 2; - break; - } - - if (timeout_ms) { - struct timespec timeout_rel; - struct timespec timeout; - - timeout_rel.tv_sec = 0; - timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; - timeout = native_cond_timeout(&mutex->cond, timeout_rel); - err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); - } - else { - native_cond_wait(&mutex->cond, &mutex->lock); - err = 0; - } - } - mutex->cond_waiting--; - - return interrupted; -} -static void -lock_interrupt(void *ptr) -{ - rb_mutex_t *mutex = (rb_mutex_t *)ptr; - native_mutex_lock(&mutex->lock); - if (mutex->cond_waiting > 0) - native_cond_broadcast(&mutex->cond); - native_mutex_unlock(&mutex->lock); + return Qfalse; } /* @@ -230,6 +253,141 @@ lock_interrupt(void *ptr) */ static const rb_thread_t *patrol_thread = NULL; +static VALUE +mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex) +{ + return RBOOL(mutex->fiber == fiber); +} + +static VALUE +call_rb_fiber_scheduler_block(VALUE mutex) +{ + return rb_fiber_scheduler_block(rb_fiber_scheduler_current(), mutex, Qnil); +} + +static VALUE +delete_from_waitq(VALUE value) +{ + struct sync_waiter *sync_waiter = (void *)value; + list_del(&sync_waiter->node); + + return Qnil; +} + +static VALUE +do_mutex_lock(VALUE self, int interruptible_p) +{ + rb_execution_context_t *ec = GET_EC(); + rb_thread_t *th = ec->thread_ptr; + rb_fiber_t *fiber = ec->fiber_ptr; + rb_mutex_t *mutex = mutex_ptr(self); + + /* When running trap handler */ + if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) && + th->ec->interrupt_mask & TRAP_INTERRUPT_MASK) { + rb_raise(rb_eThreadError, "can't be called from trap context"); + } + + if (rb_mutex_trylock(self) == Qfalse) { + if (mutex->fiber == fiber) { + rb_raise(rb_eThreadError, "deadlock; recursive locking"); + } + + while (mutex->fiber != fiber) { + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + struct sync_waiter sync_waiter = { + .self = self, + .th = th, + .fiber = fiber + }; + + list_add_tail(&mutex->waitq, &sync_waiter.node); + + rb_ensure(call_rb_fiber_scheduler_block, self, delete_from_waitq, (VALUE)&sync_waiter); + + if (!mutex->fiber) { + mutex->fiber = fiber; + } + } + else { + if (!th->vm->thread_ignore_deadlock && rb_fiber_threadptr(mutex->fiber) == th) { + rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread"); + } + + enum rb_thread_status prev_status = th->status; + rb_hrtime_t *timeout = 0; + rb_hrtime_t rel = rb_msec2hrtime(100); + + th->status = THREAD_STOPPED_FOREVER; + th->locking_mutex = self; + rb_ractor_sleeper_threads_inc(th->ractor); + /* + * Carefully! while some contended threads are in native_sleep(), + * ractor->sleeper is unstable value. we have to avoid both deadlock + * and busy loop. + */ + if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) && + !patrol_thread) { + timeout = &rel; + patrol_thread = th; + } + + struct sync_waiter sync_waiter = { + .self = self, + .th = th, + .fiber = fiber + }; + + list_add_tail(&mutex->waitq, &sync_waiter.node); + + native_sleep(th, timeout); /* release GVL */ + + list_del(&sync_waiter.node); + + if (!mutex->fiber) { + mutex->fiber = fiber; + } + + if (patrol_thread == th) + patrol_thread = NULL; + + th->locking_mutex = Qfalse; + if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) { + rb_check_deadlock(th->ractor); + } + if (th->status == THREAD_STOPPED_FOREVER) { + th->status = prev_status; + } + rb_ractor_sleeper_threads_dec(th->ractor); + } + + if (interruptible_p) { + /* release mutex before checking for interrupts...as interrupt checking + * code might call rb_raise() */ + if (mutex->fiber == fiber) mutex->fiber = 0; + RUBY_VM_CHECK_INTS_BLOCKING(th->ec); /* may release mutex */ + if (!mutex->fiber) { + mutex->fiber = fiber; + } + } + } + + if (mutex->fiber == fiber) mutex_locked(th, self); + } + + // assertion + if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned."); + + return self; +} + +static VALUE +mutex_lock_uninterruptible(VALUE self) +{ + return do_mutex_lock(self, 0); +} + /* * call-seq: * mutex.lock -> self @@ -240,70 +398,7 @@ static const rb_thread_t *patrol_thread = NULL; VALUE rb_mutex_lock(VALUE self) { - rb_thread_t *th = GET_THREAD(); - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - - /* When running trap handler */ - if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) { - rb_raise(rb_eThreadError, "can't be called from trap context"); - } - - if (rb_mutex_trylock(self) == Qfalse) { - if (mutex->th == th) { - rb_raise(rb_eThreadError, "deadlock; recursive locking"); - } - - while (mutex->th != th) { - int interrupted; - enum rb_thread_status prev_status = th->status; - volatile int timeout_ms = 0; - struct rb_unblock_callback oldubf; - - set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); - th->status = THREAD_STOPPED_FOREVER; - th->locking_mutex = self; - - native_mutex_lock(&mutex->lock); - th->vm->sleeper++; - /* - * Carefully! while some contended threads are in lock_func(), - * vm->sleepr is unstable value. we have to avoid both deadlock - * and busy loop. - */ - if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && - !patrol_thread) { - timeout_ms = 100; - patrol_thread = th; - } - - GVL_UNLOCK_BEGIN(); - interrupted = lock_func(th, mutex, (int)timeout_ms); - native_mutex_unlock(&mutex->lock); - GVL_UNLOCK_END(); - - if (patrol_thread == th) - patrol_thread = NULL; - - reset_unblock_function(th, &oldubf); - - th->locking_mutex = Qfalse; - if (mutex->th && interrupted == 2) { - rb_check_deadlock(th->vm); - } - if (th->status == THREAD_STOPPED_FOREVER) { - th->status = prev_status; - } - th->vm->sleeper--; - - if (mutex->th == th) mutex_locked(th, self); - - if (interrupted) { - RUBY_VM_CHECK_INTS_BLOCKING(th); - } - } - } - return self; + return do_mutex_lock(self, 1); } /* @@ -315,46 +410,52 @@ rb_mutex_lock(VALUE self) VALUE rb_mutex_owned_p(VALUE self) { - VALUE owned = Qfalse; - rb_thread_t *th = GET_THREAD(); - rb_mutex_t *mutex; - - GetMutexPtr(self, mutex); + rb_fiber_t *fiber = GET_EC()->fiber_ptr; + rb_mutex_t *mutex = mutex_ptr(self); - if (mutex->th == th) - owned = Qtrue; - - return owned; + return mutex_owned_p(fiber, mutex); } static const char * -rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) +rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber) { const char *err = NULL; - native_mutex_lock(&mutex->lock); - - if (mutex->th == 0) { - err = "Attempt to unlock a mutex which is not locked"; + if (mutex->fiber == 0) { + err = "Attempt to unlock a mutex which is not locked"; } - else if (mutex->th != th) { - err = "Attempt to unlock a mutex which is locked by another thread"; + else if (mutex->fiber != fiber) { + err = "Attempt to unlock a mutex which is locked by another thread/fiber"; } else { - mutex->th = 0; - if (mutex->cond_waiting > 0) - native_cond_signal(&mutex->cond); - } - - native_mutex_unlock(&mutex->lock); - - if (!err) { - rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes; - while (*th_mutex != mutex) { - th_mutex = &(*th_mutex)->next_mutex; - } - *th_mutex = mutex->next_mutex; - mutex->next_mutex = NULL; + struct sync_waiter *cur = 0, *next; + + mutex->fiber = 0; + list_for_each_safe(&mutex->waitq, cur, next, node) { + list_del_init(&cur->node); + + if (cur->th->scheduler != Qnil && rb_fiberptr_blocking(cur->fiber) == 0) { + rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber)); + goto found; + } + else { + switch (cur->th->status) { + case THREAD_RUNNABLE: /* from someone else calling Thread#run */ + case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */ + rb_threadptr_interrupt(cur->th); + goto found; + case THREAD_STOPPED: /* probably impossible */ + rb_bug("unexpected THREAD_STOPPED"); + case THREAD_KILLED: + /* not sure about this, possible in exit GC? */ + rb_bug("unexpected THREAD_KILLED"); + continue; + } + } + } + + found: + thread_mutex_remove(th, mutex); } return err; @@ -371,10 +472,10 @@ VALUE rb_mutex_unlock(VALUE self) { const char *err; - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); + rb_mutex_t *mutex = mutex_ptr(self); + rb_thread_t *th = GET_THREAD(); - err = rb_mutex_unlock_th(mutex, GET_THREAD()); + err = rb_mutex_unlock_th(mutex, th, GET_EC()->fiber_ptr); if (err) rb_raise(rb_eThreadError, "%s", err); return self; @@ -384,23 +485,19 @@ rb_mutex_unlock(VALUE self) static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th) { - if (th->keeping_mutexes) { - rb_mutex_abandon_all(th->keeping_mutexes); - } + rb_mutex_abandon_all(th->keeping_mutexes); th->keeping_mutexes = NULL; } static void rb_mutex_abandon_locking_mutex(rb_thread_t *th) { - rb_mutex_t *mutex; - - if (!th->locking_mutex) return; + if (th->locking_mutex) { + rb_mutex_t *mutex = mutex_ptr(th->locking_mutex); - GetMutexPtr(th->locking_mutex, mutex); - if (mutex->th == th) - rb_mutex_abandon_all(mutex); - th->locking_mutex = Qfalse; + list_head_init(&mutex->waitq); + th->locking_mutex = Qfalse; + } } static void @@ -411,51 +508,65 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes) while (mutexes) { mutex = mutexes; mutexes = mutex->next_mutex; - mutex->th = 0; + mutex->fiber = 0; mutex->next_mutex = 0; + list_head_init(&mutex->waitq); } } #endif static VALUE -rb_mutex_sleep_forever(VALUE time) +rb_mutex_sleep_forever(VALUE self) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); + rb_thread_sleep_deadly_allow_spurious_wakeup(self); return Qnil; } static VALUE rb_mutex_wait_for(VALUE time) { - struct timeval *t = (struct timeval *)time; - sleep_timeval(GET_THREAD(), *t, 0); /* permit spurious check */ - return Qnil; + rb_hrtime_t *rel = (rb_hrtime_t *)time; + /* permit spurious check */ + return RBOOL(sleep_hrtime(GET_THREAD(), *rel, 0)); } VALUE rb_mutex_sleep(VALUE self, VALUE timeout) { - time_t beg, end; struct timeval t; + VALUE woken = Qtrue; if (!NIL_P(timeout)) { t = rb_time_interval(timeout); } + rb_mutex_unlock(self); - beg = time(0); - if (NIL_P(timeout)) { - rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); + time_t beg = time(0); + + VALUE scheduler = rb_fiber_scheduler_current(); + if (scheduler != Qnil) { + rb_fiber_scheduler_kernel_sleep(scheduler, timeout); + mutex_lock_uninterruptible(self); } else { - rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); + if (NIL_P(timeout)) { + rb_ensure(rb_mutex_sleep_forever, self, mutex_lock_uninterruptible, self); + } + else { + rb_hrtime_t rel = rb_timeval2hrtime(&t); + woken = rb_ensure(rb_mutex_wait_for, (VALUE)&rel, mutex_lock_uninterruptible, self); + } } - end = time(0) - beg; - return INT2FIX(end); + + RUBY_VM_CHECK_INTS_BLOCKING(GET_EC()); + if (!woken) return Qnil; + time_t end = time(0) - beg; + return TIMET2NUM(end); } /* * call-seq: - * mutex.sleep(timeout = nil) -> number + * mutex.sleep(timeout = nil) -> number or nil * * Releases the lock and sleeps +timeout+ seconds if it is given and * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by @@ -466,13 +577,15 @@ rb_mutex_sleep(VALUE self, VALUE timeout) * * Note that this method can wakeup without explicit Thread#wakeup call. * For example, receiving signal and so on. + * + * Returns the slept time in seconds if woken up, or +nil+ if timed out. */ static VALUE mutex_sleep(int argc, VALUE *argv, VALUE self) { VALUE timeout; - rb_scan_args(argc, argv, "01", &timeout); + timeout = rb_check_arity(argc, 0, 1) ? argv[0] : Qnil; return rb_mutex_sleep(self, timeout); } @@ -481,7 +594,7 @@ mutex_sleep(int argc, VALUE *argv, VALUE self) * mutex.synchronize { ... } -> result of the block * * Obtains a lock, runs the block, and releases the lock when the block - * completes. See the example under +Mutex+. + * completes. See the example under Thread::Mutex. */ VALUE @@ -496,10 +609,10 @@ rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) * mutex.synchronize { ... } -> result of the block * * Obtains a lock, runs the block, and releases the lock when the block - * completes. See the example under +Mutex+. + * completes. See the example under Thread::Mutex. */ static VALUE -rb_mutex_synchronize_m(VALUE self, VALUE args) +rb_mutex_synchronize_m(VALUE self) { if (!rb_block_given_p()) { rb_raise(rb_eThreadError, "must be called with a block"); @@ -510,88 +623,157 @@ rb_mutex_synchronize_m(VALUE self, VALUE args) void rb_mutex_allow_trap(VALUE self, int val) { - rb_mutex_t *m; - GetMutexPtr(self, m); + Check_TypedStruct(self, &mutex_data_type); - m->allow_trap = val; + if (val) + FL_SET_RAW(self, MUTEX_ALLOW_TRAP); + else + FL_UNSET_RAW(self, MUTEX_ALLOW_TRAP); } /* Queue */ -enum { - QUEUE_QUE, - QUEUE_WAITERS, - SZQUEUE_WAITERS, - SZQUEUE_MAX, - END_QUEUE -}; +#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq) +PACKED_STRUCT_UNALIGNED(struct rb_queue { + struct list_head waitq; + rb_serial_t fork_gen; + const VALUE que; + int num_waiting; +}); + +#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq) +#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq) +PACKED_STRUCT_UNALIGNED(struct rb_szqueue { + struct rb_queue q; + int num_waiting_push; + struct list_head pushq; + long max; +}); -#define QUEUE_CLOSED FL_USER5 +static void +queue_mark(void *ptr) +{ + struct rb_queue *q = ptr; -#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) -#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) -#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) -#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) -#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) + /* no need to mark threads in waitq, they are on stack */ + rb_gc_mark(q->que); +} -static VALUE -ary_buf_new(void) +static size_t +queue_memsize(const void *ptr) { - return rb_ary_tmp_new(1); + return sizeof(struct rb_queue); } +static const rb_data_type_t queue_data_type = { + "queue", + {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + static VALUE -get_array(VALUE obj, int idx) +queue_alloc(VALUE klass) { - VALUE ary = RSTRUCT_GET(obj, idx); - if (!RB_TYPE_P(ary, T_ARRAY)) { - rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj); - } - return ary; + VALUE obj; + struct rb_queue *q; + + obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q); + list_head_init(queue_waitq(q)); + return obj; } -static void -wakeup_first_thread(VALUE list) +static int +queue_fork_check(struct rb_queue *q) { - VALUE thread; + rb_serial_t fork_gen = GET_VM()->fork_gen; - while (!NIL_P(thread = rb_ary_shift(list))) { - if (RTEST(rb_thread_wakeup_alive(thread))) break; + if (q->fork_gen == fork_gen) { + return 0; } + /* forked children can't reach into parent thread stacks */ + q->fork_gen = fork_gen; + list_head_init(queue_waitq(q)); + q->num_waiting = 0; + return 1; } +static struct rb_queue * +queue_ptr(VALUE obj) +{ + struct rb_queue *q; + + TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q); + queue_fork_check(q); + + return q; +} + +#define QUEUE_CLOSED FL_USER5 + static void -wakeup_all_threads(VALUE list) +szqueue_mark(void *ptr) +{ + struct rb_szqueue *sq = ptr; + + queue_mark(&sq->q); +} + +static size_t +szqueue_memsize(const void *ptr) +{ + return sizeof(struct rb_szqueue); +} + +static const rb_data_type_t szqueue_data_type = { + "sized_queue", + {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + +static VALUE +szqueue_alloc(VALUE klass) +{ + struct rb_szqueue *sq; + VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue, + &szqueue_data_type, sq); + list_head_init(szqueue_waitq(sq)); + list_head_init(szqueue_pushq(sq)); + return obj; +} + +static struct rb_szqueue * +szqueue_ptr(VALUE obj) { - VALUE thread; - long i; + struct rb_szqueue *sq; - for (i=0; i<RARRAY_LEN(list); i++) { - thread = RARRAY_AREF(list, i); - rb_thread_wakeup_alive(thread); + TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq); + if (queue_fork_check(&sq->q)) { + list_head_init(szqueue_pushq(sq)); + sq->num_waiting_push = 0; } - rb_ary_clear(list); + + return sq; } -static unsigned long -queue_length(VALUE self) +static VALUE +ary_buf_new(void) { - VALUE que = GET_QUEUE_QUE(self); - return RARRAY_LEN(que); + return rb_ary_tmp_new(1); } -static unsigned long -queue_num_waiting(VALUE self) +static VALUE +check_array(VALUE obj, VALUE ary) { - VALUE waiters = GET_QUEUE_WAITERS(self); - return RARRAY_LEN(waiters); + if (!RB_TYPE_P(ary, T_ARRAY)) { + rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj); + } + return ary; } -static unsigned long -szqueue_num_waiting_producer(VALUE self) +static long +queue_length(VALUE self, struct rb_queue *q) { - VALUE waiters = GET_SZQUEUE_WAITERS(self); - return RARRAY_LEN(waiters); + return RARRAY_LEN(check_array(self, q->que)); } static int @@ -600,6 +782,15 @@ queue_closed_p(VALUE self) return FL_TEST_RAW(self, QUEUE_CLOSED) != 0; } +/* + * Document-class: ClosedQueueError + * + * The exception class which will be raised when pushing into a closed + * Queue. See Thread::Queue#close and Thread::SizedQueue#close. + */ + +NORETURN(static void raise_closed_queue_error(VALUE self)); + static void raise_closed_queue_error(VALUE self) { @@ -607,47 +798,26 @@ raise_closed_queue_error(VALUE self) } static VALUE -queue_closed_result(VALUE self) +queue_closed_result(VALUE self, struct rb_queue *q) { - assert(queue_length(self) == 0); + assert(queue_length(self, q) == 0); return Qnil; } -static VALUE -queue_do_close(VALUE self, int is_szq) -{ - if (!queue_closed_p(self)) { - FL_SET(self, QUEUE_CLOSED); - - if (queue_num_waiting(self) > 0) { - VALUE waiters = GET_QUEUE_WAITERS(self); - wakeup_all_threads(waiters); - } - - if (is_szq && szqueue_num_waiting_producer(self) > 0) { - VALUE waiters = GET_SZQUEUE_WAITERS(self); - wakeup_all_threads(waiters); - } - } - - return self; -} - /* - * Document-class: Queue + * Document-class: Thread::Queue * - * The Queue class implements multi-producer, multi-consumer queues. - * It is especially useful in threaded programming when information - * must be exchanged safely between multiple threads. The Queue class - * implements all the required locking semantics. + * The Thread::Queue class implements multi-producer, multi-consumer + * queues. It is especially useful in threaded programming when + * information must be exchanged safely between multiple threads. The + * Thread::Queue class implements all the required locking semantics. * * The class implements FIFO type of queue. In a FIFO queue, the first * tasks added are the first retrieved. * * Example: * - * require 'thread' - * queue = Queue.new + * queue = Thread::Queue.new * * producer = Thread.new do * 5.times do |i| @@ -665,35 +835,64 @@ queue_do_close(VALUE self, int is_szq) * end * end * + * consumer.join + * */ /* * Document-method: Queue::new * - * Creates a new queue instance. + * call-seq: + * Thread::Queue.new -> empty_queue + * Thread::Queue.new(enumerable) -> queue + * + * Creates a new queue instance, optionally using the contents of an +enumerable+ + * for its initial state. + * + * Example: + * + * q = Thread::Queue.new + * #=> #<Thread::Queue:0x00007ff7501110d0> + * q.empty? + * #=> true + * + * q = Thread::Queue.new([1, 2, 3]) + * #=> #<Thread::Queue:0x00007ff7500ec500> + * q.empty? + * #=> false + * q.pop + * #=> 1 */ static VALUE -rb_queue_initialize(VALUE self) +rb_queue_initialize(int argc, VALUE *argv, VALUE self) { - RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); - RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); + VALUE initial; + struct rb_queue *q = queue_ptr(self); + if ((argc = rb_scan_args(argc, argv, "01", &initial)) == 1) { + initial = rb_to_array(initial); + } + RB_OBJ_WRITE(self, &q->que, ary_buf_new()); + list_head_init(queue_waitq(q)); + if (argc == 1) { + rb_ary_concat(q->que, initial); + } return self; } static VALUE -queue_do_push(VALUE self, VALUE obj) +queue_do_push(VALUE self, struct rb_queue *q, VALUE obj) { if (queue_closed_p(self)) { raise_closed_queue_error(self); } - rb_ary_push(GET_QUEUE_QUE(self), obj); - wakeup_first_thread(GET_QUEUE_WAITERS(self)); + rb_ary_push(check_array(self, q->que), obj); + wakeup_one(queue_waitq(q)); return self; } /* - * Document-method: Queue#close + * Document-method: Thread::Queue#close * call-seq: * close * @@ -705,16 +904,18 @@ queue_do_push(VALUE self, VALUE obj) * * - +close+ will be ignored. * - * - calling enq/push/<< will return nil. + * - calling enq/push/<< will raise a +ClosedQueueError+. * * - when +empty?+ is false, calling deq/pop/shift will return an object * from the queue as usual. + * - when +empty?+ is true, deq(false) will not suspend the thread and will return nil. + * deq(true) will raise a +ThreadError+. * * ClosedQueueError is inherited from StopIteration, so that you can break loop block. * - * Example: + * Example: * - * q = Queue.new + * q = Thread::Queue.new * Thread.new{ * while e = q.deq # wait for nil to break loop * # ... @@ -726,11 +927,19 @@ queue_do_push(VALUE self, VALUE obj) static VALUE rb_queue_close(VALUE self) { - return queue_do_close(self, FALSE); + struct rb_queue *q = queue_ptr(self); + + if (!queue_closed_p(self)) { + FL_SET(self, QUEUE_CLOSED); + + wakeup_all(queue_waitq(q)); + } + + return self; } /* - * Document-method: Queue#closed? + * Document-method: Thread::Queue#closed? * call-seq: closed? * * Returns +true+ if the queue is closed. @@ -739,11 +948,11 @@ rb_queue_close(VALUE self) static VALUE rb_queue_closed_p(VALUE self) { - return queue_closed_p(self) ? Qtrue : Qfalse; + return RBOOL(queue_closed_p(self)); } /* - * Document-method: Queue#push + * Document-method: Thread::Queue#push * call-seq: * push(object) * enq(object) @@ -755,52 +964,79 @@ rb_queue_closed_p(VALUE self) static VALUE rb_queue_push(VALUE self, VALUE obj) { - return queue_do_push(self, obj); + return queue_do_push(self, queue_ptr(self), obj); +} + +static VALUE +queue_sleep(VALUE self) +{ + rb_thread_sleep_deadly_allow_spurious_wakeup(self); + return Qnil; } -struct waiting_delete { - VALUE waiting; - VALUE th; +struct queue_waiter { + struct sync_waiter w; + union { + struct rb_queue *q; + struct rb_szqueue *sq; + } as; }; static VALUE -queue_delete_from_waiting(struct waiting_delete *p) +queue_sleep_done(VALUE p) { - rb_ary_delete(p->waiting, p->th); - return Qnil; + struct queue_waiter *qw = (struct queue_waiter *)p; + + list_del(&qw->w.node); + qw->as.q->num_waiting--; + + return Qfalse; } static VALUE -queue_sleep(VALUE arg) +szqueue_sleep_done(VALUE p) { - rb_thread_sleep_deadly_allow_spurious_wakeup(); - return Qnil; + struct queue_waiter *qw = (struct queue_waiter *)p; + + list_del(&qw->w.node); + qw->as.sq->num_waiting_push--; + + return Qfalse; } static VALUE -queue_do_pop(VALUE self, int should_block) -{ - struct waiting_delete args; - args.waiting = GET_QUEUE_WAITERS(self); - args.th = rb_thread_current(); - - while (queue_length(self) == 0) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue empty"); - } - else if (queue_closed_p(self)) { - return queue_closed_result(self); - } - else { - assert(queue_length(self) == 0); - assert(queue_closed_p(self) == 0); - - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args); - } +queue_do_pop(VALUE self, struct rb_queue *q, int should_block) +{ + check_array(self, q->que); + + while (RARRAY_LEN(q->que) == 0) { + if (!should_block) { + rb_raise(rb_eThreadError, "queue empty"); + } + else if (queue_closed_p(self)) { + return queue_closed_result(self, q); + } + else { + rb_execution_context_t *ec = GET_EC(); + + assert(RARRAY_LEN(q->que) == 0); + assert(queue_closed_p(self) == 0); + + struct queue_waiter queue_waiter = { + .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr}, + .as = {.q = q} + }; + + struct list_head *waitq = queue_waitq(q); + + list_add_tail(waitq, &queue_waiter.w.node); + queue_waiter.as.q->num_waiting++; + + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter); + } } - return rb_ary_shift(GET_QUEUE_QUE(self)); + return rb_ary_shift(q->que); } static int @@ -815,7 +1051,7 @@ queue_pop_should_block(int argc, const VALUE *argv) } /* - * Document-method: Queue#pop + * Document-method: Thread::Queue#pop * call-seq: * pop(non_block=false) * deq(non_block=false) @@ -832,11 +1068,11 @@ static VALUE rb_queue_pop(int argc, VALUE *argv, VALUE self) { int should_block = queue_pop_should_block(argc, argv); - return queue_do_pop(self, should_block); + return queue_do_pop(self, queue_ptr(self), should_block); } /* - * Document-method: Queue#empty? + * Document-method: Thread::Queue#empty? * call-seq: empty? * * Returns +true+ if the queue is empty. @@ -845,11 +1081,11 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self) static VALUE rb_queue_empty_p(VALUE self) { - return queue_length(self) == 0 ? Qtrue : Qfalse; + return RBOOL(queue_length(self, queue_ptr(self)) == 0); } /* - * Document-method: Queue#clear + * Document-method: Thread::Queue#clear * * Removes all objects from the queue. */ @@ -857,12 +1093,14 @@ rb_queue_empty_p(VALUE self) static VALUE rb_queue_clear(VALUE self) { - rb_ary_clear(GET_QUEUE_QUE(self)); + struct rb_queue *q = queue_ptr(self); + + rb_ary_clear(check_array(self, q->que)); return self; } /* - * Document-method: Queue#length + * Document-method: Thread::Queue#length * call-seq: * length * size @@ -873,12 +1111,11 @@ rb_queue_clear(VALUE self) static VALUE rb_queue_length(VALUE self) { - unsigned long len = queue_length(self); - return ULONG2NUM(len); + return LONG2NUM(queue_length(self, queue_ptr(self))); } /* - * Document-method: Queue#num_waiting + * Document-method: Thread::Queue#num_waiting * * Returns the number of threads waiting on the queue. */ @@ -886,17 +1123,18 @@ rb_queue_length(VALUE self) static VALUE rb_queue_num_waiting(VALUE self) { - unsigned long len = queue_num_waiting(self); - return ULONG2NUM(len); + struct rb_queue *q = queue_ptr(self); + + return INT2NUM(q->num_waiting); } /* - * Document-class: SizedQueue + * Document-class: Thread::SizedQueue * * This class represents queues of specified size capacity. The push operation * may be blocked if the capacity is full. * - * See Queue for an example of how a SizedQueue works. + * See Thread::Queue for an example of how a Thread::SizedQueue works. */ /* @@ -910,26 +1148,27 @@ static VALUE rb_szqueue_initialize(VALUE self, VALUE vmax) { long max; + struct rb_szqueue *sq = szqueue_ptr(self); max = NUM2LONG(vmax); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } - RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); - RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); - RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); - RSTRUCT_SET(self, SZQUEUE_MAX, vmax); + RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new()); + list_head_init(szqueue_waitq(sq)); + list_head_init(szqueue_pushq(sq)); + sq->max = max; return self; } /* - * Document-method: SizedQueue#close + * Document-method: Thread::SizedQueue#close * call-seq: * close * - * Similar to Queue#close. + * Similar to Thread::Queue#close. * * The difference is behavior with waiting enqueuing threads. * @@ -939,11 +1178,18 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) static VALUE rb_szqueue_close(VALUE self) { - return queue_do_close(self, TRUE); + if (!queue_closed_p(self)) { + struct rb_szqueue *sq = szqueue_ptr(self); + + FL_SET(self, QUEUE_CLOSED); + wakeup_all(szqueue_waitq(sq)); + wakeup_all(szqueue_pushq(sq)); + } + return self; } /* - * Document-method: SizedQueue#max + * Document-method: Thread::SizedQueue#max * * Returns the maximum size of the queue. */ @@ -951,11 +1197,11 @@ rb_szqueue_close(VALUE self) static VALUE rb_szqueue_max_get(VALUE self) { - return GET_SZQUEUE_MAX(self); + return LONG2NUM(szqueue_ptr(self)->max); } /* - * Document-method: SizedQueue#max= + * Document-method: Thread::SizedQueue#max= * call-seq: max=(number) * * Sets the maximum size of the queue to the given +number+. @@ -964,19 +1210,18 @@ rb_szqueue_max_get(VALUE self) static VALUE rb_szqueue_max_set(VALUE self, VALUE vmax) { - long max = NUM2LONG(vmax), diff = 0; - VALUE t; + long max = NUM2LONG(vmax); + long diff = 0; + struct rb_szqueue *sq = szqueue_ptr(self); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } - if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) { - diff = max - GET_SZQUEUE_ULONGMAX(self); - } - RSTRUCT_SET(self, SZQUEUE_MAX, vmax); - while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) { - rb_thread_wakeup_alive(t); + if (max > sq->max) { + diff = max - sq->max; } + sq->max = max; + sync_wakeup(szqueue_pushq(sq), diff); return vmax; } @@ -992,7 +1237,7 @@ szqueue_push_should_block(int argc, const VALUE *argv) } /* - * Document-method: SizedQueue#push + * Document-method: Thread::SizedQueue#push * call-seq: * push(object, non_block=false) * enq(object, non_block=false) @@ -1008,46 +1253,54 @@ szqueue_push_should_block(int argc, const VALUE *argv) static VALUE rb_szqueue_push(int argc, VALUE *argv, VALUE self) { - struct waiting_delete args; + struct rb_szqueue *sq = szqueue_ptr(self); int should_block = szqueue_push_should_block(argc, argv); - args.waiting = GET_SZQUEUE_WAITERS(self); - args.th = rb_thread_current(); - - while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) { - if (!should_block) { - rb_raise(rb_eThreadError, "queue full"); - } - else if (queue_closed_p(self)) { - goto closed; - } - else { - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args); - } + + while (queue_length(self, &sq->q) >= sq->max) { + if (!should_block) { + rb_raise(rb_eThreadError, "queue full"); + } + else if (queue_closed_p(self)) { + break; + } + else { + rb_execution_context_t *ec = GET_EC(); + struct queue_waiter queue_waiter = { + .w = {.self = self, .th = ec->thread_ptr, .fiber = ec->fiber_ptr}, + .as = {.sq = sq} + }; + + struct list_head *pushq = szqueue_pushq(sq); + + list_add_tail(pushq, &queue_waiter.w.node); + sq->num_waiting_push++; + + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&queue_waiter); + } } if (queue_closed_p(self)) { - closed: - raise_closed_queue_error(self); + raise_closed_queue_error(self); } - return queue_do_push(self, argv[0]); + return queue_do_push(self, &sq->q, argv[0]); } static VALUE szqueue_do_pop(VALUE self, int should_block) { - VALUE retval = queue_do_pop(self, should_block); + struct rb_szqueue *sq = szqueue_ptr(self); + VALUE retval = queue_do_pop(self, &sq->q, should_block); - if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) { - wakeup_first_thread(GET_SZQUEUE_WAITERS(self)); + if (queue_length(self, &sq->q) < sq->max) { + wakeup_one(szqueue_pushq(sq)); } return retval; } /* - * Document-method: SizedQueue#pop + * Document-method: Thread::SizedQueue#pop * call-seq: * pop(non_block=false) * deq(non_block=false) @@ -1068,7 +1321,7 @@ rb_szqueue_pop(int argc, VALUE *argv, VALUE self) } /* - * Document-method: Queue#clear + * Document-method: Thread::SizedQueue#clear * * Removes all objects from the queue. */ @@ -1076,13 +1329,32 @@ rb_szqueue_pop(int argc, VALUE *argv, VALUE self) static VALUE rb_szqueue_clear(VALUE self) { - rb_ary_clear(GET_QUEUE_QUE(self)); - wakeup_all_threads(GET_SZQUEUE_WAITERS(self)); + struct rb_szqueue *sq = szqueue_ptr(self); + + rb_ary_clear(check_array(self, sq->q.que)); + wakeup_all(szqueue_pushq(sq)); return self; } /* - * Document-method: SizedQueue#num_waiting + * Document-method: Thread::SizedQueue#length + * call-seq: + * length + * size + * + * Returns the length of the queue. + */ + +static VALUE +rb_szqueue_length(VALUE self) +{ + struct rb_szqueue *sq = szqueue_ptr(self); + + return LONG2NUM(queue_length(self, &sq->q)); +} + +/* + * Document-method: Thread::SizedQueue#num_waiting * * Returns the number of threads waiting on the queue. */ @@ -1090,21 +1362,35 @@ rb_szqueue_clear(VALUE self) static VALUE rb_szqueue_num_waiting(VALUE self) { - long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self); - return ULONG2NUM(len); + struct rb_szqueue *sq = szqueue_ptr(self); + + return INT2NUM(sq->q.num_waiting + sq->num_waiting_push); } -/* ConditionalVariable */ +/* + * Document-method: Thread::SizedQueue#empty? + * call-seq: empty? + * + * Returns +true+ if the queue is empty. + */ -enum { - CONDVAR_WAITERS, - END_CONDVAR -}; +static VALUE +rb_szqueue_empty_p(VALUE self) +{ + struct rb_szqueue *sq = szqueue_ptr(self); + + return RBOOL(queue_length(self, &sq->q) == 0); +} -#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS) + +/* ConditionalVariable */ +struct rb_condvar { + struct list_head waitq; + rb_serial_t fork_gen; +}; /* - * Document-class: ConditionVariable + * Document-class: Thread::ConditionVariable * * ConditionVariable objects augment class Mutex. Using condition variables, * it is possible to suspend while in the middle of a critical section until a @@ -1112,10 +1398,8 @@ enum { * * Example: * - * require 'thread' - * - * mutex = Mutex.new - * resource = ConditionVariable.new + * mutex = Thread::Mutex.new + * resource = Thread::ConditionVariable.new * * a = Thread.new { * mutex.synchronize { @@ -1133,6 +1417,47 @@ enum { * } */ +static size_t +condvar_memsize(const void *ptr) +{ + return sizeof(struct rb_condvar); +} + +static const rb_data_type_t cv_data_type = { + "condvar", + {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + +static struct rb_condvar * +condvar_ptr(VALUE self) +{ + struct rb_condvar *cv; + rb_serial_t fork_gen = GET_VM()->fork_gen; + + TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv); + + /* forked children can't reach into parent thread stacks */ + if (cv->fork_gen != fork_gen) { + cv->fork_gen = fork_gen; + list_head_init(&cv->waitq); + } + + return cv; +} + +static VALUE +condvar_alloc(VALUE klass) +{ + struct rb_condvar *cv; + VALUE obj; + + obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv); + list_head_init(&cv->waitq); + + return obj; +} + /* * Document-method: ConditionVariable::new * @@ -1142,7 +1467,8 @@ enum { static VALUE rb_condvar_initialize(VALUE self) { - RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new()); + struct rb_condvar *cv = condvar_ptr(self); + list_head_init(&cv->waitq); return self; } @@ -1160,41 +1486,40 @@ do_sleep(VALUE args) return rb_funcallv(p->mutex, id_sleep, 1, &p->timeout); } -static VALUE -delete_current_thread(VALUE ary) -{ - return rb_ary_delete(ary, rb_thread_current()); -} - /* - * Document-method: ConditionVariable#wait + * Document-method: Thread::ConditionVariable#wait * call-seq: wait(mutex, timeout=nil) * * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. * * If +timeout+ is given, this method returns after +timeout+ seconds passed, * even if no other thread doesn't signal. + * + * Returns the slept result on +mutex+. */ static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self) { - VALUE waiters = GET_CONDVAR_WAITERS(self); - VALUE mutex, timeout; + rb_execution_context_t *ec = GET_EC(); + + struct rb_condvar *cv = condvar_ptr(self); struct sleep_call args; - rb_scan_args(argc, argv, "11", &mutex, &timeout); + rb_scan_args(argc, argv, "11", &args.mutex, &args.timeout); - args.mutex = mutex; - args.timeout = timeout; - rb_ary_push(waiters, rb_thread_current()); - rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters); + struct sync_waiter sync_waiter = { + .self = args.mutex, + .th = ec->thread_ptr, + .fiber = ec->fiber_ptr + }; - return self; + list_add_tail(&cv->waitq, &sync_waiter.node); + return rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&sync_waiter); } /* - * Document-method: ConditionVariable#signal + * Document-method: Thread::ConditionVariable#signal * * Wakes up the first thread in line waiting for this lock. */ @@ -1202,12 +1527,13 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self) static VALUE rb_condvar_signal(VALUE self) { - wakeup_first_thread(GET_CONDVAR_WAITERS(self)); + struct rb_condvar *cv = condvar_ptr(self); + wakeup_one(&cv->waitq); return self; } /* - * Document-method: ConditionVariable#broadcast + * Document-method: Thread::ConditionVariable#broadcast * * Wakes up all threads waiting for this lock. */ @@ -1215,35 +1541,44 @@ rb_condvar_signal(VALUE self) static VALUE rb_condvar_broadcast(VALUE self) { - wakeup_all_threads(GET_CONDVAR_WAITERS(self)); + struct rb_condvar *cv = condvar_ptr(self); + wakeup_all(&cv->waitq); return self; } +NORETURN(static VALUE undumpable(VALUE obj)); /* :nodoc: */ static VALUE undumpable(VALUE obj) { rb_raise(rb_eTypeError, "can't dump %"PRIsVALUE, rb_obj_class(obj)); - UNREACHABLE; + UNREACHABLE_RETURN(Qnil); } -static void -alias_global_const(const char *name, VALUE klass) +static VALUE +define_thread_class(VALUE outer, const ID name, VALUE super) { - rb_define_const(rb_cObject, name, klass); + VALUE klass = rb_define_class_id_under(outer, name, super); + rb_const_set(rb_cObject, name, klass); + return klass; } static void Init_thread_sync(void) { -#if 0 - rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); /* teach rdoc ConditionVariable */ - rb_cQueue = rb_define_class("Queue", rb_cObject); /* teach rdoc Queue */ - rb_cSizedQueue = rb_define_class("SizedQueue", rb_cObject); /* teach rdoc SizedQueue */ +#undef rb_intern +#if defined(TEACH_RDOC) && TEACH_RDOC == 42 + rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject); + rb_cConditionVariable = rb_define_class_under(rb_cThread, "ConditionVariable", rb_cObject); + rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject); + rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cObject); #endif +#define DEFINE_CLASS(name, super) \ + rb_c##name = define_thread_class(rb_cThread, rb_intern(#name), rb_c##super) + /* Mutex */ - rb_cMutex = rb_define_class_under(rb_cThread, "Mutex", rb_cObject); + DEFINE_CLASS(Mutex, Object); rb_define_alloc_func(rb_cMutex, mutex_alloc); rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); @@ -1255,14 +1590,12 @@ Init_thread_sync(void) rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); /* Queue */ - rb_cQueue = rb_struct_define_without_accessor_under( - rb_cThread, - "Queue", rb_cObject, rb_struct_alloc_noinit, - "que", "waiters", NULL); + DEFINE_CLASS(Queue, Object); + rb_define_alloc_func(rb_cQueue, queue_alloc); rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration); - rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); + rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, -1); rb_undef_method(rb_cQueue, "initialize_copy"); rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); rb_define_method(rb_cQueue, "close", rb_queue_close, 0); @@ -1280,10 +1613,8 @@ Init_thread_sync(void) rb_define_alias(rb_cQueue, "shift", "pop"); rb_define_alias(rb_cQueue, "size", "length"); - rb_cSizedQueue = rb_struct_define_without_accessor_under( - rb_cThread, - "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + DEFINE_CLASS(SizedQueue, Queue); + rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc); rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0); @@ -1291,19 +1622,20 @@ Init_thread_sync(void) rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1); rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1); + rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0); rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); + rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); rb_define_alias(rb_cSizedQueue, "enq", "push"); rb_define_alias(rb_cSizedQueue, "<<", "push"); rb_define_alias(rb_cSizedQueue, "deq", "pop"); rb_define_alias(rb_cSizedQueue, "shift", "pop"); + rb_define_alias(rb_cSizedQueue, "size", "length"); /* CVar */ - rb_cConditionVariable = rb_struct_define_without_accessor_under( - rb_cThread, - "ConditionVariable", rb_cObject, rb_struct_alloc_noinit, - "waiters", NULL); + DEFINE_CLASS(ConditionVariable, Object); + rb_define_alloc_func(rb_cConditionVariable, condvar_alloc); id_sleep = rb_intern("sleep"); @@ -1314,12 +1646,5 @@ Init_thread_sync(void) rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); -#define ALIAS_GLOBAL_CONST(name) \ - alias_global_const(#name, rb_c##name) - - ALIAS_GLOBAL_CONST(Mutex); - ALIAS_GLOBAL_CONST(Queue); - ALIAS_GLOBAL_CONST(SizedQueue); - ALIAS_GLOBAL_CONST(ConditionVariable); rb_provide("thread.rb"); } |
