summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author    normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>  2017-05-19 18:53:11 +0000
committer normal <normal@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>  2017-05-19 18:53:11 +0000
commit    ea1ce47fd7f2bc9023e9a1391dbadcfaf9e892ce (patch)
tree      d8530b4d2143ab9ea3597b71061b5230c6abbb6a
parent    44e48eca5ffaa2ca70161e6e1cde347d64aa2ed2 (diff)
thread_sync.c: rewrite the rest using ccan/list
The performance improvement increases as the number of waiters increases, due to avoiding the O(n) behavior of rb_ary_delete on the waiting thread. Performance of uncontended queues and condition variables is not altered significantly. Function entry cost is slightly increased for ConditionVariable, since the data pointer is separately allocated and not embedded into the RVALUE slot. [ruby-core:81235] [Feature #13552]

name                  | trunk | built
----------------------|------:|------:
vm_thread_condvar1    |  0.858|  0.858
vm_thread_condvar2    |  1.003|  0.804
vm_thread_queue       |  0.131|  0.129
vm_thread_sized_queue |  0.265|  0.251
vm_thread_sized_queue2|  0.892|  0.859
vm_thread_sized_queue3|  0.879|  0.845
vm_thread_sized_queue4|  0.599|  0.486

Speedup ratio: compare with the result of `trunk' (greater is better)

name                  | built
----------------------|------:
vm_thread_condvar1    | 0.999
vm_thread_condvar2    | 1.246
vm_thread_queue       | 1.020
vm_thread_sized_queue | 1.057
vm_thread_sized_queue2| 1.039
vm_thread_sized_queue3| 1.041
vm_thread_sized_queue4| 1.233

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@58805 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
-rw-r--r--  thread_sync.c  477
1 file changed, 319 insertions(+), 158 deletions(-)
diff --git a/thread_sync.c b/thread_sync.c
index b702f6aab0..971217c9a3 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -4,8 +4,6 @@
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
-/* Mutex */
-
/* sync_waiter is always on-stack */
struct sync_waiter {
rb_thread_t *th;
@@ -14,6 +12,38 @@ struct sync_waiter {
#define MUTEX_ALLOW_TRAP FL_USER1
+static int
+wakeup_one(struct list_head *head)
+{
+ struct sync_waiter *cur = 0, *next = 0;
+
+ list_for_each_safe(head, cur, next, node) {
+ list_del_init(&cur->node);
+ if (cur->th->status != THREAD_KILLED) {
+ rb_threadptr_interrupt(cur->th);
+ cur->th->status = THREAD_RUNNABLE;
+ return TRUE;
+ }
+ }
+ return FALSE;
+}
+
+static void
+wakeup_all(struct list_head *head)
+{
+ struct sync_waiter *cur = 0, *next = 0;
+
+ list_for_each_safe(head, cur, next, node) {
+ list_del_init(&cur->node);
+ if (cur->th->status != THREAD_KILLED) {
+ rb_threadptr_interrupt(cur->th);
+ cur->th->status = THREAD_RUNNABLE;
+ }
+ }
+}
+
+/* Mutex */
+
typedef struct rb_mutex_struct {
struct rb_thread_struct volatile *th;
struct rb_mutex_struct *next_mutex;
@@ -491,80 +521,121 @@ void rb_mutex_allow_trap(VALUE self, int val)
/* Queue */
-enum {
- QUEUE_QUE,
- QUEUE_WAITERS,
- SZQUEUE_WAITERS,
- SZQUEUE_MAX,
- END_QUEUE
-};
+PACKED_STRUCT_UNALIGNED(struct rb_queue {
+ struct list_head waitq;
+ const VALUE que;
+ int num_waiting;
+});
-#define QUEUE_CLOSED FL_USER5
+PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
+ struct rb_queue q;
+ int num_waiting_push;
+ struct list_head pushq;
+ long max;
+});
-#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
-#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
-#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
-#define GET_SZQUEUE_MAX(q) RSTRUCT_GET((q), SZQUEUE_MAX)
-#define GET_SZQUEUE_ULONGMAX(q) NUM2ULONG(GET_SZQUEUE_MAX(q))
+static void
+queue_mark(void *ptr)
+{
+ struct rb_queue *q = ptr;
-static VALUE
-ary_buf_new(void)
+ /* no need to mark threads in waitq, they are on stack */
+ rb_gc_mark(q->que);
+}
+
+static size_t
+queue_memsize(const void *ptr)
{
- return rb_ary_tmp_new(1);
+ return sizeof(struct rb_queue);
}
+static const rb_data_type_t queue_data_type = {
+ "queue",
+ {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
static VALUE
-get_array(VALUE obj, int idx)
+queue_alloc(VALUE klass)
{
- VALUE ary = RSTRUCT_GET(obj, idx);
- if (!RB_TYPE_P(ary, T_ARRAY)) {
- rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
- }
- return ary;
+ VALUE obj;
+ struct rb_queue *q;
+
+ obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
+ list_head_init(&q->waitq);
+ return obj;
}
-static void
-wakeup_first_thread(VALUE list)
+static struct rb_queue *
+queue_ptr(VALUE obj)
{
- VALUE thread;
+ struct rb_queue *q;
- while (!NIL_P(thread = rb_ary_shift(list))) {
- if (RTEST(rb_thread_wakeup_alive(thread))) break;
- }
+ TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q);
+ return q;
}
+#define QUEUE_CLOSED FL_USER5
+
static void
-wakeup_all_threads(VALUE list)
+szqueue_mark(void *ptr)
{
- VALUE thread;
- long i;
+ struct rb_szqueue *sq = ptr;
- for (i=0; i<RARRAY_LEN(list); i++) {
- thread = RARRAY_AREF(list, i);
- rb_thread_wakeup_alive(thread);
- }
- rb_ary_clear(list);
+ queue_mark(&sq->q);
+}
+
+static size_t
+szqueue_memsize(const void *ptr)
+{
+ return sizeof(struct rb_szqueue);
+}
+
+static const rb_data_type_t szqueue_data_type = {
+ "sized_queue",
+ {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static VALUE
+szqueue_alloc(VALUE klass)
+{
+ struct rb_szqueue *sq;
+ VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
+ &szqueue_data_type, sq);
+ list_head_init(&sq->q.waitq);
+ list_head_init(&sq->pushq);
+ return obj;
+}
+
+static struct rb_szqueue *
+szqueue_ptr(VALUE obj)
+{
+ struct rb_szqueue *sq;
+
+ TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq);
+ return sq;
}
-static unsigned long
-queue_length(VALUE self)
+static VALUE
+ary_buf_new(void)
{
- VALUE que = GET_QUEUE_QUE(self);
- return RARRAY_LEN(que);
+ return rb_ary_tmp_new(1);
}
-static unsigned long
-queue_num_waiting(VALUE self)
+static VALUE
+check_array(VALUE obj, VALUE ary)
{
- VALUE waiters = GET_QUEUE_WAITERS(self);
- return RARRAY_LEN(waiters);
+ if (!RB_TYPE_P(ary, T_ARRAY)) {
+ rb_raise(rb_eTypeError, "%+"PRIsVALUE" not initialized", obj);
+ }
+ return ary;
}
-static unsigned long
-szqueue_num_waiting_producer(VALUE self)
+static long
+queue_length(VALUE self, struct rb_queue *q)
{
- VALUE waiters = GET_SZQUEUE_WAITERS(self);
- return RARRAY_LEN(waiters);
+ return RARRAY_LEN(check_array(self, q->que));
}
static int
@@ -580,32 +651,12 @@ raise_closed_queue_error(VALUE self)
}
static VALUE
-queue_closed_result(VALUE self)
+queue_closed_result(VALUE self, struct rb_queue *q)
{
- assert(queue_length(self) == 0);
+ assert(queue_length(self, q) == 0);
return Qnil;
}
-static VALUE
-queue_do_close(VALUE self, int is_szq)
-{
- if (!queue_closed_p(self)) {
- FL_SET(self, QUEUE_CLOSED);
-
- if (queue_num_waiting(self) > 0) {
- VALUE waiters = GET_QUEUE_WAITERS(self);
- wakeup_all_threads(waiters);
- }
-
- if (is_szq && szqueue_num_waiting_producer(self) > 0) {
- VALUE waiters = GET_SZQUEUE_WAITERS(self);
- wakeup_all_threads(waiters);
- }
- }
-
- return self;
-}
-
/*
* Document-class: Queue
*
@@ -649,19 +700,20 @@ queue_do_close(VALUE self, int is_szq)
static VALUE
rb_queue_initialize(VALUE self)
{
- RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
- RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
+ struct rb_queue *q = queue_ptr(self);
+ RB_OBJ_WRITE(self, &q->que, ary_buf_new());
+ list_head_init(&q->waitq);
return self;
}
static VALUE
-queue_do_push(VALUE self, VALUE obj)
+queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
{
if (queue_closed_p(self)) {
raise_closed_queue_error(self);
}
- rb_ary_push(GET_QUEUE_QUE(self), obj);
- wakeup_first_thread(GET_QUEUE_WAITERS(self));
+ rb_ary_push(check_array(self, q->que), obj);
+ wakeup_one(&q->waitq);
return self;
}
@@ -699,7 +751,15 @@ queue_do_push(VALUE self, VALUE obj)
static VALUE
rb_queue_close(VALUE self)
{
- return queue_do_close(self, FALSE);
+ struct rb_queue *q = queue_ptr(self);
+
+ if (!queue_closed_p(self)) {
+ FL_SET(self, QUEUE_CLOSED);
+
+ wakeup_all(&q->waitq);
+ }
+
+ return self;
}
/*
@@ -728,52 +788,74 @@ rb_queue_closed_p(VALUE self)
static VALUE
rb_queue_push(VALUE self, VALUE obj)
{
- return queue_do_push(self, obj);
+ return queue_do_push(self, queue_ptr(self), obj);
+}
+
+static VALUE
+queue_sleep(VALUE arg)
+{
+ rb_thread_sleep_deadly_allow_spurious_wakeup();
+ return Qnil;
}
-struct waiting_delete {
- VALUE waiting;
- VALUE th;
+struct queue_waiter {
+ struct sync_waiter w;
+ union {
+ struct rb_queue *q;
+ struct rb_szqueue *sq;
+ } as;
};
static VALUE
-queue_delete_from_waiting(struct waiting_delete *p)
+queue_sleep_done(VALUE p)
{
- rb_ary_delete(p->waiting, p->th);
- return Qnil;
+ struct queue_waiter *qw = (struct queue_waiter *)p;
+
+ list_del(&qw->w.node);
+ qw->as.q->num_waiting--;
+
+ return Qfalse;
}
static VALUE
-queue_sleep(VALUE arg)
+szqueue_sleep_done(VALUE p)
{
- rb_thread_sleep_deadly_allow_spurious_wakeup();
- return Qnil;
+ struct queue_waiter *qw = (struct queue_waiter *)p;
+
+ list_del(&qw->w.node);
+ qw->as.sq->num_waiting_push--;
+
+ return Qfalse;
}
static VALUE
-queue_do_pop(VALUE self, int should_block)
+queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
{
- struct waiting_delete args;
- args.waiting = GET_QUEUE_WAITERS(self);
- args.th = rb_thread_current();
+ check_array(self, q->que);
- while (queue_length(self) == 0) {
+ while (RARRAY_LEN(q->que) == 0) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
else if (queue_closed_p(self)) {
- return queue_closed_result(self);
+ return queue_closed_result(self, q);
}
else {
- assert(queue_length(self) == 0);
+ struct queue_waiter qw;
+
+ assert(RARRAY_LEN(q->que) == 0);
assert(queue_closed_p(self) == 0);
- rb_ary_push(args.waiting, args.th);
- rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+ qw.w.th = GET_THREAD();
+ qw.as.q = q;
+ list_add_tail(&qw.as.q->waitq, &qw.w.node);
+ qw.as.q->num_waiting++;
+
+ rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
}
}
- return rb_ary_shift(GET_QUEUE_QUE(self));
+ return rb_ary_shift(q->que);
}
static int
@@ -805,7 +887,7 @@ static VALUE
rb_queue_pop(int argc, VALUE *argv, VALUE self)
{
int should_block = queue_pop_should_block(argc, argv);
- return queue_do_pop(self, should_block);
+ return queue_do_pop(self, queue_ptr(self), should_block);
}
/*
@@ -818,7 +900,7 @@ rb_queue_pop(int argc, VALUE *argv, VALUE self)
static VALUE
rb_queue_empty_p(VALUE self)
{
- return queue_length(self) == 0 ? Qtrue : Qfalse;
+ return queue_length(self, queue_ptr(self)) == 0 ? Qtrue : Qfalse;
}
/*
@@ -830,7 +912,9 @@ rb_queue_empty_p(VALUE self)
static VALUE
rb_queue_clear(VALUE self)
{
- rb_ary_clear(GET_QUEUE_QUE(self));
+ struct rb_queue *q = queue_ptr(self);
+
+ rb_ary_clear(check_array(self, q->que));
return self;
}
@@ -846,8 +930,7 @@ rb_queue_clear(VALUE self)
static VALUE
rb_queue_length(VALUE self)
{
- unsigned long len = queue_length(self);
- return ULONG2NUM(len);
+ return LONG2NUM(queue_length(self, queue_ptr(self)));
}
/*
@@ -859,8 +942,9 @@ rb_queue_length(VALUE self)
static VALUE
rb_queue_num_waiting(VALUE self)
{
- unsigned long len = queue_num_waiting(self);
- return ULONG2NUM(len);
+ struct rb_queue *q = queue_ptr(self);
+
+ return INT2NUM(q->num_waiting);
}
/*
@@ -883,16 +967,17 @@ static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
long max;
+ struct rb_szqueue *sq = szqueue_ptr(self);
max = NUM2LONG(vmax);
if (max <= 0) {
rb_raise(rb_eArgError, "queue size must be positive");
}
- RSTRUCT_SET(self, QUEUE_QUE, ary_buf_new());
- RSTRUCT_SET(self, QUEUE_WAITERS, ary_buf_new());
- RSTRUCT_SET(self, SZQUEUE_WAITERS, ary_buf_new());
- RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
+ RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
+ list_head_init(&sq->q.waitq);
+ list_head_init(&sq->pushq);
+ sq->max = max;
return self;
}
@@ -912,7 +997,14 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
static VALUE
rb_szqueue_close(VALUE self)
{
- return queue_do_close(self, TRUE);
+ if (!queue_closed_p(self)) {
+ struct rb_szqueue *sq = szqueue_ptr(self);
+
+ FL_SET(self, QUEUE_CLOSED);
+ wakeup_all(&sq->q.waitq);
+ wakeup_all(&sq->pushq);
+ }
+ return self;
}
/*
@@ -924,7 +1016,7 @@ rb_szqueue_close(VALUE self)
static VALUE
rb_szqueue_max_get(VALUE self)
{
- return GET_SZQUEUE_MAX(self);
+ return LONG2NUM(szqueue_ptr(self)->max);
}
/*
@@ -937,18 +1029,19 @@ rb_szqueue_max_get(VALUE self)
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
- long max = NUM2LONG(vmax), diff = 0;
- VALUE t;
+ long max = NUM2LONG(vmax);
+ long diff = 0;
+ struct rb_szqueue *sq = szqueue_ptr(self);
if (max <= 0) {
rb_raise(rb_eArgError, "queue size must be positive");
}
- if ((unsigned long)max > GET_SZQUEUE_ULONGMAX(self)) {
- diff = max - GET_SZQUEUE_ULONGMAX(self);
+ if (max > sq->max) {
+ diff = max - sq->max;
}
- RSTRUCT_SET(self, SZQUEUE_MAX, vmax);
- while (diff-- > 0 && !NIL_P(t = rb_ary_shift(GET_SZQUEUE_WAITERS(self)))) {
- rb_thread_wakeup_alive(t);
+ sq->max = max;
+ while (diff-- > 0 && wakeup_one(&sq->pushq)) {
+ /* keep waking more up */
}
return vmax;
}
@@ -981,12 +1074,10 @@ szqueue_push_should_block(int argc, const VALUE *argv)
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
- struct waiting_delete args;
+ struct rb_szqueue *sq = szqueue_ptr(self);
int should_block = szqueue_push_should_block(argc, argv);
- args.waiting = GET_SZQUEUE_WAITERS(self);
- args.th = rb_thread_current();
- while (queue_length(self) >= GET_SZQUEUE_ULONGMAX(self)) {
+ while (queue_length(self, &sq->q) >= sq->max) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
@@ -994,8 +1085,14 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
goto closed;
}
else {
- rb_ary_push(args.waiting, args.th);
- rb_ensure(queue_sleep, Qfalse, queue_delete_from_waiting, (VALUE)&args);
+ struct queue_waiter qw;
+
+ qw.w.th = GET_THREAD();
+ qw.as.sq = sq;
+ list_add_tail(&sq->pushq, &qw.w.node);
+ sq->num_waiting_push++;
+
+ rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
}
}
@@ -1004,16 +1101,17 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
raise_closed_queue_error(self);
}
- return queue_do_push(self, argv[0]);
+ return queue_do_push(self, &sq->q, argv[0]);
}
static VALUE
szqueue_do_pop(VALUE self, int should_block)
{
- VALUE retval = queue_do_pop(self, should_block);
+ struct rb_szqueue *sq = szqueue_ptr(self);
+ VALUE retval = queue_do_pop(self, &sq->q, should_block);
- if (queue_length(self) < GET_SZQUEUE_ULONGMAX(self)) {
- wakeup_first_thread(GET_SZQUEUE_WAITERS(self));
+ if (queue_length(self, &sq->q) < sq->max) {
+ wakeup_one(&sq->pushq);
}
return retval;
@@ -1049,11 +1147,21 @@ rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
static VALUE
rb_szqueue_clear(VALUE self)
{
- rb_ary_clear(GET_QUEUE_QUE(self));
- wakeup_all_threads(GET_SZQUEUE_WAITERS(self));
+ struct rb_szqueue *sq = szqueue_ptr(self);
+
+ rb_ary_clear(check_array(self, sq->q.que));
+ wakeup_all(&sq->pushq);
return self;
}
+static VALUE
+rb_szqueue_length(VALUE self)
+{
+ struct rb_szqueue *sq = szqueue_ptr(self);
+
+ return LONG2NUM(queue_length(self, &sq->q));
+}
+
/*
* Document-method: SizedQueue#num_waiting
*
@@ -1063,18 +1171,32 @@ rb_szqueue_clear(VALUE self)
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
- long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
- return ULONG2NUM(len);
+ struct rb_szqueue *sq = szqueue_ptr(self);
+
+ return INT2NUM(sq->q.num_waiting + sq->num_waiting_push);
}
-/* ConditionalVariable */
+/*
+ * Document-method: SizedQueue#empty?
+ * call-seq: empty?
+ *
+ * Returns +true+ if the queue is empty.
+ */
-enum {
- CONDVAR_WAITERS,
- END_CONDVAR
-};
+static VALUE
+rb_szqueue_empty_p(VALUE self)
+{
+ struct rb_szqueue *sq = szqueue_ptr(self);
-#define GET_CONDVAR_WAITERS(cv) get_array((cv), CONDVAR_WAITERS)
+ return queue_length(self, &sq->q) == 0 ? Qtrue : Qfalse;
+}
+
+
+/* ConditionalVariable */
+/* TODO: maybe this can be IMEMO */
+struct rb_condvar {
+ struct list_head waitq;
+};
/*
* Document-class: ConditionVariable
@@ -1106,6 +1228,40 @@ enum {
* }
*/
+static size_t
+condvar_memsize(const void *ptr)
+{
+ return sizeof(struct rb_condvar);
+}
+
+static const rb_data_type_t cv_data_type = {
+ "condvar",
+ {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
+ 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
+};
+
+static struct rb_condvar *
+condvar_ptr(VALUE self)
+{
+ struct rb_condvar *cv;
+
+ TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv);
+
+ return cv;
+}
+
+static VALUE
+condvar_alloc(VALUE klass)
+{
+ struct rb_condvar *cv;
+ VALUE obj;
+
+ obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
+ list_head_init(&cv->waitq);
+
+ return obj;
+}
+
/*
* Document-method: ConditionVariable::new
*
@@ -1115,7 +1271,8 @@ enum {
static VALUE
rb_condvar_initialize(VALUE self)
{
- RSTRUCT_SET(self, CONDVAR_WAITERS, ary_buf_new());
+ struct rb_condvar *cv = condvar_ptr(self);;
+ list_head_init(&cv->waitq);
return self;
}
@@ -1134,9 +1291,11 @@ do_sleep(VALUE args)
}
static VALUE
-delete_current_thread(VALUE ary)
+delete_from_waitq(struct sync_waiter *w)
{
- return rb_ary_delete(ary, rb_thread_current());
+ list_del(&w->node);
+
+ return Qnil;
}
/*
@@ -1152,16 +1311,18 @@ delete_current_thread(VALUE ary)
static VALUE
rb_condvar_wait(int argc, VALUE *argv, VALUE self)
{
- VALUE waiters = GET_CONDVAR_WAITERS(self);
+ struct rb_condvar *cv = condvar_ptr(self);
VALUE mutex, timeout;
struct sleep_call args;
+ struct sync_waiter w;
rb_scan_args(argc, argv, "11", &mutex, &timeout);
args.mutex = mutex;
args.timeout = timeout;
- rb_ary_push(waiters, rb_thread_current());
- rb_ensure(do_sleep, (VALUE)&args, delete_current_thread, waiters);
+ w.th = GET_THREAD();
+ list_add_tail(&cv->waitq, &w.node);
+ rb_ensure(do_sleep, (VALUE)&args, delete_from_waitq, (VALUE)&w);
return self;
}
@@ -1175,7 +1336,8 @@ rb_condvar_wait(int argc, VALUE *argv, VALUE self)
static VALUE
rb_condvar_signal(VALUE self)
{
- wakeup_first_thread(GET_CONDVAR_WAITERS(self));
+ struct rb_condvar *cv = condvar_ptr(self);
+ wakeup_one(&cv->waitq);
return self;
}
@@ -1188,7 +1350,8 @@ rb_condvar_signal(VALUE self)
static VALUE
rb_condvar_broadcast(VALUE self)
{
- wakeup_all_threads(GET_CONDVAR_WAITERS(self));
+ struct rb_condvar *cv = condvar_ptr(self);
+ wakeup_all(&cv->waitq);
return self;
}
@@ -1228,10 +1391,8 @@ Init_thread_sync(void)
rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0);
/* Queue */
- rb_cQueue = rb_struct_define_without_accessor_under(
- rb_cThread,
- "Queue", rb_cObject, rb_struct_alloc_noinit,
- "que", "waiters", NULL);
+ rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
+ rb_define_alloc_func(rb_cQueue, queue_alloc);
rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
@@ -1253,10 +1414,8 @@ Init_thread_sync(void)
rb_define_alias(rb_cQueue, "shift", "pop");
rb_define_alias(rb_cQueue, "size", "length");
- rb_cSizedQueue = rb_struct_define_without_accessor_under(
- rb_cThread,
- "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
- "que", "waiters", "queue_waiters", "size", NULL);
+ rb_cSizedQueue = rb_define_class_under(rb_cThread, "SizedQueue", rb_cQueue);
+ rb_define_alloc_func(rb_cSizedQueue, szqueue_alloc);
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
@@ -1264,19 +1423,21 @@ Init_thread_sync(void)
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
+ rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
+ rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
rb_define_alias(rb_cSizedQueue, "enq", "push");
rb_define_alias(rb_cSizedQueue, "<<", "push");
rb_define_alias(rb_cSizedQueue, "deq", "pop");
rb_define_alias(rb_cSizedQueue, "shift", "pop");
+ rb_define_alias(rb_cSizedQueue, "size", "length");
/* CVar */
- rb_cConditionVariable = rb_struct_define_without_accessor_under(
- rb_cThread,
- "ConditionVariable", rb_cObject, rb_struct_alloc_noinit,
- "waiters", NULL);
+ rb_cConditionVariable = rb_define_class_under(rb_cThread,
+ "ConditionVariable", rb_cObject);
+ rb_define_alloc_func(rb_cConditionVariable, condvar_alloc);
id_sleep = rb_intern("sleep");