summaryrefslogtreecommitdiff
path: root/ext/socket/ipsocket.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/socket/ipsocket.c')
-rw-r--r--ext/socket/ipsocket.c586
1 files changed, 353 insertions, 233 deletions
diff --git a/ext/socket/ipsocket.c b/ext/socket/ipsocket.c
index f3c00b518d..e1943b8496 100644
--- a/ext/socket/ipsocket.c
+++ b/ext/socket/ipsocket.c
@@ -23,8 +23,25 @@ struct inetsock_arg
int type;
VALUE resolv_timeout;
VALUE connect_timeout;
+ VALUE open_timeout;
};
+void
+rsock_raise_user_specified_timeout(struct addrinfo *ai, VALUE host, VALUE port)
+{
+ VALUE message;
+
+ if (ai && ai->ai_addr) {
+ VALUE rai = rsock_addrinfo_new((struct sockaddr *)ai->ai_addr, (socklen_t)ai->ai_addrlen, PF_UNSPEC, 0, 0, Qnil, Qnil);
+ VALUE addr_str = rsock_addrinfo_inspect_sockaddr(rai);
+ message = rb_sprintf("user specified timeout for %" PRIsVALUE, addr_str);
+ } else {
+ message = rb_sprintf("user specified timeout for %" PRIsVALUE " port %" PRIsVALUE, host, port);
+ }
+
+ rb_exc_raise(rb_exc_new_str(rb_eIOTimeoutError, message));
+}
+
static VALUE
inetsock_cleanup(VALUE v)
{
@@ -45,6 +62,13 @@ inetsock_cleanup(VALUE v)
}
static VALUE
+current_clocktime(void)
+{
+ VALUE clock_monotnic_const = rb_const_get(rb_mProcess, rb_intern("CLOCK_MONOTONIC"));
+ return rb_funcall(rb_mProcess, rb_intern("clock_gettime"), 1, clock_monotnic_const);
+}
+
+static VALUE
init_inetsock_internal(VALUE v)
{
struct inetsock_arg *arg = (void *)v;
@@ -54,12 +78,18 @@ init_inetsock_internal(VALUE v)
int status = 0, local = 0;
int family = AF_UNSPEC;
const char *syscall = 0;
+ VALUE resolv_timeout = arg->resolv_timeout;
VALUE connect_timeout = arg->connect_timeout;
+ VALUE open_timeout = arg->open_timeout;
+ VALUE timeout;
+ VALUE starts_at;
+
+ timeout = NIL_P(open_timeout) ? resolv_timeout : open_timeout;
+ starts_at = current_clocktime();
arg->remote.res = rsock_addrinfo(arg->remote.host, arg->remote.serv,
family, SOCK_STREAM,
- (type == INET_SERVER) ? AI_PASSIVE : 0);
-
+ (type == INET_SERVER) ? AI_PASSIVE : 0, timeout);
/*
* Maybe also accept a local address
@@ -67,7 +97,7 @@ init_inetsock_internal(VALUE v)
if (type != INET_SERVER && (!NIL_P(arg->local.host) || !NIL_P(arg->local.serv))) {
arg->local.res = rsock_addrinfo(arg->local.host, arg->local.serv,
- family, SOCK_STREAM, 0);
+ family, SOCK_STREAM, 0, timeout);
}
VALUE io = Qnil;
@@ -122,8 +152,18 @@ init_inetsock_internal(VALUE v)
syscall = "bind(2)";
}
+ if (NIL_P(open_timeout)) {
+ timeout = connect_timeout;
+ } else {
+ VALUE elapsed = rb_funcall(current_clocktime(), '-', 1, starts_at);
+ timeout = rb_funcall(open_timeout, '-', 1, elapsed);
+ if (rb_funcall(timeout, '<', 1, INT2FIX(0)) == Qtrue) {
+ rsock_raise_user_specified_timeout(res, arg->remote.host, arg->remote.serv);
+ }
+ }
+
if (status >= 0) {
- status = rsock_connect(io, res->ai_addr, res->ai_addrlen, (type == INET_SOCKS), connect_timeout);
+ status = rsock_connect(io, res->ai_addr, res->ai_addrlen, (type == INET_SOCKS), timeout);
syscall = "connect(2)";
}
}
@@ -172,8 +212,16 @@ init_inetsock_internal(VALUE v)
#if FAST_FALLBACK_INIT_INETSOCK_IMPL == 0
VALUE
-rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE _fast_fallback, VALUE _test_mode_settings)
+rsock_init_inetsock(
+ VALUE self, VALUE remote_host, VALUE remote_serv,
+ VALUE local_host, VALUE local_serv, int type,
+ VALUE resolv_timeout, VALUE connect_timeout, VALUE open_timeout,
+ VALUE _fast_fallback, VALUE _test_mode_settings)
{
+ if (!NIL_P(open_timeout) && (!NIL_P(resolv_timeout) || !NIL_P(connect_timeout))) {
+ rb_raise(rb_eArgError, "Cannot specify open_timeout along with connect_timeout or resolv_timeout");
+ }
+
struct inetsock_arg arg;
arg.self = self;
arg.io = Qnil;
@@ -186,6 +234,7 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca
arg.type = type;
arg.resolv_timeout = resolv_timeout;
arg.connect_timeout = connect_timeout;
+ arg.open_timeout = open_timeout;
return rb_ensure(init_inetsock_internal, (VALUE)&arg,
inetsock_cleanup, (VALUE)&arg);
}
@@ -209,6 +258,21 @@ is_specified_ip_address(const char *hostname)
inet_pton(AF_INET, hostname, &ipv4addr) == 1);
}
+static int
+is_local_port_fixed(const char *portp)
+{
+ if (!portp) return 0;
+
+ char *endp;
+ errno = 0;
+ long port = strtol(portp, &endp, 10);
+
+ if (endp == portp) return 0;
+ if (errno == ERANGE) return 0;
+
+ return port > 0;
+}
+
struct fast_fallback_inetsock_arg
{
VALUE self;
@@ -221,46 +285,34 @@ struct fast_fallback_inetsock_arg
int type;
VALUE resolv_timeout;
VALUE connect_timeout;
+ VALUE open_timeout;
const char *hostp, *portp;
int *families;
int family_size;
int additional_flags;
- int cancelled;
- rb_nativethread_lock_t *lock;
struct fast_fallback_getaddrinfo_entry *getaddrinfo_entries[2];
struct fast_fallback_getaddrinfo_shared *getaddrinfo_shared;
+ rb_fdset_t readfds, writefds;
+ int wait;
int connection_attempt_fds_size;
int *connection_attempt_fds;
VALUE test_mode_settings;
};
static struct fast_fallback_getaddrinfo_shared *
-allocate_fast_fallback_getaddrinfo_shared(void)
+allocate_fast_fallback_getaddrinfo_shared(int family_size)
{
struct fast_fallback_getaddrinfo_shared *shared;
shared = (struct fast_fallback_getaddrinfo_shared *)calloc(
1,
- sizeof(struct fast_fallback_getaddrinfo_shared)
+ sizeof(struct fast_fallback_getaddrinfo_shared) + (family_size == 1 ? 0 : 2) * sizeof(struct fast_fallback_getaddrinfo_entry)
);
return shared;
}
-static struct fast_fallback_getaddrinfo_entry *
-allocate_fast_fallback_getaddrinfo_entry(void)
-{
- struct fast_fallback_getaddrinfo_entry *entry;
-
- entry = (struct fast_fallback_getaddrinfo_entry *)calloc(
- 1,
- sizeof(struct fast_fallback_getaddrinfo_entry)
- );
-
- return entry;
-}
-
static void
allocate_fast_fallback_getaddrinfo_hints(struct addrinfo *hints, int family, int remote_addrinfo_hints, int additional_flags)
{
@@ -282,55 +334,21 @@ allocate_connection_attempt_fds(int additional_capacity)
}
static int
-reallocate_connection_attempt_fds(int *fds, int current_capacity, int additional_capacity)
+reallocate_connection_attempt_fds(int **fds, int current_capacity, int additional_capacity)
{
int new_capacity = current_capacity + additional_capacity;
+ int *new_fds;
- if (realloc(fds, new_capacity * sizeof(int)) == NULL) {
+ new_fds = realloc(*fds, new_capacity * sizeof(int));
+ if (new_fds == NULL) {
rb_syserr_fail(errno, "realloc(3)");
}
+ *fds = new_fds;
- for (int i = current_capacity; i < new_capacity; i++) fds[i] = -1;
+ for (int i = current_capacity; i < new_capacity; i++) (*fds)[i] = -1;
return new_capacity;
}
-struct wait_fast_fallback_arg
-{
- int status, nfds;
- fd_set *readfds, *writefds;
- struct timeval *delay;
- int *cancelled;
-};
-
-static void *
-wait_fast_fallback(void *ptr)
-{
- struct wait_fast_fallback_arg *arg = (struct wait_fast_fallback_arg *)ptr;
- int status;
- status = select(arg->nfds, arg->readfds, arg->writefds, NULL, arg->delay);
- arg->status = status;
- if (errno == EINTR) *arg->cancelled = true;
- return 0;
-}
-
-static void
-cancel_fast_fallback(void *ptr)
-{
- if (!ptr) return;
-
- struct fast_fallback_getaddrinfo_shared *arg = (struct fast_fallback_getaddrinfo_shared *)ptr;
-
- rb_nativethread_lock_lock(arg->lock);
- {
- *arg->cancelled = true;
- char notification = SELECT_CANCELLED;
- if ((write(arg->notify, &notification, 1)) < 0) {
- rb_syserr_fail(errno, "write(2)");
- }
- }
- rb_nativethread_lock_unlock(arg->lock);
-}
-
struct hostname_resolution_result
{
struct addrinfo *ai;
@@ -342,7 +360,7 @@ struct hostname_resolution_store
{
struct hostname_resolution_result v6;
struct hostname_resolution_result v4;
- int is_all_finised;
+ int is_all_finished;
};
static int
@@ -427,12 +445,22 @@ select_expires_at(
struct timeval *resolution_delay,
struct timeval *connection_attempt_delay,
struct timeval *user_specified_resolv_timeout_at,
- struct timeval *user_specified_connect_timeout_at
-) {
+ struct timeval *user_specified_connect_timeout_at,
+ struct timeval *user_specified_open_timeout_at)
+{
if (any_addrinfos(resolution_store)) {
- return resolution_delay ? resolution_delay : connection_attempt_delay;
+ struct timeval *delay;
+ delay = resolution_delay ? resolution_delay : connection_attempt_delay;
+
+ if (user_specified_open_timeout_at &&
+ timercmp(user_specified_open_timeout_at, delay, <)) {
+ return user_specified_open_timeout_at;
+ }
+ return delay;
}
+ if (user_specified_open_timeout_at) return user_specified_open_timeout_at;
+
struct timeval *timeout = NULL;
if (user_specified_resolv_timeout_at) {
@@ -521,7 +549,8 @@ in_progress_fds(int fds_size)
}
static void
-remove_connection_attempt_fd(int *fds, int *fds_size, int removing_fd) {
+remove_connection_attempt_fd(int *fds, int *fds_size, int removing_fd)
+{
int i, j;
for (i = 0; i < *fds_size; i++) {
@@ -550,6 +579,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
VALUE io = arg->io;
VALUE resolv_timeout = arg->resolv_timeout;
VALUE connect_timeout = arg->connect_timeout;
+ VALUE open_timeout = arg->open_timeout;
VALUE test_mode_settings = arg->test_mode_settings;
struct addrinfo *remote_ai = NULL, *local_ai = NULL;
int connected_fd = -1, status = 0, local_status = 0;
@@ -565,19 +595,16 @@ init_fast_fallback_inetsock_internal(VALUE v)
pthread_t threads[arg->family_size];
char resolved_type[2];
ssize_t resolved_type_size;
- int hostname_resolution_waiter = 0, hostname_resolution_notifier = 0;
+ int hostname_resolution_waiter = -1, hostname_resolution_notifier = -1;
int pipefd[2];
- fd_set readfds, writefds;
- struct wait_fast_fallback_arg wait_arg;
+ int nfds = 0;
struct timeval *ends_at = NULL;
struct timeval delay = (struct timeval){ -1, -1 };
- wait_arg.nfds = 0;
- wait_arg.writefds = NULL;
- wait_arg.status = 0;
+ struct timeval *delay_p = NULL;
struct hostname_resolution_store resolution_store;
- resolution_store.is_all_finised = false;
+ resolution_store.is_all_finished = false;
resolution_store.v6.ai = NULL;
resolution_store.v6.finished = false;
resolution_store.v6.has_error = false;
@@ -586,7 +613,6 @@ init_fast_fallback_inetsock_internal(VALUE v)
resolution_store.v4.has_error = false;
int last_family = 0;
-
int additional_capacity = 10;
int current_capacity = additional_capacity;
arg->connection_attempt_fds = allocate_connection_attempt_fds(additional_capacity);
@@ -600,17 +626,32 @@ init_fast_fallback_inetsock_internal(VALUE v)
struct timeval *user_specified_resolv_timeout_at = NULL;
struct timeval user_specified_connect_timeout_storage;
struct timeval *user_specified_connect_timeout_at = NULL;
+ struct timeval user_specified_open_timeout_storage;
+ struct timeval *user_specified_open_timeout_at = NULL;
struct timespec now = current_clocktime_ts();
+ VALUE starts_at = current_clocktime();
+
+ if (!NIL_P(open_timeout)) {
+ struct timeval open_timeout_tv = rb_time_interval(open_timeout);
+ user_specified_open_timeout_storage = add_ts_to_tv(open_timeout_tv, now);
+ user_specified_open_timeout_at = &user_specified_open_timeout_storage;
+ }
/* start of hostname resolution */
if (arg->family_size == 1) {
+ arg->wait = -1;
+ arg->getaddrinfo_shared = NULL;
+
int family = arg->families[0];
+ VALUE t = NIL_P(open_timeout) ? resolv_timeout : open_timeout;
+
arg->remote.res = rsock_addrinfo(
arg->remote.host,
arg->remote.serv,
family,
SOCK_STREAM,
- 0
+ 0,
+ t
);
if (family == AF_INET6) {
@@ -622,8 +663,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
resolution_store.v4.finished = true;
resolution_store.v6.finished = true;
}
- resolution_store.is_all_finised = true;
- wait_arg.readfds = NULL;
+ resolution_store.is_all_finished = true;
} else {
if (pipe(pipefd) != 0) rb_syserr_fail(errno, "pipe(2)");
hostname_resolution_waiter = pipefd[0];
@@ -632,30 +672,21 @@ init_fast_fallback_inetsock_internal(VALUE v)
if ((fcntl(hostname_resolution_waiter, F_SETFL, waiter_flags | O_NONBLOCK)) < 0) {
rb_syserr_fail(errno, "fcntl(2)");
}
-
+ arg->wait = hostname_resolution_waiter;
hostname_resolution_notifier = pipefd[1];
- wait_arg.readfds = &readfds;
- arg->getaddrinfo_shared = allocate_fast_fallback_getaddrinfo_shared();
+ arg->getaddrinfo_shared = allocate_fast_fallback_getaddrinfo_shared(arg->family_size);
if (!arg->getaddrinfo_shared) rb_syserr_fail(errno, "calloc(3)");
- arg->getaddrinfo_shared->lock = calloc(1, sizeof(rb_nativethread_lock_t));
- if (!arg->getaddrinfo_shared->lock) rb_syserr_fail(errno, "calloc(3)");
- rb_nativethread_lock_initialize(arg->getaddrinfo_shared->lock);
+ rb_nativethread_lock_initialize(&arg->getaddrinfo_shared->lock);
+ arg->getaddrinfo_shared->notify = hostname_resolution_notifier;
- arg->getaddrinfo_shared->node = arg->hostp ? strdup(arg->hostp) : NULL;
- arg->getaddrinfo_shared->service = strdup(arg->portp);
+ arg->getaddrinfo_shared->node = arg->hostp ? ruby_strdup(arg->hostp) : NULL;
+ arg->getaddrinfo_shared->service = arg->portp ? ruby_strdup(arg->portp) : NULL;
arg->getaddrinfo_shared->refcount = arg->family_size + 1;
- arg->getaddrinfo_shared->notify = hostname_resolution_notifier;
- arg->getaddrinfo_shared->wait = hostname_resolution_waiter;
- arg->getaddrinfo_shared->connection_attempt_fds = arg->connection_attempt_fds;
- arg->getaddrinfo_shared->connection_attempt_fds_size = arg->connection_attempt_fds_size;
- arg->getaddrinfo_shared->cancelled = &arg->cancelled;
- wait_arg.cancelled = &arg->cancelled;
for (int i = 0; i < arg->family_size; i++) {
- arg->getaddrinfo_entries[i] = allocate_fast_fallback_getaddrinfo_entry();
- if (!(arg->getaddrinfo_entries[i])) rb_syserr_fail(errno, "calloc(3)");
+ arg->getaddrinfo_entries[i] = &arg->getaddrinfo_shared->getaddrinfo_entries[i];
arg->getaddrinfo_entries[i]->shared = arg->getaddrinfo_shared;
struct addrinfo getaddrinfo_hints[arg->family_size];
@@ -695,10 +726,9 @@ init_fast_fallback_inetsock_internal(VALUE v)
}
}
- if (raddrinfo_pthread_create(&threads[i], do_fast_fallback_getaddrinfo, arg->getaddrinfo_entries[i]) != 0) {
+ if (raddrinfo_pthread_create(&threads[i], fork_safe_do_fast_fallback_getaddrinfo, arg->getaddrinfo_entries[i]) != 0) {
rsock_raise_resolution_error("getaddrinfo(3)", EAI_AGAIN);
}
- pthread_detach(threads[i]);
}
if (NIL_P(resolv_timeout)) {
@@ -722,7 +752,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (remote_ai->ai_family == AF_INET6) {
if (any_addrinfos(&resolution_store)) continue;
if (!in_progress_fds(arg->connection_attempt_fds_size)) break;
- if (resolution_store.is_all_finised) break;
+ if (resolution_store.is_all_finished) break;
if (local_status < 0) {
host = arg->local.host;
@@ -748,7 +778,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (!local_ai) {
if (any_addrinfos(&resolution_store)) continue;
if (in_progress_fds(arg->connection_attempt_fds_size)) break;
- if (!resolution_store.is_all_finised) break;
+ if (!resolution_store.is_all_finished) break;
/* Use a different family local address if no choice, this
* will cause EAFNOSUPPORT. */
@@ -765,7 +795,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (any_addrinfos(&resolution_store)) continue;
if (in_progress_fds(arg->connection_attempt_fds_size)) break;
- if (!resolution_store.is_all_finised) break;
+ if (!resolution_store.is_all_finished) break;
if (local_status < 0) {
host = arg->local.host;
@@ -801,7 +831,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (any_addrinfos(&resolution_store)) continue;
if (in_progress_fds(arg->connection_attempt_fds_size)) break;
- if (!resolution_store.is_all_finised) break;
+ if (!resolution_store.is_all_finished) break;
if (local_status < 0) {
host = arg->local.host;
@@ -822,19 +852,31 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (any_addrinfos(&resolution_store) ||
in_progress_fds(arg->connection_attempt_fds_size) ||
- !resolution_store.is_all_finised) {
+ !resolution_store.is_all_finished) {
socket_nonblock_set(fd);
status = connect(fd, remote_ai->ai_addr, remote_ai->ai_addrlen);
last_family = remote_ai->ai_family;
} else {
- if (!NIL_P(connect_timeout)) {
- user_specified_connect_timeout_storage = rb_time_interval(connect_timeout);
- user_specified_connect_timeout_at = &user_specified_connect_timeout_storage;
+ VALUE timeout = Qnil;
+
+ if (!NIL_P(open_timeout)) {
+ VALUE elapsed = rb_funcall(current_clocktime(), '-', 1, starts_at);
+ timeout = rb_funcall(open_timeout, '-', 1, elapsed);
+
+ if (rb_funcall(timeout, '<', 1, INT2FIX(0)) == Qtrue) {
+ rsock_raise_user_specified_timeout(NULL, arg->remote.host, arg->remote.serv);
+ }
+ }
+ if (NIL_P(timeout)) {
+ if (!NIL_P(connect_timeout)) {
+ user_specified_connect_timeout_storage = rb_time_interval(connect_timeout);
+ user_specified_connect_timeout_at = &user_specified_connect_timeout_storage;
+ }
+ timeout =
+ (user_specified_connect_timeout_at && is_infinity(*user_specified_connect_timeout_at)) ?
+ Qnil : tv_to_seconds(user_specified_connect_timeout_at);
}
- VALUE timeout =
- (user_specified_connect_timeout_at && is_infinity(*user_specified_connect_timeout_at)) ?
- Qnil : tv_to_seconds(user_specified_connect_timeout_at);
io = arg->io = rsock_init_sock(arg->self, fd);
status = rsock_connect(io, remote_ai->ai_addr, remote_ai->ai_addrlen, 0, timeout);
}
@@ -847,14 +889,13 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (errno == EINPROGRESS) {
if (current_capacity == arg->connection_attempt_fds_size) {
current_capacity = reallocate_connection_attempt_fds(
- arg->connection_attempt_fds,
+ &arg->connection_attempt_fds,
current_capacity,
additional_capacity
);
}
arg->connection_attempt_fds[arg->connection_attempt_fds_size] = fd;
(arg->connection_attempt_fds_size)++;
- wait_arg.writefds = &writefds;
set_timeout_tv(&connection_attempt_delay_strage, 250, now);
connection_attempt_delay_expires_at = &connection_attempt_delay_strage;
@@ -883,7 +924,7 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (any_addrinfos(&resolution_store)) continue;
if (in_progress_fds(arg->connection_attempt_fds_size)) break;
- if (!resolution_store.is_all_finised) break;
+ if (!resolution_store.is_all_finished) break;
if (local_status < 0) {
host = arg->local.host;
@@ -907,47 +948,51 @@ init_fast_fallback_inetsock_internal(VALUE v)
resolution_delay_expires_at,
connection_attempt_delay_expires_at,
user_specified_resolv_timeout_at,
- user_specified_connect_timeout_at
+ user_specified_connect_timeout_at,
+ user_specified_open_timeout_at
);
if (ends_at) {
delay = tv_to_timeout(ends_at, now);
- wait_arg.delay = &delay;
+ delay_p = &delay;
} else {
- wait_arg.delay = NULL;
+ if (((resolution_store.v6.finished && !resolution_store.v4.finished) ||
+ (resolution_store.v4.finished && !resolution_store.v6.finished)) &&
+ !any_addrinfos(&resolution_store) &&
+ !in_progress_fds(arg->connection_attempt_fds_size)) {
+ /* A limited timeout is introduced to prevent select(2) from hanging when it is exclusively
+ * waiting for name resolution and write(2) failure occurs in a child thread. */
+ delay.tv_sec = 0;
+ delay.tv_usec = 50000;
+ delay_p = &delay;
+ } else {
+ delay_p = NULL;
+ }
}
- if (arg->connection_attempt_fds_size) {
- FD_ZERO(wait_arg.writefds);
+ nfds = 0;
+ rb_fd_zero(&arg->writefds);
+ if (in_progress_fds(arg->connection_attempt_fds_size)) {
int n = 0;
for (int i = 0; i < arg->connection_attempt_fds_size; i++) {
int cfd = arg->connection_attempt_fds[i];
if (cfd < 0) continue;
if (cfd > n) n = cfd;
- FD_SET(cfd, wait_arg.writefds);
+ rb_fd_set(cfd, &arg->writefds);
}
if (n > 0) n++;
- wait_arg.nfds = n;
- } else {
- wait_arg.writefds = NULL;
+ nfds = n;
}
- FD_ZERO(wait_arg.readfds);
- FD_SET(hostname_resolution_waiter, wait_arg.readfds);
- if ((hostname_resolution_waiter + 1) > wait_arg.nfds) {
- wait_arg.nfds = hostname_resolution_waiter + 1;
- }
+ rb_fd_zero(&arg->readfds);
+ if (arg->family_size > 1) {
+ rb_fd_set(hostname_resolution_waiter, &arg->readfds);
- rb_thread_call_without_gvl2(
- wait_fast_fallback,
- &wait_arg,
- cancel_fast_fallback,
- arg->getaddrinfo_shared
- );
- rb_thread_check_ints();
- if (errno == EINTR || arg->cancelled) break;
+ if ((hostname_resolution_waiter + 1) > nfds) {
+ nfds = hostname_resolution_waiter + 1;
+ }
+ }
- status = wait_arg.status;
- syscall = "select(2)";
+ status = rb_thread_fd_select(nfds, &arg->readfds, &arg->writefds, NULL, delay_p);
now = current_clocktime_ts();
if (is_timeout_tv(resolution_delay_expires_at, now)) {
@@ -961,14 +1006,39 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (status > 0) {
/* check for connection */
- for (int i = 0; i < arg->connection_attempt_fds_size; i++) {
- int fd = arg->connection_attempt_fds[i];
- if (fd < 0 || !FD_ISSET(fd, wait_arg.writefds)) continue;
+ if (in_progress_fds(arg->connection_attempt_fds_size)) {
+ for (int i = 0; i < arg->connection_attempt_fds_size; i++) {
+ int fd = arg->connection_attempt_fds[i];
+ if (fd < 0 || !rb_fd_isset(fd, &arg->writefds)) continue;
+
+ int err;
+ socklen_t len = sizeof(err);
+
+ status = getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len);
- int err;
- socklen_t len = sizeof(err);
+ if (status < 0) {
+ last_error.type = SYSCALL_ERROR;
+ last_error.ecode = errno;
+ close(fd);
+
+ if (any_addrinfos(&resolution_store)) continue;
+ if (in_progress_fds(arg->connection_attempt_fds_size)) break;
+ if (!resolution_store.is_all_finished) break;
+
+ if (local_status < 0) {
+ host = arg->local.host;
+ serv = arg->local.serv;
+ } else {
+ host = arg->remote.host;
+ serv = arg->remote.serv;
+ }
+ if (last_error.type == RESOLUTION_ERROR) {
+ rsock_raise_resolution_error(syscall, last_error.ecode);
+ } else {
+ rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv);
+ }
+ }
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) == 0) {
if (err == 0) { /* success */
remove_connection_attempt_fd(
arg->connection_attempt_fds,
@@ -977,47 +1047,42 @@ init_fast_fallback_inetsock_internal(VALUE v)
);
connected_fd = fd;
break;
- };
-
- /* fail */
- errno = err;
- close(fd);
- remove_connection_attempt_fd(
- arg->connection_attempt_fds,
- &arg->connection_attempt_fds_size,
- fd
- );
- continue;
+ } else { /* fail */
+ close(fd);
+ remove_connection_attempt_fd(
+ arg->connection_attempt_fds,
+ &arg->connection_attempt_fds_size,
+ fd
+ );
+ last_error.type = SYSCALL_ERROR;
+ last_error.ecode = err;
+ }
}
- }
- if (connected_fd >= 0) break;
- last_error.type = SYSCALL_ERROR;
- last_error.ecode = errno;
+ if (connected_fd >= 0) break;
- if (any_addrinfos(&resolution_store) ||
- in_progress_fds(arg->connection_attempt_fds_size) ||
- !resolution_store.is_all_finised) {
if (!in_progress_fds(arg->connection_attempt_fds_size)) {
+ if (!any_addrinfos(&resolution_store) && resolution_store.is_all_finished) {
+ if (local_status < 0) {
+ host = arg->local.host;
+ serv = arg->local.serv;
+ } else {
+ host = arg->remote.host;
+ serv = arg->remote.serv;
+ }
+ if (last_error.type == RESOLUTION_ERROR) {
+ rsock_raise_resolution_error(syscall, last_error.ecode);
+ } else {
+ rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv);
+ }
+ }
+ connection_attempt_delay_expires_at = NULL;
user_specified_connect_timeout_at = NULL;
}
- } else {
- if (local_status < 0) {
- host = arg->local.host;
- serv = arg->local.serv;
- } else {
- host = arg->remote.host;
- serv = arg->remote.serv;
- }
- if (last_error.type == RESOLUTION_ERROR) {
- rsock_raise_resolution_error(syscall, last_error.ecode);
- } else {
- rsock_syserr_fail_host_port(last_error.ecode, syscall, host, serv);
- }
}
/* check for hostname resolution */
- if (!resolution_store.is_all_finised && FD_ISSET(hostname_resolution_waiter, wait_arg.readfds)) {
+ if (!resolution_store.is_all_finished && rb_fd_isset(hostname_resolution_waiter, &arg->readfds)) {
while (true) {
resolved_type_size = read(
hostname_resolution_waiter,
@@ -1033,15 +1098,17 @@ init_fast_fallback_inetsock_internal(VALUE v)
if (arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err &&
arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err != EAI_ADDRFAMILY) {
- last_error.type = RESOLUTION_ERROR;
- last_error.ecode = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err;
- syscall = "getaddrinfo(3)";
+ if (!resolution_store.v4.finished || resolution_store.v4.has_error) {
+ last_error.type = RESOLUTION_ERROR;
+ last_error.ecode = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err;
+ syscall = "getaddrinfo(3)";
+ }
resolution_store.v6.has_error = true;
} else {
resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai;
}
if (resolution_store.v4.finished) {
- resolution_store.is_all_finised = true;
+ resolution_store.is_all_finished = true;
resolution_delay_expires_at = NULL;
user_specified_resolv_timeout_at = NULL;
break;
@@ -1050,16 +1117,18 @@ init_fast_fallback_inetsock_internal(VALUE v)
resolution_store.v4.finished = true;
if (arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err) {
- last_error.type = RESOLUTION_ERROR;
- last_error.ecode = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err;
- syscall = "getaddrinfo(3)";
+ if (!resolution_store.v6.finished || resolution_store.v6.has_error) {
+ last_error.type = RESOLUTION_ERROR;
+ last_error.ecode = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err;
+ syscall = "getaddrinfo(3)";
+ }
resolution_store.v4.has_error = true;
} else {
resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai;
}
if (resolution_store.v6.finished) {
- resolution_store.is_all_finised = true;
+ resolution_store.is_all_finished = true;
resolution_delay_expires_at = NULL;
user_specified_resolv_timeout_at = NULL;
break;
@@ -1083,12 +1152,63 @@ init_fast_fallback_inetsock_internal(VALUE v)
}
}
- status = wait_arg.status = 0;
+ status = 0;
+ }
+
+ /* For cases where write(2) fails in child threads */
+ if (!resolution_store.is_all_finished) {
+ if (!resolution_store.v6.finished && arg->getaddrinfo_entries[IPV6_ENTRY_POS]->has_syserr) {
+ resolution_store.v6.finished = true;
+
+ if (arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err) {
+ if (!resolution_store.v4.finished || resolution_store.v4.has_error) {
+ last_error.type = RESOLUTION_ERROR;
+ last_error.ecode = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->err;
+ syscall = "getaddrinfo(3)";
+ }
+ resolution_store.v6.has_error = true;
+ } else {
+ resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai;
+ }
+
+ if (resolution_store.v4.finished) {
+ resolution_store.is_all_finished = true;
+ resolution_delay_expires_at = NULL;
+ user_specified_resolv_timeout_at = NULL;
+ }
+ }
+ if (!resolution_store.v4.finished && arg->getaddrinfo_entries[IPV4_ENTRY_POS]->has_syserr) {
+ resolution_store.v4.finished = true;
+
+ if (arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err) {
+ if (!resolution_store.v6.finished || resolution_store.v6.has_error) {
+ last_error.type = RESOLUTION_ERROR;
+ last_error.ecode = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->err;
+ syscall = "getaddrinfo(3)";
+ }
+ resolution_store.v4.has_error = true;
+ } else {
+ resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai;
+ }
+
+ if (resolution_store.v6.finished) {
+ resolution_store.is_all_finished = true;
+ resolution_delay_expires_at = NULL;
+ user_specified_resolv_timeout_at = NULL;
+ } else {
+ set_timeout_tv(&resolution_delay_storage, 50, now);
+ resolution_delay_expires_at = &resolution_delay_storage;
+ }
+ }
+ }
+
+ if (is_timeout_tv(user_specified_open_timeout_at, now)) {
+ rsock_raise_user_specified_timeout(NULL, arg->remote.host, arg->remote.serv);
}
if (!any_addrinfos(&resolution_store)) {
if (!in_progress_fds(arg->connection_attempt_fds_size) &&
- resolution_store.is_all_finised) {
+ resolution_store.is_all_finished) {
if (local_status < 0) {
host = arg->local.host;
serv = arg->local.serv;
@@ -1104,46 +1224,14 @@ init_fast_fallback_inetsock_internal(VALUE v)
}
if ((is_timeout_tv(user_specified_resolv_timeout_at, now) ||
- resolution_store.is_all_finised) &&
+ resolution_store.is_all_finished) &&
(is_timeout_tv(user_specified_connect_timeout_at, now) ||
!in_progress_fds(arg->connection_attempt_fds_size))) {
- VALUE errno_module = rb_const_get(rb_cObject, rb_intern("Errno"));
- VALUE etimedout_error = rb_const_get(errno_module, rb_intern("ETIMEDOUT"));
- rb_raise(etimedout_error, "user specified timeout");
- }
- }
-
- if (!resolution_store.is_all_finised) {
- if (!resolution_store.v6.finished && arg->getaddrinfo_entries[IPV6_ENTRY_POS]->has_syserr) {
- resolution_store.v6.ai = arg->getaddrinfo_entries[IPV6_ENTRY_POS]->ai;
- resolution_store.v6.finished = true;
-
- if (resolution_store.v4.finished) {
- resolution_store.is_all_finised = true;
- wait_arg.readfds = NULL;
- resolution_delay_expires_at = NULL;
- user_specified_resolv_timeout_at = NULL;
- }
- }
- if (!resolution_store.v4.finished && arg->getaddrinfo_entries[IPV4_ENTRY_POS]->has_syserr) {
- resolution_store.v4.ai = arg->getaddrinfo_entries[IPV4_ENTRY_POS]->ai;
- resolution_store.v4.finished = true;
-
- if (resolution_store.v6.finished) {
- resolution_store.is_all_finised = true;
- wait_arg.readfds = NULL;
- resolution_delay_expires_at = NULL;
- user_specified_resolv_timeout_at = NULL;
- } else {
- set_timeout_tv(&resolution_delay_storage, 50, now);
- resolution_delay_expires_at = &resolution_delay_storage;
- }
+ rsock_raise_user_specified_timeout(NULL, arg->remote.host, arg->remote.serv);
}
}
}
- rb_thread_check_ints();
-
if (NIL_P(arg->io)) {
/* create new instance */
arg->io = rsock_init_sock(arg->self, connected_fd);
@@ -1167,27 +1255,40 @@ fast_fallback_inetsock_cleanup(VALUE v)
arg->local.res = 0;
}
+ if (arg->wait != -1) close(arg->wait);
+
if (getaddrinfo_shared) {
+ if (getaddrinfo_shared->notify != -1) close(getaddrinfo_shared->notify);
+ getaddrinfo_shared->notify = -1;
+
int shared_need_free = 0;
- int need_free[2] = { 0, 0 };
+ struct addrinfo *ais[arg->family_size];
+ for (int i = 0; i < arg->family_size; i++) ais[i] = NULL;
- rb_nativethread_lock_lock(getaddrinfo_shared->lock);
+ rb_nativethread_lock_lock(&getaddrinfo_shared->lock);
{
for (int i = 0; i < arg->family_size; i++) {
- if (arg->getaddrinfo_entries[i] && --(arg->getaddrinfo_entries[i]->refcount) == 0) {
- need_free[i] = 1;
+ struct fast_fallback_getaddrinfo_entry *getaddrinfo_entry = arg->getaddrinfo_entries[i];
+
+ if (!getaddrinfo_entry) continue;
+
+ if (--(getaddrinfo_entry->refcount) == 0) {
+ ais[i] = getaddrinfo_entry->ai;
+ getaddrinfo_entry->ai = NULL;
}
}
if (--(getaddrinfo_shared->refcount) == 0) {
shared_need_free = 1;
}
}
- rb_nativethread_lock_unlock(getaddrinfo_shared->lock);
+ rb_nativethread_lock_unlock(&getaddrinfo_shared->lock);
for (int i = 0; i < arg->family_size; i++) {
- if (need_free[i]) free_fast_fallback_getaddrinfo_entry(&arg->getaddrinfo_entries[i]);
+ if (ais[i]) freeaddrinfo(ais[i]);
+ }
+ if (getaddrinfo_shared && shared_need_free) {
+ free_fast_fallback_getaddrinfo_shared(&getaddrinfo_shared);
}
- if (shared_need_free) free_fast_fallback_getaddrinfo_shared(&getaddrinfo_shared);
}
int connection_attempt_fd;
@@ -1204,6 +1305,9 @@ fast_fallback_inetsock_cleanup(VALUE v)
}
}
+ if (arg->readfds.fdset) rb_fd_term(&arg->readfds);
+ if (arg->writefds.fdset) rb_fd_term(&arg->writefds);
+
if (arg->connection_attempt_fds) {
free(arg->connection_attempt_fds);
arg->connection_attempt_fds = NULL;
@@ -1213,17 +1317,27 @@ fast_fallback_inetsock_cleanup(VALUE v)
}
VALUE
-rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE local_host, VALUE local_serv, int type, VALUE resolv_timeout, VALUE connect_timeout, VALUE fast_fallback, VALUE test_mode_settings)
+rsock_init_inetsock(
+ VALUE self, VALUE remote_host, VALUE remote_serv,
+ VALUE local_host, VALUE local_serv, int type,
+ VALUE resolv_timeout, VALUE connect_timeout, VALUE open_timeout,
+ VALUE fast_fallback, VALUE test_mode_settings)
{
+ if (!NIL_P(open_timeout) && (!NIL_P(resolv_timeout) || !NIL_P(connect_timeout))) {
+ rb_raise(rb_eArgError, "Cannot specify open_timeout along with connect_timeout or resolv_timeout");
+ }
+
if (type == INET_CLIENT && FAST_FALLBACK_INIT_INETSOCK_IMPL == 1 && RTEST(fast_fallback)) {
struct rb_addrinfo *local_res = NULL;
- char *hostp, *portp;
- char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV];
+ char *hostp, *portp, *local_portp;
+ char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV], local_pbuf[NI_MAXSERV];
int additional_flags = 0;
- hostp = host_str(remote_host, hbuf, sizeof(hbuf), &additional_flags);
- portp = port_str(remote_serv, pbuf, sizeof(pbuf), &additional_flags);
+ int local_flags = 0;
+ hostp = raddrinfo_host_str(remote_host, hbuf, sizeof(hbuf), &additional_flags);
+ portp = raddrinfo_port_str(remote_serv, pbuf, sizeof(pbuf), &additional_flags);
+ local_portp = raddrinfo_port_str(local_serv, local_pbuf, sizeof(local_pbuf), &local_flags);
- if (!is_specified_ip_address(hostp)) {
+ if (!is_specified_ip_address(hostp) && !is_local_port_fixed(local_portp)) {
int target_families[2] = { 0, 0 };
int resolving_family_size = 0;
@@ -1231,16 +1345,18 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca
* Maybe also accept a local address
*/
if (!NIL_P(local_host) || !NIL_P(local_serv)) {
+ VALUE t = NIL_P(open_timeout) ? resolv_timeout : open_timeout;
local_res = rsock_addrinfo(
local_host,
local_serv,
AF_UNSPEC,
SOCK_STREAM,
- 0
+ 0,
+ t
);
struct addrinfo *tmp_p = local_res->ai;
- for (tmp_p; tmp_p != NULL; tmp_p = tmp_p->ai_next) {
+ for (; tmp_p != NULL; tmp_p = tmp_p->ai_next) {
if (target_families[0] == 0 && tmp_p->ai_family == AF_INET6) {
target_families[0] = AF_INET6;
resolving_family_size++;
@@ -1270,10 +1386,10 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca
fast_fallback_arg.type = type;
fast_fallback_arg.resolv_timeout = resolv_timeout;
fast_fallback_arg.connect_timeout = connect_timeout;
+ fast_fallback_arg.open_timeout = open_timeout;
fast_fallback_arg.hostp = hostp;
fast_fallback_arg.portp = portp;
fast_fallback_arg.additional_flags = additional_flags;
- fast_fallback_arg.cancelled = false;
int resolving_families[resolving_family_size];
int resolving_family_index = 0;
@@ -1287,6 +1403,9 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca
fast_fallback_arg.family_size = resolving_family_size;
fast_fallback_arg.test_mode_settings = test_mode_settings;
+ rb_fd_init(&fast_fallback_arg.readfds);
+ rb_fd_init(&fast_fallback_arg.writefds);
+
return rb_ensure(init_fast_fallback_inetsock_internal, (VALUE)&fast_fallback_arg,
fast_fallback_inetsock_cleanup, (VALUE)&fast_fallback_arg);
}
@@ -1304,6 +1423,7 @@ rsock_init_inetsock(VALUE self, VALUE remote_host, VALUE remote_serv, VALUE loca
arg.type = type;
arg.resolv_timeout = resolv_timeout;
arg.connect_timeout = connect_timeout;
+ arg.open_timeout = open_timeout;
return rb_ensure(init_inetsock_internal, (VALUE)&arg,
inetsock_cleanup, (VALUE)&arg);
@@ -1489,7 +1609,7 @@ static VALUE
ip_s_getaddress(VALUE obj, VALUE host)
{
union_sockaddr addr;
- struct rb_addrinfo *res = rsock_addrinfo(host, Qnil, AF_UNSPEC, SOCK_STREAM, 0);
+ struct rb_addrinfo *res = rsock_addrinfo(host, Qnil, AF_UNSPEC, SOCK_STREAM, 0, Qnil);
socklen_t len = res->ai->ai_addrlen;
/* just take the first one */