forked from I2P_Developers/i2p.i2p
SSU2: Hook in IMF Bloom filter to detect dups
Call messagePartiallyReceived() even for dups or expired messages so an ack is generated.
This commit is contained in:
@@ -83,6 +83,32 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This message was received.
|
||||
* SSU 2 only.
|
||||
* No stats updated here, caller should handle stats.
|
||||
*
|
||||
* @return true if this message was a duplicate
|
||||
* @since 0.9.54
|
||||
*/
|
||||
|
||||
public boolean messageReceived(long messageID) {
|
||||
return _recentlyCompletedMessages.add(messageID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Was this message recently received?
|
||||
* SSU 2 only.
|
||||
* No stats updated here, caller should handle stats.
|
||||
*
|
||||
* @return true if this message was recently received.
|
||||
* @since 0.9.54
|
||||
*/
|
||||
|
||||
public boolean wasRecentlyReceived(long messageID) {
|
||||
return _recentlyCompletedMessages.isKnown(messageID);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pull the fragments and ACKs out of the authenticated data packet
|
||||
*/
|
||||
|
||||
@@ -435,7 +435,14 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
_log.debug("Got I2NP block: " + msg);
|
||||
// 9 byte header
|
||||
int size = msg.getMessageSize() - 7;
|
||||
messageFullyReceived(msg.getUniqueId(), size);
|
||||
long messageId = msg.getUniqueId();
|
||||
messageFullyReceived(messageId, size);
|
||||
if (_transport.getInboundFragments().messageReceived(messageId)) {
|
||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got dup msg: " + messageId + " on " + this);
|
||||
return;
|
||||
}
|
||||
// complete message, skip IMF and MessageReceiver
|
||||
_transport.messageReceived(msg, null, _remotePeer, 0, size);
|
||||
}
|
||||
@@ -446,13 +453,20 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
InboundMessageState state;
|
||||
boolean messageComplete = false;
|
||||
boolean messageExpired = false;
|
||||
boolean messageDup = false;
|
||||
boolean messageDup;
|
||||
|
||||
synchronized (_inboundMessages) {
|
||||
state = _inboundMessages.get(messageId);
|
||||
if (state == null) {
|
||||
state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast);
|
||||
_inboundMessages.put(messageId, state);
|
||||
// Bloom filter in router will catch it, but use IMF's Bloom filter
|
||||
// to save resources here
|
||||
messageDup = _transport.getInboundFragments().wasRecentlyReceived(messageId);
|
||||
if (messageDup) {
|
||||
state = null;
|
||||
} else {
|
||||
state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast);
|
||||
_inboundMessages.put(messageId, state);
|
||||
}
|
||||
} else {
|
||||
messageDup = state.hasFragment(frag);
|
||||
if (!messageDup) {
|
||||
@@ -471,22 +485,38 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
}
|
||||
|
||||
if (messageDup) {
|
||||
messagePartiallyReceived();
|
||||
// Only update stats for the first fragment,
|
||||
// otherwise it wildly overstates things
|
||||
if (frag == 0)
|
||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||
synchronized(this) {
|
||||
_packetsReceivedDuplicate++;
|
||||
}
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("dup fragment rcvd: " + frag + " for " + state);
|
||||
if (_log.shouldInfo()) {
|
||||
if (state != null)
|
||||
_log.info("dup fragment rcvd: " + frag + " for " + state);
|
||||
else
|
||||
_log.info("dup fragment rcvd: " + messageId + ' ' + frag + " on " + this);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (messageComplete) {
|
||||
messageFullyReceived(messageId, state.getCompleteSize());
|
||||
if (_transport.getInboundFragments().messageReceived(messageId)) {
|
||||
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got dup msg: " + messageId + " on " + this);
|
||||
return;
|
||||
}
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Message received completely! " + state);
|
||||
_context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime());
|
||||
_context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime());
|
||||
receiveMessage(state);
|
||||
} else if (messageExpired) {
|
||||
messagePartiallyReceived();
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Message expired while only being partially read: " + state);
|
||||
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
|
||||
|
||||
@@ -3463,6 +3463,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
return _testManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* @since 0.9.54
|
||||
*/
|
||||
InboundMessageFragments getInboundFragments() {
|
||||
return _inboundFragments;
|
||||
}
|
||||
|
||||
/**
|
||||
* Does nothing
|
||||
* @deprecated as of 0.9.31
|
||||
|
||||
Reference in New Issue
Block a user