summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-14 11:10:02 +1200
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2020-09-14 16:44:09 +1200
commit8eea66a0ca8965ae8319b4c404f61c4d46866f64 (patch)
tree6083a6566df682ec5598bb971291cc150877de2b /thread_sync.c
parent0f613cc5f1bbe319ab916be905de263523ef5402 (diff)
Add support for Queue & SizedQueue.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/3434
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c70
1 files changed, 39 insertions, 31 deletions
diff --git a/thread_sync.c b/thread_sync.c
index bd60231789..c0a61554c1 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
check_array(self, q->que);
while (RARRAY_LEN(q->que) == 0) {
- if (!should_block) {
- rb_raise(rb_eThreadError, "queue empty");
- }
- else if (queue_closed_p(self)) {
- return queue_closed_result(self, q);
- }
- else {
- struct queue_waiter qw;
+ if (!should_block) {
+ rb_raise(rb_eThreadError, "queue empty");
+ }
+ else if (queue_closed_p(self)) {
+ return queue_closed_result(self, q);
+ }
+ else {
+ rb_execution_context_t *ec = GET_EC();
+ struct queue_waiter qw;
- assert(RARRAY_LEN(q->que) == 0);
- assert(queue_closed_p(self) == 0);
+ assert(RARRAY_LEN(q->que) == 0);
+ assert(queue_closed_p(self) == 0);
- qw.w.th = GET_THREAD();
- qw.as.q = q;
- list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
- qw.as.q->num_waiting++;
+ qw.w.self = self;
+ qw.w.th = ec->thread_ptr;
+ qw.w.fiber = ec->fiber_ptr;
- rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
- }
+ qw.as.q = q;
+ list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
+ qw.as.q->num_waiting++;
+
+ rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
+ }
}
return rb_ary_shift(q->que);
@@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
int should_block = szqueue_push_should_block(argc, argv);
while (queue_length(self, &sq->q) >= sq->max) {
- if (!should_block) {
- rb_raise(rb_eThreadError, "queue full");
- }
- else if (queue_closed_p(self)) {
+ if (!should_block) {
+ rb_raise(rb_eThreadError, "queue full");
+ }
+ else if (queue_closed_p(self)) {
break;
- }
- else {
- struct queue_waiter qw;
- struct list_head *pushq = szqueue_pushq(sq);
+ }
+ else {
+ rb_execution_context_t *ec = GET_EC();
+ struct queue_waiter qw;
+ struct list_head *pushq = szqueue_pushq(sq);
- qw.w.th = GET_THREAD();
- qw.as.sq = sq;
- list_add_tail(pushq, &qw.w.node);
- sq->num_waiting_push++;
+ qw.w.self = self;
+ qw.w.th = ec->thread_ptr;
+ qw.w.fiber = ec->fiber_ptr;
- rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
- }
+ qw.as.sq = sq;
+ list_add_tail(pushq, &qw.w.node);
+ sq->num_waiting_push++;
+
+ rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
+ }
}
if (queue_closed_p(self)) {
- raise_closed_queue_error(self);
+ raise_closed_queue_error(self);
}
return queue_do_push(self, &sq->q, argv[0]);