diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 5e6bcaf47f98231ea939904698a3b24f22dbdd78..e883a3b6f9ae1201c292cd7b9020b179a46f8521 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -9,6 +9,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; +import net.i2p.util.Clock; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -414,20 +415,27 @@ class I2PSocketImpl implements I2PSocket { public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); + Clock clock = I2PAppContext.getGlobalContext().clock(); + long endAfter = clock.now() + _options.getWriteTimeout(); synchronized (bc) { if (_options.getMaxBufferSize() > 0) { - int waited = 0; - while (bc.getCurrentSize() + len > _options.getMaxBufferSize()) { + while (bc.getCurrentSize() > _options.getMaxBufferSize()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize()); - if ( (_options.getWriteTimeout() > 0) && (waited > _options.getWriteTimeout()) ) { - throw new InterruptedIOException("Waited " + waited + "ms to write " + len + " with a buffer at " + bc.getCurrentSize()); + if (_options.getWriteTimeout() > 0) { + long timeLeft = endAfter - clock.now(); + if (timeLeft <= 0) { + long waited = _options.getWriteTimeout() - timeLeft; + throw new InterruptedIOException("Waited too long (" + waited + "ms) to write " + + len + " with a buffer at " + bc.getCurrentSize()); + } } if (inStreamClosed) throw new IOException("Stream closed while writing"); + if (_closedOn > 0) + throw new IOException("I2PSocket closed while writing"); try { bc.wait(1000); - waited += 1000; } catch (InterruptedException ie) {} } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 91e71003fec8b97819e2c1efd9f0097e17f2276b..f58c9f687758a69aa1a80af16b77e37dd367b36f 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -10,12 +10,13 @@ public class I2PSocketOptions { private long _writeTimeout; private int _maxBufferSize; - public static final int DEFAULT_BUFFER_SIZE = 1024*128; + public static final int DEFAULT_BUFFER_SIZE = 1024*64; + public static final int DEFAULT_WRITE_TIMEOUT = 60*1000; public I2PSocketOptions() { _connectTimeout = -1; _readTimeout = -1; - _writeTimeout = -1; + _writeTimeout = DEFAULT_WRITE_TIMEOUT; _maxBufferSize = DEFAULT_BUFFER_SIZE; }