summaryrefslogtreecommitdiff
path: root/lib/sync.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sync.rb')
-rw-r--r--lib/sync.rb346
1 files changed, 182 insertions, 164 deletions
diff --git a/lib/sync.rb b/lib/sync.rb
index 03b161c274..40a48ac985 100644
--- a/lib/sync.rb
+++ b/lib/sync.rb
@@ -1,9 +1,9 @@
+# frozen_string_literal: false
#
# sync.rb - 2 phase lock with counter
-# $Release Version: 1.0$
-# $Revision: 1.4 $
-# $Date: 2001/06/06 14:19:33 $
-# by Keiju ISHITSUKA(keiju@ishitsuka.com)
+# $Release Version: 1.0$
+# $Revision$
+# by Keiju ISHITSUKA(keiju@ishitsuka.com)
#
# --
# Sync_m, Synchronizer_m
@@ -11,8 +11,8 @@
# obj.extend(Sync_m)
# or
# class Foo
-# include Sync_m
-# :
+# include Sync_m
+# :
# end
#
# Sync_m#sync_mode
@@ -23,9 +23,8 @@
# Sync_m#sync_lock, lock
# Sync_m#sync_unlock, unlock
#
-# Sync, Synchronicer:
-# include Sync_m
-# Usage:
+# Sync, Synchronizer:
+# Usage:
# sync = Sync.new
#
# Sync#mode
@@ -36,45 +35,46 @@
# Sync#lock(mode) -- mode = :EX, :SH, :UN
# Sync#unlock
# Sync#synchronize(mode) {...}
-#
+#
#
unless defined? Thread
- fail "Thread not available for this ruby interpreter"
+ raise "Thread not available for this ruby interpreter"
end
+##
+# A module that provides a two-phase lock with a counter.
+
module Sync_m
- RCS_ID='-$Header: /src/ruby/lib/sync.rb,v 1.4 2001/06/06 14:19:33 keiju Exp $-'
-
# lock mode
UN = :UN
SH = :SH
EX = :EX
-
+
# exceptions
class Err < StandardError
def Err.Fail(*opt)
fail self, sprintf(self::Message, *opt)
end
-
+
class UnknownLocker < Err
Message = "Thread(%s) not locked."
def UnknownLocker.Fail(th)
- super(th.inspect)
+ super(th.inspect)
end
end
-
+
class LockModeFailer < Err
Message = "Unknown lock mode(%s)"
def LockModeFailer.Fail(mode)
- if mode.id2name
- mode = id2name
- end
- super(mode)
+ if mode.id2name
+ mode = mode.id2name
+ end
+ super(mode)
end
end
end
-
+
def Sync_m.define_aliases(cl)
cl.module_eval %q{
alias locked? sync_locked?
@@ -86,30 +86,29 @@ module Sync_m
alias synchronize sync_synchronize
}
end
-
+
def Sync_m.append_features(cl)
super
- unless cl.instance_of?(Module)
- # do nothing for Modules
- # make aliases and include the proper module.
- define_aliases(cl)
- end
+ # do nothing for Modules
+ # make aliases for Classes.
+ define_aliases(cl) unless cl.instance_of?(Module)
+ self
end
-
+
def Sync_m.extend_object(obj)
super
- obj.sync_extended
+ obj.sync_extend
end
- def sync_extended
+ def sync_extend
unless (defined? locked? and
- defined? shared? and
- defined? exclusive? and
- defined? lock and
- defined? unlock and
- defined? try_lock and
- defined? synchronize)
- Sync_m.define_aliases(class<<self;self;end)
+ defined? shared? and
+ defined? exclusive? and
+ defined? lock and
+ defined? unlock and
+ defined? try_lock and
+ defined? synchronize)
+ Sync_m.define_aliases(singleton_class)
end
sync_initialize
end
@@ -118,128 +117,140 @@ module Sync_m
def sync_locked?
sync_mode != UN
end
-
+
def sync_shared?
sync_mode == SH
end
-
+
def sync_exclusive?
sync_mode == EX
end
-
+
# locking methods.
def sync_try_lock(mode = EX)
- return unlock if sync_mode == UN
-
- Thread.critical = true
- ret = sync_try_lock_sub(sync_mode)
- Thread.critical = false
- ret
+ return unlock if mode == UN
+ @sync_mutex.synchronize do
+ sync_try_lock_sub(mode)
+ end
end
-
+
def sync_lock(m = EX)
return unlock if m == UN
-
- until (Thread.critical = true; sync_try_lock_sub(m))
- if sync_sh_locker[Thread.current]
- sync_upgrade_waiting.push [Thread.current, sync_sh_locker[Thread.current]]
- sync_sh_locker.delete(Thread.current)
- else
- sync_waiting.push Thread.current
+ Thread.handle_interrupt(StandardError => :on_blocking) do
+ while true
+ @sync_mutex.synchronize do
+ begin
+ if sync_try_lock_sub(m)
+ return self
+ else
+ if sync_sh_locker[Thread.current]
+ sync_upgrade_waiting.push [Thread.current, sync_sh_locker[Thread.current]]
+ sync_sh_locker.delete(Thread.current)
+ else
+ unless sync_waiting.include?(Thread.current) || sync_upgrade_waiting.reverse_each.any?{|w| w.first == Thread.current }
+ sync_waiting.push Thread.current
+ end
+ end
+ @sync_mutex.sleep
+ end
+ ensure
+ sync_waiting.delete(Thread.current)
+ end
+ end
end
- Thread.stop
end
- Thread.critical = false
self
end
-
+
def sync_unlock(m = EX)
- Thread.critical = true
- if sync_mode == UN
- Thread.critical = false
- Err::UnknownLocker.Fail(Thread.current)
- end
-
- m = sync_mode if m == EX and sync_mode == SH
-
- runnable = false
- case m
- when UN
- Thread.critical = false
- Err::UnknownLocker.Fail(Thread.current)
-
- when EX
- if sync_ex_locker == Thread.current
- if (self.sync_ex_count = sync_ex_count - 1) == 0
- self.sync_ex_locker = nil
- if sync_sh_locker.include?(Thread.current)
- self.sync_mode = SH
- else
- self.sync_mode = UN
- end
- runnable = true
- end
- else
- Err::UnknownLocker.Fail(Thread.current)
+ wakeup_threads = []
+ @sync_mutex.synchronize do
+ if sync_mode == UN
+ Err::UnknownLocker.Fail(Thread.current)
end
-
- when SH
- if (count = sync_sh_locker[Thread.current]).nil?
- Err::UnknownLocker.Fail(Thread.current)
- else
- if (sync_sh_locker[Thread.current] = count - 1) == 0
- sync_sh_locker.delete(Thread.current)
- if sync_sh_locker.empty? and sync_ex_count == 0
- self.sync_mode = UN
- runnable = true
- end
- end
+
+ m = sync_mode if m == EX and sync_mode == SH
+
+ runnable = false
+ case m
+ when UN
+ Err::UnknownLocker.Fail(Thread.current)
+
+ when EX
+ if sync_ex_locker == Thread.current
+ if (self.sync_ex_count = sync_ex_count - 1) == 0
+ self.sync_ex_locker = nil
+ if sync_sh_locker.include?(Thread.current)
+ self.sync_mode = SH
+ else
+ self.sync_mode = UN
+ end
+ runnable = true
+ end
+ else
+ Err::UnknownLocker.Fail(Thread.current)
+ end
+
+ when SH
+ if (count = sync_sh_locker[Thread.current]).nil?
+ Err::UnknownLocker.Fail(Thread.current)
+ else
+ if (sync_sh_locker[Thread.current] = count - 1) == 0
+ sync_sh_locker.delete(Thread.current)
+ if sync_sh_locker.empty? and sync_ex_count == 0
+ self.sync_mode = UN
+ runnable = true
+ end
+ end
+ end
end
- end
-
- if runnable
- if sync_upgrade_waiting.size > 0
- for k, v in sync_upgrade_waiting
- sync_sh_locker[k] = v
- end
- wait = sync_upgrade_waiting
- self.sync_upgrade_waiting = []
- Thread.critical = false
-
- for w, v in wait
- w.run
- end
- else
- wait = sync_waiting
- self.sync_waiting = []
- Thread.critical = false
- for w in wait
- w.run
- end
+
+ if runnable
+ if sync_upgrade_waiting.size > 0
+ th, count = sync_upgrade_waiting.shift
+ sync_sh_locker[th] = count
+ th.wakeup
+ wakeup_threads.push th
+ else
+ wait = sync_waiting
+ self.sync_waiting = []
+ for th in wait
+ th.wakeup
+ wakeup_threads.push th
+ end
+ end
end
end
-
- Thread.critical = false
+ for th in wakeup_threads
+ th.run
+ end
self
end
-
+
def sync_synchronize(mode = EX)
- begin
+ Thread.handle_interrupt(StandardError => :on_blocking) do
sync_lock(mode)
- yield
- ensure
- sync_unlock
+ begin
+ yield
+ ensure
+ sync_unlock
+ end
end
end
- attr :sync_mode, true
-
- attr :sync_waiting, true
- attr :sync_upgrade_waiting, true
- attr :sync_sh_locker, true
- attr :sync_ex_locker, true
- attr :sync_ex_count, true
-
+ attr_accessor :sync_mode
+
+ attr_accessor :sync_waiting
+ attr_accessor :sync_upgrade_waiting
+ attr_accessor :sync_sh_locker
+ attr_accessor :sync_ex_locker
+ attr_accessor :sync_ex_count
+
+ def sync_inspect
+ sync_iv = instance_variables.select{|iv| /^@sync_/ =~ iv.id2name}.collect{|iv| iv.id2name + '=' + instance_eval(iv.id2name).inspect}.join(",")
+ print "<#{self.class}.extend Sync_m: #{inspect}, <Sync_m: #{sync_iv}>"
+ end
+
private
def sync_initialize
@@ -249,63 +260,70 @@ module Sync_m
@sync_sh_locker = Hash.new
@sync_ex_locker = nil
@sync_ex_count = 0
+
+ @sync_mutex = Thread::Mutex.new
end
def initialize(*args)
- sync_initialize
super
+ sync_initialize
end
-
+
def sync_try_lock_sub(m)
case m
when SH
case sync_mode
when UN
- self.sync_mode = m
- sync_sh_locker[Thread.current] = 1
- ret = true
+ self.sync_mode = m
+ sync_sh_locker[Thread.current] = 1
+ ret = true
when SH
- count = 0 unless count = sync_sh_locker[Thread.current]
- sync_sh_locker[Thread.current] = count + 1
- ret = true
+ count = 0 unless count = sync_sh_locker[Thread.current]
+ sync_sh_locker[Thread.current] = count + 1
+ ret = true
when EX
- # in EX mode, lock will upgrade to EX lock
- if sync_ex_locker == Thread.current
- self.sync_ex_count = sync_ex_count + 1
- ret = true
- else
- ret = false
- end
+ # in EX mode, lock will upgrade to EX lock
+ if sync_ex_locker == Thread.current
+ self.sync_ex_count = sync_ex_count + 1
+ ret = true
+ else
+ ret = false
+ end
end
when EX
if sync_mode == UN or
- sync_mode == SH && sync_sh_locker.size == 1 && sync_sh_locker.include?(Thread.current)
- self.sync_mode = m
- self.sync_ex_locker = Thread.current
- self.sync_ex_count = 1
- ret = true
+ sync_mode == SH && sync_sh_locker.size == 1 && sync_sh_locker.include?(Thread.current)
+ self.sync_mode = m
+ self.sync_ex_locker = Thread.current
+ self.sync_ex_count = 1
+ ret = true
elsif sync_mode == EX && sync_ex_locker == Thread.current
- self.sync_ex_count = sync_ex_count + 1
- ret = true
+ self.sync_ex_count = sync_ex_count + 1
+ ret = true
else
- ret = false
+ ret = false
end
else
- Thread.critical = false
- Err::LockModeFailer.Fail mode
+ Err::LockModeFailer.Fail m
end
return ret
end
end
+
+##
+# An alias for Sync_m from sync.rb
+
Synchronizer_m = Sync_m
+##
+# A class that provides two-phase lock with a counter. See Sync_m for
+# details.
+
class Sync
- #Sync_m.extend_class self
include Sync_m
-
- def initialize
- super
- end
-
end
+
+##
+# An alias for Sync from sync.rb. See Sync_m for details.
+
Synchronizer = Sync