diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index ed2b56e7af10467db26d5086c4dceafe7ce8bea0..d2eee7c89c72d0d51a0e36e1c1f3d0a0ec1ff3f0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -55,6 +55,7 @@ public class Connection { private String _connectionError; public static final long MAX_RESEND_DELAY = 60*1000; + public static final long MIN_RESEND_DELAY = 20*1000; public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); @@ -178,7 +179,7 @@ public class Connection { } packet.setFlag(Packet.FLAG_DELAY_REQUESTED); - long timeout = (_options.getRTT() < 10000 ? 10000 : _options.getRTT()); + long timeout = (_options.getRTT() < MIN_RESEND_DELAY ? MIN_RESEND_DELAY : _options.getRTT()); if (timeout > MAX_RESEND_DELAY) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) @@ -491,6 +492,8 @@ public class Connection { } else { //long timeout = _options.getResendDelay() << numSends; long timeout = _options.getRTT() << (numSends-1); + if (timeout < MIN_RESEND_DELAY) + timeout = MIN_RESEND_DELAY; if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) ) timeout = MAX_RESEND_DELAY; if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 1dc8747e0cc92d30093476ca06f41b57c445d4ce..ee014ccb182db2d075d838d8b378243cb0f46c7d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -40,8 +40,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (_connection.getUnackedPacketsReceived() > 0) doSend = true; - if (_log.shouldLog(Log.ERROR) && !doSend) - _log.error("writeData called: size="+size + " doSend=" + doSend + if (_log.shouldLog(Log.INFO) && !doSend) + _log.info("writeData called: size="+size + " doSend=" + doSend + " unackedReceived: " + _connection.getUnackedPacketsReceived() + " con: " + _connection, new Exception("write called by")); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 4331034870e379685c90ff0d3673968e97f47b27..b87c5c0d2d8353293f7be5ff2d80f7922bea7984 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -53,6 +53,8 @@ public class MessageInputStream extends InputStream { private IOException _streamError; private long _readTotal; + private byte[] _oneByte = new byte[1]; + private Object _dataLock; public MessageInputStream(I2PAppContext ctx) { @@ -205,6 +207,7 @@ public class MessageInputStream extends InputStream { if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.DEBUG)) _log.debug("ignoring dup message " + messageId); + _dataLock.notifyAll(); return false; // already received } if (messageId > _highestBlockId) @@ -238,76 +241,118 @@ public class MessageInputStream extends InputStream { _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null)); else _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload)); + _dataLock.notifyAll(); } } return true; } public int read() throws IOException { + int read = read(_oneByte, 0, 1); + if (read < 0) + return -1; + else + return _oneByte[0]; + } + + public int read(byte target[]) throws IOException { + return read(target, 0, target.length); + } + + public int read(byte target[], int offset, int length) throws IOException { if (_locallyClosed) throw new IOException("Already locally closed"); throwAnyError(); long expiration = -1; if (_readTimeout > 0) expiration = _readTimeout + System.currentTimeMillis(); synchronized (_dataLock) { - while (_readyDataBlocks.size() <= 0) { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString()); - - if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() got EOF after " + _readTotal + " " + toString()); - return -1; + for (int i = 0; i < length; i++) { + if ( (_readyDataBlocks.size() <= 0) && (i == 0) ) { + // ok, we havent found anything, so lets block until we get + // at least one byte + + while (_readyDataBlocks.size() <= 0) { + if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { + if (_log.shouldLog(Log.INFO)) + _log.info("read(...," + offset + ", " + length + ")[" + i + + "] got EOF after " + _readTotal + " " + toString()); + return -1; + } else { + if (_readTimeout < 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read(...," + offset+", " + length+ ")[" + i + + ") with no timeout: " + toString()); + try { _dataLock.wait(); } catch (InterruptedException ie) { } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read(...," + offset+", " + length+ ")[" + i + + ") with no timeout complete: " + toString()); + throwAnyError(); + } else if (_readTimeout > 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read(...," + offset+", " + length+ ")[" + i + + ") with timeout: " + _readTimeout + ": " + toString()); + try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read(...," + offset+", " + length+ ")[" + i + + ") with timeout complete: " + _readTimeout + ": " + toString()); + throwAnyError(); + } else { // readTimeout == 0 + // noop, don't block + if (_log.shouldLog(Log.INFO)) + _log.info("read(...," + offset+", " + length+ ")[" + i + + ") with nonblocking setup: " + toString()); + return i; + } + if (_readyDataBlocks.size() <= 0) { + if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) { + if (_log.shouldLog(Log.INFO)) + _log.info("read(...," + offset+", " + length+ ")[" + i + + ") expired: " + toString()); + return i; + } + } + } + } + // we looped a few times then got data, so this pass doesnt count + i--; + } else if (_readyDataBlocks.size() <= 0) { + if (_log.shouldLog(Log.INFO)) + _log.info("read(...," + offset+", " + length+ ")[" + i + + "] no more ready blocks, returning"); + return i; } else { - if (_readTimeout < 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with no timeout: " + toString()); - try { _dataLock.wait(); } catch (InterruptedException ie) { } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with no timeout complete: " + toString()); - throwAnyError(); - } else if (_readTimeout > 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with timeout: " + _readTimeout + ": " + toString()); - try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with timeout complete: " + _readTimeout + ": " + toString()); - throwAnyError(); - } else { // readTimeout == 0 - // noop, don't block - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with nonblocking setup: " + toString()); + // either was already ready, or we wait()ed and it arrived + ByteArray cur = (ByteArray)_readyDataBlocks.get(0); + byte rv = cur.getData()[_readyDataBlockIndex]; + _readyDataBlockIndex++; + if (cur.getData().length <= _readyDataBlockIndex) { + _readyDataBlockIndex = 0; + _readyDataBlocks.remove(0); } - if (_readyDataBlocks.size() <= 0) { - if ( (_readTimeout > 0) && (expiration > System.currentTimeMillis()) ) - throw new InterruptedIOException("Timeout reading (timeout=" + _readTimeout + ")"); + _readTotal++; + target[offset + i] = rv; // rv < 0 ? rv + 256 : rv + if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getData().length - 5) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read(...," + offset+", " + length+ ")[" + i + + "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex + + " readyBlocks=" + _readyDataBlocks.size() + + " readTotal=" + _readTotal); } } - } - - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString()); - - // either was already ready, or we wait()ed and it arrived - ByteArray cur = (ByteArray)_readyDataBlocks.get(0); - byte rv = cur.getData()[_readyDataBlockIndex]; - _readyDataBlockIndex++; - if (cur.getData().length <= _readyDataBlockIndex) { - _readyDataBlockIndex = 0; - _readyDataBlocks.remove(0); - } - _readTotal++; - return (rv < 0 ? rv + 256 : rv); - } + } // for (int i = 0; i < length; i++) { + } // synchronized (_dataLock) + + if (_log.shouldLog(Log.DEBUG)) + _log.info("read(...," + offset+", " + length+ ") read fully total read: " +_readTotal); + + return length; } public int available() throws IOException { if (_locallyClosed) throw new IOException("Already closed, you wanker"); throwAnyError(); + int numBytes = 0; synchronized (_dataLock) { - if (_readyDataBlocks.size() <= 0) - return 0; - int numBytes = 0; for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = (ByteArray)_readyDataBlocks.get(i); if (i == 0) @@ -315,8 +360,11 @@ public class MessageInputStream extends InputStream { else numBytes += cur.getData().length; } - return numBytes; } + if (_log.shouldLog(Log.DEBUG)) + _log.info("available(): " + numBytes + " " + toString()); + + return numBytes; } /**