diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java index 72b2e0251..b8889c972 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageOutputStream.java @@ -30,7 +30,9 @@ class MessageOutputStream extends OutputStream { private final AtomicBoolean _closed = new AtomicBoolean(); private long _written; private int _writeTimeout; - private ByteCache _dataCache; + private final ByteCache _dataCache; + private final int _originalBufferSize; + private int _currentBufferSize; private final Flusher _flusher; private volatile long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ @@ -68,6 +70,8 @@ class MessageOutputStream extends OutputStream { DataReceiver receiver, int bufSize, int passiveFlushDelay) { super(); _dataCache = ByteCache.getInstance(128, bufSize); + _originalBufferSize = bufSize; + _currentBufferSize = bufSize; _context = ctx; _log = ctx.logManager().getLog(MessageOutputStream.class); _buf = _dataCache.acquire().getData(); // new byte[bufSize]; @@ -75,7 +79,7 @@ class MessageOutputStream extends OutputStream { _dataLock = new Object(); _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; - _nextBufferSize = -1; + _nextBufferSize = 0; //_sendPeriodBeginTime = ctx.clock().now(); //_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(timer); @@ -92,7 +96,16 @@ class MessageOutputStream extends OutputStream { public int getWriteTimeout() { return _writeTimeout; } - public void setBufferSize(int size) { _nextBufferSize = size; } + /** + * Caller should enforce a sane minimum. + * + * @param size must be greater than 0, and smaller than or equal to bufSize in constructor + */ + public void setBufferSize(int size) { + if (size <= 0 || size > _originalBufferSize) + return; + _nextBufferSize = size; + } @Override public void write(byte b[]) throws IOException { @@ -115,8 +128,11 @@ class MessageOutputStream extends OutputStream { // this is the only method that *adds* to the _buf, and all // code that reads from it is synchronized synchronized (_dataLock) { + // To simplify the code, and avoid losing data from shrinking the max size, + // we only update max size when current buffer is empty + final int maxBuffer = (_valid == 0) ? locked_updateBufferSize() : _currentBufferSize; if (_buf == null) throw new IOException("closed (buffer went away)"); - if (_valid + remaining < _buf.length) { + if (_valid + remaining < maxBuffer) { // simply buffer the data, no flush System.arraycopy(b, cur, _buf, _valid, remaining); _valid += remaining; @@ -131,19 +147,17 @@ class MessageOutputStream extends OutputStream { // buffer whatever we can fit then flush, // repeating until we've pushed all of the // data through - int toWrite = _buf.length - _valid; + int toWrite = maxBuffer - _valid; System.arraycopy(b, cur, _buf, _valid, toWrite); remaining -= toWrite; cur += toWrite; - _valid = _buf.length; + _valid = maxBuffer; if (_log.shouldLog(Log.INFO)) _log.info("write() direct valid = " + _valid); ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; throwAnyError(); - - locked_updateBufferSize(); } } if (ws != null) { @@ -207,17 +221,21 @@ class MessageOutputStream extends OutputStream { /** * If the other side requested we shrink our buffer, do so. * + * @return the current buffer size */ - private final void locked_updateBufferSize() { + private final int locked_updateBufferSize() { int size = _nextBufferSize; if (size > 0) { // update the buffer size to the requested amount - _dataCache.release(new ByteArray(_buf)); - _dataCache = ByteCache.getInstance(128, size); - ByteArray ba = _dataCache.acquire(); - _buf = ba.getData(); - _nextBufferSize = -1; + // No, never do this, to avoid ByteCache churn. + //_dataCache.release(new ByteArray(_buf)); + //_dataCache = ByteCache.getInstance(128, size); + //ByteArray ba = _dataCache.acquire(); + //_buf = ba.getData(); + _currentBufferSize = size; + _nextBufferSize = 0; } + return _currentBufferSize; } /** @@ -273,7 +291,6 @@ class MessageOutputStream extends OutputStream { ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; - locked_updateBufferSize(); _dataLock.notifyAll(); sent = true; } @@ -336,7 +353,6 @@ class MessageOutputStream extends OutputStream { ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; - locked_updateBufferSize(); _dataLock.notifyAll(); } } @@ -409,7 +425,6 @@ class MessageOutputStream extends OutputStream { ba = new ByteArray(_buf); _buf = null; _valid = 0; - locked_updateBufferSize(); } _dataLock.notifyAll(); } @@ -494,7 +509,6 @@ class MessageOutputStream extends OutputStream { ws = target.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; - locked_updateBufferSize(); _dataLock.notifyAll(); } long afterBuild = System.currentTimeMillis(); diff --git a/history.txt b/history.txt index de72cc384..2a136d72a 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2017-03-09 zzz + * i2psnark: Prevent RuntimeException caused by corrupt i2psnark.dht.dat file + * Router: Set default sig type to EdDSA for non-Android ARM + * Streaming: Don't change buffer size when max message size is adjusted + 2017-03-06 zzz * CPUID: - Fix saving of libjcpuid.jnifile on Macs, diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 282c18b42..0725033fa 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 2; + public final static long BUILD = 3; /** for example "-test" */ public final static String EXTRA = "";