summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog8
-rw-r--r--lib/test/unit.rb441
-rw-r--r--lib/test/unit/parallel.rb11
-rw-r--r--test/testunit/test_parallel.rb3
4 files changed, 246 insertions, 217 deletions
diff --git a/ChangeLog b/ChangeLog
index 997c6cac7a..25d72b04cd 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,11 @@
+Sat Feb 26 16:10:23 2011 Shota Fukumori <sorah@tubusu.net>
+
+ * lib/test/unit.rb: --jobs-status won't puts over 2 lines.
+
+ * test/testunit/test_parallel.rb: Fix test for above.
+
+ * lib/test/*: refactoring.
+
Sat Feb 26 07:10:05 2011 Aaron Patterson <aaron@tenderlovemaking.com>
* ext/psych/lib/psych/scalar_scanner.rb: fix parsing timezone's whose
diff --git a/lib/test/unit.rb b/lib/test/unit.rb
index dee7e3cb74..4d6d6014d6 100644
--- a/lib/test/unit.rb
+++ b/lib/test/unit.rb
@@ -229,33 +229,87 @@ module Test
include Test::Unit::GCStressOption
include Test::Unit::RunCount
+ class Worker
+ def self.launch(ruby,args=[])
+ i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master
+ j,k = IO.pipe("ASCII-8BIT") # worker <j|<k master
+ k.sync = true
+ pid = spawn(*ruby,
+ "#{File.dirname(__FILE__)}/unit/parallel.rb",
+ *args, out: o, in: j)
+ [o,j].each(&:close)
+ new(in: k, out: i, pid: pid, status: :waiting)
+ end
+
+ def initialize(h={})
+ @worker = h
+ @hooks = {}
+ end
+
+ def run(task,type)
+ @worker[:file] = File.basename(task).gsub(/\.rb/,"")
+ @worker[:real_file] = task
+ begin
+ @worker[:loadpath] ||= []
+ @worker[:in].puts "loadpath #{[Marshal.dump($:-@worker[:loadpath])].pack("m").gsub("\n","")}"
+ @worker[:loadpath] = $:.dup
+ @worker[:in].puts "run #{task} #{type}"
+ @worker[:status] = :prepare
+ rescue Errno::EPIPE
+ dead
+ rescue IOError
+ raise unless ["stream closed","closed stream"].include? $!.message
+ dead
+ end
+ end
+
+ def hook(id,&block)
+ @hooks[id] ||= []
+ @hooks[id] << block
+ self
+ end
+
+ def read
+ ((self[:status] == :quit) ? self[:out].read : self[:out].gets).chomp
+ end
+
+ def [](k); @worker[k]; end
+ def []=(k,v); @worker[k]=v; end
+
+ def dead(*additional)
+ @worker[:status] = :quit
+ @worker[:in].close
+ @worker[:out].close
+
+ call_hook(:dead,*additional)
+ end
+
+ def to_s
+ if self[:file]
+ "#{self[:pid]}=#{self[:file]}"
+ else
+ "#{self[:pid]}:#{self[:status].to_s.ljust(7)}"
+ end
+ end
+
+ private
+
+ def call_hook(id,*additional)
+ @hooks[id] ||= []
+ @hooks[id].each{|hook| hook[self,additional] }
+ self
+ end
+
+ end
+
class << self; undef autorun; end
- alias orig_run_anything _run_anything
- undef _run_anything
undef options
def options
@optss ||= (@options||{}).merge(@opts)
end
- def _run_anything type
- if @opts[:parallel] && @warnings
- warn ""
- ary = []
- @warnings.reject! do |w|
- r = ary.include?(w[1].message)
- ary << w[1].message
- r
- end
- @warnings.each do |w|
- warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
- end
- warn ""
- end
- orig_run_anything(type)
- end
-
@@stop_auto_run = false
def self.autorun
at_exit {
@@ -269,7 +323,6 @@ module Test
def after_worker_down(worker, e=nil, c=1)
return unless @opts[:parallel]
return if @interrupt
- after_worker_dead worker
if e
b = e.backtrace
warn "#{b.shift}: #{e.message} (#{e.class})"
@@ -288,40 +341,20 @@ module Test
def jobs_status
return unless @opts[:job_status]
puts "" unless @opts[:verbose]
- if @opts[:job_status]
- line2 = []
- line1 = @workers.map { |worker|
- a = "#{worker[:pid]}:#{worker[:status].to_s.ljust(7)}"
- if worker[:file]
- if @opts[:job_status_type] == :replace
- a = "#{worker[:pid]}=#{worker[:file]}"
- else
- if a.size > worker[:file].size
- line2 << worker[:file].ljust(a.size)
- else
- a << " "*(worker[:file].size-a.size)
- line2 << worker[:file]
- end
- end
- else
- line2 << " "*a.size
- end
- a
- }.join(" ")
- if @opts[:job_status_type] == :replace
- @terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \
- || %x{tput cols 2>/dev/null}.to_i.nonzero? \
- || 80
- @jstr_size ||= 0
- del_jobs_status
- STDOUT.flush
- print line1[0...@terminal_width]
- STDOUT.flush
- @jstr_size = line1.size > @terminal_width ? @terminal_width : line1.size
- else
- puts line1
- puts line2.join(" ")
- end
+ status_line = @workers.map(&:to_s).join(" ")
+ if @opts[:job_status_type] == :replace
+ @terminal_width ||= %x{stty size 2>/dev/null}.split[1].to_i.nonzero? \
+ || %x{tput cols 2>/dev/null}.to_i.nonzero? \
+ || 80
+ @jstr_size ||= 0
+ del_jobs_status
+ STDOUT.flush
+ print status_line[0...@terminal_width]
+ STDOUT.flush
+ @jstr_size = status_line.size > @terminal_width ? \
+ @terminal_width : status_line.size
+ else
+ puts status_line
end
end
@@ -333,184 +366,174 @@ module Test
def after_worker_dead(worker)
return unless @opts[:parallel]
return if @interrupt
- worker[:status] = :quit
- worker[:in].close
- worker[:out].close
@workers.delete(worker)
@dead_workers << worker
@ios = @workers.map{|w| w[:out] }
end
- def _run_suites suites, type
- @interrupt = nil
- result = []
- if @opts[:parallel]
- begin
- # Require needed things for parallel running
- require 'thread'
- require 'timeout'
- @tasks = @files.dup # Array of filenames.
- @need_quit = false
- @dead_workers = [] # Array of dead workers.
- @warnings = []
- shutting_down = false
- errors = []
- failures = []
- skips = []
- rep = []
-
- # Array of workers.
- @workers = @opts[:parallel].times.map do
- i,o = IO.pipe("ASCII-8BIT") # worker o>|i> master
- j,k = IO.pipe("ASCII-8BIT") # worker <j|<k master
- k.sync = true
- pid = spawn(*@opts[:ruby],
- "#{File.dirname(__FILE__)}/unit/parallel.rb",
- *@args, out: o, in: j)
- [o,j].each{|io| io.close }
- {in: k, out: i, pid: pid, status: :waiting}
+ def _run_parallel suites, type, result
+ begin
+ # Require needed things for parallel running
+ require 'thread'
+ require 'timeout'
+ @tasks = @files.dup # Array of filenames.
+ @need_quit = false
+ @dead_workers = [] # Array of dead workers.
+ @warnings = []
+ shutting_down = false
+ rep = [] # FIXME: more good naming
+
+ # Array of workers.
+ @workers = @opts[:parallel].times.map {
+ begin
+ worker = Worker.launch(@opts[:ruby],@args)
+ worker.hook(:dead) do |w,info|
+ after_worker_dead w
+ after_worker_down w, *info unless info.empty?
end
-
- # Thread: watchdog
- watchdog = Thread.new do
- while stat = Process.wait2
- break if @interrupt # Break when interrupt
- w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup
- next unless w
- unless w[:status] == :quit
- # Worker down
- after_worker_down w, nil, stat[1].to_i
- end
- end
+ worker
+ rescue Exception; puts "#{$!.class}: #{$!.message}\n#{$!.backtrace}"
end
- @workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker
- @ios = @workers.map{|w| w[:out] } # Array of worker IOs
-
- while _io = IO.select(@ios)[0]
- break unless _io.each do |io|
- break if @need_quit
- worker = @workers_hash[io]
- buf = ((worker[:status] == :quit) ? io.read : io.gets).chomp
- case buf
- when /^okay$/ # Worker will run task
- worker[:status] = :running
- jobs_status
- when /^ready$/ # Worker is ready
- worker[:status] = :ready
- if @tasks.empty?
- break unless @workers.find{|x| x[:status] == :running }
- else
- task = @tasks.shift
- worker[:file] = File.basename(task).gsub(/\.rb/,"")
- worker[:real_file] = task
- begin
- worker[:loadpath] ||= []
- worker[:in].puts "loadpath #{[Marshal.dump($:-worker[:loadpath])].pack("m").gsub("\n","")}"
- worker[:loadpath] = $:.dup
- worker[:in].puts "run #{task} #{type}"
- worker[:status] = :prepare
- rescue Errno::EPIPE
- after_worker_down worker
- rescue IOError
- raise unless ["stream closed","closed stream"].include? $!.message
- after_worker_down worker
- end
- end
-
- jobs_status
- when /^done (.+?)$/ # Worker ran a one of suites in a file
- r = Marshal.load($1.unpack("m")[0])
- # [result,result,report,$:]
- result << r[0..1]
- rep << {file: worker[:real_file], report: r[2], result: r[3],
- testcase: r[5]}
- errors << [worker[:real_file],r[5],r[3][0]]
- failures << [worker[:real_file],r[5],r[3][1]]
- skips << [worker[:real_file],r[5],r[3][2]]
- $:.push(*r[4]).uniq!
- worker[:status] = :done
- jobs_status if @opts[:job_status_type] == :replace
- worker[:status] = :running
- when /^p (.+?)$/ # Worker wanna print to STDOUT
- del_jobs_status
- print $1.unpack("m")[0]
- jobs_status if @opts[:job_status_type] == :replace
- when /^after (.+?)$/
- @warnings << Marshal.load($1.unpack("m")[0])
- when /^bye (.+?)$/ # Worker will shutdown
- e = Marshal.load($1.unpack("m")[0])
- after_worker_down worker, e
- when /^bye$/ # Worker will shutdown
- if shutting_down
- after_worker_dead worker
- else
- after_worker_down worker
- end
- end
- break if @need_quit
+ }
+
+ # Thread: watchdog
+ watchdog = Thread.new do
+ while stat = Process.wait2
+ break if @interrupt # Break when interrupt
+ w = (@workers + @dead_workers).find{|x| stat[0] == x[:pid] }.dup
+ next unless w
+ unless w[:status] == :quit
+ # Worker down
+ w.dead(nil, stat[1].to_i)
end
end
+ end
- # Retry
- # TODO: Interrupt?
- rescue Interrupt => e
- @interrupt = e
- return result
- ensure
- shutting_down = true
+ @workers_hash = Hash[@workers.map {|w| [w[:out],w] }] # out-IO => worker
+ @ios = @workers.map{|w| w[:out] } # Array of worker IOs
+
+ while _io = IO.select(@ios)[0]
+ break unless _io.each do |io|
+ break if @need_quit
+ worker = @workers_hash[io]
+ case worker.read
+ when /^okay$/
+ worker[:status] = :running
+ jobs_status
+ when /^ready$/
+ worker[:status] = :ready
+ if @tasks.empty?
+ break unless @workers.find{|x| x[:status] == :running }
+ else
+ worker.run(@tasks.shift, type)
+ end
- watchdog.kill if watchdog
- @workers.each do |worker|
- begin
- timeout(1) do
- worker[:in].puts "quit"
+ jobs_status
+ when /^done (.+?)$/
+ r = Marshal.load($1.unpack("m")[0])
+ result << r[0..1]
+ rep << {file: worker[:real_file],
+ report: r[2], result: r[3], testcase: r[5]}
+ $:.push(*r[4]).uniq!
+ when /^p (.+?)$/
+ del_jobs_status
+ print $1.unpack("m")[0]
+ jobs_status if @opts[:job_status_type] == :replace
+ when /^after (.+?)$/
+ @warnings << Marshal.load($1.unpack("m")[0])
+ when /^bye (.+?)$/
+ after_worker_down worker, Marshal.load($1.unpack("m")[0])
+ when /^bye$/
+ if shutting_down
+ after_worker_dead worker
+ else
+ after_worker_down worker
end
- rescue Errno::EPIPE
- rescue Timeout::Error
- end
- [:in,:out].each do |name|
- worker[name].close
end
+ break if @need_quit
end
+ end
+ rescue Interrupt => e
+ @interrupt = e
+ return result
+ ensure
+ shutting_down = true
+
+ watchdog.kill if watchdog
+ @workers.each do |worker|
begin
- timeout(0.2*@workers.size) do
- Process.waitall
+ timeout(1) do
+ worker[:in].puts "quit"
end
+ rescue Errno::EPIPE
rescue Timeout::Error
- @workers.each do |worker|
- begin
- Process.kill(:KILL,worker[:pid])
- rescue Errno::ESRCH; end
- end
end
+ [:in,:out].each { |name| worker[name].close }
+ end
+ begin
+ timeout(0.2*@workers.size) do
+ Process.waitall
+ end
+ rescue Timeout::Error
+ @workers.each do |worker|
+ begin
+ Process.kill(:KILL,worker[:pid])
+ rescue Errno::ESRCH; end
+ end
+ end
- unless @need_quit
- if @interrupt || @opts[:no_retry]
- rep.each do |r|
- report.push(*r[:report])
- end
- @errors += errors.map(&:last).inject(:+)
- @failures += failures.map(&:last).inject(:+)
- @skips += skips.map(&:last).inject(:+)
+ if @interrupt || @opts[:no_retry]
+ rep.each do |r|
+ report.push(*r[:report])
+ end
+ @errors += rep.map{|x| x[:result][0] }.inject(:+)
+ @failures += rep.map{|x| x[:result][1] }.inject(:+)
+ @skips += rep.map{|x| x[:result][2] }.inject(:+)
+ elsif @need_quit
+ rep.each do |r|
+ report.push(*r[:report])
+ @errors += r[:result][0]
+ @failures += r[:result][1]
+ @skips += r[:result][2]
+ end
+ else
+ puts ""
+ puts "Retrying..."
+ puts ""
+ @options = @opts
+ rep.each do |r|
+ if r[:testcase] && r[:file] && !r[:report].empty?
+ require r[:file]
+ _run_suite(eval(r[:testcase]),type)
else
- puts ""
- puts "Retrying..."
- puts ""
- @options = @opts
- rep.each do |r|
- if r[:testcase] && r[:file] && !r[:report].empty?
- require r[:file]
- _run_suite(eval(r[:testcase]),type)
- else
- report.push(*r[:report])
- @errors += r[:result][0]
- @failures += r[:result][1]
- @skips += r[:result][2]
- end
- end
+ report.push(*r[:report])
+ @errors += r[:result][0]
+ @failures += r[:result][1]
+ @skips += r[:result][2]
end
end
end
+ if @warnings
+ warn ""
+ ary = []
+ @warnings.reject! do |w|
+ r = ary.include?(w[1].message)
+ ary << w[1].message
+ r
+ end
+ @warnings.each do |w|
+ warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
+ end
+ warn ""
+ end
+ end
+ end
+
+ def _run_suites suites, type
+ @interrupt = nil
+ result = []
+ if @opts[:parallel]
+ _run_parallel suites, type, result
else
suites.each {|suite|
begin
diff --git a/lib/test/unit/parallel.rb b/lib/test/unit/parallel.rb
index ae1bf2961c..80dd4eae56 100644
--- a/lib/test/unit/parallel.rb
+++ b/lib/test/unit/parallel.rb
@@ -26,9 +26,9 @@ module Test
stdout = STDOUT.dup
- th = Thread.new(i.dup) do |io|
+ th = Thread.new do
begin
- while buf = (self.verbose ? io.gets : io.read(5))
+ while buf = (self.verbose ? i.gets : i.read(5))
stdout.puts "p #{[buf].pack("m").gsub("\n","")}"
end
rescue IOError
@@ -70,13 +70,11 @@ module Test
@@stop_auto_run = true
@opts = @options.dup
- STDOUT.sync = true
- STDOUT.puts "ready"
Signal.trap(:INT,"IGNORE")
-
-
@old_loadpath = []
begin
+ STDOUT.sync = true
+ STDOUT.puts "ready"
stdin = STDIN.dup
stdout = STDOUT.dup
while buf = stdin.gets
@@ -123,6 +121,7 @@ module Test
exit
end
end
+ rescue Errno::EPIPE
rescue Exception => e
begin
STDOUT.puts "bye #{[Marshal.dump(e)].pack("m").gsub("\n","")}"
diff --git a/test/testunit/test_parallel.rb b/test/testunit/test_parallel.rb
index 3eff960af9..24e03be736 100644
--- a/test/testunit/test_parallel.rb
+++ b/test/testunit/test_parallel.rb
@@ -169,8 +169,7 @@ module TestParallel
def test_jobs_status
spawn_runner "--jobs-status"
buf = timeout(10){@test_out.read}
- assert_match(/\d+:(ready|prepare|running) */,buf)
- assert_match(/test_(first|second|third|forth) */,buf)
+ assert_match(/\d+=test_(first|second|third|forth) */,buf)
end
end