diff options
Diffstat (limited to 'lib/monitor.rb')
| -rw-r--r-- | lib/monitor.rb | 487 |
1 files changed, 189 insertions, 298 deletions
diff --git a/lib/monitor.rb b/lib/monitor.rb index 81fe8f2b22..21329a5de7 100644 --- a/lib/monitor.rb +++ b/lib/monitor.rb @@ -1,325 +1,216 @@ -## monitor.rb - -# Author: Shugo Maeda <shugo@po.aianet.ne.jp> -# Version: $Revision: 0.1 $ - -# USAGE: +# frozen_string_literal: false +# = monitor.rb # -# foo = Foo.new -# foo.extend(MonitorMixin) -# cond = foo.new_cond +# Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org> # -# thread1: -# foo.synchronize { -# ... -# cond.wait_until { foo.done? } -# ... -# } +# This library is distributed under the terms of the Ruby license. +# You can freely distribute/modify this library. # -# thread2: -# foo.synchronize { -# foo.do_something -# cond.signal -# } - -# ATTENTION: # -# If you include MonitorMixin and override `initialize', you should -# call `super'. -# If you include MonitorMixin to built-in classes, you should override -# `new' to call `mon_initialize'. +# In concurrent programming, a monitor is an object or module intended to be +# used safely by more than one thread. The defining characteristic of a +# monitor is that its methods are executed with mutual exclusion. That is, at +# each point in time, at most one thread may be executing any of its methods. +# This mutual exclusion greatly simplifies reasoning about the implementation +# of monitors compared to reasoning about parallel code that updates a data +# structure. +# +# You can read more about the general principles on the Wikipedia page for +# Monitors[https://en.wikipedia.org/wiki/Monitor_%28synchronization%29]. +# +# == Examples +# +# === Simple object.extend +# +# require 'monitor.rb' +# +# buf = [] +# buf.extend(MonitorMixin) +# empty_cond = buf.new_cond +# +# # consumer +# Thread.start do +# loop do +# buf.synchronize do +# empty_cond.wait_while { buf.empty? } +# print buf.shift +# end +# end +# end +# +# # producer +# while line = ARGF.gets +# buf.synchronize do +# buf.push(line) +# empty_cond.signal +# end +# end +# +# The consumer thread waits for the producer thread to push a line to buf +# while <tt>buf.empty?</tt>. The producer thread (main thread) reads a +# line from ARGF and pushes it into buf then calls <tt>empty_cond.signal</tt> +# to notify the consumer thread of new data. +# +# === Simple Class include +# +# require 'monitor' +# +# class SynchronizedArray < Array +# +# include MonitorMixin +# +# def initialize(*args) +# super(*args) +# end +# +# alias :old_shift :shift +# alias :old_unshift :unshift +# +# def shift(n=1) +# self.synchronize do +# self.old_shift(n) +# end +# end +# +# def unshift(item) +# self.synchronize do +# self.old_unshift(item) +# end +# end +# +# # other methods ... +# end +# +# +SynchronizedArray+ implements an Array with synchronized access to items. +# This Class is implemented as subclass of Array which includes the +# MonitorMixin module. +# +module MonitorMixin + ConditionVariable = Monitor::ConditionVariable # :nodoc: -## Code: - -require "final" + # + # FIXME: This isn't documented in Nutshell. + # + # Since MonitorMixin.new_cond returns a ConditionVariable, and the example + # above calls while_wait and signal, this class should be documented. + # -module MonitorMixin - - RCS_ID = %q$Id: monitor.rb,v 0.1 1998/03/01 08:40:18 shugo Exp shugo $ - - module Primitive - - include MonitorMixin - - MON_OWNER_TABLE = {} - MON_COUNT_TABLE = {} - MON_ENTERING_QUEUE_TABLE = {} - MON_WAITING_QUEUE_TABLE = {} - - FINALIZER = Proc.new { |id| - MON_OWNER_TABLE.delete(id) - MON_COUNT_TABLE.delete(id) - MON_ENTERING_QUEUE_TABLE.delete(id) - MON_WAITING_QUEUE_TABLE.delete(id) - } - - def self.extend_object(obj) - super(obj) - obj.mon_initialize - end - - def mon_initialize - MON_OWNER_TABLE[id] = nil - MON_COUNT_TABLE[id] = 0 - MON_ENTERING_QUEUE_TABLE[id] = [] - MON_WAITING_QUEUE_TABLE[id] = [] - ObjectSpace.define_finalizer(self, FINALIZER) - end - - def mon_owner - return MON_OWNER_TABLE[id] - end - - def mon_count - return MON_COUNT_TABLE[id] - end - - def mon_entering_queue - return MON_ENTERING_QUEUE_TABLE[id] - end - - def mon_waiting_queue - return MON_WAITING_QUEUE_TABLE[id] - end - - def set_mon_owner(val) - return MON_OWNER_TABLE[id] = val - end - - def set_mon_count(val) - return MON_COUNT_TABLE[id] = val - end - - private :mon_count, :mon_entering_queue, :mon_waiting_queue, - :set_mon_owner, :set_mon_count - end - - module NonPrimitive - - include MonitorMixin - - attr_reader :mon_owner, :mon_count, - :mon_entering_queue, :mon_waiting_queue - - def self.extend_object(obj) - super(obj) - obj.mon_initialize - end - - def mon_initialize - @mon_owner = nil - @mon_count = 0 - @mon_entering_queue = [] - @mon_waiting_queue = [] - end - - def set_mon_owner(val) - @mon_owner = val - end - - def set_mon_count(val) - @mon_count = val - end - - private :mon_count, :mon_entering_queue, :mon_waiting_queue, - :set_mon_owner, :set_mon_count + def self.extend_object(obj) # :nodoc: + super(obj) + obj.__send__(:mon_initialize) end - - def self.extendable_module(obj) - if Fixnum === obj or TrueClass === obj or FalseClass === obj or - NilClass === obj - raise TypeError, "MonitorMixin can't extend #{obj.type}" - else - begin - obj.instance_eval("@mon_owner") - return NonPrimitive - rescue TypeError - return Primitive - end - end - end - - def self.extend_object(obj) - obj.extend(extendable_module(obj)) + + # + # Attempts to enter exclusive section. Returns +false+ if lock fails. + # + def mon_try_enter + @mon_data.try_enter end - - def self.includable_module(klass) - if klass.instance_of?(Module) - return NonPrimitive - end - begin - dummy = klass.new - return extendable_module(dummy) - rescue ArgumentError - if klass.singleton_methods.include?("new") - return Primitive - else - return NonPrimitive - end - rescue NameError - raise TypeError, "#{klass} can't include MonitorMixin" - end + # For backward compatibility + alias try_mon_enter mon_try_enter + + # + # Enters exclusive section. + # + def mon_enter + @mon_data.enter end - - def self.append_features(klass) - mod = includable_module(klass) - klass.module_eval("include mod") + + # + # Leaves exclusive section. + # + def mon_exit + mon_check_owner + @mon_data.exit end - - def initialize(*args) - super - mon_initialize + + # + # Returns true if this monitor is locked by any thread + # + def mon_locked? + @mon_data.mon_locked? end - - def try_mon_enter - result = false - Thread.critical = true - if mon_owner.nil? - set_mon_owner(Thread.current) - end - if mon_owner == Thread.current - set_mon_count(mon_count + 1) - result = true - end - Thread.critical = false - return result + + # + # Returns true if this monitor is locked by current thread. + # + def mon_owned? + @mon_data.mon_owned? end - def mon_enter - Thread.critical = true - while mon_owner != nil && mon_owner != Thread.current - mon_entering_queue.push(Thread.current) - Thread.stop - Thread.critical = true - end - set_mon_owner(Thread.current) - set_mon_count(mon_count + 1) - Thread.critical = false + # + # Enters exclusive section and executes the block. Leaves the exclusive + # section automatically when the block exits. See example under + # +MonitorMixin+. + # + def mon_synchronize(&b) + @mon_data.synchronize(&b) end - - def mon_exit - if mon_owner != Thread.current - raise ThreadError, "current thread not owner" - end - Thread.critical = true - set_mon_count(mon_count - 1) - if mon_count == 0 - set_mon_owner(nil) - if mon_waiting_queue.empty? - t = mon_entering_queue.shift - else - t = mon_waiting_queue.shift - end + alias synchronize mon_synchronize + + # + # Creates a new MonitorMixin::ConditionVariable associated with the + # Monitor object. + # + def new_cond + unless defined?(@mon_data) + mon_initialize + @mon_initialized_by_new_cond = true end - t.wakeup if t - Thread.critical = false - Thread.pass + return ConditionVariable.new(@mon_data) end - def mon_synchronize - mon_enter - begin - yield - ensure - mon_exit - end + private + + # Use <tt>extend MonitorMixin</tt> or <tt>include MonitorMixin</tt> instead + # of this constructor. Have look at the examples above to understand how to + # use this module. + def initialize(...) + super + mon_initialize end - alias synchronize mon_synchronize - class ConditionVariable - def initialize(monitor) - @monitor = monitor - @waiters = [] - end - - def wait - if @monitor.mon_owner != Thread.current - raise ThreadError, "current thread not owner" - end - - @monitor.instance_eval(<<MON_EXIT) - Thread.critical = true - _count = mon_count - set_mon_count(0) - set_mon_owner(nil) - if mon_waiting_queue.empty? - t = mon_entering_queue.shift - else - t = mon_waiting_queue.shift - end - t.wakeup if t - Thread.critical = false -MON_EXIT - - Thread.critical = true - @waiters.push(Thread.current) - Thread.stop - - @monitor.instance_eval(<<MON_ENTER) - Thread.critical = true - while mon_owner != nil && mon_owner != Thread.current - mon_waiting_queue.push(Thread.current) - Thread.stop - Thread.critical = true - end - set_mon_owner(Thread.current) - set_mon_count(_count) - Thread.critical = false -MON_ENTER - end - - def wait_while - while yield - wait + # Initializes the MonitorMixin after being included in a class or when an + # object has been extended with the MonitorMixin + def mon_initialize + if defined?(@mon_data) + if defined?(@mon_initialized_by_new_cond) + return # already initialized. + elsif @mon_data_owner_object_id == self.object_id + raise ThreadError, "already initialized" end end - - def wait_until - until yield - wait - end - end - - def signal - if @monitor.mon_owner != Thread.current - raise ThreadError, "current thread not owner" - end - Thread.critical = true - t = @waiters.shift - t.wakeup if t - Thread.critical = false - Thread.pass - end - - def broadcast - if @monitor.mon_owner != Thread.current - raise ThreadError, "current thread not owner" - end - Thread.critical = true - for t in @waiters - t.wakeup - end - @waiters.clear - Thread.critical = false - Thread.pass - end - - def count_waiters - return @waiters.length - end + @mon_data = ::Monitor.new + @mon_data_owner_object_id = self.object_id end - - def new_cond - return ConditionVariable.new(self) + + # Ensures that the MonitorMixin is owned by the current thread, + # otherwise raises an exception. + def mon_check_owner + @mon_data.mon_check_owner end end -class Monitor - include MonitorMixin - alias try_enter try_mon_enter - alias enter mon_enter - alias exit mon_exit - alias owner mon_owner +class Monitor # :nodoc: + alias try_mon_enter try_enter + alias mon_try_enter try_enter + alias mon_enter enter + alias mon_exit exit + alias mon_synchronize synchronize end -## monitor.rb ends here +# Documentation comments: +# - All documentation comes from Nutshell. +# - MonitorMixin.new_cond appears in the example, but is not documented in +# Nutshell. +# - All the internals (internal modules Accessible and Initializable, class +# ConditionVariable) appear in RDoc. It might be good to hide them, by +# making them private, or marking them :nodoc:, etc. +# - RDoc doesn't recognise aliases, so we have mon_synchronize documented, but +# not synchronize. +# - mon_owner is in Nutshell, but appears as an accessor in a separate module +# here, so is hard/impossible to RDoc. Some other useful accessors +# (mon_count and some queue stuff) are also in this module, and don't appear +# directly in the RDoc output. +# - in short, it may be worth changing the code layout in this file to make the +# documentation easier |
