I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit e9de0a14 authored by zzz's avatar zzz
Browse files

Streaming: Send reset when receiving more data after locally closed,

rather than acking (ticket #2071)
parent 5a3f0163
No related branches found
No related tags found
No related merge requests found
...@@ -75,6 +75,7 @@ class ConnectionPacketHandler { ...@@ -75,6 +75,7 @@ class ConnectionPacketHandler {
+ packet + " on " + con + ""); + packet + " on " + con + "");
// this is fine, half-close // this is fine, half-close
// Major bug before 0.9.9, packets were dropped here and a reset sent // Major bug before 0.9.9, packets were dropped here and a reset sent
// If we are fully closed, will handle that in the canAccept test below
} }
if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) { if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) {
...@@ -108,12 +109,20 @@ class ConnectionPacketHandler { ...@@ -108,12 +109,20 @@ class ConnectionPacketHandler {
} }
if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) { if (!con.getInputStream().canAccept(seqNum, packet.getPayloadSize())) {
if (_log.shouldWarn()) if (con.getInputStream().isLocallyClosed()) {
_log.warn("Inbound buffer exceeded on connection " + con + if (_log.shouldWarn())
", choking and dropping " + packet); _log.warn("More data received after local close on connection " + con +
// this will call ackImmediately() ", sending reset and dropping " + packet);
con.setChoking(true); // the following will send a RESET
// TODO we could still process the acks for this packet before discarding con.disconnect(false);
} else {
if (_log.shouldWarn())
_log.warn("Inbound buffer exceeded on connection " + con +
", choking and dropping " + packet);
// this will call ackImmediately()
con.setChoking(true);
// TODO we could still process the acks for this packet before discarding
}
packet.releasePayload(); packet.releasePayload();
return; return;
} // else we will call setChoking(false) below } // else we will call setChoking(false) below
......
...@@ -100,9 +100,22 @@ class MessageInputStream extends InputStream { ...@@ -100,9 +100,22 @@ class MessageInputStream extends InputStream {
} }
} }
/**
* @return true if this has been closed on the read side with close()
*/
public boolean isLocallyClosed() {
synchronized (_dataLock) {
return _locallyClosed;
}
}
/** /**
* Determine if this packet will fit in our buffering limits. * Determine if this packet will fit in our buffering limits.
* Always returns true for zero payloadSize. *
* Always returns true for zero payloadSize and dups, even if locally closed.
* Returns false if there is no room, OR it's not a dup and the stream has been closed on
* the read side with close().
* If this returns false, you probably want to call isLocallyClosed() to find out why.
* *
* @return true if we have room. If false, do not call messageReceived() * @return true if we have room. If false, do not call messageReceived()
* @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock, * @since 0.9.20 moved from ConnectionPacketHandler.receivePacket() so it can all be under one lock,
...@@ -114,13 +127,15 @@ class MessageInputStream extends InputStream { ...@@ -114,13 +127,15 @@ class MessageInputStream extends InputStream {
if (messageId < MIN_READY_BUFFERS) if (messageId < MIN_READY_BUFFERS)
return true; return true;
synchronized (_dataLock) { synchronized (_dataLock) {
// always accept if closed, will be processed elsewhere
if (_locallyClosed)
return true;
// ready dup check // ready dup check
// we always allow sequence numbers less than or equal to highest received // we always allow sequence numbers less than or equal to highest received
if (messageId <= _highestReadyBlockId) if (messageId <= _highestReadyBlockId)
return true; return true;
// We do this after the above dup check.
if (_locallyClosed) {
// return true if a not-ready dup, false if not
return _notYetReadyBlocks.containsKey(Long.valueOf(messageId));
}
// shortcut test, assuming all ready and not ready blocks are max size, // shortcut test, assuming all ready and not ready blocks are max size,
// to avoid iterating through all the ready blocks in getTotalReadySize() // to avoid iterating through all the ready blocks in getTotalReadySize()
if ((_readyDataBlocks.size() + _notYetReadyBlocks.size()) * _maxMessageSize < _maxBufferSize) if ((_readyDataBlocks.size() + _notYetReadyBlocks.size()) * _maxMessageSize < _maxBufferSize)
...@@ -297,6 +312,8 @@ class MessageInputStream extends InputStream { ...@@ -297,6 +312,8 @@ class MessageInputStream extends InputStream {
* previously pending messages to the ready queue if it fills the gap, etc). * previously pending messages to the ready queue if it fills the gap, etc).
* This does no limiting of pending data - see canAccept() for limiting. * This does no limiting of pending data - see canAccept() for limiting.
* *
* Warning - returns true if locally closed.
*
* @param messageId ID of the message * @param messageId ID of the message
* @param payload message payload, may be null or have null or zero-length data * @param payload message payload, may be null or have null or zero-length data
* @return true if this is a new packet, false if it is a dup * @return true if this is a new packet, false if it is a dup
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment