summaryrefslogtreecommitdiff
path: root/ractor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'ractor.rb')
-rw-r--r--ractor.rb269
1 files changed, 166 insertions, 103 deletions
diff --git a/ractor.rb b/ractor.rb
index 5827f6672b..2dc60f5ff6 100644
--- a/ractor.rb
+++ b/ractor.rb
@@ -1,6 +1,4 @@
-# \Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution.
-#
-# Ractor.new makes a new \Ractor, which can run in parallel.
+# Ractor.new creates a new \Ractor, which can run in parallel with other ractors.
#
# # The simplest ractor
# r = Ractor.new {puts "I am in Ractor!"}
@@ -11,28 +9,27 @@
# concerns such as data-races and race-conditions are not possible. The other benefit is parallelism.
#
# To achieve this, object sharing is limited across ractors.
-# For example, unlike in threads, ractors can't access all the objects available in other ractors. Even objects normally
+# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally
# available through variables in the outer scope are prohibited from being used across ractors.
#
# a = 1
# r = Ractor.new {puts "I am in Ractor! a=#{a}"}
# # fails immediately with
-# # ArgumentError (can not isolate a Proc because it accesses outer variables (a).)
+# # Ractor::IsolationError (can not isolate a Proc because it accesses outer variables (a).)
#
# The object must be explicitly shared:
# a = 1
# r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"}
#
-# On CRuby (the default implementation), Global Virtual Machine Lock (GVL) is held per ractor, so
-# ractors can perform in parallel without locking each other. This is unlike the situation with threads
-# on CRuby.
+# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so
+# ractors can run in parallel. This is unlike the situation with threads on CRuby.
#
# Instead of accessing shared state, objects should be passed to and from ractors by
# sending and receiving them as messages.
#
# a = 1
# r = Ractor.new do
-# a_in_ractor = receive # receive blocks until somebody passes a message
+# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message
# puts "I am in Ractor! a=#{a_in_ractor}"
# end
# r.send(a) # pass it
@@ -44,14 +41,14 @@
#
# == Shareable and unshareable objects
#
-# When an object is sent to and from a ractor, it's important to understand whether the
+# When an object is sent to a ractor, it's important to understand whether the
# object is shareable or unshareable. Most Ruby objects are unshareable objects. Even
# frozen objects can be unshareable if they contain (through their instance variables) unfrozen
# objects.
#
-# Shareable objects are those which can be used by several threads without compromising
+# Shareable objects are those which can be used by several ractors at once without compromising
# thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this,
-# and Ractor.make_shareable tries to make the object shareable if it's not already, and gives an error
+# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error
# if it can't do it.
#
# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable
@@ -67,12 +64,12 @@
# ary[0].frozen? #=> true
# ary[1].frozen? #=> true
#
-# When a shareable object is sent (via #send or Ractor.yield), no additional processing occurs
-# on it. It just becomes usable by both ractors. When an unshareable object is sent, it can be
-# either _copied_ or _moved_. The first is the default, and it copies the object fully by
+# When a shareable object is sent via #send, no additional processing occurs
+# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be
+# either _copied_ or _moved_. Copying is the default, and it copies the object fully by
# deep cloning (Object#clone) the non-shareable parts of its structure.
#
-# data = ['foo', 'bar'.freeze]
+# data = ['foo'.dup, 'bar'.freeze]
# r = Ractor.new do
# data2 = Ractor.receive
# puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}"
@@ -83,8 +80,8 @@
#
# This will output something like:
#
-# In ractor: 340, 360, 320
-# Outside : 380, 400, 320
+# In ractor: 8, 16, 24
+# Outside : 32, 40, 24
#
# Note that the object ids of the array and the non-frozen string inside the array have changed in
# the ractor because they are different objects. The second array's element, which is a
@@ -110,12 +107,12 @@
# Outside: moved? true
# test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError)
#
-# Notice that even +inspect+ (and more basic methods like <tt>__id__</tt>) is inaccessible
+# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible
# on a moved object.
#
-# +Class+ and +Module+ objects are shareable so the class/module definitions are shared between ractors.
-# \Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property
-# will be kept. We can not define mutable shareable objects in Ruby, but C extensions can introduce them.
+# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors.
+# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors.
+# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them.
#
# It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the
# variables aren't shareable. This can occur because modules/classes are shareable, but they can have
@@ -180,15 +177,16 @@
#
# == Note on code examples
#
-# In the examples below, sometimes we use the following method to wait for ractors that
-# are not currently blocked to finish (or to make progress).
+# In the examples below, sometimes we use the following method to wait for ractors
+# to make progress or finish.
#
# def wait
# sleep(0.1)
# end
#
-# It is **only for demonstration purposes** and shouldn't be used in a real code.
-# Most of the time, #join is used to wait for ractors to finish.
+# This is **only for demonstration purposes** and shouldn't be used in a real code.
+# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used
+# to wait for messages.
#
# == Reference
#
@@ -199,9 +197,9 @@ class Ractor
# call-seq:
# Ractor.new(*args, name: nil) {|*args| block } -> ractor
#
- # Create a new \Ractor with args and a block.
+ # Creates a new \Ractor with args and a block.
#
- # The given block (Proc) will be isolated (can't access any outer variables). +self+
+ # The given block (Proc) is isolated (can't access any outer variables). +self+
# inside the block will refer to the current \Ractor.
#
# r = Ractor.new { puts "Hi, I am #{self.inspect}" }
@@ -232,8 +230,8 @@ class Ractor
b = block # TODO: builtin bug
raise ArgumentError, "must be called with a block" unless block
if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)")
- Kernel.warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \
- "Also there are many implementation issues.", uplevel: 0, category: :experimental)
+ Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.",
+ uplevel: 0, category: :experimental)
end
loc = caller_locations(1, 1).first
loc = "#{loc.path}:#{loc.lineno}"
@@ -249,7 +247,7 @@ class Ractor
}
end
- # Returns the number of Ractors currently running or blocking (waiting).
+ # Returns the number of ractors currently running or blocking (waiting).
#
# Ractor.count #=> 1
# r = Ractor.new(name: 'example') { Ractor.receive }
@@ -265,11 +263,50 @@ class Ractor
#
# call-seq:
- # Ractor.select(*ports) -> [...]
+ # Ractor.select(*ractors_or_ports) -> [ractor or port, obj]
+ #
+ # Blocks the current Thread until one of the given ports has received a message. Returns an
+ # array of two elements where the first element is the Port and the second is the received object.
+ # This method can also accept Ractor objects themselves, and in that case will wait until one
+ # has terminated and return a two-element array where the first element is the ractor and the
+ # second is its termination value.
+ #
+ # p1, p2 = Ractor::Port.new, Ractor::Port.new
+ # ps = [p1, p2]
+ # rs = 2.times.map do |i|
+ # Ractor.new(ps.shift, i) do |p, i|
+ # sleep rand(0.99)
+ # p.send("r#{i}")
+ # sleep rand(0.99)
+ # "r#{i} done"
+ # end
+ # end
+ #
+ # waiting_on = [p1, p2, *rs]
+ # until waiting_on.empty?
+ # received_on, obj = Ractor.select(*waiting_on)
+ # waiting_on.delete(received_on)
+ # puts obj
+ # end
+ #
+ # # r0
+ # # r1
+ # # r1 done
+ # # r0 done
+ #
+ # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread
+ # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in
+ # the array element order.
+ #
+ # values = []
+ # until ractors.empty?
+ # r, val = Ractor.select(*ractors)
+ # ractors.delete(r)
+ # values << val
+ # end
#
- # TBD
def self.select(*ports)
- raise ArgumentError, 'specify at least one ractor or `yield_value`' if ports.empty?
+ raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty?
monitors = {} # Ractor::Port => Ractor
@@ -308,7 +345,7 @@ class Ractor
# call-seq:
# Ractor.receive -> obj
#
- # Receive a message from the default port.
+ # Receives a message from the current ractor's default port.
def self.receive
Ractor.current.default_port.receive
end
@@ -325,9 +362,9 @@ class Ractor
#
# call-seq:
- # ractor.send(msg) -> self
+ # ractor.send(msg, move: false) -> self
#
- # It is equivalent to default_port.send(msg)
+ # This is equivalent to Port#send to the ractor's #default_port.
def send(...)
default_port.send(...)
self
@@ -346,22 +383,22 @@ class Ractor
alias to_s inspect
- # The name set in Ractor.new, or +nil+.
+ # Returns the name set in Ractor.new, or +nil+.
def name
__builtin_cexpr! %q{RACTOR_PTR(self)->name}
end
class RemoteError
- # The Ractor an uncaught exception is raised in.
+ # The Ractor in which the uncaught exception was raised.
attr_reader :ractor
end
#
# call-seq:
- # Ractor.current.close -> true | false
+ # ractor.close -> true | false
#
- # Closes default_port. Closing port is allowed only by the ractor which creates this port.
- # So this close method also allowed by the current Ractor.
+ # Closes the default port. Closing a port is allowed only by the ractor which created the port.
+ # Therefore, the receiver must be the current ractor.
#
def close
default_port.close
@@ -373,7 +410,7 @@ class Ractor
#
# Checks if the object is shareable by ractors.
#
- # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are frozen
+ # Ractor.shareable?(1) #=> true -- numbers are shareable
# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true
# Ractor.shareable?('foo'.freeze) #=> true
#
@@ -388,7 +425,7 @@ class Ractor
# call-seq:
# Ractor.make_shareable(obj, copy: false) -> shareable_obj
#
- # Make +obj+ shareable between ractors.
+ # Makes +obj+ shareable between ractors.
#
# +obj+ and all the objects it refers to will be frozen, unless they are
# already shareable.
@@ -431,8 +468,8 @@ class Ractor
end
end
- # get a value from ractor-local storage for current Ractor
- # Obsolete and use Ractor.[] instead.
+ # Gets a value from ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[] instead.
def [](sym)
if (self != Ractor.current)
raise RuntimeError, "Cannot get ractor local storage for non-current ractor"
@@ -440,8 +477,8 @@ class Ractor
Primitive.ractor_local_value(sym)
end
- # set a value in ractor-local storage for current Ractor
- # Obsolete and use Ractor.[]= instead.
+ # Sets a value in ractor-local storage for the current Ractor.
+ # Obsolete, use Ractor.[]= instead.
def []=(sym, val)
if (self != Ractor.current)
raise RuntimeError, "Cannot set ractor local storage for non-current ractor"
@@ -449,12 +486,12 @@ class Ractor
Primitive.ractor_local_value_set(sym, val)
end
- # get a value from ractor-local storage of current Ractor
+ # Gets a value from ractor-local storage for the current Ractor.
def self.[](sym)
Primitive.ractor_local_value(sym)
end
- # set a value in ractor-local storage of current Ractor
+ # Sets a value in ractor-local storage for the current Ractor.
def self.[]=(sym, val)
Primitive.ractor_local_value_set(sym, val)
end
@@ -462,9 +499,9 @@ class Ractor
# call-seq:
# Ractor.store_if_absent(key){ init_block }
#
- # If the corresponding value is not set, yield a value with
- # init_block and store the value in thread-safe manner.
- # This method returns corresponding stored value.
+ # If the corresponding ractor-local value is not set, yields a value with
+ # init_block and stores the value in a thread-safe manner.
+ # This method returns the stored value.
#
# (1..10).map{
# Thread.new(it){|i|
@@ -474,17 +511,18 @@ class Ractor
# }.map(&:value).uniq.size #=> 1 and f() is called only once
#
def self.store_if_absent(sym)
+ Primitive.attr! :use_block
Primitive.ractor_local_value_store_if_absent(sym)
end
- # returns main ractor
+ # Returns the main ractor.
def self.main
__builtin_cexpr! %q{
rb_ractor_self(GET_VM()->ractor.main_ractor);
}
end
- # return true if the current ractor is main ractor
+ # Returns true if the current ractor is the main ractor.
def self.main?
__builtin_cexpr! %q{
RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec))
@@ -523,7 +561,7 @@ class Ractor
# call-seq:
# ractor.default_port -> port object
#
- # return default port of the Ractor.
+ # Returns the default port of the Ractor.
#
def default_port
__builtin_cexpr! %q{
@@ -535,14 +573,14 @@ class Ractor
# call-seq:
# ractor.join -> self
#
- # Wait for the termination of the Ractor.
- # If the Ractor was aborted (terminated with an exception),
- # Ractor#value is called to raise an exception.
+ # Waits for the termination of the Ractor.
+ # If the Ractor was aborted (terminated by an unhandled exception),
+ # the exception is raised in the current ractor.
#
# Ractor.new{}.join #=> ractor
#
# Ractor.new{ raise "foo" }.join
- # #=> raise an exception "foo (RuntimeError)"
+ # #=> raises the exception "foo (RuntimeError)"
#
def join
port = Port.new
@@ -561,9 +599,9 @@ class Ractor
# call-seq:
# ractor.value -> obj
#
- # Waits for +ractor+ to complete, using #join, and return its value or raise
- # the exception which terminated the Ractor. The value will not be copied even
- # if it is unshareable object. Therefore at most 1 Ractor can get a value.
+ # Waits for +ractor+ to complete and returns its value or raises the exception
+ # which terminated the Ractor. The termination value will be moved to the calling
+ # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value.
#
# r = Ractor.new{ [1, 2] }
# r.value #=> [1, 2] (unshareable object)
@@ -575,20 +613,15 @@ class Ractor
__builtin_ractor_value
end
- # keep it for compatibility
- def take
- Kernel.warn("Ractor#take was deprecated and use Ractor#value instead. This method will be removed after the end of Aug 2025", uplevel: 0)
- self.value
- end
-
#
# call-seq:
# ractor.monitor(port) -> self
#
- # Register port as a monitoring port. If the ractor terminated,
- # the port received a Symbol object.
- # :exited will be sent if the ractor terminated without an exception.
- # :aborted will be sent if the ractor terminated with a exception.
+ # Registers the port as a monitoring port for this ractor. When the ractor terminates,
+ # the port receives a Symbol object.
+ #
+ # * +:exited+ is sent if the ractor terminates without an unhandled exception.
+ # * +:aborted+ is sent if the ractor terminates by an unhandled exception.
#
# r = Ractor.new{ some_task() }
# r.monitor(port = Ractor::Port.new)
@@ -596,7 +629,7 @@ class Ractor
#
# r = Ractor.new{ raise "foo" }
# r.monitor(port = Ractor::Port.new)
- # port.receive #=> :terminated and r is terminated with an exception "foo"
+ # port.receive #=> :aborted and r is terminated by the RuntimeError "foo"
#
def monitor port
__builtin_ractor_monitor(port)
@@ -606,19 +639,61 @@ class Ractor
# call-seq:
# ractor.unmonitor(port) -> self
#
- # Unregister port from the monitoring ports.
+ # Unregisters the port from the monitoring ports for this ractor.
#
def unmonitor port
__builtin_ractor_unmonitor(port)
end
+ #
+ # call-seq:
+ # Ractor.shareable_proc(self: nil){} -> shareable proc
+ #
+ # Returns a shareable copy of the given block's Proc. The value of +self+
+ # in the Proc will be replaced with the value passed via the `self:` keyword,
+ # or +nil+ if not given.
+ #
+ # In a shareable Proc, access to any outer variables is prohibited.
+ #
+ # a = 42
+ # Ractor.shareable_proc{ p a }
+ # #=> can not isolate a Proc because it accesses outer variables (a). (Ractor::IsolationError)
+ #
+ # The value of `self` in the Proc must be a shareable object.
+ #
+ # Ractor.shareable_proc(self: self){}
+ # #=> self should be shareable: main (Ractor::IsolationError)
+ #
+ def self.shareable_proc self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), false)
+ })
+ end
+
+ #
+ # call-seq:
+ # Ractor.shareable_lambda(self: nil){} -> shareable lambda
+ #
+ # Same as Ractor.shareable_proc, but returns a lambda Proc.
+ #
+ def self.shareable_lambda self: nil
+ Primitive.attr! :use_block
+
+ __builtin_cexpr!(%Q{
+ ractor_shareable_proc(ec, *LOCAL_PTR(self), true)
+ })
+ end
+
# \Port objects transmit messages between Ractors.
class Port
#
# call-seq:
# port.receive -> msg
#
- # Receive a message to the port (which was sent there by Port#send).
+ # Receives a message from the port (which was sent there by Port#send). Only the ractor
+ # that created the port can receive messages this way.
#
# port = Ractor::Port.new
# r = Ractor.new port do |port|
@@ -628,9 +703,9 @@ class Ractor
# v1 = port.receive
# puts "Received: #{v1}"
# r.join
- # # Here will be printed: "Received: message1"
+ # # This will print: "Received: message1"
#
- # The method blocks if the message queue is empty.
+ # The method blocks the current Thread if the message queue is empty.
#
# port = Ractor::Port.new
# r = Ractor.new port do |port|
@@ -656,8 +731,8 @@ class Ractor
# Still received only one
# Received: message2
#
- # If close_incoming was called on the ractor, the method raises Ractor::ClosedError
- # if there are no more messages in the message queue:
+ # If the port is closed and there are no more messages in the message queue,
+ # the method raises Ractor::ClosedError.
#
# port = Ractor::Port.new
# port.close
@@ -673,27 +748,27 @@ class Ractor
# call-seq:
# port.send(msg, move: false) -> self
#
- # Send a message to a port to be accepted by port.receive.
+ # Sends a message to the port to be accepted by port.receive.
#
# port = Ractor::Port.new
- # r = Ractor.new do
- # r.send 'message'
+ # r = Ractor.new(port) do |port|
+ # port.send 'message'
# end
# value = port.receive
# puts "Received #{value}"
# # Prints: "Received: message"
#
- # The method is non-blocking (will return immediately even if the ractor is not ready
+ # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready
# to receive anything):
#
# port = Ractor::Port.new
# r = Ractor.new(port) do |port|
- # port.send 'test'}
+ # port.send 'test'
# puts "Sent successfully"
# # Prints: "Sent successfully" immediately
# end
#
- # An attempt to send to a port which already closed its execution will raise Ractor::ClosedError.
+ # An attempt to send to a closed port will raise Ractor::ClosedError.
#
# r = Ractor.new {Ractor::Port.new}
# r.join
@@ -704,7 +779,7 @@ class Ractor
#
# If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning.
#
- # If the object is shareable, it only send a reference to the object without cloning.
+ # If the object is shareable, a reference to the object will be sent to the receiving ractor.
#
def send obj, move: false
__builtin_cexpr! %q{
@@ -718,22 +793,10 @@ class Ractor
# call-seq:
# port.close
#
- # Close the port. On the closed port, sending is not prohibited.
- # Receiving is also not allowed if there is no sent messages arrived before closing.
- #
- # port = Ractor::Port.new
- # Ractor.new port do |port|
- # port.send 1 # OK
- # port.send 2 # OK
- # port.close
- # port.send 3 # raise Ractor::ClosedError
- # end
- #
- # port.receive #=> 1
- # port.receive #=> 2
- # port.receive #=> raise Ractor::ClosedError
+ # Closes the port. Sending to a closed port is prohibited.
+ # Receiving is also prohibited if there are no messages in its message queue.
#
- # Now, only a Ractor which creates the port is allowed to close ports.
+ # Only the Ractor which created the port is allowed to close it.
#
# port = Ractor::Port.new
# Ractor.new port do |port|
@@ -750,7 +813,7 @@ class Ractor
# call-seq:
# port.closed? -> true/false
#
- # Return the port is closed or not.
+ # Returns whether or not the port is closed.
def closed?
__builtin_cexpr! %q{
ractor_port_closed_p(ec, self);