summaryrefslogtreecommitdiff
path: root/ractor.rb
blob: e4fbdfe1023d08495f8f42a5e5e2c079884fcaf1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
class Ractor
  # Create a new Ractor with args and a block.
  # args are passed via incoming channel.
  # A block (Proc) will be isolated (can't access to outer variables)
  #
  # A ractor has default two channels:
  # an incoming channel and an outgoing channel.
  #
  # Other ractors send objects to the ractor via the incoming channel and
  # the ractor receives them.
  # The ractor send objects via the outgoing channel and other ractors can
  # receive them.
  #
  # The result of the block is sent via the outgoing channel
  # and other
  #
  #   r = Ractor.new do
  #     Ractor.receive # receive via r's mailbox => 1
  #     Ractor.receive # receive via r's mailbox => 2
  #     Ractor.yield 3 # yield a message (3) and wait for taking by another ractor.
  #     'ok'           # the return value will be yielded.
  #                    # and r's incoming/outgoing ports are closed automatically.
  #   end
  #   r.send 1 # send a message (1) into r's mailbox.
  #   r <<   2 # << is an alias of `send`.
  #   p r.take   # take a message from r's outgoing port => 3
  #   p r.take   # => 'ok'
  #   p r.take   # raise Ractor::ClosedError
  #
  # other options:
  #   name: Ractor's name
  #
  def self.new(*args, name: nil, &block)
    b = block # TODO: builtin bug
    raise ArgumentError, "must be called with a block" unless block
    loc = caller_locations(1, 1).first
    loc = "#{loc.path}:#{loc.lineno}"
    __builtin_ractor_create(loc, name, args, b)
  end

  # return current Ractor
  def self.current
    __builtin_cexpr! %q{
      rb_ec_ractor_ptr(ec)->self
    }
  end

  def self.count
    __builtin_cexpr! %q{
      ULONG2NUM(GET_VM()->ractor.cnt);
    }
  end

  # Multiplex multiple Ractor communications.
  #
  #   r, obj = Ractor.select(r1, r2)
  #   #=> wait for taking from r1 or r2
  #   #   returned obj is a taken object from Ractor r
  #
  #   r, obj = Ractor.select(r1, r2, Ractor.current)
  #   #=> wait for taking from r1 or r2
  #   #         or receive from incoming queue
  #   #   If receive is succeed, then obj is received value
  #   #   and r is :receive (Ractor.current)
  #
  #   r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj)
  #   #=> wait for taking from r1 or r2
  #   #         or receive from incoming queue
  #   #         or yield (Ractor.yield) obj
  #   #   If yield is succeed, then obj is nil
  #   #   and r is :yield
  #
  def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
    raise ArgumentError, 'specify at least one ractor or `yield_value`' if yield_unspecified && ractors.empty?

    __builtin_cstmt! %q{
      const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors);
      VALUE rv;
      VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors),
                              yield_unspecified == Qtrue ? Qundef : yield_value,
                              (bool)RTEST(move) ? true : false, &rv);
      return rb_ary_new_from_args(2, rv, v);
    }
  end

  # Receive an incoming message from Ractor's incoming queue.
  def self.receive
    __builtin_cexpr! %q{
      ractor_receive(ec, rb_ec_ractor_ptr(ec))
    }
  end

  class << self
    alias recv receive
  end

  # same as Ractor.receive
  private def receive
    __builtin_cexpr! %q{
      ractor_receive(ec, rb_ec_ractor_ptr(ec))
    }
  end
  alias recv receive

  # Receive only a specific message.
  #
  # Instead of Ractor.receive, Ractor.receive_if can provide a pattern
  # by a block and you can choose the receiving message.
  #
  # # Example:
  # r = Ractor.new do
  #   p Ractor.receive_if{|msg| /foo/ =~ msg} #=> "foo3"
  #   p Ractor.receive_if{|msg| /bar/ =~ msg} #=> "bar1"
  #   p Ractor.receive_if{|msg| /baz/ =~ msg} #=> "baz2"
  # end
  # r << "bar1"
  # r << "baz2"
  # r << "foo3"
  # r.take
  #
  # If the block returns truthy, the message will be removed from incoming queue
  # and return this method with the message.
  # When the block is escaped by break/return/exception and so on, the message also
  # removed from the incoming queue.
  # Otherwise, the messsage is remained in the incoming queue and check next received
  # message by the given block.
  #
  # If there is no messages in the incoming queue, wait until arrival of other messages.
  #
  # Note that you can not call receive/receive_if in the given block recursively.
  # It means that you should not do any tasks in the block.
  #
  # # Example:
  # Ractor.current << true
  # Ractor.receive_if{|msg| Ractor.receive}
  # #=> `receive': can not call receive/receive_if recursively (Ractor::Error)
  #
  def self.receive_if &b
    Primitive.ractor_receive_if b
  end

  private def receive_if &b
    Primitive.ractor_receive_if b
  end

  # Send a message to a Ractor's incoming queue.
  #
  # # Example:
  #   r = Ractor.new do
  #     p Ractor.receive #=> 'ok'
  #   end
  #   r.send 'ok' # send to r's incoming queue.
  def send(obj, move: false)
    __builtin_cexpr! %q{
      ractor_send(ec, RACTOR_PTR(self), obj, move)
    }
  end
  alias << send

  # yield a message to the ractor's outgoing port.
  def self.yield(obj, move: false)
    __builtin_cexpr! %q{
      ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
    }
  end

  # Take a message from ractor's outgoing port.
  #
  # Example:
  #   r = Ractor.new{ 'oK' }
  #   p r.take #=> 'ok'
  def take
    __builtin_cexpr! %q{
      ractor_take(ec, RACTOR_PTR(self))
    }
  end

  def inspect
    loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
    name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
    id   = __builtin_cexpr! %q{ INT2FIX(RACTOR_PTR(self)->id) }
    status = __builtin_cexpr! %q{
      rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
    }
    "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
  end

  def name
    __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
  end

  class RemoteError
    attr_reader :ractor
  end

  # Closes the incoming port and returns its previous state.
  def close_incoming
    __builtin_cexpr! %q{
      ractor_close_incoming(ec, RACTOR_PTR(self));
    }
  end

  # Closes the outgoing port and returns its previous state.
  def close_outgoing
    __builtin_cexpr! %q{
      ractor_close_outgoing(ec, RACTOR_PTR(self));
    }
  end

  # utility method
  def self.shareable? obj
    __builtin_cexpr! %q{
      rb_ractor_shareable_p(obj) ? Qtrue : Qfalse;
    }
  end

  # make obj sharable.
  #
  # Basically, traverse referring objects from obj and freeze them.
  # 
  # When a sharable object is found in traversing, stop traversing
  # from this shareable object.
  #
  # If copy keyword is true, it makes a deep copied object
  # and make it sharable. This is safer option (but it can take more time).
  #
  # Note that the specification and implementation of this method are not
  # matured and can be changed in a future.
  #
  def self.make_shareable obj, copy: false
    if copy
      __builtin_cexpr! %q{
        rb_ractor_make_copy_shareable(obj);
      }
    else
      __builtin_cexpr! %q{
        rb_ractor_make_shareable(obj);
      }
    end
  end
end