diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 90a945a7d167f6466d4646fe3ec97dcdb313649a..1a2394bdb69e035cffcf78d8de98e93eb0959fd4 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -83,6 +83,32 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{ } } + /** + * This message was received. + * SSU 2 only. + * No stats updated here, caller should handle stats. + * + * @return true if this message was a duplicate + * @since 0.9.54 + */ + + public boolean messageReceived(long messageID) { + return _recentlyCompletedMessages.add(messageID); + } + + /** + * Was this message recently received? + * SSU 2 only. + * No stats updated here, caller should handle stats. + * + * @return true if this message was recently received. + * @since 0.9.54 + */ + + public boolean wasRecentlyReceived(long messageID) { + return _recentlyCompletedMessages.isKnown(messageID); + } + /** * Pull the fragments and ACKs out of the authenticated data packet */ diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index 46a7787edf66b51213316c06c8227cb23e94c762..08093c360436ce005a314e5dc6ecce843cc44c80 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -435,7 +435,14 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback _log.debug("Got I2NP block: " + msg); // 9 byte header int size = msg.getMessageSize() - 7; - messageFullyReceived(msg.getUniqueId(), size); + long messageId = msg.getUniqueId(); + messageFullyReceived(messageId, size); + if (_transport.getInboundFragments().messageReceived(messageId)) { + _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1); + if (_log.shouldInfo()) + _log.info("Got dup msg: " + messageId + " on " + this); + return; + } // complete message, skip IMF and MessageReceiver _transport.messageReceived(msg, null, _remotePeer, 0, size); } @@ -446,13 +453,20 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback InboundMessageState state; boolean messageComplete = false; boolean messageExpired = false; - boolean messageDup = false; + boolean messageDup; synchronized (_inboundMessages) { state = _inboundMessages.get(messageId); if (state == null) { - state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast); - _inboundMessages.put(messageId, state); + // Bloom filter in router will catch it, but use IMF's Bloom filter + // to save resources here + messageDup = _transport.getInboundFragments().wasRecentlyReceived(messageId); + if (messageDup) { + state = null; + } else { + state = new InboundMessageState(_context, messageId, _remotePeer, data, off, len, frag, isLast); + _inboundMessages.put(messageId, state); + } } else { messageDup = state.hasFragment(frag); if (!messageDup) { @@ -471,22 +485,38 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback } if (messageDup) { + messagePartiallyReceived(); + // Only update stats for the first fragment, + // otherwise it wildly overstates things + if (frag == 0) + _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1); synchronized(this) { _packetsReceivedDuplicate++; } - if (_log.shouldWarn()) - _log.warn("dup fragment rcvd: " + frag + " for " + state); + if (_log.shouldInfo()) { + if (state != null) + _log.info("dup fragment rcvd: " + frag + " for " + state); + else + _log.info("dup fragment rcvd: " + messageId + ' ' + frag + " on " + this); + } return; } if (messageComplete) { messageFullyReceived(messageId, state.getCompleteSize()); + if (_transport.getInboundFragments().messageReceived(messageId)) { + _context.statManager().addRateData("udp.ignoreRecentDuplicate", 1); + if (_log.shouldInfo()) + _log.info("Got dup msg: " + messageId + " on " + this); + return; + } if (_log.shouldDebug()) _log.debug("Message received completely! " + state); _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); receiveMessage(state); } else if (messageExpired) { + messagePartiallyReceived(); if (_log.shouldWarn()) _log.warn("Message expired while only being partially read: " + state); _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 3a896654418a28f608d1d00a515ea2015eb5d13b..8ad773d4051c059f26188a92107b5f131b294393 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -3463,6 +3463,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return _testManager; } + /** + * @since 0.9.54 + */ + InboundMessageFragments getInboundFragments() { + return _inboundFragments; + } + /** * Does nothing * @deprecated as of 0.9.31