Diffstat (limited to 'concurrent_set.c')
| -rw-r--r-- | concurrent_set.c | 513 |
1 files changed, 513 insertions, 0 deletions
diff --git a/concurrent_set.c b/concurrent_set.c
new file mode 100644
index 0000000000..234b6408b6
--- /dev/null
+++ b/concurrent_set.c
@@ -0,0 +1,513 @@
+#include "internal.h"
+#include "internal/gc.h"
+#include "internal/concurrent_set.h"
+#include "ruby/atomic.h"
+#include "vm_sync.h"
+
+#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
+#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
+
+enum concurrent_set_special_values {
+    CONCURRENT_SET_EMPTY,
+    CONCURRENT_SET_DELETED,
+    CONCURRENT_SET_MOVED,
+    CONCURRENT_SET_SPECIAL_VALUE_COUNT
+};
+
+struct concurrent_set_entry {
+    VALUE hash;
+    VALUE key;
+};
+
+struct concurrent_set {
+    rb_atomic_t size;
+    unsigned int capacity;
+    unsigned int deleted_entries;
+    const struct rb_concurrent_set_funcs *funcs;
+    struct concurrent_set_entry *entries;
+};
+
+static void
+concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
+{
+    if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;
+
+    RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);
+
+    VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
+    VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+
+    // At the moment we only expect to be racing concurrently against another
+    // thread also setting the continuation bit.
+    // In the future if deletion is concurrent this will need adjusting
+    RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
+    (void)prev_hash;
+}
+
+static VALUE
+concurrent_set_hash(const struct concurrent_set *set, VALUE key)
+{
+    VALUE hash = set->funcs->hash(key);
+    hash &= CONCURRENT_SET_HASH_MASK;
+    if (hash == 0) {
+        hash ^= CONCURRENT_SET_HASH_MASK;
+    }
+    RUBY_ASSERT(hash != 0);
+    RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
+    return hash;
+}
+
+static void
+concurrent_set_free(void *ptr)
+{
+    struct concurrent_set *set = ptr;
+    xfree(set->entries);
+}
+
+static size_t
+concurrent_set_size(const void *ptr)
+{
+    const struct concurrent_set *set = ptr;
+    return sizeof(struct concurrent_set) +
+        (set->capacity * sizeof(struct concurrent_set_entry));
+}
+
+/* Hack: Though it would be trivial, we're intentionally avoiding WB-protecting
+ * this object. This prevents the object from aging and ensures it can always be
+ * collected in a minor GC.
+ * Longer term this deserves a better way to reclaim memory promptly.
+ */
+static void
+concurrent_set_mark(void *ptr)
+{
+    (void)ptr;
+}
+
+static const rb_data_type_t concurrent_set_type = {
+    .wrap_struct_name = "VM/concurrent_set",
+    .function = {
+        .dmark = concurrent_set_mark,
+        .dfree = concurrent_set_free,
+        .dsize = concurrent_set_size,
+    },
+    /* Hack: NOT WB_PROTECTED on purpose (see above) */
+    .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_EMBEDDABLE
+};
+
+VALUE
+rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity)
+{
+    struct concurrent_set *set;
+    VALUE obj = TypedData_Make_Struct(0, struct concurrent_set, &concurrent_set_type, set);
+    set->funcs = funcs;
+    set->entries = ZALLOC_N(struct concurrent_set_entry, capacity);
+    set->capacity = capacity;
+    return obj;
+}
+
+rb_atomic_t
+rb_concurrent_set_size(VALUE set_obj)
+{
+    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+
+    return RUBY_ATOMIC_LOAD(set->size);
+}
+
+struct concurrent_set_probe {
+    int idx;
+    int d;
+    int mask;
+};
+
+static int
+concurrent_set_probe_start(struct concurrent_set_probe *probe, struct concurrent_set *set, VALUE hash)
+{
+    RUBY_ASSERT((set->capacity & (set->capacity - 1)) == 0);
+    probe->d = 0;
+    probe->mask = set->capacity - 1;
+    probe->idx = hash & probe->mask;
+    return probe->idx;
+}
+
+static int
+concurrent_set_probe_next(struct concurrent_set_probe *probe)
+{
+    probe->d++;
+    probe->idx = (probe->idx + probe->d) & probe->mask;
+    return probe->idx;
+}
+
+static void
+concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
+{
+    // Check if another thread has already resized.
+    if (rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE) != old_set_obj) {
+        return;
+    }
+
+    struct concurrent_set *old_set = RTYPEDDATA_GET_DATA(old_set_obj);
+
+    // This may overcount by up to the number of threads concurrently attempting to insert
+    // GC may also happen between now and the set being rebuilt
+    int expected_size = rbimpl_atomic_load(&old_set->size, RBIMPL_ATOMIC_RELAXED) - old_set->deleted_entries;
+
+    // NOTE: new capacity must make sense with load factor, don't change one without checking the other.
+    struct concurrent_set_entry *old_entries = old_set->entries;
+    int old_capacity = old_set->capacity;
+    int new_capacity = old_capacity * 2;
+    if (new_capacity > expected_size * 8) {
+        new_capacity = old_capacity / 2;
+    }
+    else if (new_capacity > expected_size * 4) {
+        new_capacity = old_capacity;
+    }
+
+    // May cause GC and therefore deletes, so must happen first.
+    VALUE new_set_obj = rb_concurrent_set_new(old_set->funcs, new_capacity);
+    struct concurrent_set *new_set = RTYPEDDATA_GET_DATA(new_set_obj);
+
+    for (int i = 0; i < old_capacity; i++) {
+        struct concurrent_set_entry *old_entry = &old_entries[i];
+        VALUE key = rbimpl_atomic_value_exchange(&old_entry->key, CONCURRENT_SET_MOVED, RBIMPL_ATOMIC_ACQUIRE);
+        RUBY_ASSERT(key != CONCURRENT_SET_MOVED);
+
+        if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
+        if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;
+
+        VALUE hash = rbimpl_atomic_value_load(&old_entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
+        RUBY_ASSERT(hash != 0);
+        RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));
+
+        // Insert key into new_set.
+        struct concurrent_set_probe probe;
+        int idx = concurrent_set_probe_start(&probe, new_set, hash);
+
+        while (true) {
+            struct concurrent_set_entry *entry = &new_set->entries[idx];
+
+            if (entry->hash == CONCURRENT_SET_EMPTY) {
+                RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);
+
+                new_set->size++;
+                RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
+
+                entry->key = key;
+                entry->hash = hash;
+                break;
+            }
+
+            RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+            entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
+            idx = concurrent_set_probe_next(&probe);
+        }
+    }
+
+    rbimpl_atomic_value_store(set_obj_ptr, new_set_obj, RBIMPL_ATOMIC_RELEASE);
+
+    RB_GC_GUARD(old_set_obj);
+}
+
+static void
+concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr)
+{
+    RB_VM_LOCKING() {
+        concurrent_set_try_resize_without_locking(old_set_obj, set_obj_ptr);
+    }
+}
+
+VALUE
+rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
+{
+    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+
+    VALUE set_obj;
+    VALUE hash = 0;
+    struct concurrent_set *set;
+    struct concurrent_set_probe probe;
+    int idx;
+
+  retry:
+    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+    RUBY_ASSERT(set_obj);
+    set = RTYPEDDATA_GET_DATA(set_obj);
+
+    if (hash == 0) {
+        // We don't need to recompute the hash on every retry because it should
+        // never change.
+        hash = concurrent_set_hash(set, key);
+    }
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+
+    idx = concurrent_set_probe_start(&probe, set, hash);
+
+    while (true) {
+        struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            return 0;
+        }
+
+        if (curr_hash != hash) {
+            if (!continuation) {
+                return 0;
+            }
+            idx = concurrent_set_probe_next(&probe);
+            continue;
+        }
+
+        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
+
+        switch (curr_key) {
+          case CONCURRENT_SET_EMPTY:
+            // In-progress insert: the hash has been written but the key has not yet.
+            break;
+          case CONCURRENT_SET_DELETED:
+            break;
+          case CONCURRENT_SET_MOVED:
+            // Wait
+            RB_VM_LOCKING();
+
+            goto retry;
+          default: {
+            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
+                // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
+                // Skip it and let the GC pass clean it up
+                break;
+            }
+
+            if (set->funcs->cmp(key, curr_key)) {
+                // We've found a match.
+                RB_GC_GUARD(set_obj);
+                return curr_key;
+            }
+
+            if (!continuation) {
+                return 0;
+            }
+
+            break;
+          }
+        }
+
+        idx = concurrent_set_probe_next(&probe);
+    }
+}
+
+VALUE
+rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
+{
+    RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+
+    // First attempt to find
+    {
+        VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
+        if (result) return result;
+    }
+
+    // The first time through we need to call create and save the hash
+    VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+    RUBY_ASSERT(set_obj);
+
+    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+    key = set->funcs->create(key, data);
+    VALUE hash = concurrent_set_hash(set, key);
+
+    struct concurrent_set_probe probe;
+    int idx;
+
+    goto start_search;
+
+  retry:
+    // On retries we only need to reload the set object
+    set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+    RUBY_ASSERT(set_obj);
+    set = RTYPEDDATA_GET_DATA(set_obj);
+
+    RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+
+  start_search:
+    idx = concurrent_set_probe_start(&probe, set, hash);
+
+    while (true) {
+        struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+        VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+        bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
+
+        if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+            // Reserve this slot for our hash value
+            curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
+                // Lost race, retry same slot to check winner's hash
+                continue;
+            }
+
+            // CAS succeeded, so these are the values stored
+            curr_hash_and_flags = hash;
+            curr_hash = hash;
+
+            // Fall through to try to claim key
+        }
+
+        if (curr_hash != hash) {
+            goto probe_next;
+        }
+
+        VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
+
+        switch (curr_key) {
+          case CONCURRENT_SET_EMPTY: {
+            rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
+
+            // Load factor reached at 75% full. ex: prev_size: 48, capacity: 64, load_factor: 75%.
+            bool load_factor_reached = (uint64_t)(prev_size * 4) >= (uint64_t)(set->capacity * 3);
+
+            if (UNLIKELY(load_factor_reached)) {
+                concurrent_set_try_resize(set_obj, set_obj_ptr);
+                goto retry;
+            }
+
+            VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+            if (prev_key == CONCURRENT_SET_EMPTY) {
+                RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
+                RB_GC_GUARD(set_obj);
+                return key;
+            }
+            else {
+                // Entry was not inserted.
+                rbimpl_atomic_sub(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
+
+                // Another thread won the race, try again at the same location.
+                continue;
+            }
+          }
+          case CONCURRENT_SET_DELETED:
+            break;
+          case CONCURRENT_SET_MOVED:
+            // Wait
+            RB_VM_LOCKING();
+            goto retry;
+          default:
+            // We never GC during our search.
+            // If the continuation bit wasn't set at the start of our search,
+            // any concurrent find with the same hash value would also look at
+            // this location and try to swap curr_key
+            if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
+                if (continuation) {
+                    goto probe_next;
+                }
+                rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                continue;
+            }
+
+            if (set->funcs->cmp(key, curr_key)) {
+                // We've found a live match.
+                RB_GC_GUARD(set_obj);
+
+                // We created key using set->funcs->create, but we didn't end
+                // up inserting it into the set. Free it here to prevent memory
+                // leaks.
+                if (set->funcs->free) set->funcs->free(key);
+
+                return curr_key;
+            }
+            break;
+        }
+
+      probe_next:
+        RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
+        concurrent_set_mark_continuation(entry, curr_hash_and_flags);
+        idx = concurrent_set_probe_next(&probe);
+    }
+}
+
+static void
+concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
+{
+    ASSERT_vm_locking_with_barrier();
+
+    if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
+        entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
+        entry->key = CONCURRENT_SET_DELETED;
+        set->deleted_entries++;
+    }
+    else {
+        entry->hash = CONCURRENT_SET_EMPTY;
+        entry->key = CONCURRENT_SET_EMPTY;
+        set->size--;
+    }
+}
+
+VALUE
+rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
+{
+    ASSERT_vm_locking_with_barrier();
+
+    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+
+    VALUE hash = concurrent_set_hash(set, key);
+
+    struct concurrent_set_probe probe;
+    int idx = concurrent_set_probe_start(&probe, set, hash);
+
+    while (true) {
+        struct concurrent_set_entry *entry = &set->entries[idx];
+        VALUE curr_key = entry->key;
+
+        switch (curr_key) {
+          case CONCURRENT_SET_EMPTY:
+            // We didn't find our entry to delete.
+            return 0;
+          case CONCURRENT_SET_DELETED:
+            break;
+          case CONCURRENT_SET_MOVED:
+            rb_bug("rb_concurrent_set_delete_by_identity: moved entry");
+            break;
+          default:
+            if (key == curr_key) {
+                RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
+                concurrent_set_delete_entry_locked(set, entry);
+                return curr_key;
+            }
+            break;
+        }
+
+        idx = concurrent_set_probe_next(&probe);
+    }
+}
+
+void
+rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data)
+{
+    ASSERT_vm_locking_with_barrier();
+
+    struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+
+    for (unsigned int i = 0; i < set->capacity; i++) {
+        struct concurrent_set_entry *entry = &set->entries[i];
+        VALUE key = entry->key;
+
+        switch (key) {
+          case CONCURRENT_SET_EMPTY:
+          case CONCURRENT_SET_DELETED:
+            continue;
+          case CONCURRENT_SET_MOVED:
+            rb_bug("rb_concurrent_set_foreach_with_replace: moved entry");
+            break;
+          default: {
+            int ret = callback(&entry->key, data);
+            switch (ret) {
+              case ST_STOP:
+                return;
+              case ST_DELETE:
+                concurrent_set_delete_entry_locked(set, entry);
+                break;
+            }
+            break;
+          }
+        }
+    }
+}
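
For context, below is a minimal usage sketch of the API added by this file. It is illustrative only: the callback shapes and the field names of struct rb_concurrent_set_funcs are inferred from how concurrent_set.c calls set->funcs->hash/cmp/create/free (the authoritative declaration lives in internal/concurrent_set.h), and every example_* name is hypothetical.

/* Hypothetical example: interning frozen strings in a concurrent set.
 * Assumes struct rb_concurrent_set_funcs has .hash/.cmp/.create/.free
 * members matching the call sites in concurrent_set.c. */
#include "internal.h"
#include "internal/concurrent_set.h"

static VALUE
example_hash(VALUE key)
{
    return (VALUE)rb_str_hash(key);
}

static bool
example_cmp(VALUE a, VALUE b)
{
    return RTEST(rb_str_equal(a, b));
}

static VALUE
example_create(VALUE key, void *data)
{
    /* Called once for the winning insert; may allocate. */
    (void)data;
    return rb_str_new_frozen(key);
}

static const struct rb_concurrent_set_funcs example_funcs = {
    .hash = example_hash,
    .cmp = example_cmp,
    .create = example_create,
    /* .free is optional; it is only called when an insert loses the race. */
};

/* Assumed to be initialized once at boot; the lazy init below is not itself thread-safe. */
static VALUE example_set_obj;

static VALUE
example_intern(VALUE str)
{
    if (!example_set_obj) {
        /* Capacity must be a power of two (see concurrent_set_probe_start). */
        example_set_obj = rb_concurrent_set_new(&example_funcs, 64);
        rb_gc_register_address(&example_set_obj);
    }
    return rb_concurrent_set_find_or_insert(&example_set_obj, str, NULL);
}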

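A note on the probe sequence: concurrent_set_probe_next() advances by an increasing step (d++; idx = (idx + d) & mask), i.e. triangular-number probing. With a power-of-two capacity (asserted in concurrent_set_probe_start()) this sequence visits every slot of the table before repeating. A small standalone check of that property, assuming a hypothetical capacity of 64:

/* Standalone sanity check (not Ruby code): triangular probing
 * idx = (idx + ++d) & (cap - 1) covers every slot of a power-of-two table. */
#include <stdio.h>
#include <stdbool.h>

#define CAP 64u  /* hypothetical power-of-two capacity */

int
main(void)
{
    bool seen[CAP] = {false};
    unsigned idx = 17u & (CAP - 1);  /* any starting hash works */
    unsigned d = 0;

    /* Same step pattern as concurrent_set_probe_next(). */
    for (unsigned step = 0; step < CAP; step++) {
        seen[idx] = true;
        d++;
        idx = (idx + d) & (CAP - 1);
    }

    for (unsigned i = 0; i < CAP; i++) {
        if (!seen[i]) {
            printf("slot %u was never probed\n", i);
            return 1;
        }
    }
    printf("all %u slots were probed\n", (unsigned)CAP);
    return 0;
}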