class Ractor # Create a new Ractor with args and a block. # args are passed via incoming channel. # A block (Proc) will be isolated (can't access to outer variables) # # A ractor has default two channels: # an incoming channel and an outgoing channel. # # Other ractors send objects to the ractor via the incoming channel and # the ractor receives them. # The ractor send objects via the outgoing channel and other ractors can # receive them. # # The result of the block is sent via the outgoing channel # and other # # r = Ractor.new do # Ractor.receive # receive via r's mailbox => 1 # Ractor.receive # receive via r's mailbox => 2 # Ractor.yield 3 # yield a message (3) and wait for taking by another ractor. # 'ok' # the return value will be yielded. # # and r's incoming/outgoing ports are closed automatically. # end # r.send 1 # send a message (1) into r's mailbox. # r << 2 # << is an alias of `send`. # p r.take # take a message from r's outgoing port => 3 # p r.take # => 'ok' # p r.take # raise Ractor::ClosedError # # other options: # name: Ractor's name # def self.new(*args, name: nil, &block) b = block # TODO: builtin bug raise ArgumentError, "must be called with a block" unless block loc = caller_locations(1, 1).first loc = "#{loc.path}:#{loc.lineno}" __builtin_ractor_create(loc, name, args, b) end # return current Ractor def self.current __builtin_cexpr! %q{ rb_ec_ractor_ptr(ec)->self } end def self.count __builtin_cexpr! %q{ ULONG2NUM(GET_VM()->ractor.cnt); } end # Multiplex multiple Ractor communications. # # r, obj = Ractor.select(r1, r2) # #=> wait for taking from r1 or r2 # # returned obj is a taken object from Ractor r # # r, obj = Ractor.select(r1, r2, Ractor.current) # #=> wait for taking from r1 or r2 # # or receive from incoming queue # # If receive is succeed, then obj is received value # # and r is :receive (Ractor.current) # # r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj) # #=> wait for taking from r1 or r2 # # or receive from incoming queue # # or yield (Ractor.yield) obj # # If yield is succeed, then obj is nil # # and r is :yield # def self.select(*ractors, yield_value: yield_unspecified = true, move: false) raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty? __builtin_cstmt! %q{ const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors); VALUE rv; VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors), yield_unspecified == Qtrue ? Qundef : yield_value, (bool)RTEST(move) ? true : false, &rv); return rb_ary_new_from_args(2, rv, v); } end # Receive an incoming message from Ractor's incoming queue. def self.receive __builtin_cexpr! %q{ ractor_receive(ec, rb_ec_ractor_ptr(ec)) } end class << self alias recv receive end # same as Ractor.receive private def receive __builtin_cexpr! %q{ ractor_receive(ec, rb_ec_ractor_ptr(ec)) } end alias recv receive # Receive only a specific message. # # Instead of Ractor.receive, Ractor.receive_if can provide a pattern # by a block and you can choose the receiving message. # # # Example: # r = Ractor.new do # p Ractor.receive_if{|msg| /foo/ =~ msg} #=> "foo3" # p Ractor.receive_if{|msg| /bar/ =~ msg} #=> "bar1" # p Ractor.receive_if{|msg| /baz/ =~ msg} #=> "baz2" # end # r << "bar1" # r << "baz2" # r << "foo3" # r.take # # If the block returns truthy, the message will be removed from incoming queue # and return this method with the message. # When the block is escaped by break/return/exception and so on, the message also # removed from the incoming queue. # Otherwise, the messsage is remained in the incoming queue and check next received # message by the given block. # # If there is no messages in the incoming queue, wait until arrival of other messages. # # Note that you can not call receive/receive_if in the given block recursively. # It means that you should not do any tasks in the block. # # # Example: # Ractor.current << true # Ractor.receive_if{|msg| Ractor.receive} # #=> `receive': can not call receive/receive_if recursively (Ractor::Error) # def self.receive_if &b Primitive.ractor_receive_if b end private def receive_if &b Primitive.ractor_receive_if b end # Send a message to a Ractor's incoming queue. # # # Example: # r = Ractor.new do # p Ractor.receive #=> 'ok' # end # r.send 'ok' # send to r's incoming queue. def send(obj, move: false) __builtin_cexpr! %q{ ractor_send(ec, RACTOR_PTR(self), obj, move) } end alias << send # yield a message to the ractor's outgoing port. def self.yield(obj, move: false) __builtin_cexpr! %q{ ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move) } end # Take a message from ractor's outgoing port. # # Example: # r = Ractor.new{ 'oK' } # p r.take #=> 'ok' def take __builtin_cexpr! %q{ ractor_take(ec, RACTOR_PTR(self)) } end def inspect loc = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc } name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name } id = __builtin_cexpr! %q{ INT2FIX(RACTOR_PTR(self)->id) } status = __builtin_cexpr! %q{ rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_)) } "#" end def name __builtin_cexpr! %q{ RACTOR_PTR(self)->name } end class RemoteError attr_reader :ractor end # Closes the incoming port and returns its previous state. def close_incoming __builtin_cexpr! %q{ ractor_close_incoming(ec, RACTOR_PTR(self)); } end # Closes the outgoing port and returns its previous state. def close_outgoing __builtin_cexpr! %q{ ractor_close_outgoing(ec, RACTOR_PTR(self)); } end # utility method def self.shareable? obj __builtin_cexpr! %q{ rb_ractor_shareable_p(obj) ? Qtrue : Qfalse; } end # make obj sharable. # # Basically, traverse referring objects from obj and freeze them. # # When a sharable object is found in traversing, stop traversing # from this shareable object. # # If copy keyword is true, it makes a deep copied object # and make it sharable. This is safer option (but it can take more time). # # Note that the specification and implementation of this method are not # matured and can be changed in a future. # def self.make_shareable obj, copy: false if copy __builtin_cexpr! %q{ rb_ractor_make_copy_shareable(obj); } else __builtin_cexpr! %q{ rb_ractor_make_shareable(obj); } end end end