diff options
Diffstat (limited to 'lib/monitor.rb')
| -rw-r--r-- | lib/monitor.rb | 398 |
1 files changed, 185 insertions, 213 deletions
diff --git a/lib/monitor.rb b/lib/monitor.rb index 4c146c8086..e99cbd4baa 100644 --- a/lib/monitor.rb +++ b/lib/monitor.rb @@ -1,82 +1,95 @@ -=begin - -= monitor.rb - -Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org> - -This library is distributed under the terms of the Ruby license. -You can freely distribute/modify this library. - -== example - -This is a simple example. - - require 'monitor.rb' - - buf = [] - buf.extend(MonitorMixin) - empty_cond = buf.new_cond - - # consumer - Thread.start do - loop do - buf.synchronize do - empty_cond.wait_while { buf.empty? } - print buf.shift - end - end - end - - # producer - while line = ARGF.gets - buf.synchronize do - buf.push(line) - empty_cond.signal - end - end - -The consumer thread waits for the producer thread to push a line -to buf while buf.empty?, and the producer thread (main thread) -reads a line from ARGF and push it to buf, then call -empty_cond.signal. - -=end - +# frozen_string_literal: false +# = monitor.rb +# +# Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org> +# +# This library is distributed under the terms of the Ruby license. +# You can freely distribute/modify this library. +# # -# Adds monitor functionality to an arbitrary object by mixing the module with -# +include+. For example: +# In concurrent programming, a monitor is an object or module intended to be +# used safely by more than one thread. The defining characteristic of a +# monitor is that its methods are executed with mutual exclusion. That is, at +# each point in time, at most one thread may be executing any of its methods. +# This mutual exclusion greatly simplifies reasoning about the implementation +# of monitors compared to reasoning about parallel code that updates a data +# structure. +# +# You can read more about the general principles on the Wikipedia page for +# Monitors[http://en.wikipedia.org/wiki/Monitor_%28synchronization%29] +# +# == Examples +# +# === Simple object.extend +# +# require 'monitor.rb' # -# require 'monitor.rb' -# -# buf = [] -# buf.extend(MonitorMixin) -# empty_cond = buf.new_cond -# -# # consumer -# Thread.start do -# loop do -# buf.synchronize do -# empty_cond.wait_while { buf.empty? } -# print buf.shift -# end -# end -# end -# -# # producer -# while line = ARGF.gets -# buf.synchronize do -# buf.push(line) -# empty_cond.signal -# end -# end -# -# The consumer thread waits for the producer thread to push a line -# to buf while buf.empty?, and the producer thread (main thread) -# reads a line from ARGF and push it to buf, then call -# empty_cond.signal. +# buf = [] +# buf.extend(MonitorMixin) +# empty_cond = buf.new_cond +# +# # consumer +# Thread.start do +# loop do +# buf.synchronize do +# empty_cond.wait_while { buf.empty? } +# print buf.shift +# end +# end +# end +# +# # producer +# while line = ARGF.gets +# buf.synchronize do +# buf.push(line) +# empty_cond.signal +# end +# end +# +# The consumer thread waits for the producer thread to push a line to buf +# while <tt>buf.empty?</tt>. The producer thread (main thread) reads a +# line from ARGF and pushes it into buf then calls <tt>empty_cond.signal</tt> +# to notify the consumer thread of new data. +# +# === Simple Class include +# +# require 'monitor' +# +# class SynchronizedArray < Array +# +# include MonitorMixin +# +# def initialize(*args) +# super(*args) +# end +# +# alias :old_shift :shift +# alias :old_unshift :unshift +# +# def shift(n=1) +# self.synchronize do +# self.old_shift(n) +# end +# end +# +# def unshift(item) +# self.synchronize do +# self.old_unshift(item) +# end +# end +# +# # other methods ... +# end +# +# +SynchronizedArray+ implements an Array with synchronized access to items. +# This Class is implemented as subclass of Array which includes the +# MonitorMixin module. # module MonitorMixin + EXCEPTION_NEVER = {Exception => :never}.freeze + EXCEPTION_IMMEDIATE = {Exception => :immediate}.freeze + # # FIXME: This isn't documented in Nutshell. # @@ -85,123 +98,88 @@ module MonitorMixin # class ConditionVariable class Timeout < Exception; end - - # Create a new timer with the argument timeout, and add the - # current thread to the list of waiters. Then the thread is - # stopped. It will be resumed when a corresponding #signal - # occurs. - def wait(timeout = nil) - @monitor.instance_eval {mon_check_owner()} - timer = create_timer(timeout) - - Thread.critical = true - count = @monitor.instance_eval {mon_exit_for_cond()} - @waiters.push(Thread.current) - begin - Thread.stop - return true - rescue Timeout - return false - ensure - Thread.critical = true - begin - if timer && timer.alive? - Thread.kill(timer) - end - if @waiters.include?(Thread.current) # interrupted? - @waiters.delete(Thread.current) - end - @monitor.instance_eval {mon_enter_for_cond(count)} - ensure - Thread.critical = false - end + # + # Releases the lock held in the associated monitor and waits; reacquires the lock on wakeup. + # + # If +timeout+ is given, this method returns after +timeout+ seconds passed, + # even if no other thread doesn't signal. + # + def wait(timeout = nil) + Thread.handle_interrupt(EXCEPTION_NEVER) do + @monitor.__send__(:mon_check_owner) + count = @monitor.__send__(:mon_exit_for_cond) + begin + Thread.handle_interrupt(EXCEPTION_IMMEDIATE) do + @cond.wait(@monitor.instance_variable_get(:@mon_mutex), timeout) + end + return true + ensure + @monitor.__send__(:mon_enter_for_cond, count) + end end end - - # call #wait while the supplied block returns +true+. + # + # Calls wait repeatedly while the given block yields a truthy value. + # def wait_while while yield - wait + wait end end - - # call #wait until the supplied block returns +true+. + + # + # Calls wait repeatedly until the given block yields a truthy value. + # def wait_until until yield - wait + wait end end - - # Wake up and run the next waiter + + # + # Wakes up the first thread in line waiting for this lock. + # def signal - @monitor.instance_eval {mon_check_owner()} - Thread.critical = true - t = @waiters.shift - t.wakeup if t - Thread.critical = false - Thread.pass + @monitor.__send__(:mon_check_owner) + @cond.signal end - - # Wake up all the waiters. + + # + # Wakes up all threads waiting for this lock. + # def broadcast - @monitor.instance_eval {mon_check_owner()} - Thread.critical = true - for t in @waiters - t.wakeup - end - @waiters.clear - Thread.critical = false - Thread.pass - end - - def count_waiters - return @waiters.length + @monitor.__send__(:mon_check_owner) + @cond.broadcast end - + private def initialize(monitor) @monitor = monitor - @waiters = [] - end - - def create_timer(timeout) - if timeout - waiter = Thread.current - return Thread.start { - Thread.pass - sleep(timeout) - Thread.critical = true - waiter.raise(Timeout.new) - } - else - return nil - end + @cond = Thread::ConditionVariable.new end end - + def self.extend_object(obj) super(obj) - obj.instance_eval {mon_initialize()} + obj.__send__(:mon_initialize) end - + # # Attempts to enter exclusive section. Returns +false+ if lock fails. # def mon_try_enter - result = false - Thread.critical = true - if @mon_owner.nil? + if @mon_owner != Thread.current + unless @mon_mutex.try_lock + return false + end @mon_owner = Thread.current + @mon_count = 0 end - if @mon_owner == Thread.current - @mon_count += 1 - result = true - end - Thread.critical = false - return result + @mon_count += 1 + return true end # For backward compatibility alias try_mon_enter mon_try_enter @@ -210,25 +188,38 @@ module MonitorMixin # Enters exclusive section. # def mon_enter - Thread.critical = true - mon_acquire(@mon_entering_queue) + if @mon_owner != Thread.current + @mon_mutex.lock + @mon_owner = Thread.current + @mon_count = 0 + end @mon_count += 1 - ensure - Thread.critical = false end - + # # Leaves exclusive section. # def mon_exit mon_check_owner - Thread.critical = true - @mon_count -= 1 + @mon_count -=1 if @mon_count == 0 - mon_release + @mon_owner = nil + @mon_mutex.unlock end - Thread.critical = false - Thread.pass + end + + # + # Returns true if this monitor is locked by any thread + # + def mon_locked? + @mon_mutex.locked? + end + + # + # Returns true if this monitor is locked by current thread. + # + def mon_owned? + @mon_mutex.locked? && @mon_owner == Thread.current end # @@ -237,20 +228,20 @@ module MonitorMixin # +MonitorMixin+. # def mon_synchronize - mon_enter + # Prevent interrupt on handling interrupts; for example timeout errors + # it may break locking state. + Thread.handle_interrupt(Exception => :never){ mon_enter } begin yield ensure - mon_exit + Thread.handle_interrupt(EXCEPTION_NEVER){ mon_exit } end end alias synchronize mon_synchronize - + # - # FIXME: This isn't documented in Nutshell. - # - # Create a new condition variable for this monitor. - # This facilitates control of the monitor with #signal and #wait. + # Creates a new MonitorMixin::ConditionVariable associated with the + # receiver. # def new_cond return ConditionVariable.new(self) @@ -258,68 +249,51 @@ module MonitorMixin private + # Use <tt>extend MonitorMixin</tt> or <tt>include MonitorMixin</tt> instead + # of this constructor. Have look at the examples above to understand how to + # use this module. def initialize(*args) super mon_initialize end - # called by initialize method to set defaults for instance variables. + # Initializes the MonitorMixin after being included in a class or when an + # object has been extended with the MonitorMixin def mon_initialize @mon_owner = nil @mon_count = 0 - @mon_entering_queue = [] - @mon_waiting_queue = [] + @mon_mutex = Thread::Mutex.new end - # Throw a ThreadError exception if the current thread - # does't own the monitor def mon_check_owner if @mon_owner != Thread.current raise ThreadError, "current thread not owner" end end - def mon_acquire(queue) - while @mon_owner && @mon_owner != Thread.current - queue.push(Thread.current) - Thread.stop - Thread.critical = true - end - @mon_owner = Thread.current - end - - def mon_release - @mon_owner = nil - t = @mon_waiting_queue.shift - t = @mon_entering_queue.shift unless t - t.wakeup if t - end - def mon_enter_for_cond(count) - mon_acquire(@mon_waiting_queue) + @mon_owner = Thread.current @mon_count = count end def mon_exit_for_cond count = @mon_count + @mon_owner = nil @mon_count = 0 return count - ensure - mon_release end end -# Monitors provide means of mutual exclusion for Thread programming. -# A critical region is created by means of the synchronize method, -# which takes a block. -# The condition variables (created with #new_cond) may be used -# to control the execution of a monitor with #signal and #wait. +# Use the Monitor class when you want to have a lock object for blocks with +# mutual exclusion. +# +# require 'monitor' +# +# lock = Monitor.new +# lock.synchronize do +# # exclusive access +# end # -# the Monitor class wraps MonitorMixin, and provides aliases -# alias try_enter try_mon_enter -# alias enter mon_enter -# alias exit mon_exit -# to access its methods more concisely. class Monitor include MonitorMixin alias try_enter try_mon_enter @@ -335,8 +309,6 @@ end # - All the internals (internal modules Accessible and Initializable, class # ConditionVariable) appear in RDoc. It might be good to hide them, by # making them private, or marking them :nodoc:, etc. -# - The entire example from the RD section at the top is replicated in the RDoc -# comment for MonitorMixin. Does the RD section need to remain? # - RDoc doesn't recognise aliases, so we have mon_synchronize documented, but # not synchronize. # - mon_owner is in Nutshell, but appears as an accessor in a separate module |
