summaryrefslogtreecommitdiff
path: root/io.c
diff options
context:
space:
mode:
authorakr <akr@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-03-30 06:38:05 +0000
committerakr <akr@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2008-03-30 06:38:05 +0000
commit0ef84dcefce47cab974f8763abd7cbb3a7eb6d98 (patch)
treedfe1062409fef2f44e7848ded9089a273809c7e5 /io.c
parenta367738381708064e7636f70f75a3777cda5a5bc (diff)
* io.c: IO.copy_stream implemented. [ruby-dev:33843]
* thread.c (rb_fd_select): new function. * configure.in (sys/sendfile.h): check the header file. (sendfile): check the function. (pread): check the function. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@15858 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'io.c')
-rw-r--r--io.c472
1 files changed, 472 insertions, 0 deletions
diff --git a/io.c b/io.c
index d19392bf8e..c8cb76ba7f 100644
--- a/io.c
+++ b/io.c
@@ -14,6 +14,7 @@
#include "ruby/ruby.h"
#include "ruby/io.h"
#include "ruby/signal.h"
+#include "vm_core.h"
#include <ctype.h>
#include <errno.h>
@@ -6219,6 +6220,476 @@ rb_io_s_read(int argc, VALUE *argv, VALUE io)
return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io);
}
+struct copy_stream_struct {
+ VALUE src;
+ VALUE dst;
+ int src_fd;
+ int dst_fd;
+ off_t copy_length;
+ off_t src_offset;
+ int close_src;
+ int close_dst;
+ off_t total;
+ char *syserr;
+ int error_no;
+ char *notimp;
+ rb_fdset_t fds;
+ rb_thread_t *th;
+};
+
+static void
+copy_stream_rbuf_to_dst(struct copy_stream_struct *stp,
+ rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path)
+{
+ ssize_t r;
+ int len;
+retry:
+ len = src_fptr->rbuf_len;
+ if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+ len = stp->copy_length;
+ }
+ if (len == 0)
+ return;
+ r = rb_write_internal(dst_fptr->fd, src_fptr->rbuf + src_fptr->rbuf_off, len);
+ if (len == r) {
+ src_fptr->rbuf_len -= len;
+ if (src_fptr->rbuf_len < 0) src_fptr->rbuf_len = 0;
+ if (stp->copy_length != (off_t)-1) stp->copy_length -= len;
+ stp->total += len;
+ return;
+ }
+ else if (0 <= r) {
+ src_fptr->rbuf_off += r;
+ src_fptr->rbuf_len -= r;
+ if (stp->copy_length != (off_t)-1) stp->copy_length -= r;
+ stp->total += r;
+ errno = EAGAIN;
+ }
+ if (rb_io_wait_writable(dst_fptr->fd)) {
+ rb_io_check_closed(dst_fptr);
+ if (src_fptr->rbuf_len)
+ goto retry;
+ }
+ rb_sys_fail(dst_path);
+}
+
+static int
+copy_stream_wait_read(struct copy_stream_struct *stp)
+{
+ int ret;
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->src_fd, &stp->fds);
+ ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL);
+ if (ret == -1) {
+ stp->syserr = "select";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 0;
+}
+
+static int
+copy_stream_wait_write(struct copy_stream_struct *stp)
+{
+ int ret;
+ rb_fd_zero(&stp->fds);
+ rb_fd_set(stp->dst_fd, &stp->fds);
+ ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL);
+ if (ret == -1) {
+ stp->syserr = "select";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 0;
+}
+
+#ifdef HAVE_SENDFILE
+
+#ifdef __linux__
+#define USE_SENDFILE
+
+#ifdef HAVE_SYS_SENDFILE_H
+#include <sys/sendfile.h>
+#endif
+
+static ssize_t
+simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count)
+{
+ return sendfile(out_fd, in_fd, offset, count);
+}
+
+#endif
+
+#endif
+
+#ifdef USE_SENDFILE
+static int
+copy_stream_sendfile(struct copy_stream_struct *stp)
+{
+ struct stat src_stat, dst_stat;
+ ssize_t ss;
+ int ret;
+
+ off_t copy_length;
+ off_t src_offset;
+ int use_pread;
+
+ ret = fstat(stp->src_fd, &src_stat);
+ if (ret == -1) {
+ stp->syserr = "fstat";
+ stp->error_no = errno;
+ return -1;
+ }
+ if (!S_ISREG(src_stat.st_mode))
+ return 0;
+
+ ret = fstat(stp->dst_fd, &dst_stat);
+ if (ret == -1) {
+ stp->syserr = "fstat";
+ stp->error_no = errno;
+ return -1;
+ }
+ if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK)
+ return 0;
+
+ src_offset = stp->src_offset;
+ use_pread = src_offset != (off_t)-1;
+
+ copy_length = stp->copy_length;
+ if (copy_length == (off_t)-1) {
+ if (use_pread)
+ copy_length = src_stat.st_size - src_offset;
+ else {
+ off_t cur = lseek(stp->src_fd, 0, SEEK_CUR);
+ if (cur == (off_t)-1) {
+ stp->syserr = "lseek";
+ stp->error_no = errno;
+ return -1;
+ }
+ copy_length = src_stat.st_size - cur;
+ }
+ }
+
+retry_sendfile:
+ if (use_pread) {
+ ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length);
+ }
+ else {
+ ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length);
+ }
+ if (0 < ss) {
+ stp->total += ss;
+ copy_length -= ss;
+ if (0 < copy_length) {
+ ss = -1;
+ errno = EAGAIN;
+ }
+ }
+ if (ss == -1) {
+ if (errno == EINVAL || errno == ENOSYS)
+ return 0;
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_write(stp) == -1)
+ return -1;
+ if (RUBY_VM_INTERRUPTED(stp->th))
+ return;
+ goto retry_sendfile;
+ }
+ stp->syserr = "sendfile";
+ stp->error_no = errno;
+ return -1;
+ }
+ return 1;
+}
+#endif
+
+static ssize_t
+copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset)
+{
+ ssize_t ss;
+retry_read:
+ if (offset == (off_t)-1)
+ ss = read(stp->src_fd, buf, len);
+ else {
+#ifdef HAVE_PREAD
+ ss = pread(stp->src_fd, buf, len, offset);
+#else
+ stp->notimp = "pread";
+ return -1;
+#endif
+ }
+ if (ss == 0) {
+ return 0;
+ }
+ if (ss == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_read(stp) == -1)
+ return -1;
+ goto retry_read;
+ }
+ if (errno == ENOSYS) {
+ stp->notimp = "pread";
+ return -1;
+ }
+ stp->syserr = offset == (off_t)-1 ? "read" : "pread";
+ stp->error_no = errno;
+ return -1;
+ }
+ return ss;
+}
+
+static int
+copy_stream_write(struct copy_stream_struct *stp, char *buf, int len)
+{
+ ssize_t ss;
+ int off = 0;
+ while (len) {
+ ss = write(stp->dst_fd, buf+off, len);
+ if (ss == -1) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ if (copy_stream_wait_write(stp) == -1)
+ return -1;
+ continue;
+ }
+ stp->syserr = "write";
+ stp->error_no = errno;
+ return -1;
+ }
+ off += ss;
+ len -= ss;
+ stp->total += ss;
+ }
+ return 0;
+}
+
+static void
+copy_stream_read_write(struct copy_stream_struct *stp)
+{
+ char buf[1024*16];
+ int len;
+ ssize_t ss;
+ int ret;
+ off_t copy_length;
+ int use_eof;
+ off_t src_offset;
+ int use_pread;
+
+ copy_length = stp->copy_length;
+ use_eof = copy_length == (off_t)-1;
+ src_offset = stp->src_offset;
+ use_pread = src_offset != (off_t)-1;
+
+ if (use_pread && stp->close_src) {
+ off_t r;
+ r = lseek(stp->src_fd, src_offset, SEEK_SET);
+ if (r == (off_t)-1) {
+ stp->syserr = "lseek";
+ stp->error_no = errno;
+ return;
+ }
+ src_offset = (off_t)-1;
+ use_pread = 0;
+ }
+
+ while (use_eof || 0 < copy_length) {
+ if (!use_eof && copy_length < sizeof(buf)) {
+ len = copy_length;
+ }
+ else {
+ len = sizeof(buf);
+ }
+ if (use_pread) {
+ ss = copy_stream_read(stp, buf, len, src_offset);
+ if (0 < ss)
+ src_offset += ss;
+ }
+ else {
+ ss = copy_stream_read(stp, buf, len, (off_t)-1);
+ }
+ if (ss <= 0) /* EOF or error */
+ return;
+
+ ret = copy_stream_write(stp, buf, ss);
+ if (ret < 0)
+ return;
+
+ if (!use_eof)
+ copy_length -= ss;
+
+ if (RUBY_VM_INTERRUPTED(stp->th))
+ return;
+ }
+}
+
+static VALUE
+copy_stream_func(void *arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ int ret;
+
+#ifdef USE_SENDFILE
+ ret = copy_stream_sendfile(stp);
+ if (ret != 0)
+ goto finish; /* error or success */
+#endif
+
+ copy_stream_read_write(stp);
+
+finish:
+ return Qnil;
+}
+
+static VALUE
+copy_stream_body(VALUE arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ VALUE src_io, dst_io;
+ rb_io_t *src_fptr, *dst_fptr;
+ int src_fd, dst_fd;
+ char *src_path = 0, *dst_path = 0;
+
+ stp->th = GET_THREAD();
+
+ src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io");
+ if (!NIL_P(src_io)) {
+ GetOpenFile(src_io, src_fptr);
+ src_fd = src_fptr->fd;
+ }
+ else {
+ src_fptr = 0;
+ FilePathValue(stp->src);
+ src_path = StringValueCStr(stp->src);
+ src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0);
+ if (src_fd == -1) { rb_sys_fail(src_path); }
+ stp->close_src = 1;
+ }
+ stp->src_fd = src_fd;
+
+ dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io");
+ if (!NIL_P(dst_io)) {
+ dst_io = GetWriteIO(dst_io);
+ GetOpenFile(dst_io, dst_fptr);
+ dst_fd = dst_fptr->fd;
+ }
+ else {
+ dst_fptr = 0;
+ FilePathValue(stp->dst);
+ dst_path = StringValueCStr(stp->dst);
+ dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600);
+ if (dst_fd == -1) { rb_sys_fail(dst_path); }
+ stp->close_dst = 1;
+ }
+ stp->dst_fd = dst_fd;
+
+ stp->total = 0;
+
+ if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) {
+ long len = src_fptr->rbuf_len;
+ VALUE str;
+ if (stp->copy_length != (off_t)-1 && stp->copy_length < len) {
+ len = stp->copy_length;
+ }
+ str = rb_str_buf_new(len);
+ rb_str_resize(str,len);
+ read_buffered_data(RSTRING_PTR(str), len, src_fptr);
+ io_fwrite(str, dst_fptr);
+ stp->total += len;
+ if (stp->copy_length != (off_t)-1)
+ stp->copy_length -= len;
+ }
+
+ if (dst_fptr && io_fflush(dst_fptr) < 0) {
+ rb_raise(rb_eIOError, "flush failed");
+ }
+
+ if (src_fptr) {
+ copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path);
+ }
+
+ if (stp->copy_length == 0)
+ return Qnil;
+
+ rb_fd_init(&stp->fds);
+ rb_fd_set(src_fd, &stp->fds);
+ rb_fd_set(dst_fd, &stp->fds);
+
+ return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0);
+}
+
+static VALUE
+copy_stream_finalize(VALUE arg)
+{
+ struct copy_stream_struct *stp = (struct copy_stream_struct *)arg;
+ if (stp->close_src)
+ close(stp->src_fd);
+ if (stp->close_dst)
+ close(stp->dst_fd);
+ rb_fd_term(&stp->fds);
+ if (stp->syserr) {
+ errno = stp->error_no;
+ rb_sys_fail(stp->syserr);
+ }
+ if (stp->notimp) {
+ rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp);
+ }
+ return Qnil;
+}
+
+/*
+ * call-seq:
+ * IO.copy_stream(src, dst)
+ * IO.copy_stream(src, dst, copy_length)
+ * IO.copy_stream(src, dst, copy_length, src_offset)
+ *
+ * IO.copy_stream copies <i>src</i> to <i>dst</i>.
+ * <i>src</i> and <i>dst</i> is either a filename or an IO.
+ *
+ * This method returns the number of bytes copied.
+ *
+ * If optional arguments are not given,
+ * the start position of the copy is
+ * the beginning of the filename or
+ * the current file offset of the IO.
+ * The end position of the copy is the end of file.
+ *
+ * If <i>copy_length</i> is given,
+ * No more than <i>copy_length</i> bytes are copied.
+ *
+ * If <i>src_offset</i> is given,
+ * it specifies the start position of the copy.
+ *
+ * When <i>src_offset</i> is specified and
+ * <i>src</i> is an IO,
+ * IO.copy_stream doesn't move the current file offset.
+ *
+ */
+static VALUE
+rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io)
+{
+ VALUE src, dst, length, src_offset;
+ struct copy_stream_struct st;
+
+ MEMZERO(&st, struct copy_stream_struct, 1);
+
+ rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset);
+
+ if (NIL_P(length))
+ st.copy_length = (off_t)-1;
+ else
+ st.copy_length = NUM2OFFT(length);
+
+ if (NIL_P(src_offset))
+ st.src_offset = (off_t)-1;
+ else
+ st.src_offset = NUM2OFFT(src_offset);
+
+ st.src = src;
+ st.dst = dst;
+
+ rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st);
+
+ return OFFT2NUM(st.total);
+}
/*
* call-seq:
@@ -6874,6 +7345,7 @@ Init_IO(void)
rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1);
rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1);
rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1);
+ rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1);
rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1);