diff options
Diffstat (limited to 'thread_sync.rb')
| -rw-r--r-- | thread_sync.rb | 537 |
1 files changed, 537 insertions, 0 deletions
diff --git a/thread_sync.rb b/thread_sync.rb new file mode 100644 index 0000000000..398c0d02b7 --- /dev/null +++ b/thread_sync.rb @@ -0,0 +1,537 @@ +# frozen_string_literal: true + +class Thread + # The Thread::Queue class implements multi-producer, multi-consumer + # queues. It is especially useful in threaded programming when + # information must be exchanged safely between multiple threads. The + # Thread::Queue class implements all the required locking semantics. + # + # The class implements FIFO (first in, first out) type of queue. + # In a FIFO queue, the first tasks added are the first retrieved. + # + # Example: + # + # queue = Thread::Queue.new + # + # producer = Thread.new do + # 5.times do |i| + # sleep rand(i) # simulate expense + # queue << i + # puts "#{i} produced" + # end + # end + # + # consumer = Thread.new do + # 5.times do |i| + # value = queue.pop + # sleep rand(i/2) # simulate expense + # puts "consumed #{value}" + # end + # end + # + # consumer.join + class Queue + # Document-method: Queue::new + # + # call-seq: + # Thread::Queue.new -> empty_queue + # Thread::Queue.new(enumerable) -> queue + # + # Creates a new queue instance, optionally using the contents of an +enumerable+ + # for its initial state. + # + # Example: + # + # q = Thread::Queue.new + # #=> #<Thread::Queue:0x00007ff7501110d0> + # q.empty? + # #=> true + # + # q = Thread::Queue.new([1, 2, 3]) + # #=> #<Thread::Queue:0x00007ff7500ec500> + # q.empty? + # #=> false + # q.pop + # #=> 1 + def initialize(enumerable = nil) + Primitive.queue_initialize(enumerable) + end + + # call-seq: + # pop(non_block=false, timeout: nil) + # + # Retrieves data from the queue. + # + # If the queue is empty, the calling thread is suspended until data is pushed + # onto the queue. If +non_block+ is true, the thread isn't suspended, and + # +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no data is available +nil+ is + # returned. If +timeout+ is +0+ it returns immediately. + def pop(non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_queue_pop(non_block, timeout) + end + alias_method :deq, :pop + alias_method :shift, :pop + + undef_method :initialize_copy + + # call-seq: + # push(object) + # enq(object) + # <<(object) + # + # Pushes the given +object+ to the queue. + def push(object) + Primitive.cexpr!('queue_do_push(self, queue_ptr(self), object)') + end + alias_method :enq, :push + alias_method :<<, :push + + # call-seq: + # close + # + # Closes the queue. A closed queue cannot be re-opened. + # + # After the call to close completes, the following are true: + # + # - +closed?+ will return true + # + # - +close+ will be ignored. + # + # - calling enq/push/<< will raise a +ClosedQueueError+. + # + # - when +empty?+ is false, calling deq/pop/shift will return an object + # from the queue as usual. + # - when +empty?+ is true, deq(false) will not suspend the thread and will return nil. + # deq(true) will raise a +ThreadError+. + # + # ClosedQueueError is inherited from StopIteration, so that you can break loop block. + # + # Example: + # + # q = Thread::Queue.new + # Thread.new{ + # while e = q.deq # wait for nil to break loop + # # ... + # end + # } + # q.close + def close + Primitive.cstmt! %{ + if (!queue_closed_p(self)) { + FL_SET_RAW(self, QUEUE_CLOSED); + + wakeup_all(&queue_ptr(self)->waitq); + } + + return self; + } + end + + # call-seq: closed? + # + # Returns +true+ if the queue is closed. + def closed? + Primitive.cexpr!('RBOOL(FL_TEST_RAW(self, QUEUE_CLOSED))') + end + + # call-seq: + # length + # size + # + # Returns the length of the queue. + def length + Primitive.cexpr!('LONG2NUM(queue_ptr(self)->len)') + end + alias_method :size, :length + + # call-seq: empty? + # + # Returns +true+ if the queue is empty. + def empty? + Primitive.cexpr!('RBOOL(queue_ptr(self)->len == 0)') + end + + # Removes all objects from the queue. + def clear + Primitive.cstmt! %{ + queue_clear(queue_ptr(self)); + return self; + } + end + + # call-seq: + # num_waiting + # + # Returns the number of threads waiting on the queue. + def num_waiting + Primitive.cexpr!('INT2NUM(queue_ptr(self)->num_waiting)') + end + + def marshal_dump # :nodoc: + raise TypeError, "can't dump #{self.class}" + end + + # call-seq: + # freeze + # + # The queue can't be frozen, so this method raises an exception: + # Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>) + def freeze + raise TypeError, "cannot freeze #{self}" + end + end + + # This class represents queues of specified size capacity. The push operation + # may be blocked if the capacity is full. + # + # See Thread::Queue for an example of how a Thread::SizedQueue works. + class SizedQueue < Queue + # Document-method: SizedQueue::new + # call-seq: new(max) + # + # Creates a fixed-length queue with a maximum size of +max+. + def initialize(vmax) + Primitive.szqueue_initialize(vmax) + end + + # call-seq: + # pop(non_block=false, timeout: nil) + # + # Retrieves data from the queue. + # + # If the queue is empty, the calling thread is suspended until data is + # pushed onto the queue. If +non_block+ is true, the thread isn't + # suspended, and +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no data is available +nil+ is + # returned. If +timeout+ is +0+ it returns immediately. + def pop(non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_szqueue_pop(non_block, timeout) + end + alias_method :deq, :pop + alias_method :shift, :pop + + # call-seq: + # push(object, non_block=false, timeout: nil) + # enq(object, non_block=false, timeout: nil) + # <<(object) + # + # Pushes +object+ to the queue. + # + # If there is no space left in the queue, waits until space becomes + # available, unless +non_block+ is true. If +non_block+ is true, the + # thread isn't suspended, and +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no space is available +nil+ is + # returned. If +timeout+ is +0+ it returns immediately. + # Otherwise it returns +self+. + def push(object, non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_szqueue_push(object, non_block, timeout) + end + alias_method :enq, :push + alias_method :<<, :push + + # call-seq: + # close + # + # Similar to Thread::Queue#close. + # + # The difference is behavior with waiting enqueuing threads. + # + # If there are waiting enqueuing threads, they are interrupted by + # raising ClosedQueueError('queue closed'). + def close + Primitive.cstmt! %{ + if (!queue_closed_p(self)) { + struct rb_szqueue *sq = szqueue_ptr(self); + + FL_SET(self, QUEUE_CLOSED); + wakeup_all(szqueue_waitq(sq)); + wakeup_all(szqueue_pushq(sq)); + } + return self; + } + end + + # Removes all objects from the queue. + def clear + Primitive.cstmt! %{ + struct rb_szqueue *sq = szqueue_ptr(self); + queue_clear(&sq->q); + wakeup_all(szqueue_pushq(sq)); + return self; + } + end + + # Returns the number of threads waiting on the queue. + def num_waiting + Primitive.cstmt! %{ + struct rb_szqueue *sq = szqueue_ptr(self); + return INT2NUM(sq->q.num_waiting + sq->num_waiting_push); + } + end + + # Returns the maximum size of the queue. + def max + Primitive.cexpr!('LONG2NUM(szqueue_ptr(self)->max)') + end + + # call-seq: max=(number) + # + # Sets the maximum size of the queue to the given +number+. + def max=(vmax) + Primitive.cstmt! %{ + long max = NUM2LONG(vmax); + if (max <= 0) { + rb_raise(rb_eArgError, "queue size must be positive"); + } + + long diff = 0; + struct rb_szqueue *sq = szqueue_ptr(self); + + if (max > sq->max) { + diff = max - sq->max; + } + sq->max = max; + sync_wakeup(szqueue_pushq(sq), diff); + return vmax; + } + end + end + + # Thread::Mutex implements a simple semaphore that can be used to + # coordinate access to shared data from multiple concurrent threads. + # + # Example: + # + # semaphore = Thread::Mutex.new + # + # a = Thread.new { + # semaphore.synchronize { + # # access shared resource + # } + # } + # + # b = Thread.new { + # semaphore.synchronize { + # # access shared resource + # } + # } + class Mutex + # call-seq: + # Thread::Mutex.new -> mutex + # + # Creates a new Mutex + def initialize + end + + # call-seq: + # mutex.locked? -> true or false + # + # Returns +true+ if this lock is currently held by some thread. + def locked? + Primitive.cexpr! %q{ RBOOL(mutex_locked_p(mutex_ptr(self))) } + end + + # call-seq: + # mutex.owned? -> true or false + # + # Returns +true+ if this lock is currently held by current thread. + def owned? + Primitive.rb_mut_owned_p + end + + # call-seq: + # mutex.lock -> self + # + # Attempts to grab the lock and waits if it isn't available. + # Raises +ThreadError+ if +mutex+ was locked by the current thread. + def lock + Primitive.rb_mut_lock + end + + # call-seq: + # mutex.try_lock -> true or false + # + # Attempts to obtain the lock and returns immediately. Returns +true+ if the + # lock was granted. + def try_lock + Primitive.rb_mut_trylock + end + + # call-seq: + # mutex.lock -> self + # + # Attempts to grab the lock and waits if it isn't available. + # Raises +ThreadError+ if +mutex+ was locked by the current thread. + def unlock + Primitive.rb_mut_unlock + end + + # call-seq: + # mutex.synchronize { ... } -> result of the block + # + # Obtains a lock, runs the block, and releases the lock when the block + # completes. See the example under Thread::Mutex. + def synchronize + raise ThreadError, "must be called with a block" unless defined?(yield) + + Primitive.rb_mut_synchronize + end + + # call-seq: + # mutex.sleep(timeout = nil) -> number or nil + # + # Releases the lock and sleeps +timeout+ seconds if it is given and + # non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by + # the current thread. + # + # When the thread is next woken up, it will attempt to reacquire + # the lock. + # + # Note that this method can wakeup without explicit Thread#wakeup call. + # For example, receiving signal and so on. + # + # Returns the slept time in seconds if woken up, or +nil+ if timed out. + def sleep(timeout = nil) + Primitive.rb_mut_sleep(timeout) + end + end + + # ConditionVariable objects augment class Mutex. Using condition variables, + # it is possible to suspend while in the middle of a critical section until a + # condition is met, such as a resource becomes available. + # + # Due to non-deterministic scheduling and spurious wake-ups, users of + # condition variables should always use a separate boolean predicate (such as + # reading from a boolean variable) to check if the condition is actually met + # before starting to wait, and should wait in a loop, re-checking the + # condition every time the ConditionVariable is waken up. The idiomatic way + # of using condition variables is calling the +wait+ method in an +until+ + # loop with the predicate as the loop condition. + # + # condvar.wait(mutex) until condition_is_met + # + # In the example below, we use the boolean variable +resource_available+ + # (which is protected by +mutex+) to indicate the availability of the + # resource, and use +condvar+ to wait for that variable to become true. Note + # that: + # + # 1. Thread +b+ may be scheduled before thread +a1+ and +a2+, and may run so + # fast that it have already made the resource available before either + # +a1+ or +a2+ starts. Therefore, +a1+ and +a2+ should check if + # +resource_available+ is already true before starting to wait. + # 2. The +wait+ method may spuriously wake up without signalling. Therefore, + # thread +a1+ and +a2+ should recheck +resource_available+ after the + # +wait+ method returns, and go back to wait if the condition is not + # actually met. + # 3. It is possible that thread +a2+ starts right after thread +a1+ is waken + # up by +b+. Thread +a2+ may have acquired the +mutex+ and consumed the + # resource before thread +a1+ acquires the +mutex+. This necessitates + # rechecking after +wait+, too. + # + # Example: + # + # mutex = Thread::Mutex.new + # + # resource_available = false + # condvar = Thread::ConditionVariable.new + # + # a1 = Thread.new { + # # Thread 'a1' waits for the resource to become available and consumes + # # the resource. + # mutex.synchronize { + # condvar.wait(mutex) until resource_available + # # After the loop, 'resource_available' is guaranteed to be true. + # + # resource_available = false + # puts "a1 consumed the resource" + # } + # } + # + # a2 = Thread.new { + # # Thread 'a2' behaves like 'a1'. + # mutex.synchronize { + # condvar.wait(mutex) until resource_available + # resource_available = false + # puts "a2 consumed the resource" + # } + # } + # + # b = Thread.new { + # # Thread 'b' periodically makes the resource available. + # loop { + # mutex.synchronize { + # resource_available = true + # + # # Notify one waiting thread if any. It is possible that neither + # # 'a1' nor 'a2 is waiting on 'condvar' at this moment. That's OK. + # condvar.signal + # } + # sleep 1 + # } + # } + # + # # Eventually both 'a1' and 'a2' will have their resources, albeit in an + # # unspecified order. + # [a1, a2].each {|th| th.join} + class ConditionVariable + # Document-method: ConditionVariable::new + # + # Creates a new condition variable instance. + def initialize + end + + undef_method :initialize_copy + + # :nodoc: + def marshal_dump + raise TypeError, "can't dump #{self.class}" + end + + # Document-method: Thread::ConditionVariable#signal + # + # Wakes up the first thread in line waiting for this lock. + def signal + Primitive.rb_condvar_signal + end + + # Document-method: Thread::ConditionVariable#broadcast + # + # Wakes up all threads waiting for this lock. + def broadcast + Primitive.rb_condvar_broadcast + end + + # Document-method: Thread::ConditionVariable#wait + # call-seq: wait(mutex, timeout=nil) + # + # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. + # + # If +timeout+ is given, this method returns after +timeout+ seconds passed, + # even if no other thread doesn't signal. + # + # This method may wake up spuriously due to underlying implementation details. + # + # Returns the slept result on +mutex+. + def wait(mutex, timeout=nil) + Primitive.rb_condvar_wait(mutex, timeout) + end + end +end + +Mutex = Thread::Mutex +ConditionVariable = Thread::ConditionVariable +Queue = Thread::Queue +SizedQueue = Thread::SizedQueue |
