From e9de0a14b98b67bf3df01afc7e3646eca0acc112 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 2 Dec 2017 22:32:03 +0000 Subject: [PATCH] Streaming: Send reset when receiving more data after locally closed, rather than acking (ticket #2071) --- .../impl/ConnectionPacketHandler.java | 21 +++++++++++----- .../streaming/impl/MessageInputStream.java | 25 ++++++++++++++++--- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java index 1c82188828..36f15899e5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/ConnectionPacketHandler.java @@ -75,6 +75,7 @@ class ConnectionPacketHandler { + packet + " on " + con + ""); // this is fine, half-close // Major bug before 0.9.9, packets were dropped here and a reset sent + // If we are fully closed, will handle that in the canAccept test below } if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) { @@ -108,12 +109,20 @@ class ConnectionPacketHandler { } if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) { - if (_log.shouldWarn()) - _log.warn("Inbound buffer exceeded on connection " + con + - ", choking and dropping " + packet); - // this will call ackImmediately() - con.setChoking(true); - // TODO we could still process the acks for this packet before discarding + if (con.getInputStream().isLocallyClosed()) { + if (_log.shouldWarn()) + _log.warn("More data received after local close on connection " + con + + ", sending reset and dropping " + packet); + // the following will send a RESET + con.disconnect(false); + } else { + if (_log.shouldWarn()) + _log.warn("Inbound buffer exceeded on connection " + con + + ", choking and dropping " + packet); + // this will call ackImmediately() + con.setChoking(true); + // TODO we could still process the acks for this packet before discarding + } packet.releasePayload(); return; } // else we will call setChoking(false) below diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index 858f59157e..31875a4f3e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -100,9 +100,22 @@ class MessageInputStream extends InputStream { } } + /** + * @return true if this has been closed on the read side with close() + */ + public boolean isLocallyClosed() { + synchronized (_dataLock) { + return _locallyClosed; + } + } + /** * Determine if this packet will fit in our buffering limits. - * Always returns true for zero payloadSize. + * + * Always returns true for zero payloadSize and dups, even if locally closed. + * Returns false if there is no room, OR it's not a dup and the stream has been closed on + * the read side with close(). + * If this returns false, you probably want to call isLocallyClosed() to find out why. * * @return true if we have room. If false, do not call messageReceived() * @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock, @@ -114,13 +127,15 @@ class MessageInputStream extends InputStream { if (messageId < MIN_READY_BUFFERS) return true; synchronized (_dataLock) { - // always accept if closed, will be processed elsewhere - if (_locallyClosed) - return true; // ready dup check // we always allow sequence numbers less than or equal to highest received if (messageId <= _highestReadyBlockId) return true; + // We do this after the above dup check. + if (_locallyClosed) { + // return true if a not-ready dup, false if not + return _notYetReadyBlocks.containsKey(Long.valueOf(messageId)); + } // shortcut test, assuming all ready and not ready blocks are max size, // to avoid iterating through all the ready blocks in getTotalReadySize() if ((_readyDataBlocks.size() + _notYetReadyBlocks.size()) * _maxMessageSize < _maxBufferSize) @@ -297,6 +312,8 @@ class MessageInputStream extends InputStream { * previously pending messages to the ready queue if it fills the gap, etc). * This does no limiting of pending data - see canAccept() for limiting. * + * Warning - returns true if locally closed. + * * @param messageId ID of the message * @param payload message payload, may be null or have null or zero-length data * @return true if this is a new packet, false if it is a dup -- GitLab