summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-07-19 14:19:40 +0000
committerko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2012-07-19 14:19:40 +0000
commitf4a8db647ae66621f5d37402f5a11a3d57c69bb0 (patch)
tree2325b2b809b9d94ef2ccc367f5f84d9672e9f00d
parent422e8d5adc3cf2d67b53cf9050c750eba7db3673 (diff)
* thread.c (rb_thread_s_control_interrupt,
rb_thread_s_check_interrupt): added for Thread.control_intgerrupt and Thread.check_interrupt. See details on rdoc. I'll make an ticket for this feature. * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt. * thread.c (rb_threadptr_raise): make a new exception object even if argc is 0. * thread.c (rb_thread_kill): kill thread immediately if target thread is current thread. * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added. CHECK_INTS while/after blocking operation. * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr. * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS(). * eval.c (ruby_cleanup): ditto. * insns.def: ditto. * process.c (rb_waitpid): ditto. * vm_eval.c (vm_call0): ditto. * vm_insnhelper.c (vm_call_method): ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@36470 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
-rw-r--r--ChangeLog33
-rw-r--r--cont.c2
-rw-r--r--eval.c2
-rw-r--r--insns.def12
-rw-r--r--process.c2
-rw-r--r--test/ruby/test_thread.rb66
-rw-r--r--thread.c471
-rw-r--r--vm_core.h14
-rw-r--r--vm_eval.c4
-rw-r--r--vm_insnhelper.c2
10 files changed, 504 insertions, 104 deletions
diff --git a/ChangeLog b/ChangeLog
index 600cdb958e..8d3f4a35a3 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,36 @@
+Thu Jul 19 15:08:40 2012 Koichi Sasada <ko1@atdot.net>
+
+ * thread.c (rb_thread_s_control_interrupt,
+ rb_thread_s_check_interrupt): added for
+ Thread.control_intgerrupt and Thread.check_interrupt.
+ See details on rdoc.
+ I'll make an ticket for this feature.
+
+ * test/ruby/test_thread.rb: add a test for Thread.control_intgerrupt.
+
+ * thread.c (rb_threadptr_raise): make a new exception object
+ even if argc is 0.
+
+ * thread.c (rb_thread_kill): kill thread immediately if target thread
+ is current thread.
+
+ * vm_core.h (RUBY_VM_CHECK_INTS_BLOCKING): added.
+ CHECK_INTS while/after blocking operation.
+
+ * vm_core.h (RUBY_VM_CHECK_INTS): require rb_thread_t ptr.
+
+ * cont.c (fiber_switch): use replaced RUBY_VM_CHECK_INTS().
+
+ * eval.c (ruby_cleanup): ditto.
+
+ * insns.def: ditto.
+
+ * process.c (rb_waitpid): ditto.
+
+ * vm_eval.c (vm_call0): ditto.
+
+ * vm_insnhelper.c (vm_call_method): ditto.
+
Thu Jul 19 22:46:48 2012 Tanaka Akira <akr@fsij.org>
* test/ruby/test_io.rb: remove temporally files early.
diff --git a/cont.c b/cont.c
index 1da0ba4705..58b2d303ac 100644
--- a/cont.c
+++ b/cont.c
@@ -1328,7 +1328,7 @@ fiber_switch(VALUE fibval, int argc, VALUE *argv, int is_resume)
rb_bug("rb_fiber_resume: unreachable");
}
#endif
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
return value;
}
diff --git a/eval.c b/eval.c
index 411e50727f..14e0c5ce53 100644
--- a/eval.c
+++ b/eval.c
@@ -160,7 +160,7 @@ ruby_cleanup(volatile int ex)
rb_threadptr_check_signal(th);
PUSH_TAG();
if ((state = EXEC_TAG()) == 0) {
- SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(); });
+ SAVE_ROOT_JMPBUF(th, { RUBY_VM_CHECK_INTS(th); });
}
POP_TAG();
diff --git a/insns.def b/insns.def
index aa5b0eaf9f..1a7bc93754 100644
--- a/insns.def
+++ b/insns.def
@@ -1086,7 +1086,7 @@ leave
}
}
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
if (UNLIKELY(VM_FRAME_TYPE_FINISH_P(GET_CFP()))) {
#if OPT_CALL_THREADED_CODE
@@ -1117,7 +1117,7 @@ throw
(VALUE throwobj)
(VALUE val)
{
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
val = vm_throw(th, GET_CFP(), throw_state, throwobj);
THROW_EXCEPTION(val);
/* unreachable */
@@ -1138,7 +1138,7 @@ jump
()
()
{
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
JUMP(dst);
}
@@ -1154,7 +1154,7 @@ branchif
()
{
if (RTEST(val)) {
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
JUMP(dst);
}
}
@@ -1171,7 +1171,7 @@ branchunless
()
{
if (!RTEST(val)) {
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
JUMP(dst);
}
}
@@ -1220,7 +1220,7 @@ onceinlinecache
}
else if (ic->ic_value.value == Qundef)
{
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
rb_thread_schedule();
goto retry;
}
diff --git a/process.c b/process.c
index 90eaa0e2e7..3aacc5d60b 100644
--- a/process.c
+++ b/process.c
@@ -663,7 +663,7 @@ rb_waitpid(rb_pid_t pid, int *st, int flags)
RUBY_UBF_PROCESS, 0);
if (result < 0) {
if (errno == EINTR) {
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(GET_THREAD());
goto retry;
}
return (rb_pid_t)-1;
diff --git a/test/ruby/test_thread.rb b/test/ruby/test_thread.rb
index 946d522cd9..ed22cf9b44 100644
--- a/test/ruby/test_thread.rb
+++ b/test/ruby/test_thread.rb
@@ -615,6 +615,72 @@ class TestThread < Test::Unit::TestCase
end
assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083)
end
+
+ def make_control_interrupt_test_thread1 flag
+ r = []
+ q = Queue.new
+ th = Thread.new{
+ begin
+ Thread.control_interrupt(RuntimeError => flag){
+ q << :go
+ begin
+ sleep 0.5
+ rescue
+ r << :c1
+ end
+ }
+ sleep 0.5
+ rescue
+ r << :c2
+ end
+ }
+ q.pop # wait
+ th.raise
+ begin
+ th.join
+ rescue
+ r << :c3
+ end
+ r
+ end
+
+ def test_control_interrupt
+ [[:never, :c2],
+ [:immediate, :c1],
+ [:on_blocking, :c1]].each{|(flag, c)|
+ assert_equal([flag, c], [flag] + make_control_interrupt_test_thread1(flag))
+ }
+ # TODO: complex cases are needed.
+ end
+
+ def test_check_interrupt
+ q = Queue.new
+ Thread.control_interrupt(RuntimeError => :never){
+ th = Thread.new{
+ q.push :e
+ begin
+ begin
+ sleep 0.5
+ rescue => e
+ q.push :ng1
+ end
+ begin
+ Thread.check_interrupt
+ rescue => e
+ q.push :ok
+ end
+ rescue => e
+ q.push :ng2
+ ensure
+ q.push :ng3
+ end
+ }
+ q.pop
+ th.raise
+ th.join
+ assert_equal(:ok, q.pop)
+ }
+ end
end
class TestThreadGroup < Test::Unit::TestCase
diff --git a/thread.c b/thread.c
index 6cd95696ef..acd9ac4a0b 100644
--- a/thread.c
+++ b/thread.c
@@ -265,7 +265,7 @@ set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg,
struct rb_unblock_callback *old)
{
check_ints:
- RUBY_VM_CHECK_INTS(); /* check signal or so */
+ RUBY_VM_CHECK_INTS(th); /* check signal or so */
native_mutex_lock(&th->interrupt_lock);
if (th->interrupt_flag) {
native_mutex_unlock(&th->interrupt_lock);
@@ -545,7 +545,7 @@ thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_s
static VALUE
thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
{
- rb_thread_t *th;
+ rb_thread_t *th, *current_th = GET_THREAD();
int err;
if (OBJ_FROZEN(GET_THREAD()->thgroup)) {
@@ -559,12 +559,12 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
th->first_proc = fn ? Qfalse : rb_block_proc();
th->first_args = args; /* GC: shouldn't put before above line */
- th->priority = GET_THREAD()->priority;
- th->thgroup = GET_THREAD()->thgroup;
+ th->priority = current_th->priority;
+ th->thgroup = current_th->thgroup;
th->async_errinfo_queue = rb_ary_new();
th->async_errinfo_queue_checked = 0;
- th->async_errinfo_mask_stack = rb_ary_new();
+ th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack);
native_mutex_initialize(&th->interrupt_lock);
if (GET_VM()->event_hooks != NULL)
@@ -859,7 +859,7 @@ sleep_forever(rb_thread_t *th, int deadlockable)
if (deadlockable) {
th->vm->sleeper--;
}
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
} while (th->status == status);
th->status = prev_status;
}
@@ -896,7 +896,7 @@ sleep_timeval(rb_thread_t *th, struct timeval tv)
th->status = THREAD_STOPPED;
do {
native_sleep(th, &tv);
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
getclockofday(&tvn);
if (to.tv_sec < tvn.tv_sec) break;
if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break;
@@ -968,9 +968,9 @@ rb_thread_wait_for(struct timeval time)
void
rb_thread_polling(void)
{
- RUBY_VM_CHECK_INTS();
if (!rb_thread_alone()) {
rb_thread_t *th = GET_THREAD();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
sleep_for_polling(th);
}
}
@@ -985,7 +985,7 @@ rb_thread_polling(void)
void
rb_thread_check_ints(void)
{
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD());
}
/*
@@ -1013,7 +1013,7 @@ rb_thread_sleep(int sec)
rb_thread_wait_for(rb_time_timeval(INT2FIX(sec)));
}
-static void rb_threadptr_execute_interrupts_common(rb_thread_t *);
+static void rb_threadptr_execute_interrupts_common(rb_thread_t *, int blocking);
static void
rb_thread_schedule_limits(unsigned long limits_us)
@@ -1040,7 +1040,7 @@ rb_thread_schedule(void)
rb_thread_schedule_limits(0);
if (UNLIKELY(GET_THREAD()->interrupt_flag)) {
- rb_threadptr_execute_interrupts_common(GET_THREAD());
+ rb_threadptr_execute_interrupts_common(GET_THREAD(), 0);
}
}
@@ -1076,7 +1076,7 @@ rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region)
rb_thread_t *th = GET_THREAD();
blocking_region_end(th, region);
xfree(region);
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
errno = saved_errno;
}
@@ -1181,7 +1181,7 @@ rb_thread_call_without_gvl2(void *(*func)(void *data, int *skip_checkints), void
}, ubf, data2);
if (!skip_checkints) {
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
}
errno = saved_errno;
@@ -1237,7 +1237,7 @@ rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
JUMP_TAG(state);
}
/* TODO: check func() */
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
errno = saved_errno;
@@ -1350,12 +1350,348 @@ thread_s_pass(VALUE klass)
return Qnil;
}
+/*****************************************************/
+
/*
+ * rb_threadptr_async_errinfo_* - manage async errors queue
+ *
+ * Async events such as an exception throwed by Thread#raise,
+ * Thread#kill and thread termination (after main thread termination)
+ * will be queued to th->async_errinfo_queue.
+ * - clear: clear the queue.
+ * - enque: enque err object into queue.
+ * - deque: deque err object from queue.
+ * - active_p: return 1 if the queue should be checked.
*
+ * All rb_threadptr_async_errinfo_* functions are called by
+ * a GVL acquired thread, of course.
+ * Note that all "rb_" prefix APIs need GVL to call.
*/
+void
+rb_threadptr_async_errinfo_clear(rb_thread_t *th)
+{
+ rb_ary_clear(th->async_errinfo_queue);
+}
+
+void
+rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
+{
+ rb_ary_push(th->async_errinfo_queue, v);
+ th->async_errinfo_queue_checked = 0;
+}
+
+enum interrupt_timing {
+ INTERRUPT_NONE,
+ INTERRUPT_IMMEDIATE,
+ INTERRUPT_ON_BLOCKING,
+ INTERRUPT_NEVER
+};
+
+static enum interrupt_timing
+rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err)
+{
+ VALUE mask;
+ long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack);
+ VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack);
+ VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */
+ long ancestors_len = RARRAY_LEN(ancestors);
+ VALUE *ancestors_ptr = RARRAY_PTR(ancestors);
+ int i, j;
+
+ for (i=0; i<mask_stack_len; i++) {
+ mask = mask_stack[mask_stack_len-(i+1)];
+
+ for (j=0; j<ancestors_len; j++) {
+ VALUE klass = ancestors_ptr[j];
+ VALUE sym;
+
+ /* TODO: remove rb_intern() */
+ if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
+ if (sym == ID2SYM(rb_intern("immediate"))) {
+ return INTERRUPT_IMMEDIATE;
+ }
+ else if (sym == ID2SYM(rb_intern("on_blocking"))) {
+ return INTERRUPT_ON_BLOCKING;
+ }
+ else if (sym == ID2SYM(rb_intern("never"))) {
+ return INTERRUPT_NEVER;
+ }
+ else {
+ rb_raise(rb_eThreadError, "unknown mask signature");
+ }
+ }
+ }
+ /* try to next mask */
+ }
+ return INTERRUPT_NONE;
+}
+
+static int
+rb_threadptr_async_errinfo_empty_p(rb_thread_t *th)
+{
+ return RARRAY_LEN(th->async_errinfo_queue) == 0;
+}
+
+static VALUE
+rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum interrupt_timing timing)
+{
+#if 1 /* 1 to enable Thread#control_interrupt, 0 to ignore it */
+ int i;
+
+ for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) {
+ VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i];
+
+ enum interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err));
+
+ switch (mask_timing) {
+ case INTERRUPT_ON_BLOCKING:
+ if (timing != INTERRUPT_ON_BLOCKING) {
+ break;
+ }
+ /* fall through */
+ case INTERRUPT_NONE: /* default: IMMEDIATE */
+ case INTERRUPT_IMMEDIATE:
+ rb_ary_delete_at(th->async_errinfo_queue, i);
+ return err;
+ case INTERRUPT_NEVER:
+ break;
+ }
+ }
+
+ th->async_errinfo_queue_checked = 1;
+ return Qundef;
+#else
+ VALUE err = rb_ary_shift(th->async_errinfo_queue);
+ if (rb_threadptr_async_errinfo_empty_p(th)) {
+ th->async_errinfo_queue_checked = 1;
+ }
+ return err;
+#endif
+}
+
+int
+rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
+{
+ if (th->async_errinfo_queue_checked || rb_threadptr_async_errinfo_empty_p(th)) {
+ return 0;
+ }
+ else {
+ return 1;
+ }
+}
+
+static VALUE
+rb_threadptr_interrupt_mask(rb_thread_t *th, VALUE mask, VALUE (*func)(rb_thread_t *th))
+{
+ VALUE r = Qnil;
+ int state;
+
+ rb_ary_push(th->async_errinfo_mask_stack, mask);
+ if (!rb_threadptr_async_errinfo_empty_p(th)) {
+ th->async_errinfo_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th);
+ }
+
+ TH_PUSH_TAG(th);
+ if ((state = EXEC_TAG()) == 0) {
+ r = func(th);
+ }
+ TH_POP_TAG();
+
+ rb_ary_pop(th->async_errinfo_mask_stack);
+ if (!rb_threadptr_async_errinfo_empty_p(th)) {
+ th->async_errinfo_queue_checked = 0;
+ RUBY_VM_SET_INTERRUPT(th);
+ }
+
+ if (state) {
+ JUMP_TAG(state);
+ }
+
+ return r;
+}
+
+/*
+ * call-seq:
+ * Thread.control_interrupt(hash) { ... } -> result of the block
+ *
+ * Thread.control_interrupt controls interrupt timing.
+ *
+ * _interrupt_ means asynchronous event and corresponding procedure
+ * by Thread#raise, Thread#kill, signal trap (not supported yet)
+ * and main thread termination (if main thread terminates, then all
+ * other thread will be killed).
+ *
+ * _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol
+ * is one of them:
+ * - :immediate Invoke interrupt immediately.
+ * - :on_blocking Invoke interrupt while _BlockingOperation_.
+ * - :never Never invoke interrupt.
+ *
+ * _BlockingOperation_ means that the operation will block the calling thread,
+ * such as read and write. On CRuby implementation, _BlockingOperation_ is
+ * operation executed without GVL.
+ *
+ * Masked interrupts are delayed until they are enabled.
+ * This method is similar to sigprocmask(3).
+ *
+ * TODO (DOC): control_interrupt is stacked.
+ * TODO (DOC): check ancestors.
+ * TODO (DOC): to prevent all interrupt, {Object => :never} works.
+ *
+ * NOTE: Asynchronous interrupts are difficult to use.
+ * If you need to communicate between threads,
+ * please consider to use another way such as Queue.
+ * Or use them with deep understanding about this method.
+ *
+ *
+ * # example: Guard from Thread#raise
+ * th = Thread.new do
+ * Thead.control_interrupt(RuntimeError => :never) {
+ * begin
+ * # Thread#raise doesn't interrupt here.
+ * # You can write resource allocation code safely.
+ * Thread.control_interrupt(RuntimeError => :immediate) {
+ * # ...
+ * # It is possible to be interrupted by Thread#raise.
+ * }
+ * ensure
+ * # Thread#raise doesn't interrupt here.
+ * # You can write resource dealocation code safely.
+ * end
+ * }
+ * end
+ * Thread.pass
+ * # ...
+ * th.raise "stop"
+ *
+ * # example: Guard from TimeoutError
+ * require 'timeout'
+ * Thread.control_interrupt(TimeoutError => :never) {
+ * timeout(10){
+ * # TimeoutError doesn't occur here
+ * Thread.control_interrupt(TimeoutError => :on_blocking) {
+ * # possible to be killed by TimeoutError
+ * # while blocking operation
+ * }
+ * # TimeoutError doesn't occur here
+ * }
+ * }
+ *
+ * # example: Stack control settings
+ * Thread.control_interrupt(FooError => :never) {
+ * Thread.control_interrupt(BarError => :never) {
+ * # FooError and BarError are prohibited.
+ * }
+ * }
+ *
+ * # example: check ancestors
+ * Thread.control_interrupt(Exception => :never) {
+ * # all exceptions inherited from Exception are prohibited.
+ * }
+ *
+ */
+
+static VALUE
+control_interrupt_func(rb_thread_t *th)
+{
+ return rb_yield(Qnil);
+}
+
+static VALUE
+rb_thread_s_control_interrupt(VALUE self, VALUE mask_arg)
+{
+ if (!rb_block_given_p()) {
+ rb_raise(rb_eArgError, "block is needed.");
+ }
+
+ return rb_threadptr_interrupt_mask(GET_THREAD(),
+ rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"),
+ control_interrupt_func);
+}
+
+/*
+ * call-seq:
+ * Thread.check_interrupt() -> nil
+ *
+ * Check queued interrupts.
+ *
+ * If there are queued interrupts, process respective procedures.
+ *
+ * This method can be defined as the following Ruby code:
+ *
+ * def Thread.check_interrupt
+ * Thread.control_interrupt(Object => :immediate) {
+ * Thread.pass
+ * }
+ * end
+ *
+ * Examples:
+ *
+ * th = Thread.new{
+ * Thread.control_interrupt(RuntimeError => :on_blocking){
+ * while true
+ * ...
+ * # reach safe point to invoke interrupt
+ * Thread.check_interrupt
+ * ...
+ * end
+ * }
+ * }
+ * ...
+ * th.raise # stop thread
+ *
+ * NOTE: This example can be described by the another code.
+ * You need to keep to avoid asynchronous interrupts.
+ *
+ * flag = true
+ * th = Thread.new{
+ * Thread.control_interrupt(RuntimeError => :on_blocking){
+ * while true
+ * ...
+ * # reach safe point to invoke interrupt
+ * break if flag == false
+ * ...
+ * end
+ * }
+ * }
+ * ...
+ * flag = false # stop thread
+ */
+
+static VALUE
+check_interrupt_func(rb_thread_t *th)
+{
+ RUBY_VM_CHECK_INTS(th);
+ return Qnil;
+}
+
+static VALUE
+rb_thread_s_check_interrupt(VALUE self)
+{
+ rb_thread_t *th = GET_THREAD();
+
+ if (!rb_threadptr_async_errinfo_empty_p(th)) {
+ VALUE mask = rb_hash_new();
+ rb_hash_aset(mask, rb_cObject, ID2SYM(rb_intern("immediate")));
+ rb_threadptr_interrupt_mask(GET_THREAD(), mask, check_interrupt_func);
+ }
+
+ return Qnil;
+}
+
+static void
+rb_threadptr_to_kill(rb_thread_t *th)
+{
+ rb_threadptr_async_errinfo_clear(th);
+ th->status = THREAD_TO_KILL;
+ th->errinfo = INT2FIX(TAG_FATAL);
+ TH_JUMP_TAG(th, TAG_FATAL);
+}
+
static void
-rb_threadptr_execute_interrupts_common(rb_thread_t *th)
+rb_threadptr_execute_interrupts_common(rb_thread_t *th, int blocking_timing)
{
rb_atomic_t interrupt;
@@ -1378,15 +1714,15 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th)
/* exception from another thread */
if (rb_threadptr_async_errinfo_active_p(th)) {
- VALUE err = rb_threadptr_async_errinfo_deque(th);
+ VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
- if (err == eKillSignal /* Thread#kill receieved */ ||
- err == eTerminateSignal /* Terminate thread */ ) {
- rb_threadptr_async_errinfo_clear(th);
- th->status = THREAD_TO_KILL;
- th->errinfo = INT2FIX(TAG_FATAL);
- TH_JUMP_TAG(th, TAG_FATAL);
+ if (err == Qundef) {
+ /* no error */
+ }
+ else if (err == eKillSignal /* Thread#kill receieved */ ||
+ err == eTerminateSignal /* Terminate thread */ ) {
+ rb_threadptr_to_kill(th);
}
else {
rb_exc_raise(err);
@@ -1417,9 +1753,9 @@ rb_threadptr_execute_interrupts_common(rb_thread_t *th)
}
void
-rb_threadptr_execute_interrupts(rb_thread_t *th)
+rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
{
- rb_threadptr_execute_interrupts_common(th);
+ rb_threadptr_execute_interrupts_common(th, blocking_timing);
}
void
@@ -1427,59 +1763,7 @@ rb_thread_execute_interrupts(VALUE thval)
{
rb_thread_t *th;
GetThreadPtr(thval, th);
- rb_threadptr_execute_interrupts_common(th);
-}
-
-/*****************************************************/
-
-/*
- * rb_threadptr_async_errinfo_* - manage async errors queue
- *
- * Async events such as an exception throwed by Thread#raise,
- * Thread#kill and thread termination (after main thread termination)
- * will be queued to th->async_errinfo_queue.
- * - clear: clear the queue.
- * - enque: enque err object into queue.
- * - deque: deque err object from queue.
- * - active_p: return 1 if the queue should be checked.
- *
- * All rb_threadptr_async_errinfo_* functions are called by
- * a GVL acquired thread, of course.
- * Note that all "rb_" prefix APIs need GVL to call.
- */
-
-void
-rb_threadptr_async_errinfo_clear(rb_thread_t *th)
-{
- rb_ary_clear(th->async_errinfo_queue);
-}
-
-void
-rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
-{
- rb_ary_push(th->async_errinfo_queue, v);
- th->async_errinfo_queue_checked = 0;
-}
-
-VALUE
-rb_threadptr_async_errinfo_deque(rb_thread_t *th)
-{
- VALUE err = rb_ary_shift(th->async_errinfo_queue);
- if (RARRAY_LEN(th->async_errinfo_queue) == 0) {
- th->async_errinfo_queue_checked = 1;
- }
- return err;
-}
-
-int
-rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
-{
- if (th->async_errinfo_queue_checked) {
- return 0;
- }
- else {
- return RARRAY_LEN(th->async_errinfo_queue) > 0;
- }
+ rb_threadptr_execute_interrupts_common(th, 1);
}
static void
@@ -1497,7 +1781,12 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
return Qnil;
}
- exc = rb_make_exception(argc, argv);
+ if (argc == 0) {
+ exc = rb_exc_new(rb_eRuntimeError, 0, 0);
+ }
+ else {
+ exc = rb_make_exception(argc, argv);
+ }
rb_threadptr_async_errinfo_enque(th, exc);
rb_threadptr_interrupt(th);
return Qnil;
@@ -1642,9 +1931,15 @@ rb_thread_kill(VALUE thread)
thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id);
- rb_threadptr_async_errinfo_enque(th, eKillSignal);
- th->status = THREAD_TO_KILL;
- rb_threadptr_interrupt(th);
+ if (th == GET_THREAD()) {
+ /* kill myself immediately */
+ rb_threadptr_to_kill(th);
+ }
+ else {
+ rb_threadptr_async_errinfo_enque(th, eKillSignal);
+ th->status = THREAD_TO_KILL;
+ rb_threadptr_interrupt(th);
+ }
return thread;
}
@@ -1683,7 +1978,8 @@ rb_thread_s_kill(VALUE obj, VALUE th)
static VALUE
rb_thread_exit(void)
{
- return rb_thread_kill(GET_THREAD()->self);
+ rb_thread_t *th = GET_THREAD();
+ return rb_thread_kill(th->self);
}
@@ -2704,7 +3000,7 @@ do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except,
if (result < 0) lerrno = errno;
}, ubf_select, th);
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
errno = lerrno;
@@ -2910,6 +3206,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
double limit = 0;
struct timespec ts;
struct timespec *timeout = NULL;
+ rb_thread_t *th = GET_THREAD();
if (tv) {
ts.tv_sec = tv->tv_sec;
@@ -2927,9 +3224,9 @@ retry:
BLOCKING_REGION({
result = ppoll(&fds, 1, timeout, NULL);
if (result < 0) lerrno = errno;
- }, ubf_select, GET_THREAD());
+ }, ubf_select, th);
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
if (result < 0) {
errno = lerrno;
@@ -3673,7 +3970,7 @@ rb_mutex_lock(VALUE self)
if (mutex->th == th) mutex_locked(th, self);
if (interrupted) {
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS_BLOCKING(th);
}
}
}
@@ -4755,6 +5052,8 @@ Init_Thread(void)
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
#endif
+ rb_define_singleton_method(rb_cThread, "control_interrupt", rb_thread_s_control_interrupt, 1);
+ rb_define_singleton_method(rb_cThread, "check_interrupt", rb_thread_s_check_interrupt, 1);
rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
diff --git a/vm_core.h b/vm_core.h
index b6dd7982df..97d4410d73 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -769,25 +769,27 @@ void rb_signal_exec(rb_thread_t *th, int sig);
void rb_threadptr_check_signal(rb_thread_t *mth);
void rb_threadptr_signal_raise(rb_thread_t *th, int sig);
void rb_threadptr_signal_exit(rb_thread_t *th);
-void rb_threadptr_execute_interrupts(rb_thread_t *);
+void rb_threadptr_execute_interrupts(rb_thread_t *, int);
void rb_threadptr_interrupt(rb_thread_t *th);
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
-VALUE rb_threadptr_async_errinfo_deque(rb_thread_t *th);
int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);
void rb_thread_lock_unlock(rb_thread_lock_t *);
void rb_thread_lock_destroy(rb_thread_lock_t *);
-#define RUBY_VM_CHECK_INTS_TH(th) do { \
+#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \
if (UNLIKELY((th)->interrupt_flag)) { \
- rb_threadptr_execute_interrupts(th); \
+ rb_threadptr_execute_interrupts(th, 1); \
} \
} while (0)
-#define RUBY_VM_CHECK_INTS() \
- RUBY_VM_CHECK_INTS_TH(GET_THREAD())
+#define RUBY_VM_CHECK_INTS(th) do { \
+ if (UNLIKELY((th)->interrupt_flag)) { \
+ rb_threadptr_execute_interrupts(th, 0); \
+ } \
+} while (0)
/* tracer */
void
diff --git a/vm_eval.c b/vm_eval.c
index ea52273d52..12e4eb4e0b 100644
--- a/vm_eval.c
+++ b/vm_eval.c
@@ -103,7 +103,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv,
if (!klass || !(me = rb_method_entry(klass, id))) {
return method_missing(recv, id, argc, argv, NOEX_SUPER);
}
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
if (!(def = me->def)) return Qnil;
goto again;
}
@@ -138,7 +138,7 @@ vm_call0(rb_thread_t* th, VALUE recv, VALUE id, int argc, const VALUE *argv,
rb_bug("vm_call0: unsupported method type (%d)", def->type);
val = Qundef;
}
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
return val;
}
diff --git a/vm_insnhelper.c b/vm_insnhelper.c
index e4a45e669d..1b23ee6e18 100644
--- a/vm_insnhelper.c
+++ b/vm_insnhelper.c
@@ -706,7 +706,7 @@ vm_call_method(rb_thread_t *th, rb_control_frame_t *cfp,
}
}
- RUBY_VM_CHECK_INTS();
+ RUBY_VM_CHECK_INTS(th);
return val;
}