From 46149188656f5c2560a0fd78d077c145db0d1d95 Mon Sep 17 00:00:00 2001 From: shyouhei Date: Sun, 15 Jun 2008 13:49:29 +0000 Subject: merge revision(s) 16379: * win32/win32.c (rb_w32_select): backport from trunk. [ruby-talk:300743] git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/branches/ruby_1_8_5@17288 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- ChangeLog | 5 ++ version.h | 2 +- win32/win32.c | 276 ++++++++++++++++++++++++++++++++++++++++++++++------------ 3 files changed, 227 insertions(+), 56 deletions(-) diff --git a/ChangeLog b/ChangeLog index 9a659250a8..783c7ca42f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +Sun Jun 15 22:49:12 2008 NAKAMURA Usaku + + * win32/win32.c (rb_w32_select): backport from trunk. + [ruby-talk:300743] + Sun Jun 15 22:47:37 2008 Nobuyoshi Nakada * lib/delegate.rb (SimpleDelegator::dup): removed needless argument. diff --git a/version.h b/version.h index fb272faf5c..c5b7dcc351 100644 --- a/version.h +++ b/version.h @@ -2,7 +2,7 @@ #define RUBY_RELEASE_DATE "2008-06-15" #define RUBY_VERSION_CODE 185 #define RUBY_RELEASE_CODE 20080615 -#define RUBY_PATCHLEVEL 205 +#define RUBY_PATCHLEVEL 206 #define RUBY_VERSION_MAJOR 1 #define RUBY_VERSION_MINOR 8 diff --git a/win32/win32.c b/win32/win32.c index f9c1af57aa..b408676359 100644 --- a/win32/win32.c +++ b/win32/win32.c @@ -371,6 +371,7 @@ static void invalid_parameter(const wchar_t *expr, const wchar_t *func, const wc } #endif +static CRITICAL_SECTION select_mutex; static BOOL fWinsock; static char *envarea; static void @@ -384,6 +385,7 @@ exit_handler(void) FreeEnvironmentStrings(envarea); envarea = NULL; } + DeleteCriticalSection(&select_mutex); } static void @@ -472,6 +474,8 @@ NtInitialize(int *argc, char ***argv) init_stdhandle(); + InitializeCriticalSection(&select_mutex); + atexit(exit_handler); // Initialize Winsock @@ -2043,87 +2047,250 @@ rb_w32_fdisset(int fd, fd_set *set) static int NtSocketsInitialized = 0; static int -extract_file_fd(fd_set *set, fd_set *fileset) +extract_fd(fd_set *dst, fd_set *src, int (*func)(SOCKET)) { - int idx; + int s = 0; + if (!src || !dst) return 0; - fileset->fd_count = 0; - if (!set) - return 0; - for (idx = 0; idx < set->fd_count; idx++) { - SOCKET fd = set->fd_array[idx]; + while (s < src->fd_count) { + SOCKET fd = src->fd_array[s]; - if (!is_socket(fd)) { - int i; + if (!func || (*func)(fd)) { /* move it to dst */ + int d; - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } + for (d = 0; d < dst->fd_count; d++) { + if (dst->fd_array[d] == fd) break; } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } + if (d == dst->fd_count && dst->fd_count < FD_SETSIZE) { + dst->fd_array[dst->fd_count++] = fd; + } + memmove( + &src->fd_array[s], + &src->fd_array[s+1], + sizeof(src->fd_array[0]) * (--src->fd_count - s)); + } + else s++; + } + + return dst->fd_count; +} + +static int +is_not_socket(SOCKET sock) +{ + return !is_socket(sock); +} + +static int +is_pipe(SOCKET sock) /* DONT call this for SOCKET! it clains it is PIPE. */ +{ + int ret; + + RUBY_CRITICAL( + ret = (GetFileType((HANDLE)sock) == FILE_TYPE_PIPE) + ); + + return ret; +} + +static int +is_readable_pipe(SOCKET sock) /* call this for pipe only */ +{ + int ret; + DWORD n = 0; + + RUBY_CRITICAL( + if (PeekNamedPipe((HANDLE)sock, NULL, 0, NULL, &n, NULL)) { + ret = (n > 0); + } + else { + ret = (GetLastError() == ERROR_BROKEN_PIPE); /* pipe was closed */ + } + ); + + return ret; +} + +static int +is_console(SOCKET sock) /* DONT call this for SOCKET! */ +{ + int ret; + DWORD n = 0; + INPUT_RECORD ir; + + RUBY_CRITICAL( + ret = (PeekConsoleInput((HANDLE)sock, &ir, 1, &n)) + ); + + return ret; +} + +static int +is_readable_console(SOCKET sock) /* call this for console only */ +{ + int ret = 0; + DWORD n = 0; + INPUT_RECORD ir; + + RUBY_CRITICAL( + if (PeekConsoleInput((HANDLE)sock, &ir, 1, &n) && n > 0) { + if (ir.EventType == KEY_EVENT && ir.Event.KeyEvent.bKeyDown && + ir.Event.KeyEvent.uChar.AsciiChar) { + ret = 1; + } + else { + ReadConsoleInput((HANDLE)sock, &ir, 1, &n); + } + } + ); + + return ret; +} + +static int +do_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex, + struct timeval *timeout) +{ + int r = 0; + + if (nfds == 0) { + if (timeout) + rb_w32_sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); + else + rb_w32_sleep(INFINITE); + } + else { + RUBY_CRITICAL( + EnterCriticalSection(&select_mutex); + r = select(nfds, rd, wr, ex, timeout); + LeaveCriticalSection(&select_mutex); + if (r == SOCKET_ERROR) { + errno = map_errno(WSAGetLastError()); + r = -1; } + ); + } + + return r; +} + +static inline int +subst(struct timeval *rest, const struct timeval *wait) +{ + while (rest->tv_usec < wait->tv_usec) { + if (rest->tv_sec <= wait->tv_sec) { + return 0; } + rest->tv_sec -= 1; + rest->tv_usec += 1000 * 1000; } - return fileset->fd_count; + rest->tv_sec -= wait->tv_sec; + rest->tv_usec -= wait->tv_usec; + return 1; } +static inline int +compare(const struct timeval *t1, const struct timeval *t2) +{ + if (t1->tv_sec < t2->tv_sec) + return -1; + if (t1->tv_sec > t2->tv_sec) + return 1; + if (t1->tv_usec < t2->tv_usec) + return -1; + if (t1->tv_usec > t2->tv_usec) + return 1; + return 0; +} + +#undef Sleep long rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex, struct timeval *timeout) { long r; - fd_set file_rd; - fd_set file_wr; -#ifdef USE_INTERRUPT_WINSOCK - fd_set trap; -#endif /* USE_INTERRUPT_WINSOCK */ - int file_nfds; + fd_set pipe_rd; + fd_set cons_rd; + fd_set else_rd; + fd_set else_wr; + int nonsock = 0; + if (nfds < 0 || (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0))) { + errno = EINVAL; + return -1; + } if (!NtSocketsInitialized) { StartSockets(); } + + // assume else_{rd,wr} (other than socket, pipe reader, console reader) + // are always readable/writable. but this implementation still has + // problem. if pipe's buffer is full, writing to pipe will block + // until some data is read from pipe. but ruby is single threaded system, + // so whole system will be blocked forever. + + else_rd.fd_count = 0; + nonsock += extract_fd(&else_rd, rd, is_not_socket); + + pipe_rd.fd_count = 0; + extract_fd(&pipe_rd, &else_rd, is_pipe); // should not call is_pipe for socket + + cons_rd.fd_count = 0; + extract_fd(&cons_rd, &else_rd, is_console); // ditto + + else_wr.fd_count = 0; + nonsock += extract_fd(&else_wr, wr, is_not_socket); + r = 0; if (rd && rd->fd_count > r) r = rd->fd_count; if (wr && wr->fd_count > r) r = wr->fd_count; if (ex && ex->fd_count > r) r = ex->fd_count; if (nfds > r) nfds = r; - if (nfds == 0 && timeout) { - Sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); - return 0; - } - file_nfds = extract_file_fd(rd, &file_rd); - file_nfds += extract_file_fd(wr, &file_wr); - if (file_nfds) + { - // assume normal files are always readable/writable - // fake read/write fd_set and return value - if (rd) *rd = file_rd; - if (wr) *wr = file_wr; - return file_nfds; - } + struct timeval rest; + struct timeval wait; + struct timeval zero; + if (timeout) rest = *timeout; + wait.tv_sec = 0; wait.tv_usec = 10 * 1000; // 10ms + zero.tv_sec = 0; zero.tv_usec = 0; // 0ms + do { + if (nonsock) { + // modifying {else,pipe,cons}_rd is safe because + // if they are modified, function returns immediately. + extract_fd(&else_rd, &pipe_rd, is_readable_pipe); + extract_fd(&else_rd, &cons_rd, is_readable_console); + } -#if USE_INTERRUPT_WINSOCK - if (ex) - trap = *ex; - else - trap.fd_count = 0; - if (trap.fd_count < FD_SETSIZE) - trap.fd_array[trap.fd_count++] = (SOCKET)interrupted_event; - // else unable to catch interrupt. - ex = &trap; -#endif /* USE_INTERRUPT_WINSOCK */ + if (else_rd.fd_count || else_wr.fd_count) { + r = do_select(nfds, rd, wr, ex, &zero); // polling + if (r < 0) break; // XXX: should I ignore error and return signaled handles? + r += extract_fd(rd, &else_rd, NULL); // move all + r += extract_fd(wr, &else_wr, NULL); // move all + break; + } + else { + struct timeval *dowait = + compare(&rest, &wait) < 0 ? &rest : &wait; + + fd_set orig_rd; + fd_set orig_wr; + fd_set orig_ex; + if (rd) orig_rd = *rd; + if (wr) orig_wr = *wr; + if (ex) orig_ex = *ex; + r = do_select(nfds, rd, wr, ex, &zero); // polling + if (r != 0) break; // signaled or error + if (rd) *rd = orig_rd; + if (wr) *wr = orig_wr; + if (ex) *ex = orig_ex; + + // XXX: should check the time select spent + Sleep(dowait->tv_sec * 1000 + dowait->tv_usec / 1000); + } + } while (!timeout || subst(&rest, &wait)); + } - RUBY_CRITICAL({ - r = select(nfds, rd, wr, ex, timeout); - if (r == SOCKET_ERROR) { - errno = map_errno(WSAGetLastError()); - } - }); return r; } @@ -3258,7 +3425,6 @@ rb_w32_times(struct tms *tmbuf) return 0; } -#undef Sleep #define yield_once() Sleep(0) #define yield_until(condition) do yield_once(); while (!(condition)) -- cgit v1.2.3