diff options
Diffstat (limited to 'ruby_1_8_6/ext/thread')
-rw-r--r-- | ruby_1_8_6/ext/thread/extconf.rb | 9 | ||||
-rw-r--r-- | ruby_1_8_6/ext/thread/lib/thread.rb | 5 | ||||
-rw-r--r-- | ruby_1_8_6/ext/thread/thread.c | 1216 |
3 files changed, 1230 insertions, 0 deletions
diff --git a/ruby_1_8_6/ext/thread/extconf.rb b/ruby_1_8_6/ext/thread/extconf.rb new file mode 100644 index 0000000000..2f984efc6d --- /dev/null +++ b/ruby_1_8_6/ext/thread/extconf.rb @@ -0,0 +1,9 @@ +require 'mkmf' + +enable_config('fastthread', true) or exit + +if with_config('mem-pools', true) + $CPPFLAGS << ' -DUSE_MEM_POOLS' +end + +create_makefile("thread") diff --git a/ruby_1_8_6/ext/thread/lib/thread.rb b/ruby_1_8_6/ext/thread/lib/thread.rb new file mode 100644 index 0000000000..6c533aba39 --- /dev/null +++ b/ruby_1_8_6/ext/thread/lib/thread.rb @@ -0,0 +1,5 @@ +unless defined? Thread + fail "Thread not available for this ruby interpreter" +end + +require 'thread.so' diff --git a/ruby_1_8_6/ext/thread/thread.c b/ruby_1_8_6/ext/thread/thread.c new file mode 100644 index 0000000000..43dfc348ed --- /dev/null +++ b/ruby_1_8_6/ext/thread/thread.c @@ -0,0 +1,1216 @@ +/* + * Optimized Ruby Mutex implementation, loosely based on thread.rb by + * Yukihiro Matsumoto <matz@ruby-lang.org> + * + * Copyright 2006-2007 MenTaLguY <mental@rydia.net> + * + * RDoc taken from original. + * + * This file is made available under the same terms as Ruby. + */ + +#include <ruby.h> +#include <intern.h> +#include <rubysig.h> +#include <node.h> + +enum rb_thread_status rb_thread_status _((VALUE)); + +static VALUE rb_cMutex; +static VALUE rb_cConditionVariable; +static VALUE rb_cQueue; +static VALUE rb_cSizedQueue; + +static VALUE set_critical(VALUE value); + +static VALUE +thread_exclusive_do(void) +{ + rb_thread_critical = 1; + + return rb_yield(Qundef); +} + +/* + * call-seq: + * Thread.exclusive { block } => obj + * + * Wraps a block in Thread.critical, restoring the original value + * upon exit from the critical section, and returns the value of the + * block. + */ + +static VALUE +rb_thread_exclusive(void) +{ + return rb_ensure(thread_exclusive_do, Qundef, set_critical, rb_thread_critical); +} + +typedef struct _Entry { + VALUE value; + struct _Entry *next; +} Entry; + +typedef struct _List { + Entry *entries; + Entry *last_entry; + Entry *entry_pool; + unsigned long size; +} List; + +static void +init_list(List *list) +{ + list->entries = NULL; + list->last_entry = NULL; + list->entry_pool = NULL; + list->size = 0; +} + +static void +mark_list(List *list) +{ + Entry *entry; + for (entry = list->entries; entry; entry = entry->next) { + rb_gc_mark(entry->value); + } +} + +static void +free_entries(Entry *first) +{ + Entry *next; + while (first) { + next = first->next; + xfree(first); + first = next; + } +} + +static void +finalize_list(List *list) +{ + free_entries(list->entries); + free_entries(list->entry_pool); +} + +static void +push_list(List *list, VALUE value) +{ + Entry *entry; + + if (list->entry_pool) { + entry = list->entry_pool; + list->entry_pool = entry->next; + } else { + entry = ALLOC(Entry); + } + + entry->value = value; + entry->next = NULL; + + if (list->last_entry) { + list->last_entry->next = entry; + } else { + list->entries = entry; + } + list->last_entry = entry; + + ++list->size; +} + +static void +push_multiple_list(List *list, VALUE *values, unsigned count) +{ + unsigned i; + for (i = 0; i < count; i++) { + push_list(list, values[i]); + } +} + +static void +recycle_entries(List *list, Entry *first_entry, Entry *last_entry) +{ +#ifdef USE_MEM_POOLS + last_entry->next = list->entry_pool; + list->entry_pool = first_entry; +#else + last_entry->next = NULL; + free_entries(first_entry); +#endif +} + +static VALUE +shift_list(List *list) +{ + Entry *entry; + VALUE value; + + entry = list->entries; + if (!entry) return Qundef; + + list->entries = entry->next; + if (entry == list->last_entry) { + list->last_entry = NULL; + } + + --list->size; + + value = entry->value; + recycle_entries(list, entry, entry); + + return value; +} + +static void +remove_one(List *list, VALUE value) +{ + Entry **ref; + Entry *prev; + Entry *entry; + + for (ref = &list->entries, prev = NULL, entry = list->entries; + entry != NULL; + ref = &entry->next, prev = entry, entry = entry->next) { + if (entry->value == value) { + *ref = entry->next; + list->size--; + if (!entry->next) { + list->last_entry = prev; + } + recycle_entries(list, entry, entry); + break; + } + } +} + +static void +clear_list(List *list) +{ + if (list->last_entry) { + recycle_entries(list, list->entries, list->last_entry); + list->entries = NULL; + list->last_entry = NULL; + list->size = 0; + } +} + +static VALUE +array_from_list(List const *list) +{ + VALUE ary; + Entry *entry; + ary = rb_ary_new(); + for (entry = list->entries; entry; entry = entry->next) { + rb_ary_push(ary, entry->value); + } + return ary; +} + +static VALUE +wake_thread(VALUE thread) +{ + return rb_thread_wakeup_alive(thread); +} + +static VALUE +run_thread(VALUE thread) +{ + thread = wake_thread(thread); + if (RTEST(thread) && !rb_thread_critical) + rb_thread_schedule(); + return thread; +} + +static VALUE +wake_one(List *list) +{ + VALUE waking; + + waking = Qnil; + while (list->entries && !RTEST(waking)) { + waking = shift_list(list); + if (waking == Qundef) break; + waking = wake_thread(waking); + } + + return waking; +} + +static VALUE +wake_all(List *list) +{ + while (list->entries) { + wake_one(list); + } + return Qnil; +} + +static VALUE +wait_list_inner(List *list) +{ + push_list(list, rb_thread_current()); + rb_thread_stop(); + return Qnil; +} + +static VALUE +wait_list_cleanup(List *list) +{ + /* cleanup in case of spurious wakeups */ + remove_one(list, rb_thread_current()); + return Qnil; +} + +static void +wait_list(List *list) +{ + rb_ensure(wait_list_inner, (VALUE)list, wait_list_cleanup, (VALUE)list); +} + +static void +assert_no_survivors(List *waiting, const char *label, void *addr) +{ + Entry *entry; + VALUE ths = 0; + + for (entry = waiting->entries; entry; entry = entry->next) { + if (RTEST(wake_thread(entry->value))) { + if (!ths) ths = rb_ary_new(); + rb_ary_push(ths, entry->value); + } + } + if (ths) { + rb_bug("%s %p freed with live thread(s) %s waiting", + label, addr, RSTRING_PTR(rb_inspect(ths))); + } +} + +/* + * Document-class: Mutex + * + * Mutex implements a simple semaphore that can be used to coordinate access to + * shared data from multiple concurrent threads. + * + * Example: + * + * require 'thread' + * semaphore = Mutex.new + * + * a = Thread.new { + * semaphore.synchronize { + * # access shared resource + * } + * } + * + * b = Thread.new { + * semaphore.synchronize { + * # access shared resource + * } + * } + * + */ + +typedef struct _Mutex { + VALUE owner; + List waiting; +} Mutex; + +#define MUTEX_LOCKED_P(mutex) (RTEST((mutex)->owner) && rb_thread_alive_p((mutex)->owner)) + +static void +mark_mutex(Mutex *mutex) +{ + rb_gc_mark(mutex->owner); + mark_list(&mutex->waiting); +} + +static void +finalize_mutex(Mutex *mutex) +{ + finalize_list(&mutex->waiting); +} + +static void +free_mutex(Mutex *mutex) +{ + assert_no_survivors(&mutex->waiting, "mutex", mutex); + finalize_mutex(mutex); + xfree(mutex); +} + +static void +init_mutex(Mutex *mutex) +{ + mutex->owner = Qnil; + init_list(&mutex->waiting); +} + +/* + * Document-method: new + * call-seq: Mutex.new + * + * Creates a new Mutex + * + */ + +static VALUE +rb_mutex_alloc(VALUE klass) +{ + Mutex *mutex; + mutex = ALLOC(Mutex); + init_mutex(mutex); + return Data_Wrap_Struct(klass, mark_mutex, free_mutex, mutex); +} + +/* + * Document-method: locked? + * call-seq: locked? + * + * Returns +true+ if this lock is currently held by some thread. + * + */ + +static VALUE +rb_mutex_locked_p(VALUE self) +{ + Mutex *mutex; + Data_Get_Struct(self, Mutex, mutex); + return MUTEX_LOCKED_P(mutex) ? Qtrue : Qfalse; +} + +/* + * Document-method: try_lock + * call-seq: try_lock + * + * Attempts to obtain the lock and returns immediately. Returns +true+ if the + * lock was granted. + * + */ + +static VALUE +rb_mutex_try_lock(VALUE self) +{ + Mutex *mutex; + + Data_Get_Struct(self, Mutex, mutex); + + if (MUTEX_LOCKED_P(mutex)) + return Qfalse; + + mutex->owner = rb_thread_current(); + return Qtrue; +} + +/* + * Document-method: lock + * call-seq: lock + * + * Attempts to grab the lock and waits if it isn't available. + * + */ + +static VALUE +lock_mutex(Mutex *mutex) +{ + VALUE current; + current = rb_thread_current(); + + rb_thread_critical = 1; + + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + } + else { + push_list(&mutex->waiting, current); + do { + rb_thread_stop(); + rb_thread_critical = 1; + if (!MUTEX_LOCKED_P(mutex)) { + mutex->owner = current; + break; + } + } while (mutex->owner != current); + } + + rb_thread_critical = 0; + return Qnil; +} + +static VALUE +rb_mutex_lock(VALUE self) +{ + Mutex *mutex; + Data_Get_Struct(self, Mutex, mutex); + lock_mutex(mutex); + return self; +} + +static VALUE +relock_mutex(Mutex *mutex) +{ + VALUE current = rb_thread_current(); + + switch (rb_thread_status(current)) { + case THREAD_RUNNABLE: + case THREAD_STOPPED: + lock_mutex(mutex); + break; + default: + break; + } + return Qundef; +} + +/* + * Document-method: unlock + * + * Releases the lock. Returns +nil+ if ref wasn't locked. + * + */ + +static VALUE +unlock_mutex_inner(Mutex *mutex) +{ + VALUE waking; + + if (mutex->owner != rb_thread_current()) { + rb_raise(rb_eThreadError, "not owner"); + } + + waking = wake_one(&mutex->waiting); + mutex->owner = waking; + + return waking; +} + +static VALUE +set_critical(VALUE value) +{ + rb_thread_critical = (int)value; + return Qundef; +} + +static VALUE +unlock_mutex(Mutex *mutex) +{ + VALUE waking; + + rb_thread_critical = 1; + waking = rb_ensure(unlock_mutex_inner, (VALUE)mutex, set_critical, 0); + if (!RTEST(waking)) { + return Qfalse; + } + + run_thread(waking); + + return Qtrue; +} + +static VALUE +rb_mutex_unlock(VALUE self) +{ + Mutex *mutex; + Data_Get_Struct(self, Mutex, mutex); + + if (RTEST(unlock_mutex(mutex))) { + return self; + } else { + return Qnil; + } +} + +/* + * Document-method: exclusive_unlock + * call-seq: exclusive_unlock { ... } + * + * If the mutex is locked, unlocks the mutex, wakes one waiting thread, and + * yields in a critical section. + * + */ + +static VALUE +rb_mutex_exclusive_unlock_inner(Mutex *mutex) +{ + VALUE waking; + waking = unlock_mutex_inner(mutex); + rb_yield(Qundef); + return waking; +} + +static VALUE +rb_mutex_exclusive_unlock(VALUE self) +{ + Mutex *mutex; + VALUE waking; + Data_Get_Struct(self, Mutex, mutex); + + rb_thread_critical = 1; + waking = rb_ensure(rb_mutex_exclusive_unlock_inner, (VALUE)mutex, set_critical, 0); + + if (waking == Qundef || !RTEST(waking)) { + return Qnil; + } + + run_thread(waking); + + return self; +} + +/* + * Document-method: synchronize + * call-seq: synchronize { ... } + * + * Obtains a lock, runs the block, and releases the lock when the block + * completes. See the example under Mutex. + * + */ + +static VALUE +rb_mutex_synchronize(VALUE self) +{ + rb_mutex_lock(self); + return rb_ensure(rb_yield, Qundef, rb_mutex_unlock, self); +} + +/* + * Document-class: ConditionVariable + * + * ConditionVariable objects augment class Mutex. Using condition variables, + * it is possible to suspend while in the middle of a critical section until a + * resource becomes available. + * + * Example: + * + * require 'thread' + * + * mutex = Mutex.new + * resource = ConditionVariable.new + * + * a = Thread.new { + * mutex.synchronize { + * # Thread 'a' now needs the resource + * resource.wait(mutex) + * # 'a' can now have the resource + * } + * } + * + * b = Thread.new { + * mutex.synchronize { + * # Thread 'b' has finished using the resource + * resource.signal + * } + * } + * + */ + +typedef struct _ConditionVariable { + List waiting; +} ConditionVariable; + +static void +mark_condvar(ConditionVariable *condvar) +{ + mark_list(&condvar->waiting); +} + +static void +finalize_condvar(ConditionVariable *condvar) +{ + finalize_list(&condvar->waiting); +} + +static void +free_condvar(ConditionVariable *condvar) +{ + assert_no_survivors(&condvar->waiting, "condition variable", condvar); + finalize_condvar(condvar); + xfree(condvar); +} + +static void +init_condvar(ConditionVariable *condvar) +{ + init_list(&condvar->waiting); +} + +/* + * Document-method: new + * call-seq: ConditionVariable.new + * + * Creates a new ConditionVariable + * + */ + +static VALUE +rb_condvar_alloc(VALUE klass) +{ + ConditionVariable *condvar; + + condvar = ALLOC(ConditionVariable); + init_condvar(condvar); + + return Data_Wrap_Struct(klass, mark_condvar, free_condvar, condvar); +} + +/* + * Document-method: wait + * call-seq: wait + * + * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. + * + */ + +static void +wait_condvar(ConditionVariable *condvar, Mutex *mutex) +{ + VALUE waking; + + rb_thread_critical = 1; + if (rb_thread_current() != mutex->owner) { + rb_thread_critical = 0; + rb_raise(rb_eThreadError, "not owner of the synchronization mutex"); + } + waking = unlock_mutex_inner(mutex); + if (RTEST(waking)) { + wake_thread(waking); + } + rb_ensure(wait_list, (VALUE)&condvar->waiting, relock_mutex, (VALUE)mutex); +} + +static VALUE +legacy_exclusive_unlock(VALUE mutex) +{ + return rb_funcall(mutex, rb_intern("exclusive_unlock"), 0); +} + +typedef struct { + ConditionVariable *condvar; + VALUE mutex; +} legacy_wait_args; + +static VALUE +legacy_wait(VALUE unused, legacy_wait_args *args) +{ + wait_list(&args->condvar->waiting); + rb_funcall(args->mutex, rb_intern("lock"), 0); + return Qnil; +} + +static VALUE +rb_condvar_wait(VALUE self, VALUE mutex_v) +{ + ConditionVariable *condvar; + Data_Get_Struct(self, ConditionVariable, condvar); + + if (CLASS_OF(mutex_v) != rb_cMutex) { + /* interoperate with legacy mutex */ + legacy_wait_args args; + args.condvar = condvar; + args.mutex = mutex_v; + rb_iterate(legacy_exclusive_unlock, mutex_v, legacy_wait, (VALUE)&args); + } else { + Mutex *mutex; + Data_Get_Struct(mutex_v, Mutex, mutex); + wait_condvar(condvar, mutex); + } + + return self; +} + +/* + * Document-method: broadcast + * call-seq: broadcast + * + * Wakes up all threads waiting for this condition. + * + */ + +static VALUE +rb_condvar_broadcast(VALUE self) +{ + ConditionVariable *condvar; + + Data_Get_Struct(self, ConditionVariable, condvar); + + rb_thread_critical = 1; + rb_ensure(wake_all, (VALUE)&condvar->waiting, set_critical, 0); + rb_thread_schedule(); + + return self; +} + +/* + * Document-method: signal + * call-seq: signal + * + * Wakes up the first thread in line waiting for this condition. + * + */ + +static void +signal_condvar(ConditionVariable *condvar) +{ + VALUE waking; + rb_thread_critical = 1; + waking = rb_ensure(wake_one, (VALUE)&condvar->waiting, set_critical, 0); + if (RTEST(waking)) { + run_thread(waking); + } +} + +static VALUE +rb_condvar_signal(VALUE self) +{ + ConditionVariable *condvar; + Data_Get_Struct(self, ConditionVariable, condvar); + signal_condvar(condvar); + return self; +} + +/* + * Document-class: Queue + * + * This class provides a way to synchronize communication between threads. + * + * Example: + * + * require 'thread' + * + * queue = Queue.new + * + * producer = Thread.new do + * 5.times do |i| + * sleep rand(i) # simulate expense + * queue << i + * puts "#{i} produced" + * end + * end + * + * consumer = Thread.new do + * 5.times do |i| + * value = queue.pop + * sleep rand(i/2) # simulate expense + * puts "consumed #{value}" + * end + * end + * + * consumer.join + * + */ + +typedef struct _Queue { + Mutex mutex; + ConditionVariable value_available; + ConditionVariable space_available; + List values; + unsigned long capacity; +} Queue; + +static void +mark_queue(Queue *queue) +{ + mark_mutex(&queue->mutex); + mark_condvar(&queue->value_available); + mark_condvar(&queue->space_available); + mark_list(&queue->values); +} + +static void +finalize_queue(Queue *queue) +{ + finalize_mutex(&queue->mutex); + finalize_condvar(&queue->value_available); + finalize_condvar(&queue->space_available); + finalize_list(&queue->values); +} + +static void +free_queue(Queue *queue) +{ + assert_no_survivors(&queue->mutex.waiting, "queue", queue); + assert_no_survivors(&queue->space_available.waiting, "queue", queue); + assert_no_survivors(&queue->value_available.waiting, "queue", queue); + finalize_queue(queue); + xfree(queue); +} + +static void +init_queue(Queue *queue) +{ + init_mutex(&queue->mutex); + init_condvar(&queue->value_available); + init_condvar(&queue->space_available); + init_list(&queue->values); + queue->capacity = 0; +} + +/* + * Document-method: new + * call-seq: new + * + * Creates a new queue. + * + */ + +static VALUE +rb_queue_alloc(VALUE klass) +{ + Queue *queue; + queue = ALLOC(Queue); + init_queue(queue); + return Data_Wrap_Struct(klass, mark_queue, free_queue, queue); +} + +static VALUE +rb_queue_marshal_load(VALUE self, VALUE data) +{ + Queue *queue; + VALUE array; + Data_Get_Struct(self, Queue, queue); + + array = rb_marshal_load(data); + if (TYPE(array) != T_ARRAY) { + rb_raise(rb_eTypeError, "expected Array of queue data"); + } + if (RARRAY(array)->len < 1) { + rb_raise(rb_eArgError, "missing capacity value"); + } + queue->capacity = NUM2ULONG(rb_ary_shift(array)); + push_multiple_list(&queue->values, RARRAY(array)->ptr, (unsigned)RARRAY(array)->len); + + return self; +} + +static VALUE +rb_queue_marshal_dump(VALUE self) +{ + Queue *queue; + VALUE array; + Data_Get_Struct(self, Queue, queue); + + array = array_from_list(&queue->values); + rb_ary_unshift(array, ULONG2NUM(queue->capacity)); + return rb_marshal_dump(array, Qnil); +} + +/* + * Document-method: clear + * call-seq: clear + * + * Removes all objects from the queue. + * + */ + +static VALUE +rb_queue_clear(VALUE self) +{ + Queue *queue; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + clear_list(&queue->values); + signal_condvar(&queue->space_available); + unlock_mutex(&queue->mutex); + + return self; +} + +/* + * Document-method: empty? + * call-seq: empty? + * + * Returns +true+ if the queue is empty. + * + */ + +static VALUE +rb_queue_empty_p(VALUE self) +{ + Queue *queue; + VALUE result; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + result = queue->values.size == 0 ? Qtrue : Qfalse; + unlock_mutex(&queue->mutex); + + return result; +} + +/* + * Document-method: length + * call-seq: length + * + * Returns the length of the queue. + * + */ + +static VALUE +rb_queue_length(VALUE self) +{ + Queue *queue; + VALUE result; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + result = ULONG2NUM(queue->values.size); + unlock_mutex(&queue->mutex); + + return result; +} + +/* + * Document-method: num_waiting + * call-seq: num_waiting + * + * Returns the number of threads waiting on the queue. + * + */ + +static VALUE +rb_queue_num_waiting(VALUE self) +{ + Queue *queue; + VALUE result; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + result = ULONG2NUM(queue->value_available.waiting.size + + queue->space_available.waiting.size); + unlock_mutex(&queue->mutex); + + return result; +} + +/* + * Document-method: pop + * call_seq: pop(non_block=false) + * + * Retrieves data from the queue. If the queue is empty, the calling thread is + * suspended until data is pushed onto the queue. If +non_block+ is true, the + * thread isn't suspended, and an exception is raised. + * + */ + +static VALUE +rb_queue_pop(int argc, VALUE *argv, VALUE self) +{ + Queue *queue; + int should_block; + VALUE result; + Data_Get_Struct(self, Queue, queue); + + if (argc == 0) { + should_block = 1; + } else if (argc == 1) { + should_block = !RTEST(argv[0]); + } else { + rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc); + } + + lock_mutex(&queue->mutex); + if (!queue->values.entries && !should_block) { + unlock_mutex(&queue->mutex); + rb_raise(rb_eThreadError, "queue empty"); + } + + while (!queue->values.entries) { + wait_condvar(&queue->value_available, &queue->mutex); + } + + result = shift_list(&queue->values); + if (queue->capacity && queue->values.size < queue->capacity) { + signal_condvar(&queue->space_available); + } + unlock_mutex(&queue->mutex); + + return result; +} + +/* + * Document-method: push + * call-seq: push(obj) + * + * Pushes +obj+ to the queue. + * + */ + +static VALUE +rb_queue_push(VALUE self, VALUE value) +{ + Queue *queue; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + while (queue->capacity && queue->values.size >= queue->capacity) { + wait_condvar(&queue->space_available, &queue->mutex); + } + push_list(&queue->values, value); + signal_condvar(&queue->value_available); + unlock_mutex(&queue->mutex); + + return self; +} + +/* + * Document-class: SizedQueue + * + * This class represents queues of specified size capacity. The push operation + * may be blocked if the capacity is full. + * + * See Queue for an example of how a SizedQueue works. + * + */ + +/* + * Document-method: new + * call-seq: new + * + * Creates a fixed-length queue with a maximum size of +max+. + * + */ + +/* + * Document-method: max + * call-seq: max + * + * Returns the maximum size of the queue. + * + */ + +static VALUE +rb_sized_queue_max(VALUE self) +{ + Queue *queue; + VALUE result; + Data_Get_Struct(self, Queue, queue); + + lock_mutex(&queue->mutex); + result = ULONG2NUM(queue->capacity); + unlock_mutex(&queue->mutex); + + return result; +} + +/* + * Document-method: max= + * call-seq: max=(size) + * + * Sets the maximum size of the queue. + * + */ + +static VALUE +rb_sized_queue_max_set(VALUE self, VALUE value) +{ + Queue *queue; + unsigned long new_capacity; + unsigned long difference; + Data_Get_Struct(self, Queue, queue); + + new_capacity = NUM2ULONG(value); + + if (new_capacity < 1) { + rb_raise(rb_eArgError, "value must be positive"); + } + + lock_mutex(&queue->mutex); + if (queue->capacity && new_capacity > queue->capacity) { + difference = new_capacity - queue->capacity; + } else { + difference = 0; + } + queue->capacity = new_capacity; + for (; difference > 0; --difference) { + signal_condvar(&queue->space_available); + } + unlock_mutex(&queue->mutex); + + return self; +} + +/* + * Document-method: push + * call-seq: push(obj) + * + * Pushes +obj+ to the queue. If there is no space left in the queue, waits + * until space becomes available. + * + */ + +/* + * Document-method: pop + * call-seq: pop(non_block=false) + * + * Retrieves data from the queue and runs a waiting thread, if any. + * + */ + +/* for marshalling mutexes and condvars */ + +static VALUE +dummy_load(VALUE self, VALUE string) +{ + return Qnil; +} + +static VALUE +dummy_dump(VALUE self) +{ + return rb_str_new2(""); +} + +void +Init_thread(void) +{ + rb_define_singleton_method(rb_cThread, "exclusive", rb_thread_exclusive, 0); + + rb_cMutex = rb_define_class("Mutex", rb_cObject); + rb_define_alloc_func(rb_cMutex, rb_mutex_alloc); + rb_define_method(rb_cMutex, "marshal_load", dummy_load, 1); + rb_define_method(rb_cMutex, "marshal_dump", dummy_dump, 0); + rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); + rb_define_method(rb_cMutex, "try_lock", rb_mutex_try_lock, 0); + rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); + rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); + rb_define_method(rb_cMutex, "exclusive_unlock", rb_mutex_exclusive_unlock, 0); + rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize, 0); + + rb_cConditionVariable = rb_define_class("ConditionVariable", rb_cObject); + rb_define_alloc_func(rb_cConditionVariable, rb_condvar_alloc); + rb_define_method(rb_cConditionVariable, "marshal_load", dummy_load, 1); + rb_define_method(rb_cConditionVariable, "marshal_dump", dummy_dump, 0); + rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, 1); + rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); + rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); + + rb_cQueue = rb_define_class("Queue", rb_cObject); + rb_define_alloc_func(rb_cQueue, rb_queue_alloc); + rb_define_method(rb_cQueue, "marshal_load", rb_queue_marshal_load, 1); + rb_define_method(rb_cQueue, "marshal_dump", rb_queue_marshal_dump, 0); + rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); + rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); + rb_define_method(rb_cQueue, "length", rb_queue_length, 0); + rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); + rb_define_method(rb_cQueue, "push", rb_queue_push, 1); + rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push")); + rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push")); + rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop")); + rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop")); + rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length")); + + rb_cSizedQueue = rb_define_class("SizedQueue", rb_cQueue); + rb_define_method(rb_cSizedQueue, "initialize", rb_sized_queue_max_set, 1); + rb_define_method(rb_cSizedQueue, "num_waiting", rb_queue_num_waiting, 0); + rb_define_method(rb_cSizedQueue, "pop", rb_queue_pop, -1); + rb_define_method(rb_cSizedQueue, "push", rb_queue_push, 1); + rb_define_method(rb_cSizedQueue, "max", rb_sized_queue_max, 0); + rb_define_method(rb_cSizedQueue, "max=", rb_sized_queue_max_set, 1); + rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push")); + rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push")); + rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop")); + rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop")); +} + |