summaryrefslogtreecommitdiff
path: root/lib/shell/process-controller.rb
diff options
context:
space:
mode:
authorkeiju <keiju@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2007-03-20 12:38:58 +0000
committerkeiju <keiju@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2007-03-20 12:38:58 +0000
commit42a81120e510c2c2fb7822ff4d5738db6ba392bf (patch)
tree3f6024dcb1fe3a3ca6b3fd613f436fd99c80c80c /lib/shell/process-controller.rb
parentd9d838a45beff13576e4fba42e0d5b611d0df706 (diff)
* lib/shell.rb, lib/shell: support for ruby 1.9(YARV) thread model.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@12110 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'lib/shell/process-controller.rb')
-rw-r--r--lib/shell/process-controller.rb158
1 files changed, 109 insertions, 49 deletions
diff --git a/lib/shell/process-controller.rb b/lib/shell/process-controller.rb
index 8929805506..c38d8bc987 100644
--- a/lib/shell/process-controller.rb
+++ b/lib/shell/process-controller.rb
@@ -1,33 +1,38 @@
#
# shell/process-controller.rb -
-# $Release Version: 0.6.0 $
+# $Release Version: 0.7 $
# $Revision$
# $Date$
-# by Keiju ISHITSUKA(Nihon Rational Software Co.,Ltd)
+# by Keiju ISHITSUKA(keiju@ruby-lang.org)
#
# --
#
#
#
+require "forwardable"
-require "mutex_m"
-require "monitor"
+require "thread"
require "sync"
class Shell
class ProcessController
@ProcessControllers = {}
- @ProcessControllers.extend Mutex_m
+ @ProcessControllersMonitor = Mutex.new
+ @ProcessControllersCV = ConditionVariable.new
+
+ @BlockOutputMonitor = Mutex.new
+ @BlockOutputCV = ConditionVariable.new
class<<self
+ extend Forwardable
- def process_controllers_exclusive
- begin
- @ProcessControllers.lock unless Thread.critical
- yield
- ensure
- @ProcessControllers.unlock unless Thread.critical
+ def_delegator("@ProcessControllersMonitor",
+ "synchronize", "process_controllers_exclusive")
+
+ def active_process_controllers
+ process_controllers_exclusive do
+ @ProcessControllers.dup
end
end
@@ -43,6 +48,7 @@ class Shell
if @ProcessControllers[pc]
if (@ProcessControllers[pc] -= 1) == 0
@ProcessControllers.delete(pc)
+ @ProcessControllersCV.signal
end
end
end
@@ -55,6 +61,34 @@ class Shell
end
end
end
+
+ def block_output_synchronize(&b)
+ @BlockOutputMonitor.synchronize &b
+ end
+
+ def wait_to_finish_all_process_controllers
+ process_controllers_exclusive do
+ while !@ProcessControllers.empty?
+ Shell::notify("Process finishing, but active shell exists",
+ "You can use Shell#transact or Shell#check_point for more safe execution.")
+ if Shell.debug?
+ for pc in @ProcessControllers.keys
+ Shell::notify(" Not finished jobs in "+pc.shell.to_s)
+ for com in pc.jobs
+ com.notify(" Jobs: %id")
+ end
+ end
+ end
+ @ProcessControllersCV.wait(@ProcessControllersMonitor)
+ end
+ end
+ end
+ end
+
+ # for shell-command complete finish at this prosess exit.
+ USING_AT_EXIT_WHEN_PROCESS_EXIT = true
+ at_exit do
+ wait_to_finish_all_process_controllers unless $@
end
def initialize(shell)
@@ -67,6 +101,8 @@ class Shell
@job_condition = ConditionVariable.new
end
+ attr_reader :shell
+
def jobs
jobs = []
@jobs_sync.synchronize(:SH) do
@@ -122,15 +158,19 @@ class Shell
@waiting_jobs.delete command
else
command = @waiting_jobs.shift
+# command.notify "job(%id) pre-start.", @shell.debug?
+
return unless command
end
@active_jobs.push command
command.start
+# command.notify "job(%id) post-start.", @shell.debug?
# start all jobs that input from the job
- for job in @waiting_jobs
+ for job in @waiting_jobs.dup
start_job(job) if job.input == command
end
+# command.notify "job(%id) post2-start.", @shell.debug?
end
end
@@ -152,6 +192,7 @@ class Shell
@active_jobs.delete command
ProcessController.inactivate(self)
if @active_jobs.empty?
+ command.notify("start_jon in ierminate_jon(%id)", Shell::debug?)
start_job
end
end
@@ -159,7 +200,7 @@ class Shell
# kill a job
def kill_job(sig, command)
- @jobs_sync.synchronize(:SH) do
+ @jobs_sync.synchronize(:EX) do
if @waiting_jobs.delete command
ProcessController.inactivate(self)
return
@@ -183,6 +224,9 @@ class Shell
begin
while !jobs.empty?
@job_condition.wait(@job_monitor)
+ for job in jobs
+ job.notify("waiting job(%id)", Shell::debug?)
+ end
end
ensure
redo unless jobs.empty?
@@ -194,66 +238,82 @@ class Shell
def sfork(command, &block)
pipe_me_in, pipe_peer_out = IO.pipe
pipe_peer_in, pipe_me_out = IO.pipe
- Thread.critical = true
- STDOUT.flush
- ProcessController.each_active_object do |pc|
- for jobs in pc.active_jobs
- jobs.flush
- end
- end
-
- pid = fork {
- Thread.critical = true
-
- Thread.list.each do |th|
- th.kill unless [Thread.main, Thread.current].include?(th)
- end
- STDIN.reopen(pipe_peer_in)
- STDOUT.reopen(pipe_peer_out)
+ pid = nil
+ pid_mutex = Mutex.new
+ pid_cv = ConditionVariable.new
- ObjectSpace.each_object(IO) do |io|
- if ![STDIN, STDOUT, STDERR].include?(io)
- io.close unless io.closed?
+ Thread.start do
+ ProcessController.block_output_synchronize do
+ STDOUT.flush
+ ProcessController.each_active_object do |pc|
+ for jobs in pc.active_jobs
+ jobs.flush
+ end
end
+
+ pid = fork {
+ Thread.list.each do |th|
+# th.kill unless [Thread.main, Thread.current].include?(th)
+ th.kill unless Thread.current == th
+ end
+
+ STDIN.reopen(pipe_peer_in)
+ STDOUT.reopen(pipe_peer_out)
+
+ ObjectSpace.each_object(IO) do |io|
+ if ![STDIN, STDOUT, STDERR].include?(io)
+ io.close unless io.closed?
+ end
+ end
+
+ yield
+ }
end
- yield
- }
+ pid_cv.signal
- pipe_peer_in.close
- pipe_peer_out.close
- command.notify "job(%name:##{pid}) start", @shell.debug?
- Thread.critical = false
+ pipe_peer_in.close
+ pipe_peer_out.close
+ command.notify "job(%name:##{pid}) start", @shell.debug?
- th = Thread.start {
- Thread.critical = true
begin
_pid = nil
command.notify("job(%id) start to waiting finish.", @shell.debug?)
- Thread.critical = false
_pid = Process.waitpid(pid, nil)
rescue Errno::ECHILD
command.notify "warn: job(%id) was done already waitipd."
_pid = true
+ # rescue
+ # STDERR.puts $!
ensure
+ command.notify("Job(%id): Wait to finish when Process finished.", @shell.debug?)
# when the process ends, wait until the command termintes
- if _pid
+ if USING_AT_EXIT_WHEN_PROCESS_EXIT or _pid
else
command.notify("notice: Process finishing...",
"wait for Job[%id] to finish.",
"You can use Shell#transact or Shell#check_point for more safe execution.")
redo
end
- Thread.exclusive do
- @job_monitor.synchronize do
- terminate_job(command)
- @job_condition.signal
- command.notify "job(%id) finish.", @shell.debug?
- end
+
+# command.notify "job(%id) pre-pre-finish.", @shell.debug?
+ @job_monitor.synchronize do
+# command.notify "job(%id) pre-finish.", @shell.debug?
+ terminate_job(command)
+# command.notify "job(%id) pre-finish2.", @shell.debug?
+ @job_condition.signal
+ command.notify "job(%id) finish.", @shell.debug?
end
end
- }
+ end
+
+ pid_mutex.synchronize do
+ while !pid
+ pid_cv.wait(pid_mutex)
+ end
+ end
+
return pid, pipe_me_in, pipe_me_out
end
end