summaryrefslogtreecommitdiff
path: root/ext/socket
diff options
context:
space:
mode:
authornobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-07-06 14:49:00 +0000
committernobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-07-06 14:49:00 +0000
commit2754e843b6e38ec9f91070e202abff41e88d4c6f (patch)
treeda3786e73758a90ebe6a46a51345f5c52d2357b0 /ext/socket
parent7d82b3dc3cb1524a82e2161a039511824adc119b (diff)
* ext/socket/socket.c (bsock_send, s_recvfrom, ruby_connect, s_accept),
(udp_send, unix_send_io, unix_recv_io): blocking region support. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@17907 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'ext/socket')
-rw-r--r--ext/socket/socket.c363
1 files changed, 213 insertions, 150 deletions
diff --git a/ext/socket/socket.c b/ext/socket/socket.c
index 1e113ef1b8..c32c6133b5 100644
--- a/ext/socket/socket.c
+++ b/ext/socket/socket.c
@@ -102,6 +102,8 @@ int Rconnect();
#endif
#endif
+#define BLOCKING_REGION(func, arg) (long)rb_thread_blocking_region((func), (arg), RB_UBF_DFL, 0)
+
#define INET_CLIENT 0
#define INET_SERVER 1
#define INET_SOCKS 2
@@ -498,37 +500,60 @@ bsock_getpeername(VALUE sock)
return rb_str_new(buf, len);
}
+struct send_arg {
+ int fd, flags;
+ VALUE mesg;
+ struct sockaddr *to;
+ socklen_t tolen;
+};
+
+static VALUE
+sendto_blocking(void *data)
+{
+ struct send_arg *arg = data;
+ VALUE mesg = arg->mesg;
+ return (VALUE)sendto(arg->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg),
+ arg->flags, arg->to, arg->tolen);
+}
+
+static VALUE
+send_blocking(void *data)
+{
+ struct send_arg *arg = data;
+ VALUE mesg = arg->mesg;
+ return (VALUE)send(arg->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg),
+ arg->flags);
+}
+
static VALUE
bsock_send(int argc, VALUE *argv, VALUE sock)
{
- VALUE mesg, to;
- VALUE flags;
+ struct send_arg arg;
+ VALUE flags, to;
rb_io_t *fptr;
- int fd, n;
+ int n;
+ rb_blocking_function_t *func;
rb_secure(4);
- rb_scan_args(argc, argv, "21", &mesg, &flags, &to);
+ rb_scan_args(argc, argv, "21", &arg.mesg, &flags, &to);
- StringValue(mesg);
- if (!NIL_P(to)) StringValue(to);
- GetOpenFile(sock, fptr);
- fd = fptr->fd;
- rb_thread_fd_writable(fd);
- retry:
+ StringValue(arg.mesg);
if (!NIL_P(to)) {
- TRAP_BEG;
- n = sendto(fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg), NUM2INT(flags),
- (struct sockaddr*)RSTRING_PTR(to), RSTRING_LEN(to));
- TRAP_END;
+ StringValue(to);
+ to = rb_str_new4(to);
+ arg.to = (struct sockaddr *)RSTRING_PTR(to);
+ arg.tolen = RSTRING_LEN(to);
+ func = sendto_blocking;
}
else {
- TRAP_BEG;
- n = send(fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg), NUM2INT(flags));
- TRAP_END;
+ func = send_blocking;
}
- if (n < 0) {
- if (rb_io_wait_writable(fd)) {
- goto retry;
+ GetOpenFile(sock, fptr);
+ arg.fd = fptr->fd;
+ arg.flags = NUM2INT(flags);
+ while ((n = (int)BLOCKING_REGION(func, &arg)) < 0) {
+ if (rb_io_wait_writable(arg.fd)) {
+ continue;
}
rb_sys_fail("send(2)");
}
@@ -572,72 +597,80 @@ enum sock_recv_type {
RECV_SOCKET /* Socket#recvfrom */
};
+struct recvfrom_arg {
+ int fd, flags;
+ VALUE str;
+ socklen_t alen;
+ char buf[1024];
+};
+
+static VALUE
+recvfrom_blocking(void *data)
+{
+ struct recvfrom_arg *arg = data;
+ return (VALUE)recvfrom(arg->fd, RSTRING_PTR(arg->str), RSTRING_LEN(arg->str),
+ arg->flags, (struct sockaddr*)arg->buf, &arg->alen);
+}
+
static VALUE
s_recvfrom(VALUE sock, int argc, VALUE *argv, enum sock_recv_type from)
{
rb_io_t *fptr;
- VALUE str;
- char buf[1024];
- socklen_t alen = sizeof buf;
+ VALUE str, klass;
+ struct recvfrom_arg arg;
VALUE len, flg;
long buflen;
long slen;
- int fd, flags;
rb_scan_args(argc, argv, "11", &len, &flg);
- if (flg == Qnil) flags = 0;
- else flags = NUM2INT(flg);
+ if (flg == Qnil) arg.flags = 0;
+ else arg.flags = NUM2INT(flg);
buflen = NUM2INT(len);
GetOpenFile(sock, fptr);
if (rb_io_read_pending(fptr)) {
rb_raise(rb_eIOError, "recv for buffered IO");
}
- fd = fptr->fd;
-
- str = rb_tainted_str_new(0, buflen);
+ arg.fd = fptr->fd;
+ arg.alen = sizeof(arg.buf);
- retry:
- rb_thread_wait_fd(fd);
- rb_io_check_closed(fptr);
- if (RSTRING_LEN(str) != buflen) {
- rb_raise(rb_eRuntimeError, "buffer string modified");
- }
- TRAP_BEG;
- slen = recvfrom(fd, RSTRING_PTR(str), buflen, flags, (struct sockaddr*)buf, &alen);
- TRAP_END;
+ arg.str = str = rb_tainted_str_new(0, buflen);
+ klass = RBASIC(str)->klass;
+ RBASIC(str)->klass = 0;
- if (slen < 0) {
- if (rb_io_wait_readable(fd)) {
- goto retry;
+ while (rb_io_check_closed(fptr),
+ (slen = BLOCKING_REGION(recvfrom_blocking, &arg)) < 0) {
+ if (RBASIC(str)->klass || RSTRING_LEN(str) != buflen) {
+ rb_raise(rb_eRuntimeError, "buffer string modified");
}
- rb_sys_fail("recvfrom(2)");
}
+
+ RBASIC(str)->klass = klass;
if (slen < RSTRING_LEN(str)) {
rb_str_set_len(str, slen);
}
rb_obj_taint(str);
switch (from) {
case RECV_RECV:
- return (VALUE)str;
+ return str;
case RECV_IP:
#if 0
- if (alen != sizeof(struct sockaddr_in)) {
+ if (arg.alen != sizeof(struct sockaddr_in)) {
rb_raise(rb_eTypeError, "sockaddr size differs - should not happen");
}
#endif
- if (alen && alen != sizeof(buf)) /* OSX doesn't return a from result for connection-oriented sockets */
- return rb_assoc_new(str, ipaddr((struct sockaddr*)buf, fptr->mode & FMODE_NOREVLOOKUP));
+ if (arg.alen && arg.alen != sizeof(arg.buf)) /* OSX doesn't return a from result for connection-oriented sockets */
+ return rb_assoc_new(str, ipaddr((struct sockaddr*)arg.buf, fptr->mode & FMODE_NOREVLOOKUP));
else
return rb_assoc_new(str, Qnil);
#ifdef HAVE_SYS_UN_H
case RECV_UNIX:
- return rb_assoc_new(str, unixaddr((struct sockaddr_un*)buf, alen));
+ return rb_assoc_new(str, unixaddr((struct sockaddr_un*)arg.buf, arg.alen));
#endif
case RECV_SOCKET:
- return rb_assoc_new(str, rb_str_new(buf, alen));
+ return rb_assoc_new(str, rb_str_new(arg.buf, arg.alen));
default:
rb_bug("s_recvfrom called with bad value");
}
@@ -921,19 +954,20 @@ sock_addrinfo(VALUE host, VALUE port, int socktype, int flags)
#if defined(__APPLE__) && defined(__MACH__)
{
- struct addrinfo *r;
- r = res;
- while (r) {
- if (! r->ai_socktype) r->ai_socktype = hints.ai_socktype;
- if (! r->ai_protocol) {
- if (r->ai_socktype == SOCK_DGRAM) {
- r->ai_protocol = IPPROTO_UDP;
- } else if (r->ai_socktype == SOCK_STREAM) {
- r->ai_protocol = IPPROTO_TCP;
- }
- }
- r = r->ai_next;
- }
+ struct addrinfo *r;
+ r = res;
+ while (r) {
+ if (! r->ai_socktype) r->ai_socktype = hints.ai_socktype;
+ if (! r->ai_protocol) {
+ if (r->ai_socktype == SOCK_DGRAM) {
+ r->ai_protocol = IPPROTO_UDP;
+ }
+ else if (r->ai_socktype == SOCK_STREAM) {
+ r->ai_protocol = IPPROTO_TCP;
+ }
+ }
+ r = r->ai_next;
+ }
}
#endif
return res;
@@ -1100,49 +1134,48 @@ wait_connectable(int fd)
#define WAIT_IN_PROGRESS 1
#endif
+struct connect_arg {
+ int fd;
+ const struct sockaddr *sockaddr;
+ socklen_t len;
+};
+
+static VALUE
+connect_blocking(void *data)
+{
+ struct connect_arg *arg = data;
+ return (VALUE)connect(arg->fd, arg->sockaddr, arg->len);
+}
+
+#if defined(SOCKS) && !defined(SOCKS5)
+static VALUE
+socks_connect_blocking(void *data)
+{
+ struct connect_arg *arg = data;
+ return (VALUE)Rconnect(arg->fd, arg->sockaddr, arg->len);
+}
+#endif
+
static int
-ruby_connect(int fd, struct sockaddr *sockaddr, int len, int socks)
+ruby_connect(int fd, const struct sockaddr *sockaddr, int len, int socks)
{
int status;
- int mode;
+ rb_blocking_function_t *func = connect_blocking;
+ struct connect_arg arg;
#if WAIT_IN_PROGRESS > 0
int wait_in_progress = -1;
int sockerr;
socklen_t sockerrlen;
#endif
-#if defined(HAVE_FCNTL)
-# if defined(F_GETFL)
- mode = fcntl(fd, F_GETFL, 0);
-# else
- mode = 0;
-# endif
-
-#ifdef O_NDELAY
-# define NONBLOCKING O_NDELAY
-#else
-#ifdef O_NBIO
-# define NONBLOCKING O_NBIO
-#else
-# define NONBLOCKING O_NONBLOCK
-#endif
-#endif
-#ifdef SOCKS5
- if (!socks)
-#endif
- fcntl(fd, F_SETFL, mode|NONBLOCKING);
-#endif /* HAVE_FCNTL */
-
- for (;;) {
+ arg.fd = fd;
+ arg.sockaddr = sockaddr;
+ arg.len = len;
#if defined(SOCKS) && !defined(SOCKS5)
- if (socks) {
- status = Rconnect(fd, sockaddr, len);
- }
- else
+ if (socks) func = socks_connect_blocking;
#endif
- {
- status = connect(fd, sockaddr, len);
- }
+ for (;;) {
+ status = (int)BLOCKING_REGION(func, &arg);
if (status < 0) {
switch (errno) {
case EAGAIN:
@@ -1203,9 +1236,6 @@ ruby_connect(int fd, struct sockaddr *sockaddr, int len, int socks)
break;
}
}
-#ifdef HAVE_FCNTL
- fcntl(fd, F_SETFL, mode);
-#endif
return status;
}
}
@@ -1244,7 +1274,7 @@ init_inetsock_internal(struct inetsock_arg *arg)
int type = arg->type;
struct addrinfo *res;
int fd, status = 0;
- const char *syscall;
+ const char *syscall = 0;
arg->remote.res = sock_addrinfo(arg->remote.host, arg->remote.serv, SOCK_STREAM,
(type == INET_SERVER) ? AI_PASSIVE : 0);
@@ -1495,22 +1525,32 @@ s_accept_nonblock(VALUE klass, rb_io_t *fptr, struct sockaddr *sockaddr, socklen
return init_sock(rb_obj_alloc(klass), fd2);
}
+struct accept_arg {
+ int fd;
+ struct sockaddr *sockaddr;
+ socklen_t *len;
+};
+
+static VALUE
+accept_blocking(void *data)
+{
+ struct accept_arg *arg = data;
+ return (VALUE)accept(arg->fd, arg->sockaddr, arg->len);
+}
+
static VALUE
s_accept(VALUE klass, int fd, struct sockaddr *sockaddr, socklen_t *len)
{
int fd2;
int retry = 0;
+ struct accept_arg arg;
rb_secure(3);
+ arg.fd = fd;
+ arg.sockaddr = sockaddr;
+ arg.len = len;
retry:
- rb_thread_wait_fd(fd);
-#if defined(_nec_ews)
- fd2 = accept(fd, sockaddr, len);
-#else
- TRAP_BEG;
- fd2 = accept(fd, sockaddr, len);
- TRAP_END;
-#endif
+ fd2 = (int)BLOCKING_REGION(accept_blocking, &arg);
if (fd2 < 0) {
switch (errno) {
case EMFILE:
@@ -1791,24 +1831,28 @@ udp_bind(VALUE sock, VALUE host, VALUE port)
static VALUE
udp_send(int argc, VALUE *argv, VALUE sock)
{
- VALUE mesg, flags, host, port;
+ VALUE flags, host, port;
rb_io_t *fptr;
int n;
struct addrinfo *res0, *res;
+ struct send_arg arg;
if (argc == 2 || argc == 3) {
return bsock_send(argc, argv, sock);
}
rb_secure(4);
- rb_scan_args(argc, argv, "4", &mesg, &flags, &host, &port);
+ rb_scan_args(argc, argv, "4", &arg.mesg, &flags, &host, &port);
- StringValue(mesg);
+ StringValue(arg.mesg);
res0 = sock_addrinfo(host, port, SOCK_DGRAM, 0);
GetOpenFile(sock, fptr);
+ arg.fd = fptr->fd;
+ arg.flags = NUM2INT(flags);
for (res = res0; res; res = res->ai_next) {
retry:
- n = sendto(fptr->fd, RSTRING_PTR(mesg), RSTRING_LEN(mesg), NUM2INT(flags),
- res->ai_addr, res->ai_addrlen);
+ arg.to = res->ai_addr;
+ arg.tolen = res->ai_addrlen;
+ n = (int)BLOCKING_REGION(sendto_blocking, &arg);
if (n >= 0) {
freeaddrinfo(res0);
return INT2FIX(n);
@@ -1924,13 +1968,25 @@ unix_recvfrom(int argc, VALUE *argv, VALUE sock)
#define FD_PASSING_BY_MSG_ACCRIGHTS 0
#endif
+struct iomsg_arg {
+ int fd;
+ struct msghdr msg;
+};
+
+static VALUE
+sendmsg_blocking(void *data)
+{
+ struct iomsg_arg *arg = data;
+ return sendmsg(arg->fd, &arg->msg, 0);
+}
+
static VALUE
unix_send_io(VALUE sock, VALUE val)
{
#if defined(HAVE_SENDMSG) && (FD_PASSING_BY_MSG_CONTROL || FD_PASSING_BY_MSG_ACCRIGHTS)
int fd;
rb_io_t *fptr;
- struct msghdr msg;
+ struct iomsg_arg arg;
struct iovec vec[1];
char buf[1];
@@ -1955,31 +2011,32 @@ unix_send_io(VALUE sock, VALUE val)
GetOpenFile(sock, fptr);
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
+ arg.msg.msg_name = NULL;
+ arg.msg.msg_namelen = 0;
/* Linux and Solaris doesn't work if msg_iov is NULL. */
buf[0] = '\0';
vec[0].iov_base = buf;
vec[0].iov_len = 1;
- msg.msg_iov = vec;
- msg.msg_iovlen = 1;
+ arg.msg.msg_iov = vec;
+ arg.msg.msg_iovlen = 1;
#if FD_PASSING_BY_MSG_CONTROL
- msg.msg_control = (caddr_t)&cmsg;
- msg.msg_controllen = CMSG_LEN(sizeof(int));
- msg.msg_flags = 0;
+ arg.msg.msg_control = (caddr_t)&cmsg;
+ arg.msg.msg_controllen = CMSG_LEN(sizeof(int));
+ arg.msg.msg_flags = 0;
MEMZERO((char*)&cmsg, char, sizeof(cmsg));
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.hdr.cmsg_level = SOL_SOCKET;
cmsg.hdr.cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(&cmsg.hdr) = fd;
#else
- msg.msg_accrights = (caddr_t)&fd;
- msg.msg_accrightslen = sizeof(fd);
+ arg.msg.msg_accrights = (caddr_t)&fd;
+ arg.msg.msg_accrightslen = sizeof(fd);
#endif
- if (sendmsg(fptr->fd, &msg, 0) == -1)
+ arg.fd = fptr->fd;
+ if ((int)BLOCKING_REGION(sendmsg_blocking, &arg) == -1)
rb_sys_fail("sendmsg(2)");
return Qnil;
@@ -1990,12 +2047,19 @@ unix_send_io(VALUE sock, VALUE val)
}
static VALUE
+recvmsg_blocking(void *data)
+{
+ struct iomsg_arg *arg = data;
+ return recvmsg(arg->fd, &arg->msg, 0);
+}
+
+static VALUE
unix_recv_io(int argc, VALUE *argv, VALUE sock)
{
#if defined(HAVE_RECVMSG) && (FD_PASSING_BY_MSG_CONTROL || FD_PASSING_BY_MSG_ACCRIGHTS)
VALUE klass, mode;
rb_io_t *fptr;
- struct msghdr msg;
+ struct iomsg_arg arg;
struct iovec vec[2];
char buf[1];
@@ -2015,59 +2079,58 @@ unix_recv_io(int argc, VALUE *argv, VALUE sock)
GetOpenFile(sock, fptr);
- rb_io_wait_readable(fptr->fd);
-
- msg.msg_name = NULL;
- msg.msg_namelen = 0;
+ arg.msg.msg_name = NULL;
+ arg.msg.msg_namelen = 0;
vec[0].iov_base = buf;
vec[0].iov_len = sizeof(buf);
- msg.msg_iov = vec;
- msg.msg_iovlen = 1;
+ arg.msg.msg_iov = vec;
+ arg.msg.msg_iovlen = 1;
#if FD_PASSING_BY_MSG_CONTROL
- msg.msg_control = (caddr_t)&cmsg;
- msg.msg_controllen = CMSG_SPACE(sizeof(int));
- msg.msg_flags = 0;
+ arg.msg.msg_control = (caddr_t)&cmsg;
+ arg.msg.msg_controllen = CMSG_SPACE(sizeof(int));
+ arg.msg.msg_flags = 0;
cmsg.hdr.cmsg_len = CMSG_LEN(sizeof(int));
cmsg.hdr.cmsg_level = SOL_SOCKET;
cmsg.hdr.cmsg_type = SCM_RIGHTS;
*(int *)CMSG_DATA(&cmsg.hdr) = -1;
#else
- msg.msg_accrights = (caddr_t)&fd;
- msg.msg_accrightslen = sizeof(fd);
+ arg.msg.msg_accrights = (caddr_t)&fd;
+ arg.msg.msg_accrightslen = sizeof(fd);
fd = -1;
#endif
- if (recvmsg(fptr->fd, &msg, 0) == -1)
+ arg.fd = fptr->fd;
+ if ((int)BLOCKING_REGION(recvmsg_blocking, &arg) == -1)
rb_sys_fail("recvmsg(2)");
#if FD_PASSING_BY_MSG_CONTROL
- if (msg.msg_controllen != CMSG_SPACE(sizeof(int))) {
- rb_raise(rb_eSocket,
- "file descriptor was not passed (msg_controllen=%d, %d expected)",
- (int)msg.msg_controllen, (int)CMSG_SPACE(sizeof(int)));
+ if (arg.msg.msg_controllen != CMSG_SPACE(sizeof(int))) {
+ rb_raise(rb_eSocket,
+ "file descriptor was not passed (msg_controllen=%d, %d expected)",
+ (int)arg.msg.msg_controllen, (int)CMSG_SPACE(sizeof(int)));
}
if (cmsg.hdr.cmsg_len != CMSG_LEN(sizeof(int))) {
- rb_raise(rb_eSocket,
- "file descriptor was not passed (cmsg_len=%d, %d expected)",
- (int)cmsg.hdr.cmsg_len, (int)CMSG_LEN(sizeof(int)));
+ rb_raise(rb_eSocket,
+ "file descriptor was not passed (cmsg_len=%d, %d expected)",
+ (int)cmsg.hdr.cmsg_len, (int)CMSG_LEN(sizeof(int)));
}
if (cmsg.hdr.cmsg_level != SOL_SOCKET) {
- rb_raise(rb_eSocket,
- "file descriptor was not passed (cmsg_level=%d, %d expected)",
- cmsg.hdr.cmsg_level, SOL_SOCKET);
+ rb_raise(rb_eSocket,
+ "file descriptor was not passed (cmsg_level=%d, %d expected)",
+ cmsg.hdr.cmsg_level, SOL_SOCKET);
}
if (cmsg.hdr.cmsg_type != SCM_RIGHTS) {
- rb_raise(rb_eSocket,
- "file descriptor was not passed (cmsg_type=%d, %d expected)",
- cmsg.hdr.cmsg_type, SCM_RIGHTS);
+ rb_raise(rb_eSocket,
+ "file descriptor was not passed (cmsg_type=%d, %d expected)",
+ cmsg.hdr.cmsg_type, SCM_RIGHTS);
}
#else
- if (msg.msg_accrightslen != sizeof(fd)) {
+ if (arg.msg.msg_accrightslen != sizeof(fd)) {
rb_raise(rb_eSocket,
- "file descriptor was not passed (accrightslen) : %d != %d",
- msg.msg_accrightslen, (int)sizeof(fd));
+ "file descriptor was not passed (accrightslen) : %d != %d",
+ arg.msg.msg_accrightslen, (int)sizeof(fd));
}
#endif