summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ChangeLog13
-rw-r--r--internal.h3
-rw-r--r--io.c104
-rw-r--r--test/ruby/test_io.rb32
-rw-r--r--thread.c7
-rw-r--r--vm_core.h1
6 files changed, 137 insertions, 23 deletions
diff --git a/ChangeLog b/ChangeLog
index f347f83e28..fc0a658dc8 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,16 @@
+Thu Jun 9 23:57:53 2011 Tanaka Akira <akr@fsij.org>
+
+ * io.c: fix IO.copy_stream interrupt handling.
+ based on the patch by Eric Wong. [ruby-core:36156]
+
+ * vm_core.h (rb_thread_call_with_gvl): don't declare here.
+
+ * thread.c: include internal.h.
+ (rb_thread_execute_interrupts): new function.
+
+ * internal.h (rb_thread_execute_interrupts): declared.
+ (rb_thread_call_with_gvl): declared.
+
Thu Jun 9 23:34:01 2011 CHIKANAGA Tomoyuki <nagachika00@gmail.com>
* gc.c (rb_objspace_call_finalizer): use rb_typeddata_is_kind_of() for
diff --git a/internal.h b/internal.h
index 2ef1fc4337..bf1fcd5bb3 100644
--- a/internal.h
+++ b/internal.h
@@ -30,6 +30,9 @@ VALUE rb_big_uminus(VALUE x);
VALUE rb_obj_is_thread(VALUE obj);
VALUE rb_obj_is_mutex(VALUE obj);
+void rb_thread_execute_interrupts(VALUE th);
+void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
+
#if defined(__cplusplus)
#if 0
{ /* satisfy cc-mode */
diff --git a/io.c b/io.c
index 12002be927..50892e83e6 100644
--- a/io.c
+++ b/io.c
@@ -14,6 +14,7 @@
#include "ruby/ruby.h"
#include "ruby/io.h"
#include "dln.h"
+#include "internal.h"
#include <ctype.h>
#include <errno.h>
@@ -8517,13 +8518,57 @@ struct copy_stream_struct {
VALUE th;
};
+static void *
+exec_interrupts(void *arg)
+{
+ VALUE th = (VALUE)arg;
+ rb_thread_execute_interrupts(th);
+ return NULL;
+}
+
+/*
+ * returns TRUE if the preceding system call was interrupted
+ * so we can continue. If the thread was interrupted, we
+ * reacquire the GVL to execute interrupts before continuing.
+ */
static int
-maygvl_copy_stream_wait_read(struct copy_stream_struct *stp)
+maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
+{
+ switch (errno) {
+ case EINTR:
+#if defined(ERESTART)
+ case ERESTART:
+#endif
+ if (rb_thread_interrupted(stp->th))
+ if (has_gvl)
+ rb_thread_execute_interrupts(stp->th);
+ else
+ rb_thread_call_with_gvl(exec_interrupts, (void *)stp->th);
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static int
+maygvl_select(int has_gvl, int n, rb_fdset_t *rfds, rb_fdset_t *wfds, rb_fdset_t *efds, struct timeval *timeout)
+{
+ if (has_gvl)
+ return rb_thread_fd_select(n, rfds, wfds, efds, timeout);
+ else
+ return rb_fd_select(n, rfds, wfds, efds, timeout);
+}
+
+static int
+maygvl_copy_stream_wait_read(int has_gvl, struct copy_stream_struct *stp)
{
int ret;
- rb_fd_zero(&stp->fds);
- rb_fd_set(stp->src_fd, &stp->fds);
- ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+
+ do {
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->src_fd, &stp->fds);
+ ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+ } while (ret == -1 && maygvl_copy_stream_continue_p(has_gvl, stp));
+
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
@@ -8536,9 +8581,13 @@ static int
nogvl_copy_stream_wait_write(struct copy_stream_struct *stp)
{
int ret;
- rb_fd_zero(&stp->fds);
- rb_fd_set(stp->dst_fd, &stp->fds);
- ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+
+ do {
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->dst_fd, &stp->fds);
+ ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+ } while (ret == -1 && maygvl_copy_stream_continue_p(0, stp));
+
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
@@ -8600,13 +8649,13 @@ simple_sendfile(int out_fd, int in_fd, off_t *offset, off_t count)
#ifdef USE_SENDFILE
static int
-maygvl_copy_stream_wait_readwrite(struct copy_stream_struct *stp)
+maygvl_copy_stream_wait_readwrite(int has_gvl, struct copy_stream_struct *stp)
{
int ret;
rb_fd_zero(&stp->fds);
rb_fd_set(stp->src_fd, &stp->fds);
rb_fd_set(stp->dst_fd, &stp->fds);
- ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+ ret = maygvl_select(has_gvl, rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
if (ret == -1) {
stp->syserr = "select";
stp->error_no = errno;
@@ -8685,6 +8734,8 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
}
}
if (ss == -1) {
+ if (maygvl_copy_stream_continue_p(0, stp))
+ goto retry_sendfile;
switch (errno) {
case EINVAL:
#ifdef ENOSYS
@@ -8695,9 +8746,7 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
- if (maygvl_copy_stream_wait_readwrite(stp) == -1)
- return -1;
- if (rb_thread_interrupted(stp->th))
+ if (maygvl_copy_stream_wait_readwrite(0, stp) == -1)
return -1;
goto retry_sendfile;
}
@@ -8710,12 +8759,22 @@ nogvl_copy_stream_sendfile(struct copy_stream_struct *stp)
#endif
static ssize_t
-maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
+maygvl_read(int has_gvl, int fd, void *buf, size_t count)
+{
+ if (has_gvl)
+ return rb_read_internal(fd, buf, count);
+ else
+ return read(fd, buf, count);
+}
+
+static ssize_t
+maygvl_copy_stream_read(int has_gvl, struct copy_stream_struct *stp, char *buf, size_t len, off_t offset)
{
ssize_t ss;
retry_read:
- if (offset == (off_t)-1)
- ss = read(stp->src_fd, buf, len);
+ if (offset == (off_t)-1) {
+ ss = maygvl_read(has_gvl, stp->src_fd, buf, len);
+ }
else {
#ifdef HAVE_PREAD
ss = pread(stp->src_fd, buf, len, offset);
@@ -8728,12 +8787,14 @@ maygvl_copy_stream_read(struct copy_stream_struct *stp, char *buf, size_t len, o
return 0;
}
if (ss == -1) {
+ if (maygvl_copy_stream_continue_p(has_gvl, stp))
+ goto retry_read;
switch (errno) {
case EAGAIN:
#if defined(EWOULDBLOCK) && EWOULDBLOCK != EAGAIN
case EWOULDBLOCK:
#endif
- if (maygvl_copy_stream_wait_read(stp) == -1)
+ if (maygvl_copy_stream_wait_read(has_gvl, stp) == -1)
return -1;
goto retry_read;
#ifdef ENOSYS
@@ -8757,6 +8818,8 @@ nogvl_copy_stream_write(struct copy_stream_struct *stp, char *buf, size_t len)
while (len) {
ss = write(stp->dst_fd, buf+off, len);
if (ss == -1) {
+ if (maygvl_copy_stream_continue_p(0, stp))
+ continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (nogvl_copy_stream_wait_write(stp) == -1)
return -1;
@@ -8811,12 +8874,12 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp)
len = sizeof(buf);
}
if (use_pread) {
- ss = maygvl_copy_stream_read(stp, buf, len, src_offset);
+ ss = maygvl_copy_stream_read(0, stp, buf, len, src_offset);
if (0 < ss)
src_offset += ss;
}
else {
- ss = maygvl_copy_stream_read(stp, buf, len, (off_t)-1);
+ ss = maygvl_copy_stream_read(0, stp, buf, len, (off_t)-1);
}
if (ss <= 0) /* EOF or error */
return;
@@ -8827,9 +8890,6 @@ nogvl_copy_stream_read_write(struct copy_stream_struct *stp)
if (!use_eof)
copy_length -= ss;
-
- if (rb_thread_interrupted(stp->th))
- return;
}
}
@@ -8890,7 +8950,7 @@ copy_stream_fallback_body(VALUE arg)
ssize_t ss;
rb_thread_wait_fd(stp->src_fd);
rb_str_resize(buf, buflen);
- ss = maygvl_copy_stream_read(stp, RSTRING_PTR(buf), l, off);
+ ss = maygvl_copy_stream_read(1, stp, RSTRING_PTR(buf), l, off);
if (ss == -1)
return Qnil;
if (ss == 0)
diff --git a/test/ruby/test_io.rb b/test/ruby/test_io.rb
index de78b71c6a..0a100b1457 100644
--- a/test/ruby/test_io.rb
+++ b/test/ruby/test_io.rb
@@ -78,6 +78,14 @@ class TestIO < Test::Unit::TestCase
}
end
+ def trapping_usr1
+ @usr1_rcvd = 0
+ trap(:USR1) { @usr1_rcvd += 1 }
+ yield
+ ensure
+ trap(:USR1, "DEFAULT")
+ end
+
def test_pipe
r, w = IO.pipe
assert_instance_of(IO, r)
@@ -594,6 +602,30 @@ class TestIO < Test::Unit::TestCase
result = t.value
assert_equal(megacontent, result)
}
+ with_socketpair {|s1, s2|
+ begin
+ s1.nonblock = true
+ rescue Errno::EBADF
+ skip "nonblocking IO for pipe is not implemented"
+ end
+ trapping_usr1 do
+ nr = 10
+ pid = fork do
+ s1.close
+ IO.select([s2])
+ Process.kill(:USR1, Process.ppid)
+ s2.read
+ end
+ s2.close
+ nr.times do
+ assert_equal megacontent.bytesize, IO.copy_stream("megasrc", s1)
+ end
+ assert_equal(1, @usr1_rcvd)
+ s1.close
+ _, status = Process.waitpid2(pid)
+ assert status.success?, status.inspect
+ end
+ }
end
}
end
diff --git a/thread.c b/thread.c
index 34f3ad8629..433e7be4fb 100644
--- a/thread.c
+++ b/thread.c
@@ -46,6 +46,7 @@
#include "eval_intern.h"
#include "gc.h"
+#include "internal.h"
#include "ruby/io.h"
#ifndef USE_NATIVE_THREAD_PRIORITY
@@ -1359,6 +1360,12 @@ rb_threadptr_execute_interrupts(rb_thread_t *th)
}
void
+rb_thread_execute_interrupts(VALUE th)
+{
+ rb_threadptr_execute_interrupts_rec((rb_thread_t *)th, 0);
+}
+
+void
rb_gc_mark_threads(void)
{
rb_bug("deprecated function rb_gc_mark_threads is called");
diff --git a/vm_core.h b/vm_core.h
index e302e62a4b..f0bb86aedf 100644
--- a/vm_core.h
+++ b/vm_core.h
@@ -656,7 +656,6 @@ void rb_vm_gvl_destroy(rb_vm_t *vm);
void rb_thread_start_timer_thread(void);
void rb_thread_stop_timer_thread(void);
void rb_thread_reset_timer_thread(void);
-void *rb_thread_call_with_gvl(void *(*func)(void *), void *data1);
int ruby_thread_has_gvl_p(void);
VALUE rb_make_backtrace(void);
typedef int rb_backtrace_iter_func(void *, VALUE, int, VALUE);