From 706ee243a50793f4f3edd218baa11e91306e56b7 Mon Sep 17 00:00:00 2001 From: zzz Date: Sat, 4 Apr 2015 17:00:57 +0000 Subject: [PATCH] Streaming read timeout fixes: i2ptunnel: - Better timeout handling when reading headers in HTTP server (improved fix for ticket #723) Enforce total header timeout with new readLine() - Prep for returning specific HTTP errors to client on request timeout and header errors, instead of just closing socket... further work to be in i2p.i2p.zzz.test2 branch Streaming: - Fix read timeout on input stream - was waiting too long, often twice as long as timeout, or more Enforce total timeout even when notify()ed - Fix read() returning 0 on read timeout instead of -1 (possible fix for ticket #335) This prevents passing partial headers to server on timeout - Fix javadocs for read timeout to match current behavior - Fix StandardSocket SoTimeout to account for differences with I2PSocket readTimeout - log tweaks --- .../i2p/i2ptunnel/I2PTunnelHTTPServer.java | 101 ++++++++++++++++-- .../client/streaming/I2PSocketOptions.java | 8 +- .../client/streaming/impl/I2PSocketFull.java | 4 + .../streaming/impl/I2PSocketOptionsImpl.java | 8 +- .../streaming/impl/MessageInputStream.java | 75 +++++++++---- .../client/streaming/impl/StandardSocket.java | 13 ++- history.txt | 8 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- 8 files changed, 181 insertions(+), 38 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index 3c8066c56..8f963e649 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -4,6 +4,7 @@ package net.i2p.i2ptunnel; import java.io.BufferedInputStream; +import java.io.EOFException; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -11,6 +12,7 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -63,9 +65,12 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { private static final String SERVER_HEADER = "Server"; private static final String X_POWERED_BY_HEADER = "X-Powered-By"; private static final String[] SERVER_SKIPHEADERS = {SERVER_HEADER, X_POWERED_BY_HEADER}; + /** timeout for first request line */ private static final long HEADER_TIMEOUT = 15*1000; + /** total timeout for the request and all the headers */ private static final long TOTAL_HEADER_TIMEOUT = 2 * HEADER_TIMEOUT; private static final long START_INTERVAL = (60 * 1000) * 3; + private static final int MAX_LINE_LENGTH = 8*1024; private long _startedOn = 0L; private ConnThrottler _postThrottler; @@ -206,12 +211,9 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { long afterAccept = getTunnel().getContext().clock().now(); // The headers _should_ be in the first packet, but // may not be, depending on the client-side options - socket.setReadTimeout(HEADER_TIMEOUT); - - InputStream in = socket.getInputStream(); StringBuilder command = new StringBuilder(128); - Map> headers = readHeaders(in, command, + Map> headers = readHeaders(socket, null, command, CLIENT_SKIPHEADERS, getTunnel().getContext()); long afterHeaders = getTunnel().getContext().clock().now(); @@ -435,7 +437,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { //Change headers to protect server identity StringBuilder command = new StringBuilder(128); - Map> headers = readHeaders(serverin, command, + Map> headers = readHeaders(null, serverin, command, SERVER_SKIPHEADERS, _ctx); String modifiedHeaders = formatHeaders(headers, command); compressedOut.write(modifiedHeaders.getBytes()); @@ -671,15 +673,33 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { } } - protected static Map> readHeaders(InputStream in, StringBuilder command, + /** + * From I2P to server: socket non-null, in null. + * From server to I2P: socket null, in non-null. + * + * @param socket if null, use in as InputStream + * @param in if null, use socket.getInputStream() as InputStream + * @param command out parameter, first line + * @param command out parameter, first line + * @throws SocketTimeoutException if timeout is reached before newline + * @throws EOFException if EOF is reached before newline + * @throws LineTooLongException if too long + * @throws IOException on other errors in the underlying stream + */ + private static Map> readHeaders(I2PSocket socket, InputStream in, StringBuilder command, String[] skipHeaders, I2PAppContext ctx) throws IOException { HashMap> headers = new HashMap>(); StringBuilder buf = new StringBuilder(128); // slowloris / darkloris long expire = ctx.clock().now() + TOTAL_HEADER_TIMEOUT; - boolean ok = DataHelper.readLine(in, command); - if (!ok) throw new IOException("EOF reached while reading the HTTP command [" + command.toString() + "]"); + if (socket != null) { + readLine(socket, command, HEADER_TIMEOUT); + } else { + boolean ok = DataHelper.readLine(in, command); + if (!ok) + throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); + } //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Read the http command [" + command.toString() + "]"); @@ -703,14 +723,19 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { if (++i > MAX_HEADERS) throw new IOException("Too many header lines - max " + MAX_HEADERS); buf.setLength(0); - ok = DataHelper.readLine(in, buf); - if (!ok) throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); + if (socket != null) { + readLine(socket, buf, expire - ctx.clock().now()); + } else { + boolean ok = DataHelper.readLine(in, buf); + if (!ok) + throw new IOException("EOF reached before the end of the headers [" + buf.toString() + "]"); + } if ( (buf.length() == 0) || ((buf.charAt(0) == '\n') || (buf.charAt(0) == '\r')) ) { // end of headers reached return headers; } else { - if (ctx.clock().now() > expire) + if (ctx.clock().now() >= expire) throw new IOException("Headers took too long [" + buf.toString() + "]"); int split = buf.indexOf(":"); if (split <= 0) throw new IOException("Invalid HTTP header, missing colon [" + buf.toString() + "]"); @@ -752,5 +777,59 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { } } } + + /** + * Read a line teriminated by newline, with a total read timeout. + * + * Warning - strips \n but not \r + * Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded + * Warning - not UTF-8 + * + * @param buf output + * @param timeout throws SocketTimeoutException immediately if zero or negative + * @throws SocketTimeoutException if timeout is reached before newline + * @throws EOFException if EOF is reached before newline + * @throws LineTooLongException if too long + * @throws IOException on other errors in the underlying stream + * @since 0.9.19 modified from DataHelper + */ + private static void readLine(I2PSocket socket, StringBuilder buf, long timeout) throws IOException { + if (timeout <= 0) + throw new SocketTimeoutException(); + long expires = System.currentTimeMillis() + timeout; + InputStream in = socket.getInputStream(); + int c; + int i = 0; + socket.setReadTimeout(timeout); + while ( (c = in.read()) != -1) { + if (++i > MAX_LINE_LENGTH) + throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH); + if (c == '\n') + break; + long newTimeout = expires - System.currentTimeMillis(); + if (newTimeout <= 0) + throw new SocketTimeoutException(); + buf.append((char)c); + if (newTimeout != timeout) { + timeout = newTimeout; + socket.setReadTimeout(timeout); + } + } + if (c == -1) { + if (System.currentTimeMillis() >= expires) + throw new SocketTimeoutException(); + else + throw new EOFException(); + } + } + + /** + * @since 0.9.19 + */ + private static class LineTooLongException extends IOException { + public LineTooLongException(String s) { + super(s); + } + } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 1095a9334..692c04f7b 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -37,22 +37,22 @@ public interface I2PSocketOptions { /** * What is the longest we'll block on the input stream while waiting * for more data. If this value is exceeded, the read() throws - * InterruptedIOException + * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead. * * WARNING: Default -1 (unlimited), which is probably not what you want. * - * @return timeout in ms + * @return timeout in ms, 0 for nonblocking, -1 for forever */ public long getReadTimeout(); /** * What is the longest we'll block on the input stream while waiting * for more data. If this value is exceeded, the read() throws - * InterruptedIOException + * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead. * * WARNING: Default -1 (unlimited), which is probably not what you want. * - * @param ms timeout in ms + * @param ms timeout in ms, 0 for nonblocking, -1 for forever */ public void setReadTimeout(long ms); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java index 2885db4c9..5d3bec526 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketFull.java @@ -51,6 +51,8 @@ class I2PSocketFull implements I2PSocket { } Connection c = _connection; if (c == null) return; + if (log.shouldLog(Log.INFO)) + log.info("close() called, connected? " + c.getIsConnected() + " : " + c); if (c.getIsConnected()) { MessageInputStream in = c.getInputStream(); in.close(); @@ -136,6 +138,8 @@ class I2PSocketFull implements I2PSocket { Connection c = _connection; if (c == null) return; + if (ms > Integer.MAX_VALUE) + ms = Integer.MAX_VALUE; c.getInputStream().setReadTimeout((int)ms); c.getOptions().setReadTimeout(ms); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketOptionsImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketOptionsImpl.java index 5516451a0..63401f92d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketOptionsImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/I2PSocketOptionsImpl.java @@ -144,9 +144,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { /** * What is the longest we'll block on the input stream while waiting * for more data. If this value is exceeded, the read() throws - * InterruptedIOException + * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead. * * WARNING: Default -1 (unlimited), which is probably not what you want. + * + * @return timeout in ms, 0 for nonblocking, -1 for forever */ public long getReadTimeout() { return _readTimeout; @@ -155,9 +157,11 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { /** * What is the longest we'll block on the input stream while waiting * for more data. If this value is exceeded, the read() throws - * InterruptedIOException + * InterruptedIOException - FIXME doesn't really, returns -1 or 0 instead. * * WARNING: Default -1 (unlimited), which is probably not what you want. + * + * @param ms timeout in ms, 0 for nonblocking, -1 for forever */ public void setReadTimeout(long ms) { _readTimeout = ms; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index 8527bc72f..b8ea9a0a2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -170,10 +170,15 @@ class MessageInputStream extends InputStream { /** * how long a read() call should block (if less than 0, block indefinitely, * but if it is 0, do not block at all) - * @return how long read calls should block, 0 or less indefinitely block + * @return how long read calls should block, 0 for nonblocking, negative to indefinitely block */ public int getReadTimeout() { return _readTimeout; } + /** + * how long a read() call should block (if less than 0, block indefinitely, + * but if it is 0, do not block at all) + * @param timeout how long read calls should block, 0 for nonblocking, negative to indefinitely block + */ public void setReadTimeout(int timeout) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); @@ -230,7 +235,8 @@ class MessageInputStream extends InputStream { */ public boolean messageReceived(long messageId, ByteArray payload) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); + _log.debug("received msg ID " + messageId + " with " + + (payload != null ? payload.getValid() + " bytes" : "no payload")); synchronized (_dataLock) { if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.INFO)) @@ -261,6 +267,10 @@ class MessageInputStream extends InputStream { cur++; _highestReadyBlockId++; } + // FIXME Javadocs for setReadTimeout() say we will throw + // an InterruptedIOException. + // Java throws a SocketTimeoutException. + // We do neither. } else { if (_log.shouldLog(Log.INFO)) _log.info("Message is out of order: " + messageId); @@ -275,23 +285,41 @@ class MessageInputStream extends InputStream { return true; } + /** + * On a read timeout, this returns -1 + * (doesn't throw SocketTimeoutException like Socket) + * (doesn't throw InterruptedIOException like our javadocs say) + */ public int read() throws IOException { int read = read(_oneByte, 0, 1); - if (read < 0) + if (read <= 0) return -1; return _oneByte[0] & 0xff; } + /** + * On a read timeout, this returns 0 + * (doesn't throw SocketTimeoutException like Socket) + * (doesn't throw InterruptedIOException like our javadocs say) + */ @Override public int read(byte target[]) throws IOException { return read(target, 0, target.length); } + /** + * On a read timeout, this returns 0 + * (doesn't throw SocketTimeoutException like Socket) + * (doesn't throw InterruptedIOException like our javadocs say) + */ @Override public int read(byte target[], int offset, int length) throws IOException { - long expiration = -1; - if (_readTimeout > 0) - expiration = _readTimeout + System.currentTimeMillis(); + int readTimeout = _readTimeout; + long expiration; + if (readTimeout > 0) + expiration = readTimeout + System.currentTimeMillis(); + else + expiration = -1; synchronized (_dataLock) { if (_locallyClosed) throw new IOException("Already locally closed"); throwAnyError(); @@ -310,10 +338,10 @@ class MessageInputStream extends InputStream { + "] got EOF after " + _readTotal + " " + toString()); return -1; } else { - if (_readTimeout < 0) { + if (readTimeout < 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i - + ") with no timeout: " + toString()); + + "] with no timeout: " + toString()); try { _dataLock.wait(); } catch (InterruptedException ie) { @@ -323,14 +351,14 @@ class MessageInputStream extends InputStream { } if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i - + ") with no timeout complete: " + toString()); + + "] with no timeout complete: " + toString()); throwAnyError(); - } else if (_readTimeout > 0) { + } else if (readTimeout > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i - + ") with timeout: " + _readTimeout + ": " + toString()); + + "] with timeout: " + readTimeout + ": " + toString()); try { - _dataLock.wait(_readTimeout); + _dataLock.wait(readTimeout); } catch (InterruptedException ie) { IOException ioe2 = new InterruptedIOException("Interrupted read"); ioe2.initCause(ie); @@ -338,21 +366,30 @@ class MessageInputStream extends InputStream { } if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i - + ") with timeout complete: " + _readTimeout + ": " + toString()); + + "] with timeout complete: " + readTimeout + ": " + toString()); throwAnyError(); } else { // readTimeout == 0 // noop, don't block if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i - + ") with nonblocking setup: " + toString()); + + "] with nonblocking setup: " + toString()); return i; } if (_readyDataBlocks.isEmpty()) { - if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) { - if (_log.shouldLog(Log.INFO)) - _log.info("read(...," + offset+", " + length+ ")[" + i - + ") expired: " + toString()); - return i; + if (readTimeout > 0) { + long remaining = expiration - System.currentTimeMillis(); + if (remaining <= 0) { + // FIXME Javadocs for setReadTimeout() say we will throw + // an InterruptedIOException. + // Java throws a SocketTimeoutException. + // We do neither. + if (_log.shouldLog(Log.INFO)) + _log.info("read(...," + offset+", " + length+ ")[" + i + + "] expired: " + toString()); + return i; + } else { + readTimeout = (int) remaining; + } } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java index 686eadd98..151c1f395 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/StandardSocket.java @@ -193,7 +193,15 @@ class StandardSocket extends Socket { I2PSocketOptions opts = _socket.getOptions(); if (opts == null) return 0; - return (int) opts.getReadTimeout(); + long rv = opts.getReadTimeout(); + // Java Socket: 0 is forever, and we don't exactly have nonblocking + if (rv > Integer.MAX_VALUE) + rv = Integer.MAX_VALUE; + else if (rv < 0) + rv = 0; + else if (rv == 0) + rv = 1; + return (int) rv; } /** @@ -309,6 +317,9 @@ class StandardSocket extends Socket { I2PSocketOptions opts = _socket.getOptions(); if (opts == null) throw new SocketException("No options"); + // Java Socket: 0 is forever + if (timeout == 0) + timeout = -1; opts.setReadTimeout(timeout); } diff --git a/history.txt b/history.txt index 7fd596003..4f731d065 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,11 @@ +2015-04-04 zzz + * i2ptunnel: + - Better timeout handling when reading headers in HTTP server (ticket #723) + - Fix NoSuchElementException processing proxyList caused by 03-31 checkin + * Streaming: + - Fix read timeout on input stream (ticket #723) + - Fix read() returning 0 instead of -1 on read timeout (ticket #335) + 2015-04-03 zzz * i2ptunnel: Fix stopping tunnel on bad args when starting * wrapper.config: Remove old mortbay Jetty parameters diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 421ff4f63..098e16d49 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 16; + public final static long BUILD = 17; /** for example "-test" */ public final static String EXTRA = "-rc";