From 58b325366dbc5c84be12fb336ee5e68f208d9365 Mon Sep 17 00:00:00 2001 From: nobu Date: Sat, 12 Feb 2011 05:44:23 +0000 Subject: * thread.c (rb_thread_io_blocking_region): new function to run blocking region with GIL released, for fd. * thread.c (rb_thread_fd_close): implement. [ruby-core:35203] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@30852 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 7 ++++++- ext/socket/basicsocket.c | 2 +- ext/socket/init.c | 6 +++--- ext/socket/rubysocket.h | 1 + ext/socket/udpsocket.c | 2 +- ext/socket/unixsocket.c | 4 ++-- include/ruby/intern.h | 1 + io.c | 9 ++++---- test/socket/test_socket.rb | 20 ++++++++++++++++++ thread.c | 52 +++++++++++++++++++++++++++++++++++++++++++++- vm_core.h | 3 +++ 11 files changed, 94 insertions(+), 13 deletions(-) diff --git a/ChangeLog b/ChangeLog index 127069eb36..7b2fbda460 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,4 +1,9 @@ -Sat Feb 12 14:42:11 2011 Nobuyoshi Nakada +Sat Feb 12 14:44:20 2011 Nobuyoshi Nakada + + * thread.c (rb_thread_io_blocking_region): new function to run + blocking region with GIL released, for fd. + + * thread.c (rb_thread_fd_close): implement. [ruby-core:35203] * vm.c (th_init): rename from th_init2. diff --git a/ext/socket/basicsocket.c b/ext/socket/basicsocket.c index e744ad27d3..aa99041664 100644 --- a/ext/socket/basicsocket.c +++ b/ext/socket/basicsocket.c @@ -559,7 +559,7 @@ rsock_bsock_send(int argc, VALUE *argv, VALUE sock) arg.fd = fptr->fd; arg.flags = NUM2INT(flags); while (rb_thread_fd_writable(arg.fd), - (n = (int)BLOCKING_REGION(func, &arg)) < 0) { + (n = (int)BLOCKING_REGION_FD(func, &arg)) < 0) { if (rb_io_wait_writable(arg.fd)) { continue; } diff --git a/ext/socket/init.c b/ext/socket/init.c index ebc9031150..8fa907f1dc 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -129,7 +129,7 @@ rsock_s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from) while (rb_io_check_closed(fptr), rb_thread_wait_fd(arg.fd), - (slen = BLOCKING_REGION(recvfrom_blocking, &arg)) < 0) { + (slen = BLOCKING_REGION_FD(recvfrom_blocking, &arg)) < 0) { if (!rb_io_wait_readable(fptr->fd)) { rb_sys_fail("recvfrom(2)"); } @@ -380,7 +380,7 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks) if (socks) func = socks_connect_blocking; #endif for (;;) { - status = (int)BLOCKING_REGION(func, &arg); + status = (int)BLOCKING_REGION_FD(func, &arg); if (status < 0) { switch (errno) { case EAGAIN: @@ -515,7 +515,7 @@ rsock_s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len) arg.len = len; retry: rb_thread_wait_fd(fd); - fd2 = (int)BLOCKING_REGION(accept_blocking, &arg); + fd2 = (int)BLOCKING_REGION_FD(accept_blocking, &arg); if (fd2 < 0) { switch (errno) { case EMFILE: diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index 60ac25fbbd..a80c93e1fd 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -197,6 +197,7 @@ int Rconnect(); #include "constdefs.h" #define BLOCKING_REGION(func, arg) (long)rb_thread_blocking_region((func), (arg), RUBY_UBF_IO, 0) +#define BLOCKING_REGION_FD(func, arg) (long)rb_thread_io_blocking_region((func), (arg), (arg)->fd) #define SockAddrStringValue(v) rsock_sockaddr_string_value(&(v)) #define SockAddrStringValuePtr(v) rsock_sockaddr_string_value_ptr(&(v)) diff --git a/ext/socket/udpsocket.c b/ext/socket/udpsocket.c index b278523eba..bd62d89872 100644 --- a/ext/socket/udpsocket.c +++ b/ext/socket/udpsocket.c @@ -177,7 +177,7 @@ udp_send(int argc, VALUE *argv, VALUE sock) arg.to = res->ai_addr; arg.tolen = res->ai_addrlen; rb_thread_fd_writable(arg.fd); - n = (int)BLOCKING_REGION(rsock_sendto_blocking, &arg); + n = (int)BLOCKING_REGION_FD(rsock_sendto_blocking, &arg); if (n >= 0) { freeaddrinfo(res0); return INT2FIX(n); diff --git a/ext/socket/unixsocket.c b/ext/socket/unixsocket.c index 25a2c1e331..5a8641acc2 100644 --- a/ext/socket/unixsocket.c +++ b/ext/socket/unixsocket.c @@ -249,7 +249,7 @@ unix_send_io(VALUE sock, VALUE val) arg.fd = fptr->fd; rb_thread_fd_writable(arg.fd); - if ((int)BLOCKING_REGION(sendmsg_blocking, &arg) == -1) + if ((int)BLOCKING_REGION_FD(sendmsg_blocking, &arg) == -1) rb_sys_fail("sendmsg(2)"); return Qnil; @@ -335,7 +335,7 @@ unix_recv_io(int argc, VALUE *argv, VALUE sock) arg.fd = fptr->fd; rb_thread_wait_fd(arg.fd); - if ((int)BLOCKING_REGION(recvmsg_blocking, &arg) == -1) + if ((int)BLOCKING_REGION_FD(recvmsg_blocking, &arg) == -1) rb_sys_fail("recvmsg(2)"); #if FD_PASSING_BY_MSG_CONTROL diff --git a/include/ruby/intern.h b/include/ruby/intern.h index 9b32298bde..7321a022a9 100644 --- a/include/ruby/intern.h +++ b/include/ruby/intern.h @@ -797,6 +797,7 @@ void rb_thread_check_ints(void); int rb_thread_interrupted(VALUE thval); VALUE rb_thread_blocking_region(rb_blocking_function_t *func, void *data1, rb_unblock_function_t *ubf, void *data2); +VALUE rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd); #define RUBY_UBF_IO ((rb_unblock_function_t *)-1) #define RUBY_UBF_PROCESS ((rb_unblock_function_t *)-1) VALUE rb_mutex_new(void); diff --git a/io.c b/io.c index a50159771c..33c97a68d3 100644 --- a/io.c +++ b/io.c @@ -605,7 +605,7 @@ rb_read_internal(int fd, void *buf, size_t count) iis.buf = buf; iis.capa = count; - return (ssize_t)rb_thread_blocking_region(internal_read_func, &iis, RUBY_UBF_IO, 0); + return (ssize_t)rb_thread_io_blocking_region(internal_read_func, &iis, fd); } static ssize_t @@ -616,7 +616,7 @@ rb_write_internal(int fd, const void *buf, size_t count) iis.buf = buf; iis.capa = count; - return (ssize_t)rb_thread_blocking_region(internal_write_func, &iis, RUBY_UBF_IO, 0); + return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fd); } static long @@ -653,7 +653,8 @@ io_flush_buffer_sync(void *arg) static VALUE io_flush_buffer_async(VALUE arg) { - return rb_thread_blocking_region(io_flush_buffer_sync, (void *)arg, RUBY_UBF_IO, 0); + rb_io_t *fptr = (rb_io_t *)arg; + return rb_thread_io_blocking_region(io_flush_buffer_sync, fptr, fptr->fd); } static inline int @@ -7475,7 +7476,7 @@ do_io_advise(rb_io_t *fptr, VALUE advice, off_t offset, off_t len) ias.offset = offset; ias.len = len; - if (rv = (int)rb_thread_blocking_region(io_advise_internal, &ias, RUBY_UBF_IO, 0)) + if (rv = (int)rb_thread_io_blocking_region(io_advise_internal, &ias, fptr->fd)) /* posix_fadvise(2) doesn't set errno. On success it returns 0; otherwise it returns the error code. */ rb_syserr_fail(rv, RSTRING_PTR(fptr->pathv)); diff --git a/test/socket/test_socket.rb b/test/socket/test_socket.rb index 01d4652abb..b3b6fee5ef 100644 --- a/test/socket/test_socket.rb +++ b/test/socket/test_socket.rb @@ -410,4 +410,24 @@ class TestSocket < Test::Unit::TestCase assert_equal(stamp.data[-8,8].unpack("Q")[0], t.subsec * 2**64) end + def test_closed_read + require 'timeout' + require 'socket' + bug4390 = '[ruby-core:35203]' + server = TCPServer.new("localhost", 0) + serv_thread = Thread.new {server.accept} + begin sleep(0.1) end until serv_thread.stop? + sock = TCPSocket.new("localhost", server.addr[1]) + client_thread = Thread.new do + sock.readline + end + begin sleep(0.1) end until client_thread.stop? + Timeout.timeout(1) do + sock.close + sock = nil + assert_raise(IOError, bug4390) {client_thread.join} + end + ensure + server.close + end end if defined?(Socket) diff --git a/thread.c b/thread.c index b1a802b89c..7108f586af 100644 --- a/thread.c +++ b/thread.c @@ -73,6 +73,8 @@ static const VALUE eKillSignal = INT2FIX(0); static const VALUE eTerminateSignal = INT2FIX(1); static volatile int system_working = 1; +#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] + inline static void st_delete_wrap(st_table *table, st_data_t key) { @@ -1122,6 +1124,7 @@ rb_thread_blocking_region( rb_thread_t *th = GET_THREAD(); int saved_errno = 0; + th->waiting_fd = -1; if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { ubf = ubf_select; data2 = th; @@ -1136,6 +1139,23 @@ rb_thread_blocking_region( return val; } +VALUE +rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) +{ + VALUE val; + rb_thread_t *th = GET_THREAD(); + int saved_errno = 0; + + th->waiting_fd = fd; + BLOCKING_REGION({ + val = func(data1); + saved_errno = errno; + }, ubf_select, th); + errno = saved_errno; + + return val; +} + /* alias of rb_thread_blocking_region() */ VALUE @@ -1427,10 +1447,36 @@ rb_threadptr_reset_raised(rb_thread_t *th) return 1; } +#define THREAD_IO_WAITING_P(th) ( \ + ((th)->status == THREAD_STOPPED || \ + (th)->status == THREAD_STOPPED_FOREVER) && \ + (th)->blocking_region_buffer && \ + (th)->unblock.func == ubf_select && \ + 1) + +static int +thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) +{ + int fd = (int)data; + rb_thread_t *th; + GetThreadPtr((VALUE)key, th); + + if (THREAD_IO_WAITING_P(th)) { + native_mutex_lock(&th->interrupt_lock); + if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) { + th->errinfo = th->vm->special_exceptions[ruby_error_closed_stream]; + RUBY_VM_SET_INTERRUPT(th); + (th->unblock.func)(th->unblock.arg); + } + native_mutex_unlock(&th->interrupt_lock); + } + return ST_CONTINUE; +} + void rb_thread_fd_close(int fd) { - /* TODO: fix me */ + st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); } /* @@ -4362,6 +4408,10 @@ Init_Thread(void) rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); + closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); + OBJ_TAINT(closed_stream_error); + OBJ_FREEZE(closed_stream_error); + cThGroup = rb_define_class("ThreadGroup", rb_cObject); rb_define_alloc_func(cThGroup, thgroup_s_alloc); rb_define_method(cThGroup, "list", thgroup_list, 0); diff --git a/vm_core.h b/vm_core.h index 997ed38f13..257bdce668 100644 --- a/vm_core.h +++ b/vm_core.h @@ -260,6 +260,7 @@ enum ruby_special_exceptions { ruby_error_reenter, ruby_error_nomemory, ruby_error_sysstack, + ruby_error_closed_stream, ruby_special_error_count }; @@ -395,6 +396,8 @@ typedef struct rb_thread_struct { /* passing state */ int state; + int waiting_fd; + /* for rb_iterate */ const rb_block_t *passed_block; -- cgit v1.2.3