summaryrefslogtreecommitdiff
path: root/lib/shell/system-command.rb
blob: 4bf4f2b86a6c7784a24e57bece4ac34768d452f4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#
#   shell/system-command.rb - 
#   	$Release Version: 0.7 $
#   	$Revision$
#   	$Date$
#   	by Keiju ISHITSUKA(keiju@ruby-lang.org)
#
# --
#
#   
#

require "shell/filter"

class Shell
  class SystemCommand < Filter
    def initialize(sh, command, *opts)
      if t = opts.find{|opt| !opt.kind_of?(String) && opt.class}
	Shell.Fail Error::TypeError, t.class, "String"
      end
      super(sh)
      @command = command
      @opts = opts
      
      @input_queue = Queue.new
      @pid = nil

      sh.process_controller.add_schedule(self)
    end

    attr_reader :command
    alias name command

    def wait?
      @shell.process_controller.waiting_job?(self)
    end

    def active?
      @shell.process_controller.active_job?(self)
    end

    def input=(inp)
      super
      if active?
	start_export
      end
    end

    def start
      notify([@command, *@opts].join(" "))

      @pid, @pipe_in, @pipe_out = @shell.process_controller.sfork(self) {
	Dir.chdir @shell.pwd
	$0 = @command
	exec(@command, *@opts)
      }
      if @input
 	start_export
      end
      start_import
    end

    def flush
      @pipe_out.flush if @pipe_out and !@pipe_out.closed?
    end

    def terminate
      begin
	@pipe_in.close
      rescue IOError
      end
      begin
	@pipe_out.close
      rescue IOError
      end
    end

    def kill(sig)
      if @pid
	Process.kill(sig, @pid)
      end
    end

    def start_import
      notify "Job(%id) start imp-pipe.", @shell.debug?
      rs = @shell.record_separator unless rs
      _eop = true
      th = Thread.start {
	begin
	  while l = @pipe_in.gets
	    @input_queue.push l
	  end
	  _eop = false
	rescue Errno::EPIPE
	  _eop = false
	ensure
	  if !ProcessController::USING_AT_EXIT_WHEN_PROCESS_EXIT and _eop
	    notify("warn: Process finishing...",
		   "wait for Job[%id] to finish pipe importing.",
		   "You can use Shell#transact or Shell#check_point for more safe execution.")
	    redo
	  end
	  notify "job(%id}) close imp-pipe.", @shell.debug?
	  @input_queue.push :EOF
	  @pipe_in.close
	end
      }
    end

    def start_export
      notify "job(%id) start exp-pipe.", @shell.debug?
      _eop = true
      th = Thread.start{
	begin
	  @input.each do |l|
	    ProcessController::block_output_synchronize do
	      @pipe_out.print l
	    end
	  end
	  _eop = false
	rescue Errno::EPIPE, Errno::EIO
	  _eop = false
	ensure
	  if !ProcessController::USING_AT_EXIT_WHEN_PROCESS_EXIT and _eop
	    notify("shell: warn: Process finishing...",
		   "wait for Job(%id) to finish pipe exporting.",
		   "You can use Shell#transact or Shell#check_point for more safe execution.")
	    redo
	  end
	  notify "job(%id) close exp-pipe.", @shell.debug?
	  @pipe_out.close
	end
      }
    end

    alias super_each each
    def each(rs = nil)
      while (l = @input_queue.pop) != :EOF
	yield l
      end
    end

    # ex)
    #    if you wish to output: 
    #	    "shell: job(#{@command}:#{@pid}) close pipe-out."
    #	 then 
    #	    mes: "job(%id) close pipe-out."
    #    yorn: Boolean(@shell.debug? or @shell.verbose?)
    def notify(*opts, &block)
      @shell.notify(*opts) do |mes|
	yield mes if iterator?

	mes.gsub!("%id", "#{@command}:##{@pid}")
	mes.gsub!("%name", "#{@command}")
	mes.gsub!("%pid", "#{@pid}")
	mes
      end
    end
  end
end