From 87f025da25e2ce7f75f99cc71bd5396ecbf4ae1c Mon Sep 17 00:00:00 2001 From: akr Date: Thu, 9 Jun 2011 15:02:46 +0000 Subject: * io.c: fix IO.copy_stream interrupt handling. based on the patch by Eric Wong. [ruby-core:36156] * vm_core.h (rb_thread_call_with_gvl): don't declare here. * thread.c: include internal.h. (rb_thread_execute_interrupts): new function. * internal.h (rb_thread_execute_interrupts): declared. (rb_thread_call_with_gvl): declared. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@31971 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 13 +++++++ internal.h | 3 ++ io.c | 104 ++++++++++++++++++++++++++++++++++++++++----------- test/ruby/test_io.rb | 32 ++++++++++++++++ thread.c | 7 ++++ vm_core.h | 1 - 6 files changed, 137 insertions(+), 23 deletions(-) diff --git a/ChangeLog b/ChangeLog index f347f83e28..fc0a658dc8 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,16 @@ +Thu Jun 9 23:57:53 2011 Tanaka Akira + + * io.c: fix IO.copy_stream interrupt handling. + based on the patch by Eric Wong. [ruby-core:36156] + + * vm_core.h (rb_thread_call_with_gvl): don't declare here. + + * thread.c: include internal.h. + (rb_thread_execute_interrupts): new function. + + * internal.h (rb_thread_execute_interrupts): declared. + (rb_thread_call_with_gvl): declared. + Thu Jun 9 23:34:01 2011 CHIKANAGA Tomoyuki * gc.c (rb_objspace_call_finalizer): use rb_typeddata_is_kind_of() for diff --git a/internal.h b/internal.h index 2ef1fc4337..bf1fcd5bb3 100644 --- a/internal.h +++ b/internal.h @@ -30,6 +30,9 @@ VALUE rb_big_uminus(VALUE x); VALUE rb_obj_is_thread(VALUE obj); VALUE rb_obj_is_mutex(VALUE obj); +void rb_thread_execute_interrupts(VALUE th); +void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1); + #if defined(__cplusplus) #if 0 { /* satisfy cc-mode */ diff --git a/io.c b/io.c index 12002be927..50892e83e6 100644 --- a/io.c +++ b/io.c @@ -14,6 +14,7 @@ #include "ruby/ruby.h" #include "ruby/io.h" #include "dln.h" +#include "internal.h" #include #include @@ -8517,13 +8518,57 @@ struct copy_stream_struct { VALUE th; }; +static void * +exec_interrupts(void *arg) +{ + VALUE th = (VALUE)arg; + rb_thread_execute_interrupts(th); + return NULL; +} + +/* + * returns TRUE if the preceding system call was interrupted + * so we can continue. If the thread was interrupted, we + * reacquire the GVL to execute interrupts before continuing. + */ static int -maygvl_copy_stream_wait_read(struct copy_stream_struct *stp) +maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp) +{ + switch (errno) { + case EINTR: +#if defined(ERESTART) + case ERESTART: +#endif + if (rb_thread_interrupted(stp->th)) + if (has_gvl) + rb_thread_execute_interrupts(stp->th); + else + rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th); + return TRUE; + } + return FALSE; +} + +static int +maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout) +{ + if (has_gvl) + return rb_thread_fd_select(n, rfds, wfds, efds, timeout); + else + return rb_fd_select(n, rfds, wfds, efds, timeout); +} + +static int +maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp) { int ret; - rb_fd_zero(&stp->fds); - rb_fd_set(stp->src_fd, &stp->fds); - ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); + + do { + rb_fd_zero(&stp->fds); + rb_fd_set(stp->src_fd, &stp->fds); + ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); + } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp)); + if (ret == -1) { stp->syserr = "select"; stp->error_no = errno; @@ -8536,9 +8581,13 @@ static int nogvl_copy_stream_wait_write(struct copy_stream_struct *stp) { int ret; - rb_fd_zero(&stp->fds); - rb_fd_set(stp->dst_fd, &stp->fds); - ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL); + + do { + rb_fd_zero(&stp->fds); + rb_fd_set(stp->dst_fd, &stp->fds); + ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL); + } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp)); + if (ret == -1) { stp->syserr = "select"; stp->error_no = errno; @@ -8600,13 +8649,13 @@ simple_sendfile(int out_fd, int in_fd, off_t *offset, off_t count) #ifdef USE_SENDFILE static int -maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp) +maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp) { int ret; rb_fd_zero(&stp->fds); rb_fd_set(stp->src_fd, &stp->fds); rb_fd_set(stp->dst_fd, &stp->fds); - ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); + ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); if (ret == -1) { stp->syserr = "select"; stp->error_no = errno; @@ -8685,6 +8734,8 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) } } if (ss == -1) { + if (maygvl_copy_stream_continue_p(0, stp)) + goto retry_sendfile; switch (errno) { case EINVAL: #ifdef ENOSYS @@ -8695,9 +8746,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif - if (maygvl_copy_stream_wait_readwrite(stp) == -1) - return -1; - if (rb_thread_interrupted(stp->th)) + if (maygvl_copy_stream_wait_readwrite(0, stp) == -1) return -1; goto retry_sendfile; } @@ -8710,12 +8759,22 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp) #endif static ssize_t -maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset) +maygvl_read(int has_gvl, int fd, void *buf, size_t count) +{ + if (has_gvl) + return rb_read_internal(fd, buf, count); + else + return read(fd, buf, count); +} + +static ssize_t +maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset) { ssize_t ss; retry_read: - if (offset == (off_t)-1) - ss = read(stp->src_fd, buf, len); + if (offset == (off_t)-1) { + ss = maygvl_read(has_gvl, stp->src_fd, buf, len); + } else { #ifdef HAVE_PREAD ss = pread(stp->src_fd, buf, len, offset); @@ -8728,12 +8787,14 @@ maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, o return 0; } if (ss == -1) { + if (maygvl_copy_stream_continue_p(has_gvl, stp)) + goto retry_read; switch (errno) { case EAGAIN: #if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN case EWOULDBLOCK: #endif - if (maygvl_copy_stream_wait_read(stp) == -1) + if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1) return -1; goto retry_read; #ifdef ENOSYS @@ -8757,6 +8818,8 @@ nogvl_copy_stream_write(struct copy_stream_struct *stp, char *buf, size_t len) while (len) { ss = write(stp->dst_fd, buf+off, len); if (ss == -1) { + if (maygvl_copy_stream_continue_p(0, stp)) + continue; if (errno == EAGAIN || errno == EWOULDBLOCK) { if (nogvl_copy_stream_wait_write(stp) == -1) return -1; @@ -8811,12 +8874,12 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp) len = sizeof(buf); } if (use_pread) { - ss = maygvl_copy_stream_read(stp, buf, len, src_offset); + ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset); if (0 < ss) src_offset += ss; } else { - ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1); + ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1); } if (ss <= 0) /* EOF or error */ return; @@ -8827,9 +8890,6 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp) if (!use_eof) copy_length -= ss; - - if (rb_thread_interrupted(stp->th)) - return; } } @@ -8890,7 +8950,7 @@ copy_stream_fallback_body(VALUE arg) ssize_t ss; rb_thread_wait_fd(stp->src_fd); rb_str_resize(buf, buflen); - ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off); + ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off); if (ss == -1) return Qnil; if (ss == 0) diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb index de78b71c6a..0a100b1457 100644 --- a/test/ruby/test_io.rb +++ b/test/ruby/test_io.rb @@ -78,6 +78,14 @@ class TestIO < Test::Unit::TestCase } end + def trapping_usr1 + @usr1_rcvd = 0 + trap(:USR1) { @usr1_rcvd += 1 } + yield + ensure + trap(:USR1, "DEFAULT") + end + def test_pipe r, w = IO.pipe assert_instance_of(IO, r) @@ -594,6 +602,30 @@ class TestIO < Test::Unit::TestCase result = t.value assert_equal(megacontent, result) } + with_socketpair {|s1, s2| + begin + s1.nonblock = true + rescue Errno::EBADF + skip "nonblocking IO for pipe is not implemented" + end + trapping_usr1 do + nr = 10 + pid = fork do + s1.close + IO.select([s2]) + Process.kill(:USR1, Process.ppid) + s2.read + end + s2.close + nr.times do + assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1) + end + assert_equal(1, @usr1_rcvd) + s1.close + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + end + } end } end diff --git a/thread.c b/thread.c index 34f3ad8629..433e7be4fb 100644 --- a/thread.c +++ b/thread.c @@ -46,6 +46,7 @@ #include "eval_intern.h" #include "gc.h" +#include "internal.h" #include "ruby/io.h" #ifndef USE_NATIVE_THREAD_PRIORITY @@ -1358,6 +1359,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th) rb_threadptr_execute_interrupts_rec(th, 0); } +void +rb_thread_execute_interrupts(VALUE th) +{ + rb_threadptr_execute_interrupts_rec((rb_thread_t *)th, 0); +} + void rb_gc_mark_threads(void) { diff --git a/vm_core.h b/vm_core.h index e302e62a4b..f0bb86aedf 100644 --- a/vm_core.h +++ b/vm_core.h @@ -656,7 +656,6 @@ void rb_vm_gvl_destroy(rb_vm_t *vm); void rb_thread_start_timer_thread(void); void rb_thread_stop_timer_thread(void); void rb_thread_reset_timer_thread(void); -void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1); int ruby_thread_has_gvl_p(void); VALUE rb_make_backtrace(void); typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE); -- cgit v1.2.3