summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authorko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-07-18 05:46:40 +0000
committerko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-07-18 05:46:40 +0000
commit28144433b2f951279552b39bc358769a5267e26a (patch)
treea3cd6ea5ed28ca07afde89b07dba43906f7ca6ef /thread.c
parent18c04b880a2ee244cf350ff88004e868d72836eb (diff)
* thread.c (rb_threadptr_async_errinfo_*): manage async errors queue.
Async events such as an exception throwed by Thread#raise, Thread#kill and thread termination (after main thread termination) will be queued to th->async_errinfo_queue. - clear: clear the queue. - enque: enque err object into queue. - deque: deque err object from queue. - active_p: return 1 if the queue should be checked. rb_thread_t#thrown_errinfo was removed. * vm_core.h: add declarations of rb_threadptr_async_errinfo_*. remove rb_thread_t#thrown_errinfo field and add rb_thread_t#async_errinfo_queue (queue body: Array), rb_thread_t#async_errinfo_queue_checked (flag), rb_thread_t#async_errinfo_mask_stack(Array, not used yet). * vm.c (rb_thread_mark): fix a mark function. * cont.c (rb_fiber_start): enque an error. * process.c (after_fork): clear async errinfo queue. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36430 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c143
1 files changed, 105 insertions, 38 deletions
diff --git a/thread.c b/thread.c
index 2a40069b91..634c943cc7 100644
--- a/thread.c
+++ b/thread.c
@@ -68,7 +68,6 @@ static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec);
static void sleep_forever(rb_thread_t *th, int nodeadlock);
static double timeofday(void);
static int rb_threadptr_dead(rb_thread_t *th);
-
static void rb_check_deadlock(rb_vm_t *vm);
#define eKillSignal INT2FIX(0)
@@ -131,7 +130,6 @@ static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_regio
blocking_region_begin(__th, &__region, (ubf), (ubfarg)); \
exec; \
blocking_region_end(__th, &__region); \
- RUBY_VM_CHECK_INTS(); \
} while(0)
#if THREAD_DEBUG
@@ -313,9 +311,9 @@ terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
if (th != main_thread) {
thread_debug("terminate_i: %p\n", (void *)th);
- rb_threadptr_interrupt(th);
- th->thrown_errinfo = eTerminateSignal;
+ rb_threadptr_async_errinfo_enque(th, eTerminateSignal);
th->status = THREAD_TO_KILL;
+ rb_threadptr_interrupt(th);
}
else {
thread_debug("terminate_i: main thread (%p)\n", (void *)th);
@@ -564,6 +562,10 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
th->priority = GET_THREAD()->priority;
th->thgroup = GET_THREAD()->thgroup;
+ th->async_errinfo_queue = rb_ary_new();
+ th->async_errinfo_queue_checked = 0;
+ th->async_errinfo_mask_stack = rb_ary_new();
+
native_mutex_initialize(&th->interrupt_lock);
if (GET_VM()->event_hooks != NULL)
th->event_flags |= RUBY_EVENT_VM;
@@ -1133,6 +1135,10 @@ rb_thread_call_without_gvl(void *(*func)(void *), void *data1,
val = func(data1);
saved_errno = errno;
}, ubf, data2);
+
+ /* TODO: check */
+ RUBY_VM_CHECK_INTS();
+
errno = saved_errno;
return val;
@@ -1144,13 +1150,28 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
VALUE val;
rb_thread_t *th = GET_THREAD();
int saved_errno = 0;
+ int state;
th->waiting_fd = fd;
- BLOCKING_REGION({
- val = func(data1);
- saved_errno = errno;
- }, ubf_select, th);
+
+ TH_PUSH_TAG(th);
+ if ((state = EXEC_TAG()) == 0) {
+ BLOCKING_REGION({
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, th);
+ }
+ TH_POP_TAG();
+
+ /* clear waitinf_fd anytime */
th->waiting_fd = -1;
+
+ if (state) {
+ JUMP_TAG(state);
+ }
+ /* TODO: check func() */
+ RUBY_VM_CHECK_INTS();
+
errno = saved_errno;
return val;
@@ -1294,12 +1315,14 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th)
}
/* exception from another thread */
- if (th->thrown_errinfo) {
- VALUE err = th->thrown_errinfo;
- th->thrown_errinfo = 0;
+ if (rb_threadptr_async_errinfo_active_p(th)) {
+ VALUE err = rb_threadptr_async_errinfo_deque(th);
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
- if (err == eKillSignal || err == eTerminateSignal) {
+ if (err == eKillSignal /* Thread#kill receieved */ ||
+ err == eTerminateSignal /* Terminate thread */ ) {
+ rb_threadptr_async_errinfo_clear(th);
+ th->status = THREAD_TO_KILL;
th->errinfo = INT2FIX(TAG_FATAL);
TH_JUMP_TAG(th, TAG_FATAL);
}
@@ -1353,6 +1376,59 @@ rb_gc_mark_threads(void)
/*****************************************************/
+/*
+ * rb_threadptr_async_errinfo_* - manage async errors queue
+ *
+ * Async events such as an exception throwed by Thread#raise,
+ * Thread#kill and thread termination (after main thread termination)
+ * will be queued to th->async_errinfo_queue.
+ * - clear: clear the queue.
+ * - enque: enque err object into queue.
+ * - deque: deque err object from queue.
+ * - active_p: return 1 if the queue should be checked.
+ *
+ * All rb_threadptr_async_errinfo_* functions are called by
+ * a GVL acquired thread, of course.
+ * Note that all "rb_" prefix APIs need GVL to call.
+ */
+
+void
+rb_threadptr_async_errinfo_clear(rb_thread_t *th)
+{
+ rb_ary_clear(th->async_errinfo_queue);
+}
+
+void
+rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
+{
+ rb_ary_push(th->async_errinfo_queue, v);
+ th->async_errinfo_queue_checked = 0;
+}
+
+VALUE
+rb_threadptr_async_errinfo_deque(rb_thread_t *th)
+{
+ VALUE err = rb_ary_shift(th->async_errinfo_queue);
+ if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
+ th->async_errinfo_queue_checked = 1;
+ }
+ return err;
+}
+
+int
+rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
+{
+ if (th->async_errinfo_queue_checked) {
+ return 0;
+ }
+ else {
+ return RARRAY_LEN(th->async_errinfo_queue) > 0;
+ }
+}
+
+VALUE
+rb_thread
+
static void
rb_threadptr_ready(rb_thread_t *th)
{
@@ -1364,19 +1440,13 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
{
VALUE exc;
- again:
if (rb_threadptr_dead(th)) {
return Qnil;
}
- if (th->thrown_errinfo != 0 || th->raised_flag) {
- rb_thread_schedule();
- goto again;
- }
-
exc = rb_make_exception(argc, argv);
- th->thrown_errinfo = exc;
- rb_threadptr_ready(th);
+ rb_threadptr_async_errinfo_enque(th, exc);
+ rb_threadptr_interrupt(th);
return Qnil;
}
@@ -1436,13 +1506,6 @@ rb_threadptr_reset_raised(rb_thread_t *th)
return 1;
}
-#define THREAD_IO_WAITING_P(th) ( \
- ((th)->status == THREAD_STOPPED || \
- (th)->status == THREAD_STOPPED_FOREVER) && \
- (th)->blocking_region_buffer && \
- (th)->unblock.func == ubf_select && \
- 1)
-
static int
thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
{
@@ -1450,14 +1513,10 @@ thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
rb_thread_t *th;
GetThreadPtr((VALUE)key, th);
- if (THREAD_IO_WAITING_P(th)) {
- native_mutex_lock(&th->interrupt_lock);
- if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
- th->thrown_errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
- RUBY_VM_SET_INTERRUPT(th);
- (th->unblock.func)(th->unblock.arg);
- }
- native_mutex_unlock(&th->interrupt_lock);
+ if (th->waiting_fd == fd) {
+ VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
+ rb_threadptr_async_errinfo_enque(th, err);
+ rb_threadptr_interrupt(th);
}
return ST_CONTINUE;
}
@@ -1530,10 +1589,9 @@ rb_thread_kill(VALUE thread)
thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
- rb_threadptr_interrupt(th);
- th->thrown_errinfo = eKillSignal;
+ rb_threadptr_async_errinfo_enque(th, eKillSignal);
th->status = THREAD_TO_KILL;
-
+ rb_threadptr_interrupt(th);
return thread;
}
@@ -2592,6 +2650,9 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
result = native_fd_select(n, read, write, except, timeout, th);
if (result < 0) lerrno = errno;
}, ubf_select, th);
+
+ RUBY_VM_CHECK_INTS();
+
errno = lerrno;
if (result < 0) {
@@ -2815,6 +2876,8 @@ retry:
if (result < 0) lerrno = errno;
}, ubf_select, GET_THREAD());
+ RUBY_VM_CHECK_INTS();
+
if (result < 0) {
errno = lerrno;
switch (errno) {
@@ -4707,6 +4770,10 @@ Init_Thread(void)
gvl_init(th->vm);
gvl_acquire(th->vm, th);
native_mutex_initialize(&th->interrupt_lock);
+
+ th->async_errinfo_queue = rb_ary_new();
+ th->async_errinfo_queue_checked = 0;
+ th->async_errinfo_mask_stack = rb_ary_new();
}
}