diff options
Diffstat (limited to 'thread_sync.c')
-rw-r--r-- | thread_sync.c | 43 |
1 files changed, 40 insertions, 3 deletions
diff --git a/thread_sync.c b/thread_sync.c index c7055578e6..396cceb2ce 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -4,6 +4,14 @@ static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; static VALUE rb_eClosedQueueError; +/* + * keep these globally so we can walk and reinitialize them at fork + * in the child process + */ +static LIST_HEAD(szqueue_list); +static LIST_HEAD(queue_list); +static LIST_HEAD(condvar_list); + /* sync_waiter is always on-stack */ struct sync_waiter { rb_thread_t *th; @@ -556,6 +564,7 @@ void rb_mutex_allow_trap(VALUE self, int val) #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq) PACKED_STRUCT_UNALIGNED(struct rb_queue { struct list_head waitq; + rb_serial_t fork_gen; const VALUE que; int num_waiting; }); @@ -601,12 +610,29 @@ queue_alloc(VALUE klass) return obj; } +static int +queue_fork_check(struct rb_queue *q) +{ + rb_serial_t fork_gen = GET_VM()->fork_gen; + + if (q->fork_gen == fork_gen) { + return 0; + } + /* forked children can't reach into parent thread stacks */ + q->fork_gen = fork_gen; + list_head_init(queue_waitq(q)); + q->num_waiting = 0; + return 1; +} + static struct rb_queue * queue_ptr(VALUE obj) { struct rb_queue *q; TypedData_Get_Struct(obj, struct rb_queue, &queue_data_type, q); + queue_fork_check(q); + return q; } @@ -649,6 +675,11 @@ szqueue_ptr(VALUE obj) struct rb_szqueue *sq; TypedData_Get_Struct(obj, struct rb_szqueue, &szqueue_data_type, sq); + if (queue_fork_check(&sq->q)) { + list_head_init(szqueue_pushq(sq)); + sq->num_waiting_push = 0; + } + return sq; } @@ -885,7 +916,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block) list_add_tail(&qw.as.q->waitq, &qw.w.node); qw.as.q->num_waiting++; - rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw); + rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw); } } @@ -1127,7 +1158,7 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) list_add_tail(pushq, &qw.w.node); sq->num_waiting_push++; - rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw); + rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw); } } @@ -1228,9 +1259,9 @@ rb_szqueue_empty_p(VALUE self) /* ConditionalVariable */ -/* TODO: maybe this can be IMEMO */ struct rb_condvar { struct list_head waitq; + rb_serial_t fork_gen; }; /* @@ -1277,9 +1308,15 @@ static struct rb_condvar * condvar_ptr(VALUE self) { struct rb_condvar *cv; + rb_serial_t fork_gen = GET_VM()->fork_gen; TypedData_Get_Struct(self, struct rb_condvar, &cv_data_type, cv); + /* forked children can't reach into parent thread stacks */ + if (cv->fork_gen != fork_gen) { + list_head_init(&cv->waitq); + } + return cv; } |