diff options
Diffstat (limited to 'lib/sync.rb')
| -rw-r--r-- | lib/sync.rb | 346 |
1 files changed, 182 insertions, 164 deletions
diff --git a/lib/sync.rb b/lib/sync.rb index 03b161c274..40a48ac985 100644 --- a/lib/sync.rb +++ b/lib/sync.rb @@ -1,9 +1,9 @@ +# frozen_string_literal: false # # sync.rb - 2 phase lock with counter -# $Release Version: 1.0$ -# $Revision: 1.4 $ -# $Date: 2001/06/06 14:19:33 $ -# by Keiju ISHITSUKA(keiju@ishitsuka.com) +# $Release Version: 1.0$ +# $Revision$ +# by Keiju ISHITSUKA(keiju@ishitsuka.com) # # -- # Sync_m, Synchronizer_m @@ -11,8 +11,8 @@ # obj.extend(Sync_m) # or # class Foo -# include Sync_m -# : +# include Sync_m +# : # end # # Sync_m#sync_mode @@ -23,9 +23,8 @@ # Sync_m#sync_lock, lock # Sync_m#sync_unlock, unlock # -# Sync, Synchronicer: -# include Sync_m -# Usage: +# Sync, Synchronizer: +# Usage: # sync = Sync.new # # Sync#mode @@ -36,45 +35,46 @@ # Sync#lock(mode) -- mode = :EX, :SH, :UN # Sync#unlock # Sync#synchronize(mode) {...} -# +# # unless defined? Thread - fail "Thread not available for this ruby interpreter" + raise "Thread not available for this ruby interpreter" end +## +# A module that provides a two-phase lock with a counter. + module Sync_m - RCS_ID='-$Header: /src/ruby/lib/sync.rb,v 1.4 2001/06/06 14:19:33 keiju Exp $-' - # lock mode UN = :UN SH = :SH EX = :EX - + # exceptions class Err < StandardError def Err.Fail(*opt) fail self, sprintf(self::Message, *opt) end - + class UnknownLocker < Err Message = "Thread(%s) not locked." def UnknownLocker.Fail(th) - super(th.inspect) + super(th.inspect) end end - + class LockModeFailer < Err Message = "Unknown lock mode(%s)" def LockModeFailer.Fail(mode) - if mode.id2name - mode = id2name - end - super(mode) + if mode.id2name + mode = mode.id2name + end + super(mode) end end end - + def Sync_m.define_aliases(cl) cl.module_eval %q{ alias locked? sync_locked? @@ -86,30 +86,29 @@ module Sync_m alias synchronize sync_synchronize } end - + def Sync_m.append_features(cl) super - unless cl.instance_of?(Module) - # do nothing for Modules - # make aliases and include the proper module. - define_aliases(cl) - end + # do nothing for Modules + # make aliases for Classes. + define_aliases(cl) unless cl.instance_of?(Module) + self end - + def Sync_m.extend_object(obj) super - obj.sync_extended + obj.sync_extend end - def sync_extended + def sync_extend unless (defined? locked? and - defined? shared? and - defined? exclusive? and - defined? lock and - defined? unlock and - defined? try_lock and - defined? synchronize) - Sync_m.define_aliases(class<<self;self;end) + defined? shared? and + defined? exclusive? and + defined? lock and + defined? unlock and + defined? try_lock and + defined? synchronize) + Sync_m.define_aliases(singleton_class) end sync_initialize end @@ -118,128 +117,140 @@ module Sync_m def sync_locked? sync_mode != UN end - + def sync_shared? sync_mode == SH end - + def sync_exclusive? sync_mode == EX end - + # locking methods. def sync_try_lock(mode = EX) - return unlock if sync_mode == UN - - Thread.critical = true - ret = sync_try_lock_sub(sync_mode) - Thread.critical = false - ret + return unlock if mode == UN + @sync_mutex.synchronize do + sync_try_lock_sub(mode) + end end - + def sync_lock(m = EX) return unlock if m == UN - - until (Thread.critical = true; sync_try_lock_sub(m)) - if sync_sh_locker[Thread.current] - sync_upgrade_waiting.push [Thread.current, sync_sh_locker[Thread.current]] - sync_sh_locker.delete(Thread.current) - else - sync_waiting.push Thread.current + Thread.handle_interrupt(StandardError => :on_blocking) do + while true + @sync_mutex.synchronize do + begin + if sync_try_lock_sub(m) + return self + else + if sync_sh_locker[Thread.current] + sync_upgrade_waiting.push [Thread.current, sync_sh_locker[Thread.current]] + sync_sh_locker.delete(Thread.current) + else + unless sync_waiting.include?(Thread.current) || sync_upgrade_waiting.reverse_each.any?{|w| w.first == Thread.current } + sync_waiting.push Thread.current + end + end + @sync_mutex.sleep + end + ensure + sync_waiting.delete(Thread.current) + end + end end - Thread.stop end - Thread.critical = false self end - + def sync_unlock(m = EX) - Thread.critical = true - if sync_mode == UN - Thread.critical = false - Err::UnknownLocker.Fail(Thread.current) - end - - m = sync_mode if m == EX and sync_mode == SH - - runnable = false - case m - when UN - Thread.critical = false - Err::UnknownLocker.Fail(Thread.current) - - when EX - if sync_ex_locker == Thread.current - if (self.sync_ex_count = sync_ex_count - 1) == 0 - self.sync_ex_locker = nil - if sync_sh_locker.include?(Thread.current) - self.sync_mode = SH - else - self.sync_mode = UN - end - runnable = true - end - else - Err::UnknownLocker.Fail(Thread.current) + wakeup_threads = [] + @sync_mutex.synchronize do + if sync_mode == UN + Err::UnknownLocker.Fail(Thread.current) end - - when SH - if (count = sync_sh_locker[Thread.current]).nil? - Err::UnknownLocker.Fail(Thread.current) - else - if (sync_sh_locker[Thread.current] = count - 1) == 0 - sync_sh_locker.delete(Thread.current) - if sync_sh_locker.empty? and sync_ex_count == 0 - self.sync_mode = UN - runnable = true - end - end + + m = sync_mode if m == EX and sync_mode == SH + + runnable = false + case m + when UN + Err::UnknownLocker.Fail(Thread.current) + + when EX + if sync_ex_locker == Thread.current + if (self.sync_ex_count = sync_ex_count - 1) == 0 + self.sync_ex_locker = nil + if sync_sh_locker.include?(Thread.current) + self.sync_mode = SH + else + self.sync_mode = UN + end + runnable = true + end + else + Err::UnknownLocker.Fail(Thread.current) + end + + when SH + if (count = sync_sh_locker[Thread.current]).nil? + Err::UnknownLocker.Fail(Thread.current) + else + if (sync_sh_locker[Thread.current] = count - 1) == 0 + sync_sh_locker.delete(Thread.current) + if sync_sh_locker.empty? and sync_ex_count == 0 + self.sync_mode = UN + runnable = true + end + end + end end - end - - if runnable - if sync_upgrade_waiting.size > 0 - for k, v in sync_upgrade_waiting - sync_sh_locker[k] = v - end - wait = sync_upgrade_waiting - self.sync_upgrade_waiting = [] - Thread.critical = false - - for w, v in wait - w.run - end - else - wait = sync_waiting - self.sync_waiting = [] - Thread.critical = false - for w in wait - w.run - end + + if runnable + if sync_upgrade_waiting.size > 0 + th, count = sync_upgrade_waiting.shift + sync_sh_locker[th] = count + th.wakeup + wakeup_threads.push th + else + wait = sync_waiting + self.sync_waiting = [] + for th in wait + th.wakeup + wakeup_threads.push th + end + end end end - - Thread.critical = false + for th in wakeup_threads + th.run + end self end - + def sync_synchronize(mode = EX) - begin + Thread.handle_interrupt(StandardError => :on_blocking) do sync_lock(mode) - yield - ensure - sync_unlock + begin + yield + ensure + sync_unlock + end end end - attr :sync_mode, true - - attr :sync_waiting, true - attr :sync_upgrade_waiting, true - attr :sync_sh_locker, true - attr :sync_ex_locker, true - attr :sync_ex_count, true - + attr_accessor :sync_mode + + attr_accessor :sync_waiting + attr_accessor :sync_upgrade_waiting + attr_accessor :sync_sh_locker + attr_accessor :sync_ex_locker + attr_accessor :sync_ex_count + + def sync_inspect + sync_iv = instance_variables.select{|iv| /^@sync_/ =~ iv.id2name}.collect{|iv| iv.id2name + '=' + instance_eval(iv.id2name).inspect}.join(",") + print "<#{self.class}.extend Sync_m: #{inspect}, <Sync_m: #{sync_iv}>" + end + private def sync_initialize @@ -249,63 +260,70 @@ module Sync_m @sync_sh_locker = Hash.new @sync_ex_locker = nil @sync_ex_count = 0 + + @sync_mutex = Thread::Mutex.new end def initialize(*args) - sync_initialize super + sync_initialize end - + def sync_try_lock_sub(m) case m when SH case sync_mode when UN - self.sync_mode = m - sync_sh_locker[Thread.current] = 1 - ret = true + self.sync_mode = m + sync_sh_locker[Thread.current] = 1 + ret = true when SH - count = 0 unless count = sync_sh_locker[Thread.current] - sync_sh_locker[Thread.current] = count + 1 - ret = true + count = 0 unless count = sync_sh_locker[Thread.current] + sync_sh_locker[Thread.current] = count + 1 + ret = true when EX - # in EX mode, lock will upgrade to EX lock - if sync_ex_locker == Thread.current - self.sync_ex_count = sync_ex_count + 1 - ret = true - else - ret = false - end + # in EX mode, lock will upgrade to EX lock + if sync_ex_locker == Thread.current + self.sync_ex_count = sync_ex_count + 1 + ret = true + else + ret = false + end end when EX if sync_mode == UN or - sync_mode == SH && sync_sh_locker.size == 1 && sync_sh_locker.include?(Thread.current) - self.sync_mode = m - self.sync_ex_locker = Thread.current - self.sync_ex_count = 1 - ret = true + sync_mode == SH && sync_sh_locker.size == 1 && sync_sh_locker.include?(Thread.current) + self.sync_mode = m + self.sync_ex_locker = Thread.current + self.sync_ex_count = 1 + ret = true elsif sync_mode == EX && sync_ex_locker == Thread.current - self.sync_ex_count = sync_ex_count + 1 - ret = true + self.sync_ex_count = sync_ex_count + 1 + ret = true else - ret = false + ret = false end else - Thread.critical = false - Err::LockModeFailer.Fail mode + Err::LockModeFailer.Fail m end return ret end end + +## +# An alias for Sync_m from sync.rb + Synchronizer_m = Sync_m +## +# A class that provides two-phase lock with a counter. See Sync_m for +# details. + class Sync - #Sync_m.extend_class self include Sync_m - - def initialize - super - end - end + +## +# An alias for Sync from sync.rb. See Sync_m for details. + Synchronizer = Sync |
