lib/shell/process-controller.rb


DEFINITIONS

This source file includes following functions.


   1  #
   2  #   shell/process-controller.rb - 
   3  #       $Release Version: 0.6.0 $
   4  #       $Revision: 1.2 $
   5  #       $Date: 2001/05/17 10:19:45 $
   6  #       by Keiju ISHITSUKA(Nihon Rational Software Co.,Ltd)
   7  #
   8  # --
   9  #
  10  #   
  11  #
  12  
  13  require "mutex_m"
  14  require "monitor"
  15  require "sync"
  16  
  17  class Shell
  18    class ProcessController
  19  
  20      @ProcessControllers = {}
  21      @ProcessControllers.extend Mutex_m
  22  
  23      class<<self
  24  
  25        def process_controllers_exclusive
  26          begin
  27            @ProcessControllers.lock unless Thread.critical 
  28            yield
  29          ensure
  30            @ProcessControllers.unlock unless Thread.critical 
  31          end
  32        end
  33  
  34        def activate(pc)
  35          process_controllers_exclusive do
  36            @ProcessControllers[pc] ||= 0
  37            @ProcessControllers[pc] += 1
  38          end
  39        end
  40  
  41        def inactivate(pc)
  42          process_controllers_exclusive do
  43            if @ProcessControllers[pc]
  44              if (@ProcessControllers[pc] -= 1) == 0
  45                @ProcessControllers.delete(pc)
  46              end
  47            end
  48          end
  49        end
  50  
  51        def each_active_object
  52          process_controllers_exclusive do
  53            for ref in @ProcessControllers.keys
  54              yield ref
  55            end
  56          end
  57        end
  58      end
  59  
  60      def initialize(shell)
  61        @shell = shell
  62        @waiting_jobs = []
  63        @active_jobs = []
  64        @jobs_sync = Sync.new
  65  
  66        @job_monitor = Mutex.new
  67        @job_condition = ConditionVariable.new
  68      end
  69  
  70      def jobs
  71        jobs = []
  72        @jobs_sync.synchronize(:SH) do
  73          jobs.concat @waiting_jobs
  74          jobs.concat @active_jobs
  75        end
  76        jobs
  77      end
  78  
  79      def active_jobs
  80        @active_jobs
  81      end
  82  
  83      def waiting_jobs
  84        @waiting_jobs
  85      end
  86      
  87      def jobs_exist?
  88        @jobs_sync.synchronize(:SH) do
  89          @active_jobs.empty? or @waiting_jobs.empty?
  90        end
  91      end
  92  
  93      def active_jobs_exist?
  94        @jobs_sync.synchronize(:SH) do
  95          @active_jobs.empty?
  96        end
  97      end
  98  
  99      def waiting_jobs_exist?
 100        @jobs_sync.synchronize(:SH) do
 101          @waiting_jobs.empty?
 102        end
 103      end
 104  
 105      # schedule a command
 106      def add_schedule(command)
 107        @jobs_sync.synchronize(:EX) do
 108          ProcessController.activate(self)
 109          if @active_jobs.empty?
 110            start_job command
 111          else
 112            @waiting_jobs.push(command)
 113          end
 114        end
 115      end
 116  
 117      # start a job
 118      def start_job(command = nil)
 119        @jobs_sync.synchronize(:EX) do
 120          if command
 121            return if command.active?
 122            @waiting_jobs.delete command
 123          else
 124            command = @waiting_jobs.shift
 125            return unless command
 126          end
 127          @active_jobs.push command
 128          command.start
 129  
 130          # start all jobs that input from the job
 131          for job in @waiting_jobs
 132            start_job(job) if job.input == command
 133          end
 134        end
 135      end
 136  
 137      def waiting_job?(job)
 138        @jobs_sync.synchronize(:SH) do
 139          @waiting_jobs.include?(job)
 140        end
 141      end
 142  
 143      def active_job?(job)
 144        @jobs_sync.synchronize(:SH) do
 145          @active_jobs.include?(job)
 146        end
 147      end
 148  
 149      # terminate a job
 150      def terminate_job(command)
 151        @jobs_sync.synchronize(:EX) do
 152          @active_jobs.delete command
 153          ProcessController.inactivate(self)
 154          if @active_jobs.empty?
 155            start_job
 156          end
 157        end
 158      end
 159  
 160      # kill a job
 161      def kill_job(sig, command)
 162        @jobs_sync.synchronize(:SH) do
 163          if @waiting_jobs.delete command
 164            ProcessController.inactivate(self)
 165            return
 166          elsif @active_jobs.include?(command)
 167            begin
 168              r = command.kill sig
 169              ProcessController.inactivate(self)
 170            rescue
 171              print "Shell: Warn: $!\n" if @shell.verbose?
 172              return nil
 173            end
 174            @active_jobs.delete command
 175            r
 176          end
 177        end
 178      end
 179  
 180      # wait for all jobs to terminate
 181      def wait_all_jobs_execution
 182        @job_monitor.synchronize do
 183          begin
 184            while !jobs.empty?
 185              @job_condition.wait(@job_monitor)
 186            end
 187          ensure
 188            redo unless jobs.empty?
 189          end
 190        end
 191      end
 192  
 193      # simple fork
 194      def sfork(command, &block)
 195        pipe_me_in, pipe_peer_out = IO.pipe
 196        pipe_peer_in, pipe_me_out = IO.pipe
 197        Thread.critical = true
 198  
 199        STDOUT.flush
 200        ProcessController.each_active_object do |pc|
 201          for jobs in pc.active_jobs
 202            jobs.flush
 203          end
 204        end
 205        
 206        pid = fork {
 207          Thread.critical = true
 208  
 209          Thread.list.each do |th| 
 210            th.kill unless [Thread.main, Thread.current].include?(th)
 211          end
 212  
 213          STDIN.reopen(pipe_peer_in)
 214          STDOUT.reopen(pipe_peer_out)
 215  
 216          ObjectSpace.each_object(IO) do |io| 
 217            if ![STDIN, STDOUT, STDERR].include?(io)
 218              io.close unless io.closed?
 219            end
 220          end
 221          yield
 222        }
 223  
 224        pipe_peer_in.close
 225        pipe_peer_out.close
 226        command.notify "job(%name:##{pid}) start", @shell.debug?
 227        Thread.critical = false
 228  
 229        th = Thread.start {
 230          Thread.critical = true
 231          begin
 232            _pid = nil
 233            command.notify("job(%id) start to waiting finish.", @shell.debug?)
 234            Thread.critical = false
 235            _pid = Process.waitpid(pid, nil)
 236          rescue Errno::ECHILD
 237            command.notify "warn: job(%id) was done already waitipd."
 238            _pid = true
 239          ensure
 240            # when the process ends, wait until the command termintes
 241            if _pid
 242            else
 243              command.notify("notice: Process finishing...",
 244                             "wait for Job[%id] to finish.",
 245                             "You can use Shell#transact or Shell#check_point for more safe execution.")
 246              redo
 247            end
 248            Thread.exclusive do
 249              terminate_job(command)
 250              @job_condition.signal
 251              command.notify "job(%id) finish.", @shell.debug?
 252            end
 253          end
 254        }
 255        return pid, pipe_me_in, pipe_me_out
 256      end
 257    end
 258  end