diff options
Diffstat (limited to 'ractor.rb')
| -rw-r--r-- | ractor.rb | 1060 |
1 files changed, 526 insertions, 534 deletions
@@ -1,96 +1,60 @@ -# Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution. -# -# Ractor.new can make a new Ractor, and it will run in parallel. +# Ractor.new creates a new \Ractor, which can run in parallel with other ractors. # # # The simplest ractor # r = Ractor.new {puts "I am in Ractor!"} -# r.take # wait for it to finish -# # here "I am in Ractor!" would be printed +# r.join # wait for it to finish +# # Here, "I am in Ractor!" is printed # -# Ractors do not share usual objects, so the same kinds of thread-safety concerns such as data-race, -# race-conditions are not available on multi-ractor programming. +# Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety +# concerns such as data-races and race-conditions are not possible. The other benefit is parallelism. # -# To achieve this, ractors severely limit object sharing between different ractors. -# For example, unlike threads, ractors can't access each other's objects, nor any objects through -# variables of the outer scope. +# To achieve this, object sharing is limited across ractors. +# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally +# available through variables in the outer scope are prohibited from being used across ractors. # # a = 1 # r = Ractor.new {puts "I am in Ractor! a=#{a}"} # # fails immediately with # # ArgumentError (can not isolate a Proc because it accesses outer variables (a).) # -# On CRuby (the default implementation), Global Virtual Machine Lock (GVL) is held per ractor, so -# ractors are performed in parallel without locking each other. +# The object must be explicitly shared: +# a = 1 +# r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"} +# +# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so +# ractors can run in parallel. This is unlike the situation with threads on CRuby. # -# Instead of accessing the shared state, the objects should be passed to and from ractors via -# sending and receiving objects as messages. +# Instead of accessing shared state, objects should be passed to and from ractors by +# sending and receiving them as messages. # # a = 1 # r = Ractor.new do -# a_in_ractor = receive # receive blocks till somebody will pass message +# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message # puts "I am in Ractor! a=#{a_in_ractor}" # end # r.send(a) # pass it -# r.take -# # here "I am in Ractor! a=1" would be printed -# -# There are two pairs of methods for sending/receiving messages: -# -# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push); -# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull); -# -# In addition to that, an argument to Ractor.new would be passed to block and available there -# as if received by Ractor.receive, and the last block value would be sent outside of the -# ractor as if sent by Ractor.yield. -# -# A little demonstration on a classic ping-pong: -# -# server = Ractor.new do -# puts "Server starts: #{self.inspect}" -# puts "Server sends: ping" -# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested -# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent -# puts "Server received: #{received}" -# end -# -# client = Ractor.new(server) do |srv| # The server is sent inside client, and available as srv -# puts "Client starts: #{self.inspect}" -# received = srv.take # The Client takes a message specifically from the server -# puts "Client received from " \ -# "#{srv.inspect}: #{received}" -# puts "Client sends to " \ -# "#{srv.inspect}: pong" -# srv.send 'pong' # The client sends a message specifically to the server -# end -# -# [client, server].each(&:take) # Wait till they both finish -# -# This will output: +# r.join +# # Here, "I am in Ractor! a=1" is printed # -# Server starts: #<Ractor:#2 test.rb:1 running> -# Server sends: ping -# Client starts: #<Ractor:#3 test.rb:8 running> -# Client received from #<Ractor:#2 rac.rb:1 blocking>: ping -# Client sends to #<Ractor:#2 rac.rb:1 blocking>: pong -# Server received: pong -# -# It is said that Ractor receives messages via the <em>incoming port</em>, and sends them -# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and -# Ractor#close_outgoing respectively. If a ractor terminated, its ports will be closed -# automatically. +# In addition to that, any arguments passed to Ractor.new are passed to the block and available there +# as if received by Ractor.receive, and the last block value can be received with Ractor#value. # # == Shareable and unshareable objects # -# When the object is sent to and from the ractor, it is important to understand whether the -# object is shareable or unshareable. Most of objects are unshareable objects. +# When an object is sent to a ractor, it's important to understand whether the +# object is shareable or unshareable. Most Ruby objects are unshareable objects. Even +# frozen objects can be unshareable if they contain (through their instance variables) unfrozen +# objects. # -# Shareable objects are basically those which can be used by several threads without compromising -# thread-safety; e.g. immutable ones. Ractor.shareable? allows to check this, and Ractor.make_shareable -# tries to make object shareable if it is not. +# Shareable objects are those which can be used by several ractors at once without compromising +# thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this, +# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error +# if it can't do it. # -# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are -# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # freeze_string_literals: true +# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable +# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true # Ractor.shareable?('foo'.freeze) #=> true +# Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen # # ary = ['hello', 'world'] # ary.frozen? #=> false @@ -100,32 +64,32 @@ # ary[0].frozen? #=> true # ary[1].frozen? #=> true # -# When a shareable object is sent (via #send or Ractor.yield), no additional processing happens, -# and it just becomes usable by both ractors. When an unshareable object is sent, it can be -# either _copied_ or _moved_. The first is the default, and it makes the object's full copy by -# deep cloning of non-shareable parts of its structure. +# When a shareable object is sent via #send, no additional processing occurs +# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be +# either _copied_ or _moved_. Copying is the default, and it copies the object fully by +# deep cloning (Object#clone) the non-shareable parts of its structure. # -# data = ['foo', 'bar'.freeze] +# data = ['foo'.dup, 'bar'.freeze] # r = Ractor.new do # data2 = Ractor.receive # puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}" # end # r.send(data) -# r.take +# r.join # puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}" # -# This will output: +# This will output something like: # -# In ractor: 340, 360, 320 -# Outside : 380, 400, 320 +# In ractor: 8, 16, 24 +# Outside : 32, 40, 24 # -# (Note that object id of both array and non-frozen string inside array have changed inside -# the ractor, showing it is different objects. But the second array's element, which is a -# shareable frozen string, has the same object_id.) +# Note that the object ids of the array and the non-frozen string inside the array have changed in +# the ractor because they are different objects. The second array's element, which is a +# shareable frozen string, is the same object. # -# Deep cloning of the objects may be slow, and sometimes impossible. Alternatively, -# <tt>move: true</tt> may be used on sending. This will <em>move</em> the object to the -# receiving ractor, making it inaccessible for a sending ractor. +# Deep cloning of objects may be slow, and sometimes impossible. Alternatively, <tt>move: true</tt> may +# be used during sending. This will <em>move</em> the unshareable object to the receiving ractor, making it +# inaccessible to the sending ractor. # # data = ['foo', 'bar'] # r = Ractor.new do @@ -133,7 +97,7 @@ # puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}" # end # r.send(data, move: true) -# r.take +# r.join # puts "Outside: moved? #{Ractor::MovedObject === data}" # puts "Outside: #{data.inspect}" # @@ -143,16 +107,17 @@ # Outside: moved? true # test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError) # -# Notice that even +inspect+ (and more basic methods like <tt>__id__</tt>) is inaccessible +# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible # on a moved object. # -# Besides frozen objects, there are shareable objects. Class and Module objects are shareable so -# the Class/Module definitions are shared between ractors. Ractor objects are also shareable objects. -# All operations for the shareable mutable objects are thread-safe, so the thread-safety property -# will be kept. We can not define mutable shareable objects in Ruby, but C extensions can introduce them. +# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors. +# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors. +# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them. # -# It is prohibited to access instance variables of mutable shareable objects (especially Modules and classes) -# from ractors other than main: +# It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the +# variables aren't shareable. This can occur because modules/classes are shareable, but they can have +# instance variables whose values are not. In non-main ractors, it's also prohibited to set instance +# variables on classes/modules (even if the value is shareable). # # class C # class << self @@ -160,27 +125,28 @@ # end # end # -# C.tricky = 'test' +# C.tricky = "unshareable".dup # # r = Ractor.new(C) do |cls| # puts "I see #{cls}" # puts "I can't see #{cls.tricky}" +# cls.tricky = true # doesn't get here, but this would also raise an error # end -# r.take +# r.join # # I see C # # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) # -# Ractors can access constants if they are shareable. The main Ractor is the only one that can +# Ractors can access constants if they are shareable. The main \Ractor is the only one that can # access non-shareable constants. # # GOOD = 'good'.freeze -# BAD = 'bad' +# BAD = 'bad'.dup # # r = Ractor.new do # puts "GOOD=#{GOOD}" # puts "BAD=#{BAD}" # end -# r.take +# r.join # # GOOD=good # # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError) # @@ -190,7 +156,7 @@ # puts "I see #{C}" # puts "I can't see #{C.tricky}" # end -# r.take +# r.join # # I see C # # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) # @@ -199,27 +165,28 @@ # # == Ractors vs threads # -# Each ractor creates its own thread. New threads can be created from inside ractor -# (and, on CRuby, sharing GVL with other threads of this ractor). +# Each ractor has its own main Thread. New threads can be created from inside ractors +# (and, on CRuby, they share the GVL with other threads of this ractor). # # r = Ractor.new do # a = 1 # Thread.new {puts "Thread in ractor: a=#{a}"}.join # end -# r.take +# r.join # # Here "Thread in ractor: a=1" will be printed # # == Note on code examples # -# In examples below, sometimes we use the following method to wait till ractors that -# are not currently blocked will finish (or process till next blocking) method. +# In the examples below, sometimes we use the following method to wait for ractors +# to make progress or finish. # # def wait # sleep(0.1) # end # -# It is **only for demonstration purposes** and shouldn't be used in a real code. -# Most of the times, just #take is used to wait till ractor will finish. +# This is **only for demonstration purposes** and shouldn't be used in a real code. +# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used +# to wait for messages. # # == Reference # @@ -230,17 +197,17 @@ class Ractor # call-seq: # Ractor.new(*args, name: nil) {|*args| block } -> ractor # - # Create a new Ractor with args and a block. + # Creates a new \Ractor with args and a block. # - # A block (Proc) will be isolated (can't access to outer variables). +self+ - # inside the block will refer to the current Ractor. + # The given block (Proc) is isolated (can't access any outer variables). +self+ + # inside the block will refer to the current \Ractor. # # r = Ractor.new { puts "Hi, I am #{self.inspect}" } - # r.take + # r.join # # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>" # - # +args+ passed to the method would be propagated to block args by the same rules as - # objects passed through #send/Ractor.receive: if +args+ are not shareable, they + # Any +args+ passed are propagated to the block arguments by the same rules as + # objects sent via #send/Ractor.receive. If an argument in +args+ is not shareable, it # will be copied (via deep cloning, which might be inefficient). # # arg = [1, 2, 3] @@ -248,14 +215,14 @@ class Ractor # r = Ractor.new(arg) {|received_arg| # puts "Received: #{received_arg} (##{received_arg.object_id})" # } - # r.take + # r.join # # Prints: # # Passing: [1, 2, 3] (#280) # # Received: [1, 2, 3] (#300) # # Ractor's +name+ can be set for debugging purposes: # - # r = Ractor.new(name: 'my ractor') {} + # r = Ractor.new(name: 'my ractor') {}; r.join # p r # #=> #<Ractor:#3 my ractor test.rb:1 terminated> # @@ -263,8 +230,8 @@ class Ractor b = block # TODO: builtin bug raise ArgumentError, "must be called with a block" unless block if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)") - warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \ - "Also there are many implementation issues.", uplevel: 0, category: :experimental) + Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.", + uplevel: 0, category: :experimental) end loc = caller_locations(1, 1).first loc = "#{loc.path}:#{loc.lineno}" @@ -280,13 +247,13 @@ class Ractor } end - # Returns total count of Ractors currently running. + # Returns the number of ractors currently running or blocking (waiting). # # Ractor.count #=> 1 - # r = Ractor.new(name: 'example') { Ractor.yield(1) } + # r = Ractor.new(name: 'example') { Ractor.receive } # Ractor.count #=> 2 (main + example ractor) - # r.take # wait for Ractor.yield(1) - # r.take # wait till r will finish + # r << 42 # r's Ractor.receive will resume + # r.join # wait for r's termination # Ractor.count #=> 1 def self.count __builtin_cexpr! %q{ @@ -296,130 +263,91 @@ class Ractor # # call-seq: - # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj] - # - # Waits for the first ractor to have something in its outgoing port, reads from this ractor, and - # returns that ractor and the object received. - # - # r1 = Ractor.new {Ractor.yield 'from 1'} - # r2 = Ractor.new {Ractor.yield 'from 2'} - # - # r, obj = Ractor.select(r1, r2) - # - # puts "received #{obj.inspect} from #{r.inspect}" - # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running> - # - # If one of the given ractors is the current ractor, and it would be selected, +r+ will contain - # +:receive+ symbol instead of the ractor object. - # - # r1 = Ractor.new(Ractor.current) do |main| - # main.send 'to main' - # Ractor.yield 'from 1' - # end - # r2 = Ractor.new do - # Ractor.yield 'from 2' + # Ractor.select(*ractors_or_ports) -> [ractor or port, obj] + # + # Blocks the current Thread until one of the given ports has received a message. Returns an + # array of two elements where the first element is the Port and the second is the received object. + # This method can also accept Ractor objects themselves, and in that case will wait until one + # has terminated and return a two-element array where the first element is the ractor and the + # second is its termination value. + # + # p1, p2 = Ractor::Port.new, Ractor::Port.new + # ps = [p1, p2] + # rs = 2.times.map do |i| + # Ractor.new(ps.shift, i) do |p, i| + # sleep rand(0.99) + # p.send("r#{i}") + # sleep rand(0.99) + # "r#{i} done" + # end # end # - # r, obj = Ractor.select(r1, r2, Ractor.current) - # puts "received #{obj.inspect} from #{r.inspect}" - # # Prints: received "to main" from :receive - # - # If +yield_value+ is provided, that value may be yielded if another Ractor is calling #take. - # In this case, the pair <tt>[:yield, nil]</tt> would be returned: - # - # r1 = Ractor.new(Ractor.current) do |main| - # puts "Received from main: #{main.take}" + # waiting_on = [p1, p2, *rs] + # until waiting_on.empty? + # received_on, obj = Ractor.select(*waiting_on) + # waiting_on.delete(received_on) + # puts obj # end # - # puts "Trying to select" - # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123) - # wait - # puts "Received #{obj.inspect} from #{r.inspect}" + # # r0 + # # r1 + # # r1 done + # # r0 done # - # This will print: + # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread + # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in + # the array element order. # - # Trying to select - # Received from main: 123 - # Received nil from :yield + # values = [] + # until ractors.empty? + # r, val = Ractor.select(*ractors) + # ractors.delete(r) + # values << val + # end # - # +move+ boolean flag defines whether yielded value should be copied (default) or moved. - def self.select(*ractors, yield_value: yield_unspecified = true, move: false) - raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? + def self.select(*ports) + raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty? - __builtin_cstmt! %q{ - const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors); - VALUE rv; - VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors), - yield_unspecified == Qtrue ? Qundef : yield_value, - (bool)RTEST(move) ? true : false, &rv); - return rb_ary_new_from_args(2, rv, v); - } + monitors = {} # Ractor::Port => Ractor + + ports = ports.map do |arg| + case arg + when Ractor + port = Ractor::Port.new + monitors[port] = arg + arg.monitor port + port + when Ractor::Port + arg + else + raise ArgumentError, "should be Ractor::Port or Ractor" + end + end + + begin + result_port, obj = __builtin_ractor_select_internal(ports) + + if r = monitors[result_port] + [r, r.value] + else + [result_port, obj] + end + ensure + # close all ports for join + monitors.each do |port, r| + r.unmonitor port + port.close + end + end end # # call-seq: - # Ractor.receive -> msg - # - # Receive an incoming message from the current Ractor's incoming port's queue, which was - # sent there by #send. - # - # r = Ractor.new do - # v1 = Ractor.receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # Here will be printed: "Received: message1" - # - # Alternatively, private instance method +receive+ may be used: - # - # r = Ractor.new do - # v1 = receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # Here will be printed: "Received: message1" - # - # The method blocks if the queue is empty. - # - # r = Ractor.new do - # puts "Before first receive" - # v1 = Ractor.receive - # puts "Received: #{v1}" - # v2 = Ractor.receive - # puts "Received: #{v2}" - # end - # wait - # puts "Still not received" - # r.send('message1') - # wait - # puts "Still received only one" - # r.send('message2') - # r.take - # - # Output: - # - # Before first receive - # Still not received - # Received: message1 - # Still received only one - # Received: message2 - # - # If close_incoming was called on the ractor, the method raises Ractor::ClosedError - # if there are no more messages in incoming queue: - # - # Ractor.new do - # close_incoming - # receive - # end - # wait - # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError) + # Ractor.receive -> obj # + # Receives a message from the current ractor's default port. def self.receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } + Ractor.current.default_port.receive end class << self @@ -428,278 +356,21 @@ class Ractor # same as Ractor.receive private def receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } + default_port.receive end alias recv receive # # call-seq: - # Ractor.receive_if {|msg| block } -> msg - # - # Receive only a specific message. - # - # Instead of Ractor.receive, Ractor.receive_if can provide a pattern - # by a block and you can choose the receiving message. - # - # r = Ractor.new do - # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3" - # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1" - # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2" - # end - # r << "bar1" - # r << "baz2" - # r << "foo3" - # r.take - # - # This will output: - # - # foo3 - # bar1 - # baz2 - # - # If the block returns a truthy value, the message will be removed from the incoming queue - # and returned. - # Otherwise, the message remains in the incoming queue and the following received - # messages are checked by the given block. - # - # If there are no messages left in the incoming queue, the method will - # block until new messages arrive. - # - # If the block is escaped by break/return/exception/throw, the message is removed from - # the incoming queue as if a truthy value had been returned. - # - # r = Ractor.new do - # val = Ractor.receive_if{|msg| msg.is_a?(Array)} - # puts "Received successfully: #{val}" - # end - # - # r.send(1) - # r.send('test') - # wait - # puts "2 non-matching sent, nothing received" - # r.send([1, 2, 3]) - # wait - # - # Prints: - # - # 2 non-matching sent, nothing received - # Received successfully: [1, 2, 3] - # - # Note that you can not call receive/receive_if in the given block recursively. - # It means that you should not do any tasks in the block. - # - # Ractor.current << true - # Ractor.receive_if{|msg| Ractor.receive} - # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) - # - def self.receive_if &b - Primitive.ractor_receive_if b - end - - private def receive_if &b - Primitive.ractor_receive_if b - end - - # - # call-seq: - # ractor.send(msg, move: false) -> self - # - # Send a message to a Ractor's incoming queue to be consumed by Ractor.receive. - # - # r = Ractor.new do - # value = Ractor.receive - # puts "Received #{value}" - # end - # r.send 'message' - # # Prints: "Received: message" - # - # The method is non-blocking (will return immediately even if the ractor is not ready - # to receive anything): + # ractor.send(msg, move: false) -> self # - # r = Ractor.new {sleep(5)} - # r.send('test') - # puts "Sent successfully" - # # Prints: "Sent successfully" immediately - # - # Attempt to send to ractor which already finished its execution will raise Ractor::ClosedError. - # - # r = Ractor.new {} - # r.take - # p r - # # "#<Ractor:#6 (irb):23 terminated>" - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # - # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # receive - # end - # r.close_incoming - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # # The error would be raised immediately, not when ractor will try to receive - # - # If the +obj+ is unshareable, by default it would be copied into ractor by deep cloning. - # If the <tt>move: true</tt> is passed, object is _moved_ into ractor and becomes - # inaccessible to sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # msg = 'message' - # r.send(msg, move: true) - # r.take - # p msg - # - # This prints: - # - # Received: message - # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8> - # - # All references to the object and its parts will become invalid in sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message' - # ary = [s] - # copy = ary.dup - # r.send(ary, move: true) - # - # s.inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # ary.class - # # Ractor::MovedError (can not send any methods to a moved object) - # copy.class - # # => Array, it is different object - # copy[0].inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # # ...but its item was still a reference to `s`, which was moved - # - # If the object was shareable, <tt>move: true</tt> has no effect on it: - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message'.freeze - # r.send(s, move: true) - # s.inspect #=> "message", still available - # - def send(obj, move: false) - __builtin_cexpr! %q{ - ractor_send(ec, RACTOR_PTR(self), obj, move) - } + # This is equivalent to Port#send to the ractor's #default_port. + def send(...) + default_port.send(...) + self end alias << send - # - # call-seq: - # Ractor.yield(msg, move: false) -> nil - # - # Send a message to the current ractor's outgoing port to be consumed by #take. - # - # r = Ractor.new {Ractor.yield 'Hello from ractor'} - # puts r.take - # # Prints: "Hello from ractor" - # - # The method is blocking, and will return only when somebody consumes the - # sent message. - # - # r = Ractor.new do - # Ractor.yield 'Hello from ractor' - # puts "Ractor: after yield" - # end - # wait - # puts "Still not taken" - # puts r.take - # - # This will print: - # - # Still not taken - # Hello from ractor - # Ractor: after yield - # - # If the outgoing port was closed with #close_outgoing, the method will raise: - # - # r = Ractor.new do - # close_outgoing - # Ractor.yield 'Hello from ractor' - # end - # wait - # # `yield': The outgoing-port is already closed (Ractor::ClosedError) - # - # The meaning of +move+ argument is the same as for #send. - def self.yield(obj, move: false) - __builtin_cexpr! %q{ - ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) - } - end - - # - # call-seq: - # ractor.take -> msg - # - # Take a message from ractor's outgoing port, which was put there by Ractor.yield or at ractor's - # finalization. - # - # r = Ractor.new do - # Ractor.yield 'explicit yield' - # 'last value' - # end - # puts r.take #=> 'explicit yield' - # puts r.take #=> 'last value' - # puts r.take # Ractor::ClosedError (The outgoing-port is already closed) - # - # The fact that the last value is also put to outgoing port means that +take+ can be used - # as some analog of Thread#join ("just wait till ractor finishes"), but don't forget it - # will raise if somebody had already consumed everything ractor have produced. - # - # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # Ractor.yield 'Hello from ractor' - # end - # r.close_outgoing - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - # # The error would be raised immediately, not when ractor will try to receive - # - # If an uncaught exception is raised in the Ractor, it is propagated on take as a - # Ractor::RemoteError. - # - # r = Ractor.new {raise "Something weird happened"} - # - # begin - # r.take - # rescue => e - # p e # => #<Ractor::RemoteError: thrown by remote Ractor.> - # p e.ractor == r # => true - # p e.cause # => #<RuntimeError: Something weird happened> - # end - # - # Ractor::ClosedError is a descendant of StopIteration, so the closing of the ractor will break - # the loops without propagating the error: - # - # r = Ractor.new do - # 3.times {|i| Ractor.yield "message #{i}"} - # "finishing" - # end - # - # loop {puts "Received: " + r.take} - # puts "Continue successfully" - # - # This will print: - # - # Received: message 0 - # Received: message 1 - # Received: message 2 - # Received: finishing - # Continue successfully - def take - __builtin_cexpr! %q{ - ractor_take(ec, RACTOR_PTR(self)) - } - end - def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } @@ -712,51 +383,25 @@ class Ractor alias to_s inspect - # The name set in Ractor.new, or +nil+. + # Returns the name set in Ractor.new, or +nil+. def name __builtin_cexpr! %q{RACTOR_PTR(self)->name} end class RemoteError + # The Ractor in which the uncaught exception was raised. attr_reader :ractor end # # call-seq: - # ractor.close_incoming -> true | false - # - # Closes the incoming port and returns its previous state. - # All further attempts to Ractor.receive in the ractor, and #send to the ractor - # will fail with Ractor::ClosedError. - # - # r = Ractor.new {sleep(500)} - # r.close_incoming #=> false - # r.close_incoming #=> true - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - def close_incoming - __builtin_cexpr! %q{ - ractor_close_incoming(ec, RACTOR_PTR(self)); - } - end - + # ractor.close -> true | false # - # call-seq: - # ractor.close_outgoing -> true | false - # - # Closes the outgoing port and returns its previous state. - # All further attempts to Ractor.yield in the ractor, and #take from the ractor - # will fail with Ractor::ClosedError. - # - # r = Ractor.new {sleep(500)} - # r.close_outgoing #=> false - # r.close_outgoing #=> true - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - def close_outgoing - __builtin_cexpr! %q{ - ractor_close_outgoing(ec, RACTOR_PTR(self)); - } + # Closes the default port. Closing a port is allowed only by the ractor which created the port. + # Therefore, the receiver must be the current ractor. + # + def close + default_port.close end # @@ -765,11 +410,11 @@ class Ractor # # Checks if the object is shareable by ractors. # - # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are frozen - # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # freeze_string_literals: true + # Ractor.shareable?(1) #=> true -- numbers are shareable + # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true # Ractor.shareable?('foo'.freeze) #=> true # - # See also the "Shareable and unshareable objects" section in the Ractor class docs. + # See also the "Shareable and unshareable objects" section in the \Ractor class docs. def self.shareable? obj __builtin_cexpr! %q{ RBOOL(rb_ractor_shareable_p(obj)); @@ -780,13 +425,13 @@ class Ractor # call-seq: # Ractor.make_shareable(obj, copy: false) -> shareable_obj # - # Make +obj+ shareable between ractors. + # Makes +obj+ shareable between ractors. # # +obj+ and all the objects it refers to will be frozen, unless they are # already shareable. # - # If +copy+ keyword is +true+, the method will copy objects before freezing them - # This is safer option but it can take be slower. + # If +copy+ keyword is +true+, it will copy objects before freezing them, and will not + # modify +obj+ or its internal objects. # # Note that the specification and implementation of this method are not # mature and may be changed in the future. @@ -823,20 +468,367 @@ class Ractor end end - # get a value from ractor-local storage + # Gets a value from ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[] instead. def [](sym) + if (self != Ractor.current) + raise RuntimeError, "Cannot get ractor local storage for non-current ractor" + end Primitive.ractor_local_value(sym) end - # set a value in ractor-local storage + # Sets a value in ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[]= instead. def []=(sym, val) + if (self != Ractor.current) + raise RuntimeError, "Cannot set ractor local storage for non-current ractor" + end + Primitive.ractor_local_value_set(sym, val) + end + + # Gets a value from ractor-local storage for the current Ractor. + def self.[](sym) + Primitive.ractor_local_value(sym) + end + + # Sets a value in ractor-local storage for the current Ractor. + def self.[]=(sym, val) Primitive.ractor_local_value_set(sym, val) end - # returns main ractor + # call-seq: + # Ractor.store_if_absent(key){ init_block } + # + # If the corresponding ractor-local value is not set, yields a value with + # init_block and stores the value in a thread-safe manner. + # This method returns the stored value. + # + # (1..10).map{ + # Thread.new(it){|i| + # Ractor.store_if_absent(:s){ f(); i } + # #=> return stored value of key :s + # } + # }.map(&:value).uniq.size #=> 1 and f() is called only once + # + def self.store_if_absent(sym) + Primitive.attr! :use_block + Primitive.ractor_local_value_store_if_absent(sym) + end + + # Returns the main ractor. def self.main __builtin_cexpr! %q{ rb_ractor_self(GET_VM()->ractor.main_ractor); } end + + # Returns true if the current ractor is the main ractor. + def self.main? + __builtin_cexpr! %q{ + RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec)) + } + end + + # internal method + def self._require feature # :nodoc: + if main? + super feature + else + Primitive.ractor_require feature + end + end + + class << self + private + + # internal method that is called when the first "Ractor.new" is called + def _activated # :nodoc: + Kernel.prepend Module.new{|m| + m.set_temporary_name '<RactorRequire>' + + def require feature # :nodoc: -- otherwise RDoc outputs it as a class method + if Ractor.main? + super + else + Ractor._require feature + end + end + } + end + end + + # + # call-seq: + # ractor.default_port -> port object + # + # Returns the default port of the Ractor. + # + def default_port + __builtin_cexpr! %q{ + ractor_default_port_value(RACTOR_PTR(self)) + } + end + + # + # call-seq: + # ractor.join -> self + # + # Waits for the termination of the Ractor. + # If the Ractor was aborted (terminated by an unhandled exception), + # the exception is raised in the current ractor. + # + # Ractor.new{}.join #=> ractor + # + # Ractor.new{ raise "foo" }.join + # #=> raises the exception "foo (RuntimeError)" + # + def join + port = Port.new + + self.monitor port + if port.receive == :aborted + __builtin_ractor_value + end + + self + ensure + port.close + end + + # + # call-seq: + # ractor.value -> obj + # + # Waits for +ractor+ to complete and returns its value or raises the exception + # which terminated the Ractor. The termination value will be moved to the calling + # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value. + # + # r = Ractor.new{ [1, 2] } + # r.value #=> [1, 2] (unshareable object) + # + # Ractor.new(r){|r| r.value} #=> Ractor::Error + # + def value + self.join + __builtin_ractor_value + end + + # + # call-seq: + # ractor.monitor(port) -> self + # + # Registers the port as a monitoring port for this ractor. When the ractor terminates, + # the port receives a Symbol object. + # + # * +:exited+ is sent if the ractor terminates without an unhandled exception. + # * +:aborted+ is sent if the ractor terminates by an unhandled exception. + # + # r = Ractor.new{ some_task() } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :exited and r is terminated + # + # r = Ractor.new{ raise "foo" } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :aborted and r is terminated by the RuntimeError "foo" + # + def monitor port + __builtin_ractor_monitor(port) + end + + # + # call-seq: + # ractor.unmonitor(port) -> self + # + # Unregisters the port from the monitoring ports for this ractor. + # + def unmonitor port + __builtin_ractor_unmonitor(port) + end + + # + # call-seq: + # Ractor.shareable_proc(self: nil){} -> shareable proc + # + # Returns a shareable copy of the given block's Proc. The value of +self+ + # in the Proc will be replaced with the value passed via the `self:` keyword, + # or +nil+ if not given. + # + # In a shareable Proc, access to any outer variables is prohibited. + # + # a = 42 + # Ractor.shareable_proc{ p a } + # #=> can not isolate a Proc because it accesses outer variables (a). (ArgumentError) + # + # The value of `self` in the Proc must be a shareable object. + # + # Ractor.shareable_proc(self: self){} + # #=> self should be shareable: main (Ractor::IsolationError) + # + def self.shareable_proc self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), false) + }) + end + + # + # call-seq: + # Ractor.shareable_lambda(self: nil){} -> shareable lambda + # + # Same as Ractor.shareable_proc, but returns a lambda Proc. + # + def self.shareable_lambda self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), true) + }) + end + + # \Port objects transmit messages between Ractors. + class Port + # + # call-seq: + # port.receive -> msg + # + # Receives a message from the port (which was sent there by Port#send). Only the ractor + # that created the port can receive messages this way. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # port.send('message1') + # end + # + # v1 = port.receive + # puts "Received: #{v1}" + # r.join + # # This will print: "Received: message1" + # + # The method blocks the current Thread if the message queue is empty. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # wait + # puts "Still not received" + # port.send('message1') + # wait + # puts "Still received only one" + # port.send('message2') + # end + # puts "Before first receive" + # v1 = port.receive + # puts "Received: #{v1}" + # v2 = port.receive + # puts "Received: #{v2}" + # r.join + # + # Output: + # + # Before first receive + # Still not received + # Received: message1 + # Still received only one + # Received: message2 + # + # If the port is closed and there are no more messages in the message queue, + # the method raises Ractor::ClosedError. + # + # port = Ractor::Port.new + # port.close + # port.receive #=> raise Ractor::ClosedError + # + def receive + __builtin_cexpr! %q{ + ractor_port_receive(ec, self) + } + end + + # + # call-seq: + # port.send(msg, move: false) -> self + # + # Sends a message to the port to be accepted by port.receive. + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'message' + # end + # value = port.receive + # puts "Received #{value}" + # # Prints: "Received: message" + # + # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready + # to receive anything): + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'test' + # puts "Sent successfully" + # # Prints: "Sent successfully" immediately + # end + # + # An attempt to send to a closed port will raise Ractor::ClosedError. + # + # r = Ractor.new {Ractor::Port.new} + # r.join + # p r + # # "#<Ractor:#6 (irb):23 terminated>" + # port = r.value + # port.send('test') # raise Ractor::ClosedError + # + # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. + # + # If the object is shareable, a reference to the object will be sent to the receiving ractor. + # + def send obj, move: false + __builtin_cexpr! %q{ + ractor_port_send(ec, self, obj, move) + } + end + + alias << send + + # + # call-seq: + # port.close + # + # Closes the port. Sending to a closed port is prohibited. + # Receiving is also prohibited if there are no messages in its message queue. + # + # Only the Ractor which created the port is allowed to close it. + # + # port = Ractor::Port.new + # Ractor.new port do |port| + # port.close #=> closing port by other ractors is not allowed (Ractor::Error) + # end.join + # + def close + __builtin_cexpr! %q{ + ractor_port_close(ec, self) + } + end + + # + # call-seq: + # port.closed? -> true/false + # + # Returns whether or not the port is closed. + def closed? + __builtin_cexpr! %q{ + ractor_port_closed_p(ec, self); + } + end + + # + # call-seq: + # port.inspect -> string + def inspect + "#<Ractor::Port to:\##{ + __builtin_cexpr! "SIZET2NUM(rb_ractor_id((RACTOR_PORT_PTR(self)->r)))" + } id:#{ + __builtin_cexpr! "SIZET2NUM(ractor_port_id(RACTOR_PORT_PTR(self)))" + }>" + end + end end |
