summaryrefslogtreecommitdiff
path: root/thread_pthread.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_pthread.c')
-rw-r--r--thread_pthread.c158
1 files changed, 89 insertions, 69 deletions
diff --git a/thread_pthread.c b/thread_pthread.c
index 26fe9ffddf..8bc5bac7c9 100644
--- a/thread_pthread.c
+++ b/thread_pthread.c
@@ -1270,9 +1270,16 @@ static int check_signal_thread_list(void) { return 0; }
#if USE_SLEEPY_TIMER_THREAD
static struct {
+ /*
+ * Read end of each pipe is closed inside timer thread for shutdown
+ * Write ends are closed by a normal Ruby thread during shutdown
+ */
int normal[2];
int low[2];
- rb_pid_t owner_process;
+
+ /* volatile for signal handler use: */
+ volatile rb_pid_t owner_process;
+ rb_atomic_t writing;
} timer_thread_pipe = {
{-1, -1},
{-1, -1}, /* low priority */
@@ -1280,12 +1287,13 @@ static struct {
/* only use signal-safe system calls here */
static void
-rb_thread_wakeup_timer_thread_fd(int fd)
+rb_thread_wakeup_timer_thread_fd(volatile int *fdp)
{
ssize_t result;
+ int fd = *fdp; /* access fdp exactly once here and do not reread fdp */
/* already opened */
- if (timer_thread_pipe.owner_process == getpid()) {
+ if (fd >= 0 && timer_thread_pipe.owner_process == getpid()) {
const char *buff = "!";
retry:
if ((result = write(fd, buff, 1)) <= 0) {
@@ -1311,13 +1319,18 @@ rb_thread_wakeup_timer_thread_fd(int fd)
void
rb_thread_wakeup_timer_thread(void)
{
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.normal[1]);
+ /* must be safe inside sighandler, so no mutex */
+ ATOMIC_INC(timer_thread_pipe.writing);
+ rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.normal[1]);
+ ATOMIC_DEC(timer_thread_pipe.writing);
}
static void
rb_thread_wakeup_timer_thread_low(void)
{
- rb_thread_wakeup_timer_thread_fd(timer_thread_pipe.low[1]);
+ ATOMIC_INC(timer_thread_pipe.writing);
+ rb_thread_wakeup_timer_thread_fd(&timer_thread_pipe.low[1]);
+ ATOMIC_DEC(timer_thread_pipe.writing);
}
/* VM-dependent API is not available for this function */
@@ -1351,16 +1364,16 @@ consume_communication_pipe(int fd)
}
}
+#define CLOSE_INVALIDATE(expr) close_invalidate(&expr,#expr)
static void
-close_communication_pipe(int pipes[2])
+close_invalidate(volatile int *fdp, const char *msg)
{
- if (close(pipes[0]) < 0) {
- rb_bug_errno("native_stop_timer_thread - close(ttp[0])", errno);
- }
- if (close(pipes[1]) < 0) {
- rb_bug_errno("native_stop_timer_thread - close(ttp[1])", errno);
+ int fd = *fdp; /* access fdp exactly once here and do not reread fdp */
+
+ *fdp = -1;
+ if (close(fd) < 0) {
+ rb_async_bug_errno(msg, errno);
}
- pipes[0] = pipes[1] = -1;
}
static void
@@ -1378,39 +1391,47 @@ set_nonblock(int fd)
rb_sys_fail(0);
}
-static void
+static int
setup_communication_pipe_internal(int pipes[2])
{
int err;
- if (pipes[0] != -1) {
- /* close pipe of parent process */
- close_communication_pipe(pipes);
- }
-
err = rb_cloexec_pipe(pipes);
if (err != 0) {
- rb_bug_errno("setup_communication_pipe: Failed to create communication pipe for timer thread", errno);
+ rb_warn("Failed to create communication pipe for timer thread: %s",
+ strerror(errno));
+ return -1;
}
rb_update_max_fd(pipes[0]);
rb_update_max_fd(pipes[1]);
set_nonblock(pipes[0]);
set_nonblock(pipes[1]);
+ return 0;
}
/* communication pipe with timer thread and signal handler */
-static void
+static int
setup_communication_pipe(void)
{
- if (timer_thread_pipe.owner_process == getpid()) {
- /* already set up. */
- return;
+ VM_ASSERT(timer_thread_pipe.owner_process == 0);
+ VM_ASSERT(timer_thread_pipe.normal[0] == -1);
+ VM_ASSERT(timer_thread_pipe.normal[1] == -1);
+ VM_ASSERT(timer_thread_pipe.low[0] == -1);
+ VM_ASSERT(timer_thread_pipe.low[1] == -1);
+
+ if (setup_communication_pipe_internal(timer_thread_pipe.normal) < 0) {
+ return errno;
+ }
+ if (setup_communication_pipe_internal(timer_thread_pipe.low) < 0) {
+ int e = errno;
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[0]);
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[1]);
+ return e;
}
- setup_communication_pipe_internal(timer_thread_pipe.normal);
- setup_communication_pipe_internal(timer_thread_pipe.low);
/* validate pipe on this process */
timer_thread_pipe.owner_process = getpid();
+ return 0;
}
/**
@@ -1547,7 +1568,10 @@ thread_timer(void *p)
/* wait */
timer_thread_sleep(gvl);
}
-#if !USE_SLEEPY_TIMER_THREAD
+#if USE_SLEEPY_TIMER_THREAD
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[0]);
+ CLOSE_INVALIDATE(timer_thread_pipe.low[0]);
+#else
native_mutex_unlock(&timer_thread_lock);
native_cond_destroy(&timer_thread_cond);
native_mutex_destroy(&timer_thread_lock);
@@ -1567,8 +1591,9 @@ rb_thread_create_timer_thread(void)
err = pthread_attr_init(&attr);
if (err != 0) {
- fprintf(stderr, "[FATAL] Failed to initialize pthread attr: %s\n", strerror(err));
- exit(EXIT_FAILURE);
+ rb_warn("pthread_attr_init failed for timer: %s, scheduling broken",
+ strerror(err));
+ return;
}
# ifdef PTHREAD_STACK_MIN
{
@@ -1586,7 +1611,12 @@ rb_thread_create_timer_thread(void)
#endif
#if USE_SLEEPY_TIMER_THREAD
- setup_communication_pipe();
+ err = setup_communication_pipe();
+ if (err != 0) {
+ rb_warn("pipe creation failed for timer: %s, scheduling broken",
+ strerror(err));
+ return;
+ }
#endif /* USE_SLEEPY_TIMER_THREAD */
/* create timer thread */
@@ -1599,8 +1629,13 @@ rb_thread_create_timer_thread(void)
err = pthread_create(&timer_thread.id, NULL, thread_timer, &GET_VM()->gvl);
#endif
if (err != 0) {
- fprintf(stderr, "[FATAL] Failed to create timer thread: %s\n", strerror(err));
- exit(EXIT_FAILURE);
+ rb_warn("pthread_create failed for timer: %s, scheduling broken",
+ strerror(err));
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[0]);
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[1]);
+ CLOSE_INVALIDATE(timer_thread_pipe.low[0]);
+ CLOSE_INVALIDATE(timer_thread_pipe.low[1]);
+ return;
}
timer_thread.created = 1;
#ifdef HAVE_PTHREAD_ATTR_INIT
@@ -1610,30 +1645,38 @@ rb_thread_create_timer_thread(void)
}
static int
-native_stop_timer_thread(int close_anyway)
+native_stop_timer_thread(void)
{
int stopped;
stopped = --system_working <= 0;
if (TT_DEBUG) fprintf(stderr, "stop timer thread\n");
if (stopped) {
- /* join */
- rb_thread_wakeup_timer_thread();
+ /* prevent wakeups from signal handler ASAP */
+ timer_thread_pipe.owner_process = 0;
+
+ /*
+ * however, the above was not enough: the FD may already be
+ * captured and in the middle of a write while we are running,
+ * so wait for that to finish:
+ */
+ while (ATOMIC_CAS(timer_thread_pipe.writing, 0, 0)) {
+ native_thread_yield();
+ }
+
+ /* stop writing ends of pipes so timer thread notices EOF */
+ CLOSE_INVALIDATE(timer_thread_pipe.normal[1]);
+ CLOSE_INVALIDATE(timer_thread_pipe.low[1]);
+
+ /* timer thread will stop looping when system_working <= 0: */
native_thread_join(timer_thread.id);
+
+ /* timer thread will close the read end on exit: */
+ VM_ASSERT(timer_thread_pipe.normal[0] == -1);
+ VM_ASSERT(timer_thread_pipe.low[0] == -1);
+
if (TT_DEBUG) fprintf(stderr, "joined timer thread\n");
timer_thread.created = 0;
-
- /* close communication pipe */
- if (close_anyway) {
- /* TODO: Uninstall all signal handlers or mask all signals.
- * This pass is cleaning phase (terminate ruby process).
- * To avoid such race, we skip to close communication
- * pipe. OS will close it at process termination.
- * It may not good practice, but pragmatic.
- * We remain it is TODO.
- */
- /* close_communication_pipe(); */
- }
}
return stopped;
}
@@ -1707,29 +1750,6 @@ rb_reserved_fd_p(int fd)
#endif
}
-int
-rb_divert_reserved_fd(int fd)
-{
-#if USE_SLEEPY_TIMER_THREAD
- int *ptr;
- int newfd;
-
- if ((fd == *(ptr = &(timer_thread_pipe.normal[0])) ||
- fd == *(ptr = &(timer_thread_pipe.normal[1])) ||
- fd == *(ptr = &(timer_thread_pipe.low[0])) ||
- fd == *(ptr = &(timer_thread_pipe.low[1]))) &&
- timer_thread_pipe.owner_process == getpid()) { /* async-signal-safe */
- newfd = rb_cloexec_dup(fd); /* async-signal-safe if no error */
- if (newfd == -1) return -1;
- rb_update_max_fd(newfd); /* async-signal-safe if no error */
- /* set_nonblock(newfd); */ /* async-signal-safe if no error */
- *ptr = newfd;
- rb_thread_wakeup_timer_thread_low(); /* async-signal-safe? */
- }
-#endif
- return 0;
-}
-
rb_nativethread_id_t
rb_nativethread_self(void)
{