/* * Optimized Ruby Mutex implementation, loosely based on thread.rb by * Yukihiro Matsumoto * * Copyright 2006-2007 MenTaLguY * * RDoc taken from original. * * This file is made available under the same terms as Ruby. */ #include #include #include static VALUE rb_cMutex; static VALUE rb_cConditionVariable; static VALUE rb_cQueue; static VALUE rb_cSizedQueue; static VALUE thread_exclusive_do() { rb_thread_critical = Qtrue; return rb_yield(Qundef); } static VALUE thread_exclusive_ensure(val) VALUE val; { rb_thread_critical = val; return Qundef; } /* * call-seq: * Thread.exclusive { block } => obj * * Wraps a block in Thread.critical, restoring the original value * upon exit from the critical section, and returns the value of the * block. */ static VALUE rb_thread_exclusive() { return rb_ensure(thread_exclusive_do, Qundef, thread_exclusive_ensure, rb_thread_critical); } typedef struct _Entry { VALUE value; struct _Entry *next; } Entry; typedef struct _List { Entry *entries; Entry *last_entry; Entry *entry_pool; unsigned long size; } List; static void init_list(List *list) { list->entries = NULL; list->last_entry = NULL; list->entry_pool = NULL; list->size = 0; } static void mark_list(List *list) { Entry *entry; for (entry = list->entries; entry; entry = entry->next) { rb_gc_mark(entry->value); } } static void free_entries(Entry *first) { Entry *next; while (first) { next = first->next; free(first); first = next; } } static void finalize_list(List *list) { free_entries(list->entries); free_entries(list->entry_pool); } static void push_list(List *list, VALUE value) { Entry *entry; if (list->entry_pool) { entry = list->entry_pool; list->entry_pool = entry->next; } else { entry = (Entry *)malloc(sizeof(Entry)); } entry->value = value; entry->next = NULL; if (list->last_entry) { list->last_entry->next = entry; } else { list->entries = entry; } list->last_entry = entry; ++list->size; } static void push_multiple_list(List *list, VALUE *values, unsigned count) { unsigned i; 
for (i = 0; i < count; i++) { push_list(list, values[i]); } } static VALUE shift_list(List *list) { Entry *entry; VALUE value; entry = list->entries; if (!entry) return Qundef; list->entries = entry->next; if (entry == list->last_entry) { list->last_entry = NULL; } --list->size; value = entry->value; #ifdef USE_MEM_POOLS entry->next = list->entry_pool; list->entry_pool = entry; #else free(entry); #endif return value; } static void clear_list(List *list) { if (list->last_entry) { #ifdef USE_MEM_POOLS list->last_entry->next = list->entry_pool; list->entry_pool = list->entries; #else free_entries(list->entries); #endif list->entries = NULL; list->last_entry = NULL; list->size = 0; } } static VALUE array_from_list(List const *list) { VALUE ary; Entry *entry; ary = rb_ary_new(); for (entry = list->entries; entry; entry = entry->next) { rb_ary_push(ary, entry->value); } return ary; } static VALUE wake_thread(VALUE thread) { return rb_rescue2(rb_thread_wakeup, thread, NULL, Qundef, rb_eThreadError, 0); } static VALUE run_thread(VALUE thread) { return rb_rescue2(rb_thread_run, thread, NULL, Qundef, rb_eThreadError, 0); } static VALUE wake_one(List *list) { VALUE waking; waking = Qnil; while (list->entries && !RTEST(waking)) { waking = wake_thread(shift_list(list)); } return waking; } static VALUE wake_all(List *list) { while (list->entries) { wake_one(list); } return Qnil; } static void assert_no_survivors(List *waiting, const char *label, void *addr) { Entry *entry; for (entry = waiting->entries; entry; entry = entry->next) { if (RTEST(wake_thread(entry->value))) { rb_bug("%s %p freed with live thread(s) waiting", label, addr); } } } /* * Document-class: Mutex * * Mutex implements a simple semaphore that can be used to coordinate access to * shared data from multiple concurrent threads. 
* * Example: * * require 'thread' * semaphore = Mutex.new * * a = Thread.new { * semaphore.synchronize { * # access shared resource * } * } * * b = Thread.new { * semaphore.synchronize { * # access shared resource * } * } * */ typedef struct _Mutex { VALUE owner; List waiting; } Mutex; static void mark_mutex(Mutex *mutex) { rb_gc_mark(mutex->owner); mark_list(&mutex->waiting); } static void finalize_mutex(Mutex *mutex) { finalize_list(&mutex->waiting); } static void free_mutex(Mutex *mutex) { assert_no_survivors(&mutex->waiting, "mutex", mutex); finalize_mutex(mutex); free(mutex); } static void init_mutex(Mutex *mutex) { mutex->owner = Qnil; init_list(&mutex->waiting); } /* * Document-method: new * call-seq: Mutex.new * * Creates a new Mutex * */ static VALUE rb_mutex_alloc(VALUE klass) { Mutex *mutex; mutex = (Mutex *)malloc(sizeof(Mutex)); init_mutex(mutex); return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex); } /* * Document-method: locked? * call-seq: locked? * * Returns +true+ if this lock is currently held by some thread. * */ static VALUE rb_mutex_locked_p(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); return RTEST(mutex->owner) ? Qtrue : Qfalse; } /* * Document-method: try_lock * call-seq: try_lock * * Attempts to obtain the lock and returns immediately. Returns +true+ if the * lock was granted. * */ static VALUE rb_mutex_try_lock(VALUE self) { Mutex *mutex; VALUE result; Data_Get_Struct(self, Mutex, mutex); result = Qfalse; rb_thread_critical = 1; if (!RTEST(mutex->owner)) { mutex->owner = rb_thread_current(); result = Qtrue; } rb_thread_critical = 0; return result; } /* * Document-method: lock * call-seq: lock * * Attempts to grab the lock and waits if it isn't available. 
* */ static void lock_mutex(Mutex *mutex) { VALUE current; current = rb_thread_current(); rb_thread_critical = 1; while (RTEST(mutex->owner)) { push_list(&mutex->waiting, current); rb_thread_stop(); rb_thread_critical = 1; } mutex->owner = current; rb_thread_critical = 0; } static VALUE rb_mutex_lock(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); lock_mutex(mutex); return self; } /* * Document-method: unlock * * Releases the lock. Returns +nil+ if ref wasn't locked. * */ static VALUE unlock_mutex_inner(Mutex *mutex) { VALUE waking; if (!RTEST(mutex->owner)) { return Qundef; } mutex->owner = Qnil; waking = wake_one(&mutex->waiting); return waking; } static VALUE set_critical(VALUE value) { rb_thread_critical = (int)value; return Qnil; } static VALUE unlock_mutex(Mutex *mutex) { VALUE waking; rb_thread_critical = 1; waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0); if (waking == Qundef) { return Qfalse; } if (RTEST(waking)) { run_thread(waking); } return Qtrue; } static VALUE rb_mutex_unlock(VALUE self) { Mutex *mutex; Data_Get_Struct(self, Mutex, mutex); if (RTEST(unlock_mutex(mutex))) { return self; } else { return Qnil; } } /* * Document-method: exclusive_unlock * call-seq: exclusive_unlock { ... } * * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and * yields in a critical section. * */ static VALUE rb_mutex_exclusive_unlock_inner(Mutex *mutex) { VALUE waking; waking = unlock_mutex_inner(mutex); rb_yield(Qundef); return waking; } static VALUE rb_mutex_exclusive_unlock(VALUE self) { Mutex *mutex; VALUE waking; Data_Get_Struct(self, Mutex, mutex); rb_thread_critical = 1; waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0); if (waking == Qundef) { return Qnil; } if (RTEST(waking)) { run_thread(waking); } return self; } /* * Document-method: synchronize * call-seq: synchronize { ... } * * Obtains a lock, runs the block, and releases the lock when the block * completes. 
See the example under Mutex. * */ static VALUE rb_mutex_synchronize(VALUE self) { rb_mutex_lock(self); return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self); } /* * Document-class: ConditionVariable * * ConditionVariable objects augment class Mutex. Using condition variables, * it is possible to suspend while in the middle of a critical section until a * resource becomes available. * * Example: * * require 'thread' * * mutex = Mutex.new * resource = ConditionVariable.new * * a = Thread.new { * mutex.synchronize { * # Thread 'a' now needs the resource * resource.wait(mutex) * # 'a' can now have the resource * } * } * * b = Thread.new { * mutex.synchronize { * # Thread 'b' has finished using the resource * resource.signal * } * } * */ typedef struct _ConditionVariable { List waiting; } ConditionVariable; static void mark_condvar(ConditionVariable *condvar) { mark_list(&condvar->waiting); } static void finalize_condvar(ConditionVariable *condvar) { finalize_list(&condvar->waiting); } static void free_condvar(ConditionVariable *condvar) { assert_no_survivors(&condvar->waiting, "condition variable", condvar); finalize_condvar(condvar); free(condvar); } static void init_condvar(ConditionVariable *condvar) { init_list(&condvar->waiting); } /* * Document-method: new * call-seq: ConditionVariable.new * * Creates a new ConditionVariable * */ static VALUE rb_condvar_alloc(VALUE klass) { ConditionVariable *condvar; condvar = (ConditionVariable *)malloc(sizeof(ConditionVariable)); init_condvar(condvar); return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar); } /* * Document-method: wait * call-seq: wait * * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. 
* */ static void wait_condvar(ConditionVariable *condvar, Mutex *mutex) { rb_thread_critical = 1; if (!RTEST(mutex->owner)) { rb_thread_critical = Qfalse; return; } if (mutex->owner != rb_thread_current()) { rb_thread_critical = Qfalse; rb_raise(rb_eThreadError, "Not owner"); } mutex->owner = Qnil; push_list(&condvar->waiting, rb_thread_current()); rb_thread_stop(); lock_mutex(mutex); } static VALUE legacy_exclusive_unlock(VALUE mutex) { return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0); } typedef struct { ConditionVariable *condvar; VALUE mutex; } legacy_wait_args; static VALUE legacy_wait(VALUE unused, legacy_wait_args *args) { push_list(&args->condvar->waiting, rb_thread_current()); rb_thread_stop(); rb_funcall(args->mutex, rb_intern("lock"), 0); return Qnil; } static VALUE rb_condvar_wait(VALUE self, VALUE mutex_v) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); if (CLASS_OF(mutex_v) != rb_cMutex) { /* interoperate with legacy mutex */ legacy_wait_args args; args.condvar = condvar; args.mutex = mutex_v; rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args); } else { Mutex *mutex; Data_Get_Struct(mutex_v, Mutex, mutex); wait_condvar(condvar, mutex); } return self; } /* * Document-method: broadcast * call-seq: broadcast * * Wakes up all threads waiting for this condition. * */ static VALUE rb_condvar_broadcast(VALUE self) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); rb_thread_critical = 1; rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0); rb_thread_schedule(); return self; } /* * Document-method: signal * call-seq: signal * * Wakes up the first thread in line waiting for this condition. 
* */ static void signal_condvar(ConditionVariable *condvar) { VALUE waking; rb_thread_critical = 1; waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0); if (RTEST(waking)) { run_thread(waking); } } static VALUE rb_condvar_signal(VALUE self) { ConditionVariable *condvar; Data_Get_Struct(self, ConditionVariable, condvar); signal_condvar(condvar); return self; } /* * Document-class: Queue * * This class provides a way to synchronize communication between threads. * * Example: * * require 'thread' * * queue = Queue.new * * producer = Thread.new do * 5.times do |i| * sleep rand(i) # simulate expense * queue << i * puts "#{i} produced" * end * end * * consumer = Thread.new do * 5.times do |i| * value = queue.pop * sleep rand(i/2) # simulate expense * puts "consumed #{value}" * end * end * * consumer.join * */ typedef struct _Queue { Mutex mutex; ConditionVariable value_available; ConditionVariable space_available; List values; unsigned long capacity; } Queue; static void mark_queue(Queue *queue) { mark_mutex(&queue->mutex); mark_condvar(&queue->value_available); mark_condvar(&queue->space_available); mark_list(&queue->values); } static void finalize_queue(Queue *queue) { finalize_mutex(&queue->mutex); finalize_condvar(&queue->value_available); finalize_condvar(&queue->space_available); finalize_list(&queue->values); } static void free_queue(Queue *queue) { assert_no_survivors(&queue->mutex.waiting, "queue", queue); assert_no_survivors(&queue->space_available.waiting, "queue", queue); assert_no_survivors(&queue->value_available.waiting, "queue", queue); finalize_queue(queue); free(queue); } static void init_queue(Queue *queue) { init_mutex(&queue->mutex); init_condvar(&queue->value_available); init_condvar(&queue->space_available); init_list(&queue->values); queue->capacity = 0; } /* * Document-method: new * call-seq: new * * Creates a new queue. 
* */ static VALUE rb_queue_alloc(VALUE klass) { Queue *queue; queue = (Queue *)malloc(sizeof(Queue)); init_queue(queue); return Data_Wrap_Struct(klass, mark_queue, free_queue, queue); } static VALUE rb_queue_marshal_load(VALUE self, VALUE data) { Queue *queue; VALUE array; Data_Get_Struct(self, Queue, queue); array = rb_marshal_load(data); if (TYPE(array) != T_ARRAY) { rb_raise(rb_eRuntimeError, "expected Array of queue data"); } if (RARRAY(array)->len < 1) { rb_raise(rb_eRuntimeError, "missing capacity value"); } queue->capacity = NUM2ULONG(rb_ary_shift(array)); push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); return self; } static VALUE rb_queue_marshal_dump(VALUE self) { Queue *queue; VALUE array; Data_Get_Struct(self, Queue, queue); array = array_from_list(&queue->values); rb_ary_unshift(array, ULONG2NUM(queue->capacity)); return rb_marshal_dump(array, Qnil); } /* * Document-method: clear * call-seq: clear * * Removes all objects from the queue. * */ static VALUE rb_queue_clear(VALUE self) { Queue *queue; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); clear_list(&queue->values); signal_condvar(&queue->space_available); unlock_mutex(&queue->mutex); return self; } /* * Document-method: empty? * call-seq: empty? * * Returns +true+ if the queue is empty. * */ static VALUE rb_queue_empty_p(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = queue->values.size == 0 ? Qtrue : Qfalse; unlock_mutex(&queue->mutex); return result; } /* * Document-method: length * call-seq: length * * Returns the length of the queue. * */ static VALUE rb_queue_length(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = ULONG2NUM(queue->values.size); unlock_mutex(&queue->mutex); return result; } /* * Document-method: num_waiting * call-seq: num_waiting * * Returns the number of threads waiting on the queue. 
* */ static VALUE rb_queue_num_waiting(VALUE self) { Queue *queue; VALUE result; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); result = ULONG2NUM(queue->value_available.waiting.size + queue->space_available.waiting.size); unlock_mutex(&queue->mutex); return result; } /* * Document-method: pop * call_seq: pop(non_block=false) * * Retrieves data from the queue. If the queue is empty, the calling thread is * suspended until data is pushed onto the queue. If +non_block+ is true, the * thread isn't suspended, and an exception is raised. * */ static VALUE rb_queue_pop(int argc, VALUE *argv, VALUE self) { Queue *queue; int should_block; VALUE result; Data_Get_Struct(self, Queue, queue); if (argc == 0) { should_block = 1; } else if (argc == 1) { should_block = !RTEST(argv[0]); } else { rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc); } lock_mutex(&queue->mutex); if (!queue->values.entries && !should_block) { unlock_mutex(&queue->mutex); rb_raise(rb_eThreadError, "queue empty"); } while (!queue->values.entries) { wait_condvar(&queue->value_available, &queue->mutex); } result = shift_list(&queue->values); if (queue->capacity && queue->values.size < queue->capacity) { signal_condvar(&queue->space_available); } unlock_mutex(&queue->mutex); return result; } /* * Document-method: push * call-seq: push(obj) * * Pushes +obj+ to the queue. * */ static VALUE rb_queue_push(VALUE self, VALUE value) { Queue *queue; Data_Get_Struct(self, Queue, queue); lock_mutex(&queue->mutex); while (queue->capacity && queue->values.size >= queue->capacity) { wait_condvar(&queue->space_available, &queue->mutex); } push_list(&queue->values, value); signal_condvar(&queue->value_available); unlock_mutex(&queue->mutex); return self; } /* * Document-class: SizedQueue * * This class represents queues of specified size capacity. The push operation * may be blocked if the capacity is full. * * See Queue for an example of how a SizedQueue works. 
*
 */

