-rw-r--r--  test/fiber/test_mutex.rb  31
-rw-r--r--  thread.c                   9
-rw-r--r--  thread_sync.c             70
3 files changed, 77 insertions, 33 deletions
diff --git a/test/fiber/test_mutex.rb b/test/fiber/test_mutex.rb
index 21034128a1..1f53ae1a1f 100644
--- a/test/fiber/test_mutex.rb
+++ b/test/fiber/test_mutex.rb
@@ -84,6 +84,37 @@ class TestFiberMutex < Test::Unit::TestCase
     assert signalled > 1
   end
 
+  def test_queue
+    queue = Queue.new
+    processed = 0
+
+    thread = Thread.new do
+      scheduler = Scheduler.new
+      Thread.current.scheduler = scheduler
+
+      Fiber.schedule do
+        3.times do |i|
+          queue << i
+          sleep 0.1
+        end
+
+        queue.close
+      end
+
+      Fiber.schedule do
+        while item = queue.pop
+          processed += 1
+        end
+      end
+
+      scheduler.run
+    end
+
+    thread.join
+
+    assert processed == 3
+  end
+
   def test_mutex_deadlock
     err = /No live threads left. Deadlock\?/
     assert_in_out_err %W[-I#{__dir__} -], <<-RUBY, ['in synchronize'], err, success: false
diff --git a/thread.c b/thread.c
index c4ff5aafde..ab574e5c29 100644
--- a/thread.c
+++ b/thread.c
@@ -1481,8 +1481,13 @@ rb_thread_sleep_interruptible(void)
 static void
 rb_thread_sleep_deadly_allow_spurious_wakeup(void)
 {
-    thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
-    sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    VALUE scheduler = rb_thread_current_scheduler();
+    if (scheduler != Qnil) {
+        rb_scheduler_kernel_sleepv(scheduler, 0, NULL);
+    } else {
+        thread_debug("rb_thread_sleep_deadly_allow_spurious_wakeup\n");
+        sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+    }
 }
 
 void
diff --git a/thread_sync.c b/thread_sync.c
index bd60231789..c0a61554c1 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -946,25 +946,29 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
     check_array(self, q->que);
 
     while (RARRAY_LEN(q->que) == 0) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue empty");
-	}
-	else if (queue_closed_p(self)) {
-	    return queue_closed_result(self, q);
-	}
-	else {
-	    struct queue_waiter qw;
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue empty");
+        }
+        else if (queue_closed_p(self)) {
+            return queue_closed_result(self, q);
+        }
+        else {
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
 
-	    assert(RARRAY_LEN(q->que) == 0);
-	    assert(queue_closed_p(self) == 0);
+            assert(RARRAY_LEN(q->que) == 0);
+            assert(queue_closed_p(self) == 0);
 
-	    qw.w.th = GET_THREAD();
-	    qw.as.q = q;
-	    list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
-	    qw.as.q->num_waiting++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
 
-	    rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
-	}
+            qw.as.q = q;
+            list_add_tail(queue_waitq(qw.as.q), &qw.w.node);
+            qw.as.q->num_waiting++;
+
+            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
+        }
     }
 
     return rb_ary_shift(q->que);
@@ -1188,27 +1192,31 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
     int should_block = szqueue_push_should_block(argc, argv);
 
     while (queue_length(self, &sq->q) >= sq->max) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue full");
-	}
-	else if (queue_closed_p(self)) {
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue full");
+        }
+        else if (queue_closed_p(self)) {
             break;
-	}
-	else {
-	    struct queue_waiter qw;
-	    struct list_head *pushq = szqueue_pushq(sq);
+        }
+        else {
+            rb_execution_context_t *ec = GET_EC();
+            struct queue_waiter qw;
+            struct list_head *pushq = szqueue_pushq(sq);
 
-	    qw.w.th = GET_THREAD();
-	    qw.as.sq = sq;
-	    list_add_tail(pushq, &qw.w.node);
-	    sq->num_waiting_push++;
+            qw.w.self = self;
+            qw.w.th = ec->thread_ptr;
+            qw.w.fiber = ec->fiber_ptr;
 
-	    rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
-	}
+            qw.as.sq = sq;
+            list_add_tail(pushq, &qw.w.node);
+            sq->num_waiting_push++;
+
+            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
+        }
     }
 
     if (queue_closed_p(self)) {
-	raise_closed_queue_error(self);
+        raise_closed_queue_error(self);
     }
 
     return queue_do_push(self, &sq->q, argv[0]);
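
With this change, popping from an empty Queue no longer parks the whole thread: rb_thread_sleep_deadly_allow_spurious_wakeup defers to the current fiber scheduler's kernel_sleep hook when one is set, and the queue waiter now records which fiber is blocked. A minimal usage sketch of the resulting behaviour, assuming a scheduler implementation along the lines of the toy Scheduler in the test suite (the require path and Scheduler class are assumptions for illustration, not part of this patch):

# Illustrative only: Scheduler is assumed to implement the experimental
# scheduler interface, e.g. the example in test/fiber/scheduler.rb.
require_relative 'scheduler'

queue = Queue.new

thread = Thread.new do
  scheduler = Scheduler.new
  Thread.current.scheduler = scheduler  # newer Rubies spell this Fiber.set_scheduler

  # Producer fiber: pushes three items, sleeping between pushes. The sleep is
  # non-blocking because it goes through the scheduler's kernel_sleep hook.
  Fiber.schedule do
    3.times do |i|
      queue << i
      sleep 0.1
    end
    queue.close
  end

  # Consumer fiber: popping from an empty queue now suspends only this fiber
  # (via the same kernel_sleep path), so other fibers on the thread keep
  # making progress instead of the whole thread sleeping.
  Fiber.schedule do
    while item = queue.pop
      puts "popped #{item}"
    end
  end

  scheduler.run
end

thread.join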