diff options
Diffstat (limited to 'gc/mmtk/src/weak_proc.rs')
| -rw-r--r-- | gc/mmtk/src/weak_proc.rs | 257 |
1 files changed, 167 insertions, 90 deletions
diff --git a/gc/mmtk/src/weak_proc.rs b/gc/mmtk/src/weak_proc.rs index 11f7f5abbf..d38dbe04a4 100644 --- a/gc/mmtk/src/weak_proc.rs +++ b/gc/mmtk/src/weak_proc.rs @@ -1,23 +1,23 @@ use std::sync::Mutex; -use mmtk::{ - scheduler::{GCWork, GCWorker, WorkBucketStage}, - util::ObjectReference, - vm::ObjectTracerContext, -}; - -use crate::{ - abi::GCThreadTLS, - upcalls, - Ruby, -}; +use mmtk::scheduler::GCWork; +use mmtk::scheduler::GCWorker; +use mmtk::scheduler::WorkBucketStage; +use mmtk::util::ObjectReference; +use mmtk::vm::ObjectTracerContext; + +use crate::abi::GCThreadTLS; +use crate::upcalls; +use crate::Ruby; pub struct WeakProcessor { + non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>, + parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>, + /// Objects that needs `obj_free` called when dying. /// If it is a bottleneck, replace it with a lock-free data structure, /// or add candidates in batch. - obj_free_candidates: Mutex<Vec<ObjectReference>>, - weak_references: Mutex<Vec<&'static mut ObjectReference>>, + weak_references: Mutex<Vec<ObjectReference>>, } impl Default for WeakProcessor { @@ -29,47 +29,74 @@ impl Default for WeakProcessor { impl WeakProcessor { pub fn new() -> Self { Self { - obj_free_candidates: Mutex::new(Vec::new()), + non_parallel_obj_free_candidates: Mutex::new(Vec::new()), + parallel_obj_free_candidates: vec![Mutex::new(Vec::new())], weak_references: Mutex::new(Vec::new()), } } - /// Add an object as a candidate for `obj_free`. - /// - /// Multiple mutators can call it concurrently, so it has `&self`. - pub fn add_obj_free_candidate(&self, object: ObjectReference) { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - obj_free_candidates.push(object); + pub fn init_parallel_obj_free_candidates(&mut self, num_workers: usize) { + debug_assert_eq!(self.parallel_obj_free_candidates.len(), 1); + + for _ in 1..num_workers { + self.parallel_obj_free_candidates + .push(Mutex::new(Vec::new())); + } } - /// Add many objects as candidates for `obj_free`. + /// Add a batch of objects as candidates for `obj_free`. /// - /// Multiple mutators can call it concurrently, so it has `&self`. - pub fn add_obj_free_candidates(&self, objects: &[ObjectReference]) { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - for object in objects.iter().copied() { - obj_free_candidates.push(object); + /// Amortizes mutex acquisition over the entire batch. Called when a + /// mutator's local buffer is flushed (buffer full or stop-the-world). + pub fn add_obj_free_candidates_batch( + &self, + objects: &[ObjectReference], + can_parallel_free: bool, + ) { + if objects.is_empty() { + return; + } + + if can_parallel_free { + let num_buckets = self.parallel_obj_free_candidates.len(); + for idx in 0..num_buckets { + let mut bucket = self.parallel_obj_free_candidates[idx].lock().unwrap(); + for (i, &obj) in objects.iter().enumerate() { + if i % num_buckets == idx { + bucket.push(obj); + } + } + } + } else { + self.non_parallel_obj_free_candidates + .lock() + .unwrap() + .extend_from_slice(objects); } } pub fn get_all_obj_free_candidates(&self) -> Vec<ObjectReference> { - let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); - std::mem::take(obj_free_candidates.as_mut()) + // let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap(); + let mut all_obj_free_candidates = self + .non_parallel_obj_free_candidates + .lock() + .unwrap() + .to_vec(); + + for candidates_mutex in &self.parallel_obj_free_candidates { + all_obj_free_candidates.extend(candidates_mutex.lock().unwrap().to_vec()); + } + + std::mem::take(all_obj_free_candidates.as_mut()) } - pub fn add_weak_reference(&self, ptr: &'static mut ObjectReference) { + pub fn add_weak_reference(&self, object: ObjectReference) { let mut weak_references = self.weak_references.lock().unwrap(); - weak_references.push(ptr); + weak_references.push(object); } - pub fn remove_weak_reference(&self, ptr: &ObjectReference) { - let mut weak_references = self.weak_references.lock().unwrap(); - for (i, curr_ptr) in weak_references.iter().enumerate() { - if *curr_ptr == ptr { - weak_references.swap_remove(i); - break; - } - } + pub fn weak_references_count(&self) -> usize { + self.weak_references.lock().unwrap().len() } pub fn process_weak_stuff( @@ -77,80 +104,133 @@ impl WeakProcessor { worker: &mut GCWorker<Ruby>, _tracer_context: impl ObjectTracerContext<Ruby>, ) { - worker.add_work(WorkBucketStage::VMRefClosure, ProcessObjFreeCandidates); + worker.add_work( + WorkBucketStage::VMRefClosure, + ProcessNonParallelObjFreeCanadidates {}, + ); + + for index in 0..self.parallel_obj_free_candidates.len() { + worker.add_work( + WorkBucketStage::VMRefClosure, + ProcessParallelObjFreeCandidates { index }, + ); + } + worker.add_work(WorkBucketStage::VMRefClosure, ProcessWeakReferences); worker.add_work(WorkBucketStage::Prepare, UpdateFinalizerObjIdTables); let global_tables_count = (crate::upcalls().global_tables_count)(); let work_packets = (0..global_tables_count) - .map(|i| { - Box::new(UpdateGlobalTables { idx: i }) as _ - }) - .collect(); + .map(|i| Box::new(UpdateGlobalTables { idx: i }) as _) + .collect(); worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure].bulk_add(work_packets); - worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure].bulk_add(vec![ - Box::new(UpdateWbUnprotectedObjectsList) as _, - ]); + worker.scheduler().work_buckets[WorkBucketStage::VMRefClosure] + .bulk_add(vec![Box::new(UpdateWbUnprotectedObjectsList) as _]); + } +} + +fn process_obj_free_candidates(obj_free_candidates: &mut Vec<ObjectReference>) { + // Process obj_free + let mut new_candidates = Vec::new(); + + for object in obj_free_candidates.iter().copied() { + if object.is_reachable() { + // Forward and add back to the candidate list. + let new_object = object.forward(); + trace!("Forwarding obj_free candidate: {object} -> {new_object}"); + new_candidates.push(new_object); + } else { + (upcalls().call_obj_free)(object); + } + } + + *obj_free_candidates = new_candidates; +} + +struct ProcessParallelObjFreeCandidates { + index: usize, +} + +impl GCWork<Ruby> for ProcessParallelObjFreeCandidates { + fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) { + let mut obj_free_candidates = crate::binding().weak_proc.parallel_obj_free_candidates + [self.index] + .try_lock() + .expect("Lock for parallel_obj_free_candidates should not be held"); + + process_obj_free_candidates(&mut obj_free_candidates); } } -struct ProcessObjFreeCandidates; +struct ProcessNonParallelObjFreeCanadidates; -impl GCWork<Ruby> for ProcessObjFreeCandidates { +impl GCWork<Ruby> for ProcessNonParallelObjFreeCanadidates { fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) { - // If it blocks, it is a bug. let mut obj_free_candidates = crate::binding() .weak_proc - .obj_free_candidates + .non_parallel_obj_free_candidates .try_lock() - .expect("It's GC time. No mutators should hold this lock at this time."); + .expect("Lock for non_parallel_obj_free_candidates should not be held"); - let n_cands = obj_free_candidates.len(); + process_obj_free_candidates(&mut obj_free_candidates); + } +} - debug!("Total: {} candidates", n_cands); +struct ProcessWeakReferences; - // Process obj_free - let mut new_candidates = Vec::new(); +impl GCWork<Ruby> for ProcessWeakReferences { + fn do_work(&mut self, worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) { + if crate::mmtk().get_plan().current_gc_may_move_object() { + let gc_tls: &mut GCThreadTLS = unsafe { GCThreadTLS::from_vwt_check(worker.tls) }; - for object in obj_free_candidates.iter().copied() { - if object.is_reachable() { - // Forward and add back to the candidate list. - let new_object = object.forward(); - trace!( - "Forwarding obj_free candidate: {} -> {}", - object, - new_object + let visit_object = |_worker, target_object: ObjectReference, _pin| { + debug_assert!( + mmtk::memory_manager::is_mmtk_object(target_object.to_raw_address()).is_some(), + "Destination is not an MMTk object" ); - new_candidates.push(new_object); - } else { - (upcalls().call_obj_free)(object); - } - } - *obj_free_candidates = new_candidates; + target_object + .get_forwarded_object() + .unwrap_or(target_object) + }; + + gc_tls + .object_closure + .set_temporarily_and_run_code(visit_object, || { + self.process_weak_references(true); + }) + } else { + self.process_weak_references(false); + } } } -struct ProcessWeakReferences; - -impl GCWork<Ruby> for ProcessWeakReferences { - fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) { +impl ProcessWeakReferences { + fn process_weak_references(&mut self, moving_gc: bool) { let mut weak_references = crate::binding() .weak_proc .weak_references .try_lock() .expect("Mutators should not be holding the lock."); - for ptr_ptr in weak_references.iter_mut() { - if !(**ptr_ptr).is_reachable() { - **ptr_ptr = crate::binding().weak_reference_dead_value; - } + weak_references.retain_mut(|object_ptr| { + let object = object_ptr.get_forwarded_object().unwrap_or(*object_ptr); + + if object != *object_ptr { + *object_ptr = object; } - weak_references.clear(); + if object.is_reachable() { + (upcalls().handle_weak_references)(object, moving_gc); + + true + } else { + false + } + }); } } @@ -165,11 +245,10 @@ trait GlobalTableProcessingWork { let forward_object = |_worker, object: ObjectReference, _pin| { debug_assert!( mmtk::memory_manager::is_mmtk_object(object.to_raw_address()).is_some(), - "{} is not an MMTk object", - object + "{object} is not an MMTk object" ); let result = object.forward(); - trace!("Forwarding reference: {} -> {}", object, result); + trace!("Forwarding reference: {object} -> {result}"); result }; @@ -185,7 +264,6 @@ struct UpdateFinalizerObjIdTables; impl GlobalTableProcessingWork for UpdateFinalizerObjIdTables { fn process_table(&mut self) { (crate::upcalls().update_finalizer_table)(); - (crate::upcalls().update_obj_id_tables)(); } } impl GCWork<Ruby> for UpdateFinalizerObjIdTables { @@ -195,11 +273,14 @@ impl GCWork<Ruby> for UpdateFinalizerObjIdTables { } struct UpdateGlobalTables { - idx: i32 + idx: i32, } impl GlobalTableProcessingWork for UpdateGlobalTables { fn process_table(&mut self) { - (crate::upcalls().update_global_tables)(self.idx) + (crate::upcalls().update_global_tables)( + self.idx, + crate::mmtk().get_plan().current_gc_may_move_object(), + ) } } impl GCWork<Ruby> for UpdateGlobalTables { @@ -224,14 +305,10 @@ impl GCWork<Ruby> for UpdateWbUnprotectedObjectsList { if object.is_reachable() { // Forward and add back to the candidate list. let new_object = object.forward(); - trace!( - "Forwarding WB-unprotected object: {} -> {}", - object, - new_object - ); + trace!("Forwarding WB-unprotected object: {object} -> {new_object}"); objects.insert(new_object); } else { - trace!("Removing WB-unprotected object from list: {}", object); + trace!("Removing WB-unprotected object from list: {object}"); } } |
