From e9b7ca369723262bf550007a007d70e10d95134f Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Thu, 1 Jul 2004 15:08:18 +0000 Subject: [PATCH] dont accept outrageously long delays when building a tunnel (aka now each peer only gets the timeout to respond, instead of the full # peers * timeout to respond) this will cause more dropped messages to show up, but in turn it will avoid slower peers (since they'll be marked down as rejecting the tunnel) --- .../tunnelmanager/RequestTunnelJob.java | 151 ++++++++++++------ 1 file changed, 101 insertions(+), 50 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnelmanager/RequestTunnelJob.java b/router/java/src/net/i2p/router/tunnelmanager/RequestTunnelJob.java index aac207402d..a321fe2ed3 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/RequestTunnelJob.java +++ b/router/java/src/net/i2p/router/tunnelmanager/RequestTunnelJob.java @@ -33,6 +33,7 @@ import net.i2p.data.i2np.TunnelCreateMessage; import net.i2p.data.i2np.TunnelCreateStatusMessage; import net.i2p.router.Job; import net.i2p.router.JobImpl; +import net.i2p.router.MessageHistory; import net.i2p.router.MessageSelector; import net.i2p.router.PeerSelectionCriteria; import net.i2p.router.ReplyJob; @@ -93,10 +94,6 @@ public class RequestTunnelJob extends JobImpl { _log.info("Requesting outbound tunnel " + _tunnelGateway.getTunnelId() + " with " + participants.size() + " participants in it"); } - // since we request serially, we need to up the timeout serially - // change this once we go parallel - _timeoutMs *= participants.size()+1; - // work backwards (end point, then the router pointing at the endpoint, then the router pointing at that, etc, until the gateway _toBeRequested = new ArrayList(participants.size()); for (int i = participants.size()-1; i >= 0; i--) { @@ -108,11 +105,15 @@ public class RequestTunnelJob extends JobImpl { _log.warn("ok who the fuck requested someone we don't know about? (dont answer that"); } } + + // since we request serially, we need to up the timeout serially + // change this once we go parallel + //_timeoutMs *= participants.size()+1; + _expiration = (_timeoutMs * _toBeRequested.size()) + _context.clock().now(); } public String getName() { return "Request Tunnel"; } public void runJob() { - if (_expiration < 0) _expiration = _timeoutMs + _context.clock().now(); if (_context.clock().now() > _expiration) { if (_log.shouldLog(Log.WARN)) _log.warn("Timeout reached building tunnel (timeout = " + _timeoutMs + " expiration = " + new Date(_expiration) + ")"); @@ -162,7 +163,8 @@ public class RequestTunnelJob extends JobImpl { return; } - // select reply peer [peer to which SourceRouteReply should be sent, and from which the reply will be forwarded to an inbound tunnel] + // select reply peer [peer to which SourceRouteReply should be sent, and + // from which the reply will be forwarded to an inbound tunnel] RouterInfo replyPeer = selectReplyPeer(participant); if (replyPeer == null) { if (_log.shouldLog(Log.WARN)) @@ -184,7 +186,9 @@ public class RequestTunnelJob extends JobImpl { Set wrappedTags = new HashSet(64); PublicKey wrappedTo = new PublicKey(); - RequestState state = new RequestState(wrappedKey, wrappedTags, wrappedTo, participant, inboundGateway, replyPeer, outboundTunnel, target); + RequestState state = new RequestState(wrappedKey, wrappedTags, wrappedTo, + participant, inboundGateway, replyPeer, + outboundTunnel, target); Request r = new Request(state); _context.jobQueue().addJob(r); } @@ -205,16 +209,18 @@ public class RequestTunnelJob extends JobImpl { public void runJob() { boolean needsMore = _state.doNext(); - if (needsMore) + if (needsMore) { requeue(0); - else - Request.this._context.messageHistory().requestTunnelCreate(_tunnelGateway.getTunnelId(), - _state.getOutboundTunnel(), - _state.getParticipant().getThisHop(), - _state.getParticipant().getNextHop(), - _state.getReplyPeer().getIdentity().getHash(), - _state.getInboundGateway().getTunnelId(), - _state.getInboundGateway().getGateway()); + } else { + MessageHistory hist = Request.this._context.messageHistory(); + hist.requestTunnelCreate(_tunnelGateway.getTunnelId(), + _state.getOutboundTunnel(), + _state.getParticipant().getThisHop(), + _state.getParticipant().getNextHop(), + _state.getReplyPeer().getIdentity().getHash(), + _state.getInboundGateway().getTunnelId(), + _state.getInboundGateway().getGateway()); + } } public String getName() { return "Request Tunnel (partial)"; } @@ -241,7 +247,9 @@ public class RequestTunnelJob extends JobImpl { private TunnelId _outboundTunnel; private RouterInfo _target; - public RequestState(SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo, TunnelInfo participant, TunnelGateway inboundGateway, RouterInfo replyPeer, TunnelId outboundTunnel, RouterInfo target) { + public RequestState(SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo, + TunnelInfo participant, TunnelGateway inboundGateway, + RouterInfo replyPeer, TunnelId outboundTunnel, RouterInfo target) { _wrappedKey = wrappedKey; _wrappedTags = wrappedTags; _wrappedTo = wrappedTo; @@ -265,7 +273,9 @@ public class RequestTunnelJob extends JobImpl { _statusMsg = buildDeliveryStatusMessage(); return true; } else if (_garlicMessage == null) { - _garlicMessage = buildGarlicMessage(_createMsg, _statusMsg, _replyPeer, _inboundGateway, _target, _wrappedKey, _wrappedTags, _wrappedTo); + _garlicMessage = buildGarlicMessage(_createMsg, _statusMsg, _replyPeer, + _inboundGateway, _target, _wrappedKey, + _wrappedTags, _wrappedTo); return true; } else { // send the GarlicMessage @@ -277,8 +287,10 @@ public class RequestTunnelJob extends JobImpl { ReplyJob onReply = new Success(_participant, _wrappedKey, _wrappedTags, _wrappedTo); Job onFail = new Failure(_participant, _replyPeer.getIdentity().getHash()); MessageSelector selector = new Selector(_participant, _statusMsg.getMessageId()); - SendTunnelMessageJob j = new SendTunnelMessageJob(_context, _garlicMessage, _outboundTunnel, _target.getIdentity().getHash(), - null, null, onReply, onFail, selector, _timeoutMs, PRIORITY); + SendTunnelMessageJob j = new SendTunnelMessageJob(_context, _garlicMessage, + _outboundTunnel, _target.getIdentity().getHash(), + null, null, onReply, onFail, + selector, _timeoutMs, PRIORITY); _context.jobQueue().addJob(j); return false; } @@ -368,7 +380,8 @@ public class RequestTunnelJob extends JobImpl { List ids = _context.tunnelManager().selectInboundTunnelIds(criteria); if (ids.size() <= 0) { if (_log.shouldLog(Log.ERROR)) - _log.error("No inbound tunnels to receive the tunnel create messages. Argh", new Exception("Tunnels suck. whats up?")); + _log.error("No inbound tunnels to receive the tunnel create messages. Argh", + new Exception("Tunnels suck. whats up?")); return null; } else { TunnelInfo gateway = null; @@ -464,7 +477,7 @@ public class RequestTunnelJob extends JobImpl { Certificate replyCert = new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null); - long expiration = _expiration; + long expiration = _context.clock().now() + _timeoutMs; // _expiration; if (_log.shouldLog(Log.DEBUG)) _log.debug("Setting the expiration on the reply block to " + (new Date(expiration))); SourceRouteBlock block = new SourceRouteBlock(); @@ -501,7 +514,7 @@ public class RequestTunnelJob extends JobImpl { DeliveryStatusMessage msg = new DeliveryStatusMessage(_context); msg.setArrival(new Date(_context.clock().now())); msg.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - Date exp = new Date(_expiration); + Date exp = new Date(_context.clock().now() + _timeoutMs); // _expiration); if (_log.shouldLog(Log.DEBUG)) _log.debug("Setting the expiration on the delivery status message to " + exp); msg.setMessageExpiration(exp); @@ -515,7 +528,10 @@ public class RequestTunnelJob extends JobImpl { * to the replyPeer, where it is then sent down the replyTunnel to the local router. * */ - private GarlicMessage buildGarlicMessage(I2NPMessage data, I2NPMessage status, RouterInfo replyPeer, TunnelGateway replyTunnel, RouterInfo target, SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo) { + private GarlicMessage buildGarlicMessage(I2NPMessage data, I2NPMessage status, + RouterInfo replyPeer, TunnelGateway replyTunnel, + RouterInfo target, SessionKey wrappedKey, + Set wrappedTags, PublicKey wrappedTo) { GarlicConfig config = buildGarlicConfig(data, status, replyPeer, replyTunnel, target); PublicKey rcptKey = config.getRecipientPublicKey(); @@ -546,12 +562,15 @@ public class RequestTunnelJob extends JobImpl { return message; } - private GarlicConfig buildGarlicConfig(I2NPMessage data, I2NPMessage status, RouterInfo replyPeer, TunnelGateway replyTunnel, RouterInfo target) { + private GarlicConfig buildGarlicConfig(I2NPMessage data, I2NPMessage status, + RouterInfo replyPeer, TunnelGateway replyTunnel, + RouterInfo target) { GarlicConfig config = new GarlicConfig(); - PayloadGarlicConfig dataClove = buildDataClove(data, target, _expiration); + long garlicExpiration = _context.clock().now() + _timeoutMs; + PayloadGarlicConfig dataClove = buildDataClove(data, target, garlicExpiration); config.addClove(dataClove); - PayloadGarlicConfig ackClove = buildAckClove(status, replyPeer, replyTunnel, _expiration); + PayloadGarlicConfig ackClove = buildAckClove(status, replyPeer, replyTunnel, garlicExpiration); config.addClove(ackClove); DeliveryInstructions instructions = new DeliveryInstructions(); @@ -563,12 +582,13 @@ public class RequestTunnelJob extends JobImpl { instructions.setRouter(target.getIdentity().getHash()); instructions.setTunnelId(null); - _log.info("Setting the expiration on the garlic config to " + (new Date(_expiration))); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Setting the expiration on the garlic config to " + (new Date(garlicExpiration))); config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null)); config.setDeliveryInstructions(instructions); config.setId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - config.setExpiration(_expiration); + config.setExpiration(garlicExpiration); config.setRecipientPublicKey(target.getIdentity().getPublicKey()); config.setRequestAck(false); @@ -578,7 +598,8 @@ public class RequestTunnelJob extends JobImpl { /** * Build a clove that sends a DeliveryStatusMessage to us */ - private PayloadGarlicConfig buildAckClove(I2NPMessage ackMsg, RouterInfo replyPeer, TunnelGateway replyTunnel, long expiration) { + private PayloadGarlicConfig buildAckClove(I2NPMessage ackMsg, RouterInfo replyPeer, + TunnelGateway replyTunnel, long expiration) { PayloadGarlicConfig ackClove = new PayloadGarlicConfig(); Hash replyToTunnelRouter = replyTunnel.getGateway(); // inbound tunnel gateway @@ -631,10 +652,15 @@ public class RequestTunnelJob extends JobImpl { private void fail() { if (_complete) { if (_log.shouldLog(Log.WARN)) - _log.warn("Build tunnel failed via " + _tunnelGateway.getThisHop().toBase64() + ", but we've already completed, so fuck off: " + _tunnelGateway, new Exception("Fail aborted")); + _log.warn("Build tunnel failed via " + _tunnelGateway.getThisHop().toBase64() + + ", but we've already completed, so fuck off: " + _tunnelGateway, + new Exception("Fail aborted")); } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Build tunnel " + _tunnelGateway.getTunnelId().getTunnelId() + " with gateway " + _tunnelGateway.getThisHop().toBase64() + " FAILED: " + _failedTunnelParticipants + " - " + _tunnelGateway, new Exception("Why did we fail building?")); + _log.warn("Build tunnel " + _tunnelGateway.getTunnelId().getTunnelId() + + " with gateway " + _tunnelGateway.getThisHop().toBase64() + + " FAILED: " + _failedTunnelParticipants + " - " + _tunnelGateway, + new Exception("Why did we fail building?")); synchronized (_toBeRequested) { _toBeRequested.clear(); } @@ -651,7 +677,9 @@ public class RequestTunnelJob extends JobImpl { } if (numLeft <= 0) { if (_log.shouldLog(Log.INFO)) - _log.info("Peer (" + peer.getThisHop().toBase64() + ") successful: mark the tunnel as completely ready [inbound? " + _isInbound + "]"); + _log.info("Peer (" + peer.getThisHop().toBase64() + + ") successful: mark the tunnel as completely ready [inbound? " + + _isInbound + "]"); _complete = true; if (_isInbound) _pool.addFreeTunnel(_tunnelGateway); @@ -662,7 +690,8 @@ public class RequestTunnelJob extends JobImpl { } else { if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(128); - buf.append("Hop to ").append(peer.getThisHop().toBase64()).append(" successful for tunnel ").append(peer.getTunnelId().getTunnelId()); + buf.append("Hop to ").append(peer.getThisHop().toBase64()); + buf.append(" successful for tunnel ").append(peer.getTunnelId().getTunnelId()); buf.append(", but ").append(numLeft).append(" are pending"); _log.debug(buf.toString()); } @@ -720,7 +749,8 @@ public class RequestTunnelJob extends JobImpl { _log.debug("Running success status job (tunnel = " + _tunnel + " msg = " + message + ")"); if (message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) { if (_log.shouldLog(Log.INFO)) - _log.info("Tunnel creation message acknowledged for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64()); + _log.info("Tunnel creation message acknowledged for tunnel " + _tunnel.getTunnelId() + + " at router " + _tunnel.getThisHop().toBase64()); } else { TunnelCreateStatusMessage msg = (TunnelCreateStatusMessage)message; if (_successCompleted) { @@ -733,20 +763,28 @@ public class RequestTunnelJob extends JobImpl { case TunnelCreateStatusMessage.STATUS_FAILED_DUPLICATE_ID: case TunnelCreateStatusMessage.STATUS_FAILED_OVERLOADED: if (_log.shouldLog(Log.WARN)) - _log.warn("Tunnel creation failed for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64() + " with status " + msg.getStatus()); + _log.warn("Tunnel creation failed for tunnel " + _tunnel.getTunnelId() + + " at router " + _tunnel.getThisHop().toBase64() + + " with status " + msg.getStatus()); _context.profileManager().tunnelRejected(_tunnel.getThisHop(), responseTime); - Success.this._context.messageHistory().tunnelRejected(_tunnel.getThisHop(), _tunnel.getTunnelId(), null, "refused"); + Success.this._context.messageHistory().tunnelRejected(_tunnel.getThisHop(), + _tunnel.getTunnelId(), + null, "refused"); fail(); _successCompleted = true; break; case TunnelCreateStatusMessage.STATUS_SUCCESS: if (_log.shouldLog(Log.DEBUG)) - _log.debug("Tunnel creation succeeded for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64()); + _log.debug("Tunnel creation succeeded for tunnel " + _tunnel.getTunnelId() + + " at router " + _tunnel.getThisHop().toBase64()); - if ( (_wrappedKey != null) && (_wrappedKey.getData() != null) && (_wrappedTags != null) && (_wrappedTags.size() > 0) && (_wrappedTo != null) ) { + if ( (_wrappedKey != null) && (_wrappedKey.getData() != null) && + (_wrappedTags != null) && (_wrappedTags.size() > 0) && + (_wrappedTo != null) ) { Success.this._context.sessionKeyManager().tagsDelivered(_wrappedTo, _wrappedKey, _wrappedTags); if (_log.shouldLog(Log.INFO)) - _log.info("Delivered tags successfully to " + _tunnel.getThisHop().toBase64() + "! # tags: " + _wrappedTags.size()); + _log.info("Delivered tags successfully to " + _tunnel.getThisHop().toBase64() + + "! # tags: " + _wrappedTags.size()); } _tunnel.setIsReady(true); @@ -762,7 +800,8 @@ public class RequestTunnelJob extends JobImpl { synchronized (_messages) { _messages.add(message); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Reply message " + _messages.size() + " received " + message.getClass().getName(), new Exception("Received from")); + _log.debug("Reply message " + _messages.size() + " received " + + message.getClass().getName(), new Exception("Received from")); } } } @@ -771,13 +810,11 @@ public class RequestTunnelJob extends JobImpl { private TunnelInfo _tunnel; private Hash _replyThrough; private long _started; - private Exception _by; public Failure(TunnelInfo tunnel, Hash replyThrough) { super(RequestTunnelJob.this._context); _tunnel = tunnel; _replyThrough = replyThrough; _started = _context.clock().now(); - _by = new Exception("Failure created"); } public String getName() { return "Create Tunnel Failed"; } @@ -787,7 +824,7 @@ public class RequestTunnelJob extends JobImpl { _log.error("Tunnel creation timed out for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64() + " from router " + _context.routerHash().toBase64() + " after waiting " - + (_context.clock().now()-_started) + "ms", _by); + + (_context.clock().now()-_started) + "ms"); _log.error("Added by", Failure.this.getAddedBy()); } synchronized (_failedTunnelParticipants) { @@ -808,21 +845,25 @@ public class RequestTunnelJob extends JobImpl { private long _ackId; private boolean _statusFound; private boolean _ackFound; + private long _attemptExpiration; public Selector(TunnelInfo tunnel, long ackId) { _tunnel = tunnel; _ackId = ackId; _statusFound = false; _ackFound = false; + _attemptExpiration = _context.clock().now() + _timeoutMs; } public boolean continueMatching() { if (_log.shouldLog(Log.DEBUG)) - _log.debug("ContinueMatching looking for tunnel " + _tunnel.getTunnelId().getTunnelId() + " from " + _tunnel.getThisHop().toBase64() + ": found? " + _statusFound + " ackFound? " + _ackFound); + _log.debug("ContinueMatching looking for tunnel " + _tunnel.getTunnelId().getTunnelId() + + " from " + _tunnel.getThisHop().toBase64() + ": found? " + _statusFound + + " ackFound? " + _ackFound); return !_statusFound || !_ackFound; //return !_statusFound; // who cares about the ack if we get the status OK? } - public long getExpiration() { return _expiration; } + public long getExpiration() { return _attemptExpiration; } public boolean isMatch(I2NPMessage message) { if (message.getType() == TunnelCreateStatusMessage.MESSAGE_TYPE) { TunnelCreateStatusMessage msg = (TunnelCreateStatusMessage)message; @@ -835,13 +876,17 @@ public class RequestTunnelJob extends JobImpl { } else { // hmm another tunnel through the peer... if (_log.shouldLog(Log.DEBUG)) - _log.debug("Status message from peer [" + msg.getFromHash().toBase64() + "], with wrong tunnelId [" + msg.getTunnelId() + "] not [" + _tunnel.getTunnelId().getTunnelId() + "]"); + _log.debug("Status message from peer [" + msg.getFromHash().toBase64() + + "], with wrong tunnelId [" + msg.getTunnelId() + + "] not [" + _tunnel.getTunnelId().getTunnelId() + "]"); return false; } } else { // status message but from the wrong peer if (_log.shouldLog(Log.DEBUG)) - _log.debug("Status message from the wrong peer [" + msg.getFromHash().toBase64() + "], not [" + _tunnel.getThisHop().toBase64() + "]"); + _log.debug("Status message from the wrong peer [" + + msg.getFromHash().toBase64() + "], not [" + + _tunnel.getThisHop().toBase64() + "]"); return false; } } else if (message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) { @@ -856,11 +901,17 @@ public class RequestTunnelJob extends JobImpl { return false; } } else { - //_log.debug("Message " + message.getClass().getName() + " is not a delivery status or tunnel create status message [waiting for ok for tunnel " + _tunnel.getTunnelId() + " so we can fire " + _onCreated + "]"); + //_log.debug("Message " + message.getClass().getName() + // + " is not a delivery status or tunnel create status message [waiting for ok for tunnel " + // + _tunnel.getTunnelId() + " so we can fire " + _onCreated + "]"); return false; } } - public String toString() { return "Build Tunnel Job Selector for tunnel " + _tunnel.getTunnelId().getTunnelId() + " at " + _tunnel.getThisHop().toBase64() + " [found=" + _statusFound + ", ack=" + _ackFound + "] (@" + (new Date(getExpiration())) + ")"; } + public String toString() { + return "Build Tunnel Job Selector for tunnel " + _tunnel.getTunnelId().getTunnelId() + + " at " + _tunnel.getThisHop().toBase64() + " [found=" + _statusFound + ", ack=" + + _ackFound + "] (@" + (new Date(getExpiration())) + ")"; + } } } -- GitLab