summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <samuel.williams@oriontransfer.co.nz>2022-01-09 11:41:20 +1300
committerSamuel Williams <samuel.williams@oriontransfer.co.nz>2022-05-28 15:44:18 +1200
commit15ebfe28493560f8368fc9a2fe7d9b5913051b84 (patch)
tree668db59e67c402e0dcf91c5cf8418f0123c862b5
parentce23cfa5182bb53e8b6555fb6a5b2846cd559922 (diff)
Make `io_binwritev` atomic.
Notes
Notes: Merged: https://github.com/ruby/ruby/pull/5419
-rw-r--r--io.c145
1 files changed, 83 insertions, 62 deletions
diff --git a/io.c b/io.c
index 697d571c4a..a01aca8f61 100644
--- a/io.c
+++ b/io.c
@@ -1885,105 +1885,126 @@ io_write(VALUE io, VALUE str, int nosync)
#ifdef HAVE_WRITEV
struct binwritev_arg {
rb_io_t *fptr;
- const struct iovec *iov;
+ struct iovec *iov;
int iovcnt;
+ size_t total;
};
static VALUE
-call_writev_internal(VALUE arg)
+io_binwritev_internal(VALUE arg)
{
struct binwritev_arg *p = (struct binwritev_arg *)arg;
- return rb_writev_internal(p->fptr, p->iov, p->iovcnt);
+
+ size_t remaining = p->total;
+ size_t offset = 0;
+
+ rb_io_t *fptr = p->fptr;
+ struct iovec *iov = p->iov;
+ int iovcnt = p->iovcnt;
+
+ while (remaining) {
+ long result = rb_writev_internal(fptr, iov, iovcnt);
+
+ if (result >= 0) {
+ offset += result;
+ if (fptr->wbuf.ptr && fptr->wbuf.len) {
+ if (offset < (size_t)fptr->wbuf.len) {
+ fptr->wbuf.off += result;
+ fptr->wbuf.len -= result;
+ }
+ else {
+ offset -= (size_t)fptr->wbuf.len;
+ fptr->wbuf.off = 0;
+ fptr->wbuf.len = 0;
+ }
+ }
+
+ if (offset == p->total) {
+ return p->total;
+ }
+
+ while (result >= (ssize_t)iov->iov_len) {
+ /* iovcnt > 0 */
+ result -= iov->iov_len;
+ iov->iov_len = 0;
+ iov++;
+
+ if (!--iovcnt) {
+ // I don't believe this code path can ever occur.
+ return offset;
+ }
+ }
+
+ iov->iov_base = (char *)iov->iov_base + result;
+ iov->iov_len -= result;
+
+ errno = EAGAIN;
+ }
+
+ if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
+ rb_io_check_closed(fptr);
+ } else {
+ return -1;
+ }
+ }
+
+ return offset;
}
static long
io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr)
{
- int i;
- long r, total = 0, written_len = 0;
-
- /* don't write anything if current thread has a pending interrupt. */
+ // Don't write anything if current thread has a pending interrupt:
rb_thread_check_ints();
if (iovcnt == 0) return 0;
- for (i = 1; i < iovcnt; i++) total += iov[i].iov_len;
+
+ size_t total = 0;
+ for (int i = 1; i < iovcnt; i++) total += iov[i].iov_len;
io_allocate_write_buffer(fptr, 1);
if (fptr->wbuf.ptr && fptr->wbuf.len) {
- long offset = fptr->wbuf.off + fptr->wbuf.len;
- if (offset + total <= fptr->wbuf.capa) {
- for (i = 1; i < iovcnt; i++) {
+ // The end of the buffered data:
+ size_t offset = fptr->wbuf.off + fptr->wbuf.len;
+
+ if (offset + total <= (size_t)fptr->wbuf.capa) {
+ for (int i = 1; i < iovcnt; i++) {
memcpy(fptr->wbuf.ptr+offset, iov[i].iov_base, iov[i].iov_len);
offset += iov[i].iov_len;
}
fptr->wbuf.len += total;
+
return total;
}
else {
iov[0].iov_base = fptr->wbuf.ptr + fptr->wbuf.off;
- iov[0].iov_len = fptr->wbuf.len;
+ iov[0].iov_len = fptr->wbuf.len;
}
}
else {
+ // The first iov is reserved for the internal buffer, and it's empty.
iov++;
- if (!--iovcnt) return 0;
- }
-
- retry:
- if (fptr->write_lock) {
- struct binwritev_arg arg;
- arg.fptr = fptr;
- arg.iov = iov;
- arg.iovcnt = iovcnt;
- r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg);
- }
- else {
- r = rb_writev_internal(fptr, iov, iovcnt);
- }
-
- if (r >= 0) {
- written_len += r;
- if (fptr->wbuf.ptr && fptr->wbuf.len) {
- if (written_len < fptr->wbuf.len) {
- fptr->wbuf.off += r;
- fptr->wbuf.len -= r;
- }
- else {
- written_len -= fptr->wbuf.len;
- fptr->wbuf.off = 0;
- fptr->wbuf.len = 0;
- }
- }
-
- if (written_len == total) return total;
- while (r >= (ssize_t)iov->iov_len) {
- /* iovcnt > 0 */
- r -= iov->iov_len;
- iov->iov_len = 0;
- iov++;
-
- if (!--iovcnt) {
- // assert(written_len == total);
-
- return total;
- }
+ if (!--iovcnt) {
+ // If there are no other io vectors we are done.
+ return 0;
}
+ }
- iov->iov_base = (char *)iov->iov_base + r;
- iov->iov_len -= r;
+ struct binwritev_arg arg;
+ arg.fptr = fptr;
+ arg.iov = iov;
+ arg.iovcnt = iovcnt;
+ arg.total = total;
- errno = EAGAIN;
+ if (fptr->write_lock) {
+ return rb_mutex_synchronize(fptr->write_lock, io_binwritev_internal, (VALUE)&arg);
}
-
- if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
- rb_io_check_closed(fptr);
- goto retry;
+ else {
+ return io_binwritev_internal((VALUE)&arg);
}
-
- return -1L;
}
static long