summaryrefslogtreecommitdiff
path: root/lib/monitor.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/monitor.rb')
-rw-r--r--lib/monitor.rb415
1 files changed, 191 insertions, 224 deletions
diff --git a/lib/monitor.rb b/lib/monitor.rb
index 721c51a9f5..21329a5de7 100644
--- a/lib/monitor.rb
+++ b/lib/monitor.rb
@@ -1,249 +1,216 @@
-=begin
-
-= monitor.rb
+# frozen_string_literal: false
+# = monitor.rb
+#
+# Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org>
+#
+# This library is distributed under the terms of the Ruby license.
+# You can freely distribute/modify this library.
+#
+#
+# In concurrent programming, a monitor is an object or module intended to be
+# used safely by more than one thread. The defining characteristic of a
+# monitor is that its methods are executed with mutual exclusion. That is, at
+# each point in time, at most one thread may be executing any of its methods.
+# This mutual exclusion greatly simplifies reasoning about the implementation
+# of monitors compared to reasoning about parallel code that updates a data
+# structure.
+#
+# You can read more about the general principles on the Wikipedia page for
+# Monitors[https://en.wikipedia.org/wiki/Monitor_%28synchronization%29].
+#
+# == Examples
+#
+# === Simple object.extend
+#
+# require 'monitor.rb'
+#
+# buf = []
+# buf.extend(MonitorMixin)
+# empty_cond = buf.new_cond
+#
+# # consumer
+# Thread.start do
+# loop do
+# buf.synchronize do
+# empty_cond.wait_while { buf.empty? }
+# print buf.shift
+# end
+# end
+# end
+#
+# # producer
+# while line = ARGF.gets
+# buf.synchronize do
+# buf.push(line)
+# empty_cond.signal
+# end
+# end
+#
+# The consumer thread waits for the producer thread to push a line to buf
+# while <tt>buf.empty?</tt>. The producer thread (main thread) reads a
+# line from ARGF and pushes it into buf then calls <tt>empty_cond.signal</tt>
+# to notify the consumer thread of new data.
+#
+# === Simple Class include
+#
+# require 'monitor'
+#
+# class SynchronizedArray < Array
+#
+# include MonitorMixin
+#
+# def initialize(*args)
+# super(*args)
+# end
+#
+# alias :old_shift :shift
+# alias :old_unshift :unshift
+#
+# def shift(n=1)
+# self.synchronize do
+# self.old_shift(n)
+# end
+# end
+#
+# def unshift(item)
+# self.synchronize do
+# self.old_unshift(item)
+# end
+# end
+#
+# # other methods ...
+# end
+#
+# +SynchronizedArray+ implements an Array with synchronized access to items.
+# This Class is implemented as subclass of Array which includes the
+# MonitorMixin module.
+#
+module MonitorMixin
+ ConditionVariable = Monitor::ConditionVariable # :nodoc:
-Copyright (C) 2001 Shugo Maeda <shugo@ruby-lang.org>
+ #
+ # FIXME: This isn't documented in Nutshell.
+ #
+ # Since MonitorMixin.new_cond returns a ConditionVariable, and the example
+ # above calls while_wait and signal, this class should be documented.
+ #
-This library is distributed under the terms of the Ruby license.
-You can freely distribute/modify this library.
+ def self.extend_object(obj) # :nodoc:
+ super(obj)
+ obj.__send__(:mon_initialize)
+ end
-== example
+ #
+ # Attempts to enter exclusive section. Returns +false+ if lock fails.
+ #
+ def mon_try_enter
+ @mon_data.try_enter
+ end
+ # For backward compatibility
+ alias try_mon_enter mon_try_enter
-This is a simple example.
+ #
+ # Enters exclusive section.
+ #
+ def mon_enter
+ @mon_data.enter
+ end
- require 'monitor.rb'
-
- buf = []
- buf.extend(MonitorMixin)
- empty_cond = buf.new_cond
-
- # consumer
- Thread.start do
- loop do
- buf.synchronize do
- empty_cond.wait_while { buf.empty? }
- print buf.shift
- end
- end
+ #
+ # Leaves exclusive section.
+ #
+ def mon_exit
+ mon_check_owner
+ @mon_data.exit
end
-
- # producer
- while line = ARGF.gets
- buf.synchronize do
- buf.push(line)
- empty_cond.signal
- end
+
+ #
+ # Returns true if this monitor is locked by any thread
+ #
+ def mon_locked?
+ @mon_data.mon_locked?
end
-The consumer thread waits for the producer thread to push a line
-to buf while buf.empty?, and the producer thread (main thread)
-reads a line from ARGF and push it to buf, then call
-empty_cond.signal.
+ #
+ # Returns true if this monitor is locked by current thread.
+ #
+ def mon_owned?
+ @mon_data.mon_owned?
+ end
-=end
-
-module MonitorMixin
- module Accessible
- protected
- attr_accessor :mon_owner, :mon_count
- attr_reader :mon_entering_queue, :mon_waiting_queue
+ #
+ # Enters exclusive section and executes the block. Leaves the exclusive
+ # section automatically when the block exits. See example under
+ # +MonitorMixin+.
+ #
+ def mon_synchronize(&b)
+ @mon_data.synchronize(&b)
end
-
- module Initializable
- protected
- def mon_initialize
- @mon_owner = nil
- @mon_count = 0
- @mon_entering_queue = []
- @mon_waiting_queue = []
+ alias synchronize mon_synchronize
+
+ #
+ # Creates a new MonitorMixin::ConditionVariable associated with the
+ # Monitor object.
+ #
+ def new_cond
+ unless defined?(@mon_data)
+ mon_initialize
+ @mon_initialized_by_new_cond = true
end
+ return ConditionVariable.new(@mon_data)
end
-
- class ConditionVariable
- class Timeout < Exception; end
-
- include Accessible
-
- def wait(timeout = nil)
- if @monitor.mon_owner != Thread.current
- raise ThreadError, "current thread not owner"
- end
-
- if timeout
- ct = Thread.current
- timeout_thread = Thread.start {
- Thread.pass
- sleep(timeout)
- ct.raise(Timeout.new)
- }
- end
-
- Thread.critical = true
- count = @monitor.mon_count
- @monitor.mon_count = 0
- @monitor.mon_owner = nil
- if @monitor.mon_waiting_queue.empty?
- t = @monitor.mon_entering_queue.shift
- else
- t = @monitor.mon_waiting_queue.shift
- end
- t.wakeup if t
- @waiters.push(Thread.current)
- begin
- Thread.stop
- rescue Timeout
- ensure
- Thread.critical = true
- if timeout && timeout_thread.alive?
- Thread.kill(timeout_thread)
- end
- if @waiters.include?(Thread.current) # interrupted?
- @waiters.delete(Thread.current)
- end
- while @monitor.mon_owner &&
- @monitor.mon_owner != Thread.current
- @monitor.mon_waiting_queue.push(Thread.current)
- Thread.stop
- Thread.critical = true
- end
- @monitor.mon_owner = Thread.current
- @monitor.mon_count = count
- Thread.critical = false
- end
- end
-
- def wait_while
- while yield
- wait
- end
- end
-
- def wait_until
- until yield
- wait
- end
- end
-
- def signal
- if @monitor.mon_owner != Thread.current
- raise ThreadError, "current thread not owner"
- end
- Thread.critical = true
- t = @waiters.shift
- t.wakeup if t
- Thread.critical = false
- Thread.pass
- end
-
- def broadcast
- if @monitor.mon_owner != Thread.current
- raise ThreadError, "current thread not owner"
- end
- Thread.critical = true
- for t in @waiters
- t.wakeup
- end
- @waiters.clear
- Thread.critical = false
- Thread.pass
- end
-
- def count_waiters
- return @waiters.length
- end
-
private
- def initialize(monitor)
- @monitor = monitor
- @waiters = []
- end
- end
-
- include Accessible
- include Initializable
- extend Initializable
-
- def self.extend_object(obj)
- super(obj)
- obj.mon_initialize
- end
-
- def try_mon_enter
- result = false
- Thread.critical = true
- if mon_owner.nil?
- self.mon_owner = Thread.current
- end
- if mon_owner == Thread.current
- self.mon_count += 1
- result = true
- end
- Thread.critical = false
- return result
- end
- def mon_enter
- Thread.critical = true
- while mon_owner != nil && mon_owner != Thread.current
- mon_entering_queue.push(Thread.current)
- Thread.stop
- Thread.critical = true
- end
- self.mon_owner = Thread.current
- self.mon_count += 1
- Thread.critical = false
+ # Use <tt>extend MonitorMixin</tt> or <tt>include MonitorMixin</tt> instead
+ # of this constructor. Have look at the examples above to understand how to
+ # use this module.
+ def initialize(...)
+ super
+ mon_initialize
end
-
- def mon_exit
- if mon_owner != Thread.current
- raise ThreadError, "current thread not owner"
- end
- Thread.critical = true
- self.mon_count -= 1
- if mon_count == 0
- self.mon_owner = nil
- if mon_waiting_queue.empty?
- t = mon_entering_queue.shift
- else
- t = mon_waiting_queue.shift
+
+ # Initializes the MonitorMixin after being included in a class or when an
+ # object has been extended with the MonitorMixin
+ def mon_initialize
+ if defined?(@mon_data)
+ if defined?(@mon_initialized_by_new_cond)
+ return # already initialized.
+ elsif @mon_data_owner_object_id == self.object_id
+ raise ThreadError, "already initialized"
end
end
- t.wakeup if t
- Thread.critical = false
- Thread.pass
+ @mon_data = ::Monitor.new
+ @mon_data_owner_object_id = self.object_id
end
- def mon_synchronize
- mon_enter
- begin
- yield
- ensure
- mon_exit
- end
- end
- alias synchronize mon_synchronize
-
- def new_cond
- return ConditionVariable.new(self)
- end
-
-private
- def initialize(*args)
- super
- mon_initialize
+ # Ensures that the MonitorMixin is owned by the current thread,
+ # otherwise raises an exception.
+ def mon_check_owner
+ @mon_data.mon_check_owner
end
end
-class Monitor
- include MonitorMixin
- alias try_enter try_mon_enter
- alias enter mon_enter
- alias exit mon_exit
- alias owner mon_owner
+class Monitor # :nodoc:
+ alias try_mon_enter try_enter
+ alias mon_try_enter try_enter
+ alias mon_enter enter
+ alias mon_exit exit
+ alias mon_synchronize synchronize
end
-# Local variables:
-# mode: Ruby
-# tab-width: 8
-# End:
+# Documentation comments:
+# - All documentation comes from Nutshell.
+# - MonitorMixin.new_cond appears in the example, but is not documented in
+# Nutshell.
+# - All the internals (internal modules Accessible and Initializable, class
+# ConditionVariable) appear in RDoc. It might be good to hide them, by
+# making them private, or marking them :nodoc:, etc.
+# - RDoc doesn't recognise aliases, so we have mon_synchronize documented, but
+# not synchronize.
+# - mon_owner is in Nutshell, but appears as an accessor in a separate module
+# here, so is hard/impossible to RDoc. Some other useful accessors
+# (mon_count and some queue stuff) are also in this module, and don't appear
+# directly in the RDoc output.
+# - in short, it may be worth changing the code layout in this file to make the
+# documentation easier