* i2ptunnel HTTPResponseOutputStream: Use reusable gunzipper

and a larger pipe for efficiency
This commit is contained in:
zzz
2011-09-19 14:13:24 +00:00
parent 49eeed6ac8
commit f186076fb0
7 changed files with 153 additions and 92 deletions

View File

@@ -20,9 +20,11 @@ import java.util.concurrent.RejectedExecutionException;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.util.BigPipedInputStream;
import net.i2p.util.ByteCache;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
import net.i2p.util.ReusableGZIPInputStream;
/**
* This does the transparent gzip decompression on the client side.
@@ -44,7 +46,6 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private final byte _buf1[];
protected boolean _gzip;
private long _dataWritten;
private InternalGZIPInputStream _in;
private static final int CACHE_SIZE = 8*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
// OOM DOS prevention
@@ -227,7 +228,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
protected void beginProcessing() throws IOException {
//out.flush();
PipedInputStream pi = new PipedInputStream();
PipedInputStream pi = BigPipedInputStream.getInstance();
PipedOutputStream po = new PipedOutputStream(pi);
// Run in the client thread pool, as there should be an unused thread
// there after the accept().
@@ -242,25 +243,29 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
private class Pusher implements Runnable {
private InputStream _inRaw;
private OutputStream _out;
private final InputStream _inRaw;
private final OutputStream _out;
public Pusher(InputStream in, OutputStream out) {
_inRaw = in;
_out = out;
}
public void run() {
_in = null;
ReusableGZIPInputStream _in = null;
long written = 0;
ByteArray ba = null;
try {
_in = new InternalGZIPInputStream(_inRaw);
_in = ReusableGZIPInputStream.acquire();
// blocking
_in.initialize(_inRaw);
ba = _cache.acquire();
byte buf[] = ba.getData();
int read = -1;
while ( (read = _in.read(buf)) != -1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Read " + read + " and writing it to the browser/streams");
_out.write(buf, 0, read);
; _out.write(buf, 0, read);
_out.flush();
written += read;
}
@@ -286,68 +291,21 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} catch (IOException ioe) {}
}
double compressed = (_in != null ? _in.getTotalRead() : 0);
double expanded = (_in != null ? _in.getTotalExpanded() : 0);
if (compressed > 0 && expanded > 0) {
// only update the stats if we did something
double ratio = compressed/expanded;
_context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0);
_context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0);
_context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0);
if (_in != null) {
double compressed = _in.getTotalRead();
double expanded = _in.getTotalExpanded();
ReusableGZIPInputStream.release(_in);
if (compressed > 0 && expanded > 0) {
// only update the stats if we did something
double ratio = compressed/expanded;
_context.statManager().addRateData("i2ptunnel.httpCompressionRatio", (int)(100d*ratio), 0);
_context.statManager().addRateData("i2ptunnel.httpCompressed", (long)compressed, 0);
_context.statManager().addRateData("i2ptunnel.httpExpanded", (long)expanded, 0);
}
}
}
}
/** just a wrapper to provide stats for debugging */
private static class InternalGZIPInputStream extends GZIPInputStream {
public InternalGZIPInputStream(InputStream in) throws IOException {
super(in);
}
public long getTotalRead() {
try {
return super.inf.getTotalIn();
} catch (Exception e) {
return 0;
}
}
public long getTotalExpanded() {
try {
return super.inf.getTotalOut();
} catch (Exception e) {
return 0;
}
}
/**
* From Inflater javadoc:
* Returns the total number of bytes remaining in the input buffer. This can be used to find out
* what bytes still remain in the input buffer after decompression has finished.
*/
public long getRemaining() {
try {
return super.inf.getRemaining();
} catch (Exception e) {
return 0;
}
}
public boolean getFinished() {
try {
return super.inf.finished();
} catch (Exception e) {
return true;
}
}
@Override
public String toString() {
return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished();
}
}
@Override
public String toString() {
return super.toString() + ": " + _in;
}
/*******
public static void main(String args[]) {
String simple = "HTTP/1.1 200 OK\n" +

View File

@@ -0,0 +1,44 @@
package net.i2p.util;
import java.io.PipedInputStream;
/**
* Java 1.5 PipedInputStream buffers are only 1024 bytes; our I2CP messages are typically 1730 bytes,
* thus causing thread blockage before the whole message is transferred.
* We can specify buffer size in 1.6 but not in 1.5.
*
* Until we switch to Java 1.6 -
* http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/
*
* Moved from InternalServerSocket.
* @since 0.8.9
*/
public class BigPipedInputStream extends PipedInputStream {
private static final boolean oneDotSix =
(new VersionComparator()).compare(System.getProperty("java.version"), "1.6") >= 0;
private static final int PIPE_SIZE = 64*1024;
private BigPipedInputStream(int size) {
super();
buffer = new byte[size];
}
/** default size 64K */
public static PipedInputStream getInstance() {
return getInstance(PIPE_SIZE);
}
public static PipedInputStream getInstance(int size) {
if (oneDotSix) {
try {
return new PipedInputStream(size);
} catch (Throwable t) {
// NoSuchMethodException or NoSuchMethodError if we somehow got the
// version detection wrong or the JVM doesn't support it
}
}
return new BigPipedInputStream(size);
}
}

