From 0de5e1cb87c24c650a32dfb73ea6fdbd651929ca Mon Sep 17 00:00:00 2001 From: ko1 Date: Fri, 21 Aug 2015 23:36:23 +0000 Subject: * ext/thread/thread.c: move definitions of Queue, SizedQueue and ConditionalVariables to thread_tools.c. In other wowrds, such classes are built-in. [Feature #8919] At first, I planned to embed only a Queue class. However, rubygems requires 'thread.rb' (rubygems are required at first, when launch MRI without --disable-gems). So most of people require 'thread.rb' as an embedded library. Now, ext/thread/thread.c is empty, only for a dummy for compatibility. * thread.c: move a definition of Mutex class to thread_tools.c. And define Mutex class under Thread (so now Mutex is Thread::Mutex). Because other thread related classes are also defined under Thread. We remain ::Mutex as Thread::Mutex. Only an inspect result is changed. * common.mk: add dependency from thread.o to thraed_tools.c. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@51660 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- thread.c | 525 +-------------------------------------------------------------- 1 file changed, 4 insertions(+), 521 deletions(-) (limited to 'thread.c') diff --git a/thread.c b/thread.c index f64a45d708..e591e65dc2 100644 --- a/thread.c +++ b/thread.c @@ -75,7 +75,6 @@ #define THREAD_DEBUG 0 #endif -VALUE rb_cMutex; VALUE rb_cThreadShield; static VALUE sym_immediate; @@ -332,6 +331,8 @@ rb_thread_debug( } #endif +#include "thread_tools.c" + void rb_vm_gvl_destroy(rb_vm_t *vm) { @@ -445,21 +446,6 @@ terminate_all(rb_vm_t *vm, const rb_thread_t *main_thread) } } -typedef struct rb_mutex_struct -{ - rb_nativethread_lock_t lock; - rb_nativethread_cond_t cond; - struct rb_thread_struct volatile *th; - struct rb_mutex_struct *next_mutex; - int cond_waiting; - int allow_trap; -} rb_mutex_t; - -static void rb_mutex_abandon_all(rb_mutex_t *mutexes); -static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); -static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); -static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); - void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) { @@ -4140,500 +4126,6 @@ thgroup_add(VALUE group, VALUE thread) return group; } - -/* - * Document-class: Mutex - * - * Mutex implements a simple semaphore that can be used to coordinate access to - * shared data from multiple concurrent threads. - * - * Example: - * - * require 'thread' - * semaphore = Mutex.new - * - * a = Thread.new { - * semaphore.synchronize { - * # access shared resource - * } - * } - * - * b = Thread.new { - * semaphore.synchronize { - * # access shared resource - * } - * } - * - */ - -#define GetMutexPtr(obj, tobj) \ - TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) - -#define mutex_mark NULL - -static void -mutex_free(void *ptr) -{ - if (ptr) { - rb_mutex_t *mutex = ptr; - if (mutex->th) { - /* rb_warn("free locked mutex"); */ - const char *err = rb_mutex_unlock_th(mutex, mutex->th); - if (err) rb_bug("%s", err); - } - native_mutex_destroy(&mutex->lock); - native_cond_destroy(&mutex->cond); - } - ruby_xfree(ptr); -} - -static size_t -mutex_memsize(const void *ptr) -{ - return ptr ? sizeof(rb_mutex_t) : 0; -} - -static const rb_data_type_t mutex_data_type = { - "mutex", - {mutex_mark, mutex_free, mutex_memsize,}, - 0, 0, RUBY_TYPED_FREE_IMMEDIATELY -}; - -VALUE -rb_obj_is_mutex(VALUE obj) -{ - if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { - return Qtrue; - } - else { - return Qfalse; - } -} - -static VALUE -mutex_alloc(VALUE klass) -{ - VALUE obj; - rb_mutex_t *mutex; - - obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); - native_mutex_initialize(&mutex->lock); - native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); - return obj; -} - -/* - * call-seq: - * Mutex.new -> mutex - * - * Creates a new Mutex - */ -static VALUE -mutex_initialize(VALUE self) -{ - return self; -} - -VALUE -rb_mutex_new(void) -{ - return mutex_alloc(rb_cMutex); -} - -/* - * call-seq: - * mutex.locked? -> true or false - * - * Returns +true+ if this lock is currently held by some thread. - */ -VALUE -rb_mutex_locked_p(VALUE self) -{ - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - return mutex->th ? Qtrue : Qfalse; -} - -static void -mutex_locked(rb_thread_t *th, VALUE self) -{ - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - - if (th->keeping_mutexes) { - mutex->next_mutex = th->keeping_mutexes; - } - th->keeping_mutexes = mutex; -} - -/* - * call-seq: - * mutex.try_lock -> true or false - * - * Attempts to obtain the lock and returns immediately. Returns +true+ if the - * lock was granted. - */ -VALUE -rb_mutex_trylock(VALUE self) -{ - rb_mutex_t *mutex; - VALUE locked = Qfalse; - GetMutexPtr(self, mutex); - - native_mutex_lock(&mutex->lock); - if (mutex->th == 0) { - rb_thread_t *th = GET_THREAD(); - mutex->th = th; - locked = Qtrue; - - mutex_locked(th, self); - } - native_mutex_unlock(&mutex->lock); - - return locked; -} - -static int -lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) -{ - int interrupted = 0; - int err = 0; - - mutex->cond_waiting++; - for (;;) { - if (!mutex->th) { - mutex->th = th; - break; - } - if (RUBY_VM_INTERRUPTED(th)) { - interrupted = 1; - break; - } - if (err == ETIMEDOUT) { - interrupted = 2; - break; - } - - if (timeout_ms) { - struct timespec timeout_rel; - struct timespec timeout; - - timeout_rel.tv_sec = 0; - timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; - timeout = native_cond_timeout(&mutex->cond, timeout_rel); - err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); - } - else { - native_cond_wait(&mutex->cond, &mutex->lock); - err = 0; - } - } - mutex->cond_waiting--; - - return interrupted; -} - -static void -lock_interrupt(void *ptr) -{ - rb_mutex_t *mutex = (rb_mutex_t *)ptr; - native_mutex_lock(&mutex->lock); - if (mutex->cond_waiting > 0) - native_cond_broadcast(&mutex->cond); - native_mutex_unlock(&mutex->lock); -} - -/* - * At maximum, only one thread can use cond_timedwait and watch deadlock - * periodically. Multiple polling thread (i.e. concurrent deadlock check) - * introduces new race conditions. [Bug #6278] [ruby-core:44275] - */ -static const rb_thread_t *patrol_thread = NULL; - -/* - * call-seq: - * mutex.lock -> self - * - * Attempts to grab the lock and waits if it isn't available. - * Raises +ThreadError+ if +mutex+ was locked by the current thread. - */ -VALUE -rb_mutex_lock(VALUE self) -{ - rb_thread_t *th = GET_THREAD(); - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - - /* When running trap handler */ - if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) { - rb_raise(rb_eThreadError, "can't be called from trap context"); - } - - if (rb_mutex_trylock(self) == Qfalse) { - if (mutex->th == th) { - rb_raise(rb_eThreadError, "deadlock; recursive locking"); - } - - while (mutex->th != th) { - int interrupted; - enum rb_thread_status prev_status = th->status; - volatile int timeout_ms = 0; - struct rb_unblock_callback oldubf; - - set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); - th->status = THREAD_STOPPED_FOREVER; - th->locking_mutex = self; - - native_mutex_lock(&mutex->lock); - th->vm->sleeper++; - /* - * Carefully! while some contended threads are in lock_func(), - * vm->sleepr is unstable value. we have to avoid both deadlock - * and busy loop. - */ - if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && - !patrol_thread) { - timeout_ms = 100; - patrol_thread = th; - } - - GVL_UNLOCK_BEGIN(); - interrupted = lock_func(th, mutex, (int)timeout_ms); - native_mutex_unlock(&mutex->lock); - GVL_UNLOCK_END(); - - if (patrol_thread == th) - patrol_thread = NULL; - - reset_unblock_function(th, &oldubf); - - th->locking_mutex = Qfalse; - if (mutex->th && interrupted == 2) { - rb_check_deadlock(th->vm); - } - if (th->status == THREAD_STOPPED_FOREVER) { - th->status = prev_status; - } - th->vm->sleeper--; - - if (mutex->th == th) mutex_locked(th, self); - - if (interrupted) { - RUBY_VM_CHECK_INTS_BLOCKING(th); - } - } - } - return self; -} - -/* - * call-seq: - * mutex.owned? -> true or false - * - * Returns +true+ if this lock is currently held by current thread. - */ -VALUE -rb_mutex_owned_p(VALUE self) -{ - VALUE owned = Qfalse; - rb_thread_t *th = GET_THREAD(); - rb_mutex_t *mutex; - - GetMutexPtr(self, mutex); - - if (mutex->th == th) - owned = Qtrue; - - return owned; -} - -static const char * -rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) -{ - const char *err = NULL; - - native_mutex_lock(&mutex->lock); - - if (mutex->th == 0) { - err = "Attempt to unlock a mutex which is not locked"; - } - else if (mutex->th != th) { - err = "Attempt to unlock a mutex which is locked by another thread"; - } - else { - mutex->th = 0; - if (mutex->cond_waiting > 0) - native_cond_signal(&mutex->cond); - } - - native_mutex_unlock(&mutex->lock); - - if (!err) { - rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes; - while (*th_mutex != mutex) { - th_mutex = &(*th_mutex)->next_mutex; - } - *th_mutex = mutex->next_mutex; - mutex->next_mutex = NULL; - } - - return err; -} - -/* - * call-seq: - * mutex.unlock -> self - * - * Releases the lock. - * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. - */ -VALUE -rb_mutex_unlock(VALUE self) -{ - const char *err; - rb_mutex_t *mutex; - GetMutexPtr(self, mutex); - - err = rb_mutex_unlock_th(mutex, GET_THREAD()); - if (err) rb_raise(rb_eThreadError, "%s", err); - - return self; -} - -static void -rb_mutex_abandon_keeping_mutexes(rb_thread_t *th) -{ - if (th->keeping_mutexes) { - rb_mutex_abandon_all(th->keeping_mutexes); - } - th->keeping_mutexes = NULL; -} - -static void -rb_mutex_abandon_locking_mutex(rb_thread_t *th) -{ - rb_mutex_t *mutex; - - if (!th->locking_mutex) return; - - GetMutexPtr(th->locking_mutex, mutex); - if (mutex->th == th) - rb_mutex_abandon_all(mutex); - th->locking_mutex = Qfalse; -} - -static void -rb_mutex_abandon_all(rb_mutex_t *mutexes) -{ - rb_mutex_t *mutex; - - while (mutexes) { - mutex = mutexes; - mutexes = mutex->next_mutex; - mutex->th = 0; - mutex->next_mutex = 0; - } -} - -static VALUE -rb_mutex_sleep_forever(VALUE time) -{ - sleep_forever(GET_THREAD(), 1, 0); /* permit spurious check */ - return Qnil; -} - -static VALUE -rb_mutex_wait_for(VALUE time) -{ - struct timeval *t = (struct timeval *)time; - sleep_timeval(GET_THREAD(), *t, 0); /* permit spurious check */ - return Qnil; -} - -VALUE -rb_mutex_sleep(VALUE self, VALUE timeout) -{ - time_t beg, end; - struct timeval t; - - if (!NIL_P(timeout)) { - t = rb_time_interval(timeout); - } - rb_mutex_unlock(self); - beg = time(0); - if (NIL_P(timeout)) { - rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); - } - else { - rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); - } - end = time(0) - beg; - return INT2FIX(end); -} - -/* - * call-seq: - * mutex.sleep(timeout = nil) -> number - * - * Releases the lock and sleeps +timeout+ seconds if it is given and - * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by - * the current thread. - * - * When the thread is next woken up, it will attempt to reacquire - * the lock. - * - * Note that this method can wakeup without explicit Thread#wakeup call. - * For example, receiving signal and so on. - */ -static VALUE -mutex_sleep(int argc, VALUE *argv, VALUE self) -{ - VALUE timeout; - - rb_scan_args(argc, argv, "01", &timeout); - return rb_mutex_sleep(self, timeout); -} - -/* - * call-seq: - * mutex.synchronize { ... } -> result of the block - * - * Obtains a lock, runs the block, and releases the lock when the block - * completes. See the example under +Mutex+. - */ - -VALUE -rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) -{ - rb_mutex_lock(mutex); - return rb_ensure(func, arg, rb_mutex_unlock, mutex); -} - -/* - * call-seq: - * mutex.synchronize { ... } -> result of the block - * - * Obtains a lock, runs the block, and releases the lock when the block - * completes. See the example under +Mutex+. - */ -static VALUE -rb_mutex_synchronize_m(VALUE self, VALUE args) -{ - if (!rb_block_given_p()) { - rb_raise(rb_eThreadError, "must be called with a block"); - } - - return rb_mutex_synchronize(self, rb_yield, Qundef); -} - -void rb_mutex_allow_trap(VALUE self, int val) -{ - rb_mutex_t *m; - GetMutexPtr(self, m); - - m->allow_trap = val; -} - /* * Document-class: ThreadShield */ @@ -5136,17 +4628,6 @@ Init_Thread(void) rb_define_const(cThGroup, "Default", th->thgroup); } - rb_cMutex = rb_define_class("Mutex", rb_cObject); - rb_define_alloc_func(rb_cMutex, mutex_alloc); - rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); - rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); - rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); - rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); - rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); - rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); - rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0); - rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); - recursive_key = rb_intern("__recursive_key__"); rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); @@ -5174,6 +4655,8 @@ Init_Thread(void) /* suppress warnings on cygwin, mingw and mswin.*/ (void)native_mutex_trylock; + + Init_thread_tools(); } int -- cgit v1.2.3