summaryrefslogtreecommitdiff
path: root/thread_sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'thread_sync.c')
-rw-r--r--thread_sync.c262
1 files changed, 262 insertions, 0 deletions
diff --git a/thread_sync.c b/thread_sync.c
index fa6a60ab62..cf4e3843ff 100644
--- a/thread_sync.c
+++ b/thread_sync.c
@@ -1261,6 +1261,263 @@ rb_condvar_broadcast(rb_execution_context_t *ec, VALUE self)
return self;
}
+/* Thread::Monitor */
+
+struct rb_monitor {
+ long count;
+ rb_serial_t ec_serial;
+ VALUE mutex;
+};
+
+static void
+monitor_mark(void *ptr)
+{
+ struct rb_monitor *mc = ptr;
+ rb_gc_mark_movable(mc->mutex);
+}
+
+static void
+monitor_compact(void *ptr)
+{
+ struct rb_monitor *mc = ptr;
+ mc->mutex = rb_gc_location(mc->mutex);
+}
+
+static const rb_data_type_t monitor_data_type = {
+ .wrap_struct_name = "monitor",
+ .function = {
+ .dmark = monitor_mark,
+ .dfree = RUBY_TYPED_DEFAULT_FREE,
+ .dsize = NULL, // Fully embeded
+ .dcompact = monitor_compact,
+ },
+ .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE,
+};
+
+static VALUE
+monitor_alloc(VALUE klass)
+{
+ struct rb_monitor *mc;
+ VALUE obj;
+
+ obj = TypedData_Make_Struct(klass, struct rb_monitor, &monitor_data_type, mc);
+ RB_OBJ_WRITE(obj, &mc->mutex, rb_mutex_new());
+ mc->ec_serial = 0;
+ mc->count = 0;
+
+ return obj;
+}
+
+static struct rb_monitor *
+monitor_ptr(VALUE monitor)
+{
+ struct rb_monitor *mc;
+ TypedData_Get_Struct(monitor, struct rb_monitor, &monitor_data_type, mc);
+ return mc;
+}
+
+static bool
+mc_owner_p(struct rb_monitor *mc, rb_serial_t current_fiber_serial)
+{
+ return mc->ec_serial == current_fiber_serial;
+}
+
+static VALUE
+rb_monitor_try_enter(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct rb_monitor *mc = monitor_ptr(monitor);
+
+ rb_serial_t current_fiber_serial = rb_ec_serial(ec);
+ if (!mc_owner_p(mc, current_fiber_serial)) {
+ if (!rb_mut_trylock(ec, mc->mutex)) {
+ return Qfalse;
+ }
+ mc->ec_serial = current_fiber_serial;
+ mc->count = 0;
+ }
+ mc->count += 1;
+ return Qtrue;
+}
+
+struct monitor_args {
+ VALUE monitor;
+ struct rb_monitor *mc;
+ rb_serial_t current_fiber_serial;
+ rb_execution_context_t *ec;
+};
+
+static void
+monitor_enter0(struct monitor_args *args)
+{
+ if (!mc_owner_p(args->mc, args->current_fiber_serial)) {
+ struct mutex_args mut_args = {
+ .self = args->mc->mutex,
+ .mutex = mutex_ptr(args->mc->mutex),
+ .ec= args->ec,
+ };
+ do_mutex_lock(&mut_args, 1);
+ args->mc->ec_serial = args->current_fiber_serial;
+ args->mc->count = 0;
+ }
+ args->mc->count++;
+}
+
+static VALUE
+rb_monitor_enter(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct monitor_args args = {
+ .monitor = monitor,
+ .mc = monitor_ptr(monitor),
+ .ec = ec,
+ .current_fiber_serial = rb_ec_serial(ec),
+ };
+ monitor_enter0(&args);
+ return Qnil;
+}
+
+static inline void
+monitor_check_owner0(struct monitor_args *args)
+{
+ if (!mc_owner_p(args->mc, args->current_fiber_serial)) {
+ rb_raise(rb_eThreadError, "current fiber not owner");
+ }
+}
+
+static VALUE
+rb_monitor_check_owner(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct monitor_args args = {
+ .monitor = monitor,
+ .mc = monitor_ptr(monitor),
+ .ec = ec,
+ .current_fiber_serial = rb_ec_serial(ec),
+ };
+ monitor_check_owner0(&args);
+ return Qnil;
+}
+
+static void
+monitor_exit0(struct monitor_args *args)
+{
+ monitor_check_owner0(args);
+
+ if (args->mc->count <= 0) rb_bug("monitor_exit: count:%d", (int)args->mc->count);
+ args->mc->count--;
+
+ if (args->mc->count == 0) {
+ args->mc->ec_serial = 0;
+
+ struct mutex_args mut_args = {
+ .self = args->mc->mutex,
+ .mutex = mutex_ptr(args->mc->mutex),
+ .ec= args->ec,
+ };
+ do_mutex_unlock(&mut_args);
+ }
+}
+
+static VALUE
+rb_monitor_exit(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct monitor_args args = {
+ .monitor = monitor,
+ .mc = monitor_ptr(monitor),
+ .ec = ec,
+ .current_fiber_serial = rb_ec_serial(ec),
+ };
+ monitor_exit0(&args);
+ return Qnil;
+}
+
+static VALUE
+rb_monitor_locked_p(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct rb_monitor *mc = monitor_ptr(monitor);
+ return rb_mutex_locked_p(mc->mutex);
+}
+
+static VALUE
+rb_monitor_owned_p(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct rb_monitor *mc = monitor_ptr(monitor);
+ return RBOOL(rb_mutex_locked_p(mc->mutex) && mc_owner_p(mc, rb_ec_serial(ec)));
+}
+
+static VALUE
+monitor_exit_for_cond(VALUE monitor)
+{
+ struct rb_monitor *mc = monitor_ptr(monitor);
+ long cnt = mc->count;
+ mc->ec_serial = 0;
+ mc->count = 0;
+ return LONG2NUM(cnt);
+}
+
+struct wait_for_cond_data {
+ VALUE monitor;
+ VALUE cond;
+ VALUE timeout;
+ VALUE count;
+};
+
+static VALUE
+monitor_wait_for_cond_body(VALUE v)
+{
+ struct wait_for_cond_data *data = (struct wait_for_cond_data *)v;
+ struct rb_monitor *mc = monitor_ptr(data->monitor);
+ // cond.wait(monitor.mutex, timeout)
+ VALUE signaled = rb_funcall(data->cond, rb_intern("wait"), 2, mc->mutex, data->timeout);
+ return RTEST(signaled) ? Qtrue : Qfalse;
+}
+
+static VALUE
+monitor_enter_for_cond(VALUE v)
+{
+ // assert(rb_mutex_owned_p(mc->mutex) == Qtrue)
+ // but rb_mutex_owned_p is not exported...
+
+ struct wait_for_cond_data *data = (struct wait_for_cond_data *)v;
+ struct rb_monitor *mc = monitor_ptr(data->monitor);
+ mc->ec_serial = rb_ec_serial(GET_EC());
+ mc->count = NUM2LONG(data->count);
+ return Qnil;
+}
+
+static VALUE
+rb_monitor_wait_for_cond(rb_execution_context_t *ec, VALUE monitor, VALUE cond, VALUE timeout)
+{
+ VALUE count = monitor_exit_for_cond(monitor);
+ struct wait_for_cond_data data = {
+ monitor,
+ cond,
+ timeout,
+ count,
+ };
+
+ return rb_ensure(monitor_wait_for_cond_body, (VALUE)&data,
+ monitor_enter_for_cond, (VALUE)&data);
+}
+
+static VALUE
+monitor_sync_ensure(VALUE v_args)
+{
+ monitor_exit0((struct monitor_args *)v_args);
+ return Qnil;
+}
+
+static VALUE
+rb_monitor_synchronize(rb_execution_context_t *ec, VALUE monitor)
+{
+ struct monitor_args args = {
+ .monitor = monitor,
+ .mc = monitor_ptr(monitor),
+ .ec = ec,
+ .current_fiber_serial = rb_ec_serial(ec),
+ };
+ monitor_enter0(&args);
+ return rb_ec_ensure(ec, do_ec_yield, (VALUE)ec, monitor_sync_ensure, (VALUE)&args);
+}
+
static void
Init_thread_sync(void)
{
@@ -1283,6 +1540,11 @@ Init_thread_sync(void)
id_sleep = rb_intern("sleep");
+ /* Monitor */
+ VALUE rb_cMonitor = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("Monitor"), rb_cObject);
+ rb_define_alloc_func(rb_cMonitor, monitor_alloc);
+
+ rb_provide("monitor.so");
rb_provide("thread.rb");
}