summaryrefslogtreecommitdiff
path: root/thread.c
diff options
context:
space:
mode:
authornobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2011-02-12 05:44:23 +0000
committernobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2011-02-12 05:44:23 +0000
commit58b325366dbc5c84be12fb336ee5e68f208d9365 (patch)
treee76cc28208f340b4fa1f1a126f35e5953f56e27a /thread.c
parentdda8de065c1c7691ec8627d3024a61f324bc9f73 (diff)
* thread.c (rb_thread_io_blocking_region): new function to run
blocking region with GIL released, for fd. * thread.c (rb_thread_fd_close): implement. [ruby-core:35203] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@30852 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread.c')
-rw-r--r--thread.c52
1 files changed, 51 insertions, 1 deletions
diff --git a/thread.c b/thread.c
index b1a802b89c..7108f586af 100644
--- a/thread.c
+++ b/thread.c
@@ -73,6 +73,8 @@ static const VALUE eKillSignal = INT2FIX(0);
static const VALUE eTerminateSignal = INT2FIX(1);
static volatile int system_working = 1;
+#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream]
+
inline static void
st_delete_wrap(st_table *table, st_data_t key)
{
@@ -1122,6 +1124,7 @@ rb_thread_blocking_region(
rb_thread_t *th = GET_THREAD();
int saved_errno = 0;
+ th->waiting_fd = -1;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
data2 = th;
@@ -1136,6 +1139,23 @@ rb_thread_blocking_region(
return val;
}
+VALUE
+rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd)
+{
+ VALUE val;
+ rb_thread_t *th = GET_THREAD();
+ int saved_errno = 0;
+
+ th->waiting_fd = fd;
+ BLOCKING_REGION({
+ val = func(data1);
+ saved_errno = errno;
+ }, ubf_select, th);
+ errno = saved_errno;
+
+ return val;
+}
+
/* alias of rb_thread_blocking_region() */
VALUE
@@ -1427,10 +1447,36 @@ rb_threadptr_reset_raised(rb_thread_t *th)
return 1;
}
+#define THREAD_IO_WAITING_P(th) ( \
+ ((th)->status == THREAD_STOPPED || \
+ (th)->status == THREAD_STOPPED_FOREVER) && \
+ (th)->blocking_region_buffer && \
+ (th)->unblock.func == ubf_select && \
+ 1)
+
+static int
+thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
+{
+ int fd = (int)data;
+ rb_thread_t *th;
+ GetThreadPtr((VALUE)key, th);
+
+ if (THREAD_IO_WAITING_P(th)) {
+ native_mutex_lock(&th->interrupt_lock);
+ if (THREAD_IO_WAITING_P(th) && th->waiting_fd == fd) {
+ th->errinfo = th->vm->special_exceptions[ruby_error_closed_stream];
+ RUBY_VM_SET_INTERRUPT(th);
+ (th->unblock.func)(th->unblock.arg);
+ }
+ native_mutex_unlock(&th->interrupt_lock);
+ }
+ return ST_CONTINUE;
+}
+
void
rb_thread_fd_close(int fd)
{
- /* TODO: fix me */
+ st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd);
}
/*
@@ -4362,6 +4408,10 @@ Init_Thread(void)
rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0);
+ closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed");
+ OBJ_TAINT(closed_stream_error);
+ OBJ_FREEZE(closed_stream_error);
+
cThGroup = rb_define_class("ThreadGroup", rb_cObject);
rb_define_alloc_func(cThGroup, thgroup_s_alloc);
rb_define_method(cThGroup, "list", thgroup_list, 0);