summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--concurrent_set.c85
1 files changed, 47 insertions, 38 deletions
diff --git a/concurrent_set.c b/concurrent_set.c
index eebf7df9cb..376b20d7d4 100644
--- a/concurrent_set.c
+++ b/concurrent_set.c
@@ -222,11 +222,14 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
VALUE set_obj;
VALUE hash = 0;
+ struct concurrent_set *set;
+ struct concurrent_set_probe probe;
+ int idx;
retry:
set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
- struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+ set = RTYPEDDATA_GET_DATA(set_obj);
if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
@@ -235,8 +238,7 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
}
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
- struct concurrent_set_probe probe;
- int idx = concurrent_set_probe_start(&probe, set, hash);
+ idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
@@ -299,37 +301,43 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
{
RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
- bool inserting = false;
- VALUE set_obj;
- VALUE hash = 0;
+ // First attempt to find
+ {
+ VALUE result = rb_concurrent_set_find(set_obj_ptr, key);
+ if (result) return result;
+ }
- retry:
- set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+ // First time we need to call create, and store the hash
+ VALUE set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
RUBY_ASSERT(set_obj);
+
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
+ key = set->funcs->create(key, data);
+ VALUE hash = concurrent_set_hash(set, key);
+
+ struct concurrent_set_probe probe;
+ int idx;
+
+ goto start_search;
+
+retry:
+ // On retries we only need to load the hash object
+ set_obj = rbimpl_atomic_value_load(set_obj_ptr, RBIMPL_ATOMIC_ACQUIRE);
+ RUBY_ASSERT(set_obj);
+ set = RTYPEDDATA_GET_DATA(set_obj);
- if (hash == 0) {
- // We don't need to recompute the hash on every retry because it should
- // never change.
- hash = concurrent_set_hash(set, key);
- }
RUBY_ASSERT(hash == concurrent_set_hash(set, key));
- struct concurrent_set_probe probe;
- int idx = concurrent_set_probe_start(&probe, set, hash);
+start_search:
+ idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+ bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
- if (!inserting) {
- key = set->funcs->create(key, data);
- RUBY_ASSERT(hash == concurrent_set_hash(set, key));
- inserting = true;
- }
-
// Reserve this slot for our hash value
curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
@@ -340,6 +348,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
// CAS succeeded, so these are the values stored
curr_hash_and_flags = hash;
curr_hash = hash;
+
// Fall through to try to claim key
}
@@ -350,13 +359,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
switch (curr_key) {
- case CONCURRENT_SET_EMPTY:
- if (!inserting) {
- key = set->funcs->create(key, data);
- RUBY_ASSERT(hash == concurrent_set_hash(set, key));
- inserting = true;
- }
-
+ case CONCURRENT_SET_EMPTY: {
rb_atomic_t prev_size = rbimpl_atomic_fetch_add(&set->size, 1, RBIMPL_ATOMIC_RELAXED);
// Load_factor reached at 75% full. ex: prev_size: 32, capacity: 64, load_factor: 50%.
@@ -369,6 +372,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
if (prev_key == CONCURRENT_SET_EMPTY) {
+ RUBY_ASSERT(rb_concurrent_set_find(set_obj_ptr, key) == key);
RB_GC_GUARD(set_obj);
return key;
}
@@ -379,6 +383,7 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
// Another thread won the race, try again at the same location.
continue;
}
+ }
case CONCURRENT_SET_DELETED:
break;
case CONCURRENT_SET_MOVED:
@@ -386,22 +391,26 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
RB_VM_LOCKING();
goto retry;
default:
+ // We're never GC during our search
+ // If the continuation bit wasn't set at the start of our search,
+ // any concurrent find with the same hash value would also look at
+ // this location and try to swap curr_key
if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
- // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
- // Skip it and let the GC pass clean it up
- break;
+ if (continuation) {
+ goto probe_next;
+ }
+ rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_EMPTY, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+ continue;
}
if (set->funcs->cmp(key, curr_key)) {
- // We've found a match.
+ // We've found a live match.
RB_GC_GUARD(set_obj);
- if (inserting) {
- // We created key using set->funcs->create, but we didn't end
- // up inserting it into the set. Free it here to prevent memory
- // leaks.
- if (set->funcs->free) set->funcs->free(key);
- }
+ // We created key using set->funcs->create, but we didn't end
+ // up inserting it into the set. Free it here to prevent memory
+ // leaks.
+ if (set->funcs->free) set->funcs->free(key);
return curr_key;
}