#include /* NOTE(review): the header name appears to be missing after #include -- restore from the upstream file */ /* Struct member index of a ConditionVariable's waiter list. */ enum { CONDVAR_WAITERS = 0 }; /* Struct member indexes: Queue uses the first two, SizedQueue uses all four. */ enum { QUEUE_QUE = 0, QUEUE_WAITERS = 1, SZQUEUE_WAITERS = 2, SZQUEUE_MAX = 3 }; #define GET_CONDVAR_WAITERS(cv) RSTRUCT_GET((cv), CONDVAR_WAITERS) #define GET_QUEUE_QUE(q) RSTRUCT_GET((q), QUEUE_QUE) #define GET_QUEUE_WAITERS(q) RSTRUCT_GET((q), QUEUE_WAITERS) #define GET_SZQUEUE_WAITERS(q) RSTRUCT_GET((q), SZQUEUE_WAITERS) #define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX) #define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q)) /* Returns a new array from rb_ary_tmp_new(1); used for queue storage and waiter lists. */ static VALUE ary_buf_new(void) { return rb_ary_tmp_new(1); } /* Shifts threads off +list+ until rb_thread_wakeup_alive() returns a true value for one of them or the list is exhausted. */ static void wakeup_first_thread(VALUE list) { VALUE thread; while (!NIL_P(thread = rb_ary_shift(list))) { if (RTEST(rb_thread_wakeup_alive(thread))) break; } } static void wakeup_all_threads(VALUE list) { VALUE thread; long i; /* NOTE(review): the statement below is truncated -- text between "i" and "mutex" seems to have been lost. Presumably it held the rest of this loop and the definitions of rb_condvar_initialize, struct sleep_call, id_sleep and the head of do_sleep, all of which are referenced elsewhere in this file. Restore from upstream before compiling. */ for (i=0; imutex, id_sleep, 1, &p->timeout); } /* Deletes the current thread from +ary+; passed to rb_ensure() as the cleanup handler by rb_condvar_wait. */ static VALUE delete_current_thread(VALUE ary) { return rb_ary_delete(ary, rb_thread_current()); } /* * Document-method: wait * call-seq: wait(mutex, timeout=nil) * * Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. * * If +timeout+ is given, this method returns after +timeout+ seconds have passed, * even if no other thread signals. */ static VALUE rb_condvar_wait(int argc, VALUE *argv, VALUE self) { VALUE waiters = GET_CONDVAR_WAITERS(self); VALUE mutex, timeout; struct sleep_call args; /* "11": +mutex+ is required, +timeout+ is optional */ rb_scan_args(argc, argv, "11", &mutex, &timeout); args.mutex = mutex; args.timeout = timeout; /* register the current thread as a waiter, then sleep; delete_current_thread is the rb_ensure() handler that removes the registration again */ rb_ary_push(waiters, rb_thread_current()); rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters); return self; } /* * Document-method: signal * call-seq: signal * * Wakes up the first thread in line waiting for this lock. */ static VALUE rb_condvar_signal(VALUE self) { wakeup_first_thread(GET_CONDVAR_WAITERS(self)); return self; } /* * Document-method: broadcast * call-seq: broadcast * * Wakes up all threads waiting for this lock. 
*/ static VALUE rb_condvar_broadcast(VALUE self) { wakeup_all_threads(GET_CONDVAR_WAITERS(self)); return self; } /* * Document-class: Queue * * This class provides a way to synchronize communication between threads. * * Example: * * require 'thread' * queue = Queue.new * * producer = Thread.new do * 5.times do |i| * sleep rand(i) # simulate expense * queue << i * puts "#{i} produced" * end * end * * consumer = Thread.new do * 5.times do |i| * value = queue.pop * sleep rand(i/2) # simulate expense * puts "consumed #{value}" * end * end * */ /* * Document-method: new * call-seq: new * * Creates a new queue. */ static VALUE rb_queue_initialize(VALUE self) { RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); return self; } static VALUE queue_do_push(VALUE self, VALUE obj) { rb_ary_push(GET_QUEUE_QUE(self), obj); wakeup_first_thread(GET_QUEUE_WAITERS(self)); return self; } /* * Document-method: push * call-seq: push(obj) * * Pushes +obj+ to the queue. 
*/ static VALUE rb_queue_push(VALUE self, VALUE obj) { return queue_do_push(self, obj); } static unsigned long queue_length(VALUE self) { return RARRAY_LEN(GET_QUEUE_QUE(self)); } static unsigned long queue_num_waiting(VALUE self) { return RARRAY_LEN(GET_QUEUE_WAITERS(self)); } struct waiting_delete { VALUE waiting; VALUE th; }; static VALUE queue_delete_from_waiting(struct waiting_delete *p) { rb_ary_delete(p->waiting, p->th); return Qnil; } static VALUE queue_do_pop(VALUE self, VALUE should_block) { struct waiting_delete args; args.waiting = GET_QUEUE_WAITERS(self); args.th = rb_thread_current(); while (queue_length(self) == 0) { if (!(int)should_block) { rb_raise(rb_eThreadError, "queue empty"); } rb_ary_push(args.waiting, args.th); rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } return rb_ary_shift(GET_QUEUE_QUE(self)); } static VALUE queue_pop_should_block(int argc, VALUE *argv) { VALUE should_block = Qtrue; switch (argc) { case 0: break; case 1: should_block = RTEST(argv[0]) ? Qtrue : Qfalse; break; default: rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc); } return should_block; } /* * Document-method: pop * call_seq: pop(non_block=false) * * Retrieves data from the queue. If the queue is empty, the calling thread is * suspended until data is pushed onto the queue. If +non_block+ is true, the * thread isn't suspended, and an exception is raised. */ static VALUE rb_queue_pop(int argc, VALUE *argv, VALUE self) { VALUE should_block = queue_pop_should_block(argc, argv); return queue_do_pop(self, should_block); } /* * Document-method: empty? * call-seq: empty? * * Returns +true+ if the queue is empty. */ static VALUE rb_queue_empty_p(VALUE self) { return queue_length(self) == 0 ? Qtrue : Qfalse; } /* * Document-method: clear * call-seq: clear * * Removes all objects from the queue. 
*/ static VALUE rb_queue_clear(VALUE self) { rb_ary_clear(GET_QUEUE_QUE(self)); return self; } /* * Document-method: length * call-seq: length * * Returns the length of the queue. */ static VALUE rb_queue_length(VALUE self) { unsigned long len = queue_length(self); return ULONG2NUM(len); } /* * Document-method: num_waiting * call-seq: num_waiting * * Returns the number of threads waiting on the queue. */ static VALUE rb_queue_num_waiting(VALUE self) { unsigned long len = queue_num_waiting(self); return ULONG2NUM(len); } /* * Document-class: SizedQueue * * This class represents queues of specified size capacity. The push operation * may be blocked if the capacity is full. * * See Queue for an example of how a SizedQueue works. */ /* * Document-method: new * call-seq: new(max) * * Creates a fixed-length queue with a maximum size of +max+. */ static VALUE rb_szqueue_initialize(VALUE self, VALUE vmax) { long max; max = NUM2LONG(vmax); if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new()); RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new()); RSTRUCT_SET(self, SZQUEUE_MAX, vmax); return self; } /* * Document-method: max * call-seq: max * * Returns the maximum size of the queue. */ static VALUE rb_szqueue_max_get(VALUE self) { return GET_SZQUEUE_MAX(self); } /* * Document-method: max= * call-seq: max=(n) * * Sets the maximum size of the queue. */ static VALUE rb_szqueue_max_set(VALUE self, VALUE vmax) { long max = NUM2LONG(vmax), diff = 0; VALUE t; if (max <= 0) { rb_raise(rb_eArgError, "queue size must be positive"); } if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) { diff = max - GET_SZQUEUE_ULONGMAX(self); } RSTRUCT_SET(self, SZQUEUE_MAX, vmax); while (diff > 0 && !NIL_P(t = rb_ary_shift(GET_QUEUE_QUE(self)))) { rb_thread_wakeup_alive(t); } return vmax; } /* * Document-method: push * call-seq: push(obj) * * Pushes +obj+ to the queue. 
If there is no space left in the queue, waits * until space becomes available. */ static VALUE rb_szqueue_push(VALUE self, VALUE obj) { struct waiting_delete args; args.waiting = GET_QUEUE_WAITERS(self); args.th = rb_thread_current(); while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) { rb_ary_push(args.waiting, args.th); rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); } return queue_do_push(self, obj); } static VALUE szqueue_do_pop(VALUE self, VALUE should_block) { VALUE retval = queue_do_pop(self, should_block); if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) { wakeup_first_thread(GET_SZQUEUE_WAITERS(self)); } return retval; } /* * Document-method: pop * call_seq: pop(non_block=false) * * Returns the number of threads waiting on the queue. */ static VALUE rb_szqueue_pop(int argc, VALUE *argv, VALUE self) { VALUE should_block = queue_pop_should_block(argc, argv); return szqueue_do_pop(self, should_block); } /* * Document-method: pop * call_seq: pop(non_block=false) * * Returns the number of threads waiting on the queue. */ static VALUE rb_szqueue_num_waiting(VALUE self) { long len = queue_num_waiting(self); len += RARRAY_LEN(GET_SZQUEUE_WAITERS(self)); return ULONG2NUM(len); } #ifndef UNDER_THREAD #define UNDER_THREAD 1 #endif void Init_thread(void) { #if UNDER_THREAD #define ALIAS_GLOBCAL_CONST(name) do { \ ID id = rb_intern_const(#name); \ if (!rb_const_defined_at(rb_cObject, id)) { \ rb_const_set(rb_cObject, id, rb_c##name); \ } \ } while (0) #else #define ALIAS_GLOBCAL_CONST(name) do { /* nothing */ } while (0) #endif VALUE rb_cConditionVariable = rb_struct_define_without_accessor_under( UNDER_THREAD ? rb_cThread : 0, "ConditionVariable", rb_cObject, rb_struct_alloc_noinit, "waiters", NULL); VALUE rb_cQueue = rb_struct_define_without_accessor_under( UNDER_THREAD ? 
rb_cThread : 0, "Queue", rb_cObject, rb_struct_alloc_noinit, "que", "waiters", NULL); VALUE rb_cSizedQueue = rb_struct_define_without_accessor_under( UNDER_THREAD ? rb_cThread : 0, "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, "que", "waiters", "queue_waiters", "size", NULL); id_sleep = rb_intern("sleep"); rb_define_method(rb_cConditionVariable, "initialize", rb_condvar_initialize, 0); rb_define_method(rb_cConditionVariable, "wait", rb_condvar_wait, -1); rb_define_method(rb_cConditionVariable, "signal", rb_condvar_signal, 0); rb_define_method(rb_cConditionVariable, "broadcast", rb_condvar_broadcast, 0); rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0); rb_define_method(rb_cQueue, "length", rb_queue_length, 0); rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0); rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop")); rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length")); rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, 1); rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); rb_alias(rb_cSizedQueue, rb_intern("enq"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("<<"), rb_intern("push")); rb_alias(rb_cSizedQueue, rb_intern("deq"), rb_intern("pop")); rb_alias(rb_cSizedQueue, rb_intern("shift"), rb_intern("pop")); 
rb_provide("thread.rb"); ALIAS_GLOBCAL_CONST(ConditionVariable); ALIAS_GLOBCAL_CONST(Queue); ALIAS_GLOBCAL_CONST(SizedQueue); }