summaryrefslogtreecommitdiff
path: root/io.c
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2022-01-09 01:47:51 +1300
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2022-05-28 15:44:18 +1200
commitce23cfa5182bb53e8b6555fb6a5b2846cd559922 (patch)
treebab733b1c37af9e2f3fb9d5a3e78f21226909e6f /io.c
parent8a13a2e8d1361bdab5d4bc1d75cf07b804a982c0 (diff)
Make `io_binwrite` atomic.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/5419
Diffstat (limited to 'io.c')
-rw-r--r--io.c212
1 files changed, 124 insertions, 88 deletions
diff --git a/io.c b/io.c
index dab1d945f9..697d571c4a 100644
--- a/io.c
+++ b/io.c
@@ -1581,71 +1581,111 @@ struct write_arg {
#ifdef HAVE_WRITEV
static VALUE
-io_binwrite_string(VALUE arg)
+io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
{
- struct binwrite_arg *p = (struct binwrite_arg *)arg;
- rb_io_t *fptr = p->fptr;
- long r;
-
if (fptr->wbuf.len) {
- struct iovec iov[2];
+ struct iovec iov[2];
- iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
- iov[0].iov_len = fptr->wbuf.len;
- iov[1].iov_base = (char *)p->ptr;
- iov[1].iov_len = p->length;
+ iov[0].iov_base = fptr->wbuf.ptr+fptr->wbuf.off;
+ iov[0].iov_len = fptr->wbuf.len;
+ iov[1].iov_base = (void*)ptr;
+ iov[1].iov_len = length;
- r = rb_writev_internal(fptr, iov, 2);
+ long result = rb_writev_internal(fptr, iov, 2);
- if (r < 0)
- return r;
+ if (result < 0)
+ return result;
- if (fptr->wbuf.len <= r) {
- r -= fptr->wbuf.len;
- fptr->wbuf.off = 0;
- fptr->wbuf.len = 0;
- }
- else {
- fptr->wbuf.off += (int)r;
- fptr->wbuf.len -= (int)r;
- r = 0L;
- }
+ if (result >= fptr->wbuf.len) {
+ // We wrote more than the internal buffer:
+ result -= fptr->wbuf.len;
+ fptr->wbuf.off = 0;
+ fptr->wbuf.len = 0;
+ }
+ else {
+ // We only wrote less data than the internal buffer:
+ fptr->wbuf.off += (int)result;
+ fptr->wbuf.len -= (int)result;
+
+ return 0L;
+ }
+
+ return result;
}
else {
- r = rb_write_internal(fptr, p->ptr, p->length);
+ return rb_write_internal(fptr, ptr, length);
}
-
- return r;
}
#else
static VALUE
+io_binwrite_string_internal(rb_io_t *fptr, const char *ptr, long length)
+{
+ long remaining = length;
+
+ if (fptr->wbuf.len) {
+ if (fptr->wbuf.len+length <= fptr->wbuf.capa) {
+ if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+length) {
+ MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
+ fptr->wbuf.off = 0;
+ }
+
+ MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, length);
+ fptr->wbuf.len += (int)length;
+
+ // We copied the entire incoming data to the internal buffer:
+ remaining = 0;
+ }
+
+ // Flush the internal buffer:
+ if (io_fflush(fptr) < 0) {
+ return -1;
+ }
+
+ // If all the data was buffered, we are done:
+ if (remaining == 0) {
+ return length;
+ }
+ }
+
+ // Otherwise, we should write the data directly:
+ return rb_write_internal(fptr, ptr, length);
+}
+#endif
+
+static VALUE
io_binwrite_string(VALUE arg)
{
struct binwrite_arg *p = (struct binwrite_arg *)arg;
- rb_io_t *fptr = p->fptr;
- long l, len;
- l = len = p->length;
+ const char *ptr = p->ptr;
+ long remaining = p->length;
- if (fptr->wbuf.len) {
- if (fptr->wbuf.len+len <= fptr->wbuf.capa) {
- if (fptr->wbuf.capa < fptr->wbuf.off+fptr->wbuf.len+len) {
- MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
- fptr->wbuf.off = 0;
- }
- MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, p->ptr, char, len);
- fptr->wbuf.len += (int)len;
- l = 0;
- }
- if (io_fflush(fptr) < 0)
- return -2L; /* fail in fflush */
- if (l == 0)
- return len;
+ while (remaining) {
+ // Write as much as possible:
+ long result = (long)io_binwrite_string_internal(p->fptr, ptr, remaining);
+
+ // Finished:
+ if (result == remaining) {
+ break;
+ }
+
+ if (result >= 0) {
+ ptr += result;
+ remaining -= result;
+ errno = EAGAIN;
+ }
+
+ // Wait for it to become writable:
+ if (rb_io_maybe_wait_writable(errno, p->fptr->self, Qnil)) {
+ rb_io_check_closed(p->fptr);
+ } else {
+ // The error was unrelated to waiting for it to become writable, so we fail:
+ return -1;
+ }
}
- return rb_write_internal(p->fptr, p->ptr, p->length);
+ return p->length;
}
-#endif
inline static void
io_allocate_write_buffer(rb_io_t *fptr, int sync)
@@ -1660,65 +1700,57 @@ io_allocate_write_buffer(rb_io_t *fptr, int sync)
}
}
+static inline int
+io_binwrite_requires_flush_write(rb_io_t *fptr, long len, int nosync)
+{
+ // If the requested operation was synchronous and the output mode is synchronus or a TTY:
+ if (!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY)))
+ return 1;
+
+ // If the amount of data we want to write exceeds the internal buffer:
+ if (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)
+ return 1;
+
+ // Otherwise, we can append to the internal buffer:
+ return 0;
+}
+
static long
io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
{
- long n, r, offset = 0;
+ if (len <= 0) return len;
- /* don't write anything if current thread has a pending interrupt. */
+ // Don't write anything if current thread has a pending interrupt:
rb_thread_check_ints();
- if ((n = len) <= 0) return n;
-
io_allocate_write_buffer(fptr, !nosync);
- if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
- (fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
+ if (io_binwrite_requires_flush_write(fptr, len, nosync)) {
struct binwrite_arg arg;
arg.fptr = fptr;
arg.str = str;
- retry:
- arg.ptr = ptr + offset;
- arg.length = n;
+ arg.ptr = ptr;
+ arg.length = len;
if (fptr->write_lock) {
- r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
+ return rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
}
else {
- r = io_binwrite_string((VALUE)&arg);
+ return io_binwrite_string((VALUE)&arg);
}
-
- /* xxx: other threads may modify given string. */
- if (r == n) return len;
- if (0 <= r) {
- offset += r;
- n -= r;
- errno = EAGAIN;
+ } else {
+ if (fptr->wbuf.off) {
+ if (fptr->wbuf.len)
+ MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
+ fptr->wbuf.off = 0;
}
- if (r == -2L)
- return -1L;
- if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
- rb_io_check_closed(fptr);
-
- if (offset < len)
- goto retry;
- }
-
- return -1L;
- }
+ MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr, char, len);
+ fptr->wbuf.len += (int)len;
- if (fptr->wbuf.off) {
- if (fptr->wbuf.len)
- MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
- fptr->wbuf.off = 0;
+ return len;
}
-
- MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
- fptr->wbuf.len += (int)len;
-
- return len;
}
# define MODE_BTMODE(a,b,c) ((fmode & FMODE_BINMODE) ? (b) : \
@@ -1792,15 +1824,17 @@ io_fwrite(VALUE str, rb_io_t *fptr, int nosync)
VALUE tmp;
long n, len;
const char *ptr;
+
#ifdef _WIN32
if (fptr->mode & FMODE_TTY) {
- long len = rb_w32_write_console(str, fptr->fd);
- if (len > 0) return len;
+ long len = rb_w32_write_console(str, fptr->fd);
+ if (len > 0) return len;
}
#endif
+
str = do_writeconv(str, fptr, &converted);
if (converted)
- OBJ_FREEZE(str);
+ OBJ_FREEZE(str);
tmp = rb_str_tmp_frozen_acquire(str);
RSTRING_GETMEM(tmp, ptr, len);
@@ -1830,10 +1864,12 @@ io_write(VALUE io, VALUE str, int nosync)
io = GetWriteIO(io);
str = rb_obj_as_string(str);
tmp = rb_io_check_io(io);
+
if (NIL_P(tmp)) {
- /* port is not IO, call write method for it. */
- return rb_funcall(io, id_write, 1, str);
+ /* port is not IO, call write method for it. */
+ return rb_funcall(io, id_write, 1, str);
}
+
io = tmp;
if (RSTRING_LEN(str) == 0) return INT2FIX(0);