View File

@@ -99,8 +99,8 @@ public class InternalServerSocket extends ServerSocket {
InternalServerSocket iss = _sockets.get(Integer.valueOf(port));
if (iss == null)
throw new IOException("No server for port: " + port);
PipedInputStream cis = new BigPipedInputStream();
PipedInputStream sis = new BigPipedInputStream();
PipedInputStream cis = BigPipedInputStream.getInstance();
PipedInputStream sis = BigPipedInputStream.getInstance();
PipedOutputStream cos = new PipedOutputStream(sis);
PipedOutputStream sos = new PipedOutputStream(cis);
clientSock.setInputStream(cis);
@@ -108,18 +108,6 @@ public class InternalServerSocket extends ServerSocket {
iss.queueConnection(new InternalSocket(sis, sos));
}
/**
* Until we switch to Java 1.6
* http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/
*/
private static class BigPipedInputStream extends PipedInputStream {
protected static int PIPE_SIZE = 64*1024;
public BigPipedInputStream() {
super();
buffer = new byte[PIPE_SIZE];
}
}
private void queueConnection(InternalSocket sock) throws IOException {
if (!_running)
throw new IOException("Server closed for port: " + _port);

View File

@@ -14,17 +14,17 @@ import java.util.Arrays;
*/
public class LookaheadInputStream extends FilterInputStream {
private boolean _eofReached;
private byte[] _footerLookahead;
private final byte[] _footerLookahead;
private static final InputStream _fakeInputStream = new ByteArrayInputStream(new byte[0]);
public LookaheadInputStream(int lookaheadSize) {
super(_fakeInputStream);
_eofReached = false;
_footerLookahead = new byte[lookaheadSize];
}
public boolean getEOFReached() { return _eofReached; }
/** blocking! */
public void initialize(InputStream src) throws IOException {
in = src;
_eofReached = false;
@@ -35,7 +35,6 @@ public class LookaheadInputStream extends FilterInputStream {
if (read == -1) throw new IOException("EOF reading the footer lookahead");
footerRead += read;
}
boolean f = true;
}
@Override
@@ -53,10 +52,12 @@ public class LookaheadInputStream extends FilterInputStream {
if (rv < 0) rv += 256;
return rv;
}
@Override
public int read(byte buf[]) throws IOException {
return read(buf, 0, buf.length);
}
@Override
public int read(byte buf[], int off, int len) throws IOException {
if (_eofReached)

View File

@@ -20,9 +20,9 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
private static final int FOOTER_SIZE = 8; // CRC32 + ISIZE
private static final boolean DEBUG = false;
/** keep a typesafe copy of (LookaheadInputStream)in */
private LookaheadInputStream _lookaheadStream;
private CRC32 _crc32;
private byte _buf1[] = new byte[1];
private final LookaheadInputStream _lookaheadStream;
private final CRC32 _crc32;
private final byte _buf1[] = new byte[1];
private boolean _complete;
/**
@@ -34,8 +34,11 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
super(new LookaheadInputStream(FOOTER_SIZE), new Inflater(true));
_lookaheadStream = (LookaheadInputStream)in;
_crc32 = new CRC32();
_complete = false;
}
/**
* Warning - blocking!
*/
public ResettableGZIPInputStream(InputStream compressedStream) throws IOException {
this();
initialize(compressedStream);
@@ -78,6 +81,7 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
public int read(byte buf[]) throws IOException {
return read(buf, 0, buf.length);
}
@Override
public int read(byte buf[], int off, int len) throws IOException {
if (_complete) {
@@ -100,9 +104,69 @@ public class ResettableGZIPInputStream extends InflaterInputStream {
}
}
long getCurrentCRCVal() { return _crc32.getValue(); }
/**
* Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
* @since 0.8.9
*/
public long getTotalRead() {
try {
return inf.getBytesRead();
} catch (Exception e) {
return 0;
}
}
/**
* Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
* @since 0.8.9
*/
public long getTotalExpanded() {
try {
return inf.getBytesWritten();
} catch (Exception e) {
// possible NPE in some implementations
return 0;
}
}
/**
* Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
* @since 0.8.9
*/
public long getRemaining() {
try {
return inf.getRemaining();
} catch (Exception e) {
// possible NPE in some implementations
return 0;
}
}
/**
* Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
* @since 0.8.9
*/
public boolean getFinished() {
try {
return inf.finished();
} catch (Exception e) {
// possible NPE in some implementations
return true;
}
}
/**
* Moved from i2ptunnel HTTPResponseOutputStream.InternalGZIPInputStream
* @since 0.8.9
*/
@Override
public String toString() {
return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished();
}
private long getCurrentCRCVal() { return _crc32.getValue(); }
void verifyFooter() throws IOException {
private void verifyFooter() throws IOException {
byte footer[] = _lookaheadStream.getFooter();
long expectedCRCVal = _crc32.getValue();

View File

@@ -26,14 +26,14 @@ public class ResettableGZIPOutputStream extends DeflaterOutputStream {
private boolean _headerWritten;
/** how much data is in the uncompressed stream? */
private long _writtenSize;
private CRC32 _crc32;
private final CRC32 _crc32;
private static final boolean DEBUG = false;
public ResettableGZIPOutputStream(OutputStream o) {
super(o, new Deflater(9, true));
_headerWritten = false;
_crc32 = new CRC32();
}
/**
* Reinitialze everything so we can write a brand new gzip output stream
* again.

View File

@@ -1,6 +1,6 @@
package net.i2p.util;
import java.io.ByteArrayInputStream;
//import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;
import java.util.zip.GZIPInputStream;
@@ -9,9 +9,11 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.DataHelper;
/**
* Provide a cache of reusable GZIP streams, each handling up to 32KB without
* Provide a cache of reusable GZIP streams, each handling up to 40 KB output without
* expansion.
*
* This compresses to memory only. Retrieve the compressed data with getData().
* There is no facility to compress to an output stream.
*/
public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
// Apache Harmony 5.0M13 Deflater doesn't work after reset()
@@ -50,10 +52,12 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
}
private ByteArrayOutputStream _buffer = null;
private ReusableGZIPOutputStream() {
super(new ByteArrayOutputStream(40*1024));
_buffer = (ByteArrayOutputStream)out;
}
/** clear the data so we can start again afresh */
@Override
public void reset() {
@@ -61,9 +65,11 @@ public class ReusableGZIPOutputStream extends ResettableGZIPOutputStream {
_buffer.reset();
def.setLevel(Deflater.BEST_COMPRESSION);
}
public void setLevel(int level) {
def.setLevel(level);
}
/** pull the contents of the stream written */
public byte[] getData() { return _buffer.toByteArray(); }