diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 2d8f263ff1..a3329f3c6a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -72,8 +72,8 @@ public class Connection { private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; - public static final long MAX_RESEND_DELAY = 8*1000; - public static final long MIN_RESEND_DELAY = 3*1000; + public static final long MAX_RESEND_DELAY = 15*1000; + public static final long MIN_RESEND_DELAY = 2*1000; /** wait up to 5 minutes after disconnection so we can ack/close packets */ public static int DISCONNECT_TIMEOUT = 5*60*1000; @@ -193,7 +193,28 @@ public class Connection { } void ackImmediately() { - PacketLocal packet = _receiver.send(null, 0, 0); + PacketLocal packet = null; + synchronized (_outboundPackets) { + if (_outboundPackets.size() > 0) { + // ordered, so pick the lowest to retransmit + Iterator iter = _outboundPackets.values().iterator(); + packet = (PacketLocal)iter.next(); + //iter.remove(); + } + } + if (packet != null) { + ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent(); + if (evt != null) { + boolean sent = evt.retransmit(false); + if (sent) { + return; + } else { + //SimpleTimer.getInstance().addEvent(evt, evt.getNextSendTime()); + } + } + } + // if we don't have anything to retransmit, send a small ack + packet = _receiver.send(null, 0, 0); //packet.releasePayload(); } @@ -277,7 +298,7 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Resend in " + timeout + " for " + packet, new Exception("Sent by")); - SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), timeout); + SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet, timeout + _context.clock().now()), timeout); } _context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); @@ -899,18 +920,29 @@ public class Connection { */ private class ResendPacketEvent implements SimpleTimer.TimedEvent { private PacketLocal _packet; - public ResendPacketEvent(PacketLocal packet) { + private long _nextSendTime; + public ResendPacketEvent(PacketLocal packet, long sendTime) { _packet = packet; + _nextSendTime = sendTime; packet.setResendPacketEvent(ResendPacketEvent.this); } - public void timeReached() { + public long getNextSendTime() { return _nextSendTime; } + public void timeReached() { retransmit(true); } + /** + * Retransmit the packet if we need to. + * + * @param penalize true if this retransmission is caused by a timeout, false if we + * are just sending this packet instead of an ACK + * @return true if the packet was sent, false if it was not + */ + public boolean retransmit(boolean penalize) { if (_packet.getAckTime() > 0) - return; + return false; if (_resetSent || _resetReceived) { _packet.cancelled(); - return; + return false; } //if (_log.shouldLog(Log.DEBUG)) @@ -932,7 +964,8 @@ public class Connection { _log.warn("Delaying resend of " + _packet + " as there are " + _activeResends + " active resends already in play"); SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, 1000); - return; + _nextSendTime = 1000 + _context.clock().now(); + return false; } // revamp various fields, in case we need to ack more, etc _inputStream.updateAcks(_packet); @@ -949,7 +982,7 @@ public class Connection { int newWindowSize = getOptions().getWindowSize(); - if (_ackSinceCongestion) { + if (penalize && _ackSinceCongestion) { // only shrink the window once per window if (_packet.getSequenceNum() > _lastCongestionHighestUnacked) { congestionOccurred(); @@ -1004,7 +1037,7 @@ public class Connection { synchronized (_outboundPackets) { _outboundPackets.notifyAll(); } - return; + return true; } if (numSends - 1 > _options.getMaxResends()) { @@ -1023,11 +1056,14 @@ public class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling resend in " + timeout + "ms for " + _packet); SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); + _nextSendTime = timeout + _context.clock().now(); } + return true; } else { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Packet acked before resend (resend="+ resend + "): " // + _packet + " on " + Connection.this); + return false; } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index d830026f53..d75cb45759 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -54,9 +54,11 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor"; private static final int TREND_COUNT = 3; - static final int INITIAL_WINDOW_SIZE = 4; + static final int INITIAL_WINDOW_SIZE = 6; static final int DEFAULT_MAX_SENDS = 8; + static final int MIN_WINDOW_SIZE = 6; + public ConnectionOptions() { super(); } @@ -183,6 +185,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { numMsgs = _maxWindowSize; else if (numMsgs <= 0) numMsgs = 1; + if (numMsgs < MIN_WINDOW_SIZE) + numMsgs = MIN_WINDOW_SIZE; _windowSize = numMsgs; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 6a319fbdbb..2c4a4f3cfa 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -52,6 +52,16 @@ public class ConnectionPacketHandler { packet.releasePayload(); return; } + + if ( (con.getCloseSentOn() > 0) && (con.getUnackedPacketsSent() <= 0) && + (packet.getSequenceNum() > 0) && (packet.getPayloadSize() > 0)) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received new data when we've sent them data and all of our data is acked: " + + packet + " on " + con + ""); + con.sendReset(); + packet.releasePayload(); + return; + } if (packet.isFlagSet(Packet.FLAG_MAX_PACKET_SIZE_INCLUDED)) { if (packet.getOptionalMaxSize() < con.getOptions().getMaxMessageSize()) { @@ -285,8 +295,10 @@ public class ConnectionPacketHandler { _context.statManager().addRateData("stream.trend", trend, newWindowSize); if ( (!congested) && (acked > 0) && (numResends <= 0) ) { - if ( (newWindowSize > con.getLastCongestionSeenAt() / 2) || - (trend > 0) ) { // tcp vegas: avoidance if rtt is increasing, even if we arent at ssthresh/2 yet + if (trend < 0) { + // rtt is shrinking, so lets increment the cwin + newWindowSize++; + } else if (newWindowSize > con.getLastCongestionSeenAt() / 2) { // congestion avoidance // we can't use newWindowSize += 1/newWindowSize, since we're diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index b96aaf005b..0ff9f746f6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -236,8 +236,8 @@ public class PacketHandler { } packet.releasePayload(); } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); + if (_log.shouldLog(Log.DEBUG) && !packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + _log.debug("Packet received on an unknown stream (and not an ECHO or SYN): " + packet); if (sendId <= 0) { Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId()); if (con != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 9b869fd5ca..36c4a690c7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -101,6 +101,7 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat if (_log.shouldLog(Log.DEBUG)) _log.debug("Cancelled! " + toString(), new Exception("cancelled")); } + public SimpleTimer.TimedEvent getResendEvent() { return _resendEvent; } /** how long after packet creation was it acked? */ public int getAckTime() { diff --git a/history.txt b/history.txt index 08ae2f2c07..6bcb51153e 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,24 @@ -$Id: history.txt,v 1.304 2005/10/20 14:42:13 dust Exp $ +$Id: history.txt,v 1.305 2005/10/22 13:06:03 jrandom Exp $ + +2005-10-25 jrandom + * Defer netDb searches for newly referenced peers until we actually want + them + * Ignore netDb references to peers on our shitlist + * Set the timeout for end to end client messages to the max delay after + finding the leaseSet, so we don't have as many expired messages floating + around. + * Add a floor to the streaming lib window size + * When we need to send a streaming lib ACK, try to retransmit one of the + unacked packets instead (with updated ACK/NACK fields, of course). The + bandwidth cost of an unnecessary retransmission should be minor as + compared to both an ACK packet (rounded up to 1KB in the tunnels) and + the probability of a necessary retransmission. + * Adjust the streaming lib cwin algorithm to allow growth after a full + cwin messages if the rtt is trending downwards. If it is not, use the + existing algorithm. + * Increased the maximum rto size in the streaming lib. + * Load balancing bugfix on end to end messages to distribute across + tunnels more evenly. 2005-10-22 jrandom * Integrated GNU-Crypto's Fortuna PRNG, seeding it off /dev/urandom and diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 03711224b2..7cf0ccd561 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.275 $ $Date: 2005/10/20 14:42:16 $"; + public final static String ID = "$Revision: 1.276 $ $Date: 2005/10/22 13:06:03 $"; public final static String VERSION = "0.6.1.3"; - public final static long BUILD = 6; + public final static long BUILD = 7; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 534c531d81..01d95bbae8 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -151,9 +151,6 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public void runJob() { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": Send outbound client message job beginning"); - buildClove(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Clove built to " + _toString); long timeoutMs = _overallExpiration - getContext().clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": preparing to search for the leaseSet for " + _toString); @@ -210,9 +207,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().statManager().addRateData("client.leaseSetFoundRemoteTime", lookupTime, lookupTime); } boolean ok = getNextLease(); - if (ok) + if (ok) { send(); - else { + } else { if (_log.shouldLog(Log.ERROR)) _log.error("Unable to send on a random lease, as getNext returned null (to=" + _toString + ")"); dieFatal(); @@ -258,19 +255,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // sort are randomly ordered) Collections.shuffle(leases); - // ordered by lease number of failures - TreeMap orderedLeases = new TreeMap(); - for (Iterator iter = leases.iterator(); iter.hasNext(); ) { - Lease lease = (Lease)iter.next(); - long id = lease.getNumFailure(); - while (orderedLeases.containsKey(new Long(id))) - id++; - orderedLeases.put(new Long(id), lease); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": ranking lease we havent sent it down as " + id); + if (false) { + // ordered by lease number of failures + TreeMap orderedLeases = new TreeMap(); + for (Iterator iter = leases.iterator(); iter.hasNext(); ) { + Lease lease = (Lease)iter.next(); + long id = lease.getNumFailure(); + while (orderedLeases.containsKey(new Long(id))) + id++; + orderedLeases.put(new Long(id), lease); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": ranking lease we havent sent it down as " + id); + } + + _lease = (Lease)orderedLeases.get(orderedLeases.firstKey()); + } else { + _lease = (Lease)leases.get(0); } - - _lease = (Lease)orderedLeases.get(orderedLeases.firstKey()); return true; } @@ -320,7 +321,10 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } _inTunnel = selectInboundTunnel(); - + + buildClove(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + ": Clove built to " + _toString); GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token, _overallExpiration, key, _clove, _from.calculateHash(), @@ -461,12 +465,12 @@ public class OutboundClientMessageOneShotJob extends JobImpl { clove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); clove.setDeliveryInstructions(instructions); - clove.setExpiration(_overallExpiration); + clove.setExpiration(OVERALL_TIMEOUT_MS_DEFAULT+getContext().clock().now()); clove.setId(getContext().random().nextLong(I2NPMessage.MAX_ID_VALUE)); DataMessage msg = new DataMessage(getContext()); msg.setData(_clientMessage.getPayload().getEncryptedData()); - msg.setMessageExpiration(_overallExpiration); + msg.setMessageExpiration(clove.getExpiration()); clove.setPayload(msg); clove.setRecipientPublicKey(null); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index e3ae93a580..c53f3daee7 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -128,6 +128,7 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { _lastExploreNew = 0; _activeRequests = new HashMap(8); _enforceNetId = DEFAULT_ENFORCE_NETID; + context.statManager().createRateStat("netDb.lookupLeaseSetDeferred", "how many lookups are deferred for a single leaseSet lookup?", "NetworkDatabase", new long[] { 60*1000, 5*60*1000 }); } protected PeerSelector createPeerSelector() { return new PeerSelector(_context); } @@ -808,7 +809,8 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } else { if (_log.shouldLog(Log.INFO)) _log.info("Deferring search for " + key.toBase64() + " with " + onFindJob); - searchJob.addDeferred(onFindJob, onFailedLookupJob, timeoutMs, isLease); + int deferred = searchJob.addDeferred(onFindJob, onFailedLookupJob, timeoutMs, isLease); + _context.statManager().addRateData("netDb.lookupLeaseSetDeferred", deferred, searchJob.getExpiration()-_context.clock().now()); } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 9ee7ae79b8..680872c8cf 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -125,8 +125,8 @@ class SearchJob extends JobImpl { protected SearchState getState() { return _state; } protected KademliaNetworkDatabaseFacade getFacade() { return _facade; } - protected long getExpiration() { return _expiration; } - protected long getTimeoutMs() { return _timeoutMs; } + public long getExpiration() { return _expiration; } + public long getTimeoutMs() { return _timeoutMs; } /** * Let each peer take up to the average successful search RTT @@ -164,8 +164,8 @@ class SearchJob extends JobImpl { _state.complete(true); succeed(); } else if (isExpired()) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Key search expired"); + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": Key search expired"); _state.complete(true); fail(); } else { @@ -208,51 +208,61 @@ class SearchJob extends JobImpl { requeuePending(); return; } - List closestHashes = getClosestRouters(_state.getTarget(), toCheck, _state.getAttempted()); - if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { - if (_state.getPending().size() <= 0) { - // we tried to find some peers, but there weren't any and no one else is going to answer - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": No peers left, and none pending! Already searched: " - + _state.getAttempted().size() + " failed: " + _state.getFailed().size()); - fail(); - } else { - // no more to try, but we might get data or close peers from some outstanding requests - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + ": No peers left, but some are pending! Pending: " - + _state.getPending().size() + " attempted: " + _state.getAttempted().size() - + " failed: " + _state.getFailed().size()); - requeuePending(); - return; - } - } else { - _state.addPending(closestHashes); - int sent = 0; - for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - DataStructure ds = _facade.getDataStore().get(peer); - if ( (ds == null) || !(ds instanceof RouterInfo) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " - + peer + " : " + (ds == null ? "null" : ds.getClass().getName())); - _state.replyTimeout(peer); + int sent = 0; + Set attempted = _state.getAttempted(); + while (sent <= 0) { + List closestHashes = getClosestRouters(_state.getTarget(), toCheck, attempted); + if ( (closestHashes == null) || (closestHashes.size() <= 0) ) { + if (_state.getPending().size() <= 0) { + // we tried to find some peers, but there weren't any and no one else is going to answer + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": No peers left, and none pending! Already searched: " + + _state.getAttempted().size() + " failed: " + _state.getFailed().size()); + fail(); } else { - if (getContext().shitlist().isShitlisted(peer)) { - // dont bother + // no more to try, but we might get data or close peers from some outstanding requests + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": No peers left, but some are pending! Pending: " + + _state.getPending().size() + " attempted: " + _state.getAttempted().size() + + " failed: " + _state.getFailed().size()); + requeuePending(); + } + return; + } else { + attempted.addAll(closestHashes); + for (Iterator iter = closestHashes.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + DataStructure ds = _facade.getDataStore().get(peer); + if (ds == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Next closest peer " + peer + " was only recently referred to us, sending a search for them"); + getContext().netDb().lookupRouterInfo(peer, null, null, _timeoutMs); + } else if (!(ds instanceof RouterInfo)) { + if (_log.shouldLog(Log.WARN)) + _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! " + + peer + " : " + (ds == null ? "null" : ds.getClass().getName())); + _state.replyTimeout(peer); } else { - sendSearch((RouterInfo)ds); - sent++; + if (getContext().shitlist().isShitlisted(peer)) { + // dont bother + } else { + _state.addPending(peer); + sendSearch((RouterInfo)ds); + sent++; + } } } - } - if (sent <= 0) { - // the (potentially) last peers being searched for could not be, - // er, searched for, so lets retry ASAP (causing either another - // peer to be selected, or the whole search to fail) - if (_log.shouldLog(Log.WARN)) - _log.warn(getJobId() + ": No new peer queued up, so we are going to requeue " + - "ourselves in our search for " + _state.getTarget().toBase64()); - requeuePending(0); + /* + if (sent <= 0) { + // the (potentially) last peers being searched for could not be, + // er, searched for, so lets retry ASAP (causing either another + // peer to be selected, or the whole search to fail) + if (_log.shouldLog(Log.INFO)) + _log.info(getJobId() + ": No new peer queued up, so we are going to requeue " + + "ourselves in our search for " + _state.getTarget().toBase64()); + requeuePending(0); + } + */ } } } @@ -503,6 +513,8 @@ class SearchJob extends JobImpl { } else { Hash peer = _msg.getReply(_curIndex); + boolean shouldAdd = false; + RouterInfo info = getContext().netDb().lookupRouterInfoLocally(peer); if (info == null) { // if the peer is giving us lots of bad peer references, @@ -512,11 +524,18 @@ class SearchJob extends JobImpl { if (!sendsBadInfo) { // we don't need to search for everthing we're given here - only ones that // are next in our search path... - getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); - _repliesPendingVerification++; + if (getContext().shitlist().isShitlisted(peer)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Not looking for a shitlisted peer..."); + getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); + } else { + //getContext().netDb().lookupRouterInfo(peer, new ReplyVerifiedJob(getContext(), peer), new ReplyNotVerifiedJob(getContext(), peer), _timeoutMs); + //_repliesPendingVerification++; + shouldAdd = true; + } } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64()); + if (_log.shouldLog(Log.INFO)) + _log.info("Peer " + _peer.toBase64() + " sends us bad replies, so not verifying " + peer.toBase64()); getContext().statManager().addRateData("netDb.searchReplyValidationSkipped", 1, 0); } } @@ -527,10 +546,12 @@ class SearchJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug(getJobId() + ": dbSearchReply received on search referencing router " + peer); - if (_facade.getKBuckets().add(peer)) - _newPeers++; - else - _seenPeers++; + if (shouldAdd) { + if (_facade.getKBuckets().add(peer)) + _newPeers++; + else + _seenPeers++; + } _curIndex++; requeue(0); @@ -597,8 +618,8 @@ class SearchJob extends JobImpl { if (_state.completed()) return; _state.replyTimeout(_peer); if (_penalizePeer) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Penalizing peer for timeout on search: " + _peer.toBase64() + " after " + (getContext().clock().now() - _sentOn)); + if (_log.shouldLog(Log.INFO)) + _log.info("Penalizing peer for timeout on search: " + _peer.toBase64() + " after " + (getContext().clock().now() - _sentOn)); getContext().profileManager().dbLookupFailed(_peer); } else { if (_log.shouldLog(Log.ERROR)) @@ -736,14 +757,16 @@ class SearchJob extends JobImpl { handleDeferred(false); } - public void addDeferred(Job onFind, Job onFail, long expiration, boolean isLease) { + public int addDeferred(Job onFind, Job onFail, long expiration, boolean isLease) { Search search = new Search(onFind, onFail, expiration, isLease); boolean ok = true; + int deferred = 0; synchronized (_deferredSearches) { if (_deferredCleared) ok = false; else _deferredSearches.add(search); + deferred = _deferredSearches.size(); } if (!ok) { @@ -754,6 +777,9 @@ class SearchJob extends JobImpl { // the following /shouldn't/ be necessary, but it doesnt hurt _facade.searchComplete(_state.getTarget()); _facade.search(_state.getTarget(), onFind, onFail, expiration - getContext().clock().now(), isLease); + return 0; + } else { + return deferred; } } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java index 829d7434a7..da00483ab4 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java @@ -104,6 +104,25 @@ class SearchState { _attemptedPeers.addAll(pending); } } + public void addPending(Hash peer) { + synchronized (_pendingPeers) { + _pendingPeers.add(peer); + _pendingPeerTimes.put(peer, new Long(_context.clock().now())); + } + synchronized (_attemptedPeers) { + _attemptedPeers.add(peer); + } + } + /** we didn't actually want to add this peer as part of the pending list... */ + public void removePending(Hash peer) { + synchronized (_pendingPeers) { + _pendingPeers.remove(peer); + _pendingPeerTimes.remove(peer); + } + synchronized (_attemptedPeers) { + _attemptedPeers.remove(peer); + } + } /** how long did it take to get the reply, or -1 if we don't know */ public long dataFound(Hash peer) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java index 0fb4d4bca4..88892f7bdd 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java @@ -72,7 +72,12 @@ public class HandleTunnelCreateMessageJob extends JobImpl { } } + /** don't accept requests to join for 15 minutes or more */ + public static final int MAX_DURATION_SECONDS = 15*60; + private int shouldAccept() { + if (_request.getDurationSeconds() >= MAX_DURATION_SECONDS) + return TunnelHistory.TUNNEL_REJECT_CRIT; Hash nextRouter = _request.getNextRouter(); if (nextRouter != null) { RouterInfo ri = getContext().netDb().lookupRouterInfoLocally(nextRouter);