diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 275da2725..2c22b6371 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -67,10 +67,14 @@ class ConnectionHandler { */ public void receiveNewSyn(Packet packet) { if (!_active) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropping new SYN request, as we're not listening"); - if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping new SYN request, as we're not listening"); sendReset(packet); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping non-SYN packet - not listening"); + } return; } if (_log.shouldLog(Log.INFO)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 5767ec034..27a90bab2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -72,13 +72,16 @@ class MessageInputStream extends InputStream { } /** What is the highest block ID we've completely received through? - * @return highest data block ID completely received + * @return highest data block ID completely received or -1 for none */ public long getHighestReadyBockId() { // not synchronized as it doesnt hurt to read a too-low value return _highestReadyBlockId; } + /** + * @return highest data block ID received or -1 for none + */ public long getHighestBlockId() { // not synchronized as it doesnt hurt to read a too-low value return _highestBlockId; @@ -168,12 +171,18 @@ class MessageInputStream extends InputStream { * @return how long read calls should block, 0 or less indefinitely block */ public int getReadTimeout() { return _readTimeout; } + public void setReadTimeout(int timeout) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); _readTimeout = timeout; } + /** + * There is no more data coming from the I2P side. + * Does NOT clear pending data. + * messageReceived() MUST have been called previously with the messageId of the CLOSE packet. + */ public void closeReceived() { synchronized (_dataLock) { if (_log.shouldLog(Log.DEBUG)) { @@ -211,9 +220,10 @@ class MessageInputStream extends InputStream { /** * A new message has arrived - toss it on the appropriate queue (moving * previously pending messages to the ready queue if it fills the gap, etc). + * This does no limiting of pending data - it must be limited in ConnectionPacketHandler. * * @param messageId ID of the message - * @param payload message payload + * @param payload message payload, may be null or have null or zero-length data * @return true if this is a new packet, false if it is a dup */ public boolean messageReceived(long messageId, ByteArray payload) { @@ -221,8 +231,8 @@ class MessageInputStream extends InputStream { _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); synchronized (_dataLock) { if (messageId <= _highestReadyBlockId) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("ignoring dup message " + messageId); + if (_log.shouldLog(Log.INFO)) + _log.info("ignoring dup message " + messageId); _dataLock.notifyAll(); return false; // already received } @@ -250,8 +260,9 @@ class MessageInputStream extends InputStream { _highestReadyBlockId++; } } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("message is out of order: " + messageId); + if (_log.shouldLog(Log.INFO)) + _log.info("Message is out of order: " + messageId); + // _notYetReadyBlocks size is limited in ConnectionPacketHandler. if (_locallyClosed) // dont need the payload, just the msgId in order _notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null)); else @@ -269,12 +280,12 @@ class MessageInputStream extends InputStream { return _oneByte[0] & 0xff; } - @Override + @Override public int read(byte target[]) throws IOException { return read(target, 0, target.length); } - @Override + @Override public int read(byte target[], int offset, int length) throws IOException { if (_locallyClosed) throw new IOException("Already locally closed"); throwAnyError(); @@ -369,7 +380,7 @@ class MessageInputStream extends InputStream { return length; } - @Override + @Override public int available() throws IOException { if (_locallyClosed) throw new IOException("Already closed"); throwAnyError(); @@ -433,7 +444,7 @@ class MessageInputStream extends InputStream { } } - @Override + @Override public void close() { synchronized (_dataLock) { //while (_readyDataBlocks.size() > 0) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 0a641526b..862d65b3b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -688,7 +688,10 @@ class Packet { StringBuilder buf = new StringBuilder(64); buf.append(toId(_sendStreamId)); //buf.append("<-->"); - buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); + buf.append(toId(_receiveStreamId)).append(':'); + if (_sequenceNum != 0 || isFlagSet(FLAG_SYNCHRONIZE)) + buf.append(" #").append(_sequenceNum); + // else an ack-only packet //if (_sequenceNum < 10) // buf.append(" \t"); // so the tab lines up right //else