diff options
Diffstat (limited to 'ractor.rb')
| -rw-r--r-- | ractor.rb | 921 |
1 files changed, 415 insertions, 506 deletions
@@ -1,17 +1,15 @@ -# \Ractor is an Actor-model abstraction for Ruby that provides thread-safe parallel execution. -# -# Ractor.new makes a new \Ractor, which can run in parallel. +# Ractor.new creates a new \Ractor, which can run in parallel with other ractors. # # # The simplest ractor # r = Ractor.new {puts "I am in Ractor!"} -# r.take # wait for it to finish +# r.join # wait for it to finish # # Here, "I am in Ractor!" is printed # # Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety # concerns such as data-races and race-conditions are not possible. The other benefit is parallelism. # # To achieve this, object sharing is limited across ractors. -# For example, unlike in threads, ractors can't access all the objects available in other ractors. Even objects normally +# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally # available through variables in the outer scope are prohibited from being used across ractors. # # a = 1 @@ -23,77 +21,34 @@ # a = 1 # r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"} # -# On CRuby (the default implementation), Global Virtual Machine Lock (GVL) is held per ractor, so -# ractors can perform in parallel without locking each other. This is unlike the situation with threads -# on CRuby. +# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so +# ractors can run in parallel. This is unlike the situation with threads on CRuby. # # Instead of accessing shared state, objects should be passed to and from ractors by # sending and receiving them as messages. # # a = 1 # r = Ractor.new do -# a_in_ractor = receive # receive blocks until somebody passes a message +# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message # puts "I am in Ractor! a=#{a_in_ractor}" # end # r.send(a) # pass it -# r.take +# r.join # # Here, "I am in Ractor! a=1" is printed # -# There are two pairs of methods for sending/receiving messages: -# -# * Ractor#send and Ractor.receive for when the _sender_ knows the receiver (push); -# * Ractor.yield and Ractor#take for when the _receiver_ knows the sender (pull); -# # In addition to that, any arguments passed to Ractor.new are passed to the block and available there -# as if received by Ractor.receive, and the last block value is sent outside of the -# ractor as if sent by Ractor.yield. -# -# A little demonstration of a classic ping-pong: -# -# server = Ractor.new(name: "server") do -# puts "Server starts: #{self.inspect}" -# puts "Server sends: ping" -# Ractor.yield 'ping' # The server doesn't know the receiver and sends to whoever interested -# received = Ractor.receive # The server doesn't know the sender and receives from whoever sent -# puts "Server received: #{received}" -# end -# -# client = Ractor.new(server) do |srv| # The server is sent to the client, and available as srv -# puts "Client starts: #{self.inspect}" -# received = srv.take # The client takes a message from the server -# puts "Client received from " \ -# "#{srv.inspect}: #{received}" -# puts "Client sends to " \ -# "#{srv.inspect}: pong" -# srv.send 'pong' # The client sends a message to the server -# end -# -# [client, server].each(&:take) # Wait until they both finish -# -# This will output something like: -# -# Server starts: #<Ractor:#2 server test.rb:1 running> -# Server sends: ping -# Client starts: #<Ractor:#3 test.rb:8 running> -# Client received from #<Ractor:#2 server test.rb:1 blocking>: ping -# Client sends to #<Ractor:#2 server test.rb:1 blocking>: pong -# Server received: pong -# -# Ractors receive their messages via the <em>incoming port</em>, and send them -# to the <em>outgoing port</em>. Either one can be disabled with Ractor#close_incoming and -# Ractor#close_outgoing, respectively. When a ractor terminates, its ports are closed -# automatically. +# as if received by Ractor.receive, and the last block value can be received with Ractor#value. # # == Shareable and unshareable objects # -# When an object is sent to and from a ractor, it's important to understand whether the +# When an object is sent to a ractor, it's important to understand whether the # object is shareable or unshareable. Most Ruby objects are unshareable objects. Even # frozen objects can be unshareable if they contain (through their instance variables) unfrozen # objects. # -# Shareable objects are those which can be used by several threads without compromising +# Shareable objects are those which can be used by several ractors at once without compromising # thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this, -# and Ractor.make_shareable tries to make the object shareable if it's not already, and gives an error +# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error # if it can't do it. # # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable @@ -109,24 +64,24 @@ # ary[0].frozen? #=> true # ary[1].frozen? #=> true # -# When a shareable object is sent (via #send or Ractor.yield), no additional processing occurs -# on it. It just becomes usable by both ractors. When an unshareable object is sent, it can be -# either _copied_ or _moved_. The first is the default, and it copies the object fully by +# When a shareable object is sent via #send, no additional processing occurs +# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be +# either _copied_ or _moved_. Copying is the default, and it copies the object fully by # deep cloning (Object#clone) the non-shareable parts of its structure. # -# data = ['foo', 'bar'.freeze] +# data = ['foo'.dup, 'bar'.freeze] # r = Ractor.new do # data2 = Ractor.receive # puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}" # end # r.send(data) -# r.take +# r.join # puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}" # # This will output something like: # -# In ractor: 340, 360, 320 -# Outside : 380, 400, 320 +# In ractor: 8, 16, 24 +# Outside : 32, 40, 24 # # Note that the object ids of the array and the non-frozen string inside the array have changed in # the ractor because they are different objects. The second array's element, which is a @@ -142,7 +97,7 @@ # puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}" # end # r.send(data, move: true) -# r.take +# r.join # puts "Outside: moved? #{Ractor::MovedObject === data}" # puts "Outside: #{data.inspect}" # @@ -152,12 +107,12 @@ # Outside: moved? true # test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError) # -# Notice that even +inspect+ (and more basic methods like <tt>__id__</tt>) is inaccessible +# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible # on a moved object. # -# +Class+ and +Module+ objects are shareable so the class/module definitions are shared between ractors. -# \Ractor objects are also shareable. All operations on shareable objects are thread-safe, so the thread-safety property -# will be kept. We can not define mutable shareable objects in Ruby, but C extensions can introduce them. +# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors. +# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors. +# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them. # # It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the # variables aren't shareable. This can occur because modules/classes are shareable, but they can have @@ -177,7 +132,7 @@ # puts "I can't see #{cls.tricky}" # cls.tricky = true # doesn't get here, but this would also raise an error # end -# r.take +# r.join # # I see C # # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) # @@ -191,7 +146,7 @@ # puts "GOOD=#{GOOD}" # puts "BAD=#{BAD}" # end -# r.take +# r.join # # GOOD=good # # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError) # @@ -201,7 +156,7 @@ # puts "I see #{C}" # puts "I can't see #{C.tricky}" # end -# r.take +# r.join # # I see C # # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) # @@ -217,20 +172,21 @@ # a = 1 # Thread.new {puts "Thread in ractor: a=#{a}"}.join # end -# r.take +# r.join # # Here "Thread in ractor: a=1" will be printed # # == Note on code examples # -# In the examples below, sometimes we use the following method to wait for ractors that -# are not currently blocked to finish (or to make progress). +# In the examples below, sometimes we use the following method to wait for ractors +# to make progress or finish. # # def wait # sleep(0.1) # end # -# It is **only for demonstration purposes** and shouldn't be used in a real code. -# Most of the time, #take is used to wait for ractors to finish. +# This is **only for demonstration purposes** and shouldn't be used in a real code. +# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used +# to wait for messages. # # == Reference # @@ -241,13 +197,13 @@ class Ractor # call-seq: # Ractor.new(*args, name: nil) {|*args| block } -> ractor # - # Create a new \Ractor with args and a block. + # Creates a new \Ractor with args and a block. # - # The given block (Proc) will be isolated (can't access any outer variables). +self+ + # The given block (Proc) is isolated (can't access any outer variables). +self+ # inside the block will refer to the current \Ractor. # # r = Ractor.new { puts "Hi, I am #{self.inspect}" } - # r.take + # r.join # # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>" # # Any +args+ passed are propagated to the block arguments by the same rules as @@ -259,14 +215,14 @@ class Ractor # r = Ractor.new(arg) {|received_arg| # puts "Received: #{received_arg} (##{received_arg.object_id})" # } - # r.take + # r.join # # Prints: # # Passing: [1, 2, 3] (#280) # # Received: [1, 2, 3] (#300) # # Ractor's +name+ can be set for debugging purposes: # - # r = Ractor.new(name: 'my ractor') {}; r.take + # r = Ractor.new(name: 'my ractor') {}; r.join # p r # #=> #<Ractor:#3 my ractor test.rb:1 terminated> # @@ -274,8 +230,8 @@ class Ractor b = block # TODO: builtin bug raise ArgumentError, "must be called with a block" unless block if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)") - Kernel.warn("Ractor is experimental, and the behavior may change in future versions of Ruby! " \ - "Also there are many implementation issues.", uplevel: 0, category: :experimental) + Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.", + uplevel: 0, category: :experimental) end loc = caller_locations(1, 1).first loc = "#{loc.path}:#{loc.lineno}" @@ -291,13 +247,13 @@ class Ractor } end - # Returns the number of Ractors currently running or blocking (waiting). + # Returns the number of ractors currently running or blocking (waiting). # # Ractor.count #=> 1 - # r = Ractor.new(name: 'example') { Ractor.yield(1) } + # r = Ractor.new(name: 'example') { Ractor.receive } # Ractor.count #=> 2 (main + example ractor) - # r.take # wait for Ractor.yield(1) - # r.take # wait until r will finish + # r << 42 # r's Ractor.receive will resume + # r.join # wait for r's termination # Ractor.count #=> 1 def self.count __builtin_cexpr! %q{ @@ -307,130 +263,91 @@ class Ractor # # call-seq: - # Ractor.select(*ractors, [yield_value:, move: false]) -> [ractor or symbol, obj] - # - # Wait for any ractor to have something in its outgoing port, read from this ractor, and - # then return that ractor and the object received. - # - # r1 = Ractor.new {Ractor.yield 'from 1'} - # r2 = Ractor.new {Ractor.yield 'from 2'} - # - # r, obj = Ractor.select(r1, r2) - # - # puts "received #{obj.inspect} from #{r.inspect}" - # # Prints: received "from 1" from #<Ractor:#2 test.rb:1 running> - # # But could just as well print "from r2" here, either prints could be first. - # - # If one of the given ractors is the current ractor, and it is selected, +r+ will contain - # the +:receive+ symbol instead of the ractor object. - # - # r1 = Ractor.new(Ractor.current) do |main| - # main.send 'to main' - # Ractor.yield 'from 1' - # end - # r2 = Ractor.new do - # Ractor.yield 'from 2' + # Ractor.select(*ractors_or_ports) -> [ractor or port, obj] + # + # Blocks the current Thread until one of the given ports has received a message. Returns an + # array of two elements where the first element is the Port and the second is the received object. + # This method can also accept Ractor objects themselves, and in that case will wait until one + # has terminated and return a two-element array where the first element is the ractor and the + # second is its termination value. + # + # p1, p2 = Ractor::Port.new, Ractor::Port.new + # ps = [p1, p2] + # rs = 2.times.map do |i| + # Ractor.new(ps.shift, i) do |p, i| + # sleep rand(0.99) + # p.send("r#{i}") + # sleep rand(0.99) + # "r#{i} done" + # end # end # - # r, obj = Ractor.select(r1, r2, Ractor.current) - # puts "received #{obj.inspect} from #{r.inspect}" - # # Could print: received "to main" from :receive - # - # If +yield_value+ is provided, that value may be yielded if another ractor is calling #take. - # In this case, the pair <tt>[:yield, nil]</tt> is returned: - # - # r1 = Ractor.new(Ractor.current) do |main| - # puts "Received from main: #{main.take}" + # waiting_on = [p1, p2, *rs] + # until waiting_on.empty? + # received_on, obj = Ractor.select(*waiting_on) + # waiting_on.delete(received_on) + # puts obj # end # - # puts "Trying to select" - # r, obj = Ractor.select(r1, Ractor.current, yield_value: 123) - # wait - # puts "Received #{obj.inspect} from #{r.inspect}" + # # r0 + # # r1 + # # r1 done + # # r0 done # - # This will print: + # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread + # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in + # the array element order. # - # Trying to select - # Received from main: 123 - # Received nil from :yield + # values = [] + # until ractors.empty? + # r, val = Ractor.select(*ractors) + # ractors.delete(r) + # values << val + # end # - # +move+ boolean flag defines whether yielded value will be copied (default) or moved. - def self.select(*ractors, yield_value: yield_unspecified = true, move: false) - raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? - - if ractors.delete Ractor.current - do_receive = true - else - do_receive = false + def self.select(*ports) + raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty? + + monitors = {} # Ractor::Port => Ractor + + ports = ports.map do |arg| + case arg + when Ractor + port = Ractor::Port.new + monitors[port] = arg + arg.monitor port + port + when Ractor::Port + arg + else + raise ArgumentError, "should be Ractor::Port or Ractor" + end end - __builtin_ractor_select_internal ractors, do_receive, !yield_unspecified, yield_value, move + begin + result_port, obj = __builtin_ractor_select_internal(ports) + + if r = monitors[result_port] + [r, r.value] + else + [result_port, obj] + end + ensure + # close all ports for join + monitors.each do |port, r| + r.unmonitor port + port.close + end + end end # # call-seq: - # Ractor.receive -> msg - # - # Receive a message from the incoming port of the current ractor (which was - # sent there by #send from another ractor). - # - # r = Ractor.new do - # v1 = Ractor.receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # Here will be printed: "Received: message1" - # - # Alternatively, the private instance method +receive+ may be used: - # - # r = Ractor.new do - # v1 = receive - # puts "Received: #{v1}" - # end - # r.send('message1') - # r.take - # # This prints: "Received: message1" - # - # The method blocks if the queue is empty. - # - # r = Ractor.new do - # puts "Before first receive" - # v1 = Ractor.receive - # puts "Received: #{v1}" - # v2 = Ractor.receive - # puts "Received: #{v2}" - # end - # wait - # puts "Still not received" - # r.send('message1') - # wait - # puts "Still received only one" - # r.send('message2') - # r.take - # - # Output: - # - # Before first receive - # Still not received - # Received: message1 - # Still received only one - # Received: message2 - # - # If close_incoming was called on the ractor, the method raises Ractor::ClosedError - # if there are no more messages in the incoming queue: - # - # Ractor.new do - # close_incoming - # receive - # end - # wait - # # in `receive': The incoming port is already closed => #<Ractor:#2 test.rb:1 running> (Ractor::ClosedError) + # Ractor.receive -> obj # + # Receives a message from the current ractor's default port. def self.receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } + Ractor.current.default_port.receive end class << self @@ -439,280 +356,21 @@ class Ractor # same as Ractor.receive private def receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } + default_port.receive end alias recv receive # # call-seq: - # Ractor.receive_if {|msg| block } -> msg - # - # Receive only a specific message. - # - # Instead of Ractor.receive, Ractor.receive_if can be given a pattern (or any - # filter) in a block and you can choose the messages to accept that are available in - # your ractor's incoming queue. - # - # r = Ractor.new do - # p Ractor.receive_if{|msg| msg.match?(/foo/)} #=> "foo3" - # p Ractor.receive_if{|msg| msg.match?(/bar/)} #=> "bar1" - # p Ractor.receive_if{|msg| msg.match?(/baz/)} #=> "baz2" - # end - # r << "bar1" - # r << "baz2" - # r << "foo3" - # r.take - # - # This will output: - # - # foo3 - # bar1 - # baz2 - # - # If the block returns a truthy value, the message is removed from the incoming queue - # and returned. - # Otherwise, the message remains in the incoming queue and the next messages are checked - # by the given block. - # - # If there are no messages left in the incoming queue, the method will - # block until new messages arrive. - # - # If the block is escaped by break/return/exception/throw, the message is removed from - # the incoming queue as if a truthy value had been returned. - # - # r = Ractor.new do - # val = Ractor.receive_if{|msg| msg.is_a?(Array)} - # puts "Received successfully: #{val}" - # end - # - # r.send(1) - # r.send('test') - # wait - # puts "2 non-matching sent, nothing received" - # r.send([1, 2, 3]) - # wait - # - # Prints: - # - # 2 non-matching sent, nothing received - # Received successfully: [1, 2, 3] - # - # Note that you can not call receive/receive_if in the given block recursively. - # You should not do any tasks in the block other than message filtration. - # - # Ractor.current << true - # Ractor.receive_if{|msg| Ractor.receive} - # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) - # - def self.receive_if &b - Primitive.ractor_receive_if b - end - - # same as Ractor.receive_if - private def receive_if &b - Primitive.ractor_receive_if b - end - - # - # call-seq: - # ractor.send(msg, move: false) -> self - # - # Send a message to a Ractor's incoming queue to be accepted by Ractor.receive. - # - # r = Ractor.new do - # value = Ractor.receive - # puts "Received #{value}" - # end - # r.send 'message' - # # Prints: "Received: message" - # - # The method is non-blocking (will return immediately even if the ractor is not ready - # to receive anything): - # - # r = Ractor.new {sleep(5)} - # r.send('test') - # puts "Sent successfully" - # # Prints: "Sent successfully" immediately - # - # An attempt to send to a ractor which already finished its execution will raise Ractor::ClosedError. - # - # r = Ractor.new {} - # r.take - # p r - # # "#<Ractor:#6 (irb):23 terminated>" - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) + # ractor.send(msg, move: false) -> self # - # If close_incoming was called on the ractor, the method also raises Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # receive - # end - # r.close_incoming - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - # # The error is raised immediately, not when the ractor tries to receive - # - # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. - # If <tt>move: true</tt> is passed, the object is _moved_ into the receiving ractor and becomes - # inaccessible to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # msg = 'message' - # r.send(msg, move: true) - # r.take - # p msg - # - # This prints: - # - # Received: message - # in `p': undefined method `inspect' for #<Ractor::MovedObject:0x000055c99b9b69b8> - # - # All references to the object and its parts will become invalid to the sender. - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message' - # ary = [s] - # copy = ary.dup - # r.send(ary, move: true) - # - # s.inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # ary.class - # # Ractor::MovedError (can not send any methods to a moved object) - # copy.class - # # => Array, it is different object - # copy[0].inspect - # # Ractor::MovedError (can not send any methods to a moved object) - # # ...but its item was still a reference to `s`, which was moved - # - # If the object is shareable, <tt>move: true</tt> has no effect on it: - # - # r = Ractor.new {puts "Received: #{receive}"} - # s = 'message'.freeze - # r.send(s, move: true) - # s.inspect #=> "message", still available - # - def send(obj, move: false) - __builtin_cexpr! %q{ - ractor_send(ec, RACTOR_PTR(self), obj, move) - } + # This is equivalent to Port#send to the ractor's #default_port. + def send(...) + default_port.send(...) + self end alias << send - # - # call-seq: - # Ractor.yield(msg, move: false) -> nil - # - # Send a message to the current ractor's outgoing port to be accepted by #take. - # - # r = Ractor.new {Ractor.yield 'Hello from ractor'} - # puts r.take - # # Prints: "Hello from ractor" - # - # This method is blocking, and will return only when somebody consumes the - # sent message. - # - # r = Ractor.new do - # Ractor.yield 'Hello from ractor' - # puts "Ractor: after yield" - # end - # wait - # puts "Still not taken" - # puts r.take - # - # This will print: - # - # Still not taken - # Hello from ractor - # Ractor: after yield - # - # If the outgoing port was closed with #close_outgoing, the method will raise: - # - # r = Ractor.new do - # close_outgoing - # Ractor.yield 'Hello from ractor' - # end - # wait - # # `yield': The outgoing-port is already closed (Ractor::ClosedError) - # - # The meaning of the +move+ argument is the same as for #send. - def self.yield(obj, move: false) - __builtin_cexpr! %q{ - ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) - } - end - - # - # call-seq: - # ractor.take -> msg - # - # Get a message from the ractor's outgoing port, which was put there by Ractor.yield or at ractor's - # termination. - # - # r = Ractor.new do - # Ractor.yield 'explicit yield' - # 'last value' - # end - # puts r.take #=> 'explicit yield' - # puts r.take #=> 'last value' - # puts r.take # Ractor::ClosedError (The outgoing-port is already closed) - # - # The fact that the last value is also sent to the outgoing port means that +take+ can be used - # as an analog of Thread#join ("just wait until ractor finishes"). However, it will raise if - # somebody has already consumed that message. - # - # If the outgoing port was closed with #close_outgoing, the method will raise Ractor::ClosedError. - # - # r = Ractor.new do - # sleep(500) - # Ractor.yield 'Hello from ractor' - # end - # r.close_outgoing - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - # # The error would be raised immediately, not when ractor will try to receive - # - # If an uncaught exception is raised in the Ractor, it is propagated by take as a - # Ractor::RemoteError. - # - # r = Ractor.new {raise "Something weird happened"} - # - # begin - # r.take - # rescue => e - # p e # => #<Ractor::RemoteError: thrown by remote Ractor.> - # p e.ractor == r # => true - # p e.cause # => #<RuntimeError: Something weird happened> - # end - # - # Ractor::ClosedError is a descendant of StopIteration, so the termination of the ractor will break - # out of any loops that receive this message without propagating the error: - # - # r = Ractor.new do - # 3.times {|i| Ractor.yield "message #{i}"} - # "finishing" - # end - # - # loop {puts "Received: " + r.take} - # puts "Continue successfully" - # - # This will print: - # - # Received: message 0 - # Received: message 1 - # Received: message 2 - # Received: finishing - # Continue successfully - def take - __builtin_cexpr! %q{ - ractor_take(ec, RACTOR_PTR(self)) - } - end - def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } @@ -725,50 +383,25 @@ class Ractor alias to_s inspect - # The name set in Ractor.new, or +nil+. + # Returns the name set in Ractor.new, or +nil+. def name __builtin_cexpr! %q{RACTOR_PTR(self)->name} end class RemoteError - # The Ractor an uncaught exception is raised in. + # The Ractor in which the uncaught exception was raised. attr_reader :ractor end # # call-seq: - # ractor.close_incoming -> true | false + # ractor.close -> true | false # - # Closes the incoming port and returns whether it was already closed. All further attempts - # to Ractor.receive in the ractor, and #send to the ractor will fail with Ractor::ClosedError. + # Closes the default port. Closing a port is allowed only by the ractor which created the port. + # Therefore, the receiver must be the current ractor. # - # r = Ractor.new {sleep(500)} - # r.close_incoming #=> false - # r.close_incoming #=> true - # r.send('test') - # # Ractor::ClosedError (The incoming-port is already closed) - def close_incoming - __builtin_cexpr! %q{ - ractor_close_incoming(ec, RACTOR_PTR(self)); - } - end - - # - # call-seq: - # ractor.close_outgoing -> true | false - # - # Closes the outgoing port and returns whether it was already closed. All further attempts - # to Ractor.yield in the ractor, and #take from the ractor will fail with Ractor::ClosedError. - # - # r = Ractor.new {sleep(500)} - # r.close_outgoing #=> false - # r.close_outgoing #=> true - # r.take - # # Ractor::ClosedError (The outgoing-port is already closed) - def close_outgoing - __builtin_cexpr! %q{ - ractor_close_outgoing(ec, RACTOR_PTR(self)); - } + def close + default_port.close end # @@ -777,7 +410,7 @@ class Ractor # # Checks if the object is shareable by ractors. # - # Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are frozen + # Ractor.shareable?(1) #=> true -- numbers are shareable # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true # Ractor.shareable?('foo'.freeze) #=> true # @@ -792,7 +425,7 @@ class Ractor # call-seq: # Ractor.make_shareable(obj, copy: false) -> shareable_obj # - # Make +obj+ shareable between ractors. + # Makes +obj+ shareable between ractors. # # +obj+ and all the objects it refers to will be frozen, unless they are # already shareable. @@ -835,8 +468,8 @@ class Ractor end end - # get a value from ractor-local storage for current Ractor - # Obsolete and use Ractor.[] instead. + # Gets a value from ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[] instead. def [](sym) if (self != Ractor.current) raise RuntimeError, "Cannot get ractor local storage for non-current ractor" @@ -844,8 +477,8 @@ class Ractor Primitive.ractor_local_value(sym) end - # set a value in ractor-local storage for current Ractor - # Obsolete and use Ractor.[]= instead. + # Sets a value in ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[]= instead. def []=(sym, val) if (self != Ractor.current) raise RuntimeError, "Cannot set ractor local storage for non-current ractor" @@ -853,12 +486,12 @@ class Ractor Primitive.ractor_local_value_set(sym, val) end - # get a value from ractor-local storage of current Ractor + # Gets a value from ractor-local storage for the current Ractor. def self.[](sym) Primitive.ractor_local_value(sym) end - # set a value in ractor-local storage of current Ractor + # Sets a value in ractor-local storage for the current Ractor. def self.[]=(sym, val) Primitive.ractor_local_value_set(sym, val) end @@ -866,9 +499,9 @@ class Ractor # call-seq: # Ractor.store_if_absent(key){ init_block } # - # If the corresponding value is not set, yield a value with - # init_block and store the value in thread-safe manner. - # This method returns corresponding stored value. + # If the corresponding ractor-local value is not set, yields a value with + # init_block and stores the value in a thread-safe manner. + # This method returns the stored value. # # (1..10).map{ # Thread.new(it){|i| @@ -878,17 +511,18 @@ class Ractor # }.map(&:value).uniq.size #=> 1 and f() is called only once # def self.store_if_absent(sym) + Primitive.attr! :use_block Primitive.ractor_local_value_store_if_absent(sym) end - # returns main ractor + # Returns the main ractor. def self.main __builtin_cexpr! %q{ rb_ractor_self(GET_VM()->ractor.main_ractor); } end - # return true if the current ractor is main ractor + # Returns true if the current ractor is the main ractor. def self.main? __builtin_cexpr! %q{ RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec)) @@ -922,4 +556,279 @@ class Ractor } end end + + # + # call-seq: + # ractor.default_port -> port object + # + # Returns the default port of the Ractor. + # + def default_port + __builtin_cexpr! %q{ + ractor_default_port_value(RACTOR_PTR(self)) + } + end + + # + # call-seq: + # ractor.join -> self + # + # Waits for the termination of the Ractor. + # If the Ractor was aborted (terminated by an unhandled exception), + # the exception is raised in the current ractor. + # + # Ractor.new{}.join #=> ractor + # + # Ractor.new{ raise "foo" }.join + # #=> raises the exception "foo (RuntimeError)" + # + def join + port = Port.new + + self.monitor port + if port.receive == :aborted + __builtin_ractor_value + end + + self + ensure + port.close + end + + # + # call-seq: + # ractor.value -> obj + # + # Waits for +ractor+ to complete and returns its value or raises the exception + # which terminated the Ractor. The termination value will be moved to the calling + # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value. + # + # r = Ractor.new{ [1, 2] } + # r.value #=> [1, 2] (unshareable object) + # + # Ractor.new(r){|r| r.value} #=> Ractor::Error + # + def value + self.join + __builtin_ractor_value + end + + # + # call-seq: + # ractor.monitor(port) -> self + # + # Registers the port as a monitoring port for this ractor. When the ractor terminates, + # the port receives a Symbol object. + # + # * +:exited+ is sent if the ractor terminates without an unhandled exception. + # * +:aborted+ is sent if the ractor terminates by an unhandled exception. + # + # r = Ractor.new{ some_task() } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :exited and r is terminated + # + # r = Ractor.new{ raise "foo" } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :aborted and r is terminated by the RuntimeError "foo" + # + def monitor port + __builtin_ractor_monitor(port) + end + + # + # call-seq: + # ractor.unmonitor(port) -> self + # + # Unregisters the port from the monitoring ports for this ractor. + # + def unmonitor port + __builtin_ractor_unmonitor(port) + end + + # + # call-seq: + # Ractor.shareable_proc(self: nil){} -> shareable proc + # + # Returns a shareable copy of the given block's Proc. The value of +self+ + # in the Proc will be replaced with the value passed via the `self:` keyword, + # or +nil+ if not given. + # + # In a shareable Proc, access to any outer variables is prohibited. + # + # a = 42 + # Ractor.shareable_proc{ p a } + # #=> can not isolate a Proc because it accesses outer variables (a). (ArgumentError) + # + # The value of `self` in the Proc must be a shareable object. + # + # Ractor.shareable_proc(self: self){} + # #=> self should be shareable: main (Ractor::IsolationError) + # + def self.shareable_proc self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), false) + }) + end + + # + # call-seq: + # Ractor.shareable_lambda(self: nil){} -> shareable lambda + # + # Same as Ractor.shareable_proc, but returns a lambda Proc. + # + def self.shareable_lambda self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), true) + }) + end + + # \Port objects transmit messages between Ractors. + class Port + # + # call-seq: + # port.receive -> msg + # + # Receives a message from the port (which was sent there by Port#send). Only the ractor + # that created the port can receive messages this way. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # port.send('message1') + # end + # + # v1 = port.receive + # puts "Received: #{v1}" + # r.join + # # This will print: "Received: message1" + # + # The method blocks the current Thread if the message queue is empty. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # wait + # puts "Still not received" + # port.send('message1') + # wait + # puts "Still received only one" + # port.send('message2') + # end + # puts "Before first receive" + # v1 = port.receive + # puts "Received: #{v1}" + # v2 = port.receive + # puts "Received: #{v2}" + # r.join + # + # Output: + # + # Before first receive + # Still not received + # Received: message1 + # Still received only one + # Received: message2 + # + # If the port is closed and there are no more messages in the message queue, + # the method raises Ractor::ClosedError. + # + # port = Ractor::Port.new + # port.close + # port.receive #=> raise Ractor::ClosedError + # + def receive + __builtin_cexpr! %q{ + ractor_port_receive(ec, self) + } + end + + # + # call-seq: + # port.send(msg, move: false) -> self + # + # Sends a message to the port to be accepted by port.receive. + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'message' + # end + # value = port.receive + # puts "Received #{value}" + # # Prints: "Received: message" + # + # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready + # to receive anything): + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'test' + # puts "Sent successfully" + # # Prints: "Sent successfully" immediately + # end + # + # An attempt to send to a closed port will raise Ractor::ClosedError. + # + # r = Ractor.new {Ractor::Port.new} + # r.join + # p r + # # "#<Ractor:#6 (irb):23 terminated>" + # port = r.value + # port.send('test') # raise Ractor::ClosedError + # + # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. + # + # If the object is shareable, a reference to the object will be sent to the receiving ractor. + # + def send obj, move: false + __builtin_cexpr! %q{ + ractor_port_send(ec, self, obj, move) + } + end + + alias << send + + # + # call-seq: + # port.close + # + # Closes the port. Sending to a closed port is prohibited. + # Receiving is also prohibited if there are no messages in its message queue. + # + # Only the Ractor which created the port is allowed to close it. + # + # port = Ractor::Port.new + # Ractor.new port do |port| + # port.close #=> closing port by other ractors is not allowed (Ractor::Error) + # end.join + # + def close + __builtin_cexpr! %q{ + ractor_port_close(ec, self) + } + end + + # + # call-seq: + # port.closed? -> true/false + # + # Returns whether or not the port is closed. + def closed? + __builtin_cexpr! %q{ + ractor_port_closed_p(ec, self); + } + end + + # + # call-seq: + # port.inspect -> string + def inspect + "#<Ractor::Port to:\##{ + __builtin_cexpr! "SIZET2NUM(rb_ractor_id((RACTOR_PORT_PTR(self)->r)))" + } id:#{ + __builtin_cexpr! "SIZET2NUM(ractor_port_id(RACTOR_PORT_PTR(self)))" + }>" + end + end end |
