summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean Boussier <jean.boussier@gmail.com>2022-07-26 17:40:00 +0200
committerJean Boussier <jean.boussier@gmail.com>2022-08-02 11:04:28 +0200
commite3aabe93aae87a60ba7b8f1a0fd590534647e352 (patch)
tree3f5c15b61c9914c7e1a34ad56d042dcf70024f75
parentec3f59309e3f08339c4c76a6881901580801d6cd (diff)
Implement Queue#pop(timeout: sec)
[Feature #18774] As well as `SizedQueue#pop(timeout: sec)` If both `non_block=true` and `timeout:` are supplied, ArgumentError is raised.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/6185
-rw-r--r--.document1
-rw-r--r--common.mk5
-rw-r--r--hrtime.h10
-rw-r--r--inits.c1
-rw-r--r--spec/ruby/shared/queue/deque.rb55
-rw-r--r--test/ruby/test_settracefunc.rb21
-rw-r--r--test/ruby/test_thread_queue.rb35
-rw-r--r--thread.c35
-rw-r--r--thread_sync.c105
-rw-r--r--thread_sync.rb45
10 files changed, 238 insertions, 75 deletions
diff --git a/.document b/.document
index 5494bcc7fe..ec2fa09326 100644
--- a/.document
+++ b/.document
@@ -24,6 +24,7 @@ pack.rb
ractor.rb
string.rb
timev.rb
+thread_sync.rb
trace_point.rb
warning.rb
diff --git a/common.mk b/common.mk
index aeb87dfb55..4c49690e4a 100644
--- a/common.mk
+++ b/common.mk
@@ -1062,6 +1062,7 @@ BUILTIN_RB_SRCS = \
$(srcdir)/kernel.rb \
$(srcdir)/ractor.rb \
$(srcdir)/timev.rb \
+ $(srcdir)/thread_sync.rb \
$(srcdir)/nilclass.rb \
$(srcdir)/prelude.rb \
$(srcdir)/gem_prelude.rb \
@@ -9447,6 +9448,7 @@ miniinit.$(OBJEXT): {$(VPATH)}st.h
miniinit.$(OBJEXT): {$(VPATH)}subst.h
miniinit.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
miniinit.$(OBJEXT): {$(VPATH)}thread_native.h
+miniinit.$(OBJEXT): {$(VPATH)}thread_sync.rb
miniinit.$(OBJEXT): {$(VPATH)}timev.rb
miniinit.$(OBJEXT): {$(VPATH)}trace_point.rb
miniinit.$(OBJEXT): {$(VPATH)}vm_core.h
@@ -15230,6 +15232,7 @@ thread.$(OBJEXT): {$(VPATH)}backward/2/limits.h
thread.$(OBJEXT): {$(VPATH)}backward/2/long_long.h
thread.$(OBJEXT): {$(VPATH)}backward/2/stdalign.h
thread.$(OBJEXT): {$(VPATH)}backward/2/stdarg.h
+thread.$(OBJEXT): {$(VPATH)}builtin.h
thread.$(OBJEXT): {$(VPATH)}config.h
thread.$(OBJEXT): {$(VPATH)}debug.h
thread.$(OBJEXT): {$(VPATH)}debug_counter.h
@@ -15412,6 +15415,8 @@ thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).c
thread.$(OBJEXT): {$(VPATH)}thread_$(THREAD_MODEL).h
thread.$(OBJEXT): {$(VPATH)}thread_native.h
thread.$(OBJEXT): {$(VPATH)}thread_sync.c
+thread.$(OBJEXT): {$(VPATH)}thread_sync.rb
+thread.$(OBJEXT): {$(VPATH)}thread_sync.rbinc
thread.$(OBJEXT): {$(VPATH)}timev.h
thread.$(OBJEXT): {$(VPATH)}vm_core.h
thread.$(OBJEXT): {$(VPATH)}vm_debug.h
diff --git a/hrtime.h b/hrtime.h
index 4ac3d54723..80aff5deb3 100644
--- a/hrtime.h
+++ b/hrtime.h
@@ -36,6 +36,7 @@
#define RB_HRTIME_PER_MSEC (RB_HRTIME_PER_USEC * (rb_hrtime_t)1000)
#define RB_HRTIME_PER_SEC (RB_HRTIME_PER_MSEC * (rb_hrtime_t)1000)
#define RB_HRTIME_MAX UINT64_MAX
+#define RB_HRTIME_MIN ((rb_hrtime_t)0)
/*
* Lets try to support time travelers. Lets assume anybody with a time machine
@@ -91,6 +92,15 @@ rb_hrtime_add(rb_hrtime_t a, rb_hrtime_t b)
return c;
}
+static inline rb_hrtime_t
+rb_hrtime_sub(rb_hrtime_t a, rb_hrtime_t b)
+{
+ if (a < b) {
+ return RB_HRTIME_MIN;
+ }
+ return a - b;
+}
+
/*
* convert a timeval struct to rb_hrtime_t, clamping at RB_HRTIME_MAX
*/
diff --git a/inits.c b/inits.c
index f41e88d838..22ba6d5a8c 100644
--- a/inits.c
+++ b/inits.c
@@ -98,6 +98,7 @@ rb_call_builtin_inits(void)
BUILTIN(array);
BUILTIN(kernel);
BUILTIN(timev);
+ BUILTIN(thread_sync);
BUILTIN(yjit);
BUILTIN(nilclass);
BUILTIN(marshal);
diff --git a/spec/ruby/shared/queue/deque.rb b/spec/ruby/shared/queue/deque.rb
index 8b755dd9b7..ed32bd29c8 100644
--- a/spec/ruby/shared/queue/deque.rb
+++ b/spec/ruby/shared/queue/deque.rb
@@ -55,6 +55,61 @@ describe :queue_deq, shared: true do
t.join
end
+ describe "with a timeout" do
+ ruby_version_is "3.2" do
+ it "returns an item if one is available in time" do
+ q = @object.call
+
+ t = Thread.new {
+ q.send(@method, timeout: 1).should == 1
+ }
+ Thread.pass until t.status == "sleep" && q.num_waiting == 1
+ q << 1
+ t.join
+ end
+
+ it "returns nil if no item is available in time" do
+ q = @object.call
+
+ t = Thread.new {
+ q.send(@method, timeout: 0.1).should == nil
+ }
+ t.join
+ end
+
+ it "does nothing if the timeout is nil" do
+ q = @object.call
+ t = Thread.new {
+ q.send(@method, timeout: nil).should == 1
+ }
+ t.join(0.2).should == nil
+ q << 1
+ t.join
+ end
+
+ it "raise TypeError if timeout is not a valid numeric" do
+ q = @object.call
+ -> { q.send(@method, timeout: "1") }.should raise_error(
+ TypeError,
+ "no implicit conversion to float from string",
+ )
+
+ -> { q.send(@method, timeout: false) }.should raise_error(
+ TypeError,
+ "no implicit conversion to float from false",
+ )
+ end
+
+ it "raise ArgumentError if non_block = true is passed too" do
+ q = @object.call
+ -> { q.send(@method, true, timeout: 1) }.should raise_error(
+ ArgumentError,
+ "can't set a timeout if non_block is enabled",
+ )
+ end
+ end
+ end
+
describe "in non-blocking mode" do
it "removes an item from the queue" do
q = @object.call
diff --git a/test/ruby/test_settracefunc.rb b/test/ruby/test_settracefunc.rb
index 56d457c7d7..31946c8b71 100644
--- a/test/ruby/test_settracefunc.rb
+++ b/test/ruby/test_settracefunc.rb
@@ -2140,17 +2140,16 @@ CODE
m2t_q.push 1
t.join
- assert_equal ["c-return", base_line + 31], events[0]
- assert_equal ["line", base_line + 32], events[1]
- assert_equal ["line", base_line + 33], events[2]
- assert_equal ["call", base_line + -6], events[3]
- assert_equal ["return", base_line + -4], events[4]
- assert_equal ["line", base_line + 34], events[5]
- assert_equal ["line", base_line + 35], events[6]
- assert_equal ["c-call", base_line + 35], events[7] # Thread.current
- assert_equal ["c-return", base_line + 35], events[8] # Thread.current
- assert_equal ["c-call", base_line + 35], events[9] # Thread#set_trace_func
- assert_equal nil, events[10]
+ assert_equal ["line", base_line + 32], events[0]
+ assert_equal ["line", base_line + 33], events[1]
+ assert_equal ["call", base_line + -6], events[2]
+ assert_equal ["return", base_line + -4], events[3]
+ assert_equal ["line", base_line + 34], events[4]
+ assert_equal ["line", base_line + 35], events[5]
+ assert_equal ["c-call", base_line + 35], events[6] # Thread.current
+ assert_equal ["c-return", base_line + 35], events[7] # Thread.current
+ assert_equal ["c-call", base_line + 35], events[8] # Thread#set_trace_func
+ assert_equal nil, events[9]
end
def test_lineno_in_optimized_insn
diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb
index ebf7ded3b9..aa4ea0a400 100644
--- a/test/ruby/test_thread_queue.rb
+++ b/test/ruby/test_thread_queue.rb
@@ -111,6 +111,23 @@ class TestThreadQueue < Test::Unit::TestCase
assert_equal(0, q.num_waiting)
end
+ def test_queue_pop_timeout
+ q = Thread::Queue.new
+ q << 1
+ assert_equal 1, q.pop(timeout: 1)
+
+ t1 = Thread.new { q.pop(timeout: 1) }
+ assert_equal t1, t1.join(2)
+ assert_nil t1.value
+
+ t2 = Thread.new { q.pop(timeout: 0.1) }
+ assert_equal t2, t2.join(0.2)
+ assert_nil t2.value
+ ensure
+ t1&.kill
+ t2&.kill
+ end
+
def test_queue_pop_non_block
q = Thread::Queue.new
assert_raise_with_message(ThreadError, /empty/) do
@@ -126,6 +143,24 @@ class TestThreadQueue < Test::Unit::TestCase
assert_equal(0, q.num_waiting)
end
+ def test_sized_queue_pop_timeout
+ q = Thread::SizedQueue.new(1)
+
+ q << 1
+ assert_equal 1, q.pop(timeout: 1)
+
+ t1 = Thread.new { q.pop(timeout: 1) }
+ assert_equal t1, t1.join(2)
+ assert_nil t1.value
+
+ t2 = Thread.new { q.pop(timeout: 0.1) }
+ assert_equal t2, t2.join(0.2)
+ assert_nil t2.value
+ ensure
+ t1&.kill
+ t2&.kill
+ end
+
def test_sized_queue_pop_non_block
q = Thread::SizedQueue.new(1)
assert_raise_with_message(ThreadError, /empty/) do
diff --git a/thread.c b/thread.c
index 411b6d7084..feb89d4352 100644
--- a/thread.c
+++ b/thread.c
@@ -132,7 +132,7 @@ rb_thread_local_storage(VALUE thread)
static int sleep_hrtime(rb_thread_t *, rb_hrtime_t, unsigned int fl);
static void sleep_forever(rb_thread_t *th, unsigned int fl);
-static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker);
+static void rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end);
static int rb_threadptr_dead(rb_thread_t *th);
static void rb_check_deadlock(rb_ractor_t *r);
static int rb_threadptr_pending_interrupt_empty_p(const rb_thread_t *th);
@@ -1328,6 +1328,28 @@ sleep_hrtime(rb_thread_t *th, rb_hrtime_t rel, unsigned int fl)
return woke;
}
+static int
+sleep_hrtime_until(rb_thread_t *th, rb_hrtime_t end, unsigned int fl)
+{
+ enum rb_thread_status prev_status = th->status;
+ int woke;
+ rb_hrtime_t rel = rb_hrtime_sub(end, rb_hrtime_now());
+
+ th->status = THREAD_STOPPED;
+ RUBY_VM_CHECK_INTS_BLOCKING(th->ec);
+ while (th->status == THREAD_STOPPED) {
+ native_sleep(th, &rel);
+ woke = vm_check_ints_blocking(th->ec);
+ if (woke && !(fl & SLEEP_SPURIOUS_CHECK))
+ break;
+ if (hrtime_update_expire(&rel, end))
+ break;
+ woke = 1;
+ }
+ th->status = prev_status;
+ return woke;
+}
+
void
rb_thread_sleep_forever(void)
{
@@ -1355,15 +1377,20 @@ rb_thread_sleep_interruptible(void)
}
static void
-rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker)
+rb_thread_sleep_deadly_allow_spurious_wakeup(VALUE blocker, VALUE timeout, rb_hrtime_t end)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
- rb_fiber_scheduler_block(scheduler, blocker, Qnil);
+ rb_fiber_scheduler_block(scheduler, blocker, timeout);
}
else {
RUBY_DEBUG_LOG("%s", "");
- sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ if (end) {
+ sleep_hrtime_until(GET_THREAD(), end, SLEEP_SPURIOUS_CHECK);
+ }
+ else {
+ sleep_forever(GET_THREAD(), SLEEP_DEADLOCKABLE);
+ }
}
}
diff --git a/thread_sync.c b/thread_sync.c
index 5ff36dd01d..1a0f3ee855 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -1,5 +1,6 @@
/* included by thread.c */
#include "ccan/list/list.h"
+#include "builtin.h"
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
static VALUE rb_eClosedQueueError;
@@ -19,6 +20,12 @@ struct sync_waiter {
struct ccan_list_node node;
};
+struct queue_sleep_arg {
+ VALUE self;
+ VALUE timeout;
+ rb_hrtime_t end;
+};
+
#define MUTEX_ALLOW_TRAP FL_USER1
static void
@@ -514,7 +521,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
static VALUE
rb_mutex_sleep_forever(VALUE self)
{
- rb_thread_sleep_deadly_allow_spurious_wakeup(self);
+ rb_thread_sleep_deadly_allow_spurious_wakeup(self, Qnil, 0);
return Qnil;
}
@@ -706,6 +713,21 @@ queue_ptr(VALUE obj)
#define QUEUE_CLOSED FL_USER5
+static rb_hrtime_t
+queue_timeout2hrtime(VALUE timeout) {
+ if (NIL_P(timeout)) {
+ return (rb_hrtime_t)0;
+ }
+ rb_hrtime_t rel = 0;
+ if (FIXNUM_P(timeout)) {
+ rel = rb_sec2hrtime(NUM2TIMET(timeout));
+ }
+ else {
+ double2hrtime(&rel, rb_num2dbl(timeout));
+ }
+ return rb_hrtime_add(rel, rb_hrtime_now());
+}
+
static void
szqueue_mark(void *ptr)
{
@@ -964,9 +986,10 @@ rb_queue_push(VALUE self, VALUE obj)
}
static VALUE
-queue_sleep(VALUE self)
+queue_sleep(VALUE _args)
{
- rb_thread_sleep_deadly_allow_spurious_wakeup(self);
+ struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
+ rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
return Qnil;
}
@@ -1001,9 +1024,10 @@ szqueue_sleep_done(VALUE p)
}
static VALUE
-queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
+queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
{
check_array(self, q->que);
+ rb_hrtime_t end = queue_timeout2hrtime(timeout);
while (RARRAY_LEN(q->que) == 0) {
if (!should_block) {
@@ -1028,43 +1052,25 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
ccan_list_add_tail(waitq, &queue_waiter.w.node);
queue_waiter.as.q->num_waiting++;
- rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
+ struct queue_sleep_arg queue_sleep_arg = {
+ .self = self,
+ .timeout = timeout,
+ .end = end
+ };
+
+ rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
+ if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
+ break;
}
}
return rb_ary_shift(q->que);
}
-static int
-queue_pop_should_block(int argc, const VALUE *argv)
-{
- int should_block = 1;
- rb_check_arity(argc, 0, 1);
- if (argc > 0) {
- should_block = !RTEST(argv[0]);
- }
- return should_block;
-}
-
-/*
- * Document-method: Thread::Queue#pop
- * call-seq:
- * pop(non_block=false)
- * deq(non_block=false)
- * shift(non_block=false)
- *
- * Retrieves data from the queue.
- *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and
- * +ThreadError+ is raised.
- */
-
static VALUE
-rb_queue_pop(int argc, VALUE *argv, VALUE self)
+rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
{
- int should_block = queue_pop_should_block(argc, argv);
- return queue_do_pop(self, queue_ptr(self), should_block);
+ return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
}
/*
@@ -1283,10 +1289,10 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
}
static VALUE
-szqueue_do_pop(VALUE self, int should_block)
+szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
{
struct rb_szqueue *sq = szqueue_ptr(self);
- VALUE retval = queue_do_pop(self, &sq->q, should_block);
+ VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
if (queue_length(self, &sq->q) < sq->max) {
wakeup_one(szqueue_pushq(sq));
@@ -1294,26 +1300,10 @@ szqueue_do_pop(VALUE self, int should_block)
return retval;
}
-
-/*
- * Document-method: Thread::SizedQueue#pop
- * call-seq:
- * pop(non_block=false)
- * deq(non_block=false)
- * shift(non_block=false)
- *
- * Retrieves data from the queue.
- *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and
- * +ThreadError+ is raised.
- */
-
static VALUE
-rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
+rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
{
- int should_block = queue_pop_should_block(argc, argv);
- return szqueue_do_pop(self, should_block);
+ return szqueue_do_pop(self, !RTEST(non_block), timeout);
}
/*
@@ -1597,7 +1587,6 @@ Init_thread_sync(void)
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
- rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
@@ -1605,8 +1594,6 @@ Init_thread_sync(void)
rb_define_alias(rb_cQueue, "enq", "push");
rb_define_alias(rb_cQueue, "<<", "push");
- rb_define_alias(rb_cQueue, "deq", "pop");
- rb_define_alias(rb_cQueue, "shift", "pop");
rb_define_alias(rb_cQueue, "size", "length");
DEFINE_CLASS(SizedQueue, Queue);
@@ -1617,16 +1604,12 @@ Init_thread_sync(void)
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
- rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
-
rb_define_alias(rb_cSizedQueue, "enq", "push");
rb_define_alias(rb_cSizedQueue, "<<", "push");
- rb_define_alias(rb_cSizedQueue, "deq", "pop");
- rb_define_alias(rb_cSizedQueue, "shift", "pop");
rb_define_alias(rb_cSizedQueue, "size", "length");
/* CVar */
@@ -1644,3 +1627,5 @@ Init_thread_sync(void)
rb_provide("thread.rb");
}
+
+#include "thread_sync.rbinc"
diff --git a/thread_sync.rb b/thread_sync.rb
new file mode 100644
index 0000000000..d567ca51af
--- /dev/null
+++ b/thread_sync.rb
@@ -0,0 +1,45 @@
+class Thread
+ class Queue
+ # call-seq:
+ # pop(non_block=false, timeout: nil)
+ #
+ # Retrieves data from the queue.
+ #
+ # If the queue is empty, the calling thread is suspended until data is pushed
+ # onto the queue. If +non_block+ is true, the thread isn't suspended, and
+ # +ThreadError+ is raised.
+ #
+ # If +timeout+ seconds have passed and no data is available +nil+ is
+ # returned.
+ def pop(non_block = false, timeout: nil)
+ if non_block && timeout
+ raise ArgumentError, "can't set a timeout if non_block is enabled"
+ end
+ Primitive.rb_queue_pop(non_block, timeout)
+ end
+ alias_method :deq, :pop
+ alias_method :shift, :pop
+ end
+
+ class SizedQueue
+ # call-seq:
+ # pop(non_block=false, timeout: nil)
+ #
+ # Retrieves data from the queue.
+ #
+ # If the queue is empty, the calling thread is suspended until data is
+ # pushed onto the queue. If +non_block+ is true, the thread isn't
+ # suspended, and +ThreadError+ is raised.
+ #
+ # If +timeout+ seconds have passed and no data is available +nil+ is
+ # returned.
+ def pop(non_block = false, timeout: nil)
+ if non_block && timeout
+ raise ArgumentError, "can't set a timeout if non_block is enabled"
+ end
+ Primitive.rb_szqueue_pop(non_block, timeout)
+ end
+ alias_method :deq, :pop
+ alias_method :shift, :pop
+ end
+end