diff options
-rw-r--r-- | ChangeLog | 12 | ||||
-rw-r--r-- | gc.c | 4 | ||||
-rw-r--r-- | include/ruby/io.h | 2 | ||||
-rw-r--r-- | io.c | 92 |
4 files changed, 89 insertions, 21 deletions
@@ -1,3 +1,15 @@ +Sat Nov 8 05:46:50 2008 Nobuyoshi Nakada <nobu@ruby-lang.org> + + * include/ruby/io.h (rb_io_t): added write_lock to serialize. + [ruby-core:19668] + + * gc.c (gc_mark_children): mark write_lock. + + * gc.c (rb_gc_call_finalizer_at_exit): Mutex also needs to be + deferred. + + * io.c (io_fflush, io_binwrite, fptr_finalize): serializes writing. + Tue Nov 11 17:37:47 2008 NAKAMURA Usaku <usa@ruby-lang.org> * win32/win32.c (rb_w32_accept): secure fd before accept because if @@ -396,6 +396,7 @@ rb_objspace_alloc(void) #define HEAP_OBJ_LIMIT (HEAP_SIZE / sizeof(struct RVALUE)) +extern VALUE rb_cMutex; extern st_table *rb_class_tbl; int ruby_disable_gc_stress = 0; @@ -1508,6 +1509,7 @@ gc_mark_children(rb_objspace_t *objspace, VALUE ptr, int lev) gc_mark(objspace, obj->as.file.fptr->writeconv_asciicompat, lev); gc_mark(objspace, obj->as.file.fptr->writeconv_pre_ecopts, lev); gc_mark(objspace, obj->as.file.fptr->encs.ecopts, lev); + gc_mark(objspace, obj->as.file.fptr->write_lock, lev); } break; @@ -2345,7 +2347,7 @@ rb_gc_call_finalizer_at_exit(void) while (p < pend) { if (BUILTIN_TYPE(p) == T_DATA && DATA_PTR(p) && RANY(p)->as.data.dfree && - RANY(p)->as.basic.klass != rb_cThread) { + RANY(p)->as.basic.klass != rb_cThread && RANY(p)->as.basic.klass != rb_cMutex) { p->as.free.flags = 0; if ((long)RANY(p)->as.data.dfree == -1) { xfree(DATA_PTR(p)); diff --git a/include/ruby/io.h b/include/ruby/io.h index d406b32264..04af3d07e3 100644 --- a/include/ruby/io.h +++ b/include/ruby/io.h @@ -73,6 +73,7 @@ typedef struct rb_io_t { VALUE writeconv_pre_ecopts; int writeconv_initialized; + VALUE write_lock; } rb_io_t; #define HAVE_RB_IO_T 1 @@ -133,6 +134,7 @@ typedef struct rb_io_t { fp->encs.enc2 = NULL;\ fp->encs.ecflags = 0;\ fp->encs.ecopts = Qnil;\ + fp->write_lock = 0;\ } while (0) FILE *rb_io_stdio_file(rb_io_t *fptr); @@ -524,11 +524,29 @@ rb_write_internal(int fd, void *buf, size_t count) return rb_thread_blocking_region(internal_write_func, &iis, RUBY_UBF_IO, 0); } +static long +io_writable_length(rb_io_t *fptr, long l) +{ + if (PIPE_BUF < l && + !rb_thread_alone() && + wsplit_p(fptr)) { + l = PIPE_BUF; + } + return l; +} + +static VALUE +io_flush_buffer(VALUE arg) +{ + rb_io_t *fptr = (rb_io_t *)arg; + long l = io_writable_length(fptr, fptr->wbuf_len); + return rb_write_internal(fptr->fd, fptr->wbuf+fptr->wbuf_off, l); +} + static int io_fflush(rb_io_t *fptr) { - int r, l; - int wbuf_off, wbuf_len; + long r; rb_io_check_closed(fptr); if (fptr->wbuf_len == 0) @@ -539,15 +557,7 @@ io_fflush(rb_io_t *fptr) retry: if (fptr->wbuf_len == 0) return 0; - wbuf_off = fptr->wbuf_off; - wbuf_len = fptr->wbuf_len; - l = wbuf_len; - if (PIPE_BUF < l && - !rb_thread_alone() && - wsplit_p(fptr)) { - l = PIPE_BUF; - } - r = rb_write_internal(fptr->fd, fptr->wbuf+wbuf_off, l); + r = rb_mutex_synchronize(fptr->write_lock, io_flush_buffer, (VALUE)fptr); /* xxx: Other threads may modify wbuf. * A lock is required, definitely. */ rb_io_check_closed(fptr); @@ -731,11 +741,25 @@ make_writeconv(rb_io_t *fptr) } /* writing functions */ +struct binwrite_arg { + rb_io_t *fptr; + VALUE str; + long offset; + long length; +}; + +static VALUE +io_binwrite_string(VALUE arg) +{ + struct binwrite_arg *p = (struct binwrite_arg *)arg; + long l = io_writable_length(p->fptr, p->length); + return rb_write_internal(p->fptr->fd, RSTRING_PTR(p->str)+p->offset, l); +} static long io_binwrite(VALUE str, rb_io_t *fptr, int nosync) { - long len, n, r, l, offset = 0; + long len, n, r, offset = 0; len = RSTRING_LEN(str); if ((n = len) <= 0) return n; @@ -744,9 +768,12 @@ io_binwrite(VALUE str, rb_io_t *fptr, int nosync) fptr->wbuf_len = 0; fptr->wbuf_capa = 8192; fptr->wbuf = ALLOC_N(char, fptr->wbuf_capa); + fptr->write_lock = rb_mutex_new(); } if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) || (fptr->wbuf && fptr->wbuf_capa <= fptr->wbuf_len + len)) { + struct binwrite_arg arg; + /* xxx: use writev to avoid double write if available */ if (fptr->wbuf_len && fptr->wbuf_len+len <= fptr->wbuf_capa) { if (fptr->wbuf_capa < fptr->wbuf_off+fptr->wbuf_len+len) { @@ -766,14 +793,18 @@ io_binwrite(VALUE str, rb_io_t *fptr, int nosync) if (fptr->stdio_file != stderr && !rb_thread_fd_writable(fptr->fd)) { rb_io_check_closed(fptr); } + arg.fptr = fptr; + arg.str = str; + arg.offset = offset; retry: - l = n; - if (PIPE_BUF < l && - !rb_thread_alone() && - wsplit_p(fptr)) { - l = PIPE_BUF; - } - r = rb_write_internal(fptr->fd, RSTRING_PTR(str)+offset, l); + arg.length = n; + if (fptr->write_lock) { + r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg); + } + else { + long l = io_writable_length(fptr, n); + r = rb_write_internal(fptr->fd, RSTRING_PTR(str)+offset, l); + } /* xxx: other threads may modify given string. */ if (r == n) return len; if (0 <= r) { @@ -3039,12 +3070,33 @@ finish_writeconv(rb_io_t *fptr, int noraise) } +struct finish_writeconv_arg { + rb_io_t *fptr; + int noraise; +}; + +static VALUE +finish_writeconv_sync(VALUE arg) +{ + struct finish_writeconv_arg *p = (struct finish_writeconv_arg *)arg; + finish_writeconv(p->fptr, p->noraise); + return Qnil; +} + static void fptr_finalize(rb_io_t *fptr, int noraise) { int ebadf = 0; if (fptr->writeconv) { - finish_writeconv(fptr, noraise); + if (fptr->write_lock) { + struct finish_writeconv_arg arg; + arg.fptr = fptr; + arg.noraise = noraise; + rb_mutex_synchronize(fptr->write_lock, finish_writeconv_sync, (VALUE)&arg); + } + else { + finish_writeconv(fptr, noraise); + } } if (fptr->wbuf_len) { io_fflush(fptr); |