summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean Boussier <jean.boussier@gmail.com>2022-07-26 17:40:00 +0200
committerJean Boussier <jean.boussier@gmail.com>2022-08-18 10:07:37 +0200
commitfe61cad7490da8a879597f851f4a89856d44838e (patch)
tree29844ee84fe8b5c547ab8c58b8464ab450be0873
parentb3718edee28d5155ebc383d17ab58867e20f4aa2 (diff)
Implement SizedQueue#push(timeout: sec)
[Feature #18944] If both `non_block=true` and `timeout:` are supplied, ArgumentError is raised.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/6207
-rw-r--r--spec/ruby/shared/sizedqueue/enque.rb57
-rw-r--r--test/ruby/test_thread_queue.rb18
-rw-r--r--thread_sync.c48
-rw-r--r--thread_sync.rb23
4 files changed, 111 insertions, 35 deletions
diff --git a/spec/ruby/shared/sizedqueue/enque.rb b/spec/ruby/shared/sizedqueue/enque.rb
index 6ef12349f8..126470594a 100644
--- a/spec/ruby/shared/sizedqueue/enque.rb
+++ b/spec/ruby/shared/sizedqueue/enque.rb
@@ -47,4 +47,61 @@ describe :sizedqueue_enq, shared: true do
t.join
q.pop.should == 1
end
+
+ describe "with a timeout" do
+ ruby_version_is "3.2" do
+ it "returns self if the item was pushed in time" do
+ q = @object.call(1)
+ q << 1
+
+ t = Thread.new {
+ q.send(@method, 2, timeout: 1).should == q
+ }
+ Thread.pass until t.status == "sleep" && q.num_waiting == 1
+ q.pop
+ t.join
+ end
+
+ it "does nothing if the timeout is nil" do
+ q = @object.call(1)
+ q << 1
+ t = Thread.new {
+ q.send(@method, 2, timeout: nil).should == q
+ }
+ t.join(0.2).should == nil
+ q.pop
+ t.join
+ end
+
+ it "returns nil if no item is available in time" do
+ q = @object.call(1)
+ q << 1
+ t = Thread.new {
+ q.send(@method, 2, timeout: 0.1).should == nil
+ }
+ t.join
+ end
+
+ it "raise TypeError if timeout is not a valid numeric" do
+ q = @object.call(1)
+ -> { q.send(@method, 2, timeout: "1") }.should raise_error(
+ TypeError,
+ "no implicit conversion to float from string",
+ )
+
+ -> { q.send(@method, 2, timeout: false) }.should raise_error(
+ TypeError,
+ "no implicit conversion to float from false",
+ )
+ end
+
+ it "raise ArgumentError if non_block = true is passed too" do
+ q = @object.call(1)
+ -> { q.send(@method, 2, true, timeout: 1) }.should raise_error(
+ ArgumentError,
+ "can't set a timeout if non_block is enabled",
+ )
+ end
+ end
+ end
end
diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb
index 1c852474b4..bd5728389d 100644
--- a/test/ruby/test_thread_queue.rb
+++ b/test/ruby/test_thread_queue.rb
@@ -168,6 +168,24 @@ class TestThreadQueue < Test::Unit::TestCase
end
end
+ def test_sized_queue_push_timeout
+ q = Thread::SizedQueue.new(1)
+
+ q << 1
+ assert_equal 1, q.size
+
+ t1 = Thread.new { q.push(2, timeout: 1) }
+ assert_equal t1, t1.join(2)
+ assert_nil t1.value
+
+ t2 = Thread.new { q.push(2, timeout: 0.1) }
+ assert_equal t2, t2.join(0.2)
+ assert_nil t2.value
+ ensure
+ t1&.kill
+ t2&.kill
+ end
+
def test_sized_queue_push_interrupt
q = Thread::SizedQueue.new(1)
q.push(1)
diff --git a/thread_sync.c b/thread_sync.c
index 63db1c4392..4ae404ec05 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -1229,39 +1229,15 @@ rb_szqueue_max_set(VALUE self, VALUE vmax)
return vmax;
}
-static int
-szqueue_push_should_block(int argc, const VALUE *argv)
-{
- int should_block = 1;
- rb_check_arity(argc, 1, 2);
- if (argc > 1) {
- should_block = !RTEST(argv[1]);
- }
- return should_block;
-}
-
-/*
- * Document-method: Thread::SizedQueue#push
- * call-seq:
- * push(object, non_block=false)
- * enq(object, non_block=false)
- * <<(object)
- *
- * Pushes +object+ to the queue.
- *
- * If there is no space left in the queue, waits until space becomes
- * available, unless +non_block+ is true. If +non_block+ is true, the
- * thread isn't suspended, and +ThreadError+ is raised.
- */
-
static VALUE
-rb_szqueue_push(int argc, VALUE *argv, VALUE self)
+rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
{
+ rb_hrtime_t end = queue_timeout2hrtime(timeout);
+ bool timed_out = false;
struct rb_szqueue *sq = szqueue_ptr(self);
- int should_block = szqueue_push_should_block(argc, argv);
while (queue_length(self, &sq->q) >= sq->max) {
- if (!should_block) {
+ if (RTEST(non_block)) {
rb_raise(rb_eThreadError, "queue full");
}
else if (queue_closed_p(self)) {
@@ -1281,11 +1257,14 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
struct queue_sleep_arg queue_sleep_arg = {
.self = self,
- .timeout = Qnil,
- .end = 0
+ .timeout = timeout,
+ .end = end
};
-
rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
+ if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
+ timed_out = true;
+ break;
+ }
}
}
@@ -1293,7 +1272,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
raise_closed_queue_error(self);
}
- return queue_do_push(self, &sq->q, argv[0]);
+ if (timed_out) return Qnil;
+
+ return queue_do_push(self, &sq->q, object);
}
static VALUE
@@ -1611,13 +1592,10 @@ Init_thread_sync(void)
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
- rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
- rb_define_alias(rb_cSizedQueue, "enq", "push");
- rb_define_alias(rb_cSizedQueue, "<<", "push");
rb_define_alias(rb_cSizedQueue, "size", "length");
/* CVar */
diff --git a/thread_sync.rb b/thread_sync.rb
index d567ca51af..7e4c341ad0 100644
--- a/thread_sync.rb
+++ b/thread_sync.rb
@@ -41,5 +41,28 @@ class Thread
end
alias_method :deq, :pop
alias_method :shift, :pop
+
+ # call-seq:
+ # push(object, non_block=false, timeout: nil)
+ # enq(object, non_block=false, timeout: nil)
+ # <<(object)
+ #
+ # Pushes +object+ to the queue.
+ #
+ # If there is no space left in the queue, waits until space becomes
+ # available, unless +non_block+ is true. If +non_block+ is true, the
+ # thread isn't suspended, and +ThreadError+ is raised.
+ #
+ # If +timeout+ seconds have passed and no space is available +nil+ is
+ # returned.
+ # Otherwise it returns +self+.
+ def push(object, non_block = false, timeout: nil)
+ if non_block && timeout
+ raise ArgumentError, "can't set a timeout if non_block is enabled"
+ end
+ Primitive.rb_szqueue_push(object, non_block, timeout)
+ end
+ alias_method :enq, :push
+ alias_method :<<, :push
end
end