summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c204
1 files changed, 120 insertions, 84 deletions
diff --git a/thread_sync.c b/thread_sync.c
index 74a26bd0ac..ae69cb4a6e 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -41,6 +41,8 @@ struct queue_sleep_arg {
static void
sync_wakeup(struct ccan_list_head *head, long max)
{
+ RUBY_DEBUG_LOG("max:%ld", max);
+
struct sync_waiter *cur = 0, *next;
ccan_list_for_each_safe(head, cur, next, node) {
@@ -51,6 +53,7 @@ sync_wakeup(struct ccan_list_head *head, long max)
rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
}
else {
+ RUBY_DEBUG_LOG("target_th:%u", rb_th_serial(cur->th));
rb_threadptr_interrupt(cur->th);
cur->th->status = THREAD_RUNNABLE;
}
@@ -251,6 +254,8 @@ rb_mutex_trylock(VALUE self)
rb_mutex_t *mutex = mutex_ptr(self);
if (mutex->fiber == 0) {
+ RUBY_DEBUG_LOG("%p ok", mutex);
+
rb_fiber_t *fiber = GET_EC()->fiber_ptr;
rb_thread_t *th = GET_THREAD();
mutex->fiber = fiber;
@@ -258,17 +263,12 @@ rb_mutex_trylock(VALUE self)
mutex_locked(th, self);
return Qtrue;
}
-
- return Qfalse;
+ else {
+ RUBY_DEBUG_LOG("%p ng", mutex);
+ return Qfalse;
+ }
}
-/*
- * At maximum, only one thread can use cond_timedwait and watch deadlock
- * periodically. Multiple polling thread (i.e. concurrent deadlock check)
- * introduces new race conditions. [Bug #6278] [ruby-core:44275]
- */
-static const rb_thread_t *patrol_thread = NULL;
-
static VALUE
mutex_owned_p(rb_fiber_t *fiber, rb_mutex_t *mutex)
{
@@ -290,6 +290,8 @@ delete_from_waitq(VALUE value)
return Qnil;
}
+static inline rb_atomic_t threadptr_get_interrupts(rb_thread_t *th);
+
static VALUE
do_mutex_lock(VALUE self, int interruptible_p)
{
@@ -297,6 +299,7 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_thread_t *th = ec->thread_ptr;
rb_fiber_t *fiber = ec->fiber_ptr;
rb_mutex_t *mutex = mutex_ptr(self);
+ rb_atomic_t saved_ints = 0;
/* When running trap handler */
if (!FL_TEST_RAW(self, MUTEX_ALLOW_TRAP) &&
@@ -310,6 +313,8 @@ do_mutex_lock(VALUE self, int interruptible_p)
}
while (mutex->fiber != fiber) {
+ VM_ASSERT(mutex->fiber != NULL);
+
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
struct sync_waiter sync_waiter = {
@@ -331,51 +336,47 @@ do_mutex_lock(VALUE self, int interruptible_p)
rb_raise(rb_eThreadError, "deadlock; lock already owned by another fiber belonging to the same thread");
}
- enum rb_thread_status prev_status = th->status;
- rb_hrtime_t *timeout = 0;
- rb_hrtime_t rel = rb_msec2hrtime(100);
-
- th->status = THREAD_STOPPED_FOREVER;
- th->locking_mutex = self;
- rb_ractor_sleeper_threads_inc(th->ractor);
- /*
- * Carefully! while some contended threads are in native_sleep(),
- * ractor->sleeper is unstable value. we have to avoid both deadlock
- * and busy loop.
- */
- if ((rb_ractor_living_thread_num(th->ractor) == rb_ractor_sleeper_thread_num(th->ractor)) &&
- !patrol_thread) {
- timeout = &rel;
- patrol_thread = th;
- }
-
struct sync_waiter sync_waiter = {
.self = self,
.th = th,
- .fiber = nonblocking_fiber(fiber)
+ .fiber = nonblocking_fiber(fiber),
};
- ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
+ RUBY_DEBUG_LOG("%p wait", mutex);
+
+ // similar code with `sleep_forever`, but
+ // sleep_forever(SLEEP_DEADLOCKABLE) raises an exception.
+ // Ensure clause is needed like but `rb_ensure` a bit slow.
+ //
+ // begin
+ // sleep_forever(th, SLEEP_DEADLOCKABLE);
+ // ensure
+ // ccan_list_del(&sync_waiter.node);
+ // end
+ enum rb_thread_status prev_status = th->status;
+ th->status = THREAD_STOPPED_FOREVER;
+ rb_ractor_sleeper_threads_inc(th->ractor);
+ rb_check_deadlock(th->ractor);
- native_sleep(th, timeout); /* release GVL */
+ th->locking_mutex = self;
+ ccan_list_add_tail(&mutex->waitq, &sync_waiter.node);
+ {
+ native_sleep(th, NULL);
+ }
ccan_list_del(&sync_waiter.node);
+ // unlocked by another thread while sleeping
if (!mutex->fiber) {
mutex->fiber = fiber;
}
- if (patrol_thread == th)
- patrol_thread = NULL;
-
- th->locking_mutex = Qfalse;
- if (mutex->fiber && timeout && !RUBY_VM_INTERRUPTED(th->ec)) {
- rb_check_deadlock(th->ractor);
- }
- if (th->status == THREAD_STOPPED_FOREVER) {
- th->status = prev_status;
- }
rb_ractor_sleeper_threads_dec(th->ractor);
+ th->status = prev_status;
+ th->locking_mutex = Qfalse;
+ th->locking_mutex = Qfalse;
+
+ RUBY_DEBUG_LOG("%p wakeup", mutex);
}
if (interruptible_p) {
@@ -387,11 +388,27 @@ do_mutex_lock(VALUE self, int interruptible_p)
mutex->fiber = fiber;
}
}
+ else {
+ // clear interrupt information
+ if (RUBY_VM_INTERRUPTED(th->ec)) {
+ // reset interrupts
+ if (saved_ints == 0) {
+ saved_ints = threadptr_get_interrupts(th);
+ }
+ else {
+ // ignore additional interrupts
+ threadptr_get_interrupts(th);
+ }
+ }
+ }
}
+ if (saved_ints) th->ec->interrupt_flag = saved_ints;
if (mutex->fiber == fiber) mutex_locked(th, self);
}
+ RUBY_DEBUG_LOG("%p locked", mutex);
+
// assertion
if (mutex_owned_p(fiber, mutex) == Qfalse) rb_bug("do_mutex_lock: mutex is not owned.");
@@ -435,46 +452,46 @@ rb_mutex_owned_p(VALUE self)
static const char *
rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th, rb_fiber_t *fiber)
{
- const char *err = NULL;
+ RUBY_DEBUG_LOG("%p", mutex);
if (mutex->fiber == 0) {
- err = "Attempt to unlock a mutex which is not locked";
+ return "Attempt to unlock a mutex which is not locked";
}
else if (mutex->fiber != fiber) {
- err = "Attempt to unlock a mutex which is locked by another thread/fiber";
+ return "Attempt to unlock a mutex which is locked by another thread/fiber";
}
- else {
- struct sync_waiter *cur = 0, *next;
- mutex->fiber = 0;
- ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
- ccan_list_del_init(&cur->node);
+ struct sync_waiter *cur = 0, *next;
- if (cur->th->scheduler != Qnil && cur->fiber) {
- rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
- goto found;
- }
- else {
- switch (cur->th->status) {
- case THREAD_RUNNABLE: /* from someone else calling Thread#run */
- case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
- rb_threadptr_interrupt(cur->th);
- goto found;
- case THREAD_STOPPED: /* probably impossible */
- rb_bug("unexpected THREAD_STOPPED");
- case THREAD_KILLED:
- /* not sure about this, possible in exit GC? */
- rb_bug("unexpected THREAD_KILLED");
- continue;
- }
+ mutex->fiber = 0;
+ thread_mutex_remove(th, mutex);
+
+ ccan_list_for_each_safe(&mutex->waitq, cur, next, node) {
+ ccan_list_del_init(&cur->node);
+
+ if (cur->th->scheduler != Qnil && cur->fiber) {
+ rb_fiber_scheduler_unblock(cur->th->scheduler, cur->self, rb_fiberptr_self(cur->fiber));
+ return NULL;
+ }
+ else {
+ switch (cur->th->status) {
+ case THREAD_RUNNABLE: /* from someone else calling Thread#run */
+ case THREAD_STOPPED_FOREVER: /* likely (rb_mutex_lock) */
+ RUBY_DEBUG_LOG("wakeup th:%u", rb_th_serial(cur->th));
+ rb_threadptr_interrupt(cur->th);
+ return NULL;
+ case THREAD_STOPPED: /* probably impossible */
+ rb_bug("unexpected THREAD_STOPPED");
+ case THREAD_KILLED:
+ /* not sure about this, possible in exit GC? */
+ rb_bug("unexpected THREAD_KILLED");
+ continue;
}
}
-
- found:
- thread_mutex_remove(th, mutex);
}
- return err;
+ // We did not find any threads to wake up, so we can just return with no error:
+ return NULL;
}
/*
@@ -652,22 +669,24 @@ rb_mutex_allow_trap(VALUE self, int val)
#define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
#define queue_list(q) UNALIGNED_MEMBER_PTR(q, que)
-PACKED_STRUCT_UNALIGNED(struct rb_queue {
+RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
+struct rb_queue {
struct ccan_list_head waitq;
rb_serial_t fork_gen;
const VALUE que;
int num_waiting;
-});
+} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
#define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
#define szqueue_list(sq) UNALIGNED_MEMBER_PTR(sq, q.que)
#define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
-PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
+RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_BEGIN()
+struct rb_szqueue {
struct rb_queue q;
int num_waiting_push;
struct ccan_list_head pushq;
long max;
-});
+} RBIMPL_ATTR_PACKED_STRUCT_UNALIGNED_END();
static void
queue_mark(void *ptr)
@@ -835,7 +854,7 @@ raise_closed_queue_error(VALUE self)
static VALUE
queue_closed_result(VALUE self, struct rb_queue *q)
{
- assert(queue_length(self, q) == 0);
+ RUBY_ASSERT(queue_length(self, q) == 0);
return Qnil;
}
@@ -847,8 +866,8 @@ queue_closed_result(VALUE self, struct rb_queue *q)
* information must be exchanged safely between multiple threads. The
* Thread::Queue class implements all the required locking semantics.
*
- * The class implements FIFO type of queue. In a FIFO queue, the first
- * tasks added are the first retrieved.
+ * The class implements FIFO (first in, first out) type of queue.
+ * In a FIFO queue, the first tasks added are the first retrieved.
*
* Example:
*
@@ -856,17 +875,17 @@ queue_closed_result(VALUE self, struct rb_queue *q)
*
* producer = Thread.new do
* 5.times do |i|
- * sleep rand(i) # simulate expense
- * queue << i
- * puts "#{i} produced"
+ * sleep rand(i) # simulate expense
+ * queue << i
+ * puts "#{i} produced"
* end
* end
*
* consumer = Thread.new do
* 5.times do |i|
- * value = queue.pop
- * sleep rand(i/2) # simulate expense
- * puts "consumed #{value}"
+ * value = queue.pop
+ * sleep rand(i/2) # simulate expense
+ * puts "consumed #{value}"
* end
* end
*
@@ -1062,8 +1081,8 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
else {
rb_execution_context_t *ec = GET_EC();
- assert(RARRAY_LEN(q->que) == 0);
- assert(queue_closed_p(self) == 0);
+ RUBY_ASSERT(RARRAY_LEN(q->que) == 0);
+ RUBY_ASSERT(queue_closed_p(self) == 0);
struct queue_waiter queue_waiter = {
.w = {.self = self, .th = ec->thread_ptr, .fiber = nonblocking_fiber(ec->fiber_ptr)},
@@ -1139,6 +1158,22 @@ rb_queue_length(VALUE self)
return LONG2NUM(queue_length(self, queue_ptr(self)));
}
+NORETURN(static VALUE rb_queue_freeze(VALUE self));
+/*
+ * call-seq:
+ * freeze
+ *
+ * The queue can't be frozen, so this method raises an exception:
+ * Thread::Queue.new.freeze # Raises TypeError (cannot freeze #<Thread::Queue:0x...>)
+ *
+ */
+static VALUE
+rb_queue_freeze(VALUE self)
+{
+ rb_raise(rb_eTypeError, "cannot freeze " "%+"PRIsVALUE, self);
+ UNREACHABLE_RETURN(self);
+}
+
/*
* Document-method: Thread::Queue#num_waiting
*
@@ -1600,6 +1635,7 @@ Init_thread_sync(void)
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
+ rb_define_method(rb_cQueue, "freeze", rb_queue_freeze, 0);
rb_define_alias(rb_cQueue, "enq", "push");
rb_define_alias(rb_cQueue, "<<", "push");