From 1162523bae926cfa6128043b635e28c14b732754 Mon Sep 17 00:00:00 2001
From: Takashi Kokubun
Date: Wed, 15 Jun 2022 09:40:54 -0700
Subject: Remove MJIT worker thread (#6006)

[Misc #18830]
---
 mjit.c | 316 ++++++++++++++++++++++++++++++++++++++---------------------------
 1 file changed, 184 insertions(+), 132 deletions(-)

(limited to 'mjit.c')

diff --git a/mjit.c b/mjit.c
index b36331e72e..93e3d823e4 100644
--- a/mjit.c
+++ b/mjit.c
@@ -25,6 +25,7 @@
 #include "internal/hash.h"
 #include "internal/warnings.h"
 #include "vm_sync.h"
+#include "ractor_core.h"
 
 #include "mjit_worker.c"
 
@@ -50,40 +51,6 @@ get_uniq_filename(unsigned long id, const char *prefix, const char *suffix)
     return str;
 }
 
-// Wait until workers don't compile any iseq. It is called at the
-// start of GC.
-void
-mjit_gc_start_hook(void)
-{
-    if (!mjit_enabled)
-        return;
-    CRITICAL_SECTION_START(4, "mjit_gc_start_hook");
-    while (in_jit) {
-        verbose(4, "Waiting wakeup from a worker for GC");
-        rb_native_cond_wait(&mjit_client_wakeup, &mjit_engine_mutex);
-        verbose(4, "Getting wakeup from a worker for GC");
-    }
-    in_gc++;
-    CRITICAL_SECTION_FINISH(4, "mjit_gc_start_hook");
-}
-
-// Send a signal to workers to continue iseq compilations. It is
-// called at the end of GC.
-void
-mjit_gc_exit_hook(void)
-{
-    if (!mjit_enabled)
-        return;
-    CRITICAL_SECTION_START(4, "mjit_gc_exit_hook");
-    in_gc--;
-    RUBY_ASSERT_ALWAYS(in_gc >= 0);
-    if (!in_gc) {
-        verbose(4, "Sending wakeup signal to workers after GC");
-        rb_native_cond_broadcast(&mjit_gc_wakeup);
-    }
-    CRITICAL_SECTION_FINISH(4, "mjit_gc_exit_hook");
-}
-
 // Prohibit calling JIT-ed code and let existing JIT-ed frames exit before the next insn.
 void
 mjit_cancel_all(const char *reason)
@@ -133,9 +100,6 @@ mjit_free_iseq(const rb_iseq_t *iseq)
     if (!mjit_enabled)
         return;
 
-    CRITICAL_SECTION_START(4, "mjit_free_iseq");
-    RUBY_ASSERT_ALWAYS(in_gc);
-    RUBY_ASSERT_ALWAYS(!in_jit);
     if (ISEQ_BODY(iseq)->jit_unit) {
         // jit_unit is not freed here because it may be referred by multiple
         // lists of units. `get_from_list` and `mjit_finish` do the job.
@@ -150,7 +114,6 @@ mjit_free_iseq(const rb_iseq_t *iseq)
             unit->iseq = NULL;
         }
     }
-    CRITICAL_SECTION_FINISH(4, "mjit_free_iseq");
 }
 
 // Free unit list. This should be called only when worker is finished
@@ -245,19 +208,169 @@ finish_conts(void)
     }
 }
 
-// Create unit for `iseq`. This function may be called from an MJIT worker.
+static void mjit_wait(struct rb_iseq_constant_body *body);
+
+// Check the unit queue and start mjit_compile if nothing is in progress.
 static void
-create_unit(const rb_iseq_t *iseq)
+check_unit_queue(void)
 {
-    struct rb_mjit_unit *unit;
+    if (worker_stopped) return;
+    if (current_cc_pid != 0) return; // still compiling
+
+    // Run unload_units after it's requested `max_cache_size / 10` (default: 10) times.
+    // This throttles the call to mitigate locking in unload_units. It also throttles JIT compaction.
+    int throttle_threshold = mjit_opts.max_cache_size / 10;
+    if (unload_requests >= throttle_threshold) {
+        unload_units();
+        unload_requests = 0;
+        if (active_units.length == mjit_opts.max_cache_size && mjit_opts.wait) { // Sometimes all methods may be in use
+            mjit_opts.max_cache_size++; // avoid infinite loop on `rb_mjit_wait_call`. Note that --jit-wait is just for testing.
+            verbose(1, "No units can be unloaded -- incremented max-cache-size to %d for --jit-wait", mjit_opts.max_cache_size);
+        }
+    }
+    if (active_units.length >= mjit_opts.max_cache_size) return; // wait until unload_units makes a progress
 
-    unit = calloc(1, sizeof(struct rb_mjit_unit));
+    // Dequeue a unit
+    struct rb_mjit_unit *unit = get_from_list(&unit_queue);
+    if (unit == NULL) return;
+
+#ifdef _WIN32
+    // Synchronously compile methods on Windows.
+    // mswin: No SIGCHLD, MinGW: directly compiling .c to .so doesn't work
+    mjit_func_t func = convert_unit_to_func(unit);
+    if ((uintptr_t)func > (uintptr_t)LAST_JIT_ISEQ_FUNC) {
+        add_to_list(unit, &active_units);
+        MJIT_ATOMIC_SET(ISEQ_BODY(unit->iseq)->jit_func, func);
+    }
+#else
+    current_cc_ms = real_ms_time();
+    current_cc_unit = unit;
+    current_cc_pid = start_mjit_compile(unit);
+    // TODO: handle -1
+    if (mjit_opts.wait) {
+        mjit_wait(unit->iseq->body);
+    }
+#endif
+}
+
+// Create unit for `iseq`. This function may be called from an MJIT worker.
+static struct rb_mjit_unit*
+create_unit(const rb_iseq_t *iseq)
+{
+    // To prevent GC, don't use ZALLOC // TODO: just use ZALLOC
+    struct rb_mjit_unit *unit = calloc(1, sizeof(struct rb_mjit_unit));
     if (unit == NULL)
-        return;
+        return NULL;
 
     unit->id = current_unit_num++;
-    unit->iseq = (rb_iseq_t *)iseq;
-    ISEQ_BODY(iseq)->jit_unit = unit;
+    if (iseq == NULL) { // Compact unit
+        unit->compact_p = true;
+    } else { // Normal unit
+        unit->iseq = (rb_iseq_t *)iseq;
+        ISEQ_BODY(iseq)->jit_unit = unit;
+    }
+    return unit;
+}
+
+// Check if it should compact all JIT code and start it as needed
+static void
+check_compaction(void)
+{
+#if USE_JIT_COMPACTION
+    // Allow only `max_cache_size / 100` times (default: 100) of compaction.
+    // Note: GC of compacted code has not been implemented yet.
+    int max_compact_size = mjit_opts.max_cache_size / 100;
+    if (max_compact_size < 10) max_compact_size = 10;
+
+    // Run unload_units after it's requested `max_cache_size / 10` (default: 10) times.
+    // This throttles the call to mitigate locking in unload_units. It also throttles JIT compaction.
+    int throttle_threshold = mjit_opts.max_cache_size / 10;
+
+    if (compact_units.length < max_compact_size
+        && ((!mjit_opts.wait && unit_queue.length == 0 && active_units.length > 1)
+            || (active_units.length == mjit_opts.max_cache_size && compact_units.length * throttle_threshold <= total_unloads))) { // throttle compaction by total_unloads
+        struct rb_mjit_unit *unit = create_unit(NULL);
+        if (unit != NULL) {
+            // TODO: assert unit is null
+            current_cc_ms = real_ms_time();
+            current_cc_unit = unit;
+            current_cc_pid = start_mjit_compact(unit);
+            // TODO: check -1
+        }
+    }
+#endif
+}
+
+// Check the current CC process if any, and start a next C compiler process as needed.
+void
+mjit_notify_waitpid(int status)
+{
+    // TODO: check current_cc_pid?
+    current_cc_pid = 0;
+
+    // Delete .c file
+    char c_file[MAXPATHLEN];
+    sprint_uniq_filename(c_file, (int)sizeof(c_file), current_cc_unit->id, MJIT_TMP_PREFIX, ".c");
+    if (!mjit_opts.save_temps)
+        remove_file(c_file);
+
+    // Check the result
+    bool success = false;
+    if (WIFEXITED(status)) {
+        success = (WEXITSTATUS(status) == 0);
+    }
+    if (!success) {
+        verbose(2, "Failed to generate so");
+        // TODO: free unit?
+        // TODO: set NOT_COMPILED_JIT_ISEQ_FUNC?
+        return;
+    }
+
+    // Load .so file
+    char so_file[MAXPATHLEN];
+    sprint_uniq_filename(so_file, (int)sizeof(so_file), current_cc_unit->id, MJIT_TMP_PREFIX, DLEXT);
+    if (current_cc_unit->compact_p) { // Compact unit
+#if USE_JIT_COMPACTION
+        load_compact_funcs_from_so(current_cc_unit, c_file, so_file);
+        current_cc_unit = NULL;
+#else
+        RUBY_ASSERT(!current_cc_unit->compact_p);
+#endif
+    }
+    else { // Normal unit
+        // Load the function from so
+        char funcname[MAXPATHLEN];
+        sprint_funcname(funcname, current_cc_unit);
+        void *func = load_func_from_so(so_file, funcname, current_cc_unit);
+
+        // Delete .so file
+        if (!mjit_opts.save_temps)
+            remove_file(so_file);
+
+        // Set the jit_func if successful
+        if ((uintptr_t)func > (uintptr_t)LAST_JIT_ISEQ_FUNC) {
+            rb_iseq_t *iseq = current_cc_unit->iseq;
+            double end_time = real_ms_time();
+            verbose(1, "JIT success (%.1fms): %s@%s:%ld -> %s",
+                    end_time - current_cc_ms, RSTRING_PTR(ISEQ_BODY(iseq)->location.label),
+                    RSTRING_PTR(rb_iseq_path(iseq)), FIX2LONG(ISEQ_BODY(iseq)->location.first_lineno), c_file);
+
+            add_to_list(current_cc_unit, &active_units);
+            MJIT_ATOMIC_SET(ISEQ_BODY(iseq)->jit_func, func);
+        }
+        current_cc_unit = NULL;
+
+        // Run compaction if it should
+        if (!stop_worker_p) {
+            check_compaction();
+        }
+    }
+
+    // Skip further compilation if mjit_finish is trying to stop it
+    if (!stop_worker_p) {
+        // Start the next one as needed
+        check_unit_queue();
+    }
 }
 
 // Return true if given ISeq body should be compiled by MJIT
@@ -273,23 +386,14 @@ mjit_target_iseq_p(struct rb_iseq_constant_body *body)
 static void
 mjit_add_iseq_to_process(const rb_iseq_t *iseq, const struct rb_mjit_compile_info *compile_info, bool recompile_p)
 {
-    if (!mjit_enabled || pch_status == PCH_FAILED)
+    // TODO: Support non-main Ractors
+    if (!mjit_enabled || pch_status == PCH_FAILED || !rb_ractor_main_p())
         return;
     if (!mjit_target_iseq_p(ISEQ_BODY(iseq))) {
         ISEQ_BODY(iseq)->jit_func = (mjit_func_t)NOT_COMPILED_JIT_ISEQ_FUNC; // skip mjit_wait
         return;
     }
 
-    if (!recompile_p) {
-        CRITICAL_SECTION_START(3, "in add_iseq_to_process");
-
-        // This prevents multiple Ractors from enqueueing the same ISeq twice.
-        if (rb_multi_ractor_p() && (uintptr_t)ISEQ_BODY(iseq)->jit_func != NOT_ADDED_JIT_ISEQ_FUNC) {
-            CRITICAL_SECTION_FINISH(3, "in add_iseq_to_process");
-            return;
-        }
-    }
-
     RB_DEBUG_COUNTER_INC(mjit_add_iseq_to_process);
     ISEQ_BODY(iseq)->jit_func = (mjit_func_t)NOT_READY_JIT_ISEQ_FUNC;
     create_unit(iseq);
@@ -302,12 +406,6 @@ mjit_add_iseq_to_process(const rb_iseq_t *iseq, const struct rb_mjit_compile_inf
     if (active_units.length >= mjit_opts.max_cache_size) {
         unload_requests++;
     }
-
-    if (!recompile_p) {
-        verbose(3, "Sending wakeup signal to workers in mjit_add_iseq_to_process");
-        rb_native_cond_broadcast(&mjit_worker_wakeup);
-        CRITICAL_SECTION_FINISH(3, "in add_iseq_to_process");
-    }
 }
 
 // Add ISEQ to be JITed in parallel with the current thread.
@@ -316,6 +414,7 @@ void
 rb_mjit_add_iseq_to_process(const rb_iseq_t *iseq)
 {
     mjit_add_iseq_to_process(iseq, NULL, false);
+    check_unit_queue();
 }
 
 // For this timeout seconds, --jit-wait will wait for JIT compilation finish.
@@ -324,23 +423,21 @@ rb_mjit_add_iseq_to_process(const rb_iseq_t *iseq)
 static void
 mjit_wait(struct rb_iseq_constant_body *body)
 {
+    pid_t initial_pid = current_cc_pid;
     struct timeval tv;
     int tries = 0;
     tv.tv_sec = 0;
    tv.tv_usec = 1000;
-    while (body->jit_func == (mjit_func_t)NOT_READY_JIT_ISEQ_FUNC) {
+    while (body == NULL ? current_cc_pid == initial_pid : body->jit_func == (mjit_func_t)NOT_READY_JIT_ISEQ_FUNC) { // TODO: refactor this
         tries++;
         if (tries / 1000 > MJIT_WAIT_TIMEOUT_SECONDS || pch_status == PCH_FAILED) {
-            CRITICAL_SECTION_START(3, "in rb_mjit_wait_call to set jit_func");
-            body->jit_func = (mjit_func_t)NOT_COMPILED_JIT_ISEQ_FUNC; // JIT worker seems dead. Give up.
-            CRITICAL_SECTION_FINISH(3, "in rb_mjit_wait_call to set jit_func");
+            if (body != NULL) {
+                body->jit_func = (mjit_func_t) NOT_COMPILED_JIT_ISEQ_FUNC; // JIT worker seems dead. Give up.
+            }
             mjit_warning("timed out to wait for JIT finish");
             break;
         }
 
-        CRITICAL_SECTION_START(3, "in rb_mjit_wait_call for a client wakeup");
-        rb_native_cond_broadcast(&mjit_worker_wakeup);
-        CRITICAL_SECTION_FINISH(3, "in rb_mjit_wait_call for a client wakeup");
         rb_thread_wait_for(tv);
     }
 }
@@ -377,24 +474,8 @@ mjit_recompile(const rb_iseq_t *iseq)
             RSTRING_PTR(rb_iseq_path(iseq)), FIX2INT(ISEQ_BODY(iseq)->location.first_lineno));
     assert(ISEQ_BODY(iseq)->jit_unit != NULL);
 
-    if (UNLIKELY(mjit_opts.wait)) {
-        CRITICAL_SECTION_START(3, "in rb_mjit_recompile_iseq");
-        remove_from_list(ISEQ_BODY(iseq)->jit_unit, &active_units);
-        add_to_list(ISEQ_BODY(iseq)->jit_unit, &stale_units);
-        mjit_add_iseq_to_process(iseq, &ISEQ_BODY(iseq)->jit_unit->compile_info, true);
-        CRITICAL_SECTION_FINISH(3, "in rb_mjit_recompile_iseq");
-        mjit_wait(ISEQ_BODY(iseq));
-    }
-    else {
-        // Lazily move active_units to stale_units to avoid race conditions around active_units with compaction.
-        // Also, it's lazily moved to unit_queue as well because otherwise it won't be added to stale_units properly.
-        // It's good to avoid a race condition between mjit_add_iseq_to_process and mjit_compile around jit_unit as well.
-        CRITICAL_SECTION_START(3, "in rb_mjit_recompile_iseq");
-        ISEQ_BODY(iseq)->jit_unit->stale_p = true;
-        ISEQ_BODY(iseq)->jit_func = (mjit_func_t)NOT_READY_JIT_ISEQ_FUNC;
-        pending_stale_p = true;
-        CRITICAL_SECTION_FINISH(3, "in rb_mjit_recompile_iseq");
-    }
+    mjit_add_iseq_to_process(iseq, &ISEQ_BODY(iseq)->jit_unit->compile_info, true);
+    check_unit_queue();
 }
 
 // Recompile iseq, disabling send optimization
@@ -642,17 +723,6 @@ start_worker(void)
 {
     stop_worker_p = false;
     worker_stopped = false;
-
-    if (!rb_thread_create_mjit_thread(mjit_worker)) {
-        mjit_enabled = false;
-        rb_native_mutex_destroy(&mjit_engine_mutex);
-        rb_native_cond_destroy(&mjit_pch_wakeup);
-        rb_native_cond_destroy(&mjit_client_wakeup);
-        rb_native_cond_destroy(&mjit_worker_wakeup);
-        rb_native_cond_destroy(&mjit_gc_wakeup);
-        verbose(1, "Failure in MJIT thread initialization\n");
-        return false;
-    }
     return true;
 }
 
@@ -816,21 +886,21 @@ mjit_init(const struct mjit_options *opts)
 
     // Initialize worker thread
     start_worker();
+
+#ifndef _MSC_VER
+    // TODO: Consider running C compiler asynchronously
+    make_pch();
+#endif
 }
 
 static void
 stop_worker(void)
 {
-    rb_execution_context_t *ec = GET_EC();
-
-    while (!worker_stopped) {
-        verbose(3, "Sending cancel signal to worker");
-        CRITICAL_SECTION_START(3, "in stop_worker");
-        stop_worker_p = true; // Setting this inside loop because RUBY_VM_CHECK_INTS may make this false.
-        rb_native_cond_broadcast(&mjit_worker_wakeup);
-        CRITICAL_SECTION_FINISH(3, "in stop_worker");
-        RUBY_VM_CHECK_INTS(ec);
+    stop_worker_p = true;
+    if (current_cc_unit != NULL) {
+        mjit_wait(current_cc_unit->iseq->body);
     }
+    worker_stopped = true;
 }
 
 // Stop JIT-compiling methods but compiled code is kept available.
@@ -846,15 +916,12 @@ mjit_pause(bool wait_p)
 
     // Flush all queued units with no option or `wait: true`
     if (wait_p) {
-        struct timeval tv;
-        tv.tv_sec = 0;
-        tv.tv_usec = 1000;
-
-        while (unit_queue.length > 0 && active_units.length < mjit_opts.max_cache_size) { // inverse of condition that waits for mjit_worker_wakeup
-            CRITICAL_SECTION_START(3, "in mjit_pause for a worker wakeup");
-            rb_native_cond_broadcast(&mjit_worker_wakeup);
-            CRITICAL_SECTION_FINISH(3, "in mjit_pause for a worker wakeup");
-            rb_thread_wait_for(tv);
+        while (current_cc_unit != NULL) {
+            if (current_cc_unit->compact_p) {
+                mjit_wait(NULL);
+            } else {
+                mjit_wait(current_cc_unit->iseq->body);
+            }
         }
     }
 
@@ -932,21 +999,8 @@ mjit_finish(bool close_handle_p)
     if (!mjit_enabled)
         return;
 
-    // Wait for pch finish
-    verbose(2, "Stopping worker thread");
-    CRITICAL_SECTION_START(3, "in mjit_finish to wakeup from pch");
-    // As our threads are detached, we could just cancel them. But it
-    // is a bad idea because OS processes (C compiler) started by
-    // threads can produce temp files. And even if the temp files are
-    // removed, the used C compiler still complaint about their
-    // absence. So wait for a clean finish of the threads.
-    while (pch_status == PCH_NOT_READY) {
-        verbose(3, "Waiting wakeup from make_pch");
-        rb_native_cond_wait(&mjit_pch_wakeup, &mjit_engine_mutex);
-    }
-    CRITICAL_SECTION_FINISH(3, "in mjit_finish to wakeup from pch");
-
     // Stop worker
+    verbose(2, "Stopping worker thread");
     stop_worker();
 
     rb_native_mutex_destroy(&mjit_engine_mutex);
@@ -1002,7 +1056,6 @@ mjit_mark(void)
     //
     // Because an MJIT worker may modify active_units anytime, we need to convert
    // the linked list to an array to safely loop its ISeqs without keeping a lock.
-    CRITICAL_SECTION_START(4, "mjit_mark");
     int length = 0;
     if (compiling_iseqs != NULL) {
         while (compiling_iseqs[length]) length++;
@@ -1023,7 +1076,6 @@ mjit_mark(void)
         i++;
     }
     assert(i == length);
-    CRITICAL_SECTION_FINISH(4, "mjit_mark");
 
     for (i = 0; i < length; i++) {
         if (iseqs[i] == NULL) // ISeq is GC-ed
--
cgit v1.2.3
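
The patch above replaces the background worker-thread model with a per-compile child process: check_unit_queue() forks the C compiler (start_mjit_compile / start_mjit_compact), and mjit_notify_waitpid() later reaps the process and loads the generated .so (load_func_from_so). The standalone sketch below only illustrates that fork/waitpid/dlopen shape; it is not part of the patch, and the names (start_compile, notify_waitpid, answer) and the /tmp paths are hypothetical. It assumes a POSIX system with cc on PATH (link with -ldl on older glibc).

/* Standalone sketch (not part of the patch): fork a C compiler, reap it with
 * waitpid(), and dlopen() the resulting shared object -- the same shape as the
 * start_mjit_compile / mjit_notify_waitpid flow above, with hypothetical names. */
#include <dlfcn.h>
#include <stdio.h>
#include <sys/wait.h>
#include <unistd.h>

static pid_t current_cc_pid = 0; /* nonzero while a compiler child is running */

/* Fork the C compiler; the parent returns immediately, like start_mjit_compile(). */
static pid_t
start_compile(const char *c_file, const char *so_file)
{
    pid_t pid = fork();
    if (pid == 0) { /* child: exec the compiler */
        execlp("cc", "cc", "-shared", "-fPIC", "-o", so_file, c_file, (char *)NULL);
        _exit(127); /* exec failed */
    }
    return pid;
}

/* Called once the child exits, like mjit_notify_waitpid(): check the exit
 * status and load the generated .so. */
static void
notify_waitpid(int status, const char *so_file)
{
    current_cc_pid = 0;
    if (!WIFEXITED(status) || WEXITSTATUS(status) != 0) {
        fprintf(stderr, "compiler failed\n");
        return;
    }
    void *handle = dlopen(so_file, RTLD_NOW);
    if (handle == NULL) {
        fprintf(stderr, "dlopen: %s\n", dlerror());
        return;
    }
    int (*fn)(void) = (int (*)(void))dlsym(handle, "answer");
    if (fn != NULL)
        printf("JIT-ed result: %d\n", fn());
}

int
main(void)
{
    const char *c_file = "/tmp/sketch_jit.c", *so_file = "/tmp/sketch_jit.so";
    FILE *f = fopen(c_file, "w");
    if (f == NULL) return 1;
    fputs("int answer(void) { return 42; }\n", f);
    fclose(f);

    current_cc_pid = start_compile(c_file, so_file);
    if (current_cc_pid < 0) return 1;

    /* Ruby gets here via SIGCHLD; a plain program can simply block in waitpid(). */
    int status;
    waitpid(current_cc_pid, &status, 0);
    notify_waitpid(status, so_file);
    return 0;
}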