From 28144433b2f951279552b39bc358769a5267e26a Mon Sep 17 00:00:00 2001 From: ko1 Date: Wed, 18 Jul 2012 05:46:40 +0000 Subject: * thread.c (rb_threadptr_async_errinfo_*): manage async errors queue. Async events such as an exception throwed by Thread#raise, Thread#kill and thread termination (after main thread termination) will be queued to th->async_errinfo_queue. - clear: clear the queue. - enque: enque err object into queue. - deque: deque err object from queue. - active_p: return 1 if the queue should be checked. rb_thread_t#thrown_errinfo was removed. * vm_core.h: add declarations of rb_threadptr_async_errinfo_*. remove rb_thread_t#thrown_errinfo field and add rb_thread_t#async_errinfo_queue (queue body: Array), rb_thread_t#async_errinfo_queue_checked (flag), rb_thread_t#async_errinfo_mask_stack(Array, not used yet). * vm.c (rb_thread_mark): fix a mark function. * cont.c (rb_fiber_start): enque an error. * process.c (after_fork): clear async errinfo queue. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36430 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 24 +++++++++++ cont.c | 6 +-- process.c | 2 +- thread.c | 143 +++++++++++++++++++++++++++++++++++++++++++++----------------- vm.c | 3 +- vm_core.h | 11 ++++- 6 files changed, 145 insertions(+), 44 deletions(-) diff --git a/ChangeLog b/ChangeLog index 3955c48073..4d2986014d 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,27 @@ +Wed Jul 18 14:16:51 2012 Koichi Sasada + + * thread.c (rb_threadptr_async_errinfo_*): manage async errors queue. + Async events such as an exception throwed by Thread#raise, + Thread#kill and thread termination (after main thread termination) + will be queued to th->async_errinfo_queue. + - clear: clear the queue. + - enque: enque err object into queue. + - deque: deque err object from queue. + - active_p: return 1 if the queue should be checked. + rb_thread_t#thrown_errinfo was removed. + + * vm_core.h: add declarations of rb_threadptr_async_errinfo_*. + remove rb_thread_t#thrown_errinfo field and + add rb_thread_t#async_errinfo_queue (queue body: Array), + rb_thread_t#async_errinfo_queue_checked (flag), + rb_thread_t#async_errinfo_mask_stack(Array, not used yet). + + * vm.c (rb_thread_mark): fix a mark function. + + * cont.c (rb_fiber_start): enque an error. + + * process.c (after_fork): clear async errinfo queue. + Wed Jul 18 14:25:55 2012 URABE Shyouhei * pack.c: (ditto) bitwise operations are not char. Apply explicit diff --git a/cont.c b/cont.c index dee7d1690e..1da0ba4705 100644 --- a/cont.c +++ b/cont.c @@ -1164,11 +1164,11 @@ rb_fiber_start(void) if (state) { if (state == TAG_RAISE) { - th->thrown_errinfo = th->errinfo; + rb_threadptr_async_errinfo_enque(th, th->errinfo); } else { - th->thrown_errinfo = - rb_vm_make_jump_tag_but_local_jump(state, th->errinfo); + VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo); + rb_threadptr_async_errinfo_enque(th, err); } RUBY_VM_SET_INTERRUPT(th); } diff --git a/process.c b/process.c index 64c0568bde..90eaa0e2e7 100644 --- a/process.c +++ b/process.c @@ -1055,7 +1055,7 @@ after_exec(void) } #define before_fork() before_exec() -#define after_fork() (GET_THREAD()->thrown_errinfo = 0, after_exec()) +#define after_fork() (rb_threadptr_async_errinfo_clear(GET_THREAD()), after_exec()) #include "dln.h" diff --git a/thread.c b/thread.c index 2a40069b91..634c943cc7 100644 --- a/thread.c +++ b/thread.c @@ -68,7 +68,6 @@ static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec); static void sleep_forever(rb_thread_t *th, int nodeadlock); static double timeofday(void); static int rb_threadptr_dead(rb_thread_t *th); - static void rb_check_deadlock(rb_vm_t *vm); #define eKillSignal INT2FIX(0) @@ -131,7 +130,6 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \ exec; \ blocking_region_end(__th, &__region); \ - RUBY_VM_CHECK_INTS(); \ } while(0) #if THREAD_DEBUG @@ -313,9 +311,9 @@ terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) if (th != main_thread) { thread_debug("terminate_i: %p\n", (void *)th); - rb_threadptr_interrupt(th); - th->thrown_errinfo = eTerminateSignal; + rb_threadptr_async_errinfo_enque(th, eTerminateSignal); th->status = THREAD_TO_KILL; + rb_threadptr_interrupt(th); } else { thread_debug("terminate_i: main thread (%p)\n", (void *)th); @@ -564,6 +562,10 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) th->priority = GET_THREAD()->priority; th->thgroup = GET_THREAD()->thgroup; + th->async_errinfo_queue = rb_ary_new(); + th->async_errinfo_queue_checked = 0; + th->async_errinfo_mask_stack = rb_ary_new(); + native_mutex_initialize(&th->interrupt_lock); if (GET_VM()->event_hooks != NULL) th->event_flags |= RUBY_EVENT_VM; @@ -1133,6 +1135,10 @@ rb_thread_call_without_gvl(void *(*func)(void *), void *data1, val = func(data1); saved_errno = errno; }, ubf, data2); + + /* TODO: check */ + RUBY_VM_CHECK_INTS(); + errno = saved_errno; return val; @@ -1144,13 +1150,28 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) VALUE val; rb_thread_t *th = GET_THREAD(); int saved_errno = 0; + int state; th->waiting_fd = fd; - BLOCKING_REGION({ - val = func(data1); - saved_errno = errno; - }, ubf_select, th); + + TH_PUSH_TAG(th); + if ((state = EXEC_TAG()) == 0) { + BLOCKING_REGION({ + val = func(data1); + saved_errno = errno; + }, ubf_select, th); + } + TH_POP_TAG(); + + /* clear waitinf_fd anytime */ th->waiting_fd = -1; + + if (state) { + JUMP_TAG(state); + } + /* TODO: check func() */ + RUBY_VM_CHECK_INTS(); + errno = saved_errno; return val; @@ -1294,12 +1315,14 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th) } /* exception from another thread */ - if (th->thrown_errinfo) { - VALUE err = th->thrown_errinfo; - th->thrown_errinfo = 0; + if (rb_threadptr_async_errinfo_active_p(th)) { + VALUE err = rb_threadptr_async_errinfo_deque(th); thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); - if (err == eKillSignal || err == eTerminateSignal) { + if (err == eKillSignal /* Thread#kill receieved */ || + err == eTerminateSignal /* Terminate thread */ ) { + rb_threadptr_async_errinfo_clear(th); + th->status = THREAD_TO_KILL; th->errinfo = INT2FIX(TAG_FATAL); TH_JUMP_TAG(th, TAG_FATAL); } @@ -1353,6 +1376,59 @@ rb_gc_mark_threads(void) /*****************************************************/ +/* + * rb_threadptr_async_errinfo_* - manage async errors queue + * + * Async events such as an exception throwed by Thread#raise, + * Thread#kill and thread termination (after main thread termination) + * will be queued to th->async_errinfo_queue. + * - clear: clear the queue. + * - enque: enque err object into queue. + * - deque: deque err object from queue. + * - active_p: return 1 if the queue should be checked. + * + * All rb_threadptr_async_errinfo_* functions are called by + * a GVL acquired thread, of course. + * Note that all "rb_" prefix APIs need GVL to call. + */ + +void +rb_threadptr_async_errinfo_clear(rb_thread_t *th) +{ + rb_ary_clear(th->async_errinfo_queue); +} + +void +rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v) +{ + rb_ary_push(th->async_errinfo_queue, v); + th->async_errinfo_queue_checked = 0; +} + +VALUE +rb_threadptr_async_errinfo_deque(rb_thread_t *th) +{ + VALUE err = rb_ary_shift(th->async_errinfo_queue); + if (RARRAY_LEN(th->async_errinfo_queue) == 0) { + th->async_errinfo_queue_checked = 1; + } + return err; +} + +int +rb_threadptr_async_errinfo_active_p(rb_thread_t *th) +{ + if (th->async_errinfo_queue_checked) { + return 0; + } + else { + return RARRAY_LEN(th->async_errinfo_queue) > 0; + } +} + +VALUE +rb_thread + static void rb_threadptr_ready(rb_thread_t *th) { @@ -1364,19 +1440,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) { VALUE exc; - again: if (rb_threadptr_dead(th)) { return Qnil; } - if (th->thrown_errinfo != 0 || th->raised_flag) { - rb_thread_schedule(); - goto again; - } - exc = rb_make_exception(argc, argv); - th->thrown_errinfo = exc; - rb_threadptr_ready(th); + rb_threadptr_async_errinfo_enque(th, exc); + rb_threadptr_interrupt(th); return Qnil; } @@ -1436,13 +1506,6 @@ rb_threadptr_reset_raised(rb_thread_t *th) return 1; } -#define THREAD_IO_WAITING_P(th) ( \ - ((th)->status == THREAD_STOPPED || \ - (th)->status == THREAD_STOPPED_FOREVER) && \ - (th)->blocking_region_buffer && \ - (th)->unblock.func == ubf_select && \ - 1) - static int thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) { @@ -1450,14 +1513,10 @@ thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) rb_thread_t *th; GetThreadPtr((VALUE)key, th); - if (THREAD_IO_WAITING_P(th)) { - native_mutex_lock(&th->interrupt_lock); - if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) { - th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream]; - RUBY_VM_SET_INTERRUPT(th); - (th->unblock.func)(th->unblock.arg); - } - native_mutex_unlock(&th->interrupt_lock); + if (th->waiting_fd == fd) { + VALUE err = th->vm->special_exceptions[ruby_error_closed_stream]; + rb_threadptr_async_errinfo_enque(th, err); + rb_threadptr_interrupt(th); } return ST_CONTINUE; } @@ -1530,10 +1589,9 @@ rb_thread_kill(VALUE thread) thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); - rb_threadptr_interrupt(th); - th->thrown_errinfo = eKillSignal; + rb_threadptr_async_errinfo_enque(th, eKillSignal); th->status = THREAD_TO_KILL; - + rb_threadptr_interrupt(th); return thread; } @@ -2592,6 +2650,9 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, result = native_fd_select(n, read, write, except, timeout, th); if (result < 0) lerrno = errno; }, ubf_select, th); + + RUBY_VM_CHECK_INTS(); + errno = lerrno; if (result < 0) { @@ -2815,6 +2876,8 @@ retry: if (result < 0) lerrno = errno; }, ubf_select, GET_THREAD()); + RUBY_VM_CHECK_INTS(); + if (result < 0) { errno = lerrno; switch (errno) { @@ -4707,6 +4770,10 @@ Init_Thread(void) gvl_init(th->vm); gvl_acquire(th->vm, th); native_mutex_initialize(&th->interrupt_lock); + + th->async_errinfo_queue = rb_ary_new(); + th->async_errinfo_queue_checked = 0; + th->async_errinfo_mask_stack = rb_ary_new(); } } diff --git a/vm.c b/vm.c index 0a85e0fced..54ee3e1ef4 100644 --- a/vm.c +++ b/vm.c @@ -1645,7 +1645,8 @@ rb_thread_mark(void *ptr) RUBY_MARK_UNLESS_NULL(th->thgroup); RUBY_MARK_UNLESS_NULL(th->value); RUBY_MARK_UNLESS_NULL(th->errinfo); - RUBY_MARK_UNLESS_NULL(th->thrown_errinfo); + RUBY_MARK_UNLESS_NULL(th->async_errinfo_queue); + RUBY_MARK_UNLESS_NULL(th->async_errinfo_mask_stack); RUBY_MARK_UNLESS_NULL(th->root_svar); RUBY_MARK_UNLESS_NULL(th->top_self); RUBY_MARK_UNLESS_NULL(th->top_wrapper); diff --git a/vm_core.h b/vm_core.h index ef13b2f4ee..b6dd7982df 100644 --- a/vm_core.h +++ b/vm_core.h @@ -446,8 +446,13 @@ typedef struct rb_thread_struct { VALUE thgroup; VALUE value; + /* temporary place of errinfo */ VALUE errinfo; - VALUE thrown_errinfo; + + /* async errinfo queue */ + VALUE async_errinfo_queue; + int async_errinfo_queue_checked; + VALUE async_errinfo_mask_stack; rb_atomic_t interrupt_flag; rb_thread_lock_t interrupt_lock; @@ -767,6 +772,10 @@ void rb_threadptr_signal_exit(rb_thread_t *th); void rb_threadptr_execute_interrupts(rb_thread_t *); void rb_threadptr_interrupt(rb_thread_t *th); void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th); +void rb_threadptr_async_errinfo_clear(rb_thread_t *th); +void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v); +VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th); +int rb_threadptr_async_errinfo_active_p(rb_thread_t *th); void rb_thread_lock_unlock(rb_thread_lock_t *); void rb_thread_lock_destroy(rb_thread_lock_t *); -- cgit v1.2.3