summaryrefslogtreecommitdiff
path: root/thread_pthread.c
diff options
context:
space:
mode:
authornagachika <nagachika@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2013-03-11 15:54:49 +0000
committernagachika <nagachika@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2013-03-11 15:54:49 +0000
commit6901ebcb7ceddb125d28490577afb19c06f5fc40 (patch)
tree77dae54bc870d0d98527fb1d20e4ccaa2e86273d /thread_pthread.c
parentc98f39af775058dfc39e9f14fb0557e887efbba5 (diff)
merge revision(s) 39679,39682,39683,39685,39686,39694: [Backport #7999]
* thread_pthread.c (set_nonblock): new helper function for set O_NONBLOCK. * thread_pthread.c (rb_thread_create_timer_thread): set O_NONBLOCK to timer_thread_pipe[0] too. * thread_pthread.c (consume_communication_pipe): retry when read returned CCP_READ_BUFF_SIZE. * thread_pthread.c (rb_thread_create_timer_thread): factor out creating communication pipe logic into separate function. * thread_pthread.c (setup_communication_pipe): new helper function. * thread_pthread.c (set_nonblock): moves a definition before setup_communication_pipe. * thread_pthread.c (rb_thread_wakeup_timer_thread_fd): add fd argument and remove hardcoded dependency of timer_thread_pipe[1]. * thread_pthread.c (consume_communication_pipe): add fd argument. * thread_pthread.c (close_communication_pipe): ditto. * thread_pthread.c (timer_thread_sleep): adjust the above changes. * thread_pthread.c (setup_communication_pipe_internal): factor out pipe initialize logic. * thread_pthread.c (ARRAY_SIZE): new. * thread_pthread.c (gvl_acquire_common): use low priority notification for avoiding timer thread interval confusion. If we use timer_thread_pipe[1], every gvl_yield() request one more gvl_yield(). It lead to thread starvation. [Bug #7999] [ruby-core:53095] * thread_pthread.c (rb_reserved_fd_p): adds timer_thread_pipe_low to reserved fds. * process.c (setup_communication_pipe): remove unused function. it was unintentionally added r39683. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_2_0_0@39727 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread_pthread.c')
-rw-r--r--thread_pthread.c175
1 files changed, 116 insertions, 59 deletions
diff --git a/thread_pthread.c b/thread_pthread.c
index 7adc229c42..37a1e4740e 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -44,6 +44,7 @@ static void native_cond_broadcast(rb_thread_cond_t *cond);
static void native_cond_wait(rb_thread_cond_t *cond, pthread_mutex_t *mutex);
static void native_cond_initialize(rb_thread_cond_t *cond, int flags);
static void native_cond_destroy(rb_thread_cond_t *cond);
+static void rb_thread_wakeup_timer_thread_low(void);
static pthread_t timer_thread_id;
#define RB_CONDATTR_CLOCK_MONOTONIC 1
@@ -63,6 +64,10 @@ static pthread_t timer_thread_id;
# define USE_SLEEPY_TIMER_THREAD 0
#endif
+#ifndef ARRAY_SIZE
+#define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
+#endif
+
static void
gvl_acquire_common(rb_vm_t *vm)
{
@@ -70,8 +75,12 @@ gvl_acquire_common(rb_vm_t *vm)
vm->gvl.waiting++;
if (vm->gvl.waiting == 1) {
- /* transit to polling mode */
- rb_thread_wakeup_timer_thread();
+ /*
+ * Wake up timer thread iff timer thread is slept.
+ * When timer thread is polling mode, we don't want to
+ * make confusing timer thread interval time.
+ */
+ rb_thread_wakeup_timer_thread_low();
}
while (vm->gvl.acquired) {
@@ -1145,12 +1154,12 @@ static int check_signal_thread_list(void) { return 0; }
#if USE_SLEEPY_TIMER_THREAD
static int timer_thread_pipe[2] = {-1, -1};
+static int timer_thread_pipe_low[2] = {-1, -1}; /* low priority */
static int timer_thread_pipe_owner_process;
-
/* only use signal-safe system calls here */
-void
-rb_thread_wakeup_timer_thread(void)
+static void
+rb_thread_wakeup_timer_thread_fd(int fd)
{
ssize_t result;
@@ -1158,7 +1167,7 @@ rb_thread_wakeup_timer_thread(void)
if (timer_thread_pipe_owner_process == getpid()) {
const char *buff = "!";
retry:
- if ((result = write(timer_thread_pipe[1], buff, 1)) <= 0) {
+ if ((result = write(fd, buff, 1)) <= 0) {
switch (errno) {
case EINTR: goto retry;
case EAGAIN:
@@ -1177,36 +1186,111 @@ rb_thread_wakeup_timer_thread(void)
}
}
+void
+rb_thread_wakeup_timer_thread(void)
+{
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe[1]);
+}
+
+static void
+rb_thread_wakeup_timer_thread_low(void)
+{
+ rb_thread_wakeup_timer_thread_fd(timer_thread_pipe_low[1]);
+}
+
/* VM-dependent API is not available for this function */
static void
-consume_communication_pipe(void)
+consume_communication_pipe(int fd)
{
#define CCP_READ_BUFF_SIZE 1024
/* buffer can be shared because no one refers to them. */
static char buff[CCP_READ_BUFF_SIZE];
ssize_t result;
- retry:
- result = read(timer_thread_pipe[0], buff, CCP_READ_BUFF_SIZE);
- if (result < 0) {
- switch (errno) {
- case EINTR: goto retry;
- default:
- rb_async_bug_errno("consume_communication_pipe: read\n", errno);
+ while (1) {
+ result = read(fd, buff, sizeof(buff));
+ if (result == 0) {
+ return;
+ }
+ else if (result < 0) {
+ switch (errno) {
+ case EINTR:
+ continue; /* retry */
+ case EAGAIN:
+ return;
+ default:
+ rb_async_bug_errno("consume_communication_pipe: read\n", errno);
+ }
}
}
}
static void
-close_communication_pipe(void)
+close_communication_pipe(int pipes[2])
{
- if (close(timer_thread_pipe[0]) < 0) {
+ if (close(pipes[0]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
}
- if (close(timer_thread_pipe[1]) < 0) {
+ if (close(pipes[1]) < 0) {
rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno);
}
- timer_thread_pipe[0] = timer_thread_pipe[1] = -1;
+ pipes[0] = pipes[1] = -1;
+}
+
+#if USE_SLEEPY_TIMER_THREAD
+static void
+set_nonblock(int fd)
+{
+ int oflags;
+ int err;
+
+ oflags = fcntl(fd, F_GETFL);
+ if (oflags == -1)
+ rb_sys_fail(0);
+ oflags |= O_NONBLOCK;
+ err = fcntl(fd, F_SETFL, oflags);
+ if (err == -1)
+ rb_sys_fail(0);
+}
+#endif
+
+#if USE_SLEEPY_TIMER_THREAD
+static void
+setup_communication_pipe_internal(int pipes[2])
+{
+ int err;
+
+ if (pipes[0] != -1) {
+ /* close pipe of parent process */
+ close_communication_pipe(pipes);
+ }
+
+ err = rb_cloexec_pipe(pipes);
+ if (err != 0) {
+ rb_bug_errno("setup_communication_pipe: Failed to create communication pipe for timer thread", errno);
+ }
+ rb_update_max_fd(pipes[0]);
+ rb_update_max_fd(pipes[1]);
+ set_nonblock(pipes[0]);
+ set_nonblock(pipes[1]);
+}
+#endif /* USE_SLEEPY_TIMER_THREAD */
+
+/* communication pipe with timer thread and signal handler */
+static void
+setup_communication_pipe(void)
+{
+#if USE_SLEEPY_TIMER_THREAD
+ if (timer_thread_pipe_owner_process == getpid()) {
+ /* already set up. */
+ return;
+ }
+ setup_communication_pipe_internal(timer_thread_pipe);
+ setup_communication_pipe_internal(timer_thread_pipe_low);
+
+ /* validate pipe on this process */
+ timer_thread_pipe_owner_process = getpid();
+#endif /* USE_SLEEPY_TIMER_THREAD */
}
/**
@@ -1220,27 +1304,30 @@ timer_thread_sleep(rb_global_vm_lock_t* gvl)
{
int result;
int need_polling;
- struct pollfd pollfd;
+ struct pollfd pollfds[2];
- pollfd.fd = timer_thread_pipe[0];
- pollfd.events = POLLIN;
+ pollfds[0].fd = timer_thread_pipe[0];
+ pollfds[0].events = POLLIN;
+ pollfds[1].fd = timer_thread_pipe_low[0];
+ pollfds[1].events = POLLIN;
need_polling = check_signal_thread_list();
if (gvl->waiting > 0 || need_polling) {
/* polling (TIME_QUANTUM_USEC usec) */
- result = poll(&pollfd, 1, TIME_QUANTUM_USEC/1000);
+ result = poll(pollfds, 1, TIME_QUANTUM_USEC/1000);
}
else {
/* wait (infinite) */
- result = poll(&pollfd, 1, -1);
+ result = poll(pollfds, ARRAY_SIZE(pollfds), -1);
}
if (result == 0) {
/* maybe timeout */
}
else if (result > 0) {
- consume_communication_pipe();
+ consume_communication_pipe(timer_thread_pipe[0]);
+ consume_communication_pipe(timer_thread_pipe_low[0]);
}
else { /* result < 0 */
switch (errno) {
@@ -1340,39 +1427,7 @@ rb_thread_create_timer_thread(void)
# endif
#endif
-#if USE_SLEEPY_TIMER_THREAD
- /* communication pipe with timer thread and signal handler */
- if (timer_thread_pipe_owner_process != getpid()) {
- if (timer_thread_pipe[0] != -1) {
- /* close pipe of parent process */
- close_communication_pipe();
- }
-
- err = rb_cloexec_pipe(timer_thread_pipe);
- if (err != 0) {
- rb_bug_errno("thread_timer: Failed to create communication pipe for timer thread", errno);
- }
- rb_update_max_fd(timer_thread_pipe[0]);
- rb_update_max_fd(timer_thread_pipe[1]);
-# if defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK)
- {
- int oflags;
- int err;
-
- oflags = fcntl(timer_thread_pipe[1], F_GETFL);
- if (oflags == -1)
- rb_sys_fail(0);
- oflags |= O_NONBLOCK;
- err = fcntl(timer_thread_pipe[1], F_SETFL, oflags);
- if (err == -1)
- rb_sys_fail(0);
- }
-# endif /* defined(HAVE_FCNTL) && defined(F_GETFL) && defined(F_SETFL) */
-
- /* validate pipe on this process */
- timer_thread_pipe_owner_process = getpid();
- }
-#endif /* USE_SLEEPY_TIMER_THREAD */
+ setup_communication_pipe();
/* create timer thread */
if (timer_thread_id) {
@@ -1467,8 +1522,10 @@ int
rb_reserved_fd_p(int fd)
{
#if USE_SLEEPY_TIMER_THREAD
- if (fd == timer_thread_pipe[0] ||
- fd == timer_thread_pipe[1]) {
+ if (fd == timer_thread_pipe[0] ||
+ fd == timer_thread_pipe[1] ||
+ fd == timer_thread_pipe_low[0] ||
+ fd == timer_thread_pipe_low[1]) {
return 1;
}
else {