diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 620d40a39..7be986cc8 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -20,9 +20,11 @@ import java.util.concurrent.RejectedExecutionException; import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; +import net.i2p.util.BigPipedInputStream; import net.i2p.util.ByteCache; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; +import net.i2p.util.ReusableGZIPInputStream; /** * This does the transparent gzip decompression on the client side. @@ -44,7 +46,6 @@ class HTTPResponseOutputStream extends FilterOutputStream { private final byte _buf1[]; protected boolean _gzip; private long _dataWritten; - private InternalGZIPInputStream _in; private static final int CACHE_SIZE = 8*1024; private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE); // OOM DOS prevention @@ -227,7 +228,7 @@ class HTTPResponseOutputStream extends FilterOutputStream { protected void beginProcessing() throws IOException { //out.flush(); - PipedInputStream pi = new PipedInputStream(); + PipedInputStream pi = BigPipedInputStream.getInstance(); PipedOutputStream po = new PipedOutputStream(pi); // Run in the client thread pool, as there should be an unused thread // there after the accept(). @@ -242,25 +243,29 @@ class HTTPResponseOutputStream extends FilterOutputStream { } private class Pusher implements Runnable { - private InputStream _inRaw; - private OutputStream _out; + private final InputStream _inRaw; + private final OutputStream _out; + public Pusher(InputStream in, OutputStream out) { _inRaw = in; _out = out; } + public void run() { - _in = null; + ReusableGZIPInputStream _in = null; long written = 0; ByteArray ba = null; try { - _in = new InternalGZIPInputStream(_inRaw); + _in = ReusableGZIPInputStream.acquire(); + // blocking + _in.initialize(_inRaw); ba = _cache.acquire(); byte buf[] = ba.getData(); int read = -1; while ( (read = _in.read(buf)) != -1) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Read " + read + " and writing it to the browser/streams"); - _out.write(buf, 0, read); +; _out.write(buf, 0, read); _out.flush(); written += read; } @@ -286,68 +291,21 @@ class HTTPResponseOutputStream extends FilterOutputStream { } catch (IOException ioe) {} } - double compressed = (_in != null ? _in.getTotalRead() : 0); - double expanded = (_in != null ? _in.getTotalExpanded() : 0); - if (compressed > 0 && expanded > 0) { - // only update the stats if we did something - double ratio = compressed/expanded; - _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0); - _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0); - _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0); + if (_in != null) { + double compressed = _in.getTotalRead(); + double expanded = _in.getTotalExpanded(); + ReusableGZIPInputStream.release(_in); + if (compressed > 0 && expanded > 0) { + // only update the stats if we did something + double ratio = compressed/expanded; + _context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0); + _context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0); + _context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0); + } } } } - /** just a wrapper to provide stats for debugging */ - private static class InternalGZIPInputStream extends GZIPInputStream { - public InternalGZIPInputStream(InputStream in) throws IOException { - super(in); - } - public long getTotalRead() { - try { - return super.inf.getTotalIn(); - } catch (Exception e) { - return 0; - } - } - public long getTotalExpanded() { - try { - return super.inf.getTotalOut(); - } catch (Exception e) { - return 0; - } - } - - /** - * From Inflater javadoc: - * Returns the total number of bytes remaining in the input buffer. This can be used to find out - * what bytes still remain in the input buffer after decompression has finished. - */ - public long getRemaining() { - try { - return super.inf.getRemaining(); - } catch (Exception e) { - return 0; - } - } - public boolean getFinished() { - try { - return super.inf.finished(); - } catch (Exception e) { - return true; - } - } - @Override - public String toString() { - return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished(); - } - } - - @Override - public String toString() { - return super.toString() + ": " + _in; - } - /******* public static void main(String args[]) { String simple = "HTTP/1.1 200 OK\n" + diff --git a/core/java/src/net/i2p/util/BigPipedInputStream.java b/core/java/src/net/i2p/util/BigPipedInputStream.java new file mode 100644 index 000000000..56555ad88 --- /dev/null +++ b/core/java/src/net/i2p/util/BigPipedInputStream.java @@ -0,0 +1,44 @@ +package net.i2p.util; + +import java.io.PipedInputStream; + +/** + * Java 1.5 PipedInputStream buffers are only 1024 bytes; our I2CP messages are typically 1730 bytes, + * thus causing thread blockage before the whole message is transferred. + * We can specify buffer size in 1.6 but not in 1.5. + * + * Until we switch to Java 1.6 - + * http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/ + * + * Moved from InternalServerSocket. + * @since 0.8.9 + */ +public class BigPipedInputStream extends PipedInputStream { + + private static final boolean oneDotSix = + (new VersionComparator()).compare(System.getProperty("java.version"), "1.6") >= 0; + + private static final int PIPE_SIZE = 64*1024; + + private BigPipedInputStream(int size) { + super(); + buffer = new byte[size]; + } + + /** default size 64K */ + public static PipedInputStream getInstance() { + return getInstance(PIPE_SIZE); + } + + public static PipedInputStream getInstance(int size) { + if (oneDotSix) { + try { + return new PipedInputStream(size); + } catch (Throwable t) { + // NoSuchMethodException or NoSuchMethodError if we somehow got the + // version detection wrong or the JVM doesn't support it + } + } + return new BigPipedInputStream(size); + } +} diff --git a/core/java/src/net/i2p/util/InternalServerSocket.java b/core/java/src/net/i2p/util/InternalServerSocket.java index a561a9905..8e88afc5c 100644 --- a/core/java/src/net/i2p/util/InternalServerSocket.java +++ b/core/java/src/net/i2p/util/InternalServerSocket.java @@ -99,8 +99,8 @@ public class InternalServerSocket extends ServerSocket { InternalServerSocket iss = _sockets.get(Integer.valueOf(port)); if (iss == null) throw new IOException("No server for port: " + port); - PipedInputStream cis = new BigPipedInputStream(); - PipedInputStream sis = new BigPipedInputStream(); + PipedInputStream cis = BigPipedInputStream.getInstance(); + PipedInputStream sis = BigPipedInputStream.getInstance(); PipedOutputStream cos = new PipedOutputStream(sis); PipedOutputStream sos = new PipedOutputStream(cis); clientSock.setInputStream(cis); @@ -108,18 +108,6 @@ public class InternalServerSocket extends ServerSocket { iss.queueConnection(new InternalSocket(sis, sos)); } - /** - * Until we switch to Java 1.6 - * http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/ - */ - private static class BigPipedInputStream extends PipedInputStream { - protected static int PIPE_SIZE = 64*1024; - public BigPipedInputStream() { - super(); - buffer = new byte[PIPE_SIZE]; - } - } - private void queueConnection(InternalSocket sock) throws IOException { if (!_running) throw new IOException("Server closed for port: " + _port); diff --git a/core/java/src/net/i2p/util/LookaheadInputStream.java b/core/java/src/net/i2p/util/LookaheadInputStream.java index d9a1d9f93..c3b38e095 100644 --- a/core/java/src/net/i2p/util/LookaheadInputStream.java +++ b/core/java/src/net/i2p/util/LookaheadInputStream.java @@ -14,17 +14,17 @@ import java.util.Arrays; */ public class LookaheadInputStream extends FilterInputStream { private boolean _eofReached; - private byte[] _footerLookahead; + private final byte[] _footerLookahead; private static final InputStream _fakeInputStream = new ByteArrayInputStream(new byte[0]); public LookaheadInputStream(int lookaheadSize) { super(_fakeInputStream); - _eofReached = false; _footerLookahead = new byte[lookaheadSize]; } public boolean getEOFReached() { return _eofReached; } + /** blocking! */ public void initialize(InputStream src) throws IOException { in = src; _eofReached = false; @@ -35,7 +35,6 @@ public class LookaheadInputStream extends FilterInputStream { if (read == -1) throw new IOException("EOF reading the footer lookahead"); footerRead += read; } - boolean f = true; } @Override @@ -53,10 +52,12 @@ public class LookaheadInputStream extends FilterInputStream { if (rv < 0) rv += 256; return rv; } + @Override public int read(byte buf[]) throws IOException { return read(buf, 0, buf.length); } + @Override public int read(byte buf[], int off, int len) throws IOException { if (_eofReached) diff --git a/core/java/src/net/i2p/util/ResettableGZIPInputStream.java b/core/java/src/net/i2p/util/ResettableGZIPInputStream.java index 19c5a903f..2e3f8857a 100644 --- a/core/java/src/net/i2p/util/ResettableGZIPInputStream.java +++ b/core/java/src/net/i2p/util/ResettableGZIPInputStream.java @@ -20,9 +20,9 @@ public class ResettableGZIPInputStream extends InflaterInputStream { private static final int FOOTER_SIZE = 8; // CRC32 + ISIZE private static final boolean DEBUG = false; /** keep a typesafe copy of (LookaheadInputStream)in */ - private LookaheadInputStream _lookaheadStream; - private CRC32 _crc32; - private byte _buf1[] = new byte[1]; + private final LookaheadInputStream _lookaheadStream; + private final CRC32 _crc32; + private final byte _buf1[] = new byte[1]; private boolean _complete; /** @@ -34,8 +34,11 @@ public class ResettableGZIPInputStream extends InflaterInputStream { super(new LookaheadInputStream(FOOTER_SIZE), new Inflater(true)); _lookaheadStream = (LookaheadInputStream)in; _crc32 = new CRC32(); - _complete = false; } + + /** + * Warning - blocking! + */ public ResettableGZIPInputStream(InputStream compressedStream) throws IOException { this(); initialize(compressedStream); @@ -78,6 +81,7 @@ public class ResettableGZIPInputStream extends InflaterInputStream { public int read(byte buf[]) throws IOException { return read(buf, 0, buf.length); } + @Override public int read(byte buf[], int off, int len) throws IOException { if (_complete) { @@ -100,9 +104,69 @@ public class ResettableGZIPInputStream extends InflaterInputStream { } } - long getCurrentCRCVal() { return _crc32.getValue(); } + /** + * Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream + * @since 0.8.9 + */ + public long getTotalRead() { + try { + return inf.getBytesRead(); + } catch (Exception e) { + return 0; + } + } + + /** + * Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream + * @since 0.8.9 + */ + public long getTotalExpanded() { + try { + return inf.getBytesWritten(); + } catch (Exception e) { + // possible NPE in some implementations + return 0; + } + } + + /** + * Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream + * @since 0.8.9 + */ + public long getRemaining() { + try { + return inf.getRemaining(); + } catch (Exception e) { + // possible NPE in some implementations + return 0; + } + } + + /** + * Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream + * @since 0.8.9 + */ + public boolean getFinished() { + try { + return inf.finished(); + } catch (Exception e) { + // possible NPE in some implementations + return true; + } + } + + /** + * Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream + * @since 0.8.9 + */ + @Override + public String toString() { + return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished(); + } + + private long getCurrentCRCVal() { return _crc32.getValue(); } - void verifyFooter() throws IOException { + private void verifyFooter() throws IOException { byte footer[] = _lookaheadStream.getFooter(); long expectedCRCVal = _crc32.getValue(); diff --git a/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java b/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java index 06d1506d4..9895c5f0a 100644 --- a/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java +++ b/core/java/src/net/i2p/util/ResettableGZIPOutputStream.java @@ -26,14 +26,14 @@ public class ResettableGZIPOutputStream extends DeflaterOutputStream { private boolean _headerWritten; /** how much data is in the uncompressed stream? */ private long _writtenSize; - private CRC32 _crc32; + private final CRC32 _crc32; private static final boolean DEBUG = false; public ResettableGZIPOutputStream(OutputStream o) { super(o, new Deflater(9, true)); - _headerWritten = false; _crc32 = new CRC32(); } + /** * Reinitialze everything so we can write a brand new gzip output stream * again. diff --git a/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java b/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java index f7665e2fc..0f82a47de 100644 --- a/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java +++ b/core/java/src/net/i2p/util/ReusableGZIPOutputStream.java @@ -1,6 +1,6 @@ package net.i2p.util; -import java.io.ByteArrayInputStream; +//import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.util.zip.Deflater; import java.util.zip.GZIPInputStream; @@ -9,9 +9,11 @@ import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.DataHelper; /** - * Provide a cache of reusable GZIP streams, each handling up to 32KB without + * Provide a cache of reusable GZIP streams, each handling up to 40 KB output without * expansion. * + * This compresses to memory only. Retrieve the compressed data with getData(). + * There is no facility to compress to an output stream. */ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream { // Apache Harmony 5.0M13 Deflater doesn't work after reset() @@ -50,10 +52,12 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream { } private ByteArrayOutputStream _buffer = null; + private ReusableGZIPOutputStream() { super(new ByteArrayOutputStream(40*1024)); _buffer = (ByteArrayOutputStream)out; } + /** clear the data so we can start again afresh */ @Override public void reset() { @@ -61,9 +65,11 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream { _buffer.reset(); def.setLevel(Deflater.BEST_COMPRESSION); } + public void setLevel(int level) { def.setLevel(level); } + /** pull the contents of the stream written */ public byte[] getData() { return _buffer.toByteArray(); }