summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c32
1 files changed, 18 insertions, 14 deletions
diff --git a/thread_sync.c b/thread_sync.c
index a37f51f149..95f48ad079 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -521,12 +521,15 @@ void rb_mutex_allow_trap(VALUE self, int val)
/* Queue */
+#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
PACKED_STRUCT_UNALIGNED(struct rb_queue {
struct list_head waitq;
const VALUE que;
int num_waiting;
});
+#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
+#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
struct rb_queue q;
int num_waiting_push;
@@ -562,7 +565,7 @@ queue_alloc(VALUE klass)
struct rb_queue *q;
obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
- list_head_init(&q->waitq);
+ list_head_init(queue_waitq(q));
return obj;
}
@@ -603,8 +606,8 @@ szqueue_alloc(VALUE klass)
struct rb_szqueue *sq;
VALUE obj = TypedData_Make_Struct(klass, struct rb_szqueue,
&szqueue_data_type, sq);
- list_head_init(&sq->q.waitq);
- list_head_init(&sq->pushq);
+ list_head_init(szqueue_waitq(sq));
+ list_head_init(szqueue_pushq(sq));
return obj;
}
@@ -702,7 +705,7 @@ rb_queue_initialize(VALUE self)
{
struct rb_queue *q = queue_ptr(self);
RB_OBJ_WRITE(self, &q->que, ary_buf_new());
- list_head_init(&q->waitq);
+ list_head_init(queue_waitq(q));
return self;
}
@@ -713,7 +716,7 @@ queue_do_push(VALUE self, struct rb_queue *q, VALUE obj)
raise_closed_queue_error(self);
}
rb_ary_push(check_array(self, q->que), obj);
- wakeup_one(&q->waitq);
+ wakeup_one(queue_waitq(q));
return self;
}
@@ -756,7 +759,7 @@ rb_queue_close(VALUE self)
if (!queue_closed_p(self)) {
FL_SET(self, QUEUE_CLOSED);
- wakeup_all(&q->waitq);
+ wakeup_all(queue_waitq(q));
}
return self;
@@ -975,8 +978,8 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
}
RB_OBJ_WRITE(self, &sq->q.que, ary_buf_new());
- list_head_init(&sq->q.waitq);
- list_head_init(&sq->pushq);
+ list_head_init(szqueue_waitq(sq));
+ list_head_init(szqueue_pushq(sq));
sq->max = max;
return self;
@@ -1001,8 +1004,8 @@ rb_szqueue_close(VALUE self)
struct rb_szqueue *sq = szqueue_ptr(self);
FL_SET(self, QUEUE_CLOSED);
- wakeup_all(&sq->q.waitq);
- wakeup_all(&sq->pushq);
+ wakeup_all(szqueue_waitq(sq));
+ wakeup_all(szqueue_pushq(sq));
}
return self;
}
@@ -1040,7 +1043,7 @@ rb_szqueue_max_set(VALUE self, VALUE vmax)
diff = max - sq->max;
}
sq->max = max;
- while (diff-- > 0 && wakeup_one(&sq->pushq)) {
+ while (diff-- > 0 && wakeup_one(szqueue_pushq(sq))) {
/* keep waking more up */
}
return vmax;
@@ -1086,10 +1089,11 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
}
else {
struct queue_waiter qw;
+ struct list_head *pushq = szqueue_pushq(sq);
qw.w.th = GET_THREAD();
qw.as.sq = sq;
- list_add_tail(&sq->pushq, &qw.w.node);
+ list_add_tail(pushq, &qw.w.node);
sq->num_waiting_push++;
rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
@@ -1111,7 +1115,7 @@ szqueue_do_pop(VALUE self, int should_block)
VALUE retval = queue_do_pop(self, &sq->q, should_block);
if (queue_length(self, &sq->q) < sq->max) {
- wakeup_one(&sq->pushq);
+ wakeup_one(szqueue_pushq(sq));
}
return retval;
@@ -1150,7 +1154,7 @@ rb_szqueue_clear(VALUE self)
struct rb_szqueue *sq = szqueue_ptr(self);
rb_ary_clear(check_array(self, sq->q.que));
- wakeup_all(&sq->pushq);
+ wakeup_all(szqueue_pushq(sq));
return self;
}