diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java index 1b460a3c24d709a8b6b3ce48f9426348f29fb130..e57993a2c39b58193661bfe1e3e6a4b0edb0a69d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketEepGet.java @@ -76,6 +76,19 @@ public class I2PSocketEepGet extends EepGet { return rv; } + /** + * Overridden to disable inline gunzipping + * @since 0.8.10 + */ + @Override + protected void readHeaders() throws IOException { + try { + super.readHeaders(); + } finally { + _isGzippedResponse = false; + } + } + /** * Look up the address, get a socket from the I2PSocketManager supplied in the constructor, * and send the request. diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index c5d3861aef3bd566c28aa5beca96c05a5d872fc3..7a59775d19197cf9d1b6380342b7493d35666257 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -4,6 +4,8 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.OutputStream; import java.net.MalformedURLException; import java.net.Socket; @@ -11,6 +13,7 @@ import java.net.URL; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Date; +import java.util.Formatter; import java.util.List; import java.util.StringTokenizer; @@ -72,6 +75,9 @@ public class EepGet { protected long _fetchInactivityTimeout; protected int _redirects; protected String _redirectLocation; + protected boolean _isGzippedResponse; + protected IOException _decompressException; + /** this will be replaced by the HTTP Proxy if we are using it */ protected static final String USER_AGENT = "Wget/1.11.4"; protected static final long CONNECT_TIMEOUT = 45*1000; @@ -82,30 +88,39 @@ public class EepGet { public EepGet(I2PAppContext ctx, String proxyHost, int proxyPort, int numRetries, String outputFile, String url) { this(ctx, true, proxyHost, proxyPort, numRetries, outputFile, url); } + public EepGet(I2PAppContext ctx, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching) { this(ctx, true, proxyHost, proxyPort, numRetries, outputFile, url, allowCaching, null); } + public EepGet(I2PAppContext ctx, int numRetries, String outputFile, String url) { this(ctx, false, null, -1, numRetries, outputFile, url); } + public EepGet(I2PAppContext ctx, int numRetries, String outputFile, String url, boolean allowCaching) { this(ctx, false, null, -1, numRetries, outputFile, url, allowCaching, null); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url) { this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, outputFile, url, true, null); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, String postData) { this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, true, null, postData); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching, String etag) { this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, allowCaching, etag, null); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, String outputFile, String url, boolean allowCaching, String etag, String lastModified) { this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, -1, -1, outputFile, null, url, allowCaching, etag, lastModified, null); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, long minSize, long maxSize, String outputFile, OutputStream outputStream, String url, boolean allowCaching, String etag, String postData) { this(ctx, shouldProxy, proxyHost, proxyPort, numRetries, minSize, maxSize, outputFile, outputStream, url, allowCaching, etag, null, postData); } + public EepGet(I2PAppContext ctx, boolean shouldProxy, String proxyHost, int proxyPort, int numRetries, long minSize, long maxSize, String outputFile, OutputStream outputStream, String url, boolean allowCaching, String etag, String lastModified, String postData) { @@ -149,10 +164,9 @@ public class EepGet { try { for (int i = 0; i < args.length; i++) { if (args[i].equals("-p")) { - proxyHost = args[i+1].substring(0, args[i+1].indexOf(':')); - String port = args[i+1].substring(args[i+1].indexOf(':')+1); + proxyHost = args[++i].substring(0, args[i].indexOf(':')); + String port = args[i].substring(args[i].indexOf(':')+1); proxyPort = Integer.parseInt(port); - i++; } else if (args[i].equals("-n")) { numRetries = Integer.parseInt(args[i+1]); i++; @@ -239,9 +253,10 @@ public class EepGet { } private static void usage() { - System.err.println("EepGet [-p 127.0.0.1:4444] [-n #retries] [-o outputFile] " + - "[-m markSize lineLen] [-t timeout] [-h headerKey headerValue] " + - "[-u username password] url"); + System.err.println("EepGet [-p 127.0.0.1:4444] [-n #retries] [-o outputFile]\n" + + " [-m markSize lineLen] [-t timeout] [-h headerKey headerValue]\n" + + " [-u username password] url]\n" + + " (use -p :0 for no proxy)"); } public static interface StatusListener { @@ -323,31 +338,26 @@ public class EepGet { long timeToSend = now - _lastComplete; if (timeToSend > 0) { StringBuilder buf = new StringBuilder(50); + Formatter fmt = new Formatter(buf); buf.append(" "); if ( bytesRemaining > 0 ) { - double pct = ((double)_written + _previousWritten) / + double pct = 100 * ((double)_written + _previousWritten) / ((double)alreadyTransferred + (double)currentWrite + (double)bytesRemaining); - synchronized (_pct) { - buf.append(_pct.format(pct)); - } - buf.append(": "); + fmt.format("%4.1f", Double.valueOf(pct)); + buf.append("%: "); } - buf.append(_written); + fmt.format("%8d", Long.valueOf(_written)); buf.append(" @ "); double lineKBytes = ((double)_markSize * (double)_lineSize)/1024.0d; double kbps = lineKBytes/((double)timeToSend/1000.0d); - synchronized (_kbps) { - buf.append(_kbps.format(kbps)); - } - buf.append("KBps"); + fmt.format("%7.2f", Double.valueOf(kbps)); + buf.append(" KBps"); buf.append(" / "); long lifetime = _context.clock().now() - _startedOn; double lifetimeKBps = (1000.0d*(double)(_written)/((double)lifetime*1024.0d)); - synchronized (_kbps) { - buf.append(_kbps.format(lifetimeKBps)); - } - buf.append("KBps"); + fmt.format("%7.2f", Double.valueOf(lifetimeKBps)); + buf.append(" KBps"); System.out.println(buf.toString()); } _lastComplete = now; @@ -589,8 +599,18 @@ public class EepGet { if ((_maxSize > -1) && (_bytesRemaining > _maxSize)) throw new IOException("HTTP response size " + _bytesRemaining + " violates maximum of " + _maxSize + " bytes"); + Thread pusher = null; + _decompressException = null; + if (_isGzippedResponse) { + PipedInputStream pi = BigPipedInputStream.getInstance(); + PipedOutputStream po = new PipedOutputStream(pi); + pusher = new I2PAppThread(new Gunzipper(pi, _out), "EepGet Decompressor"); + _out = po; + pusher.start(); + } + int remaining = (int)_bytesRemaining; - byte buf[] = new byte[8*1024]; + byte buf[] = new byte[16*1024]; while (_keepFetching && ( (remaining > 0) || !strictSize ) && !_aborted) { int toRead = buf.length; if (strictSize && toRead > remaining) @@ -648,6 +668,18 @@ public class EepGet { _out.close(); _out = null; + if (_isGzippedResponse) { + try { + pusher.join(); + } catch (InterruptedException ie) {} + pusher = null; + if (_decompressException != null) { + // we can't resume from here + _keepFetching = false; + throw _decompressException; + } + } + if (_aborted) throw new IOException("Timed out reading the HTTP data"); @@ -785,6 +817,7 @@ public class EepGet { _transferFailed = true; } + _isGzippedResponse = false; // clear out the arguments, as we use the same variables for return values _etag = null; _lastModified = null; @@ -898,28 +931,35 @@ public class EepGet { } private void handle(String key, String val) { + key = key.trim(); + val = val.trim(); for (int i = 0; i < _listeners.size(); i++) - _listeners.get(i).headerReceived(_url, _currentAttempt, key.trim(), val.trim()); + _listeners.get(i).headerReceived(_url, _currentAttempt, key, val); if (_log.shouldLog(Log.DEBUG)) _log.debug("Header line: [" + key + "] = [" + val + "]"); if (key.equalsIgnoreCase("Content-length")) { try { - _bytesRemaining = Long.parseLong(val.trim()); + _bytesRemaining = Long.parseLong(val); } catch (NumberFormatException nfe) { nfe.printStackTrace(); } } else if (key.equalsIgnoreCase("ETag")) { - _etag = val.trim(); + _etag = val; } else if (key.equalsIgnoreCase("Last-Modified")) { - _lastModified = val.trim(); + _lastModified = val; } else if (key.equalsIgnoreCase("Transfer-encoding")) { - if (val.indexOf("chunked") != -1) - _encodingChunked = true; + _encodingChunked = val.toLowerCase().contains("chunked"); + } else if (key.equalsIgnoreCase("Content-encoding")) { + // This is kindof a hack, but if we are downloading a gzip file + // we don't want to transparently gunzip it and save it as a .gz file. + // A query string will also mess this up + if ((!_actualURL.endsWith(".gz")) && (!_actualURL.endsWith(".tgz"))) + _isGzippedResponse = val.toLowerCase().contains("gzip"); } else if (key.equalsIgnoreCase("Content-Type")) { - _contentType=val.trim(); + _contentType=val; } else if (key.equalsIgnoreCase("Location")) { - _redirectLocation=val.trim(); + _redirectLocation=val; } else { // ignore the rest } @@ -1041,8 +1081,13 @@ public class EepGet { if (post) buf.append("Content-length: ").append(_postData.length()).append("\r\n"); // This will be replaced if we are going through I2PTunnelHTTPClient - buf.append("User-Agent: " + USER_AGENT + "\r\n" + - "Accept-Encoding: \r\n" + + buf.append("Accept-Encoding: "); + if ((!_shouldProxy) && + // This is kindof a hack, but if we are downloading a gzip file + // we don't want to transparently gunzip it and save it as a .gz file. + (!path.endsWith(".gz")) && (!path.endsWith(".tgz"))) + buf.append("gzip"); + buf.append("\r\nUser-Agent: " + USER_AGENT + "\r\n" + "Connection: close\r\n"); if (_extraHeaders != null) { for (String hdr : _extraHeaders) { @@ -1127,4 +1172,48 @@ public class EepGet { addHeader("Proxy-Authorization", "Basic " + Base64.encode((userName + ':' + password).getBytes(), true)); // true = use standard alphabet } + + /** + * Decompressor thread. + * Copied / modified from i2ptunnel HTTPResponseOutputStream (GPL) + * + * @since 0.8.10 + */ + protected class Gunzipper implements Runnable { + private final InputStream _inRaw; + private final OutputStream _out; + + public Gunzipper(InputStream in, OutputStream out) { + _inRaw = in; + _out = out; + } + + public void run() { + ReusableGZIPInputStream in = null; + long written = 0; + try { + in = ReusableGZIPInputStream.acquire(); + // blocking + in.initialize(_inRaw); + byte buf[] = new byte[8*1024]; + int read = -1; + while ( (read = in.read(buf)) != -1) { + _out.write(buf, 0, read); + } + } catch (IOException ioe) { + _decompressException = ioe; + if (_log.shouldLog(Log.WARN)) + _log.warn("Error decompressing: " + written + ", " + (in != null ? in.getTotalRead() + "/" + in.getTotalExpanded() : ""), ioe); + } catch (OutOfMemoryError oom) { + _decompressException = new IOException("OOM in HTTP Decompressor"); + _log.error("OOM in HTTP Decompressor", oom); + } finally { + if (_out != null) try { + _out.close(); + } catch (IOException ioe) {} + if (in != null) + ReusableGZIPInputStream.release(in); + } + } + } } diff --git a/core/java/src/net/i2p/util/SSLEepGet.java b/core/java/src/net/i2p/util/SSLEepGet.java index d35545ae7e54bb039ce7e855eeec01b56034bd2b..e108c7157e04e73baaa853a691d17c0cc49d7083 100644 --- a/core/java/src/net/i2p/util/SSLEepGet.java +++ b/core/java/src/net/i2p/util/SSLEepGet.java @@ -43,6 +43,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; import java.io.PrintWriter; import java.net.MalformedURLException; import java.net.URL; @@ -499,8 +501,18 @@ public class SSLEepGet extends EepGet { boolean strictSize = (_bytesRemaining >= 0); + Thread pusher = null; + _decompressException = null; + if (_isGzippedResponse) { + PipedInputStream pi = BigPipedInputStream.getInstance(); + PipedOutputStream po = new PipedOutputStream(pi); + pusher = new I2PAppThread(new Gunzipper(pi, _out), "EepGet Decompressor"); + _out = po; + pusher.start(); + } + int remaining = (int)_bytesRemaining; - byte buf[] = new byte[1024]; + byte buf[] = new byte[16*1024]; while (_keepFetching && ( (remaining > 0) || !strictSize ) && !_aborted) { int toRead = buf.length; if (strictSize && toRead > remaining) @@ -557,6 +569,18 @@ public class SSLEepGet extends EepGet { _out.close(); _out = null; + if (_isGzippedResponse) { + try { + pusher.join(); + } catch (InterruptedException ie) {} + pusher = null; + if (_decompressException != null) { + // we can't resume from here + _keepFetching = false; + throw _decompressException; + } + } + if (_aborted) throw new IOException("Timed out reading the HTTP data");