summaryrefslogtreecommitdiff
path: root/ractor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.rb')
-rw-r--r--ractor.rb921
1 files changed, 415 insertions, 506 deletions
diff --git a/ractor.rb b/ractor.rb
index 3b649f042f..ac04237fc6 100644
--- a/ractor.rb
+++ b/ractor.rb
@@ -1,17 +1,15 @@
-# \Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
-#
-# Ractor.new makes a new \Ractor, which can run in parallel.
+# Ractor.new creates a new \Ractor, which can run in parallel with other ractors.
#
# # The simplest ractor
# r = Ractor.new {puts "I am in Ractor!"}
-# r.take # wait for it to finish
+# r.join # wait for it to finish
# # Here, "I am in Ractor!" is printed
#
# Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety
# concerns such as data-races and race-conditions are not possible. The other benefit is parallelism.
#
# To achieve this, object sharing is limited across ractors.
-# For example, unlike in threads, ractors can't access all the objects available in other ractors. Even objects normally
+# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally
# available through variables in the outer scope are prohibited from being used across ractors.
#
# a = 1
@@ -23,77 +21,34 @@
# a = 1
# r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}
#
-# On CRuby (the default implementation), Global Virtual Machine Lock (GVL) is held per ractor, so
-# ractors can perform in parallel without locking each other. This is unlike the situation with threads
-# on CRuby.
+# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so
+# ractors can run in parallel. This is unlike the situation with threads on CRuby.
#
# Instead of accessing shared state, objects should be passed to and from ractors by
# sending and receiving them as messages.
#
# a = 1
# r = Ractor.new do
-# a_in_ractor = receive # receive blocks until somebody passes a message
+# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message
# puts "I am in Ractor! a=#{a_in_ractor}"
# end
# r.send(a) # pass it
-# r.take
+# r.join
# # Here, "I am in Ractor! a=1" is printed
#
-# There are two pairs of methods for sending/receiving messages:
-#
-# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push);
-# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull);
-#
# In addition to that, any arguments passed to Ractor.new are passed to the block and available there
-# as if received by Ractor.receive, and the last block value is sent outside of the
-# ractor as if sent by Ractor.yield.
-#
-# A little demonstration of a classic ping-pong:
-#
-# server = Ractor.new(name: "server") do
-# puts "Server starts: #{self.inspect}"
-# puts "Server sends: ping"
-# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested
-# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent
-# puts "Server received: #{received}"
-# end
-#
-# client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv
-# puts "Client starts: #{self.inspect}"
-# received = srv.take # The client takes a message from the server
-# puts "Client received from " \
-# "#{srv.inspect}: #{received}"
-# puts "Client sends to " \
-# "#{srv.inspect}: pong"
-# srv.send 'pong' # The client sends a message to the server
-# end
-#
-# [client, server].each(&:take) # Wait until they both finish
-#
-# This will output something like:
-#
-# Server starts: #<Ractor:#2 server test.rb:1 running>
-# Server sends: ping
-# Client starts: #<Ractor:#3 test.rb:8 running>
-# Client received from #<Ractor:#2 server test.rb:1 blocking>: ping
-# Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong
-# Server received: pong
-#
-# Ractors receive their messages via the <em>incoming port</em>, and send them
-# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and
-# Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed
-# automatically.
+# as if received by Ractor.receive, and the last block value can be received with Ractor#value.
#
# == Shareable and unshareable objects
#
-# When an object is sent to and from a ractor, it's important to understand whether the
+# When an object is sent to a ractor, it's important to understand whether the
# object is shareable or unshareable. Most Ruby objects are unshareable objects. Even
# frozen objects can be unshareable if they contain (through their instance variables) unfrozen
# objects.
#
-# Shareable objects are those which can be used by several threads without compromising
+# Shareable objects are those which can be used by several ractors at once without compromising
# thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this,
-# and Ractor.make_shareable tries to make the object shareable if it's not already, and gives an error
+# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error
# if it can't do it.
#
# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable
@@ -109,24 +64,24 @@
# ary[0].frozen? #=> true
# ary[1].frozen? #=> true
#
-# When a shareable object is sent (via #send or Ractor.yield), no additional processing occurs
-# on it. It just becomes usable by both ractors. When an unshareable object is sent, it can be
-# either _copied_ or _moved_. The first is the default, and it copies the object fully by
+# When a shareable object is sent via #send, no additional processing occurs
+# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be
+# either _copied_ or _moved_. Copying is the default, and it copies the object fully by
# deep cloning (Object#clone) the non-shareable parts of its structure.
#
-# data = ['foo', 'bar'.freeze]
+# data = ['foo'.dup, 'bar'.freeze]
# r = Ractor.new do
# data2 = Ractor.receive
# puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
# end
# r.send(data)
-# r.take
+# r.join
# puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}"
#
# This will output something like:
#
-# In ractor: 340, 360, 320
-# Outside : 380, 400, 320
+# In ractor: 8, 16, 24
+# Outside : 32, 40, 24
#
# Note that the object ids of the array and the non-frozen string inside the array have changed in
# the ractor because they are different objects. The second array's element, which is a
@@ -142,7 +97,7 @@
# puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}"
# end
# r.send(data, move: true)
-# r.take
+# r.join
# puts "Outside: moved? #{Ractor::MovedObject === data}"
# puts "Outside: #{data.inspect}"
#
@@ -152,12 +107,12 @@
# Outside: moved? true
# test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
#
-# Notice that even +inspect+ (and more basic methods like <tt>__id__</tt>) is inaccessible
+# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible
# on a moved object.
#
-# +Class+ and +Module+ objects are shareable so the class/module definitions are shared between ractors.
-# \Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property
-# will be kept. We can not define mutable shareable objects in Ruby, but C extensions can introduce them.
+# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors.
+# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors.
+# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them.
#
# It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the
# variables aren't shareable. This can occur because modules/classes are shareable, but they can have
@@ -177,7 +132,7 @@
# puts "I can't see #{cls.tricky}"
# cls.tricky = true # doesn't get here, but this would also raise an error
# end
-# r.take
+# r.join
# # I see C
# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
#
@@ -191,7 +146,7 @@
# puts "GOOD=#{GOOD}"
# puts "BAD=#{BAD}"
# end
-# r.take
+# r.join
# # GOOD=good
# # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError)
#
@@ -201,7 +156,7 @@
# puts "I see #{C}"
# puts "I can't see #{C.tricky}"
# end
-# r.take
+# r.join
# # I see C
# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError)
#
@@ -217,20 +172,21 @@
# a = 1
# Thread.new {puts "Thread in ractor: a=#{a}"}.join
# end
-# r.take
+# r.join
# # Here "Thread in ractor: a=1" will be printed
#
# == Note on code examples
#
-# In the examples below, sometimes we use the following method to wait for ractors that
-# are not currently blocked to finish (or to make progress).
+# In the examples below, sometimes we use the following method to wait for ractors
+# to make progress or finish.
#
# def wait
# sleep(0.1)
# end
#
-# It is **only for demonstration purposes** and shouldn't be used in a real code.
-# Most of the time, #take is used to wait for ractors to finish.
+# This is **only for demonstration purposes** and shouldn't be used in a real code.
+# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used
+# to wait for messages.
#
# == Reference
#
@@ -241,13 +197,13 @@ class Ractor
# call-seq:
# Ractor.new(*args, name: nil) {|*args| block } -> ractor
#
- # Create a new \Ractor with args and a block.
+ # Creates a new \Ractor with args and a block.
#
- # The given block (Proc) will be isolated (can't access any outer variables). +self+
+ # The given block (Proc) is isolated (can't access any outer variables). +self+
# inside the block will refer to the current \Ractor.
#
# r = Ractor.new { puts "Hi, I am #{self.inspect}" }
- # r.take
+ # r.join
# # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>"
#
# Any +args+ passed are propagated to the block arguments by the same rules as
@@ -259,14 +215,14 @@ class Ractor
# r = Ractor.new(arg) {|received_arg|
# puts "Received: #{received_arg} (##{received_arg.object_id})"
# }
- # r.take
+ # r.join
# # Prints:
# # Passing: [1, 2, 3] (#280)
# # Received: [1, 2, 3] (#300)
#
# Ractor's +name+ can be set for debugging purposes:
#
- # r = Ractor.new(name: 'my ractor') {}; r.take
+ # r = Ractor.new(name: 'my ractor') {}; r.join
# p r
# #=> #<Ractor:#3 my ractor test.rb:1 terminated>
#
@@ -274,8 +230,8 @@ class Ractor
b = block # TODO: builtin bug
raise ArgumentError, "must be called with a block" unless block
if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
- Kernel.warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
- "Also there are many implementation issues.", uplevel: 0, category: :experimental)
+ Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.",
+ uplevel: 0, category: :experimental)
end
loc = caller_locations(1, 1).first
loc = "#{loc.path}:#{loc.lineno}"
@@ -291,13 +247,13 @@ class Ractor
}
end
- # Returns the number of Ractors currently running or blocking (waiting).
+ # Returns the number of ractors currently running or blocking (waiting).
#
# Ractor.count #=> 1
- # r = Ractor.new(name: 'example') { Ractor.yield(1) }
+ # r = Ractor.new(name: 'example') { Ractor.receive }
# Ractor.count #=> 2 (main + example ractor)
- # r.take # wait for Ractor.yield(1)
- # r.take # wait until r will finish
+ # r << 42 # r's Ractor.receive will resume
+ # r.join # wait for r's termination
# Ractor.count #=> 1
def self.count
__builtin_cexpr! %q{
@@ -307,130 +263,91 @@ class Ractor
#
# call-seq:
- # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj]
- #
- # Wait for any ractor to have something in its outgoing port, read from this ractor, and
- # then return that ractor and the object received.
- #
- # r1 = Ractor.new {Ractor.yield 'from 1'}
- # r2 = Ractor.new {Ractor.yield 'from 2'}
- #
- # r, obj = Ractor.select(r1, r2)
- #
- # puts "received #{obj.inspect} from #{r.inspect}"
- # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running>
- # # But could just as well print "from r2" here, either prints could be first.
- #
- # If one of the given ractors is the current ractor, and it is selected, +r+ will contain
- # the +:receive+ symbol instead of the ractor object.
- #
- # r1 = Ractor.new(Ractor.current) do |main|
- # main.send 'to main'
- # Ractor.yield 'from 1'
- # end
- # r2 = Ractor.new do
- # Ractor.yield 'from 2'
+ # Ractor.select(*ractors_or_ports) -> [ractor or port, obj]
+ #
+ # Blocks the current Thread until one of the given ports has received a message. Returns an
+ # array of two elements where the first element is the Port and the second is the received object.
+ # This method can also accept Ractor objects themselves, and in that case will wait until one
+ # has terminated and return a two-element array where the first element is the ractor and the
+ # second is its termination value.
+ #
+ # p1, p2 = Ractor::Port.new, Ractor::Port.new
+ # ps = [p1, p2]
+ # rs = 2.times.map do |i|
+ # Ractor.new(ps.shift, i) do |p, i|
+ # sleep rand(0.99)
+ # p.send("r#{i}")
+ # sleep rand(0.99)
+ # "r#{i} done"
+ # end
# end
#
- # r, obj = Ractor.select(r1, r2, Ractor.current)
- # puts "received #{obj.inspect} from #{r.inspect}"
- # # Could print: received "to main" from :receive
- #
- # If +yield_value+ is provided, that value may be yielded if another ractor is calling #take.
- # In this case, the pair <tt>[:yield, nil]</tt> is returned:
- #
- # r1 = Ractor.new(Ractor.current) do |main|
- # puts "Received from main: #{main.take}"
+ # waiting_on = [p1, p2, *rs]
+ # until waiting_on.empty?
+ # received_on, obj = Ractor.select(*waiting_on)
+ # waiting_on.delete(received_on)
+ # puts obj
# end
#
- # puts "Trying to select"
- # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123)
- # wait
- # puts "Received #{obj.inspect} from #{r.inspect}"
+ # # r0
+ # # r1
+ # # r1 done
+ # # r0 done
#
- # This will print:
+ # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread
+ # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in
+ # the array element order.
#
- # Trying to select
- # Received from main: 123
- # Received nil from :yield
+ # values = []
+ # until ractors.empty?
+ # r, val = Ractor.select(*ractors)
+ # ractors.delete(r)
+ # values << val
+ # end
#
- # +move+ boolean flag defines whether yielded value will be copied (default) or moved.
- def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
- raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?
-
- if ractors.delete Ractor.current
- do_receive = true
- else
- do_receive = false
+ def self.select(*ports)
+ raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty?
+
+ monitors = {} # Ractor::Port => Ractor
+
+ ports = ports.map do |arg|
+ case arg
+ when Ractor
+ port = Ractor::Port.new
+ monitors[port] = arg
+ arg.monitor port
+ port
+ when Ractor::Port
+ arg
+ else
+ raise ArgumentError, "should be Ractor::Port or Ractor"
+ end
end
- __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move
+ begin
+ result_port, obj = __builtin_ractor_select_internal(ports)
+
+ if r = monitors[result_port]
+ [r, r.value]
+ else
+ [result_port, obj]
+ end
+ ensure
+ # close all ports for join
+ monitors.each do |port, r|
+ r.unmonitor port
+ port.close
+ end
+ end
end
#
# call-seq:
- # Ractor.receive -> msg
- #
- # Receive a message from the incoming port of the current ractor (which was
- # sent there by #send from another ractor).
- #
- # r = Ractor.new do
- # v1 = Ractor.receive
- # puts "Received: #{v1}"
- # end
- # r.send('message1')
- # r.take
- # # Here will be printed: "Received: message1"
- #
- # Alternatively, the private instance method +receive+ may be used:
- #
- # r = Ractor.new do
- # v1 = receive
- # puts "Received: #{v1}"
- # end
- # r.send('message1')
- # r.take
- # # This prints: "Received: message1"
- #
- # The method blocks if the queue is empty.
- #
- # r = Ractor.new do
- # puts "Before first receive"
- # v1 = Ractor.receive
- # puts "Received: #{v1}"
- # v2 = Ractor.receive
- # puts "Received: #{v2}"
- # end
- # wait
- # puts "Still not received"
- # r.send('message1')
- # wait
- # puts "Still received only one"
- # r.send('message2')
- # r.take
- #
- # Output:
- #
- # Before first receive
- # Still not received
- # Received: message1
- # Still received only one
- # Received: message2
- #
- # If close_incoming was called on the ractor, the method raises Ractor::ClosedError
- # if there are no more messages in the incoming queue:
- #
- # Ractor.new do
- # close_incoming
- # receive
- # end
- # wait
- # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError)
+ # Ractor.receive -> obj
#
+ # Receives a message from the current ractor's default port.
def self.receive
- __builtin_cexpr! %q{
- ractor_receive(ec, rb_ec_ractor_ptr(ec))
- }
+ Ractor.current.default_port.receive
end
class << self
@@ -439,280 +356,21 @@ class Ractor
# same as Ractor.receive
private def receive
- __builtin_cexpr! %q{
- ractor_receive(ec, rb_ec_ractor_ptr(ec))
- }
+ default_port.receive
end
alias recv receive
#
# call-seq:
- # Ractor.receive_if {|msg| block } -> msg
- #
- # Receive only a specific message.
- #
- # Instead of Ractor.receive, Ractor.receive_if can be given a pattern (or any
- # filter) in a block and you can choose the messages to accept that are available in
- # your ractor's incoming queue.
- #
- # r = Ractor.new do
- # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3"
- # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1"
- # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2"
- # end
- # r << "bar1"
- # r << "baz2"
- # r << "foo3"
- # r.take
- #
- # This will output:
- #
- # foo3
- # bar1
- # baz2
- #
- # If the block returns a truthy value, the message is removed from the incoming queue
- # and returned.
- # Otherwise, the message remains in the incoming queue and the next messages are checked
- # by the given block.
- #
- # If there are no messages left in the incoming queue, the method will
- # block until new messages arrive.
- #
- # If the block is escaped by break/return/exception/throw, the message is removed from
- # the incoming queue as if a truthy value had been returned.
- #
- # r = Ractor.new do
- # val = Ractor.receive_if{|msg| msg.is_a?(Array)}
- # puts "Received successfully: #{val}"
- # end
- #
- # r.send(1)
- # r.send('test')
- # wait
- # puts "2 non-matching sent, nothing received"
- # r.send([1, 2, 3])
- # wait
- #
- # Prints:
- #
- # 2 non-matching sent, nothing received
- # Received successfully: [1, 2, 3]
- #
- # Note that you can not call receive/receive_if in the given block recursively.
- # You should not do any tasks in the block other than message filtration.
- #
- # Ractor.current << true
- # Ractor.receive_if{|msg| Ractor.receive}
- # #=> `receive': can not call receive/receive_if recursively (Ractor::Error)
- #
- def self.receive_if &b
- Primitive.ractor_receive_if b
- end
-
- # same as Ractor.receive_if
- private def receive_if &b
- Primitive.ractor_receive_if b
- end
-
- #
- # call-seq:
- # ractor.send(msg, move: false) -> self
- #
- # Send a message to a Ractor's incoming queue to be accepted by Ractor.receive.
- #
- # r = Ractor.new do
- # value = Ractor.receive
- # puts "Received #{value}"
- # end
- # r.send 'message'
- # # Prints: "Received: message"
- #
- # The method is non-blocking (will return immediately even if the ractor is not ready
- # to receive anything):
- #
- # r = Ractor.new {sleep(5)}
- # r.send('test')
- # puts "Sent successfully"
- # # Prints: "Sent successfully" immediately
- #
- # An attempt to send to a ractor which already finished its execution will raise Ractor::ClosedError.
- #
- # r = Ractor.new {}
- # r.take
- # p r
- # # "#<Ractor:#6 (irb):23 terminated>"
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
+ # ractor.send(msg, move: false) -> self
#
- # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError.
- #
- # r = Ractor.new do
- # sleep(500)
- # receive
- # end
- # r.close_incoming
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
- # # The error is raised immediately, not when the ractor tries to receive
- #
- # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning.
- # If <tt>move: true</tt> is passed, the object is _moved_ into the receiving ractor and becomes
- # inaccessible to the sender.
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # msg = 'message'
- # r.send(msg, move: true)
- # r.take
- # p msg
- #
- # This prints:
- #
- # Received: message
- # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8>
- #
- # All references to the object and its parts will become invalid to the sender.
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # s = 'message'
- # ary = [s]
- # copy = ary.dup
- # r.send(ary, move: true)
- #
- # s.inspect
- # # Ractor::MovedError (can not send any methods to a moved object)
- # ary.class
- # # Ractor::MovedError (can not send any methods to a moved object)
- # copy.class
- # # => Array, it is different object
- # copy[0].inspect
- # # Ractor::MovedError (can not send any methods to a moved object)
- # # ...but its item was still a reference to `s`, which was moved
- #
- # If the object is shareable, <tt>move: true</tt> has no effect on it:
- #
- # r = Ractor.new {puts "Received: #{receive}"}
- # s = 'message'.freeze
- # r.send(s, move: true)
- # s.inspect #=> "message", still available
- #
- def send(obj, move: false)
- __builtin_cexpr! %q{
- ractor_send(ec, RACTOR_PTR(self), obj, move)
- }
+ # This is equivalent to Port#send to the ractor's #default_port.
+ def send(...)
+ default_port.send(...)
+ self
end
alias << send
- #
- # call-seq:
- # Ractor.yield(msg, move: false) -> nil
- #
- # Send a message to the current ractor's outgoing port to be accepted by #take.
- #
- # r = Ractor.new {Ractor.yield 'Hello from ractor'}
- # puts r.take
- # # Prints: "Hello from ractor"
- #
- # This method is blocking, and will return only when somebody consumes the
- # sent message.
- #
- # r = Ractor.new do
- # Ractor.yield 'Hello from ractor'
- # puts "Ractor: after yield"
- # end
- # wait
- # puts "Still not taken"
- # puts r.take
- #
- # This will print:
- #
- # Still not taken
- # Hello from ractor
- # Ractor: after yield
- #
- # If the outgoing port was closed with #close_outgoing, the method will raise:
- #
- # r = Ractor.new do
- # close_outgoing
- # Ractor.yield 'Hello from ractor'
- # end
- # wait
- # # `yield': The outgoing-port is already closed (Ractor::ClosedError)
- #
- # The meaning of the +move+ argument is the same as for #send.
- def self.yield(obj, move: false)
- __builtin_cexpr! %q{
- ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
- }
- end
-
- #
- # call-seq:
- # ractor.take -> msg
- #
- # Get a message from the ractor's outgoing port, which was put there by Ractor.yield or at ractor's
- # termination.
- #
- # r = Ractor.new do
- # Ractor.yield 'explicit yield'
- # 'last value'
- # end
- # puts r.take #=> 'explicit yield'
- # puts r.take #=> 'last value'
- # puts r.take # Ractor::ClosedError (The outgoing-port is already closed)
- #
- # The fact that the last value is also sent to the outgoing port means that +take+ can be used
- # as an analog of Thread#join ("just wait until ractor finishes"). However, it will raise if
- # somebody has already consumed that message.
- #
- # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError.
- #
- # r = Ractor.new do
- # sleep(500)
- # Ractor.yield 'Hello from ractor'
- # end
- # r.close_outgoing
- # r.take
- # # Ractor::ClosedError (The outgoing-port is already closed)
- # # The error would be raised immediately, not when ractor will try to receive
- #
- # If an uncaught exception is raised in the Ractor, it is propagated by take as a
- # Ractor::RemoteError.
- #
- # r = Ractor.new {raise "Something weird happened"}
- #
- # begin
- # r.take
- # rescue => e
- # p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
- # p e.ractor == r # => true
- # p e.cause # => #<RuntimeError: Something weird happened>
- # end
- #
- # Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor will break
- # out of any loops that receive this message without propagating the error:
- #
- # r = Ractor.new do
- # 3.times {|i| Ractor.yield "message #{i}"}
- # "finishing"
- # end
- #
- # loop {puts "Received: " + r.take}
- # puts "Continue successfully"
- #
- # This will print:
- #
- # Received: message 0
- # Received: message 1
- # Received: message 2
- # Received: finishing
- # Continue successfully
- def take
- __builtin_cexpr! %q{
- ractor_take(ec, RACTOR_PTR(self))
- }
- end
-
def inspect
loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
@@ -725,50 +383,25 @@ class Ractor
alias to_s inspect
- # The name set in Ractor.new, or +nil+.
+ # Returns the name set in Ractor.new, or +nil+.
def name
__builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
class RemoteError
- # The Ractor an uncaught exception is raised in.
+ # The Ractor in which the uncaught exception was raised.
attr_reader :ractor
end
#
# call-seq:
- # ractor.close_incoming -> true | false
+ # ractor.close -> true | false
#
- # Closes the incoming port and returns whether it was already closed. All further attempts
- # to Ractor.receive in the ractor, and #send to the ractor will fail with Ractor::ClosedError.
+ # Closes the default port. Closing a port is allowed only by the ractor which created the port.
+ # Therefore, the receiver must be the current ractor.
#
- # r = Ractor.new {sleep(500)}
- # r.close_incoming #=> false
- # r.close_incoming #=> true
- # r.send('test')
- # # Ractor::ClosedError (The incoming-port is already closed)
- def close_incoming
- __builtin_cexpr! %q{
- ractor_close_incoming(ec, RACTOR_PTR(self));
- }
- end
-
- #
- # call-seq:
- # ractor.close_outgoing -> true | false
- #
- # Closes the outgoing port and returns whether it was already closed. All further attempts
- # to Ractor.yield in the ractor, and #take from the ractor will fail with Ractor::ClosedError.
- #
- # r = Ractor.new {sleep(500)}
- # r.close_outgoing #=> false
- # r.close_outgoing #=> true
- # r.take
- # # Ractor::ClosedError (The outgoing-port is already closed)
- def close_outgoing
- __builtin_cexpr! %q{
- ractor_close_outgoing(ec, RACTOR_PTR(self));
- }
+ def close
+ default_port.close
end
#
@@ -777,7 +410,7 @@ class Ractor
#
# Checks if the object is shareable by ractors.
#
- # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are frozen
+ # Ractor.shareable?(1) #=> true -- numbers are shareable
# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true
# Ractor.shareable?('foo'.freeze) #=> true
#
@@ -792,7 +425,7 @@ class Ractor
# call-seq:
# Ractor.make_shareable(obj, copy: false) -> shareable_obj
#
- # Make +obj+ shareable between ractors.
+ # Makes +obj+ shareable between ractors.
#
# +obj+ and all the objects it refers to will be frozen, unless they are
# already shareable.
@@ -835,8 +468,8 @@ class Ractor
end
end
- # get a value from ractor-local storage for current Ractor
- # Obsolete and use Ractor.[] instead.
+ # Gets a value from ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[] instead.
def [](sym)
if (self != Ractor.current)
raise RuntimeError, "Cannot get ractor local storage for non-current ractor"
@@ -844,8 +477,8 @@ class Ractor
Primitive.ractor_local_value(sym)
end
- # set a value in ractor-local storage for current Ractor
- # Obsolete and use Ractor.[]= instead.
+ # Sets a value in ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[]= instead.
def []=(sym, val)
if (self != Ractor.current)
raise RuntimeError, "Cannot set ractor local storage for non-current ractor"
@@ -853,12 +486,12 @@ class Ractor
Primitive.ractor_local_value_set(sym, val)
end
- # get a value from ractor-local storage of current Ractor
+ # Gets a value from ractor-local storage for the current Ractor.
def self.[](sym)
Primitive.ractor_local_value(sym)
end
- # set a value in ractor-local storage of current Ractor
+ # Sets a value in ractor-local storage for the current Ractor.
def self.[]=(sym, val)
Primitive.ractor_local_value_set(sym, val)
end
@@ -866,9 +499,9 @@ class Ractor
# call-seq:
# Ractor.store_if_absent(key){ init_block }
#
- # If the corresponding value is not set, yield a value with
- # init_block and store the value in thread-safe manner.
- # This method returns corresponding stored value.
+ # If the corresponding ractor-local value is not set, yields a value with
+ # init_block and stores the value in a thread-safe manner.
+ # This method returns the stored value.
#
# (1..10).map{
# Thread.new(it){|i|
@@ -878,17 +511,18 @@ class Ractor
# }.map(&:value).uniq.size #=> 1 and f() is called only once
#
def self.store_if_absent(sym)
+ Primitive.attr! :use_block
Primitive.ractor_local_value_store_if_absent(sym)
end
- # returns main ractor
+ # Returns the main ractor.
def self.main
__builtin_cexpr! %q{
rb_ractor_self(GET_VM()->ractor.main_ractor);
}
end
- # return true if the current ractor is main ractor
+ # Returns true if the current ractor is the main ractor.
def self.main?
__builtin_cexpr! %q{
RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec))
@@ -922,4 +556,279 @@ class Ractor
}
end
end
+
+ #
+ # call-seq:
+ # ractor.default_port -> port object
+ #
+ # Returns the default port of the Ractor.
+ #
+ def default_port
+ __builtin_cexpr! %q{
+ ractor_default_port_value(RACTOR_PTR(self))
+ }
+ end
+
+ #
+ # call-seq:
+ # ractor.join -> self
+ #
+ # Waits for the termination of the Ractor.
+ # If the Ractor was aborted (terminated by an unhandled exception),
+ # the exception is raised in the current ractor.
+ #
+ # Ractor.new{}.join #=> ractor
+ #
+ # Ractor.new{ raise "foo" }.join
+ # #=> raises the exception "foo (RuntimeError)"
+ #
+ def join
+ port = Port.new
+
+ self.monitor port
+ if port.receive == :aborted
+ __builtin_ractor_value
+ end
+
+ self
+ ensure
+ port.close
+ end
+
+ #
+ # call-seq:
+ # ractor.value -> obj
+ #
+ # Waits for +ractor+ to complete and returns its value or raises the exception
+ # which terminated the Ractor. The termination value will be moved to the calling
+ # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value.
+ #
+ # r = Ractor.new{ [1, 2] }
+ # r.value #=> [1, 2] (unshareable object)
+ #
+ # Ractor.new(r){|r| r.value} #=> Ractor::Error
+ #
+ def value
+ self.join
+ __builtin_ractor_value
+ end
+
+ #
+ # call-seq:
+ # ractor.monitor(port) -> self
+ #
+ # Registers the port as a monitoring port for this ractor. When the ractor terminates,
+ # the port receives a Symbol object.
+ #
+ # * +:exited+ is sent if the ractor terminates without an unhandled exception.
+ # * +:aborted+ is sent if the ractor terminates by an unhandled exception.
+ #
+ # r = Ractor.new{ some_task() }
+ # r.monitor(port = Ractor::Port.new)
+ # port.receive #=> :exited and r is terminated
+ #
+ # r = Ractor.new{ raise "foo" }
+ # r.monitor(port = Ractor::Port.new)
+ # port.receive #=> :aborted and r is terminated by the RuntimeError "foo"
+ #
+ def monitor port
+ __builtin_ractor_monitor(port)
+ end
+
+ #
+ # call-seq:
+ # ractor.unmonitor(port) -> self
+ #
+ # Unregisters the port from the monitoring ports for this ractor.
+ #
+ def unmonitor port
+ __builtin_ractor_unmonitor(port)
+ end
+
+ #
+ # call-seq:
+ # Ractor.shareable_proc(self: nil){} -> shareable proc
+ #
+ # Returns a shareable copy of the given block's Proc. The value of +self+
+ # in the Proc will be replaced with the value passed via the `self:` keyword,
+ # or +nil+ if not given.
+ #
+ # In a shareable Proc, access to any outer variables is prohibited.
+ #
+ # a = 42
+ # Ractor.shareable_proc{ p a }
+ # #=> can not isolate a Proc because it accesses outer variables (a). (ArgumentError)
+ #
+ # The value of `self` in the Proc must be a shareable object.
+ #
+ # Ractor.shareable_proc(self: self){}
+ # #=> self should be shareable: main (Ractor::IsolationError)
+ #
+ def self.shareable_proc self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), false)
+ })
+ end
+
+ #
+ # call-seq:
+ # Ractor.shareable_lambda(self: nil){} -> shareable lambda
+ #
+ # Same as Ractor.shareable_proc, but returns a lambda Proc.
+ #
+ def self.shareable_lambda self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), true)
+ })
+ end
+
+ # \Port objects transmit messages between Ractors.
+ class Port
+ #
+ # call-seq:
+ # port.receive -> msg
+ #
+ # Receives a message from the port (which was sent there by Port#send). Only the ractor
+ # that created the port can receive messages this way.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new port do |port|
+ # port.send('message1')
+ # end
+ #
+ # v1 = port.receive
+ # puts "Received: #{v1}"
+ # r.join
+ # # This will print: "Received: message1"
+ #
+ # The method blocks the current Thread if the message queue is empty.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new port do |port|
+ # wait
+ # puts "Still not received"
+ # port.send('message1')
+ # wait
+ # puts "Still received only one"
+ # port.send('message2')
+ # end
+ # puts "Before first receive"
+ # v1 = port.receive
+ # puts "Received: #{v1}"
+ # v2 = port.receive
+ # puts "Received: #{v2}"
+ # r.join
+ #
+ # Output:
+ #
+ # Before first receive
+ # Still not received
+ # Received: message1
+ # Still received only one
+ # Received: message2
+ #
+ # If the port is closed and there are no more messages in the message queue,
+ # the method raises Ractor::ClosedError.
+ #
+ # port = Ractor::Port.new
+ # port.close
+ # port.receive #=> raise Ractor::ClosedError
+ #
+ def receive
+ __builtin_cexpr! %q{
+ ractor_port_receive(ec, self)
+ }
+ end
+
+ #
+ # call-seq:
+ # port.send(msg, move: false) -> self
+ #
+ # Sends a message to the port to be accepted by port.receive.
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new(port) do |port|
+ # port.send 'message'
+ # end
+ # value = port.receive
+ # puts "Received #{value}"
+ # # Prints: "Received: message"
+ #
+ # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready
+ # to receive anything):
+ #
+ # port = Ractor::Port.new
+ # r = Ractor.new(port) do |port|
+ # port.send 'test'
+ # puts "Sent successfully"
+ # # Prints: "Sent successfully" immediately
+ # end
+ #
+ # An attempt to send to a closed port will raise Ractor::ClosedError.
+ #
+ # r = Ractor.new {Ractor::Port.new}
+ # r.join
+ # p r
+ # # "#<Ractor:#6 (irb):23 terminated>"
+ # port = r.value
+ # port.send('test') # raise Ractor::ClosedError
+ #
+ # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning.
+ #
+ # If the object is shareable, a reference to the object will be sent to the receiving ractor.
+ #
+ def send obj, move: false
+ __builtin_cexpr! %q{
+ ractor_port_send(ec, self, obj, move)
+ }
+ end
+
+ alias << send
+
+ #
+ # call-seq:
+ # port.close
+ #
+ # Closes the port. Sending to a closed port is prohibited.
+ # Receiving is also prohibited if there are no messages in its message queue.
+ #
+ # Only the Ractor which created the port is allowed to close it.
+ #
+ # port = Ractor::Port.new
+ # Ractor.new port do |port|
+ # port.close #=> closing port by other ractors is not allowed (Ractor::Error)
+ # end.join
+ #
+ def close
+ __builtin_cexpr! %q{
+ ractor_port_close(ec, self)
+ }
+ end
+
+ #
+ # call-seq:
+ # port.closed? -> true/false
+ #
+ # Returns whether or not the port is closed.
+ def closed?
+ __builtin_cexpr! %q{
+ ractor_port_closed_p(ec, self);
+ }
+ end
+
+ #
+ # call-seq:
+ # port.inspect -> string
+ def inspect
+ "#<Ractor::Port to:\##{
+ __builtin_cexpr! "SIZET2NUM(rb_ractor_id((RACTOR_PORT_PTR(self)->r)))"
+ } id:#{
+ __builtin_cexpr! "SIZET2NUM(ractor_port_id(RACTOR_PORT_PTR(self)))"
+ }>"
+ end
+ end
end