summaryrefslogtreecommitdiff
path: root/io.c
diff options
context:
space:
mode:
authornobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-11-07 20:47:02 +0000
committernobu <nobu@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-11-07 20:47:02 +0000
commit551dfe9d8b9877d81b0ca87aff004c11383b698d (patch)
treef0117a2caff4874dd09ff185eb5f54bee9551f9d /io.c
parentf8cdd9907a9076b8e628060a38e9d641625f4c8e (diff)
* include/ruby/io.h (rb_io_t): added write_lock to serialize.
[ruby-core:19668] * gc.c (gc_mark_children): mark write_lock. * gc.c (rb_gc_call_finalizer_at_exit): Mutex also needs to be deferred. * io.c (io_fflush, io_binwrite, fptr_finalize): serializes writing. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@20144 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'io.c')
-rw-r--r--io.c92
1 files changed, 72 insertions, 20 deletions
diff --git a/io.c b/io.c
index 5fa6a1f17c..810f385dd8 100644
--- a/io.c
+++ b/io.c
@@ -524,11 +524,29 @@ rb_write_internal(int fd, void *buf, size_t count)
return rb_thread_blocking_region(internal_write_func, &iis, RUBY_UBF_IO, 0);
}
+static long
+io_writable_length(rb_io_t *fptr, long l)
+{
+ if (PIPE_BUF < l &&
+ !rb_thread_alone() &&
+ wsplit_p(fptr)) {
+ l = PIPE_BUF;
+ }
+ return l;
+}
+
+static VALUE
+io_flush_buffer(VALUE arg)
+{
+ rb_io_t *fptr = (rb_io_t *)arg;
+ long l = io_writable_length(fptr, fptr->wbuf_len);
+ return rb_write_internal(fptr->fd, fptr->wbuf+fptr->wbuf_off, l);
+}
+
static int
io_fflush(rb_io_t *fptr)
{
- int r, l;
- int wbuf_off, wbuf_len;
+ long r;
rb_io_check_closed(fptr);
if (fptr->wbuf_len == 0)
@@ -539,15 +557,7 @@ io_fflush(rb_io_t *fptr)
retry:
if (fptr->wbuf_len == 0)
return 0;
- wbuf_off = fptr->wbuf_off;
- wbuf_len = fptr->wbuf_len;
- l = wbuf_len;
- if (PIPE_BUF < l &&
- !rb_thread_alone() &&
- wsplit_p(fptr)) {
- l = PIPE_BUF;
- }
- r = rb_write_internal(fptr->fd, fptr->wbuf+wbuf_off, l);
+ r = rb_mutex_synchronize(fptr->write_lock, io_flush_buffer, (VALUE)fptr);
/* xxx: Other threads may modify wbuf.
* A lock is required, definitely. */
rb_io_check_closed(fptr);
@@ -731,11 +741,25 @@ make_writeconv(rb_io_t *fptr)
}
/* writing functions */
+struct binwrite_arg {
+ rb_io_t *fptr;
+ VALUE str;
+ long offset;
+ long length;
+};
+
+static VALUE
+io_binwrite_string(VALUE arg)
+{
+ struct binwrite_arg *p = (struct binwrite_arg *)arg;
+ long l = io_writable_length(p->fptr, p->length);
+ return rb_write_internal(p->fptr->fd, RSTRING_PTR(p->str)+p->offset, l);
+}
static long
io_binwrite(VALUE str, rb_io_t *fptr, int nosync)
{
- long len, n, r, l, offset = 0;
+ long len, n, r, offset = 0;
len = RSTRING_LEN(str);
if ((n = len) <= 0) return n;
@@ -744,9 +768,12 @@ io_binwrite(VALUE str, rb_io_t *fptr, int nosync)
fptr->wbuf_len = 0;
fptr->wbuf_capa = 8192;
fptr->wbuf = ALLOC_N(char, fptr->wbuf_capa);
+ fptr->write_lock = rb_mutex_new();
}
if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
(fptr->wbuf && fptr->wbuf_capa <= fptr->wbuf_len + len)) {
+ struct binwrite_arg arg;
+
/* xxx: use writev to avoid double write if available */
if (fptr->wbuf_len && fptr->wbuf_len+len <= fptr->wbuf_capa) {
if (fptr->wbuf_capa < fptr->wbuf_off+fptr->wbuf_len+len) {
@@ -766,14 +793,18 @@ io_binwrite(VALUE str, rb_io_t *fptr, int nosync)
if (fptr->stdio_file != stderr && !rb_thread_fd_writable(fptr->fd)) {
rb_io_check_closed(fptr);
}
+ arg.fptr = fptr;
+ arg.str = str;
+ arg.offset = offset;
retry:
- l = n;
- if (PIPE_BUF < l &&
- !rb_thread_alone() &&
- wsplit_p(fptr)) {
- l = PIPE_BUF;
- }
- r = rb_write_internal(fptr->fd, RSTRING_PTR(str)+offset, l);
+ arg.length = n;
+ if (fptr->write_lock) {
+ r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
+ }
+ else {
+ long l = io_writable_length(fptr, n);
+ r = rb_write_internal(fptr->fd, RSTRING_PTR(str)+offset, l);
+ }
/* xxx: other threads may modify given string. */
if (r == n) return len;
if (0 <= r) {
@@ -3039,12 +3070,33 @@ finish_writeconv(rb_io_t *fptr, int noraise)
}
+struct finish_writeconv_arg {
+ rb_io_t *fptr;
+ int noraise;
+};
+
+static VALUE
+finish_writeconv_sync(VALUE arg)
+{
+ struct finish_writeconv_arg *p = (struct finish_writeconv_arg *)arg;
+ finish_writeconv(p->fptr, p->noraise);
+ return Qnil;
+}
+
static void
fptr_finalize(rb_io_t *fptr, int noraise)
{
int ebadf = 0;
if (fptr->writeconv) {
- finish_writeconv(fptr, noraise);
+ if (fptr->write_lock) {
+ struct finish_writeconv_arg arg;
+ arg.fptr = fptr;
+ arg.noraise = noraise;
+ rb_mutex_synchronize(fptr->write_lock, finish_writeconv_sync, (VALUE)&arg);
+ }
+ else {
+ finish_writeconv(fptr, noraise);
+ }
}
if (fptr->wbuf_len) {
io_fflush(fptr);