summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorseki <seki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-07-27 00:04:38 +0000
committerseki <seki@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-07-27 00:04:38 +0000
commit0098caf510e6241982fe3c04068d8fa58d839fec (patch)
tree37daf6e7ec9af240ebe2d39306f1f59a738b2c23
parent232e6592149d8fae60b148797fcb5d315472f7e5 (diff)
merged from 1.8
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@18228 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
-rw-r--r--ChangeLog6
-rw-r--r--lib/rinda/tuplespace.rb111
-rw-r--r--test/rinda/test_rinda.rb130
3 files changed, 156 insertions, 91 deletions
diff --git a/ChangeLog b/ChangeLog
index 0ec75bbdef..29e34ce009 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,9 @@
+Sun Jul 27 09:02:32 2008 Masatoshi SEKI <m_seki@mva.biglobe.ne.jp>
+
+ * lib/rinda/tuplespace.rb: merged from 1.8.
+
+ * test/rinda/test_rinda.rb: merged from 1.8.
+
Sat Jul 26 22:45:18 2008 Yuki Sonoda (Yugui) <yugui@yugui.jp>
* sample/exyacc.rb: fixed NoMethodError(Kernel#sub!).
diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb
index 73e79bb401..b0409dde3b 100644
--- a/lib/rinda/tuplespace.rb
+++ b/lib/rinda/tuplespace.rb
@@ -2,6 +2,8 @@ require 'monitor'
require 'thread'
require 'drb/drb'
require 'rinda/rinda'
+require 'enumerator'
+require 'forwardable'
module Rinda
@@ -286,45 +288,70 @@ module Rinda
# of Tuplespace.
class TupleBag
+ class TupleBin
+ extend Forwardable
+ def_delegators '@bin', :find_all, :delete_if, :each, :empty?
+
+ def initialize
+ @bin = []
+ end
+
+ def add(tuple)
+ @bin.push(tuple)
+ end
+
+ def delete(tuple)
+ idx = @bin.rindex(tuple)
+ @bin.delete_at(idx) if idx
+ end
+
+ def find(&blk)
+ @bin.reverse_each do |x|
+ return x if yield(x)
+ end
+ nil
+ end
+ end
def initialize # :nodoc:
@hash = {}
+ @enum = Enumerable::Enumerator.new(self, :each_entry)
end
##
# +true+ if the TupleBag to see if it has any expired entries.
def has_expires?
- @hash.each do |k, v|
- v.each do |tuple|
- return true if tuple.expires
- end
+ @enum.find do |tuple|
+ tuple.expires
end
- false
end
##
- # Add +ary+ to the TupleBag.
+ # Add +tuple+ to the TupleBag.
- def push(ary)
- size = ary.size
- @hash[size] ||= []
- @hash[size].push(ary)
+ def push(tuple)
+ key = bin_key(tuple)
+ @hash[key] ||= TupleBin.new
+ @hash[key].add(tuple)
end
##
- # Removes +ary+ from the TupleBag.
+ # Removes +tuple+ from the TupleBag.
- def delete(ary)
- size = ary.size
- @hash.fetch(size, []).delete(ary)
+ def delete(tuple)
+ key = bin_key(tuple)
+ bin = @hash[key]
+ return nil unless bin
+ bin.delete(tuple)
+ @hash.delete(key) if bin.empty?
+ tuple
end
##
# Finds all live tuples that match +template+.
-
def find_all(template)
- @hash.fetch(template.size, []).find_all do |tuple|
+ bin_for_find(template).find_all do |tuple|
tuple.alive? && template.match(tuple)
end
end
@@ -333,7 +360,7 @@ module Rinda
# Finds a live tuple that matches +template+.
def find(template)
- @hash.fetch(template.size, []).find do |tuple|
+ bin_for_find(template).find do |tuple|
tuple.alive? && template.match(tuple)
end
end
@@ -343,7 +370,7 @@ module Rinda
# +tuple+ and are alive.
def find_all_template(tuple)
- @hash.fetch(tuple.size, []).find_all do |template|
+ @enum.find_all do |template|
template.alive? && template.match(tuple)
end
end
@@ -354,20 +381,39 @@ module Rinda
def delete_unless_alive
deleted = []
- @hash.keys.each do |size|
- ary = []
- @hash[size].each do |tuple|
+ @hash.each do |key, bin|
+ bin.delete_if do |tuple|
if tuple.alive?
- ary.push(tuple)
+ false
else
deleted.push(tuple)
+ true
end
end
- @hash[size] = ary
end
deleted
end
+ private
+ def each_entry(&blk)
+ @hash.each do |k, v|
+ v.each(&blk)
+ end
+ end
+
+ def bin_key(tuple)
+ head = tuple[0]
+ if head.class == Symbol
+ return head
+ else
+ false
+ end
+ end
+
+ def bin_for_find(template)
+ key = bin_key(template)
+ key ? @hash.fetch(key, []) : @enum
+ end
end
##
@@ -403,8 +449,7 @@ module Rinda
# Adds +tuple+
def write(tuple, sec=nil)
- entry = TupleEntry.new(tuple, sec)
- start_keeper
+ entry = create_entry(tuple, sec)
synchronize do
if entry.expired?
@read_waiter.find_all_template(entry).each do |template|
@@ -414,6 +459,7 @@ module Rinda
notify_event('delete', entry.value)
else
@bag.push(entry)
+ start_keeper if entry.expires
@read_waiter.find_all_template(entry).each do |template|
template.read(tuple)
end
@@ -439,7 +485,6 @@ module Rinda
def move(port, tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given?
- start_keeper
synchronize do
entry = @bag.find(template)
if entry
@@ -452,6 +497,7 @@ module Rinda
begin
@take_waiter.push(template)
+ start_keeper if template.expires
while true
raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired?
@@ -476,7 +522,6 @@ module Rinda
def read(tuple, sec=nil)
template = WaitTemplateEntry.new(self, tuple, sec)
yield(template) if block_given?
- start_keeper
synchronize do
entry = @bag.find(template)
return entry.value if entry
@@ -484,6 +529,7 @@ module Rinda
begin
@read_waiter.push(template)
+ start_keeper if template.expires
template.wait
raise RequestCanceledError if template.canceled?
raise RequestExpiredError if template.expired?
@@ -529,6 +575,10 @@ module Rinda
private
+ def create_entry(tuple, sec)
+ TupleEntry.new(tuple, sec)
+ end
+
##
# Removes dead tuples.
@@ -566,9 +616,12 @@ module Rinda
def start_keeper
return if @keeper && @keeper.alive?
@keeper = Thread.new do
- while need_keeper?
- keep_clean
+ while true
sleep(@period)
+ synchronize do
+ break unless need_keeper?
+ keep_clean
+ end
end
end
end
diff --git a/test/rinda/test_rinda.rb b/test/rinda/test_rinda.rb
index d796546a3d..14d3c31499 100644
--- a/test/rinda/test_rinda.rb
+++ b/test/rinda/test_rinda.rb
@@ -12,14 +12,14 @@ class MockClock
include Singleton
class MyTS < Rinda::TupleSpace
- def keeper
+ def keeper_thread
nil
end
end
def initialize
@now = 2
- @reso = 0.1
+ @reso = 1
@ts = MyTS.new
@ts.write([2, :now])
@inf = 2**31 - 1
@@ -33,17 +33,18 @@ class MockClock
n
end
- def _forward(n=@reso)
+ def _forward(n=nil)
now ,= @ts.take([nil, :now])
@now = now + n
n = @reso if n.nil?
@ts.write([@now, :now])
end
- def forward(n=@reso)
+ def forward(n)
while n > 0
_forward(@reso)
n -= @reso
+ Thread.pass
end
end
@@ -55,21 +56,11 @@ class MockClock
@ts.write([2, :now])
end
- def sleep(n=@reso)
- while will_deadlock?
- n -= @reso
- forward
- return 0 if n <= 0
- end
+ def sleep(n=nil)
now ,= @ts.read([nil, :now])
@ts.read([(now + n)..@inf, :now])
0
end
-
- def will_deadlock?
- sz = Thread.current.group.list.find_all {|x| x.status != 'sleep'}.size
- sz <= 1
- end
end
module Time
@@ -115,6 +106,14 @@ module TupleSpaceTestModule
Time.sleep(n)
end
end
+
+ def thread_join(th)
+ while th.alive?
+ Kernel.sleep(0.1)
+ sleep(1)
+ end
+ th.value
+ end
def test_00_tuple
tuple = Rinda::TupleEntry.new([1,2,3])
@@ -240,6 +239,28 @@ module TupleSpaceTestModule
end
end
+ def test_ruby_talk_264062
+ th = Thread.new { @ts.take([:empty], 1) }
+ sleep(10)
+ assert_raises(Rinda::RequestExpiredError) do
+ thread_join(th)
+ end
+
+ th = Thread.new { @ts.read([:empty], 1) }
+ sleep(10)
+ assert_raises(Rinda::RequestExpiredError) do
+ thread_join(th)
+ end
+ end
+
+ def test_symbol_tuple
+ @ts.write([:symbol, :symbol])
+ @ts.write(['string', :string])
+ assert_equal([[:symbol, :symbol]], @ts.read_all([:symbol, nil]))
+ assert_equal([[:symbol, :symbol]], @ts.read_all([Symbol, nil]))
+ assert_equal([], @ts.read_all([:nil, nil]))
+ end
+
def test_core_01
5.times do |n|
@ts.write([:req, 2])
@@ -252,7 +273,7 @@ module TupleSpaceTestModule
s = 0
while true
begin
- tuple = @ts.take([:req, Integer], 0.5)
+ tuple = @ts.take([:req, Integer], 1)
assert_equal(2, tuple[1])
s += tuple[1]
rescue Rinda::RequestExpiredError
@@ -263,10 +284,9 @@ module TupleSpaceTestModule
s
end
- sleep(20)
+ assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1])
- assert_equal(10, taker.value)
end
def test_core_02
@@ -274,7 +294,7 @@ module TupleSpaceTestModule
s = 0
while true
begin
- tuple = @ts.take([:req, Integer], 1.0)
+ tuple = @ts.take([:req, Integer], 1)
assert_equal(2, tuple[1])
s += tuple[1]
rescue Rinda::RequestExpiredError
@@ -289,10 +309,9 @@ module TupleSpaceTestModule
@ts.write([:req, 2])
end
- sleep(20)
+ assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1])
- assert_equal(10, taker.value)
assert_equal([], @ts.read_all([nil, nil]))
end
@@ -349,7 +368,7 @@ module TupleSpaceTestModule
s = 0
while true
begin
- tuple = @ts.take([:req, Integer], 1.0)
+ tuple = @ts.take([:req, Integer], 1)
s += tuple[1]
rescue Rinda::RequestExpiredError
break
@@ -359,26 +378,23 @@ module TupleSpaceTestModule
s
end
- writer = Thread.new do
- 5.times do |n|
- @ts.write([:req, 2])
- sleep 0.1
- end
+ 5.times do |n|
+ @ts.write([:req, 2])
end
@ts.take({"message"=>"first", "name"=>"3"})
sleep(4)
+ assert_equal(10, thread_join(taker))
tuple = @ts.take([:ans, nil])
assert_equal(10, tuple[1])
- assert_equal(10, taker.value)
assert_equal([], @ts.read_all([nil, nil]))
notify1.cancel
sleep(3) # notify2 expired
- assert_equal([0, 11], listener1.value)
- assert_equal([0, 3], listener2.value)
+ assert_equal([0, 11], thread_join(listener1))
+ assert_equal([0, 3], thread_join(listener2))
ary = []
ary.push(["write", {"message"=>"first", "name"=>"3"}])
@@ -403,23 +419,24 @@ module TupleSpaceTestModule
template = nil
taker = Thread.new do
- @ts.take([:take, nil], 10) do |template|
+ @ts.take([:take, nil], 10) do |t|
+ template = t
Thread.new do
- sleep 0.2
template.cancel
end
end
end
- sleep(1)
- assert(template.canceled?)
-
- @ts.write([:take, 1])
+ sleep(2)
assert_raises(Rinda::RequestCanceledError) do
- assert_nil(taker.value)
+ assert_nil(thread_join(taker))
end
+ assert(template.canceled?)
+
+ @ts.write([:take, 1])
+
assert_equal([[:take, 1]], @ts.read_all([nil, nil]))
end
@@ -431,23 +448,24 @@ module TupleSpaceTestModule
template = nil
reader = Thread.new do
- @ts.read([:take, nil], 10) do |template|
+ @ts.read([:take, nil], 10) do |t|
+ template = t
Thread.new do
- sleep 0.2
template.cancel
end
end
end
- sleep(1)
- assert(template.canceled?)
-
- @ts.write([:take, 1])
+ sleep(2)
assert_raises(Rinda::RequestCanceledError) do
- assert_nil(reader.value)
+ assert_nil(thread_join(reader))
end
+ assert(template.canceled?)
+
+ @ts.write([:take, 1])
+
assert_equal([[:take, 1]], @ts.read_all([nil, nil]))
end
@@ -478,30 +496,22 @@ module TupleSpaceTestModule
assert(tuple.expired?)
assert(!tuple.alive?)
- tuple = Rinda::TupleEntry.new([1,2,3], SimpleRenewer.new(1,2))
+ @renewer = SimpleRenewer.new(1,2)
+ tuple = Rinda::TupleEntry.new([1,2,3], @renewer)
assert(!tuple.canceled?)
assert(!tuple.expired?)
assert(tuple.alive?)
- sleep(1.5)
+ sleep(1)
assert(!tuple.canceled?)
assert(!tuple.expired?)
assert(tuple.alive?)
- sleep(1.5)
+ sleep(2)
assert(tuple.expired?)
assert(!tuple.alive?)
end
end
class TupleSpaceTest < Test::Unit::TestCase
- def test_message
- flunk("YARV doesn't support Rinda")
- end
-end
-
-end
-__END__
-
-class TupleSpaceTest < Test::Unit::TestCase
include TupleSpaceTestModule
def setup
@@ -520,13 +530,9 @@ class TupleSpaceProxyTest < Test::Unit::TestCase
end
def test_remote_array_and_hash
- ary = [1, 2, 3]
- @ts.write(DRbObject.new(ary))
- GC.start
+ @ts.write(DRbObject.new([1, 2, 3]))
assert_equal([1, 2, 3], @ts.take([1, 2, 3], 0))
- hash = {'head' => 1, 'tail' => 2}
- @ts.write(DRbObject.new(hash))
- GC.start
+ @ts.write(DRbObject.new({'head' => 1, 'tail' => 2}))
assert_equal({'head' => 1, 'tail' => 2},
@ts.take({'head' => 1, 'tail' => 2}, 0))
end