summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/rinda/rinda.rb50
-rw-r--r--test/rinda/test_rinda.rb40
2 files changed, 87 insertions, 3 deletions
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb
index 18e284a544..d9cd3782a0 100644
--- a/lib/rinda/rinda.rb
+++ b/lib/rinda/rinda.rb
@@ -206,6 +206,50 @@ module Rinda
# TupleSpaceProxy allows a remote Tuplespace to appear as local.
class TupleSpaceProxy
+ ##
+ # A Port ensures that a moved tuple arrives properly at its destination
+ # and does not get lost.
+ #
+ # See https://bugs.ruby-lang.org/issues/8125
+
+ class Port # :nodoc:
+ attr_reader :value
+
+ def self.deliver
+ port = new
+
+ begin
+ yield(port)
+ ensure
+ port.close
+ end
+
+ port.value
+ end
+
+ def initialize
+ @open = true
+ @value = nil
+ end
+
+ ##
+ # Don't let the DRb thread push to it when remote sends tuple
+
+ def close
+ @open = false
+ end
+
+ ##
+ # Stores +value+ and ensure it does not get marshaled multiple times.
+
+ def push value
+ raise 'port closed' unless @open
+
+ @value = value
+
+ nil # avoid Marshal
+ end
+ end
##
# Creates a new TupleSpaceProxy to wrap +ts+.
@@ -225,9 +269,9 @@ module Rinda
# Takes +tuple+ from the proxied TupleSpace. See TupleSpace#take.
def take(tuple, sec=nil, &block)
- port = []
- @ts.move(DRbObject.new(port), tuple, sec, &block)
- port[0]
+ Port.deliver do |port|
+ @ts.move(DRbObject.new(port), tuple, sec, &block)
+ end
end
##
diff --git a/test/rinda/test_rinda.rb b/test/rinda/test_rinda.rb
index 45f49a740c..13f5d6d2bb 100644
--- a/test/rinda/test_rinda.rb
+++ b/test/rinda/test_rinda.rb
@@ -477,6 +477,46 @@ class TupleSpaceProxyTest < Test::Unit::TestCase
@ts.take({'head' => 1, 'tail' => 2}, 0))
end
+ def test_take_bug_8215
+ DRb.stop_service
+ service = DRb.start_service(nil, @ts_base)
+
+ uri = service.uri
+
+ take = fork do
+ DRb.stop_service
+ DRb.start_service
+ ro = DRbObject.new_with_uri(uri)
+ ts = Rinda::TupleSpaceProxy.new(ro)
+ th = Thread.new do
+ ts.take([:test_take, nil])
+ end
+ Kernel.sleep(0.1)
+ th.raise(Interrupt) # causes loss of the taken tuple
+ ts.write([:barrier, :continue])
+ Kernel.sleep
+ end
+
+ @ts_base.take([:barrier, :continue])
+
+ write = fork do
+ DRb.stop_service
+ DRb.start_service
+ ro = DRbObject.new_with_uri(uri)
+ ts = Rinda::TupleSpaceProxy.new(ro)
+ ts.write([:test_take, 42])
+ end
+
+ status = Process.wait(write)
+
+ assert_equal([[:test_take, 42]], @ts_base.read_all([:test_take, nil]),
+ '[bug:8215] tuple lost')
+ ensure
+ Process.kill("TERM", write) if write && status.nil?
+ Process.kill("TERM", take) if take
+ service.stop_service
+ end
+
@server = DRb.primary_server || DRb.start_service
end