/*
 * Document-method: new
 * call-seq: new
 *
 * Creates a fixed-length queue with a maximum size of +max+.
 *
 */

/*
 * Document-method: max
 * call-seq: max
 *
 * Returns the maximum size of the queue.
 *
 */

static VALUE
rb_sized_queue_max(VALUE self)
{
    Queue *q;
    VALUE max;

    Data_Get_Struct(self, Queue, q);

    lock_mutex(&q->mutex);
    max = ULONG2NUM(q->capacity);
    unlock_mutex(&q->mutex);

    return max;
}

/*
 * Document-method: max=
 * call-seq: max=(size)
 *
 * Sets the maximum size of the queue.
 *
 */

static VALUE
rb_sized_queue_max_set(VALUE self, VALUE value)
{
    Queue *q;
    unsigned long requested;
    unsigned long extra_slots;

    Data_Get_Struct(self, Queue, q);

    requested = NUM2ULONG(value);
    if (requested < 1) {
        rb_raise(rb_eArgError, "value must be positive");
    }

    lock_mutex(&q->mutex);

    /* growth is only counted when a previous (non-zero) capacity existed */
    extra_slots = (q->capacity && requested > q->capacity)
        ? requested - q->capacity
        : 0;
    q->capacity = requested;

    /* one signal to the space_available condvar per newly opened slot */
    while (extra_slots > 0) {
        signal_condvar(&q->space_available);
        --extra_slots;
    }

    unlock_mutex(&q->mutex);

    return self;
}

/*
 * Document-method: push
 * call-seq: push(obj)
 *
 * Pushes +obj+ to the queue.  If there is no space left in the queue, waits
 * until space becomes available.
 *
 */

/*
 * Document-method: pop
 * call-seq: pop(non_block=false)
 *
 * Retrieves data from the queue and runs a waiting thread, if any.
* */ /* for marshalling mutexes and condvars */ static VALUE dummy_load(VALUE self, VALUE string) { return Qnil; } static VALUE dummy_dump(VALUE self) { return rb_str_new2(""); } void Init_thread(void) { rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0); rb_cMutex = rb_define_class("Mutex", rb_cObject); rb_define_alloc_func(rb_cMutex, rb_mutex_alloc); rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1); rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0); rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0); rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0); rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0); rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc); rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1); rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0); rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1); rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); rb_cQueue = rb_define_class("Queue", rb_cObject); rb_define_alloc_func(rb_cQueue, rb_queue_alloc); rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1); rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0); rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); 
rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length")); rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue); rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1); rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0); rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1); rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0); rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1); rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop")); }