From 03f509ca5448a537cdf720dd5304eeff1af2eb55 Mon Sep 17 00:00:00 2001 From: jrandom Date: Wed, 22 Feb 2006 14:54:22 +0000 Subject: [PATCH] 2006-02-22 jrandom * Handle a rare race under high bandwidth situations in the SSU transport * Minor refactoring so we don't confuse sun's 1.6.0-b2 validator --- history.txt | 6 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../kademlia/RepublishLeaseSetJob.java | 41 +-- .../router/networkdb/kademlia/SearchJob.java | 268 +++++++++--------- .../peermanager/PersistProfilesJob.java | 54 ++-- .../router/transport/udp/PacketBuilder.java | 7 + .../router/transport/udp/PacketHandler.java | 27 +- .../router/transport/udp/PacketPusher.java | 14 +- .../i2p/router/transport/udp/PeerState.java | 7 +- .../i2p/router/transport/udp/UDPPacket.java | 18 ++ 10 files changed, 253 insertions(+), 193 deletions(-) diff --git a/history.txt b/history.txt index 4a7f965ff..7c8a0ef6b 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ -$Id: history.txt,v 1.414 2006/02/21 10:20:21 jrandom Exp $ +$Id: history.txt,v 1.415 2006/02/22 01:19:19 complication Exp $ + +2006-02-22 jrandom + * Handle a rare race under high bandwidth situations in the SSU transport + * Minor refactoring so we don't confuse sun's 1.6.0-b2 validator 2006-02-21 Complication * Reactivate TCP tranport by default, in addition to re-allowing diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index db7dc0b01..993ecc278 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.355 $ $Date: 2006/02/21 08:31:24 $"; + public final static String ID = "$Revision: 1.356 $ $Date: 2006/02/21 10:20:20 $"; public final static String VERSION = "0.6.1.11"; - public final static long BUILD = 0; + public final static long BUILD = 1; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java index 6e5b1b717..a6ca3c0bb 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/RepublishLeaseSetJob.java @@ -51,7 +51,7 @@ public class RepublishLeaseSetJob extends JobImpl { _log.warn("Not publishing a LOCAL lease that isn't current - " + _dest, new Exception("Publish expired LOCAL lease?")); } else { getContext().statManager().addRateData("netDb.republishLeaseSetCount", 1, 0); - _facade.sendStore(_dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT, null); + _facade.sendStore(_dest, ls, new OnRepublishSuccess(getContext()), new OnRepublishFailure(getContext(), this), REPUBLISH_LEASESET_TIMEOUT, null); //getContext().jobQueue().addJob(new StoreJob(getContext(), _facade, _dest, ls, new OnSuccess(getContext()), new OnFailure(getContext()), REPUBLISH_LEASESET_TIMEOUT)); } } else { @@ -76,21 +76,28 @@ public class RepublishLeaseSetJob extends JobImpl { } } - private class OnSuccess extends JobImpl { - public OnSuccess(RouterContext ctx) { super(ctx); } - public String getName() { return "Publish leaseSet successful"; } - public void runJob() { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("successful publishing of the leaseSet for " + _dest.toBase64()); - } - } - private class OnFailure extends JobImpl { - public OnFailure(RouterContext ctx) { super(ctx); } - public String getName() { return "Publish leaseSet failed"; } - public void runJob() { - if (_log.shouldLog(Log.WARN)) - _log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64()); - RepublishLeaseSetJob.this.requeue(getContext().random().nextInt(60*1000)); - } + void requeueRepublish() { + if (_log.shouldLog(Log.WARN)) + _log.warn("FAILED publishing of the leaseSet for " + _dest.toBase64()); + requeue(getContext().random().nextInt(60*1000)); } } + +class OnRepublishSuccess extends JobImpl { + public OnRepublishSuccess(RouterContext ctx) { super(ctx); } + public String getName() { return "Publish leaseSet successful"; } + public void runJob() { + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("successful publishing of the leaseSet for " + _dest.toBase64()); + } +} + +class OnRepublishFailure extends JobImpl { + private RepublishLeaseSetJob _job; + public OnRepublishFailure(RouterContext ctx, RepublishLeaseSetJob job) { + super(ctx); + _job = job; + } + public String getName() { return "Publish leaseSet failed"; } + public void runJob() { _job.requeueRepublish(); } +} \ No newline at end of file diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index de665484d..695289db5 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -456,7 +456,7 @@ class SearchJob extends JobImpl { void replyFound(DatabaseSearchReplyMessage message, Hash peer) { long duration = _state.replyFound(peer); // this processing can take a while, so split 'er up - getContext().jobQueue().addJob(new SearchReplyJob(getContext(), (DatabaseSearchReplyMessage)message, peer, duration)); + getContext().jobQueue().addJob(new SearchReplyJob(getContext(), this, (DatabaseSearchReplyMessage)message, peer, duration)); } /** @@ -468,132 +468,6 @@ class SearchJob extends JobImpl { // noop } - private final class SearchReplyJob extends JobImpl { - private DatabaseSearchReplyMessage _msg; - /** - * Peer who we think sent us the reply. Note: could be spoofed! If the - * attacker knew we were searching for a particular key from a - * particular peer, they could send us some searchReply messages with - * shitty values, trying to get us to consider that peer unreliable. - * Potential fixes include either authenticated 'from' address or use a - * nonce in the search + searchReply (and check for it in the selector). - * - */ - private Hash _peer; - private int _curIndex; - private int _invalidPeers; - private int _seenPeers; - private int _newPeers; - private int _duplicatePeers; - private int _repliesPendingVerification; - private long _duration; - public SearchReplyJob(RouterContext enclosingContext, DatabaseSearchReplyMessage message, Hash peer, long duration) { - super(enclosingContext); - _msg = message; - _peer = peer; - _curIndex = 0; - _invalidPeers = 0; - _seenPeers = 0; - _newPeers = 0; - _duplicatePeers = 0; - _repliesPendingVerification = 0; - } - public String getName() { return "Process Reply for Kademlia Search"; } - public void runJob() { - if (_curIndex >= _msg.getNumReplies()) { - if (_repliesPendingVerification > 0) { - // we received new references from the peer, but still - // haven't verified all of them, so lets give it more time - SearchReplyJob.this.requeue(_timeoutMs); - } else { - // either they didn't tell us anything new or we have verified - // (or failed to verify) all of them. we're done - getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers, - _invalidPeers, _duplicatePeers, _duration); - if (_newPeers > 0) - newPeersFound(_newPeers); - } - } else { - Hash peer = _msg.getReply(_curIndex); - - boolean shouldAdd = false; - - RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer); - if (info == null) { - // if the peer is giving us lots of bad peer references, - // dont try to fetch them. - - boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer); - if (!sendsBadInfo) { - // we don't need to search for everthing we're given here - only ones that - // are next in our search path... - if (getContext().shitlist().isShitlisted(peer)) { - if (_log.shouldLog(Log.INFO)) - _log.info("Not looking for a shitlisted peer..."); - getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); - } else { - //getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); - //_repliesPendingVerification++; - shouldAdd = true; - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64()); - getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); - } - } - - if (_state.wasAttempted(peer)) { - _duplicatePeers++; - } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": dbSearchReply received on search referencing router " - + peer); - if (shouldAdd) { - if (_facade.getKBuckets().add(peer)) - _newPeers++; - else - _seenPeers++; - } - - _curIndex++; - requeue(0); - } - } - - /** the peer gave us a reference to a new router, and we were able to fetch it */ - private final class ReplyVerifiedJob extends JobImpl { - private Hash _key; - public ReplyVerifiedJob(RouterContext enclosingContext, Hash key) { - super(enclosingContext); - _key = key; - } - public String getName() { return "Search reply value verified"; } - public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info("Peer reply from " + _peer.toBase64() + " verified: " + _key.toBase64()); - _repliesPendingVerification--; - getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0); - } - } - /** the peer gave us a reference to a new router, and we were NOT able to fetch it */ - private final class ReplyNotVerifiedJob extends JobImpl { - private Hash _key; - public ReplyNotVerifiedJob(RouterContext enclosingContext, Hash key) { - super(enclosingContext); - _key = key; - } - public String getName() { return "Search reply value NOT verified"; } - public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info("Peer reply from " + _peer.toBase64() + " failed verification: " + _key.toBase64()); - _repliesPendingVerification--; - _invalidPeers++; - getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0); - } - } - } - /** * Called when a particular peer failed to respond before the timeout was * reached, or if the peer could not be contacted at all. @@ -833,4 +707,144 @@ class SearchJob extends JobImpl { return super.toString() + " started " + DataHelper.formatDuration((getContext().clock().now() - _startedOn)) + " ago"; } + + boolean wasAttempted(Hash peer) { return _state.wasAttempted(peer); } + long timeoutMs() { return _timeoutMs; } + boolean add(Hash peer) { return _facade.getKBuckets().add(peer); } +} + +class SearchReplyJob extends JobImpl { + private DatabaseSearchReplyMessage _msg; + private Log _log; + /** + * Peer who we think sent us the reply. Note: could be spoofed! If the + * attacker knew we were searching for a particular key from a + * particular peer, they could send us some searchReply messages with + * shitty values, trying to get us to consider that peer unreliable. + * Potential fixes include either authenticated 'from' address or use a + * nonce in the search + searchReply (and check for it in the selector). + * + */ + private Hash _peer; + private int _curIndex; + private int _invalidPeers; + private int _seenPeers; + private int _newPeers; + private int _duplicatePeers; + private int _repliesPendingVerification; + private long _duration; + private SearchJob _searchJob; + public SearchReplyJob(RouterContext enclosingContext, SearchJob job, DatabaseSearchReplyMessage message, Hash peer, long duration) { + super(enclosingContext); + _log = enclosingContext.logManager().getLog(getClass()); + _searchJob = job; + _msg = message; + _peer = peer; + _curIndex = 0; + _invalidPeers = 0; + _seenPeers = 0; + _newPeers = 0; + _duplicatePeers = 0; + _repliesPendingVerification = 0; + } + public String getName() { return "Process Reply for Kademlia Search"; } + public void runJob() { + if (_curIndex >= _msg.getNumReplies()) { + if (_repliesPendingVerification > 0) { + // we received new references from the peer, but still + // haven't verified all of them, so lets give it more time + requeue(_searchJob.timeoutMs()); + } else { + // either they didn't tell us anything new or we have verified + // (or failed to verify) all of them. we're done + getContext().profileManager().dbLookupReply(_peer, _newPeers, _seenPeers, + _invalidPeers, _duplicatePeers, _duration); + if (_newPeers > 0) + _searchJob.newPeersFound(_newPeers); + } + } else { + Hash peer = _msg.getReply(_curIndex); + + boolean shouldAdd = false; + + RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer); + if (info == null) { + // if the peer is giving us lots of bad peer references, + // dont try to fetch them. + + boolean sendsBadInfo = getContext().profileOrganizer().peerSendsBadReplies(_peer); + if (!sendsBadInfo) { + // we don't need to search for everthing we're given here - only ones that + // are next in our search path... + if (getContext().shitlist().isShitlisted(peer)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Not looking for a shitlisted peer..."); + getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); + } else { + //getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); + //_repliesPendingVerification++; + shouldAdd = true; + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64()); + getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); + } + } + + if (_searchJob.wasAttempted(peer)) { + _duplicatePeers++; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer); + if (shouldAdd) { + if (_searchJob.add(peer)) + _newPeers++; + else + _seenPeers++; + } + + _curIndex++; + requeue(0); + } + } + void replyVerified() { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer reply from " + _peer.toBase64()); + _repliesPendingVerification--; + getContext().statManager().addRateData("netDb.searchReplyValidated", 1, 0); + } + void replyNotVerified() { + if (_log.shouldLog(Log.INFO)) + _log.info("Peer reply from " + _peer.toBase64()); + _repliesPendingVerification--; + _invalidPeers++; + getContext().statManager().addRateData("netDb.searchReplyNotValidated", 1, 0); + } +} + +/** the peer gave us a reference to a new router, and we were able to fetch it */ +class ReplyVerifiedJob extends JobImpl { + private Hash _key; + private SearchReplyJob _replyJob; + public ReplyVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) { + super(enclosingContext); + _replyJob = srj; + _key = key; + } + public String getName() { return "Search reply value verified"; } + public void runJob() { _replyJob.replyVerified(); } +} + +/** the peer gave us a reference to a new router, and we were NOT able to fetch it */ +class ReplyNotVerifiedJob extends JobImpl { + private Hash _key; + private SearchReplyJob _replyJob; + public ReplyNotVerifiedJob(RouterContext enclosingContext, SearchReplyJob srj, Hash key) { + super(enclosingContext); + _key = key; + _replyJob = srj; + } + public String getName() { return "Search reply value NOT verified"; } + public void runJob() { _replyJob.replyNotVerified(); } } diff --git a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java index e900d612c..6d43923e3 100644 --- a/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java +++ b/router/java/src/net/i2p/router/peermanager/PersistProfilesJob.java @@ -20,35 +20,29 @@ class PersistProfilesJob extends JobImpl { public String getName() { return "Persist profiles"; } public void runJob() { Set peers = _mgr.selectPeers(); - Hash hashes[] = new Hash[peers.size()]; - int i = 0; - for (Iterator iter = peers.iterator(); iter.hasNext(); ) - hashes[i] = (Hash)iter.next(); - getContext().jobQueue().addJob(new PersistProfileJob(getContext(), hashes)); - } - - private class PersistProfileJob extends JobImpl { - private Hash _peers[]; - private int _cur; - public PersistProfileJob(RouterContext enclosingContext, Hash peers[]) { - super(enclosingContext); - _peers = peers; - _cur = 0; - } - public void runJob() { - if (_cur < _peers.length) { - _mgr.storeProfile(_peers[_cur]); - _cur++; - } - if (_cur >= _peers.length) { - // no more left, requeue up the main persist-em-all job - PersistProfilesJob.this.getTiming().setStartAfter(getContext().clock().now() + PERSIST_DELAY); - PersistProfilesJob.this.getContext().jobQueue().addJob(PersistProfilesJob.this); - } else { - // we've got peers left to persist, so requeue the persist profile job - PersistProfilesJob.PersistProfileJob.this.requeue(1000); - } - } - public String getName() { return "Persist profile"; } + getContext().jobQueue().addJob(new PersistProfileJob(getContext(), this, peers)); } + void persist(Hash peer) { _mgr.storeProfile(peer); } + void requeue() { requeue(PERSIST_DELAY); } } + +class PersistProfileJob extends JobImpl { + private PersistProfilesJob _job; + private Iterator _peers; + public PersistProfileJob(RouterContext enclosingContext, PersistProfilesJob job, Set peers) { + super(enclosingContext); + _peers = peers.iterator(); + _job = job; + } + public void runJob() { + if (_peers.hasNext()) + _job.persist((Hash)_peers.next()); + if (_peers.hasNext()) { + requeue(1000); + } else { + // no more left, requeue up the main persist-em-all job + _job.requeue(); + } + } + public String getName() { return "Persist profile"; } +} \ No newline at end of file diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 21a82f229..3aa70072f 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -38,6 +38,8 @@ public class PacketBuilder { _context = ctx; _transport = transport; _log = ctx.logManager().getLog(PacketBuilder.class); + _context.statManager().createRateStat("udp.packetAuthTime", "How long it takes to encrypt and MAC a packet for sending", "udp", new long[] { 60*1000 }); + _context.statManager().createRateStat("udp.packetAuthTimeSlow", "How long it takes to encrypt and MAC a packet for sending (when its slow)", "udp", new long[] { 60*1000, 10*60*1000 }); } public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) { @@ -1029,6 +1031,7 @@ public class PacketBuilder { * @param iv IV to deliver */ private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) { + long before = System.currentTimeMillis(); int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE; int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset(); byte data[] = packet.getPacket().getData(); @@ -1059,5 +1062,9 @@ public class PacketBuilder { System.arraycopy(ba.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE); System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE); _hmacCache.release(ba); + long timeToAuth = System.currentTimeMillis() - before; + _context.statManager().addRateData("udp.packetAuthTime", timeToAuth, timeToAuth); + if (timeToAuth > 100) + _context.statManager().addRateData("udp.packetAuthTimeSlow", timeToAuth, timeToAuth); } } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 90766d300..1f8f539a4 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -134,17 +134,26 @@ public class PacketHandler { + packet + ": " + _reader); } - long timeToDequeue = packet.getTimeSinceEnqueue() - packet.getTimeSinceReceived(); - long timeToVerify = 0; - long beforeRecv = packet.getTimeSinceReceiveFragments(); - if (beforeRecv > 0) - timeToVerify = beforeRecv - packet.getTimeSinceReceived(); + long enqueueTime = packet.getEnqueueTime(); + long recvTime = packet.getReceivedTime(); + long beforeValidateTime = packet.getBeforeValidate(); + long afterValidateTime = packet.getAfterValidate(); + + long timeToDequeue = recvTime - enqueueTime; + long timeToValidate = 0; + long authTime = 0; + if (afterValidateTime > 0) { + timeToValidate = afterValidateTime - enqueueTime; + authTime = afterValidateTime - beforeValidateTime; + } if (timeToDequeue > 50) _context.statManager().addRateData("udp.packetDequeueTime", timeToDequeue, timeToDequeue); - if (timeToVerify > 0) { - _context.statManager().addRateData("udp.packetVerifyTime", timeToVerify, timeToDequeue); - if (timeToVerify > 100) - _context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToVerify, timeToDequeue); + if (authTime > 50) + _context.statManager().addRateData("udp.packetAuthRecvTime", authTime, beforeValidateTime-recvTime); + if (timeToValidate > 0) { + _context.statManager().addRateData("udp.packetVerifyTime", timeToValidate, authTime); + if (timeToValidate > 50) + _context.statManager().addRateData("udp.packetVerifyTimeSlow", timeToValidate, authTime); } // back to the cache with thee! diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 18c08617b..9ba115f85 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -35,12 +35,16 @@ public class PacketPusher implements Runnable { public void run() { while (_alive) { - UDPPacket packets[] = _fragments.getNextVolley(); - if (packets != null) { - for (int i = 0; i < packets.length; i++) { - if (packets[i] != null) // null for ACKed fragments - _sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms + try { + UDPPacket packets[] = _fragments.getNextVolley(); + if (packets != null) { + for (int i = 0; i < packets.length; i++) { + if (packets[i] != null) // null for ACKed fragments + _sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms + } } + } catch (Exception e) { + _log.log(Log.CRIT, "Error pushing", e); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 00c41c5d2..fb621aa8b 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1202,10 +1202,13 @@ public class PeerState { } - if ( (_retransmitter != null) && ( (_retransmitter.isExpired() || _retransmitter.isComplete()) ) ) + OutboundMessageState retrans = _retransmitter; + if ( (retrans != null) && ( (retrans.isExpired() || retrans.isComplete()) ) ) { _retransmitter = null; + retrans = null; + } - if ( (_retransmitter != null) && (_retransmitter != state) ) { + if ( (retrans != null) && (retrans != state) ) { // choke it, since there's already another message retransmitting to this // peer. _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index 95b756968..571484481 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -38,6 +38,8 @@ public class UDPPacket { private volatile Exception _acquiredBy; private long _enqueueTime; private long _receivedTime; + private long _beforeValidate; + private long _afterValidate; private long _beforeReceiveFragments; private long _afterHandlingTime; private boolean _isInbound; @@ -84,6 +86,7 @@ public class UDPPacket { ctx.statManager().createRateStat("udp.packetsLiveOutbound", "Number of live outbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 }); ctx.statManager().createRateStat("udp.packetsLivePendingRecvInbound", "Number of live inbound packets not yet handled by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 }); ctx.statManager().createRateStat("udp.packetsLivePendingHandleInbound", "Number of live inbound packets not yet handled fully by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 }); + ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", new long[] { 60*1000 }); // the data buffer is clobbered on init(..), but we need it to bootstrap _packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); init(ctx, inbound); @@ -146,10 +149,14 @@ public class UDPPacket { public RemoteHostId getRemoteHost() { if (_remoteHost == null) { + long before = System.currentTimeMillis(); InetAddress addr = _packet.getAddress(); byte ip[] = addr.getAddress(); int port = _packet.getPort(); _remoteHost = new RemoteHostId(ip, port); + long timeToFetch = System.currentTimeMillis() - before; + if (timeToFetch > 50) + _context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime()); } return _remoteHost; } @@ -161,6 +168,7 @@ public class UDPPacket { */ public boolean validate(SessionKey macKey) { verifyNotReleased(); + _beforeValidate = _context.clock().now(); boolean eq = false; ByteArray buf = _validateCache.acquire(); @@ -202,6 +210,7 @@ public class UDPPacket { } _validateCache.release(buf); + _afterValidate = _context.clock().now(); return eq; } @@ -237,6 +246,15 @@ public class UDPPacket { /** a packet handler has finished parsing out the good bits */ long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); } + /** when it was added to the endpoint's receive queue */ + long getEnqueueTime() { return _enqueueTime; } + /** when it was pulled off the endpoint receive queue */ + long getReceivedTime() { return _receivedTime; } + /** when we began validate() */ + long getBeforeValidate() { return _beforeValidate; } + /** when we finished validate() */ + long getAfterValidate() { return _afterValidate; } + public String toString() { verifyNotReleased(); StringBuffer buf = new StringBuffer(64);