-rw-r--r--  bootstraptest/test_ractor.rb |   3
-rw-r--r--  concurrent_set.c             | 173
2 files changed, 129 insertions(+), 47 deletions(-)
diff --git a/bootstraptest/test_ractor.rb b/bootstraptest/test_ractor.rb
index 81dc2d6b8d..13c4652d37 100644
--- a/bootstraptest/test_ractor.rb
+++ b/bootstraptest/test_ractor.rb
@@ -1494,6 +1494,9 @@ assert_equal "ok", %Q{
unless a[i].equal?(b[i])
raise [a[i], b[i]].inspect
end
+ unless a[i] == i.to_s
+ raise [i, a[i], b[i]].inspect
+ end
end
:ok
}
diff --git a/concurrent_set.c b/concurrent_set.c
index 3aa61507aa..eebf7df9cb 100644
--- a/concurrent_set.c
+++ b/concurrent_set.c
@@ -4,6 +4,9 @@
#include "ruby/atomic.h"
#include "vm_sync.h"
+#define CONCURRENT_SET_CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
+#define CONCURRENT_SET_HASH_MASK (~CONCURRENT_SET_CONTINUATION_BIT)
+
enum concurrent_set_special_values {
CONCURRENT_SET_EMPTY,
CONCURRENT_SET_DELETED,
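
The two new macros reserve the most significant bit of the stored hash word as a probe-continuation flag and leave the remaining bits for the hash itself. A minimal standalone sketch of that packing, assuming only that VALUE is a machine word and using plain names in place of the real concurrent_set ones:

    #include <assert.h>
    #include <limits.h>
    #include <stdint.h>
    #include <stdio.h>

    typedef uintptr_t VALUE;

    #define CONTINUATION_BIT ((VALUE)1 << (sizeof(VALUE) * CHAR_BIT - 1))
    #define HASH_MASK        (~CONTINUATION_BIT)

    int main(void)
    {
        VALUE hash = (VALUE)0x123456789abcdefULL & HASH_MASK; /* flag bit clear */
        VALUE stored = hash | CONTINUATION_BIT;  /* same hash, chain continues */

        assert((stored & HASH_MASK) == hash);    /* hash survives masking */
        assert(stored & CONTINUATION_BIT);       /* flag readable independently */
        printf("%zu hash bits, 1 flag bit\n", sizeof(VALUE) * CHAR_BIT - 1);
        return 0;
    }
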
@@ -25,6 +28,36 @@ struct concurrent_set {
};
static void
+concurrent_set_mark_continuation(struct concurrent_set_entry *entry, VALUE curr_hash_and_flags)
+{
+ if (curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT) return;
+
+ RUBY_ASSERT((curr_hash_and_flags & CONCURRENT_SET_HASH_MASK) != 0);
+
+ VALUE new_hash = curr_hash_and_flags | CONCURRENT_SET_CONTINUATION_BIT;
+ VALUE prev_hash = rbimpl_atomic_value_cas(&entry->hash, curr_hash_and_flags, new_hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+
+    // At the moment we only expect to race against another thread that is
+    // also setting the continuation bit. If deletion ever becomes concurrent,
+    // this will need adjusting.
+ RUBY_ASSERT(prev_hash == curr_hash_and_flags || prev_hash == new_hash);
+ (void)prev_hash;
+}
+
+static VALUE
+concurrent_set_hash(const struct concurrent_set *set, VALUE key)
+{
+ VALUE hash = set->funcs->hash(key);
+ hash &= CONCURRENT_SET_HASH_MASK;
+ if (hash == 0) {
+ hash ^= CONCURRENT_SET_HASH_MASK;
+ }
+ RUBY_ASSERT(hash != 0);
+ RUBY_ASSERT(!(hash & CONCURRENT_SET_CONTINUATION_BIT));
+ return hash;
+}
+
+static void
concurrent_set_free(void *ptr)
{
struct concurrent_set *set = ptr;
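
concurrent_set_hash above guarantees two invariants for every stored hash: it is never 0 (0 now doubles as the empty marker in the hash word) and it never arrives with the continuation bit already set. A hedged sketch of the same normalization, reusing the macros from the previous sketch (normalize_hash is a hypothetical name):

    /* Clamp a raw hash into the usable bits; remap the single value (0)
     * that would collide with the empty-slot marker. */
    static VALUE normalize_hash(VALUE raw)
    {
        VALUE h = raw & HASH_MASK;  /* drop the flag bit */
        if (h == 0) {
            h = HASH_MASK;          /* any fixed nonzero value works here */
        }
        return h;
    }
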
@@ -141,13 +174,9 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
if (key < CONCURRENT_SET_SPECIAL_VALUE_COUNT) continue;
if (!RB_SPECIAL_CONST_P(key) && rb_objspace_garbage_object_p(key)) continue;
- VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
- if (hash == 0) {
- // Either in-progress insert or extremely unlikely 0 hash.
- // Re-calculate the hash.
- hash = old_set->funcs->hash(key);
- }
- RUBY_ASSERT(hash == old_set->funcs->hash(key));
+ VALUE hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED) & CONCURRENT_SET_HASH_MASK;
+ RUBY_ASSERT(hash != 0);
+ RUBY_ASSERT(hash == concurrent_set_hash(old_set, key));
// Insert key into new_set.
struct concurrent_set_probe probe;
@@ -156,20 +185,19 @@ concurrent_set_try_resize_without_locking(VALUE old_set_obj, VALUE *set_obj_ptr)
while (true) {
struct concurrent_set_entry *entry = &new_set->entries[idx];
- if (entry->key == CONCURRENT_SET_EMPTY) {
- new_set->size++;
+ if (entry->hash == CONCURRENT_SET_EMPTY) {
+ RUBY_ASSERT(entry->key == CONCURRENT_SET_EMPTY);
+ new_set->size++;
RUBY_ASSERT(new_set->size <= new_set->capacity / 2);
- RUBY_ASSERT(entry->hash == 0);
entry->key = key;
entry->hash = hash;
break;
}
- else {
- RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
- }
+ RUBY_ASSERT(entry->key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT);
+ entry->hash |= CONCURRENT_SET_CONTINUATION_BIT;
idx = concurrent_set_probe_next(&probe);
}
}
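
During resize the new table is private to the resizing thread, so the loop above can use plain stores; the one subtlety is that every occupied slot it probes past gets its continuation bit set, recording that some hash's chain extends beyond that slot. A simplified sketch of that step, assuming the earlier macros and substituting linear probing and hypothetical names for the real concurrent_set_probe machinery:

    #include <stddef.h>

    struct entry {
        VALUE hash; /* 0 = empty; top bit = continuation flag */
        VALUE key;
    };

    /* Insert into a freshly allocated, not-yet-published table. */
    static void fresh_insert(struct entry *tab, size_t mask, VALUE hash, VALUE key)
    {
        size_t idx = hash & mask;
        while (tab[idx].hash != 0) {
            /* Occupied: later lookups must keep probing past this slot. */
            tab[idx].hash |= CONTINUATION_BIT;
            idx = (idx + 1) & mask;
        }
        tab[idx].hash = hash;
        tab[idx].key = key;
    }
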
@@ -203,20 +231,37 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
// never change.
- hash = set->funcs->hash(key);
+ hash = concurrent_set_hash(set, key);
}
- RUBY_ASSERT(hash == set->funcs->hash(key));
+ RUBY_ASSERT(hash == concurrent_set_hash(set, key));
struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
+ VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+ VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+ bool continuation = curr_hash_and_flags & CONCURRENT_SET_CONTINUATION_BIT;
+
+ if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+ return 0;
+ }
+
+ if (curr_hash != hash) {
+ if (!continuation) {
+ return 0;
+ }
+ idx = concurrent_set_probe_next(&probe);
+ continue;
+ }
+
VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
switch (curr_key) {
case CONCURRENT_SET_EMPTY:
- return 0;
+                // In-progress insert: the hash is written but the key is not yet.
+ break;
case CONCURRENT_SET_DELETED:
break;
case CONCURRENT_SET_MOVED:
@@ -225,13 +270,9 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
goto retry;
default: {
- VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
- if (curr_hash != 0 && curr_hash != hash) break;
-
if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
- // Skip it and mark it as deleted.
- rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
break;
}
@@ -241,6 +282,10 @@ rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key)
return curr_key;
}
+ if (!continuation) {
+ return 0;
+ }
+
break;
}
}
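
With the continuation bit in place, the lookup above can decide almost everything from the hash word before touching the key: an empty hash word ends the probe, a mismatched hash without the continuation bit ends it too, and only a matching hash warrants a key comparison. A single-threaded sketch of that control flow, building on the struct from the previous sketch (linear probing, == in place of the real funcs->cmp, hypothetical names):

    #include <stdbool.h>

    static VALUE simple_find(const struct entry *tab, size_t mask, VALUE hash, VALUE key)
    {
        size_t idx = hash & mask;
        while (true) {
            VALUE word = tab[idx].hash;
            bool continues = word & CONTINUATION_BIT;

            if (word == 0) {
                return 0;                   /* empty slot: chain ends here */
            }
            if ((word & HASH_MASK) == hash && tab[idx].key == key) {
                return tab[idx].key;        /* hash and key both match */
            }
            if (!continues) {
                return 0;                   /* last link in this chain */
            }
            idx = (idx + 1) & mask;         /* keep probing the chain */
        }
    }
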
@@ -266,23 +311,49 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
if (hash == 0) {
// We don't need to recompute the hash on every retry because it should
// never change.
- hash = set->funcs->hash(key);
+ hash = concurrent_set_hash(set, key);
}
- RUBY_ASSERT(hash == set->funcs->hash(key));
+ RUBY_ASSERT(hash == concurrent_set_hash(set, key));
struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);
while (true) {
struct concurrent_set_entry *entry = &set->entries[idx];
+ VALUE curr_hash_and_flags = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_ACQUIRE);
+ VALUE curr_hash = curr_hash_and_flags & CONCURRENT_SET_HASH_MASK;
+
+ if (curr_hash_and_flags == CONCURRENT_SET_EMPTY) {
+ if (!inserting) {
+ key = set->funcs->create(key, data);
+ RUBY_ASSERT(hash == concurrent_set_hash(set, key));
+ inserting = true;
+ }
+
+ // Reserve this slot for our hash value
+ curr_hash_and_flags = rbimpl_atomic_value_cas(&entry->hash, CONCURRENT_SET_EMPTY, hash, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+ if (curr_hash_and_flags != CONCURRENT_SET_EMPTY) {
+                // Lost the race; retry the same slot to check the winner's hash.
+ continue;
+ }
+
+ // CAS succeeded, so these are the values stored
+ curr_hash_and_flags = hash;
+ curr_hash = hash;
+            // Fall through to try to claim the key.
+ }
+
+ if (curr_hash != hash) {
+ goto probe_next;
+ }
+
VALUE curr_key = rbimpl_atomic_value_load(&entry->key, RBIMPL_ATOMIC_ACQUIRE);
switch (curr_key) {
- case CONCURRENT_SET_EMPTY: {
- // Not in set
+ case CONCURRENT_SET_EMPTY:
if (!inserting) {
key = set->funcs->create(key, data);
- RUBY_ASSERT(hash == set->funcs->hash(key));
+ RUBY_ASSERT(hash == concurrent_set_hash(set, key));
inserting = true;
}
@@ -293,14 +364,11 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
if (UNLIKELY(load_factor_reached)) {
concurrent_set_try_resize(set_obj, set_obj_ptr);
-
goto retry;
}
- curr_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
- if (curr_key == CONCURRENT_SET_EMPTY) {
- rbimpl_atomic_value_store(&entry->hash, hash, RBIMPL_ATOMIC_RELAXED);
-
+ VALUE prev_key = rbimpl_atomic_value_cas(&entry->key, CONCURRENT_SET_EMPTY, key, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+ if (prev_key == CONCURRENT_SET_EMPTY) {
RB_GC_GUARD(set_obj);
return key;
}
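
The insert path now publishes in two steps: CAS the hash word from empty to reserve the slot, then CAS the key to make the entry visible; a reader that observes the hash but still sees an EMPTY key simply treats the insert as in progress. A minimal sketch of that protocol using C11 atomics rather than Ruby's rbimpl_atomic_* wrappers (try_claim and its types are hypothetical; VALUE as before):

    #include <stdatomic.h>
    #include <stdbool.h>

    struct aentry {
        _Atomic VALUE hash;
        _Atomic VALUE key;
    };

    /* Returns true if this thread claimed the slot for (hash, key).
     * On false the caller re-reads the slot: either another inserter
     * owns it, or it holds a different hash and probing continues. */
    static bool try_claim(struct aentry *e, VALUE hash, VALUE key)
    {
        VALUE expected = 0; /* 0 plays the role of CONCURRENT_SET_EMPTY */

        /* Step 1: reserve the slot by publishing the hash word. */
        if (!atomic_compare_exchange_strong_explicit(
                &e->hash, &expected, hash,
                memory_order_release, memory_order_relaxed)) {
            return false;
        }

        /* Step 2: publish the key with release ordering so a reader that
         * acquires the key also sees the object it refers to. */
        expected = 0;
        return atomic_compare_exchange_strong_explicit(
                &e->key, &expected, key,
                memory_order_release, memory_order_relaxed);
    }
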
@@ -311,22 +379,16 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
// Another thread won the race, try again at the same location.
continue;
}
- }
case CONCURRENT_SET_DELETED:
break;
case CONCURRENT_SET_MOVED:
// Wait
RB_VM_LOCKING();
-
goto retry;
- default: {
- VALUE curr_hash = rbimpl_atomic_value_load(&entry->hash, RBIMPL_ATOMIC_RELAXED);
- if (curr_hash != 0 && curr_hash != hash) break;
-
+ default:
if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) {
// This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object.
- // Skip it and mark it as deleted.
- rbimpl_atomic_value_cas(&entry->key, curr_key, CONCURRENT_SET_DELETED, RBIMPL_ATOMIC_RELEASE, RBIMPL_ATOMIC_RELAXED);
+                // Skip it and let the GC pass clean it up.
break;
}
@@ -343,15 +405,33 @@ rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data)
return curr_key;
}
-
break;
- }
}
+ probe_next:
+ RUBY_ASSERT(curr_hash_and_flags != CONCURRENT_SET_EMPTY);
+ concurrent_set_mark_continuation(entry, curr_hash_and_flags);
idx = concurrent_set_probe_next(&probe);
}
}
+static void
+concurrent_set_delete_entry_locked(struct concurrent_set *set, struct concurrent_set_entry *entry)
+{
+ ASSERT_vm_locking_with_barrier();
+
+ if (entry->hash & CONCURRENT_SET_CONTINUATION_BIT) {
+ entry->hash = CONCURRENT_SET_CONTINUATION_BIT;
+ entry->key = CONCURRENT_SET_DELETED;
+ set->deleted_entries++;
+ }
+ else {
+ entry->hash = CONCURRENT_SET_EMPTY;
+ entry->key = CONCURRENT_SET_EMPTY;
+ set->size--;
+ }
+}
+
VALUE
rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
{
@@ -359,7 +439,7 @@ rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj);
- VALUE hash = set->funcs->hash(key);
+ VALUE hash = concurrent_set_hash(set, key);
struct concurrent_set_probe probe;
int idx = concurrent_set_probe_start(&probe, set, hash);
@@ -379,8 +459,8 @@ rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key)
break;
default:
if (key == curr_key) {
- entry->key = CONCURRENT_SET_DELETED;
- set->deleted_entries++;
+ RUBY_ASSERT((entry->hash & CONCURRENT_SET_HASH_MASK) == hash);
+ concurrent_set_delete_entry_locked(set, entry);
return curr_key;
}
break;
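
Deletion (done under the VM lock) now has to respect probe chains: a slot whose continuation bit is set was probed past by some later insert, so it must be left as a tombstone that preserves the bit, while a chain-terminating slot can return to fully empty and be reused. A sketch of that decision with plain stores and hypothetical names, mirroring concurrent_set_delete_entry_locked and reusing the earlier struct and macros:

    #define SLOT_DELETED ((VALUE)1) /* stand-in for CONCURRENT_SET_DELETED */

    static void delete_locked(struct entry *e, unsigned int *size,
                              unsigned int *tombstones)
    {
        if (e->hash & CONTINUATION_BIT) {
            e->hash = CONTINUATION_BIT; /* keep the chain marker alive */
            e->key = SLOT_DELETED;      /* tombstone still counts as occupied */
            (*tombstones)++;
        }
        else {
            e->hash = 0;                /* nothing probed past: truly empty */
            e->key = 0;
            (*size)--;
        }
    }
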
@@ -399,7 +479,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
for (unsigned int i = 0; i < set->capacity; i++) {
struct concurrent_set_entry *entry = &set->entries[i];
- VALUE key = set->entries[i].key;
+ VALUE key = entry->key;
switch (key) {
case CONCURRENT_SET_EMPTY:
@@ -414,8 +494,7 @@ rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key
case ST_STOP:
return;
case ST_DELETE:
- set->entries[i].key = CONCURRENT_SET_DELETED;
- set->deleted_entries++;
+ concurrent_set_delete_entry_locked(set, entry);
break;
}
break;