/* * Optimized Ruby Mutex implementation, loosely based on thread.rb by * Yukihiro Matsumoto * * Copyright 2006-2007 MenTaLguY * * RDoc taken from original. * * This file is made available under the same terms as Ruby. */ #include #include #include #include enum rb_thread_status rb_thread_status _((VALUE)); static VALUE rb_cMutex; static VALUE rb_cConditionVariable; static VALUE rb_cQueue; static VALUE rb_cSizedQueue; static VALUE set_critical(VALUE value); static VALUE thread_exclusive(VALUE (*func)(ANYARGS), VALUE arg) { VALUE critical = rb_thread_critical; rb_thread_critical = 1; return rb_ensure(func, arg, set_critical, (VALUE)critical); } /* * call-seq: * Thread.exclusive { block } => obj * * Wraps a block in Thread.critical, restoring the original value * upon exit from the critical section, and returns the value of the * block. */ static VALUE rb_thread_exclusive(void) { return thread_exclusive(rb_yield, Qundef); } typedef struct _Entry { VALUE value; struct _Entry *next; } Entry; typedef struct _List { Entry *entries; Entry *last_entry; Entry *entry_pool; unsigned long size; } List; static void init_list(List *list) { list->entries = NULL; list->last_entry = NULL; list->entry_pool = NULL; list->size = 0; } static void mark_list(List *list) { Entry *entry; for (entry = list->entries; entry; entry = entry->next) { rb_gc_mark(entry->value); } } static void free_entries(Entry *first) { Entry *next; while (first) { next = first->next; xfree(first); first = next; } } static void finalize_list(List *list) { free_entries(list->entries); free_entries(list->entry_pool); } static void push_list(List *list, VALUE value) { Entry *entry; if (list->entry_pool) { entry = list->entry_pool; list->entry_pool = entry->next; } else { entry = ALLOC(Entry); } entry->value = value; entry->next = NULL; if (list->last_entry) { list->last_entry->next = entry; } else { list->entries = entry; } list->last_entry = entry; ++list->size; } static void push_multiple_list(List *list, VALUE *values, unsigned count) { unsigned i; for (i = 0; i < count; i++) { push_list(list, values[i]); } } static void recycle_entries(List *list, Entry *first_entry, Entry *last_entry) { #ifdef USE_MEM_POOLS last_entry->next = list->entry_pool; list->entry_pool = first_entry; #else last_entry->next = NULL; free_entries(first_entry); #endif } static VALUE shift_list(List *list) { Entry *entry; VALUE value; entry = list->entries; if (!entry) return Qnil; list->entries = entry->next; if (entry == list->last_entry) { list->last_entry = NULL; } --list->size; value = entry->value; recycle_entries(list, entry, entry); return value; } static void remove_one(List *list, VALUE value) { Entry **ref; Entry *prev; Entry *entry; for (ref = &list->entries, prev = NULL, entry = list->entries; entry != NULL; ref = &entry->next, prev = entry, entry = entry->next) { if (entry->value == value) { *ref = entry->next; list->size--; if (!entry->next) { list->last_entry = prev; } recycle_entries(list, entry, entry); break; } } } static void clear_list(List *list) { if (list->last_entry) { recycle_entries(list, list->entries, list->last_entry); list->entries = NULL; list->last_entry = NULL; list->size = 0; } } static VALUE array_from_list(List const *list) { VALUE ary; Entry *entry; ary = rb_ary_new(); for (entry = list->entries; entry; entry = entry->next) { rb_ary_push(ary, entry->value); } return ary; } static VALUE wake_thread(VALUE thread) { return rb_thread_wakeup_alive(thread); } static VALUE run_thread(VALUE thread) { thread = wake_thread(thread); if (RTEST(thread) && !rb_thread_critical) rb_thread_schedule(); return thread; } static VALUE wake_one(List *list) { VALUE waking; waking = Qnil; while (list->entries && !RTEST(waking)) { waking = shift_list(list); if (waking == Qundef) break; waking = wake_thread(waking); } return waking; } static VALUE wake_all(List *list) { while (list->entries) { wake_one(list); } return Qnil; } static VALUE wait_list_inner(List *list) { push_list(list, rb_thread_current()); rb_thread_stop(); return Qnil; } static VALUE wait_list_cleanup(List *list) { /* cleanup in case of spurious wakeups */ remove_one(list, rb_thread_current()); return Qnil; } static void wait_list(List *list) { rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list); } static void kill_waiting_threads(List *waiting) { Entry *entry; for (entry = waiting->entries; entry; entry = entry->next) { rb_thread_kill(entry->value); } } /* * Document-class: Mutex * * Mutex implements a simple semaphore that can be used to coordinate access to * shared data from multiple concurrent threads. * * Example: * * require 'thread' * semaphore = Mutex.new * * a = Thread.new { * semaphore.synchronize { * # access shared resource * } * } * * b = Thread.new { * semaphore.synchronize { * # access shared resource * } * } * */ typedef struct _Mutex { VALUE owner; List waiting; } Mutex; #define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner)) static void mark_mutex(Mutex *mutex) { rb_gc_mark(mutex->owner); mark_list(&mutex->waiting); } static void finalize_mutex(Mutex *mutex) { finalize_list(&mutex->waiting); } static void free_mutex(Mutex *mutex) { kill_waiting_threads(&mutex->waiting); finalize_mutex(mutex); xfree(mutex); } static void init_mutex(Mutex *mutex) { mutex->owner = Qnil; init_list(&mutex->waiting); } /* * Document-method: new * call-seq: Mutex.new * * Creates a new Mutex * */ static VALUE rb_mutex_alloc(VALUE klass) { Mutex *mutex; mutex = ALLOC(Mutex); init_mutex(mutex); return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex); } /* * Document-method: locked? * call-seq: locked? * * Returns +true+ if this lock is currently held by some thread. * */ static VALUE rb_mutex_locked_p(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse; } /* * Document-method: try_lock * call-seq: try_lock * * Attempts to obtain the lock and returns immediately. Returns +true+ if the * lock was granted. * */ static VALUE rb_mutex_try_lock(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); if (MUTEX_LOCKED_P(mutex)) return Qfalse; mutex->owner = rb_thread_current(); return Qtrue; } /* * Document-method: lock * call-seq: lock * * Attempts to grab the lock and waits if it isn't available. * */ static VALUE lock_mutex(Mutex *mutex) { VALUE current; current = rb_thread_current(); rb_thread_critical = 1; if (!MUTEX_LOCKED_P(mutex)) { mutex->owner = current; } else { do { wait_list(&mutex->waiting); rb_thread_critical = 1; if (!MUTEX_LOCKED_P(mutex)) { mutex->owner = current; break; } } while (mutex->owner != current); } rb_thread_critical = 0; return Qnil; } static VALUE rb_mutex_lock(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); lock_mutex(mutex); return self; } static VALUE relock_mutex(Mutex *mutex) { VALUE current = rb_thread_current(); switch (rb_thread_status(current)) { case THREAD_RUNNABLE: case THREAD_STOPPED: lock_mutex(mutex); break; default: break; } return Qundef; } /* * Document-method: unlock * * Releases the lock. Returns +nil+ if ref wasn't locked. * */ static VALUE unlock_mutex_inner(Mutex *mutex) { VALUE waking; if (mutex->owner != rb_thread_current()) { rb_raise(rb_eThreadError, "not owner"); } waking = wake_one(&mutex->waiting); mutex->owner = waking; return waking; } static VALUE set_critical(VALUE value) { rb_thread_critical = (int)value; return Qundef; } static VALUE unlock_mutex(Mutex *mutex) { VALUE waking = thread_exclusive(unlock_mutex_inner, (VALUE)mutex); if (!RTEST(waking)) { return Qfalse; } run_thread(waking); return Qtrue; } static VALUE rb_mutex_unlock(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); if (RTEST(unlock_mutex(mutex))) { return self; } else { return Qnil; } } /* * Document-method: exclusive_unlock * call-seq: exclusive_unlock { ... } * * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and * yields in a critical section. * */ static VALUE rb_mutex_exclusive_unlock_inner(Mutex *mutex) { VALUE waking; waking = unlock_mutex_inner(mutex); rb_yield(Qundef); return waking; } static VALUE rb_mutex_exclusive_unlock(VALUE self) { Mutex *mutex; VALUE waking; Data_Get_Struct(self, Mutex, mutex); waking = thread_exclusive(rb_mutex_exclusive_unlock_inner, (VALUE)mutex); if (!RTEST(waking)) { return Qnil; } run_thread(waking); return self; } /* * Document-method: synchronize * call-seq: synchronize { ... } * * Obtains a lock, runs the block, and releases the lock when the block * completes. See the example under Mutex. * */ static VALUE rb_mutex_synchronize(VALUE self) { rb_mutex_lock(self); return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self); } /* * Document-class: ConditionVariable * * ConditionVariable objects augment class Mutex. Using condition variables, * it is possible to suspend while in the middle of a critical section until a * resource becomes available. * * Example: * * require 'thread' * * mutex = Mutex.new * resource = ConditionVariable.new * * a = Thread.new { * mutex.synchronize { * # Thread 'a' now needs the resource * resource.wait(mutex) * # 'a' can now have the resource * } * } * * b = Thread.new { * mutex.synchronize { * # Thread 'b' has finished using the resource * resource.signal * } * } * */ typedef struct _ConditionVariable { List waiting; } ConditionVariable; static void mark_condvar(ConditionVariable *condvar) { mark_list(&condvar->waiting); } static void finalize_condvar(ConditionVariable *condvar) { finalize_list(&condvar->waiting); } static void free_condvar(ConditionVariable *condvar) { kill_waiting_threads(&condvar->waiting); finalize_condvar(condvar); xfree(condvar); } static void init_condvar(ConditionVariable *condvar) { init_list(&condvar->waiting); } /* * Document-method: new * call-seq: ConditionVariable.new * * Creates a new ConditionVariable * */ static VALUE rb_condvar_alloc(VALUE klass) { ConditionVariable *condvar; condvar = ALLOC(ConditionVariable); init_condvar(condvar); return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar); } /* * Document-method: wait * call-seq: wait * * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. * */ static void wait_condvar(ConditionVariable *condvar, Mutex *mutex) { VALUE waking; rb_thread_critical = 1; if (rb_thread_current() != mutex->owner) { rb_thread_critical = 0; rb_raise(rb_eThreadError, "not owner of the synchronization mutex"); } waking = unlock_mutex_inner(mutex); if (RTEST(waking)) { wake_thread(waking); } rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex); } static VALUE legacy_exclusive_unlock(VALUE mutex) { return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0); } typedef struct { ConditionVariable *condvar; VALUE mutex; } legacy_wait_args; static VALUE legacy_wait(VALUE unused, legacy_wait_args *args) { wait_list(&args->condvar->waiting); rb_funcall(args->mutex, rb_intern("lock"), 0); return Qnil; } static VALUE rb_condvar_wait(VALUE self, VALUE mutex_v) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); if (CLASS_OF(mutex_v) != rb_cMutex) { /* interoperate with legacy mutex */ legacy_wait_args args; args.condvar = condvar; args.mutex = mutex_v; rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args); } else { Mutex *mutex; Data_Get_Struct(mutex_v, Mutex, mutex); wait_condvar(condvar, mutex); } return self; } /* * Document-method: broadcast * call-seq: broadcast * * Wakes up all threads waiting for this condition. * */ static VALUE rb_condvar_broadcast(VALUE self) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); thread_exclusive(wake_all, (VALUE)&condvar->waiting); rb_thread_schedule(); return self; } /* * Document-method: signal * call-seq: signal * * Wakes up the first thread in line waiting for this condition. * */ static void signal_condvar(ConditionVariable *condvar) { VALUE waking = thread_exclusive(wake_one, (VALUE)&condvar->waiting); if (RTEST(waking)) { run_thread(waking); } } static VALUE rb_condvar_signal(VALUE self) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); signal_condvar(condvar); return self; } /* * Document-class: Queue * * This class provides a way to synchronize communication between threads. * * Example: * * require 'thread' * * queue = Queue.new * * producer = Thread.new do * 5.times do |i| * sleep rand(i) # simulate expense * queue << i * puts "#{i} produced" * end * end * * consumer = Thread.new do * 5.times do |i| * value = queue.pop * sleep rand(i/2) # simulate expense * puts "consumed #{value}" * end * end * * consumer.join * */ typedef struct _Queue { Mutex mutex; ConditionVariable value_available; ConditionVariable space_available; List values; unsigned long capacity; } Queue; static void mark_queue(Queue *queue) { mark_mutex(&queue->mutex); mark_condvar(&queue->value_available); mark_condvar(&queue->space_available); mark_list(&queue->values); } static void finalize_queue(Queue *queue) { finalize_mutex(&queue->mutex); finalize_condvar(&queue->value_available); finalize_condvar(&queue->space_available); finalize_list(&queue->values); } static void free_queue(Queue *queue) { kill_waiting_threads(&queue->mutex.waiting); kill_waiting_threads(&queue->space_available.waiting); kill_waiting_threads(&queue->value_available.waiting); finalize_queue(queue); xfree(queue); } static void init_queue(Queue *queue) { init_mutex(&queue->mutex); init_condvar(&queue->value_available); init_condvar(&queue->space_available); init_list(&queue->values); queue->capacity = 0; } /* * Document-method: new * call-seq: new * * Creates a new queue. * */ static VALUE rb_queue_alloc(VALUE klass) { Queue *queue; queue = ALLOC(Queue); init_queue(queue); return Data_Wrap_Struct(klass, mark_queue, free_queue, queue); } static VALUE rb_queue_marshal_load(VALUE self, VALUE data) { Queue *queue; VALUE array; Data_Get_Struct(self, Queue, queue); array = rb_marshal_load(data); if (TYPE(array) != T_ARRAY) { rb_raise(rb_eTypeError, "expected Array of queue data"); } if (RARRAY(array)->len < 1) { rb_raise(rb_eArgError, "missing capacity value"); } queue->capacity = NUM2ULONG(rb_ary_shift(array)); push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); return self; } static VALUE rb_queue_marshal_dump(VALUE self) { Queue *queue; VALUE array; Data_Get_Struct(self, Queue, queue); array = array_from_list(&queue->values); rb_ary_unshift(array, ULONG2NUM(queue->capacity)); return rb_marshal_dump(array, Qnil); } /* * Document-method: clear * call-seq: clear * * Removes all objects from the queue. * */ static VALUE rb_queue_clear(VALUE self) { Queue *queue; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); clear_list(&queue->values); signal_condvar(&queue->space_available); unlock_mutex(&queue->mutex); return self; } /* * Document-method: empty? * call-seq: empty? * * Returns +true+ if the queue is empty. * */ static VALUE rb_queue_empty_p(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = queue->values.size == 0 ? Qtrue : Qfalse; unlock_mutex(&queue->mutex); return result; } /* * Document-method: length * call-seq: length * * Returns the length of the queue. * */ static VALUE rb_queue_length(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = ULONG2NUM(queue->values.size); unlock_mutex(&queue->mutex); return result; } /* * Document-method: num_waiting * call-seq: num_waiting * * Returns the number of threads waiting on the queue. * */ static VALUE rb_queue_num_waiting(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = ULONG2NUM(queue->value_available.waiting.size + queue->space_available.waiting.size); unlock_mutex(&queue->mutex); return result; } /* * Document-method: pop * call_seq: pop(non_block=false) * * Retrieves data from the queue. If the queue is empty, the calling thread is * suspended until data is pushed onto the queue. If +non_block+ is true, the * thread isn't suspended, and an exception is raised. * */ static VALUE rb_queue_pop(int argc, VALUE *argv, VALUE self) { Queue *queue; int should_block; VALUE result; Data_Get_Struct(self, Queue, queue); if (argc == 0) { should_block = 1; } else if (argc == 1) { should_block = !RTEST(argv[0]); } else { rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc); } lock_mutex(&queue->mutex); if (!queue->values.entries && !should_block) { unlock_mutex(&queue->mutex); rb_raise(rb_eThreadError, "queue empty"); } while (!queue->values.entries) { wait_condvar(&queue->value_available, &queue->mutex); } result = shift_list(&queue->values); if (queue->capacity && queue->values.size < queue->capacity) { signal_condvar(&queue->space_available); } unlock_mutex(&queue->mutex); return result; } /* * Document-method: push * call-seq: push(obj) * * Pushes +obj+ to the queue. * */ static VALUE rb_queue_push(VALUE self, VALUE value) { Queue *queue; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); while (queue->capacity && queue->values.size >= queue->capacity) { wait_condvar(&queue->space_available, &queue->mutex); } push_list(&queue->values, value); signal_condvar(&queue->value_available); unlock_mutex(&queue->mutex); return self; } /* * Document-class: SizedQueue * * This class represents queues of specified size capacity. The push operation * may be blocked if the capacity is full. * * See Queue for an example of how a SizedQueue works. * */ /* * Document-method: new * call-seq: new * * Creates a fixed-length queue with a maximum size of +max+. * */ /* * Document-method: max * call-seq: max * * Returns the maximum size of the queue. * */ static VALUE rb_sized_queue_max(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = ULONG2NUM(queue->capacity); unlock_mutex(&queue->mutex); return result; } /* * Document-method: max= * call-seq: max=(size) * * Sets the maximum size of the queue. * */ static VALUE rb_sized_queue_max_set(VALUE self, VALUE value) { Queue *queue; unsigned long new_capacity; unsigned long difference; Data_Get_Struct(self, Queue, queue); new_capacity = NUM2ULONG(value); if (new_capacity < 1) { rb_raise(rb_eArgError, "value must be positive"); } lock_mutex(&queue->mutex); if (queue->capacity && new_capacity > queue->capacity) { difference = new_capacity - queue->capacity; } else { difference = 0; } queue->capacity = new_capacity; for (; difference > 0; --difference) { signal_condvar(&queue->space_available); } unlock_mutex(&queue->mutex); return self; } /* * Document-method: push * call-seq: push(obj) * * Pushes +obj+ to the queue. If there is no space left in the queue, waits * until space becomes available. * */ /* * Document-method: pop * call-seq: pop(non_block=false) * * Retrieves data from the queue and runs a waiting thread, if any. * */ /* for marshalling mutexes and condvars */ static VALUE dummy_load(VALUE self, VALUE string) { return Qnil; } static VALUE dummy_dump(VALUE self) { return rb_str_new2(""); } void Init_thread(void) { rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0); rb_cMutex = rb_define_class("Mutex", rb_cObject); rb_define_alloc_func(rb_cMutex, rb_mutex_alloc); rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1); rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0); rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0); rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0); rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0); rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc); rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1); rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0); rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1); rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); rb_cQueue = rb_define_class("Queue", rb_cObject); rb_define_alloc_func(rb_cQueue, rb_queue_alloc); rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1); rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0); rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length")); rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue); rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1); rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0); rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1); rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0); rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1); rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop")); }