diff options
Diffstat (limited to 'thread_sync.c')
| -rw-r--r-- | thread_sync.c | 262 |
1 files changed, 262 insertions, 0 deletions
diff --git a/thread_sync.c b/thread_sync.c index fa6a60ab62..cf4e3843ff 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1261,6 +1261,263 @@ rb_condvar_broadcast(rb_execution_context_t *ec, VALUE self) return self; } +/* Thread::Monitor */ + +struct rb_monitor { + long count; + rb_serial_t ec_serial; + VALUE mutex; +}; + +static void +monitor_mark(void *ptr) +{ + struct rb_monitor *mc = ptr; + rb_gc_mark_movable(mc->mutex); +} + +static void +monitor_compact(void *ptr) +{ + struct rb_monitor *mc = ptr; + mc->mutex = rb_gc_location(mc->mutex); +} + +static const rb_data_type_t monitor_data_type = { + .wrap_struct_name = "monitor", + .function = { + .dmark = monitor_mark, + .dfree = RUBY_TYPED_DEFAULT_FREE, + .dsize = NULL, // Fully embeded + .dcompact = monitor_compact, + }, + .flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED | RUBY_TYPED_EMBEDDABLE, +}; + +static VALUE +monitor_alloc(VALUE klass) +{ + struct rb_monitor *mc; + VALUE obj; + + obj = TypedData_Make_Struct(klass, struct rb_monitor, &monitor_data_type, mc); + RB_OBJ_WRITE(obj, &mc->mutex, rb_mutex_new()); + mc->ec_serial = 0; + mc->count = 0; + + return obj; +} + +static struct rb_monitor * +monitor_ptr(VALUE monitor) +{ + struct rb_monitor *mc; + TypedData_Get_Struct(monitor, struct rb_monitor, &monitor_data_type, mc); + return mc; +} + +static bool +mc_owner_p(struct rb_monitor *mc, rb_serial_t current_fiber_serial) +{ + return mc->ec_serial == current_fiber_serial; +} + +static VALUE +rb_monitor_try_enter(rb_execution_context_t *ec, VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + + rb_serial_t current_fiber_serial = rb_ec_serial(ec); + if (!mc_owner_p(mc, current_fiber_serial)) { + if (!rb_mut_trylock(ec, mc->mutex)) { + return Qfalse; + } + mc->ec_serial = current_fiber_serial; + mc->count = 0; + } + mc->count += 1; + return Qtrue; +} + +struct monitor_args { + VALUE monitor; + struct rb_monitor *mc; + rb_serial_t current_fiber_serial; + rb_execution_context_t *ec; +}; + +static void +monitor_enter0(struct monitor_args *args) +{ + if (!mc_owner_p(args->mc, args->current_fiber_serial)) { + struct mutex_args mut_args = { + .self = args->mc->mutex, + .mutex = mutex_ptr(args->mc->mutex), + .ec= args->ec, + }; + do_mutex_lock(&mut_args, 1); + args->mc->ec_serial = args->current_fiber_serial; + args->mc->count = 0; + } + args->mc->count++; +} + +static VALUE +rb_monitor_enter(rb_execution_context_t *ec, VALUE monitor) +{ + struct monitor_args args = { + .monitor = monitor, + .mc = monitor_ptr(monitor), + .ec = ec, + .current_fiber_serial = rb_ec_serial(ec), + }; + monitor_enter0(&args); + return Qnil; +} + +static inline void +monitor_check_owner0(struct monitor_args *args) +{ + if (!mc_owner_p(args->mc, args->current_fiber_serial)) { + rb_raise(rb_eThreadError, "current fiber not owner"); + } +} + +static VALUE +rb_monitor_check_owner(rb_execution_context_t *ec, VALUE monitor) +{ + struct monitor_args args = { + .monitor = monitor, + .mc = monitor_ptr(monitor), + .ec = ec, + .current_fiber_serial = rb_ec_serial(ec), + }; + monitor_check_owner0(&args); + return Qnil; +} + +static void +monitor_exit0(struct monitor_args *args) +{ + monitor_check_owner0(args); + + if (args->mc->count <= 0) rb_bug("monitor_exit: count:%d", (int)args->mc->count); + args->mc->count--; + + if (args->mc->count == 0) { + args->mc->ec_serial = 0; + + struct mutex_args mut_args = { + .self = args->mc->mutex, + .mutex = mutex_ptr(args->mc->mutex), + .ec= args->ec, + }; + do_mutex_unlock(&mut_args); + } +} + +static VALUE +rb_monitor_exit(rb_execution_context_t *ec, VALUE monitor) +{ + struct monitor_args args = { + .monitor = monitor, + .mc = monitor_ptr(monitor), + .ec = ec, + .current_fiber_serial = rb_ec_serial(ec), + }; + monitor_exit0(&args); + return Qnil; +} + +static VALUE +rb_monitor_locked_p(rb_execution_context_t *ec, VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + return rb_mutex_locked_p(mc->mutex); +} + +static VALUE +rb_monitor_owned_p(rb_execution_context_t *ec, VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + return RBOOL(rb_mutex_locked_p(mc->mutex) && mc_owner_p(mc, rb_ec_serial(ec))); +} + +static VALUE +monitor_exit_for_cond(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + long cnt = mc->count; + mc->ec_serial = 0; + mc->count = 0; + return LONG2NUM(cnt); +} + +struct wait_for_cond_data { + VALUE monitor; + VALUE cond; + VALUE timeout; + VALUE count; +}; + +static VALUE +monitor_wait_for_cond_body(VALUE v) +{ + struct wait_for_cond_data *data = (struct wait_for_cond_data *)v; + struct rb_monitor *mc = monitor_ptr(data->monitor); + // cond.wait(monitor.mutex, timeout) + VALUE signaled = rb_funcall(data->cond, rb_intern("wait"), 2, mc->mutex, data->timeout); + return RTEST(signaled) ? Qtrue : Qfalse; +} + +static VALUE +monitor_enter_for_cond(VALUE v) +{ + // assert(rb_mutex_owned_p(mc->mutex) == Qtrue) + // but rb_mutex_owned_p is not exported... + + struct wait_for_cond_data *data = (struct wait_for_cond_data *)v; + struct rb_monitor *mc = monitor_ptr(data->monitor); + mc->ec_serial = rb_ec_serial(GET_EC()); + mc->count = NUM2LONG(data->count); + return Qnil; +} + +static VALUE +rb_monitor_wait_for_cond(rb_execution_context_t *ec, VALUE monitor, VALUE cond, VALUE timeout) +{ + VALUE count = monitor_exit_for_cond(monitor); + struct wait_for_cond_data data = { + monitor, + cond, + timeout, + count, + }; + + return rb_ensure(monitor_wait_for_cond_body, (VALUE)&data, + monitor_enter_for_cond, (VALUE)&data); +} + +static VALUE +monitor_sync_ensure(VALUE v_args) +{ + monitor_exit0((struct monitor_args *)v_args); + return Qnil; +} + +static VALUE +rb_monitor_synchronize(rb_execution_context_t *ec, VALUE monitor) +{ + struct monitor_args args = { + .monitor = monitor, + .mc = monitor_ptr(monitor), + .ec = ec, + .current_fiber_serial = rb_ec_serial(ec), + }; + monitor_enter0(&args); + return rb_ec_ensure(ec, do_ec_yield, (VALUE)ec, monitor_sync_ensure, (VALUE)&args); +} + static void Init_thread_sync(void) { @@ -1283,6 +1540,11 @@ Init_thread_sync(void) id_sleep = rb_intern("sleep"); + /* Monitor */ + VALUE rb_cMonitor = rb_define_class_id_under_no_pin(rb_cThread, rb_intern("Monitor"), rb_cObject); + rb_define_alloc_func(rb_cMonitor, monitor_alloc); + + rb_provide("monitor.so"); rb_provide("thread.rb"); } |
