diff options
author | akr <akr@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2008-03-30 06:38:05 +0000 |
---|---|---|
committer | akr <akr@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2008-03-30 06:38:05 +0000 |
commit | 0ef84dcefce47cab974f8763abd7cbb3a7eb6d98 (patch) | |
tree | dfe1062409fef2f44e7848ded9089a273809c7e5 /io.c | |
parent | a367738381708064e7636f70f75a3777cda5a5bc (diff) |
* io.c: IO.copy_stream implemented. [ruby-dev:33843]
* thread.c (rb_fd_select): new function.
* configure.in (sys/sendfile.h): check the header file.
(sendfile): check the function.
(pread): check the function.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@15858 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'io.c')
-rw-r--r-- | io.c | 472 |
1 files changed, 472 insertions, 0 deletions
@@ -14,6 +14,7 @@ #include "ruby/ruby.h" #include "ruby/io.h" #include "ruby/signal.h" +#include "vm_core.h" #include <ctype.h> #include <errno.h> @@ -6219,6 +6220,476 @@ rb_io_s_read(int argc, VALUE *argv, VALUE io) return rb_ensure(io_s_read, (VALUE)&arg, rb_io_close, arg.io); } +struct copy_stream_struct { + VALUE src; + VALUE dst; + int src_fd; + int dst_fd; + off_t copy_length; + off_t src_offset; + int close_src; + int close_dst; + off_t total; + char *syserr; + int error_no; + char *notimp; + rb_fdset_t fds; + rb_thread_t *th; +}; + +static void +copy_stream_rbuf_to_dst(struct copy_stream_struct *stp, + rb_io_t *src_fptr, rb_io_t *dst_fptr, const char *dst_path) +{ + ssize_t r; + int len; +retry: + len = src_fptr->rbuf_len; + if (stp->copy_length != (off_t)-1 && stp->copy_length < len) { + len = stp->copy_length; + } + if (len == 0) + return; + r = rb_write_internal(dst_fptr->fd, src_fptr->rbuf + src_fptr->rbuf_off, len); + if (len == r) { + src_fptr->rbuf_len -= len; + if (src_fptr->rbuf_len < 0) src_fptr->rbuf_len = 0; + if (stp->copy_length != (off_t)-1) stp->copy_length -= len; + stp->total += len; + return; + } + else if (0 <= r) { + src_fptr->rbuf_off += r; + src_fptr->rbuf_len -= r; + if (stp->copy_length != (off_t)-1) stp->copy_length -= r; + stp->total += r; + errno = EAGAIN; + } + if (rb_io_wait_writable(dst_fptr->fd)) { + rb_io_check_closed(dst_fptr); + if (src_fptr->rbuf_len) + goto retry; + } + rb_sys_fail(dst_path); +} + +static int +copy_stream_wait_read(struct copy_stream_struct *stp) +{ + int ret; + rb_fd_zero(&stp->fds); + rb_fd_set(stp->src_fd, &stp->fds); + ret = rb_fd_select(rb_fd_max(&stp->fds), &stp->fds, NULL, NULL, NULL); + if (ret == -1) { + stp->syserr = "select"; + stp->error_no = errno; + return -1; + } + return 0; +} + +static int +copy_stream_wait_write(struct copy_stream_struct *stp) +{ + int ret; + rb_fd_zero(&stp->fds); + rb_fd_set(stp->dst_fd, &stp->fds); + ret = rb_fd_select(rb_fd_max(&stp->fds), NULL, &stp->fds, NULL, NULL); + if (ret == -1) { + stp->syserr = "select"; + stp->error_no = errno; + return -1; + } + return 0; +} + +#ifdef HAVE_SENDFILE + +#ifdef __linux__ +#define USE_SENDFILE + +#ifdef HAVE_SYS_SENDFILE_H +#include <sys/sendfile.h> +#endif + +static ssize_t +simple_sendfile(int out_fd, int in_fd, off_t *offset, size_t count) +{ + return sendfile(out_fd, in_fd, offset, count); +} + +#endif + +#endif + +#ifdef USE_SENDFILE +static int +copy_stream_sendfile(struct copy_stream_struct *stp) +{ + struct stat src_stat, dst_stat; + ssize_t ss; + int ret; + + off_t copy_length; + off_t src_offset; + int use_pread; + + ret = fstat(stp->src_fd, &src_stat); + if (ret == -1) { + stp->syserr = "fstat"; + stp->error_no = errno; + return -1; + } + if (!S_ISREG(src_stat.st_mode)) + return 0; + + ret = fstat(stp->dst_fd, &dst_stat); + if (ret == -1) { + stp->syserr = "fstat"; + stp->error_no = errno; + return -1; + } + if ((dst_stat.st_mode & S_IFMT) != S_IFSOCK) + return 0; + + src_offset = stp->src_offset; + use_pread = src_offset != (off_t)-1; + + copy_length = stp->copy_length; + if (copy_length == (off_t)-1) { + if (use_pread) + copy_length = src_stat.st_size - src_offset; + else { + off_t cur = lseek(stp->src_fd, 0, SEEK_CUR); + if (cur == (off_t)-1) { + stp->syserr = "lseek"; + stp->error_no = errno; + return -1; + } + copy_length = src_stat.st_size - cur; + } + } + +retry_sendfile: + if (use_pread) { + ss = simple_sendfile(stp->dst_fd, stp->src_fd, &src_offset, copy_length); + } + else { + ss = simple_sendfile(stp->dst_fd, stp->src_fd, NULL, copy_length); + } + if (0 < ss) { + stp->total += ss; + copy_length -= ss; + if (0 < copy_length) { + ss = -1; + errno = EAGAIN; + } + } + if (ss == -1) { + if (errno == EINVAL || errno == ENOSYS) + return 0; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_write(stp) == -1) + return -1; + if (RUBY_VM_INTERRUPTED(stp->th)) + return; + goto retry_sendfile; + } + stp->syserr = "sendfile"; + stp->error_no = errno; + return -1; + } + return 1; +} +#endif + +static ssize_t +copy_stream_read(struct copy_stream_struct *stp, char *buf, int len, off_t offset) +{ + ssize_t ss; +retry_read: + if (offset == (off_t)-1) + ss = read(stp->src_fd, buf, len); + else { +#ifdef HAVE_PREAD + ss = pread(stp->src_fd, buf, len, offset); +#else + stp->notimp = "pread"; + return -1; +#endif + } + if (ss == 0) { + return 0; + } + if (ss == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_read(stp) == -1) + return -1; + goto retry_read; + } + if (errno == ENOSYS) { + stp->notimp = "pread"; + return -1; + } + stp->syserr = offset == (off_t)-1 ? "read" : "pread"; + stp->error_no = errno; + return -1; + } + return ss; +} + +static int +copy_stream_write(struct copy_stream_struct *stp, char *buf, int len) +{ + ssize_t ss; + int off = 0; + while (len) { + ss = write(stp->dst_fd, buf+off, len); + if (ss == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (copy_stream_wait_write(stp) == -1) + return -1; + continue; + } + stp->syserr = "write"; + stp->error_no = errno; + return -1; + } + off += ss; + len -= ss; + stp->total += ss; + } + return 0; +} + +static void +copy_stream_read_write(struct copy_stream_struct *stp) +{ + char buf[1024*16]; + int len; + ssize_t ss; + int ret; + off_t copy_length; + int use_eof; + off_t src_offset; + int use_pread; + + copy_length = stp->copy_length; + use_eof = copy_length == (off_t)-1; + src_offset = stp->src_offset; + use_pread = src_offset != (off_t)-1; + + if (use_pread && stp->close_src) { + off_t r; + r = lseek(stp->src_fd, src_offset, SEEK_SET); + if (r == (off_t)-1) { + stp->syserr = "lseek"; + stp->error_no = errno; + return; + } + src_offset = (off_t)-1; + use_pread = 0; + } + + while (use_eof || 0 < copy_length) { + if (!use_eof && copy_length < sizeof(buf)) { + len = copy_length; + } + else { + len = sizeof(buf); + } + if (use_pread) { + ss = copy_stream_read(stp, buf, len, src_offset); + if (0 < ss) + src_offset += ss; + } + else { + ss = copy_stream_read(stp, buf, len, (off_t)-1); + } + if (ss <= 0) /* EOF or error */ + return; + + ret = copy_stream_write(stp, buf, ss); + if (ret < 0) + return; + + if (!use_eof) + copy_length -= ss; + + if (RUBY_VM_INTERRUPTED(stp->th)) + return; + } +} + +static VALUE +copy_stream_func(void *arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + int ret; + +#ifdef USE_SENDFILE + ret = copy_stream_sendfile(stp); + if (ret != 0) + goto finish; /* error or success */ +#endif + + copy_stream_read_write(stp); + +finish: + return Qnil; +} + +static VALUE +copy_stream_body(VALUE arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + VALUE src_io, dst_io; + rb_io_t *src_fptr, *dst_fptr; + int src_fd, dst_fd; + char *src_path = 0, *dst_path = 0; + + stp->th = GET_THREAD(); + + src_io = rb_check_convert_type(stp->src, T_FILE, "IO", "to_io"); + if (!NIL_P(src_io)) { + GetOpenFile(src_io, src_fptr); + src_fd = src_fptr->fd; + } + else { + src_fptr = 0; + FilePathValue(stp->src); + src_path = StringValueCStr(stp->src); + src_fd = rb_sysopen_internal(src_path, O_RDONLY|O_NOCTTY, 0); + if (src_fd == -1) { rb_sys_fail(src_path); } + stp->close_src = 1; + } + stp->src_fd = src_fd; + + dst_io = rb_check_convert_type(stp->dst, T_FILE, "IO", "to_io"); + if (!NIL_P(dst_io)) { + dst_io = GetWriteIO(dst_io); + GetOpenFile(dst_io, dst_fptr); + dst_fd = dst_fptr->fd; + } + else { + dst_fptr = 0; + FilePathValue(stp->dst); + dst_path = StringValueCStr(stp->dst); + dst_fd = rb_sysopen_internal(dst_path, O_WRONLY|O_CREAT|O_TRUNC|O_NOCTTY, 0600); + if (dst_fd == -1) { rb_sys_fail(dst_path); } + stp->close_dst = 1; + } + stp->dst_fd = dst_fd; + + stp->total = 0; + + if (src_fptr && dst_fptr && src_fptr->rbuf_len && dst_fptr->wbuf_len) { + long len = src_fptr->rbuf_len; + VALUE str; + if (stp->copy_length != (off_t)-1 && stp->copy_length < len) { + len = stp->copy_length; + } + str = rb_str_buf_new(len); + rb_str_resize(str,len); + read_buffered_data(RSTRING_PTR(str), len, src_fptr); + io_fwrite(str, dst_fptr); + stp->total += len; + if (stp->copy_length != (off_t)-1) + stp->copy_length -= len; + } + + if (dst_fptr && io_fflush(dst_fptr) < 0) { + rb_raise(rb_eIOError, "flush failed"); + } + + if (src_fptr) { + copy_stream_rbuf_to_dst(stp, src_fptr, dst_fptr, dst_path); + } + + if (stp->copy_length == 0) + return Qnil; + + rb_fd_init(&stp->fds); + rb_fd_set(src_fd, &stp->fds); + rb_fd_set(dst_fd, &stp->fds); + + return rb_thread_blocking_region(copy_stream_func, (void*)stp, RB_UBF_DFL, 0); +} + +static VALUE +copy_stream_finalize(VALUE arg) +{ + struct copy_stream_struct *stp = (struct copy_stream_struct *)arg; + if (stp->close_src) + close(stp->src_fd); + if (stp->close_dst) + close(stp->dst_fd); + rb_fd_term(&stp->fds); + if (stp->syserr) { + errno = stp->error_no; + rb_sys_fail(stp->syserr); + } + if (stp->notimp) { + rb_raise(rb_eNotImpError, "%s() not implemented", stp->notimp); + } + return Qnil; +} + +/* + * call-seq: + * IO.copy_stream(src, dst) + * IO.copy_stream(src, dst, copy_length) + * IO.copy_stream(src, dst, copy_length, src_offset) + * + * IO.copy_stream copies <i>src</i> to <i>dst</i>. + * <i>src</i> and <i>dst</i> is either a filename or an IO. + * + * This method returns the number of bytes copied. + * + * If optional arguments are not given, + * the start position of the copy is + * the beginning of the filename or + * the current file offset of the IO. + * The end position of the copy is the end of file. + * + * If <i>copy_length</i> is given, + * No more than <i>copy_length</i> bytes are copied. + * + * If <i>src_offset</i> is given, + * it specifies the start position of the copy. + * + * When <i>src_offset</i> is specified and + * <i>src</i> is an IO, + * IO.copy_stream doesn't move the current file offset. + * + */ +static VALUE +rb_io_s_copy_stream(int argc, VALUE *argv, VALUE io) +{ + VALUE src, dst, length, src_offset; + struct copy_stream_struct st; + + MEMZERO(&st, struct copy_stream_struct, 1); + + rb_scan_args(argc, argv, "22", &src, &dst, &length, &src_offset); + + if (NIL_P(length)) + st.copy_length = (off_t)-1; + else + st.copy_length = NUM2OFFT(length); + + if (NIL_P(src_offset)) + st.src_offset = (off_t)-1; + else + st.src_offset = NUM2OFFT(src_offset); + + st.src = src; + st.dst = dst; + + rb_ensure(copy_stream_body, (VALUE)&st, copy_stream_finalize, (VALUE)&st); + + return OFFT2NUM(st.total); +} /* * call-seq: @@ -6874,6 +7345,7 @@ Init_IO(void) rb_define_singleton_method(rb_cIO, "select", rb_f_select, -1); rb_define_singleton_method(rb_cIO, "pipe", rb_io_s_pipe, -1); rb_define_singleton_method(rb_cIO, "try_convert", rb_io_s_try_convert, 1); + rb_define_singleton_method(rb_cIO, "copy_stream", rb_io_s_copy_stream, -1); rb_define_method(rb_cIO, "initialize", rb_io_initialize, -1); |