summaryrefslogtreecommitdiff
path: root/ractor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.rb')
-rw-r--r--ractor.rb1060
1 files changed, 526 insertions, 534 deletions
diff --git a/ractor.rb b/ractor.rb
index 1031fe499b..ac04237fc6 100644
--- a/ractor.rb
+++ b/ractor.rb
@@ -1,96 +1,60 @@
-# Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
-#
-# Ractor.new can make a new Ractor, and it will run in parallel.
+# Ractor.new creates a new \Ractor, which can run in parallel with other ractors.
#
# # The simplest ractor
# r = Ractor.new {puts "I am in Ractor!"}
-# r.take # wait for it to finish
-# # here "I am in Ractor!" would be printed
+# r.join # wait for it to finish
+# # Here, "I am in Ractor!" is printed
#
-# Ractors do not share usual objects, so the same kinds of thread-safety concerns such as data-race,
-# race-conditions are not available on multi-ractor programming.
+# Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety
+# concerns such as data-races and race-conditions are not possible. The other benefit is parallelism.
#
-# To achieve this, ractors severely limit object sharing between different ractors.
-# For example, unlike threads, ractors can't access each other's objects, nor any objects through
-# variables of the outer scope.
+# To achieve this, object sharing is limited across ractors.
+# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally
+# available through variables in the outer scope are prohibited from being used across ractors.
#
# a = 1
# r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# # fails immediately with
# # ArgumentError (can not isolate a Proc because it accesses outer variables (a).)
#
-# On CRuby (the default implementation), Global Virtual Machine Lock (GVL) is held per ractor, so
-# ractors are performed in parallel without locking each other.
+# The object must be explicitly shared:
+# a = 1
+# r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}
+#
+# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so
+# ractors can run in parallel. This is unlike the situation with threads on CRuby.
#
-# Instead of accessing the shared state, the objects should be passed to and from ractors via
-# sending and receiving objects as messages.
+# Instead of accessing shared state, objects should be passed to and from ractors by
+# sending and receiving them as messages.
#
# a = 1
# r = Ractor.new do
-# a_in_ractor = receive # receive blocks till somebody will pass message
+# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message
# puts "I am in Ractor! a=#{a_in_ractor}"
# end
# r.send(a) # pass it
-# r.take
-# # here "I am in Ractor! a=1" would be printed
-#
-# There are two pairs of methods for sending/receiving messages:
-#
-# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push);
-# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull);
-#
-# In addition to that, an argument to Ractor.new would be passed to block and available there
-# as if received by Ractor.receive, and the last block value would be sent outside of the
-# ractor as if sent by Ractor.yield.
-#
-# A little demonstration on a classic ping-pong:
-#
-# server = Ractor.new do
-# puts "Server starts: #{self.inspect}"
-# puts "Server sends: ping"
-# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested
-# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent
-# puts "Server received: #{received}"
-# end
-#
-# client = Ractor.new(server) do |srv| # The server is sent inside client, and available as srv
-# puts "Client starts: #{self.inspect}"
-# received = srv.take # The Client takes a message specifically from the server
-# puts "Client received from " \
-# "#{srv.inspect}: #{received}"
-# puts "Client sends to " \
-# "#{srv.inspect}: pong"
-# srv.send 'pong' # The client sends a message specifically to the server
-# end
-#
-# [client, server].each(&:take) # Wait till they both finish
-#
-# This will output:
+# r.join
+# # Here, "I am in Ractor! a=1" is printed
#
-# Server starts: #<Ractor:#2 test.rb:1 running>
-# Server sends: ping
-# Client starts: #<Ractor:#3 test.rb:8 running>
-# Client received from #<Ractor:#2 rac.rb:1 blocking>: ping
-# Client sends to #<Ractor:#2 rac.rb:1 blocking>: pong
-# Server received: pong
-#
-# It is said that Ractor receives messages via the <em>incoming port</em>, and sends them
-# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and
-# Ractor#close_outgoing respectively. If a ractor terminated, its ports will be closed
-# automatically.
+# In addition to that, any arguments passed to Ractor.new are passed to the block and available there
+# as if received by Ractor.receive, and the last block value can be received with Ractor#value.
#
# == Shareable and unshareable objects
#
-# When the object is sent to and from the ractor, it is important to understand whether the
-# object is shareable or unshareable. Most of objects are unshareable objects.
+# When an object is sent to a ractor, it's important to understand whether the
+# object is shareable or unshareable. Most Ruby objects are unshareable objects. Even
+# frozen objects can be unshareable if they contain (through their instance variables) unfrozen
+# objects.
#
-# Shareable objects are basically those which can be used by several threads without compromising
-# thread-safety; e.g. immutable ones. Ractor.shareable? allows to check this, and Ractor.make_shareable
-# tries to make object shareable if it is not.
+# Shareable objects are those which can be used by several ractors at once without compromising
+# thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this,
+# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error
+# if it can't do it.
#
-# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are
-# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # freeze_string_literals: true
+# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable
+# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true
# Ractor.shareable?('foo'.freeze) #=> true
+# Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen
#
# ary = ['hello', 'world']
# ary.frozen? #=> false
@@ -100,32 +64,32 @@
# ary[0].frozen? #=> true
# ary[1].frozen? #=> true
#
-# When a shareable object is sent (via #send or Ractor.yield), no additional processing happens,
-# and it just becomes usable by both ractors. When an unshareable object is sent, it can be
-# either _copied_ or _moved_. The first is the default, and it makes the object's full copy by
-# deep cloning of non-shareable parts of its structure.
+# When a shareable object is sent via #send, no additional processing occurs
+# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be
+# either _copied_ or _moved_. Copying is the default, and it copies the object fully by
+# deep cloning (Object#clone) the non-shareable parts of its structure.
#
-# data = ['foo', 'bar'.freeze]
+# data = ['foo'.dup, 'bar'.freeze]
# r = Ractor.new do
# data2 = Ractor.receive
# puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
# end
# r.send(data)
-# r.take
+# r.join
# puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"
#
-# This will output:
+# This will output something like:
#
-# In ractor: 340, 360, 320
-# Outside : 380, 400, 320
+# In ractor: 8, 16, 24
+# Outside : 32, 40, 24
#
-# (Note that object id of both array and non-frozen string inside array have changed inside
-# the ractor, showing it is different objects. But the second array's element, which is a
-# shareable frozen string, has the same object_id.)
+# Note that the object ids of the array and the non-frozen string inside the array have changed in
+# the ractor because they are different objects. The second array's element, which is a
+# shareable frozen string, is the same object.
#
-# Deep cloning of the objects may be slow, and sometimes impossible. Alternatively,
-# <tt>move: true</tt> may be used on sending. This will <em>move</em> the object to the
-# receiving ractor, making it inaccessible for a sending ractor.
+# Deep cloning of objects may be slow, and sometimes impossible. Alternatively, <tt>move: true</tt> may
+# be used during sending. This will <em>move</em> the unshareable object to the receiving ractor, making it
+# inaccessible to the sending ractor.
#
# data = ['foo', 'bar']
# r = Ractor.new do
@@ -133,7 +97,7 @@
# puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
# end
# r.send(data, move: true)
-# r.take
+# r.join
# puts "Outside: moved? #{Ractor::MovedObject === data}"
# puts "Outside: #{data.inspect}"
#
@@ -143,16 +107,17 @@
# Outside: moved? true
# test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
#
-# Notice that even +inspect+ (and more basic methods like <tt>__id__</tt>) is inaccessible
+# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible
# on a moved object.
#
-# Besides frozen objects, there are shareable objects. Class and Module objects are shareable so
-# the Class/Module definitions are shared between ractors. Ractor objects are also shareable objects.
-# All operations for the shareable mutable objects are thread-safe, so the thread-safety property
-# will be kept. We can not define mutable shareable objects in Ruby, but C extensions can introduce them.
+# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors.
+# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors.
+# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them.
#
-# It is prohibited to access instance variables of mutable shareable objects (especially Modules and classes)
-# from ractors other than main:
+# It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the
+# variables aren't shareable. This can occur because modules/classes are shareable, but they can have
+# instance variables whose values are not. In non-main ractors, it's also prohibited to set instance
+# variables on classes/modules (even if the value is shareable).
#
# class C
# class << self
@@ -160,27 +125,28 @@
# end
# end
#
-# C.tricky = 'test'
+# C.tricky = "unshareable".dup
#
# r = Ractor.new(C) do |cls|
# puts "I see #{cls}"
# puts "I can't see #{cls.tricky}"
+# cls.tricky = true # doesn't get here, but this would also raise an error
# end
-# r.take
+# r.join
# # I see C
# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
#
-# Ractors can access constants if they are shareable. The main Ractor is the only one that can
+# Ractors can access constants if they are shareable. The main \Ractor is the only one that can
# access non-shareable constants.
#
# GOOD = 'good'.freeze
-# BAD = 'bad'
+# BAD = 'bad'.dup
#
# r = Ractor.new do
# puts "GOOD=#{GOOD}"
# puts "BAD=#{BAD}"
# end
-# r.take
+# r.join
# # GOOD=good
# # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)
#
@@ -190,7 +156,7 @@
# puts "I see #{C}"
# puts "I can't see #{C.tricky}"
# end
-# r.take
+# r.join
# # I see C
# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
#
@@ -199,27 +165,28 @@
#
# == Ractors vs threads
#
-# Each ractor creates its own thread. New threads can be created from inside ractor
-# (and, on CRuby, sharing GVL with other threads of this ractor).
+# Each ractor has its own main Thread. New threads can be created from inside ractors
+# (and, on CRuby, they share the GVL with other threads of this ractor).
#
# r = Ractor.new do
# a = 1
# Thread.new {puts "Thread in ractor: a=#{a}"}.join
# end
-# r.take
+# r.join
# # Here "Thread in ractor: a=1" will be printed
#
# == Note on code examples
#
-# In examples below, sometimes we use the following method to wait till ractors that
-# are not currently blocked will finish (or process till next blocking) method.
+# In the examples below, sometimes we use the following method to wait for ractors
+# to make progress or finish.
#
# def wait
# sleep(0.1)
# end
#
-# It is **only for demonstration purposes** and shouldn't be used in a real code.
-# Most of the times, just #take is used to wait till ractor will finish.
+# This is **only for demonstration purposes** and shouldn't be used in a real code.
+# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used
+# to wait for messages.
#
# == Reference
#
@@ -230,17 +197,17 @@ class Ractor
# call-seq:
# Ractor.new(*args, name: nil) {|*args| block } -> ractor
#
- # Create a new Ractor with args and a block.
+ # Creates a new \Ractor with args and a block.
#
- # A block (Proc) will be isolated (can't access to outer variables). +self+
- # inside the block will refer to the current Ractor.
+ # The given block (Proc) is isolated (can't access any outer variables). +self+
+ # inside the block will refer to the current \Ractor.
#
# r = Ractor.new { puts "Hi, I am #{self.inspect}" }
- # r.take
+ # r.join
# # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"
#
- # +args+ passed to the method would be propagated to block args by the same rules as
- # objects passed through #send/Ractor.receive: if +args+ are not shareable, they
+ # Any +args+ passed are propagated to the block arguments by the same rules as
+ # objects sent via #send/Ractor.receive. If an argument in +args+ is not shareable, it
# will be copied (via deep cloning, which might be inefficient).
#
# arg = [1, 2, 3]
@@ -248,14 +215,14 @@ class Ractor
# r = Ractor.new(arg) {|received_arg|
# puts "Received: #{received_arg} (##{received_arg.object_id})"
# }
- # r.take
+ # r.join
# # Prints:
# # Passing: [1, 2, 3] (#280)
# # Received: [1, 2, 3] (#300)
#
# Ractor's +name+ can be set for debugging purposes:
#
- # r = Ractor.new(name: 'my ractor') {}
+ # r = Ractor.new(name: 'my ractor') {}; r.join
# p r
# #=> #<Ractor:#3 my ractor test.rb:1 terminated>
#
@@ -263,8 +230,8 @@ class Ractor
b = block # TODO: builtin bug
raise ArgumentError, "must be called with a block" unless block
if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
- warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
- "Also there are many implementation issues.", uplevel: 0, category: :experimental)
+ Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.",
+ uplevel: 0, category: :experimental)
end
loc = caller_locations(1, 1).first
loc = "#{loc.path}:#{loc.lineno}"
@@ -280,13 +247,13 @@ class Ractor
}
end
- # Returns total count of Ractors currently running.
+ # Returns the number of ractors currently running or blocking (waiting).
#
# Ractor.count #=> 1
- # r = Ractor.new(name: 'example') { Ractor.yield(1) }
+ # r = Ractor.new(name: 'example') { Ractor.receive }
# Ractor.count #=> 2 (main + example ractor)
- # r.take # wait for Ractor.yield(1)
- # r.take # wait till r will finish
+ # r << 42 # r's Ractor.receive will resume
+ # r.join # wait for r's termination
# Ractor.count #=> 1
def self.count
__builtin_cexpr! %q{
@@ -296,130 +263,91 @@ class Ractor
#
# call-seq:
- # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj]
- #
- # Waits for the first ractor to have something in its outgoing port, reads from this ractor, and
- # returns that ractor and the object received.
- #
- # r1 = Ractor.new {Ractor.yield 'from 1'}
- # r2 = Ractor.new {Ractor.yield 'from 2'}
- #
- # r, obj = Ractor.select(r1, r2)
- #
- # puts "received #{obj.inspect} from #{r.inspect}"
- # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
- #
- # If one of the given ractors is the current ractor, and it would be selected, +r+ will contain
- # +:receive+ symbol instead of the ractor object.
- #
- # r1 = Ractor.new(Ractor.current) do |main|
- # main.send 'to main'
- # Ractor.yield 'from 1'
- # end
- # r2 = Ractor.new do
- # Ractor.yield 'from 2'
+ # Ractor.select(*ractors_or_ports) -> [ractor or port, obj]
+ #
+ # Blocks the current Thread until one of the given ports has received a message. Returns an
+ # array of two elements where the first element is the Port and the second is the received object.
+ # This method can also accept Ractor objects themselves, and in that case will wait until one
+ # has terminated and return a two-element array where the first element is the ractor and the
+ # second is its termination value.
+ #
+ # p1, p2 = Ractor::Port.new, Ractor::Port.new
+ # ps = [p1, p2]
+ # rs = 2.times.map do |i|
+ # Ractor.new(ps.shift, i) do |p, i|
+ # sleep rand(0.99)
+ # p.send("r#{i}")
+ # sleep rand(0.99)
+ # "r#{i} done"
+ # end
# end
#
- # r, obj = Ractor.select(r1, r2, Ractor.current)
- # puts "received #{obj.inspect} from #{r.inspect}"
- # # Prints: received "to main" from :receive
- #
- # If +yield_value+ is provided, that value may be yielded if another Ractor is calling #take.
- # In this case, the pair <tt>[:yield, nil]</tt> would be returned:
- #
- # r1 = Ractor.new(Ractor.current) do |main|
- # puts "Received from main: #{main.take}"
+ # waiting_on = [p1, p2, *rs]
+ # until waiting_on.empty?
+ # received_on, obj = Ractor.select(*waiting_on)
+ # waiting_on.delete(received_on)
+ # puts obj
# end
#
- # puts "Trying to select"
- # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
- # wait
- # puts "Received #{obj.inspect} from #{r.inspect}"
+ # # r0
+ # # r1
+ # # r1 done
+ # # r0 done
#
- # This will print:
+ # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread
+ # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in
+ # the array element order.
#
- # Trying to select
- # Received from main: 123
- # Received nil from :yield
+ # values = []
+ # until ractors.empty?
+ # r, val = Ractor.select(*ractors)
+ # ractors.delete(r)
+ # values << val
+ # end
#
- # +move+ boolean flag defines whether yielded value should be copied (default) or moved.
- def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
- raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?
+ def self.select(*ports)
+ raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty?
- __builtin_cstmt! %q{
- const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors);
- VALUE rv;
- VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors),
- yield_unspecified == Qtrue ? Qundef : yield_value,
- (bool)RTEST(move) ? true : false, &rv);
- return rb_ary_new_from_args(2, rv, v);
- }
+ monitors = {} # Ractor::Port => Ractor
+
+ ports = ports.map do |arg|
+ case arg
+ when Ractor
+ port = Ractor::Port.new
+ monitors[port] = arg
+ arg.monitor port
+ port
+ when Ractor::Port
+ arg
+ else
+ raise ArgumentError, "should be Ractor::Port or Ractor"
+ end
+ end
+
+ begin
+ result_port, obj = __builtin_ractor_select_internal(ports)
+
+ if r = monitors[result_port]
+ [r, r.value]
+ else
+ [result_port, obj]
+ end
+ ensure
+ # close all ports for join
+ monitors.each do |port, r|
+ r.unmonitor port
+ port.close
+ end
+ end
end
#
# call-seq:
- # Ractor.receive -> msg
- #
- # Receive an incoming message from the current Ractor's incoming port's queue, which was
- # sent there by #send.
- #
- # r = Ractor.new do
- # v1 = Ractor.receive
- # puts "Received: #{v1}"
- # end
- # r.send('message1')
- # r.take
- # # Here will be printed: "Received: message1"
- #
- # Alternatively, private instance method +receive+ may be used:
- #
- # r = Ractor.new do
- # v1 = receive
- # puts "Received: #{v1}"
- # end
- # r.send('message1')
- # r.take
- # # Here will be printed: "Received: message1"
- #
- # The method blocks if the queue is empty.
- #
- # r = Ractor.new do
- # puts "Before first receive"
- # v1 = Ractor.receive
- # puts "Received: #{v1}"
- # v2 = Ractor.receive
- # puts "Received: #{v2}"
- # end
- # wait
- # puts "Still not received"
- # r.send('message1')
- # wait
- # puts "Still received only one"
- # r.send('message2')
- # r.take
- #
- # Output:
- #
- # Before first receive
- # Still not received
- # Received: message1
- # Still received only one
- # Received: message2
- #
- # If close_incoming was called on the ractor, the method raises Ractor::ClosedError
- # if there are no more messages in incoming queue:
- #
- # Ractor.new do
- # close_incoming
- # receive
- # end
- # wait
- # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
+ # Ractor.receive -> obj
#
+ # Receives a message from the current ractor's default port.
def self.receive
- __builtin_cexpr! %q{
- ractor_receive(ec, rb_ec_ractor_ptr(ec))
- }
+ Ractor.current.default_port.receive
end
class << self
@@ -428,278 +356,21 @@ class Ractor
# same as Ractor.receive
private def receive
- __builtin_cexpr! %q{
- ractor_receive(ec, rb_ec_ractor_ptr(ec))
- }
+ default_port.receive
end
alias recv receive
#
# call-seq:
- # Ractor.receive_if {|msg| block } -> msg
- #
- # Receive only a specific message.
- #
- # Instead of Ractor.receive, Ractor.receive_if can provide a pattern
- # by a block and you can choose the receiving message.
- #
- # r = Ractor.new do
- # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
- # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
- # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
- # end
- # r << "bar1"
- # r << "baz2"
- # r << "foo3"
- # r.take
- #
- # This will output:
- #
- # foo3
- # bar1
- # baz2
- #
- # If the block returns a truthy value, the message will be removed from the incoming queue
- # and returned.
- # Otherwise, the message remains in the incoming queue and the following received
- # messages are checked by the given block.
- #
- # If there are no messages left in the incoming queue, the method will
- # block until new messages arrive.
- #
- # If the block is escaped by break/return/exception/throw, the message is removed from
- # the incoming queue as if a truthy value had been returned.
- #
- # r = Ractor.new do
- # val = Ractor.receive_if{|msg| msg.is_a?(Array)}
- # puts "Received successfully: #{val}"
- # end
- #
- # r.send(1)
- # r.send('test')
- # wait
- # puts "2 non-matching sent, nothing received"
- # r.send([1, 2, 3])
- # wait
- #
- # Prints:
- #
- # 2 non-matching sent, nothing received
- # Received successfully: [1, 2, 3]
- #
- # Note that you can not call receive/receive_if in the given block recursively.
- # It means that you should not do any tasks in the block.
- #
- # Ractor.current << true
- # Ractor.receive_if{|msg| Ractor.receive}
- # #=> `receive': can not call receive/receive_if recursively (Ractor::Error)
- #
- def self.receive_if &b
- Primitive.ractor_receive_if b
- end
-
- private def receive_if &b
- Primitive.ractor_receive_if b
- end
-
- #
- # call-seq:
- # ractor.send(msg, move: false) -> self
- #
- # Send a message to a Ractor's incoming queue to be consumed by Ractor.receive.
- #
- # r = Ractor.new do
- # value = Ractor.receive
- # puts "Received #{value}"
- # end
- # r.send 'message'
- # # Prints: "Received: message"
- #
- # The method is non-blocking (will return immediately even if the ractor is not ready
- # to receive anything):
+ # ractor.send(msg, move: false) -> self
#
- # r = Ractor.new {sleep(5)}
- # r.send('test')
- # puts "Sent successfully"
- # # Prints: "Sent successfully" immediately
- #
- # Attempt to send to ractor which already finished its execution will raise Ractor::ClosedError.
- #
- # r = Ractor.new {}
- # r.take
- # p r
- # # "#<Ractor:#6 (irb):23 terminated>"
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
- #
- # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError.
- #
- # r = Ractor.new do
- # sleep(500)
- # receive
- # end
- # r.close_incoming
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
- # # The error would be raised immediately, not when ractor will try to receive
- #
- # If the +obj+ is unshareable, by default it would be copied into ractor by deep cloning.
- # If the <tt>move: true</tt> is passed, object is _moved_ into ractor and becomes
- # inaccessible to sender.
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # msg = 'message'
- # r.send(msg, move: true)
- # r.take
- # p msg
- #
- # This prints:
- #
- # Received: message
- # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>
- #
- # All references to the object and its parts will become invalid in sender.
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # s = 'message'
- # ary = [s]
- # copy = ary.dup
- # r.send(ary, move: true)
- #
- # s.inspect
- # # Ractor::MovedError (can not send any methods to a moved object)
- # ary.class
- # # Ractor::MovedError (can not send any methods to a moved object)
- # copy.class
- # # => Array, it is different object
- # copy[0].inspect
- # # Ractor::MovedError (can not send any methods to a moved object)
- # # ...but its item was still a reference to `s`, which was moved
- #
- # If the object was shareable, <tt>move: true</tt> has no effect on it:
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # s = 'message'.freeze
- # r.send(s, move: true)
- # s.inspect #=> "message", still available
- #
- def send(obj, move: false)
- __builtin_cexpr! %q{
- ractor_send(ec, RACTOR_PTR(self), obj, move)
- }
+ # This is equivalent to Port#send to the ractor's #default_port.
+ def send(...)
+ default_port.send(...)
+ self
end
alias << send
- #
- # call-seq:
- # Ractor.yield(msg, move: false) -> nil
- #
- # Send a message to the current ractor's outgoing port to be consumed by #take.
- #
- # r = Ractor.new {Ractor.yield 'Hello from ractor'}
- # puts r.take
- # # Prints: "Hello from ractor"
- #
- # The method is blocking, and will return only when somebody consumes the
- # sent message.
- #
- # r = Ractor.new do
- # Ractor.yield 'Hello from ractor'
- # puts "Ractor: after yield"
- # end
- # wait
- # puts "Still not taken"
- # puts r.take
- #
- # This will print:
- #
- # Still not taken
- # Hello from ractor
- # Ractor: after yield
- #
- # If the outgoing port was closed with #close_outgoing, the method will raise:
- #
- # r = Ractor.new do
- # close_outgoing
- # Ractor.yield 'Hello from ractor'
- # end
- # wait
- # # `yield': The outgoing-port is already closed (Ractor::ClosedError)
- #
- # The meaning of +move+ argument is the same as for #send.
- def self.yield(obj, move: false)
- __builtin_cexpr! %q{
- ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
- }
- end
-
- #
- # call-seq:
- # ractor.take -> msg
- #
- # Take a message from ractor's outgoing port, which was put there by Ractor.yield or at ractor's
- # finalization.
- #
- # r = Ractor.new do
- # Ractor.yield 'explicit yield'
- # 'last value'
- # end
- # puts r.take #=> 'explicit yield'
- # puts r.take #=> 'last value'
- # puts r.take # Ractor::ClosedError (The outgoing-port is already closed)
- #
- # The fact that the last value is also put to outgoing port means that +take+ can be used
- # as some analog of Thread#join ("just wait till ractor finishes"), but don't forget it
- # will raise if somebody had already consumed everything ractor have produced.
- #
- # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError.
- #
- # r = Ractor.new do
- # sleep(500)
- # Ractor.yield 'Hello from ractor'
- # end
- # r.close_outgoing
- # r.take
- # # Ractor::ClosedError (The outgoing-port is already closed)
- # # The error would be raised immediately, not when ractor will try to receive
- #
- # If an uncaught exception is raised in the Ractor, it is propagated on take as a
- # Ractor::RemoteError.
- #
- # r = Ractor.new {raise "Something weird happened"}
- #
- # begin
- # r.take
- # rescue => e
- # p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
- # p e.ractor == r # => true
- # p e.cause # => #<RuntimeError: Something weird happened>
- # end
- #
- # Ractor::ClosedError is a descendant of StopIteration, so the closing of the ractor will break
- # the loops without propagating the error:
- #
- # r = Ractor.new do
- # 3.times {|i| Ractor.yield "message #{i}"}
- # "finishing"
- # end
- #
- # loop {puts "Received: " + r.take}
- # puts "Continue successfully"
- #
- # This will print:
- #
- # Received: message 0
- # Received: message 1
- # Received: message 2
- # Received: finishing
- # Continue successfully
- def take
- __builtin_cexpr! %q{
- ractor_take(ec, RACTOR_PTR(self))
- }
- end
-
def inspect
loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
@@ -712,51 +383,25 @@ class Ractor
alias to_s inspect
- # The name set in Ractor.new, or +nil+.
+ # Returns the name set in Ractor.new, or +nil+.
def name
__builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
class RemoteError
+ # The Ractor in which the uncaught exception was raised.
attr_reader :ractor
end
#
# call-seq:
- # ractor.close_incoming -> true | false
- #
- # Closes the incoming port and returns its previous state.
- # All further attempts to Ractor.receive in the ractor, and #send to the ractor
- # will fail with Ractor::ClosedError.
- #
- # r = Ractor.new {sleep(500)}
- # r.close_incoming #=> false
- # r.close_incoming #=> true
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
- def close_incoming
- __builtin_cexpr! %q{
- ractor_close_incoming(ec, RACTOR_PTR(self));
- }
- end
-
+ # ractor.close -> true | false
#
- # call-seq:
- # ractor.close_outgoing -> true | false
- #
- # Closes the outgoing port and returns its previous state.
- # All further attempts to Ractor.yield in the ractor, and #take from the ractor
- # will fail with Ractor::ClosedError.
- #
- # r = Ractor.new {sleep(500)}
- # r.close_outgoing #=> false
- # r.close_outgoing #=> true
- # r.take
- # # Ractor::ClosedError (The outgoing-port is already closed)
- def close_outgoing
- __builtin_cexpr! %q{
- ractor_close_outgoing(ec, RACTOR_PTR(self));
- }
+ # Closes the default port. Closing a port is allowed only by the ractor which created the port.
+ # Therefore, the receiver must be the current ractor.
+ #
+ def close
+ default_port.close
end
#
@@ -765,11 +410,11 @@ class Ractor
#
# Checks if the object is shareable by ractors.
#
- # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are frozen
- # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # freeze_string_literals: true
+ # Ractor.shareable?(1) #=> true -- numbers are shareable
+ # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true
# Ractor.shareable?('foo'.freeze) #=> true
#
- # See also the "Shareable and unshareable objects" section in the Ractor class docs.
+ # See also the "Shareable and unshareable objects" section in the \Ractor class docs.
def self.shareable? obj
__builtin_cexpr! %q{
RBOOL(rb_ractor_shareable_p(obj));
@@ -780,13 +425,13 @@ class Ractor
# call-seq:
# Ractor.make_shareable(obj, copy: false) -> shareable_obj
#
- # Make +obj+ shareable between ractors.
+ # Makes +obj+ shareable between ractors.
#
# +obj+ and all the objects it refers to will be frozen, unless they are
# already shareable.
#
- # If +copy+ keyword is +true+, the method will copy objects before freezing them
- # This is safer option but it can take be slower.
+ # If +copy+ keyword is +true+, it will copy objects before freezing them, and will not
+ # modify +obj+ or its internal objects.
#
# Note that the specification and implementation of this method are not
# mature and may be changed in the future.
@@ -823,20 +468,367 @@ class Ractor
end
end
- # get a value from ractor-local storage
+ # Gets a value from ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[] instead.
def [](sym)
+ if (self != Ractor.current)
+ raise RuntimeError, "Cannot get ractor local storage for non-current ractor"
+ end
Primitive.ractor_local_value(sym)
end
- # set a value in ractor-local storage
+ # Sets a value in ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[]= instead.
def []=(sym, val)
+ if (self != Ractor.current)
+ raise RuntimeError, "Cannot set ractor local storage for non-current ractor"
+ end
+ Primitive.ractor_local_value_set(sym, val)
+ end
+
+ # Gets a value from ractor-local storage for the current Ractor.
+ def self.[](sym)
+ Primitive.ractor_local_value(sym)
+ end
+
+ # Sets a value in ractor-local storage for the current Ractor.
+ def self.[]=(sym, val)
Primitive.ractor_local_value_set(sym, val)
end
- # returns main ractor
+ # call-seq:
+ # Ractor.store_if_absent(key){ init_block }
+ #
+ # If the corresponding ractor-local value is not set, yields a value with
+ # init_block and stores the value in a thread-safe manner.
+ # This method returns the stored value.
+ #
+ # (1..10).map{
+ # Thread.new(it){|i|
+ # Ractor.store_if_absent(:s){ f(); i }
+ # #=> return stored value of key :s
+ # }
+ # }.map(&:value).uniq.size #=> 1 and f() is called only once
+ #
+ def self.store_if_absent(sym)
+ Primitive.attr! :use_block
+ Primitive.ractor_local_value_store_if_absent(sym)
+ end
+
+ # Returns the main ractor.
def self.main
__builtin_cexpr! %q{
rb_ractor_self(GET_VM()->ractor.main_ractor);
}
end
+
+ # Returns true if the current ractor is the main ractor.
+ def self.main?
+ __builtin_cexpr! %q{
+ RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec))
+ }
+ end
+
+ # internal method
+ def self._require feature # :nodoc:
+ if main?
+ super feature
+ else
+ Primitive.ractor_require feature
+ end
+ end
+
+ class << self
+ private
+
+ # internal method that is called when the first "Ractor.new" is called
+ def _activated # :nodoc:
+ Kernel.prepend Module.new{|m|
+ m.set_temporary_name '<RactorRequire>'
+
+ def require feature # :nodoc: -- otherwise RDoc outputs it as a class method
+ if Ractor.main?
+ super
+ else
+ Ractor._require feature
+ end
+ end
+ }
+ end
+ end
+
+ #
+ # call-seq:
+ # ractor.default_port -> port object
+ #
+ # Returns the default port of the Ractor.
+ #
+ def default_port
+ __builtin_cexpr! %q{
+ ractor_default_port_value(RACTOR_PTR(self))
+ }
+ end
+
+ #
+ # call-seq:
+ # ractor.join -> self
+ #
+ # Waits for the termination of the Ractor.
+ # If the Ractor was aborted (terminated by an unhandled exception),
+ # the exception is raised in the current ractor.
+ #
+ # Ractor.new{}.join #=> ractor
+ #
+ # Ractor.new{ raise "foo" }.join
+ # #=> raises the exception "foo (RuntimeError)"
+ #
+ def join
+ port = Port.new
+
+ self.monitor port
+ if port.receive == :aborted
+ __builtin_ractor_value
+ end
+
+ self
+ ensure
+ port.close
+ end
+
+ #
+ # call-seq:
+ # ractor.value -> obj
+ #
+ # Waits for +ractor+ to complete and returns its value or raises the exception
+ # which terminated the Ractor. The termination value will be moved to the calling
+ # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value.
+ #
+ # r = Ractor.new{ [1, 2] }
+ # r.value #=> [1, 2] (unshareable object)
+ #
+ # Ractor.new(r){|r| r.value} #=> Ractor::Error
+ #
+ def value
+ self.join
+ __builtin_ractor_value
+ end
+
+ #
+ # call-seq:
+ # ractor.monitor(port) -> self
+ #
+ # Registers the port as a monitoring port for this ractor. When the ractor terminates,
+ # the port receives a Symbol object.
+ #
+ # * +:exited+ is sent if the ractor terminates without an unhandled exception.
+ # * +:aborted+ is sent if the ractor terminates by an unhandled exception.
+ #
+ # r = Ractor.new{ some_task() }
+ # r.monitor(port = Ractor::Port.new)
+ # port.receive #=> :exited and r is terminated
+ #
+ # r = Ractor.new{ raise "foo" }
+ # r.monitor(port = Ractor::Port.new)
+ # port.receive #=> :aborted and r is terminated by the RuntimeError "foo"
+ #
+ def monitor port
+ __builtin_ractor_monitor(port)
+ end
+
+ #
+ # call-seq:
+ # ractor.unmonitor(port) -> self
+ #
+ # Unregisters the port from the monitoring ports for this ractor.
+ #
+ def unmonitor port
+ __builtin_ractor_unmonitor(port)
+ end
+
+ #
+ # call-seq:
+ # Ractor.shareable_proc(self: nil){} -> shareable proc
+ #
+ # Returns a shareable copy of the given block's Proc. The value of +self+
+ # in the Proc will be replaced with the value passed via the `self:` keyword,
+ # or +nil+ if not given.
+ #
+ # In a shareable Proc, access to any outer variables is prohibited.
+ #
+ # a = 42
+ # Ractor.shareable_proc{ p a }
+ # #=> can not isolate a Proc because it accesses outer variables (a). (ArgumentError)
+ #
+ # The value of `self` in the Proc must be a shareable object.
+ #
+ # Ractor.shareable_proc(self: self){}
+ # #=> self should be shareable: main (Ractor::IsolationError)
+ #
+ def self.shareable_proc self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), false)
+ })
+ end
+
+ #
+ # call-seq:
+ # Ractor.shareable_lambda(self: nil){} -> shareable lambda
+ #
+ # Same as Ractor.shareable_proc, but returns a lambda Proc.
+ #
+ def self.shareable_lambda self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), true)
+ })
+ end
+
+ # \Port objects transmit messages between Ractors.
+ class Port
+ #
+ # call-seq:
+ # port.receive -> msg
+ #
+ # Receives a message from the port (which was sent there by Port#send). Only the ractor
+ # that created the port can receive messages this way.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new port do |port|
+ # port.send('message1')
+ # end
+ #
+ # v1 = port.receive
+ # puts "Received: #{v1}"
+ # r.join
+ # # This will print: "Received: message1"
+ #
+ # The method blocks the current Thread if the message queue is empty.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new port do |port|
+ # wait
+ # puts "Still not received"
+ # port.send('message1')
+ # wait
+ # puts "Still received only one"
+ # port.send('message2')
+ # end
+ # puts "Before first receive"
+ # v1 = port.receive
+ # puts "Received: #{v1}"
+ # v2 = port.receive
+ # puts "Received: #{v2}"
+ # r.join
+ #
+ # Output:
+ #
+ # Before first receive
+ # Still not received
+ # Received: message1
+ # Still received only one
+ # Received: message2
+ #
+ # If the port is closed and there are no more messages in the message queue,
+ # the method raises Ractor::ClosedError.
+ #
+ # port = Ractor::Port.new
+ # port.close
+ # port.receive #=> raise Ractor::ClosedError
+ #
+ def receive
+ __builtin_cexpr! %q{
+ ractor_port_receive(ec, self)
+ }
+ end
+
+ #
+ # call-seq:
+ # port.send(msg, move: false) -> self
+ #
+ # Sends a message to the port to be accepted by port.receive.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new(port) do |port|
+ # port.send 'message'
+ # end
+ # value = port.receive
+ # puts "Received #{value}"
+ # # Prints: "Received: message"
+ #
+ # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready
+ # to receive anything):
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new(port) do |port|
+ # port.send 'test'
+ # puts "Sent successfully"
+ # # Prints: "Sent successfully" immediately
+ # end
+ #
+ # An attempt to send to a closed port will raise Ractor::ClosedError.
+ #
+ # r = Ractor.new {Ractor::Port.new}
+ # r.join
+ # p r
+ # # "#<Ractor:#6 (irb):23 terminated>"
+ # port = r.value
+ # port.send('test') # raise Ractor::ClosedError
+ #
+ # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning.
+ #
+ # If the object is shareable, a reference to the object will be sent to the receiving ractor.
+ #
+ def send obj, move: false
+ __builtin_cexpr! %q{
+ ractor_port_send(ec, self, obj, move)
+ }
+ end
+
+ alias << send
+
+ #
+ # call-seq:
+ # port.close
+ #
+ # Closes the port. Sending to a closed port is prohibited.
+ # Receiving is also prohibited if there are no messages in its message queue.
+ #
+ # Only the Ractor which created the port is allowed to close it.
+ #
+ # port = Ractor::Port.new
+ # Ractor.new port do |port|
+ # port.close #=> closing port by other ractors is not allowed (Ractor::Error)
+ # end.join
+ #
+ def close
+ __builtin_cexpr! %q{
+ ractor_port_close(ec, self)
+ }
+ end
+
+ #
+ # call-seq:
+ # port.closed? -> true/false
+ #
+ # Returns whether or not the port is closed.
+ def closed?
+ __builtin_cexpr! %q{
+ ractor_port_closed_p(ec, self);
+ }
+ end
+
+ #
+ # call-seq:
+ # port.inspect -> string
+ def inspect
+ "#<Ractor::Port to:\##{
+ __builtin_cexpr! "SIZET2NUM(rb_ractor_id((RACTOR_PORT_PTR(self)->r)))"
+ } id:#{
+ __builtin_cexpr! "SIZET2NUM(ractor_port_id(RACTOR_PORT_PTR(self)))"
+ }>"
+ end
+ end
end