I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit e9b7ca36 authored by jrandom's avatar jrandom Committed by zzz
Browse files

dont accept outrageously long delays when building a tunnel (aka now each peer...

dont accept outrageously long delays when building a tunnel (aka now each peer only gets the timeout to respond, instead of the full # peers * timeout to respond)
this will cause more dropped messages to show up, but in turn it will avoid slower peers (since they'll be marked down as rejecting the tunnel)
parent 1b03e9a3
No related branches found
No related tags found
No related merge requests found
......@@ -33,6 +33,7 @@ import net.i2p.data.i2np.TunnelCreateMessage;
import net.i2p.data.i2np.TunnelCreateStatusMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.MessageHistory;
import net.i2p.router.MessageSelector;
import net.i2p.router.PeerSelectionCriteria;
import net.i2p.router.ReplyJob;
......@@ -93,10 +94,6 @@ public class RequestTunnelJob extends JobImpl {
_log.info("Requesting outbound tunnel " + _tunnelGateway.getTunnelId() + " with " + participants.size() + " participants in it");
}
// since we request serially, we need to up the timeout serially
// change this once we go parallel
_timeoutMs *= participants.size()+1;
// work backwards (end point, then the router pointing at the endpoint, then the router pointing at that, etc, until the gateway
_toBeRequested = new ArrayList(participants.size());
for (int i = participants.size()-1; i >= 0; i--) {
......@@ -108,11 +105,15 @@ public class RequestTunnelJob extends JobImpl {
_log.warn("ok who the fuck requested someone we don't know about? (dont answer that");
}
}
// since we request serially, we need to up the timeout serially
// change this once we go parallel
//_timeoutMs *= participants.size()+1;
_expiration = (_timeoutMs * _toBeRequested.size()) + _context.clock().now();
}
public String getName() { return "Request Tunnel"; }
public void runJob() {
if (_expiration < 0) _expiration = _timeoutMs + _context.clock().now();
if (_context.clock().now() > _expiration) {
if (_log.shouldLog(Log.WARN))
_log.warn("Timeout reached building tunnel (timeout = " + _timeoutMs + " expiration = " + new Date(_expiration) + ")");
......@@ -162,7 +163,8 @@ public class RequestTunnelJob extends JobImpl {
return;
}
// select reply peer [peer to which SourceRouteReply should be sent, and from which the reply will be forwarded to an inbound tunnel]
// select reply peer [peer to which SourceRouteReply should be sent, and
// from which the reply will be forwarded to an inbound tunnel]
RouterInfo replyPeer = selectReplyPeer(participant);
if (replyPeer == null) {
if (_log.shouldLog(Log.WARN))
......@@ -184,7 +186,9 @@ public class RequestTunnelJob extends JobImpl {
Set wrappedTags = new HashSet(64);
PublicKey wrappedTo = new PublicKey();
RequestState state = new RequestState(wrappedKey, wrappedTags, wrappedTo, participant, inboundGateway, replyPeer, outboundTunnel, target);
RequestState state = new RequestState(wrappedKey, wrappedTags, wrappedTo,
participant, inboundGateway, replyPeer,
outboundTunnel, target);
Request r = new Request(state);
_context.jobQueue().addJob(r);
}
......@@ -205,16 +209,18 @@ public class RequestTunnelJob extends JobImpl {
public void runJob() {
boolean needsMore = _state.doNext();
if (needsMore)
if (needsMore) {
requeue(0);
else
Request.this._context.messageHistory().requestTunnelCreate(_tunnelGateway.getTunnelId(),
_state.getOutboundTunnel(),
_state.getParticipant().getThisHop(),
_state.getParticipant().getNextHop(),
_state.getReplyPeer().getIdentity().getHash(),
_state.getInboundGateway().getTunnelId(),
_state.getInboundGateway().getGateway());
} else {
MessageHistory hist = Request.this._context.messageHistory();
hist.requestTunnelCreate(_tunnelGateway.getTunnelId(),
_state.getOutboundTunnel(),
_state.getParticipant().getThisHop(),
_state.getParticipant().getNextHop(),
_state.getReplyPeer().getIdentity().getHash(),
_state.getInboundGateway().getTunnelId(),
_state.getInboundGateway().getGateway());
}
}
public String getName() { return "Request Tunnel (partial)"; }
......@@ -241,7 +247,9 @@ public class RequestTunnelJob extends JobImpl {
private TunnelId _outboundTunnel;
private RouterInfo _target;
public RequestState(SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo, TunnelInfo participant, TunnelGateway inboundGateway, RouterInfo replyPeer, TunnelId outboundTunnel, RouterInfo target) {
public RequestState(SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo,
TunnelInfo participant, TunnelGateway inboundGateway,
RouterInfo replyPeer, TunnelId outboundTunnel, RouterInfo target) {
_wrappedKey = wrappedKey;
_wrappedTags = wrappedTags;
_wrappedTo = wrappedTo;
......@@ -265,7 +273,9 @@ public class RequestTunnelJob extends JobImpl {
_statusMsg = buildDeliveryStatusMessage();
return true;
} else if (_garlicMessage == null) {
_garlicMessage = buildGarlicMessage(_createMsg, _statusMsg, _replyPeer, _inboundGateway, _target, _wrappedKey, _wrappedTags, _wrappedTo);
_garlicMessage = buildGarlicMessage(_createMsg, _statusMsg, _replyPeer,
_inboundGateway, _target, _wrappedKey,
_wrappedTags, _wrappedTo);
return true;
} else {
// send the GarlicMessage
......@@ -277,8 +287,10 @@ public class RequestTunnelJob extends JobImpl {
ReplyJob onReply = new Success(_participant, _wrappedKey, _wrappedTags, _wrappedTo);
Job onFail = new Failure(_participant, _replyPeer.getIdentity().getHash());
MessageSelector selector = new Selector(_participant, _statusMsg.getMessageId());
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, _garlicMessage, _outboundTunnel, _target.getIdentity().getHash(),
null, null, onReply, onFail, selector, _timeoutMs, PRIORITY);
SendTunnelMessageJob j = new SendTunnelMessageJob(_context, _garlicMessage,
_outboundTunnel, _target.getIdentity().getHash(),
null, null, onReply, onFail,
selector, _timeoutMs, PRIORITY);
_context.jobQueue().addJob(j);
return false;
}
......@@ -368,7 +380,8 @@ public class RequestTunnelJob extends JobImpl {
List ids = _context.tunnelManager().selectInboundTunnelIds(criteria);
if (ids.size() <= 0) {
if (_log.shouldLog(Log.ERROR))
_log.error("No inbound tunnels to receive the tunnel create messages. Argh", new Exception("Tunnels suck. whats up?"));
_log.error("No inbound tunnels to receive the tunnel create messages. Argh",
new Exception("Tunnels suck. whats up?"));
return null;
} else {
TunnelInfo gateway = null;
......@@ -464,7 +477,7 @@ public class RequestTunnelJob extends JobImpl {
Certificate replyCert = new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null);
long expiration = _expiration;
long expiration = _context.clock().now() + _timeoutMs; // _expiration;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting the expiration on the reply block to " + (new Date(expiration)));
SourceRouteBlock block = new SourceRouteBlock();
......@@ -501,7 +514,7 @@ public class RequestTunnelJob extends JobImpl {
DeliveryStatusMessage msg = new DeliveryStatusMessage(_context);
msg.setArrival(new Date(_context.clock().now()));
msg.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
Date exp = new Date(_expiration);
Date exp = new Date(_context.clock().now() + _timeoutMs); // _expiration);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting the expiration on the delivery status message to " + exp);
msg.setMessageExpiration(exp);
......@@ -515,7 +528,10 @@ public class RequestTunnelJob extends JobImpl {
* to the replyPeer, where it is then sent down the replyTunnel to the local router.
*
*/
private GarlicMessage buildGarlicMessage(I2NPMessage data, I2NPMessage status, RouterInfo replyPeer, TunnelGateway replyTunnel, RouterInfo target, SessionKey wrappedKey, Set wrappedTags, PublicKey wrappedTo) {
private GarlicMessage buildGarlicMessage(I2NPMessage data, I2NPMessage status,
RouterInfo replyPeer, TunnelGateway replyTunnel,
RouterInfo target, SessionKey wrappedKey,
Set wrappedTags, PublicKey wrappedTo) {
GarlicConfig config = buildGarlicConfig(data, status, replyPeer, replyTunnel, target);
PublicKey rcptKey = config.getRecipientPublicKey();
......@@ -546,12 +562,15 @@ public class RequestTunnelJob extends JobImpl {
return message;
}
private GarlicConfig buildGarlicConfig(I2NPMessage data, I2NPMessage status, RouterInfo replyPeer, TunnelGateway replyTunnel, RouterInfo target) {
private GarlicConfig buildGarlicConfig(I2NPMessage data, I2NPMessage status,
RouterInfo replyPeer, TunnelGateway replyTunnel,
RouterInfo target) {
GarlicConfig config = new GarlicConfig();
PayloadGarlicConfig dataClove = buildDataClove(data, target, _expiration);
long garlicExpiration = _context.clock().now() + _timeoutMs;
PayloadGarlicConfig dataClove = buildDataClove(data, target, garlicExpiration);
config.addClove(dataClove);
PayloadGarlicConfig ackClove = buildAckClove(status, replyPeer, replyTunnel, _expiration);
PayloadGarlicConfig ackClove = buildAckClove(status, replyPeer, replyTunnel, garlicExpiration);
config.addClove(ackClove);
DeliveryInstructions instructions = new DeliveryInstructions();
......@@ -563,12 +582,13 @@ public class RequestTunnelJob extends JobImpl {
instructions.setRouter(target.getIdentity().getHash());
instructions.setTunnelId(null);
_log.info("Setting the expiration on the garlic config to " + (new Date(_expiration)));
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting the expiration on the garlic config to " + (new Date(garlicExpiration)));
config.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
config.setDeliveryInstructions(instructions);
config.setId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
config.setExpiration(_expiration);
config.setExpiration(garlicExpiration);
config.setRecipientPublicKey(target.getIdentity().getPublicKey());
config.setRequestAck(false);
......@@ -578,7 +598,8 @@ public class RequestTunnelJob extends JobImpl {
/**
* Build a clove that sends a DeliveryStatusMessage to us
*/
private PayloadGarlicConfig buildAckClove(I2NPMessage ackMsg, RouterInfo replyPeer, TunnelGateway replyTunnel, long expiration) {
private PayloadGarlicConfig buildAckClove(I2NPMessage ackMsg, RouterInfo replyPeer,
TunnelGateway replyTunnel, long expiration) {
PayloadGarlicConfig ackClove = new PayloadGarlicConfig();
Hash replyToTunnelRouter = replyTunnel.getGateway(); // inbound tunnel gateway
......@@ -631,10 +652,15 @@ public class RequestTunnelJob extends JobImpl {
private void fail() {
if (_complete) {
if (_log.shouldLog(Log.WARN))
_log.warn("Build tunnel failed via " + _tunnelGateway.getThisHop().toBase64() + ", but we've already completed, so fuck off: " + _tunnelGateway, new Exception("Fail aborted"));
_log.warn("Build tunnel failed via " + _tunnelGateway.getThisHop().toBase64()
+ ", but we've already completed, so fuck off: " + _tunnelGateway,
new Exception("Fail aborted"));
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Build tunnel " + _tunnelGateway.getTunnelId().getTunnelId() + " with gateway " + _tunnelGateway.getThisHop().toBase64() + " FAILED: " + _failedTunnelParticipants + " - " + _tunnelGateway, new Exception("Why did we fail building?"));
_log.warn("Build tunnel " + _tunnelGateway.getTunnelId().getTunnelId()
+ " with gateway " + _tunnelGateway.getThisHop().toBase64()
+ " FAILED: " + _failedTunnelParticipants + " - " + _tunnelGateway,
new Exception("Why did we fail building?"));
synchronized (_toBeRequested) {
_toBeRequested.clear();
}
......@@ -651,7 +677,9 @@ public class RequestTunnelJob extends JobImpl {
}
if (numLeft <= 0) {
if (_log.shouldLog(Log.INFO))
_log.info("Peer (" + peer.getThisHop().toBase64() + ") successful: mark the tunnel as completely ready [inbound? " + _isInbound + "]");
_log.info("Peer (" + peer.getThisHop().toBase64()
+ ") successful: mark the tunnel as completely ready [inbound? "
+ _isInbound + "]");
_complete = true;
if (_isInbound)
_pool.addFreeTunnel(_tunnelGateway);
......@@ -662,7 +690,8 @@ public class RequestTunnelJob extends JobImpl {
} else {
if (_log.shouldLog(Log.DEBUG)) {
StringBuffer buf = new StringBuffer(128);
buf.append("Hop to ").append(peer.getThisHop().toBase64()).append(" successful for tunnel ").append(peer.getTunnelId().getTunnelId());
buf.append("Hop to ").append(peer.getThisHop().toBase64());
buf.append(" successful for tunnel ").append(peer.getTunnelId().getTunnelId());
buf.append(", but ").append(numLeft).append(" are pending");
_log.debug(buf.toString());
}
......@@ -720,7 +749,8 @@ public class RequestTunnelJob extends JobImpl {
_log.debug("Running success status job (tunnel = " + _tunnel + " msg = " + message + ")");
if (message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
if (_log.shouldLog(Log.INFO))
_log.info("Tunnel creation message acknowledged for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64());
_log.info("Tunnel creation message acknowledged for tunnel " + _tunnel.getTunnelId()
+ " at router " + _tunnel.getThisHop().toBase64());
} else {
TunnelCreateStatusMessage msg = (TunnelCreateStatusMessage)message;
if (_successCompleted) {
......@@ -733,20 +763,28 @@ public class RequestTunnelJob extends JobImpl {
case TunnelCreateStatusMessage.STATUS_FAILED_DUPLICATE_ID:
case TunnelCreateStatusMessage.STATUS_FAILED_OVERLOADED:
if (_log.shouldLog(Log.WARN))
_log.warn("Tunnel creation failed for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64() + " with status " + msg.getStatus());
_log.warn("Tunnel creation failed for tunnel " + _tunnel.getTunnelId()
+ " at router " + _tunnel.getThisHop().toBase64()
+ " with status " + msg.getStatus());
_context.profileManager().tunnelRejected(_tunnel.getThisHop(), responseTime);
Success.this._context.messageHistory().tunnelRejected(_tunnel.getThisHop(), _tunnel.getTunnelId(), null, "refused");
Success.this._context.messageHistory().tunnelRejected(_tunnel.getThisHop(),
_tunnel.getTunnelId(),
null, "refused");
fail();
_successCompleted = true;
break;
case TunnelCreateStatusMessage.STATUS_SUCCESS:
if (_log.shouldLog(Log.DEBUG))
_log.debug("Tunnel creation succeeded for tunnel " + _tunnel.getTunnelId() + " at router " + _tunnel.getThisHop().toBase64());
_log.debug("Tunnel creation succeeded for tunnel " + _tunnel.getTunnelId()
+ " at router " + _tunnel.getThisHop().toBase64());
if ( (_wrappedKey != null) && (_wrappedKey.getData() != null) && (_wrappedTags != null) && (_wrappedTags.size() > 0) && (_wrappedTo != null) ) {
if ( (_wrappedKey != null) && (_wrappedKey.getData() != null) &&
(_wrappedTags != null) && (_wrappedTags.size() > 0) &&
(_wrappedTo != null) ) {
Success.this._context.sessionKeyManager().tagsDelivered(_wrappedTo, _wrappedKey, _wrappedTags);
if (_log.shouldLog(Log.INFO))
_log.info("Delivered tags successfully to " + _tunnel.getThisHop().toBase64() + "! # tags: " + _wrappedTags.size());
_log.info("Delivered tags successfully to " + _tunnel.getThisHop().toBase64()
+ "! # tags: " + _wrappedTags.size());
}
_tunnel.setIsReady(true);
......@@ -762,7 +800,8 @@ public class RequestTunnelJob extends JobImpl {
synchronized (_messages) {
_messages.add(message);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reply message " + _messages.size() + " received " + message.getClass().getName(), new Exception("Received from"));
_log.debug("Reply message " + _messages.size() + " received "
+ message.getClass().getName(), new Exception("Received from"));
}
}
}
......@@ -771,13 +810,11 @@ public class RequestTunnelJob extends JobImpl {
private TunnelInfo _tunnel;
private Hash _replyThrough;
private long _started;
private Exception _by;
public Failure(TunnelInfo tunnel, Hash replyThrough) {
super(RequestTunnelJob.this._context);
_tunnel = tunnel;
_replyThrough = replyThrough;
_started = _context.clock().now();
_by = new Exception("Failure created");
}
public String getName() { return "Create Tunnel Failed"; }
......@@ -787,7 +824,7 @@ public class RequestTunnelJob extends JobImpl {
_log.error("Tunnel creation timed out for tunnel " + _tunnel.getTunnelId() + " at router "
+ _tunnel.getThisHop().toBase64() + " from router "
+ _context.routerHash().toBase64() + " after waiting "
+ (_context.clock().now()-_started) + "ms", _by);
+ (_context.clock().now()-_started) + "ms");
_log.error("Added by", Failure.this.getAddedBy());
}
synchronized (_failedTunnelParticipants) {
......@@ -808,21 +845,25 @@ public class RequestTunnelJob extends JobImpl {
private long _ackId;
private boolean _statusFound;
private boolean _ackFound;
private long _attemptExpiration;
public Selector(TunnelInfo tunnel, long ackId) {
_tunnel = tunnel;
_ackId = ackId;
_statusFound = false;
_ackFound = false;
_attemptExpiration = _context.clock().now() + _timeoutMs;
}
public boolean continueMatching() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("ContinueMatching looking for tunnel " + _tunnel.getTunnelId().getTunnelId() + " from " + _tunnel.getThisHop().toBase64() + ": found? " + _statusFound + " ackFound? " + _ackFound);
_log.debug("ContinueMatching looking for tunnel " + _tunnel.getTunnelId().getTunnelId()
+ " from " + _tunnel.getThisHop().toBase64() + ": found? " + _statusFound
+ " ackFound? " + _ackFound);
return !_statusFound || !_ackFound;
//return !_statusFound; // who cares about the ack if we get the status OK?
}
public long getExpiration() { return _expiration; }
public long getExpiration() { return _attemptExpiration; }
public boolean isMatch(I2NPMessage message) {
if (message.getType() == TunnelCreateStatusMessage.MESSAGE_TYPE) {
TunnelCreateStatusMessage msg = (TunnelCreateStatusMessage)message;
......@@ -835,13 +876,17 @@ public class RequestTunnelJob extends JobImpl {
} else {
// hmm another tunnel through the peer...
if (_log.shouldLog(Log.DEBUG))
_log.debug("Status message from peer [" + msg.getFromHash().toBase64() + "], with wrong tunnelId [" + msg.getTunnelId() + "] not [" + _tunnel.getTunnelId().getTunnelId() + "]");
_log.debug("Status message from peer [" + msg.getFromHash().toBase64()
+ "], with wrong tunnelId [" + msg.getTunnelId()
+ "] not [" + _tunnel.getTunnelId().getTunnelId() + "]");
return false;
}
} else {
// status message but from the wrong peer
if (_log.shouldLog(Log.DEBUG))
_log.debug("Status message from the wrong peer [" + msg.getFromHash().toBase64() + "], not [" + _tunnel.getThisHop().toBase64() + "]");
_log.debug("Status message from the wrong peer ["
+ msg.getFromHash().toBase64() + "], not ["
+ _tunnel.getThisHop().toBase64() + "]");
return false;
}
} else if (message.getType() == DeliveryStatusMessage.MESSAGE_TYPE) {
......@@ -856,11 +901,17 @@ public class RequestTunnelJob extends JobImpl {
return false;
}
} else {
//_log.debug("Message " + message.getClass().getName() + " is not a delivery status or tunnel create status message [waiting for ok for tunnel " + _tunnel.getTunnelId() + " so we can fire " + _onCreated + "]");
//_log.debug("Message " + message.getClass().getName()
// + " is not a delivery status or tunnel create status message [waiting for ok for tunnel "
// + _tunnel.getTunnelId() + " so we can fire " + _onCreated + "]");
return false;
}
}
public String toString() { return "Build Tunnel Job Selector for tunnel " + _tunnel.getTunnelId().getTunnelId() + " at " + _tunnel.getThisHop().toBase64() + " [found=" + _statusFound + ", ack=" + _ackFound + "] (@" + (new Date(getExpiration())) + ")"; }
public String toString() {
return "Build Tunnel Job Selector for tunnel " + _tunnel.getTunnelId().getTunnelId()
+ " at " + _tunnel.getThisHop().toBase64() + " [found=" + _statusFound + ", ack="
+ _ackFound + "] (@" + (new Date(getExpiration())) + ")";
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment