diff options
Diffstat (limited to 'ractor.rb')
| -rw-r--r-- | ractor.rb | 865 |
1 files changed, 759 insertions, 106 deletions
@@ -1,181 +1,834 @@ +# Ractor.new creates a new \Ractor, which can run in parallel with other ractors. +# +# # The simplest ractor +# r = Ractor.new {puts "I am in Ractor!"} +# r.join # wait for it to finish +# # Here, "I am in Ractor!" is printed +# +# Ractors do not share all objects with each other. There are two main benefits to this: across ractors, thread-safety +# concerns such as data-races and race-conditions are not possible. The other benefit is parallelism. +# +# To achieve this, object sharing is limited across ractors. +# Unlike in threads, ractors can't access all the objects available in other ractors. For example, objects normally +# available through variables in the outer scope are prohibited from being used across ractors. +# +# a = 1 +# r = Ractor.new {puts "I am in Ractor! a=#{a}"} +# # fails immediately with +# # Ractor::IsolationError (can not isolate a Proc because it accesses outer variables (a).) +# +# The object must be explicitly shared: +# a = 1 +# r = Ractor.new(a) { |a1| puts "I am in Ractor! a=#{a1}"} +# +# On CRuby (the default implementation), the Global Virtual Machine Lock (GVL) is held per ractor, so +# ractors can run in parallel. This is unlike the situation with threads on CRuby. +# +# Instead of accessing shared state, objects should be passed to and from ractors by +# sending and receiving them as messages. +# +# a = 1 +# r = Ractor.new do +# a_in_ractor = receive # receive blocks the Thread until our default port gets sent a message +# puts "I am in Ractor! a=#{a_in_ractor}" +# end +# r.send(a) # pass it +# r.join +# # Here, "I am in Ractor! a=1" is printed +# +# In addition to that, any arguments passed to Ractor.new are passed to the block and available there +# as if received by Ractor.receive, and the last block value can be received with Ractor#value. +# +# == Shareable and unshareable objects +# +# When an object is sent to a ractor, it's important to understand whether the +# object is shareable or unshareable. Most Ruby objects are unshareable objects. Even +# frozen objects can be unshareable if they contain (through their instance variables) unfrozen +# objects. +# +# Shareable objects are those which can be used by several ractors at once without compromising +# thread-safety, for example numbers, +true+ and +false+. Ractor.shareable? allows you to check this, +# and Ractor.make_shareable tries to make the object shareable if it's not already and gives an error +# if it can't do it. +# +# Ractor.shareable?(1) #=> true -- numbers and other immutable basic values are shareable +# Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true +# Ractor.shareable?('foo'.freeze) #=> true +# Ractor.shareable?([Object.new].freeze) #=> false, inner object is unfrozen +# +# ary = ['hello', 'world'] +# ary.frozen? #=> false +# ary[0].frozen? #=> false +# Ractor.make_shareable(ary) +# ary.frozen? #=> true +# ary[0].frozen? #=> true +# ary[1].frozen? #=> true +# +# When a shareable object is sent via #send, no additional processing occurs +# on it and it becomes usable by both ractors. When an unshareable object is sent, it can be +# either _copied_ or _moved_. Copying is the default, and it copies the object fully by +# deep cloning (Object#clone) the non-shareable parts of its structure. +# +# data = ['foo'.dup, 'bar'.freeze] +# r = Ractor.new do +# data2 = Ractor.receive +# puts "In ractor: #{data2.object_id}, #{data2[0].object_id}, #{data2[1].object_id}" +# end +# r.send(data) +# r.join +# puts "Outside : #{data.object_id}, #{data[0].object_id}, #{data[1].object_id}" +# +# This will output something like: +# +# In ractor: 8, 16, 24 +# Outside : 32, 40, 24 +# +# Note that the object ids of the array and the non-frozen string inside the array have changed in +# the ractor because they are different objects. The second array's element, which is a +# shareable frozen string, is the same object. +# +# Deep cloning of objects may be slow, and sometimes impossible. Alternatively, <tt>move: true</tt> may +# be used during sending. This will <em>move</em> the unshareable object to the receiving ractor, making it +# inaccessible to the sending ractor. +# +# data = ['foo', 'bar'] +# r = Ractor.new do +# data_in_ractor = Ractor.receive +# puts "In ractor: #{data_in_ractor.object_id}, #{data_in_ractor[0].object_id}" +# end +# r.send(data, move: true) +# r.join +# puts "Outside: moved? #{Ractor::MovedObject === data}" +# puts "Outside: #{data.inspect}" +# +# This will output: +# +# In ractor: 100, 120 +# Outside: moved? true +# test.rb:9:in `method_missing': can not send any methods to a moved object (Ractor::MovedError) +# +# Notice that even +inspect+ and more basic methods like <tt>__id__</tt> are inaccessible +# on a moved object. +# +# +Class+ and +Module+ objects are shareable and their class/module definitions are shared between ractors. +# \Ractor objects are also shareable. All operations on shareable objects are thread-safe across ractors. +# Defining mutable, shareable objects in Ruby is not possible, but C extensions can introduce them. +# +# It is prohibited to access (get) instance variables of shareable objects in other ractors if the values of the +# variables aren't shareable. This can occur because modules/classes are shareable, but they can have +# instance variables whose values are not. In non-main ractors, it's also prohibited to set instance +# variables on classes/modules (even if the value is shareable). +# +# class C +# class << self +# attr_accessor :tricky +# end +# end +# +# C.tricky = "unshareable".dup +# +# r = Ractor.new(C) do |cls| +# puts "I see #{cls}" +# puts "I can't see #{cls.tricky}" +# cls.tricky = true # doesn't get here, but this would also raise an error +# end +# r.join +# # I see C +# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) +# +# Ractors can access constants if they are shareable. The main \Ractor is the only one that can +# access non-shareable constants. +# +# GOOD = 'good'.freeze +# BAD = 'bad'.dup +# +# r = Ractor.new do +# puts "GOOD=#{GOOD}" +# puts "BAD=#{BAD}" +# end +# r.join +# # GOOD=good +# # can not access non-shareable objects in constant Object::BAD by non-main Ractor. (NameError) +# +# # Consider the same C class from above +# +# r = Ractor.new do +# puts "I see #{C}" +# puts "I can't see #{C.tricky}" +# end +# r.join +# # I see C +# # can not access instance variables of classes/modules from non-main Ractors (RuntimeError) +# +# See also the description of <tt># shareable_constant_value</tt> pragma in +# {Comments syntax}[rdoc-ref:syntax/comments.rdoc] explanation. +# +# == Ractors vs threads +# +# Each ractor has its own main Thread. New threads can be created from inside ractors +# (and, on CRuby, they share the GVL with other threads of this ractor). +# +# r = Ractor.new do +# a = 1 +# Thread.new {puts "Thread in ractor: a=#{a}"}.join +# end +# r.join +# # Here "Thread in ractor: a=1" will be printed +# +# == Note on code examples +# +# In the examples below, sometimes we use the following method to wait for ractors +# to make progress or finish. +# +# def wait +# sleep(0.1) +# end +# +# This is **only for demonstration purposes** and shouldn't be used in a real code. +# Most of the time, #join is used to wait for ractors to finish and Ractor.receive is used +# to wait for messages. +# +# == Reference +# +# See {Ractor design doc}[rdoc-ref:ractor.md] for more details. +# class Ractor - # Create a new Ractor with args and a block. - # args are passed via incoming channel. - # A block (Proc) will be isolated (can't access to outer variables) - # - # A ractor has default two channels: - # an incoming channel and an outgoing channel. - # - # Other ractors send objects to the ractor via the incoming channel and - # the ractor receives them. - # The ractor send objects via the outgoing channel and other ractors can - # receive them. - # - # The result of the block is sent via the outgoing channel - # and other - # - # r = Ractor.new do - # Ractor.receive # receive via r's mailbox => 1 - # Ractor.receive # receive via r's mailbox => 2 - # Ractor.yield 3 # yield a message (3) and wait for taking by another ractor. - # 'ok' # the return value will be yielded. - # # and r's incoming/outgoing ports are closed automatically. - # end - # r.send 1 # send a message (1) into r's mailbox. - # r << 2 # << is an alias of `send`. - # p r.take # take a message from r's outgoing port => 3 - # p r.take # => 'ok' - # p r.take # raise Ractor::ClosedError - # - # other options: - # name: Ractor's name + # + # call-seq: + # Ractor.new(*args, name: nil) {|*args| block } -> ractor + # + # Creates a new \Ractor with args and a block. + # + # The given block (Proc) is isolated (can't access any outer variables). +self+ + # inside the block will refer to the current \Ractor. + # + # r = Ractor.new { puts "Hi, I am #{self.inspect}" } + # r.join + # # Prints "Hi, I am #<Ractor:#2 test.rb:1 running>" + # + # Any +args+ passed are propagated to the block arguments by the same rules as + # objects sent via #send/Ractor.receive. If an argument in +args+ is not shareable, it + # will be copied (via deep cloning, which might be inefficient). + # + # arg = [1, 2, 3] + # puts "Passing: #{arg} (##{arg.object_id})" + # r = Ractor.new(arg) {|received_arg| + # puts "Received: #{received_arg} (##{received_arg.object_id})" + # } + # r.join + # # Prints: + # # Passing: [1, 2, 3] (#280) + # # Received: [1, 2, 3] (#300) + # + # Ractor's +name+ can be set for debugging purposes: + # + # r = Ractor.new(name: 'my ractor') {}; r.join + # p r + # #=> #<Ractor:#3 my ractor test.rb:1 terminated> # def self.new(*args, name: nil, &block) b = block # TODO: builtin bug raise ArgumentError, "must be called with a block" unless block + if __builtin_cexpr!("RBOOL(ruby_single_main_ractor)") + Kernel.warn("Ractor API is experimental and may change in future versions of Ruby.", + uplevel: 0, category: :experimental) + end loc = caller_locations(1, 1).first loc = "#{loc.path}:#{loc.lineno}" __builtin_ractor_create(loc, name, args, b) end - # return current Ractor + # Returns the currently executing Ractor. + # + # Ractor.current #=> #<Ractor:#1 running> def self.current __builtin_cexpr! %q{ - rb_ec_ractor_ptr(ec)->self + rb_ractor_self(rb_ec_ractor_ptr(ec)); } end + # Returns the number of ractors currently running or blocking (waiting). + # + # Ractor.count #=> 1 + # r = Ractor.new(name: 'example') { Ractor.receive } + # Ractor.count #=> 2 (main + example ractor) + # r << 42 # r's Ractor.receive will resume + # r.join # wait for r's termination + # Ractor.count #=> 1 def self.count __builtin_cexpr! %q{ ULONG2NUM(GET_VM()->ractor.cnt); } end - # Multiplex multiple Ractor communications. # - # r, obj = Ractor.select(r1, r2) - # #=> wait for taking from r1 or r2 - # # returned obj is a taken object from Ractor r + # call-seq: + # Ractor.select(*ractors_or_ports) -> [ractor or port, obj] + # + # Blocks the current Thread until one of the given ports has received a message. Returns an + # array of two elements where the first element is the Port and the second is the received object. + # This method can also accept Ractor objects themselves, and in that case will wait until one + # has terminated and return a two-element array where the first element is the ractor and the + # second is its termination value. + # + # p1, p2 = Ractor::Port.new, Ractor::Port.new + # ps = [p1, p2] + # rs = 2.times.map do |i| + # Ractor.new(ps.shift, i) do |p, i| + # sleep rand(0.99) + # p.send("r#{i}") + # sleep rand(0.99) + # "r#{i} done" + # end + # end + # + # waiting_on = [p1, p2, *rs] + # until waiting_on.empty? + # received_on, obj = Ractor.select(*waiting_on) + # waiting_on.delete(received_on) + # puts obj + # end # - # r, obj = Ractor.select(r1, r2, Ractor.current) - # #=> wait for taking from r1 or r2 - # # or receive from incoming queue - # # If receive is succeed, then obj is received value - # # and r is :receive (Ractor.current) + # # r0 + # # r1 + # # r1 done + # # r0 done # - # r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj) - # #=> wait for taking from r1 or r2 - # # or receive from incoming queue - # # or yield (Ractor.yield) obj - # # If yield is succeed, then obj is nil - # # and r is :yield + # The following example is almost equivalent to <code>ractors.map(&:value)</code> except the thread + # is unblocked when any of the ractors has terminated as opposed to waiting for their termination in + # the array element order. # - def self.select(*ractors, yield_value: yield_unspecified = true, move: false) - raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? + # values = [] + # until ractors.empty? + # r, val = Ractor.select(*ractors) + # ractors.delete(r) + # values << val + # end + # + def self.select(*ports) + raise ArgumentError, 'specify at least one Ractor::Port or Ractor' if ports.empty? - __builtin_cstmt! %q{ - const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors); - VALUE rv; - VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors), - yield_unspecified == Qtrue ? Qundef : yield_value, - (bool)RTEST(move) ? true : false, &rv); - return rb_ary_new_from_args(2, rv, v); - } + monitors = {} # Ractor::Port => Ractor + + ports = ports.map do |arg| + case arg + when Ractor + port = Ractor::Port.new + monitors[port] = arg + arg.monitor port + port + when Ractor::Port + arg + else + raise ArgumentError, "should be Ractor::Port or Ractor" + end + end + + begin + result_port, obj = __builtin_ractor_select_internal(ports) + + if r = monitors[result_port] + [r, r.value] + else + [result_port, obj] + end + ensure + # close all ports for join + monitors.each do |port, r| + r.unmonitor port + port.close + end + end end - # Receive an incoming message from Ractor's incoming queue. + # + # call-seq: + # Ractor.receive -> obj + # + # Receives a message from the current ractor's default port. def self.receive - __builtin_cexpr! %q{ - ractor_receive(ec, rb_ec_ractor_ptr(ec)) - } + Ractor.current.default_port.receive end class << self alias recv receive end + # same as Ractor.receive private def receive - __builtin_cexpr! %q{ - // TODO: check current actor - ractor_receive(ec, RACTOR_PTR(self)) - } + default_port.receive end alias recv receive - # Send a message to a Ractor's incoming queue. # - # # Example: - # r = Ractor.new do - # p Ractor.receive #=> 'ok' - # end - # r.send 'ok' # send to r's incoming queue. - def send(obj, move: false) - __builtin_cexpr! %q{ - ractor_send(ec, RACTOR_PTR(self), obj, move) - } - end - alias << send - - # yield a message to the ractor's outgoing port. - def self.yield(obj, move: false) - __builtin_cexpr! %q{ - ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) - } - end - - # Take a message from ractor's outgoing port. + # call-seq: + # ractor.send(msg, move: false) -> self # - # Example: - # r = Ractor.new{ 'oK' } - # p r.take #=> 'ok' - def take - __builtin_cexpr! %q{ - ractor_take(ec, RACTOR_PTR(self)) - } + # This is equivalent to Port#send to the ractor's #default_port. + def send(...) + default_port.send(...) + self end + alias << send def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } - id = __builtin_cexpr! %q{ INT2FIX(RACTOR_PTR(self)->id) } + id = __builtin_cexpr! %q{ UINT2NUM(rb_ractor_id(RACTOR_PTR(self))) } status = __builtin_cexpr! %q{ rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_)) } "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>" end + alias to_s inspect + + # Returns the name set in Ractor.new, or +nil+. def name - __builtin_cexpr! %q{ RACTOR_PTR(self)->name } + __builtin_cexpr! %q{RACTOR_PTR(self)->name} end class RemoteError + # The Ractor in which the uncaught exception was raised. attr_reader :ractor end - # Closes the incoming port and returns its previous state. - def close_incoming + # + # call-seq: + # ractor.close -> true | false + # + # Closes the default port. Closing a port is allowed only by the ractor which created the port. + # Therefore, the receiver must be the current ractor. + # + def close + default_port.close + end + + # + # call-seq: + # Ractor.shareable?(obj) -> true | false + # + # Checks if the object is shareable by ractors. + # + # Ractor.shareable?(1) #=> true -- numbers are shareable + # Ractor.shareable?('foo') #=> false, unless the string is frozen due to # frozen_string_literal: true + # Ractor.shareable?('foo'.freeze) #=> true + # + # See also the "Shareable and unshareable objects" section in the \Ractor class docs. + def self.shareable? obj __builtin_cexpr! %q{ - ractor_close_incoming(ec, RACTOR_PTR(self)); + RBOOL(rb_ractor_shareable_p(obj)); } end - # Closes the outgoing port and returns its previous state. - def close_outgoing + # + # call-seq: + # Ractor.make_shareable(obj, copy: false) -> shareable_obj + # + # Makes +obj+ shareable between ractors. + # + # +obj+ and all the objects it refers to will be frozen, unless they are + # already shareable. + # + # If +copy+ keyword is +true+, it will copy objects before freezing them, and will not + # modify +obj+ or its internal objects. + # + # Note that the specification and implementation of this method are not + # mature and may be changed in the future. + # + # obj = ['test'] + # Ractor.shareable?(obj) #=> false + # Ractor.make_shareable(obj) #=> ["test"] + # Ractor.shareable?(obj) #=> true + # obj.frozen? #=> true + # obj[0].frozen? #=> true + # + # # Copy vs non-copy versions: + # obj1 = ['test'] + # obj1s = Ractor.make_shareable(obj1) + # obj1.frozen? #=> true + # obj1s.object_id == obj1.object_id #=> true + # obj2 = ['test'] + # obj2s = Ractor.make_shareable(obj2, copy: true) + # obj2.frozen? #=> false + # obj2s.frozen? #=> true + # obj2s.object_id == obj2.object_id #=> false + # obj2s[0].object_id == obj2[0].object_id #=> false + # + # See also the "Shareable and unshareable objects" section in the Ractor class docs. + def self.make_shareable obj, copy: false + if copy + __builtin_cexpr! %q{ + rb_ractor_make_shareable_copy(obj); + } + else + __builtin_cexpr! %q{ + rb_ractor_make_shareable(obj); + } + end + end + + # Gets a value from ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[] instead. + def [](sym) + if (self != Ractor.current) + raise RuntimeError, "Cannot get ractor local storage for non-current ractor" + end + Primitive.ractor_local_value(sym) + end + + # Sets a value in ractor-local storage for the current Ractor. + # Obsolete, use Ractor.[]= instead. + def []=(sym, val) + if (self != Ractor.current) + raise RuntimeError, "Cannot set ractor local storage for non-current ractor" + end + Primitive.ractor_local_value_set(sym, val) + end + + # Gets a value from ractor-local storage for the current Ractor. + def self.[](sym) + Primitive.ractor_local_value(sym) + end + + # Sets a value in ractor-local storage for the current Ractor. + def self.[]=(sym, val) + Primitive.ractor_local_value_set(sym, val) + end + + # call-seq: + # Ractor.store_if_absent(key){ init_block } + # + # If the corresponding ractor-local value is not set, yields a value with + # init_block and stores the value in a thread-safe manner. + # This method returns the stored value. + # + # (1..10).map{ + # Thread.new(it){|i| + # Ractor.store_if_absent(:s){ f(); i } + # #=> return stored value of key :s + # } + # }.map(&:value).uniq.size #=> 1 and f() is called only once + # + def self.store_if_absent(sym) + Primitive.attr! :use_block + Primitive.ractor_local_value_store_if_absent(sym) + end + + # Returns the main ractor. + def self.main __builtin_cexpr! %q{ - ractor_close_outgoing(ec, RACTOR_PTR(self)); + rb_ractor_self(GET_VM()->ractor.main_ractor); } end - # utility method - def self.shareable? obj + # Returns true if the current ractor is the main ractor. + def self.main? __builtin_cexpr! %q{ - rb_ractor_shareable_p(obj) ? Qtrue : Qfalse; + RBOOL(GET_VM()->ractor.main_ractor == rb_ec_ractor_ptr(ec)) } end - def self.make_shareable obj + # internal method + def self._require feature # :nodoc: + if main? + super feature + else + Primitive.ractor_require feature + end + end + + class << self + private + + # internal method that is called when the first "Ractor.new" is called + def _activated # :nodoc: + Kernel.prepend Module.new{|m| + m.set_temporary_name '<RactorRequire>' + + def require feature # :nodoc: -- otherwise RDoc outputs it as a class method + if Ractor.main? + super + else + Ractor._require feature + end + end + } + end + end + + # + # call-seq: + # ractor.default_port -> port object + # + # Returns the default port of the Ractor. + # + def default_port __builtin_cexpr! %q{ - rb_ractor_make_shareable(obj); + ractor_default_port_value(RACTOR_PTR(self)) } end + + # + # call-seq: + # ractor.join -> self + # + # Waits for the termination of the Ractor. + # If the Ractor was aborted (terminated by an unhandled exception), + # the exception is raised in the current ractor. + # + # Ractor.new{}.join #=> ractor + # + # Ractor.new{ raise "foo" }.join + # #=> raises the exception "foo (RuntimeError)" + # + def join + port = Port.new + + self.monitor port + if port.receive == :aborted + __builtin_ractor_value + end + + self + ensure + port.close + end + + # + # call-seq: + # ractor.value -> obj + # + # Waits for +ractor+ to complete and returns its value or raises the exception + # which terminated the Ractor. The termination value will be moved to the calling + # Ractor. Therefore, at most 1 Ractor can receive another ractor's termination value. + # + # r = Ractor.new{ [1, 2] } + # r.value #=> [1, 2] (unshareable object) + # + # Ractor.new(r){|r| r.value} #=> Ractor::Error + # + def value + self.join + __builtin_ractor_value + end + + # + # call-seq: + # ractor.monitor(port) -> self + # + # Registers the port as a monitoring port for this ractor. When the ractor terminates, + # the port receives a Symbol object. + # + # * +:exited+ is sent if the ractor terminates without an unhandled exception. + # * +:aborted+ is sent if the ractor terminates by an unhandled exception. + # + # r = Ractor.new{ some_task() } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :exited and r is terminated + # + # r = Ractor.new{ raise "foo" } + # r.monitor(port = Ractor::Port.new) + # port.receive #=> :aborted and r is terminated by the RuntimeError "foo" + # + def monitor port + __builtin_ractor_monitor(port) + end + + # + # call-seq: + # ractor.unmonitor(port) -> self + # + # Unregisters the port from the monitoring ports for this ractor. + # + def unmonitor port + __builtin_ractor_unmonitor(port) + end + + # + # call-seq: + # Ractor.shareable_proc(self: nil){} -> shareable proc + # + # Returns a shareable copy of the given block's Proc. The value of +self+ + # in the Proc will be replaced with the value passed via the `self:` keyword, + # or +nil+ if not given. + # + # In a shareable Proc, access to any outer variables is prohibited. + # + # a = 42 + # Ractor.shareable_proc{ p a } + # #=> can not isolate a Proc because it accesses outer variables (a). (Ractor::IsolationError) + # + # The value of `self` in the Proc must be a shareable object. + # + # Ractor.shareable_proc(self: self){} + # #=> self should be shareable: main (Ractor::IsolationError) + # + def self.shareable_proc self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), false) + }) + end + + # + # call-seq: + # Ractor.shareable_lambda(self: nil){} -> shareable lambda + # + # Same as Ractor.shareable_proc, but returns a lambda Proc. + # + def self.shareable_lambda self: nil + Primitive.attr! :use_block + + __builtin_cexpr!(%Q{ + ractor_shareable_proc(ec, *LOCAL_PTR(self), true) + }) + end + + # \Port objects transmit messages between Ractors. + class Port + # + # call-seq: + # port.receive -> msg + # + # Receives a message from the port (which was sent there by Port#send). Only the ractor + # that created the port can receive messages this way. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # port.send('message1') + # end + # + # v1 = port.receive + # puts "Received: #{v1}" + # r.join + # # This will print: "Received: message1" + # + # The method blocks the current Thread if the message queue is empty. + # + # port = Ractor::Port.new + # r = Ractor.new port do |port| + # wait + # puts "Still not received" + # port.send('message1') + # wait + # puts "Still received only one" + # port.send('message2') + # end + # puts "Before first receive" + # v1 = port.receive + # puts "Received: #{v1}" + # v2 = port.receive + # puts "Received: #{v2}" + # r.join + # + # Output: + # + # Before first receive + # Still not received + # Received: message1 + # Still received only one + # Received: message2 + # + # If the port is closed and there are no more messages in the message queue, + # the method raises Ractor::ClosedError. + # + # port = Ractor::Port.new + # port.close + # port.receive #=> raise Ractor::ClosedError + # + def receive + __builtin_cexpr! %q{ + ractor_port_receive(ec, self) + } + end + + # + # call-seq: + # port.send(msg, move: false) -> self + # + # Sends a message to the port to be accepted by port.receive. + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'message' + # end + # value = port.receive + # puts "Received #{value}" + # # Prints: "Received: message" + # + # The method is non-blocking (it will return immediately even if the ractor that created the port is not ready + # to receive anything): + # + # port = Ractor::Port.new + # r = Ractor.new(port) do |port| + # port.send 'test' + # puts "Sent successfully" + # # Prints: "Sent successfully" immediately + # end + # + # An attempt to send to a closed port will raise Ractor::ClosedError. + # + # r = Ractor.new {Ractor::Port.new} + # r.join + # p r + # # "#<Ractor:#6 (irb):23 terminated>" + # port = r.value + # port.send('test') # raise Ractor::ClosedError + # + # If the +obj+ is unshareable, by default it will be copied into the receiving ractor by deep cloning. + # + # If the object is shareable, a reference to the object will be sent to the receiving ractor. + # + def send obj, move: false + __builtin_cexpr! %q{ + ractor_port_send(ec, self, obj, move) + } + end + + alias << send + + # + # call-seq: + # port.close + # + # Closes the port. Sending to a closed port is prohibited. + # Receiving is also prohibited if there are no messages in its message queue. + # + # Only the Ractor which created the port is allowed to close it. + # + # port = Ractor::Port.new + # Ractor.new port do |port| + # port.close #=> closing port by other ractors is not allowed (Ractor::Error) + # end.join + # + def close + __builtin_cexpr! %q{ + ractor_port_close(ec, self) + } + end + + # + # call-seq: + # port.closed? -> true/false + # + # Returns whether or not the port is closed. + def closed? + __builtin_cexpr! %q{ + ractor_port_closed_p(ec, self); + } + end + + # + # call-seq: + # port.inspect -> string + def inspect + "#<Ractor::Port to:\##{ + __builtin_cexpr! "SIZET2NUM(rb_ractor_id((RACTOR_PORT_PTR(self)->r)))" + } id:#{ + __builtin_cexpr! "SIZET2NUM(ractor_port_id(RACTOR_PORT_PTR(self)))" + }>" + end + end end |
