From 16a46b321107ed533ab89b4b9a814520bce4c713 Mon Sep 17 00:00:00 2001 From: zzz Date: Mon, 13 Aug 2012 15:12:33 +0000 Subject: [PATCH] * SSU EstablishmentManager: - Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread - Don't change nonces when retransmitting intro packets - More synchronization in EstablishmentManager - Increase establishment timeouts and implement timeouts for individual phases (ticket #669) - Fix bug where InboundEstablishState.createdPacketSent() wasn't being called, so SessionCreated packets weren't retransmitted - Increase retransmission timeout for SessionCreated and implement backoff - Send destroy if establishment times out in the middle - Fix code that pulls outbound states off a deferred queue - Improve UDPPacket.toString() for debugging - More logging of packets dropped in EstablishmentManager - Change establish states to enums --- history.txt | 15 + .../src/net/i2p/router/RouterVersion.java | 2 +- .../transport/udp/EstablishmentManager.java | 504 +++++++++++------- .../transport/udp/InboundEstablishState.java | 95 ++-- .../transport/udp/OutboundEstablishState.java | 156 ++++-- .../transport/udp/OutboundMessageState.java | 6 + .../router/transport/udp/PacketBuilder.java | 90 +++- .../router/transport/udp/PacketHandler.java | 8 +- .../i2p/router/transport/udp/UDPPacket.java | 20 +- 9 files changed, 591 insertions(+), 305 deletions(-) diff --git a/history.txt b/history.txt index 9cb7416cdf..55ab9e5902 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,18 @@ +2012-08-13 zzz + * SSU EstablishmentManager: + - Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread + - Don't change nonces when retransmitting intro packets + - More synchronization in EstablishmentManager + - Increase establishment timeouts and implement timeouts for individual phases (ticket #669) + - Fix bug where InboundEstablishState.createdPacketSent() wasn't being called, + so SessionCreated packets weren't retransmitted + - Increase retransmission timeout for SessionCreated and implement backoff + - Send destroy if establishment times out in the middle + - Fix code that pulls outbound states off a deferred queue + - Improve UDPPacket.toString() for debugging + - More logging of packets dropped in EstablishmentManager + - Change establish states to enums + 2012-08-12 zzz * Jetty: Don't use direct byte buffers that may be leaking (ticket #679) * PeerManager: Fix NPE on Android (ticket #687) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index fecba78d69..a6204817e3 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 7; + public final static long BUILD = 8; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index fb951927a6..330efbe2df 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -5,6 +5,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.Base64; @@ -20,11 +21,11 @@ import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.router.transport.crypto.DHSessionKeyBuilder; +import static net.i2p.router.transport.udp.InboundEstablishState.InboundState.*; +import static net.i2p.router.transport.udp.OutboundEstablishState.OutboundState.*; import net.i2p.util.Addresses; import net.i2p.util.I2PThread; import net.i2p.util.Log; -import net.i2p.util.SimpleScheduler; -import net.i2p.util.SimpleTimer; /** * Coordinate the establishment of new sessions - both inbound and outbound. @@ -49,8 +50,8 @@ class EstablishmentManager { private final Object _activityLock; private int _activity; - /** max outbound in progress */ - private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 20; + /** max outbound in progress - max inbound is half of this */ + private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 30; private static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish"; /** max pending outbound connections (waiting because we are at MAX_CONCURRENT_ESTABLISH) */ @@ -58,7 +59,31 @@ class EstablishmentManager { /** max queued msgs per peer while the peer connection is queued */ private static final int MAX_QUEUED_PER_PEER = 3; - + + private static final long MAX_NONCE = 0xFFFFFFFFl; + + /** + * Kill any outbound that takes more than this. + * Two round trips (Req-Created-Confirmed-Data) for direct; + * 3 1/2 round trips (RReq-RResp+Intro-HolePunch-Req-Created-Confirmed-Data) for indirect. + * Note that this is way too long for us to be able to fall back to NTCP + * for individual messages unless the message timer fires first. + * But SSU probably isn't higher priority than NTCP. + * And it's important to not fail an establishment too soon and waste it. + */ + private static final int MAX_OB_ESTABLISH_TIME = 35*1000; + + /** + * Kill any inbound that takes more than this + * One round trip (Created-Confirmed) + */ + private static final int MAX_IB_ESTABLISH_TIME = 20*1000; + + /** max before receiving a response to a single message during outbound establishment */ + private static final int OB_MESSAGE_TIMEOUT = 15*1000; + + /** for the DSM and or netdb store */ + private static final int DATA_MESSAGE_TIMEOUT = 10*1000; public EstablishmentManager(RouterContext ctx, UDPTransport transport) { _context = ctx; @@ -72,8 +97,8 @@ class EstablishmentManager { _activityLock = new Object(); _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendIntroRelayRequest", "How often we send a relay request to reach a peer", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.sendIntroRelayTimeout", "How often a relay request times out before getting a response (due to the target or intro peer being offline)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES); @@ -100,6 +125,7 @@ class EstablishmentManager { I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true); t.start(); } + public void shutdown() { _alive = false; notifyActivity(); @@ -107,6 +133,7 @@ class EstablishmentManager { /** * Grab the active establishing state + * @return null if none */ InboundEstablishState getInboundState(RemoteHostId from) { InboundEstablishState state = _inboundStates.get(from); @@ -115,6 +142,10 @@ class EstablishmentManager { return state; } + /** + * Grab the active establishing state + * @return null if none + */ OutboundEstablishState getOutboundState(RemoteHostId from) { OutboundEstablishState state = _outboundStates.get(from); // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) @@ -122,6 +153,9 @@ class EstablishmentManager { return state; } + /** + * How many concurrent outbound sessions to deal with + */ private int getMaxConcurrentEstablish() { return _context.getProperty(PROP_MAX_CONCURRENT_ESTABLISH, DEFAULT_MAX_CONCURRENT_ESTABLISH); } @@ -174,9 +208,11 @@ class EstablishmentManager { state = _outboundStates.get(to); if (state == null) { if (_outboundStates.size() >= getMaxConcurrentEstablish()) { - if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) { + if (_queuedOutbound.size() >= MAX_QUEUED_OUTBOUND) { rejected = true; } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Queueing outbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH); List newQueued = new ArrayList(MAX_QUEUED_PER_PEER); List queued = _queuedOutbound.putIfAbsent(to, newQueued); if (queued == null) @@ -210,7 +246,7 @@ class EstablishmentManager { _transport.failed(msg, "Peer has bad key, cannot establish"); return; } - state = new OutboundEstablishState(_context, remAddr, port, + state = new OutboundEstablishState(_context, remAddr, port, msg.getTarget().getIdentity(), sessionKey, addr, _transport.getDHBuilder()); OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state); @@ -218,8 +254,6 @@ class EstablishmentManager { if (!isNew) // whoops, somebody beat us to it, throw out the state we just created state = oldState; - else - _context.simpleScheduler().addEvent(new Expire(to, state), 10*1000); } } if (state != null) { @@ -249,33 +283,12 @@ class EstablishmentManager { } if (deferred > 0) - msg.timestamp("too many deferred establishers: " + deferred); + msg.timestamp("too many deferred establishers"); else if (state != null) - msg.timestamp("establish state already waiting " + state.getLifetime()); + msg.timestamp("establish state already waiting"); notifyActivity(); } - private class Expire implements SimpleTimer.TimedEvent { - private final RemoteHostId _to; - private final OutboundEstablishState _state; - - public Expire(RemoteHostId to, OutboundEstablishState state) { - _to = to; - _state = state; - } - - public void timeReached() { - // remove only if value == state - boolean removed = _outboundStates.remove(_to, _state); - if (removed) { - _context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Timing out expired outbound: " + _state); - processExpired(_state); - } - } - } - /** * How many concurrent inbound sessions to deal with */ @@ -288,22 +301,27 @@ class EstablishmentManager { * */ void receiveSessionRequest(RemoteHostId from, UDPPacketReader reader) { - if (!_transport.isValid(from.getIP())) + if (!_transport.isValid(from.getIP())) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Receive session request from invalid IP: " + from); return; + } int maxInbound = getMaxInboundEstablishers(); boolean isNew = false; - if (_inboundStates.size() >= maxInbound) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH); - _context.statManager().addRateData("udp.establishDropped", 1); - return; // drop the packet - } - InboundEstablishState state = _inboundStates.get(from); if (state == null) { + // TODO this is insufficient to prevent DoSing, especially if + // IP spoofing is used. For further study. + if (_inboundStates.size() >= maxInbound) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH); + _context.statManager().addRateData("udp.establishDropped", 1); + return; // drop the packet + } + if (_context.blocklist().isBlocklisted(from.getIP())) { if (_log.shouldLog(Log.WARN)) _log.warn("Receive session request from blocklisted IP: " + from); @@ -332,16 +350,16 @@ class EstablishmentManager { long tag = 1 + _context.random().nextLong(MAX_TAG_VALUE); state.setSentRelayTag(tag); if (_log.shouldLog(Log.INFO)) - _log.info("Received session request from " + from + ", sending relay tag " + tag); + _log.info("Received NEW session request from " + from + ", sending relay tag " + tag); } else { if (_log.shouldLog(Log.INFO)) _log.info("Received session request, but our status is " + _transport.getReachabilityStatus()); } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Receive DUP session request from: " + state.getRemoteHostId()); } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Receive session request from: " + state.getRemoteHostId().toString()); - notifyActivity(); } @@ -356,6 +374,9 @@ class EstablishmentManager { notifyActivity(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString()); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Receive (DUP?) session confirmed from: " + from); } } @@ -370,6 +391,9 @@ class EstablishmentManager { notifyActivity(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive session created from: " + state.getRemoteHostId().toString()); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Receive (DUP?) session created from: " + from); } } @@ -392,18 +416,19 @@ class EstablishmentManager { _log.debug("Receive session destroy (OB) from: " + from); _outboundStates.remove(from); Hash peer = state.getRemoteIdentity().calculateHash(); - _transport.dropPeer(peer, false, "received destroy message"); + _transport.dropPeer(peer, false, "received destroy message during OB establish"); } /** - * Got a SessionDestroy - maybe after an inbound establish? + * Got a SessionDestroy - maybe during an inbound establish? + * TODO - PacketHandler won't look up inbound establishes * As this packet was essentially unauthenticated (i.e. intro key, not session key) * we just log it as it could be spoofed. * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from) { if (_log.shouldLog(Log.WARN)) - _log.warn("Receive session destroy (IB) from: " + from); + _log.warn("Receive session destroy (none) from: " + from); //InboundEstablishState state = _inboundStates.remove(from); //if (state != null) { // Hash peer = state.getConfirmedIdentity().calculateHash(); @@ -436,7 +461,7 @@ class EstablishmentManager { } } - //admitted = locked_admitQueued(); + locked_admitQueued(); //remaining = _queuedOutbound.size(); //if (admitted > 0) @@ -449,24 +474,43 @@ class EstablishmentManager { return peer; } -/******** + /** + * Move pending OB messages from _queuedOutbound to _outboundStates. + * This isn't so great because _queuedOutbound is not a FIFO. + */ private int locked_admitQueued() { + if (_queuedOutbound.isEmpty()) + return 0; int admitted = 0; - while ( (!_queuedOutbound.isEmpty()) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) { - // ok, active shrunk, lets let some queued in. duplicate the synchronized - // section from the add( + int max = getMaxConcurrentEstablish(); + for (Iterator>> iter = _queuedOutbound.entrySet().iterator(); + iter.hasNext() && _outboundStates.size() < max; ) { + // ok, active shrunk, lets let some queued in. - RemoteHostId to = (RemoteHostId)_queuedOutbound.keySet().iterator().next(); - List queued = (List)_queuedOutbound.remove(to); + Map.Entry> entry = iter.next(); + iter.remove(); + RemoteHostId to = entry.getKey(); + List allQueued = entry.getValue(); + List queued = new ArrayList(); + long now = _context.clock().now(); + synchronized (allQueued) { + for (OutNetMessage msg : allQueued) { + if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { + _transport.failed(msg, "Took too long in est. mgr OB queue"); + } else { + queued.add(msg); + } + } + } if (queued.isEmpty()) continue; - OutNetMessage msg = (OutNetMessage)queued.get(0); + OutNetMessage msg = queued.get(0); RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle()); if (ra == null) { for (int i = 0; i < queued.size(); i++) - _transport.failed((OutNetMessage)queued.get(i), "Cannot admit to the queue, as it has no address"); + _transport.failed(queued.get(i), "Cannot admit to the queue, as it has no address"); continue; } UDPAddress addr = new UDPAddress(ra); @@ -475,12 +519,14 @@ class EstablishmentManager { OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port, msg.getTarget().getIdentity(), - new SessionKey(addr.getIntroKey()), addr); - _outboundStates.put(to, qstate); - _context.simpleScheduler().addEvent(new Expire(to, qstate), 10*1000); + new SessionKey(addr.getIntroKey()), addr, + _transport.getDHBuilder()); + OutboundEstablishState old = _outboundStates.putIfAbsent(to, qstate); + if (old != null) + qstate = old; for (int i = 0; i < queued.size(); i++) { - OutNetMessage m = (OutNetMessage)queued.get(i); + OutNetMessage m = queued.get(i); m.timestamp("no longer deferred... establishing"); qstate.addMessage(m); } @@ -488,7 +534,6 @@ class EstablishmentManager { } return admitted; } -*******/ private void notifyActivity() { synchronized (_activityLock) { @@ -497,9 +542,6 @@ class EstablishmentManager { } } - /** kill any inbound or outbound that takes more than 30s */ - private static final int MAX_ESTABLISH_TIME = 30*1000; - /** * ok, fully received, add it to the established cons and queue up a * netDb store to them @@ -553,30 +595,24 @@ class EstablishmentManager { dsm.setArrival(Router.NETWORK_ID); // overloaded, sure, but future versions can check this // This causes huge values in the inNetPool.droppedDeliveryStatusDelay stat // so it needs to be caught in InNetMessagePool. - dsm.setMessageExpiration(_context.clock().now()+10*1000); + dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); _transport.send(dsm, peer); - _context.simpleScheduler().addEvent(new PublishToNewInbound(peer), 0); - } - private class PublishToNewInbound implements SimpleTimer.TimedEvent { - private final PeerState _peer; + // just do this inline + //_context.simpleScheduler().addEvent(new PublishToNewInbound(peer), 0); - public PublishToNewInbound(PeerState peer) { _peer = peer; } - - public void timeReached() { - Hash peer = _peer.getRemotePeer(); - if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) { + Hash hash = peer.getRemotePeer(); + if ((hash != null) && (!_context.shitlist().isShitlisted(hash)) && (!_transport.isUnreachable(hash))) { // ok, we are fine with them, send them our latest info - if (_log.shouldLog(Log.INFO)) - _log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer); - sendOurInfo(_peer, true); + //if (_log.shouldLog(Log.INFO)) + // _log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer); + sendOurInfo(peer, true); } else { // nuh uh. if (_log.shouldLog(Log.WARN)) - _log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (peer != null ? peer.toString() : "unknown")); + _log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (hash != null ? hash.toString() : "unknown")); } - } } /** @@ -612,19 +648,15 @@ class EstablishmentManager { _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); sendOurInfo(peer, false); - int i = 0; - while (true) { - OutNetMessage msg = state.getNextQueuedMessage(); - if (msg == null) - break; + OutNetMessage msg; + while ((msg = state.getNextQueuedMessage()) != null) { if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { msg.timestamp("took too long but established..."); _transport.failed(msg, "Took too long to establish, but it was established"); } else { - msg.timestamp("session fully established and sent " + i); + msg.timestamp("session fully established and sent"); _transport.send(msg); } - i++; } return peer; } @@ -636,7 +668,7 @@ class EstablishmentManager { DatabaseStoreMessage m = new DatabaseStoreMessage(_context); m.setEntry(_context.router().getRouterInfo()); - m.setMessageExpiration(_context.clock().now() + 10*1000); + m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); _transport.send(m, peer); } @@ -678,11 +710,12 @@ class EstablishmentManager { return; } _transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey())); - // if they haven't advanced to sending us confirmed packets in 1s, - // repeat - state.setNextSendTime(now + 1000); + state.createdPacketSent(); } + /** + * Caller should probably synch on outboundState + */ private void sendRequest(OutboundEstablishState state) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Send SessionRequest to: " + state.getRemoteHostId()); @@ -696,22 +729,21 @@ class EstablishmentManager { state.requestSent(); } - private static final long MAX_NONCE = 0xFFFFFFFFl; - /** if we don't get a relayResponse in 3 seconds, try again */ - private static final int INTRO_ATTEMPT_TIMEOUT = 3*1000; - + /** + * Send RelayRequests to multiple introducers. + * This may be called multiple times, it sets the nonce the first time only + * Caller should probably synch on state. + */ private void handlePendingIntro(OutboundEstablishState state) { - long nonce = _context.random().nextLong(MAX_NONCE); - while (true) { - OutboundEstablishState old = _liveIntroductions.putIfAbsent(Long.valueOf(nonce), state); - if (old != null) { - nonce = _context.random().nextLong(MAX_NONCE); - } else { - break; - } + long nonce = state.getIntroNonce(); + if (nonce < 0) { + OutboundEstablishState old; + do { + nonce = _context.random().nextLong(MAX_NONCE); + old = _liveIntroductions.putIfAbsent(Long.valueOf(nonce), state); + } while (old != null); + state.setIntroNonce(nonce); } - _context.simpleScheduler().addEvent(new FailIntroduction(state, nonce), INTRO_ATTEMPT_TIMEOUT); - state.setIntroNonce(nonce); _context.statManager().addRateData("udp.sendIntroRelayRequest", 1, 0); UDPPacket requests[] = _builder.buildRelayRequest(_transport, state, _transport.getIntroKey()); for (int i = 0; i < requests.length; i++) { @@ -719,31 +751,10 @@ class EstablishmentManager { _transport.send(requests[i]); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send intro for " + state.getRemoteHostId().toString() + " with our intro key as " + _transport.getIntroKey()); + _log.debug("Send intro for " + state.getRemoteHostId() + " with our intro key as " + _transport.getIntroKey()); state.introSent(); } - private class FailIntroduction implements SimpleTimer.TimedEvent { - private final long _nonce; - private final OutboundEstablishState _state; - - public FailIntroduction(OutboundEstablishState state, long nonce) { - _nonce = nonce; - _state = state; - } - - public void timeReached() { - // remove only if value equal to state - boolean removed = _liveIntroductions.remove(Long.valueOf(_nonce), _state); - if (removed) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out"); - _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0); - notifyActivity(); - } - } - } - void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) { long nonce = reader.getRelayResponseReader().readNonce(); OutboundEstablishState state = _liveIntroductions.remove(Long.valueOf(nonce)); @@ -776,6 +787,12 @@ class EstablishmentManager { notifyActivity(); } + /** + * Note that while a SessionConfirmed could in theory be fragmented, + * in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max, + * so it will never be fragmented. + * Caller should probably synch on state. + */ private void sendConfirmation(OutboundEstablishState state) { boolean valid = state.validateSessionCreated(); if (!valid) // validate clears fields on failure @@ -796,7 +813,7 @@ class EstablishmentManager { UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state, _context.router().getRouterInfo().getIdentity()); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send confirm to: " + state.getRemoteHostId().toString()); + _log.debug("Send confirm to: " + state); for (int i = 0; i < packets.length; i++) _transport.send(packets[i]); @@ -804,6 +821,41 @@ class EstablishmentManager { state.confirmedPacketsSent(); } + /** + * Tell the other side never mind. + * This is only useful after we have received SessionCreated, + * and sent SessionConfirmed, but not yet gotten a data packet as an + * ack to the SessionConfirmed - otherwise we haven't generated the keys. + * Caller should probably synch on state. + * + * @since 0.9.2 + */ + private void sendDestroy(OutboundEstablishState state) { + UDPPacket packet = _builder.buildSessionDestroyPacket(state); + if (packet != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send destroy to: " + state); + _transport.send(packet); + } + } + + /** + * Tell the other side never mind. + * This is only useful after we have sent SessionCreated, + * but not received SessionConfirmed + * Otherwise we haven't generated the keys. + * Caller should probably synch on state. + * + * @since 0.9.2 + */ + private void sendDestroy(InboundEstablishState state) { + UDPPacket packet = _builder.buildSessionDestroyPacket(state); + if (packet != null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send destroy to: " + state); + _transport.send(packet); + } + } /** * Drive through the inbound establishment states, adjusting one of them @@ -814,28 +866,29 @@ class EstablishmentManager { long now = _context.clock().now(); long nextSendTime = -1; InboundEstablishState inboundState = null; + boolean expired = false; - //int active = _inboundStates.size(); - //if (active > 0 && _log.shouldLog(Log.DEBUG)) - // _log.debug("# inbound states: " + active); for (Iterator iter = _inboundStates.values().iterator(); iter.hasNext(); ) { InboundEstablishState cur = iter.next(); - if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) { + if (cur.getState() == IB_STATE_CONFIRMED_COMPLETELY) { // completely received (though the signature may be invalid) iter.remove(); inboundState = cur; if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing completely confirmed inbound state"); break; - } else if (cur.getLifetime() > MAX_ESTABLISH_TIME) { + } else if (cur.getLifetime() > MAX_IB_ESTABLISH_TIME) { // took too long iter.remove(); - _context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); + inboundState = cur; + //_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing expired inbound state"); - } else if (cur.getState() == InboundEstablishState.STATE_FAILED) { + expired = true; + break; + } else if (cur.getState() == IB_STATE_FAILED) { iter.remove(); - _context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); + //_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); } else { if (cur.getNextSendTime() <= now) { // our turn... @@ -849,7 +902,7 @@ class EstablishmentManager { // established long when = -1; if (cur.getNextSendTime() <= 0) { - when = cur.getEstablishBeginTime() + MAX_ESTABLISH_TIME; + when = cur.getEstablishBeginTime() + MAX_IB_ESTABLISH_TIME; } else { when = cur.getNextSendTime(); } @@ -862,17 +915,23 @@ class EstablishmentManager { if (inboundState != null) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Processing for inbound: " + inboundState); - switch (inboundState.getState()) { - case InboundEstablishState.STATE_REQUEST_RECEIVED: - sendCreated(inboundState); - break; - case InboundEstablishState.STATE_CREATED_SENT: // fallthrough - case InboundEstablishState.STATE_CONFIRMED_PARTIALLY: - // if its been 5s since we sent the SessionCreated, resend - if (inboundState.getNextSendTime() <= now) + synchronized (inboundState) { + switch (inboundState.getState()) { + case IB_STATE_REQUEST_RECEIVED: + if (!expired) sendCreated(inboundState); break; - case InboundEstablishState.STATE_CONFIRMED_COMPLETELY: + + case IB_STATE_CREATED_SENT: // fallthrough + case IB_STATE_CONFIRMED_PARTIALLY: + if (expired) { + sendDestroy(inboundState); + } else if (inboundState.getNextSendTime() <= now) { + sendCreated(inboundState); + } + break; + + case IB_STATE_CONFIRMED_COMPLETELY: RouterIdentity remote = inboundState.getConfirmedIdentity(); if (remote != null) { if (_context.shitlist().isShitlistedForever(remote.calculateHash())) { @@ -881,23 +940,24 @@ class EstablishmentManager { // So next time we will not accept the con, rather than doing the whole handshake _context.blocklist().add(inboundState.getSentIP()); inboundState.fail(); - break; + } else { + handleCompletelyEstablished(inboundState); } - handleCompletelyEstablished(inboundState); - break; } else { if (_log.shouldLog(Log.WARN)) _log.warn("confirmed with invalid? " + inboundState); inboundState.fail(); - break; } - case InboundEstablishState.STATE_FAILED: + break; + + case IB_STATE_FAILED: break; // already removed; - case InboundEstablishState.STATE_UNKNOWN: // fallthrough - default: - // wtf + + case IB_STATE_UNKNOWN: + // Can't happen, always call receiveSessionRequest() before putting in map if (_log.shouldLog(Log.ERROR)) _log.error("hrm, state is unknown for " + inboundState); + } } // ok, since there was something to do, we want to loop again @@ -921,23 +981,20 @@ class EstablishmentManager { //int remaining = 0; //int active = 0; - //int active = _outboundStates.size(); - //if (active > 0 && _log.shouldLog(Log.DEBUG)) - // _log.debug("# outbound states: " + active); for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) { OutboundEstablishState cur = iter.next(); - if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { + if (cur.getState() == OB_STATE_CONFIRMED_COMPLETELY) { // completely received iter.remove(); outboundState = cur; if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing confirmed outbound: " + cur); break; - } else if (cur.getLifetime() > MAX_ESTABLISH_TIME) { + } else if (cur.getLifetime() > MAX_OB_ESTABLISH_TIME) { // took too long iter.remove(); outboundState = cur; - _context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime()); + //_context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing expired outbound: " + cur); break; @@ -954,7 +1011,7 @@ class EstablishmentManager { // established long when = -1; if (cur.getNextSendTime() <= 0) { - when = cur.getEstablishBeginTime() + MAX_ESTABLISH_TIME; + when = cur.getEstablishBeginTime() + MAX_OB_ESTABLISH_TIME; } else { when = cur.getNextSendTime(); } @@ -975,48 +1032,84 @@ class EstablishmentManager { if (outboundState != null) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Processing for outbound: " + outboundState); - if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) { - processExpired(outboundState); - } else { + synchronized (outboundState) { + boolean expired = outboundState.getLifetime() > MAX_OB_ESTABLISH_TIME; switch (outboundState.getState()) { - case OutboundEstablishState.STATE_UNKNOWN: - sendRequest(outboundState); - break; - case OutboundEstablishState.STATE_REQUEST_SENT: - // no response yet (or it was invalid), lets retry - if (outboundState.getNextSendTime() <= now) + case OB_STATE_UNKNOWN: + if (expired) + processExpired(outboundState); + else sendRequest(outboundState); break; - case OutboundEstablishState.STATE_CREATED_RECEIVED: // fallthrough - case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY: - if (outboundState.getNextSendTime() <= now) + + case OB_STATE_REQUEST_SENT: + // no response yet (or it was invalid), lets retry + long rtime = outboundState.getRequestSentTime(); + if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT < now)) + processExpired(outboundState); + else if (outboundState.getNextSendTime() <= now) + sendRequest(outboundState); + break; + + case OB_STATE_CREATED_RECEIVED: + if (expired) + processExpired(outboundState); + else if (outboundState.getNextSendTime() <= now) sendConfirmation(outboundState); break; - case OutboundEstablishState.STATE_CONFIRMED_COMPLETELY: - handleCompletelyEstablished(outboundState); + + case OB_STATE_CONFIRMED_PARTIALLY: + long ctime = outboundState.getConfirmedSentTime(); + if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT < now)) { + sendDestroy(outboundState); + processExpired(outboundState); + } else if (outboundState.getNextSendTime() <= now) { + sendConfirmation(outboundState); + } break; - case OutboundEstablishState.STATE_PENDING_INTRO: - handlePendingIntro(outboundState); + + case OB_STATE_CONFIRMED_COMPLETELY: + if (expired) + processExpired(outboundState); + else + handleCompletelyEstablished(outboundState); + break; + + case OB_STATE_PENDING_INTRO: + long itime = outboundState.getIntroSentTime(); + if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT < now)) + processExpired(outboundState); + else if (outboundState.getNextSendTime() <= now) + handlePendingIntro(outboundState); break; - default: - // wtf } } - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Since something happened outbound, next=now"); // ok, since there was something to do, we want to loop again nextSendTime = now; - } else { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("Nothing happened outbound, next is in " + (nextSendTime-now)); } return nextSendTime; } + /** + * Caller should probably synch on outboundState + */ private void processExpired(OutboundEstablishState outboundState) { - if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { + long nonce = outboundState.getIntroNonce(); + if (nonce >= 0) { + // remove only if value == state + boolean removed = _liveIntroductions.remove(Long.valueOf(nonce), outboundState); + if (removed) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send intro for " + outboundState.getRemoteHostId() + " timed out"); + _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0); + } + } + // should have already been removed in handleOutbound() above + // remove only if value == state + boolean removed = _outboundStates.remove(outboundState.getRemoteHostId(), outboundState); + if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) { if (_log.shouldLog(Log.INFO)) _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime()); while (true) { @@ -1025,25 +1118,7 @@ class EstablishmentManager { break; _transport.failed(msg, "Expired during failed establish"); } - String err = null; - switch (outboundState.getState()) { - case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY: - err = "Took too long to establish remote connection (confirmed partially)"; - break; - case OutboundEstablishState.STATE_CREATED_RECEIVED: - err = "Took too long to establish remote connection (created received)"; - break; - case OutboundEstablishState.STATE_REQUEST_SENT: - err = "Took too long to establish remote connection (request sent)"; - break; - case OutboundEstablishState.STATE_PENDING_INTRO: - err = "Took too long to establish remote connection (intro failed)"; - break; - case OutboundEstablishState.STATE_UNKNOWN: // fallthrough - default: - err = "Took too long to establish remote connection (unknown state)"; - } - + String err = "Took too long to establish OB connection, state = " + outboundState.getState(); Hash peer = outboundState.getRemoteIdentity().calculateHash(); //_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE); _transport.markUnreachable(peer); @@ -1074,10 +1149,29 @@ class EstablishmentManager { _log.log(Log.CRIT, "Error in the establisher", re); } } + _inboundStates.clear(); + _outboundStates.clear(); + _queuedOutbound.clear(); + _liveIntroductions.clear(); } } + + // Debugging + private long _lastPrinted; + private static final long PRINT_INTERVAL = 5*1000; private void doPass() { + if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) { + _lastPrinted = _context.clock().now(); + int iactive = _inboundStates.size(); + int oactive = _outboundStates.size(); + if (iactive > 0 || oactive > 0) { + int queued = _queuedOutbound.size(); + int live = _liveIntroductions.size(); + _log.debug("OB states: " + oactive + " IB states: " + iactive + + " OB queued: " + queued + " intros: " + live); + } + } _activity = 0; long now = _context.clock().now(); long nextSendTime = -1; diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index e03d618698..f5a63f421c 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -39,7 +39,7 @@ class InboundEstablishState { private SessionKey _sessionKey; private SessionKey _macKey; private Signature _sentSignature; - // SessionConfirmed messages + // SessionConfirmed messages - fragmented in theory but not in practice - see below private byte _receivedIdentity[][]; private long _receivedSignedOnTime; private byte _receivedSignature[]; @@ -48,25 +48,36 @@ class InboundEstablishState { // general status private final long _establishBegin; //private long _lastReceive; - // private long _lastSend; + private long _lastSend; private long _nextSend; private final RemoteHostId _remoteHostId; - private int _currentState; + private InboundState _currentState; private boolean _complete; + // count for backoff + private int _createdSentCount; - /** nothin known yet */ - public static final int STATE_UNKNOWN = 0; - /** we have received an initial request */ - public static final int STATE_REQUEST_RECEIVED = 1; - /** we have sent a signed creation packet */ - public static final int STATE_CREATED_SENT = 2; - /** we have received one or more confirmation packets */ - public static final int STATE_CONFIRMED_PARTIALLY = 3; - /** we have completely received all of the confirmation packets */ - public static final int STATE_CONFIRMED_COMPLETELY = 4; - /** we are explicitly failing it */ - public static final int STATE_FAILED = 5; + public enum InboundState { + /** nothin known yet */ + IB_STATE_UNKNOWN, + /** we have received an initial request */ + IB_STATE_REQUEST_RECEIVED, + /** we have sent a signed creation packet */ + IB_STATE_CREATED_SENT, + /** we have received one but not all the confirmation packets + * This never happens in practice - see below. */ + IB_STATE_CONFIRMED_PARTIALLY, + /** we have all the confirmation packets */ + IB_STATE_CONFIRMED_COMPLETELY, + /** we are explicitly failing it */ + IB_STATE_FAILED + } + /** basic delay before backoff */ + private static final long RETRANSMIT_DELAY = 1500; + + /** max delay including backoff */ + private static final long MAX_DELAY = 15*1000; + public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort, DHSessionKeyBuilder dh) { _context = ctx; @@ -75,12 +86,14 @@ class InboundEstablishState { _alicePort = remotePort; _remoteHostId = new RemoteHostId(_aliceIP, _alicePort); _bobPort = localPort; - _currentState = STATE_UNKNOWN; + _currentState = InboundState.IB_STATE_UNKNOWN; _establishBegin = ctx.clock().now(); _keyBuilder = dh; } - public synchronized int getState() { return _currentState; } + public synchronized InboundState getState() { return _currentState; } + + /** @return if previously complete */ public synchronized boolean complete() { boolean already = _complete; _complete = true; @@ -96,8 +109,8 @@ class InboundEstablishState { req.readIP(_bobIP, 0); if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive sessionRequest, BobIP = " + Addresses.toString(_bobIP)); - if (_currentState == STATE_UNKNOWN) - _currentState = STATE_REQUEST_RECEIVED; + if (_currentState == InboundState.IB_STATE_UNKNOWN) + _currentState = InboundState.IB_STATE_REQUEST_RECEIVED; packetReceived(); } @@ -105,6 +118,9 @@ class InboundEstablishState { public synchronized byte[] getReceivedX() { return _receivedX; } public synchronized byte[] getReceivedOurIP() { return _bobIP; } + /** + * Generates session key and mac key. + */ public synchronized void generateSessionKey() throws DHSessionKeyBuilder.InvalidPublicParameterException { if (_sessionKey != null) return; _keyBuilder.setPeerPublicValue(_receivedX); @@ -135,7 +151,7 @@ class InboundEstablishState { } public synchronized void fail() { - _currentState = STATE_FAILED; + _currentState = InboundState.IB_STATE_FAILED; } public synchronized long getSentRelayTag() { return _sentRelayTag; } @@ -197,20 +213,32 @@ class InboundEstablishState { /** note that we just sent a SessionCreated packet */ public synchronized void createdPacketSent() { - // _lastSend = _context.clock().now(); - if ( (_currentState == STATE_UNKNOWN) || (_currentState == STATE_REQUEST_RECEIVED) ) - _currentState = STATE_CREATED_SENT; + _lastSend = _context.clock().now(); + long delay; + if (_createdSentCount == 0) { + delay = RETRANSMIT_DELAY; + } else { + delay = Math.min(RETRANSMIT_DELAY << _createdSentCount, MAX_DELAY); + } + _createdSentCount++; + _nextSend = _lastSend + delay; + if ( (_currentState == InboundState.IB_STATE_UNKNOWN) || (_currentState == InboundState.IB_STATE_REQUEST_RECEIVED) ) + _currentState = InboundState.IB_STATE_CREATED_SENT; } - + /** how long have we been trying to establish this session? */ public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; } public synchronized long getNextSendTime() { return _nextSend; } - public synchronized void setNextSendTime(long when) { _nextSend = when; } /** RemoteHostId, uniquely identifies an attempt */ RemoteHostId getRemoteHostId() { return _remoteHostId; } + /** + * Note that while a SessionConfirmed could in theory be fragmented, + * in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max, + * so it will never be fragmented. + */ public synchronized void receiveSessionConfirmed(UDPPacketReader.SessionConfirmedReader conf) { if (_receivedIdentity == null) _receivedIdentity = new byte[conf.readTotalFragmentNum()][]; @@ -235,20 +263,23 @@ class InboundEstablishState { conf.readFinalSignature(_receivedSignature, 0); } - if ( (_currentState == STATE_UNKNOWN) || - (_currentState == STATE_REQUEST_RECEIVED) || - (_currentState == STATE_CREATED_SENT) ) { + if ( (_currentState == InboundState.IB_STATE_UNKNOWN) || + (_currentState == InboundState.IB_STATE_REQUEST_RECEIVED) || + (_currentState == InboundState.IB_STATE_CREATED_SENT) ) { if (confirmedFullyReceived()) - _currentState = STATE_CONFIRMED_COMPLETELY; + _currentState = InboundState.IB_STATE_CONFIRMED_COMPLETELY; else - _currentState = STATE_CONFIRMED_PARTIALLY; + _currentState = InboundState.IB_STATE_CONFIRMED_PARTIALLY; } packetReceived(); } - /** have we fully received the SessionConfirmed messages from Alice? */ - public synchronized boolean confirmedFullyReceived() { + /** + * Have we fully received the SessionConfirmed messages from Alice? + * Caller must synch on this. + */ + private boolean confirmedFullyReceived() { if (_receivedIdentity != null) { for (int i = 0; i < _receivedIdentity.length; i++) if (_receivedIdentity[i] == null) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index f6feed7ccd..c40247bf19 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -53,7 +53,7 @@ class OutboundEstablishState { private final RouterIdentity _remotePeer; private final SessionKey _introKey; private final Queue _queuedMessages; - private int _currentState; + private OutboundState _currentState; private long _introductionNonce; // intro private final UDPAddress _remoteAddress; @@ -62,19 +62,25 @@ class OutboundEstablishState { private int _confirmedSentCount; private int _requestSentCount; private int _introSentCount; + // Times for timeout + private long _confirmedSentTime; + private long _requestSentTime; + private long _introSentTime; - /** nothin sent yet */ - public static final int STATE_UNKNOWN = 0; - /** we have sent an initial request */ - public static final int STATE_REQUEST_SENT = 1; - /** we have received a signed creation packet */ - public static final int STATE_CREATED_RECEIVED = 2; - /** we have sent one or more confirmation packets */ - public static final int STATE_CONFIRMED_PARTIALLY = 3; - /** we have received a data packet */ - public static final int STATE_CONFIRMED_COMPLETELY = 4; - /** we need to have someone introduce us to the peer, but haven't received a RelayResponse yet */ - public static final int STATE_PENDING_INTRO = 5; + public enum OutboundState { + /** nothin sent yet */ + OB_STATE_UNKNOWN, + /** we have sent an initial request */ + OB_STATE_REQUEST_SENT, + /** we have received a signed creation packet */ + OB_STATE_CREATED_RECEIVED, + /** we have sent one or more confirmation packets */ + OB_STATE_CONFIRMED_PARTIALLY, + /** we have received a data packet */ + OB_STATE_CONFIRMED_COMPLETELY, + /** we need to have someone introduce us to the peer, but haven't received a RelayResponse yet */ + OB_STATE_PENDING_INTRO + } /** basic delay before backoff */ private static final long RETRANSMIT_DELAY = 1500; @@ -99,7 +105,7 @@ class OutboundEstablishState { _remotePeer = remotePeer; _introKey = introKey; _queuedMessages = new LinkedBlockingQueue(); - _currentState = STATE_UNKNOWN; + _currentState = OutboundState.OB_STATE_UNKNOWN; _establishBegin = ctx.clock().now(); _remoteAddress = addr; _introductionNonce = -1; @@ -109,11 +115,13 @@ class OutboundEstablishState { if ( (addr != null) && (addr.getIntroducerCount() > 0) ) { if (_log.shouldLog(Log.DEBUG)) _log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr); - _currentState = STATE_PENDING_INTRO; + _currentState = OutboundState.OB_STATE_PENDING_INTRO; } } - public synchronized int getState() { return _currentState; } + public synchronized OutboundState getState() { return _currentState; } + + /** @return if previously complete */ public synchronized boolean complete() { boolean already = _complete; _complete = true; @@ -122,6 +130,8 @@ class OutboundEstablishState { public UDPAddress getRemoteAddress() { return _remoteAddress; } public void setIntroNonce(long nonce) { _introductionNonce = nonce; } + + /** @return -1 if unset */ public long getIntroNonce() { return _introductionNonce; } public void addMessage(OutNetMessage msg) { @@ -132,6 +142,7 @@ class OutboundEstablishState { _log.warn("attempt to add duplicate msg to queue: " + msg); } + /** @return null if none */ public OutNetMessage getNextQueuedMessage() { return _queuedMessages.poll(); } @@ -151,7 +162,9 @@ class OutboundEstablishState { } public byte[] getSentX() { return _sentX; } + /** the remote side (Bob) */ public synchronized byte[] getSentIP() { return _bobIP; } + /** the remote side (Bob) */ public synchronized int getSentPort() { return _bobPort; } public synchronized void receiveSessionCreated(UDPPacketReader.SessionCreatedReader reader) { @@ -181,8 +194,8 @@ class OutboundEstablishState { + " SignedOn: " + _receivedSignedOnTime + "\nthis: " + this.toString()); - if ( (_currentState == STATE_UNKNOWN) || (_currentState == STATE_REQUEST_SENT) ) - _currentState = STATE_CREATED_RECEIVED; + if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) || (_currentState == OutboundState.OB_STATE_REQUEST_SENT) ) + _currentState = OutboundState.OB_STATE_CREATED_RECEIVED; packetReceived(); } @@ -191,6 +204,8 @@ class OutboundEstablishState { * session was created properly. If it wasn't, all the SessionCreated * remnants are dropped (perhaps they were spoofed, etc) so that we can * receive another one + * + * Generates session key and mac key. */ public synchronized boolean validateSessionCreated() { if (_receivedSignature != null) { @@ -231,14 +246,18 @@ class OutboundEstablishState { _receivedIV = null; _receivedSignature = null; - if ( (_currentState == STATE_UNKNOWN) || - (_currentState == STATE_REQUEST_SENT) || - (_currentState == STATE_CREATED_RECEIVED) ) - _currentState = STATE_REQUEST_SENT; + if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) || + (_currentState == OutboundState.OB_STATE_REQUEST_SENT) || + (_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) ) + _currentState = OutboundState.OB_STATE_REQUEST_SENT; _nextSend = _context.clock().now(); } + /** + * Generates session key and mac key. + * Caller must synch on this. + */ private void generateSessionKey() throws DHSessionKeyBuilder.InvalidPublicParameterException { if (_sessionKey != null) return; _keyBuilder.setPeerPublicValue(_receivedY); @@ -247,14 +266,15 @@ class OutboundEstablishState { _macKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); System.arraycopy(extra.getData(), 0, _macKey.getData(), 0, SessionKey.KEYSIZE_BYTES); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Established outbound keys. cipher: " + Base64.encode(_sessionKey.getData()) - + " mac: " + Base64.encode(_macKey.getData())); + _log.debug("Established outbound keys. cipher: " + _sessionKey + + " mac: " + _macKey); } /** * decrypt the signature (and subsequent pad bytes) with the * additional layer of encryption using the negotiated key along side * the packet's IV + * Caller must synch on this. */ private void decryptSignature() { if (_receivedEncryptedSignature == null) throw new NullPointerException("encrypted signature is null! this=" + this.toString()); @@ -272,6 +292,7 @@ class OutboundEstablishState { /** * Verify: Alice's IP + Alice's port + Bob's IP + Bob's port + Alice's * new relay tag + Bob's signed on time + * Caller must synch on this. */ private boolean verifySessionCreated() { byte signed[] = new byte[256+256 // X + Y @@ -324,8 +345,11 @@ class OutboundEstablishState { public synchronized int getReceivedPort() { return _alicePort; } /** - * Lets sign everything so we can fragment properly + * Let's sign everything so we can fragment properly. * + * Note that while a SessionConfirmed could in theory be fragmented, + * in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max, + * so it will never be fragmented. */ public synchronized void prepareSessionConfirmed() { if (_sentSignedOnTime > 0) @@ -365,50 +389,93 @@ class OutboundEstablishState { /** note that we just sent the SessionConfirmed packet */ public synchronized void confirmedPacketsSent() { _lastSend = _context.clock().now(); - long delay = Math.min(RETRANSMIT_DELAY << (_confirmedSentCount++), MAX_DELAY); + long delay; + if (_confirmedSentCount == 0) { + delay = RETRANSMIT_DELAY; + _confirmedSentTime = _lastSend; + } else { + delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, MAX_DELAY); + } + _confirmedSentCount++; _nextSend = _lastSend + delay; if (_log.shouldLog(Log.DEBUG)) _log.debug("Send confirm packets, nextSend in " + delay); - if ( (_currentState == STATE_UNKNOWN) || - (_currentState == STATE_REQUEST_SENT) || - (_currentState == STATE_CREATED_RECEIVED) ) - _currentState = STATE_CONFIRMED_PARTIALLY; + if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) || + (_currentState == OutboundState.OB_STATE_REQUEST_SENT) || + (_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) ) + _currentState = OutboundState.OB_STATE_CONFIRMED_PARTIALLY; } + /** + * @return when we sent the first SessionConfirmed packet, or 0 + * @since 0.9.2 + */ + public long getConfirmedSentTime() { return _confirmedSentTime; } + /** note that we just sent the SessionRequest packet */ public synchronized void requestSent() { _lastSend = _context.clock().now(); - long delay = Math.min(RETRANSMIT_DELAY << (_requestSentCount++), MAX_DELAY); + long delay; + if (_requestSentCount == 0) { + delay = RETRANSMIT_DELAY; + _requestSentTime = _lastSend; + } else { + delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, MAX_DELAY); + } + _requestSentCount++; _nextSend = _lastSend + delay; if (_log.shouldLog(Log.DEBUG)) _log.debug("Send a request packet, nextSend in " + delay); - if (_currentState == STATE_UNKNOWN) - _currentState = STATE_REQUEST_SENT; + if (_currentState == OutboundState.OB_STATE_UNKNOWN) + _currentState = OutboundState.OB_STATE_REQUEST_SENT; } + + /** + * @return when we sent the first SessionRequest packet, or 0 + * @since 0.9.2 + */ + public long getRequestSentTime() { return _requestSentTime; } + /** note that we just sent the RelayRequest packet */ public synchronized void introSent() { _lastSend = _context.clock().now(); - long delay = Math.min(RETRANSMIT_DELAY << (_introSentCount++), MAX_DELAY); + long delay; + if (_introSentCount == 0) { + delay = RETRANSMIT_DELAY; + _introSentTime = _lastSend; + } else { + delay = Math.min(RETRANSMIT_DELAY << _introSentCount, MAX_DELAY); + } + _introSentCount++; _nextSend = _lastSend + delay; - if (_currentState == STATE_UNKNOWN) - _currentState = STATE_PENDING_INTRO; + if (_currentState == OutboundState.OB_STATE_UNKNOWN) + _currentState = OutboundState.OB_STATE_PENDING_INTRO; } + /** + * @return when we sent the first RelayRequest packet, or 0 + * @since 0.9.2 + */ + public long getIntroSentTime() { return _introSentTime; } + public synchronized void introductionFailed() { _nextSend = _context.clock().now(); - // keep the state as STATE_PENDING_INTRO, so next time the EstablishmentManager asks us + // keep the state as OB_STATE_PENDING_INTRO, so next time the EstablishmentManager asks us // whats up, it'll try a new random intro peer } + /** + * This changes the remoteHostId from a hash-based one to a IP/Port one + */ public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) { - if (_currentState != STATE_PENDING_INTRO) + if (_currentState != OutboundState.OB_STATE_PENDING_INTRO) return; // we've already successfully been introduced, so don't overwrite old settings _nextSend = _context.clock().now() + 500; // wait briefly for the hole punching - if (_currentState == STATE_PENDING_INTRO) { - // STATE_UNKNOWN will probe the EstablishmentManager to send a new + if (_currentState == OutboundState.OB_STATE_PENDING_INTRO) { + // OB_STATE_UNKNOWN will probe the EstablishmentManager to send a new // session request to this newly known address - _currentState = STATE_UNKNOWN; + _currentState = OutboundState.OB_STATE_UNKNOWN; } _bobIP = bobIP; _bobPort = bobPort; @@ -421,11 +488,6 @@ class OutboundEstablishState { public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; } public synchronized long getNextSendTime() { return _nextSend; } - public synchronized void setNextSendTime(long when) { - _nextSend = when; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Explicit nextSend=" + (_nextSend-_context.clock().now()), new Exception("Set by")); - } /** uniquely identifies an attempt */ RemoteHostId getRemoteHostId() { return _remoteHostId; } @@ -433,7 +495,7 @@ class OutboundEstablishState { /** we have received a real data packet, so we're done establishing */ public synchronized void dataReceived() { packetReceived(); - _currentState = STATE_CONFIRMED_COMPLETELY; + _currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY; } private void packetReceived() { diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index f3f8a9ea44..1a637695da 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -64,6 +64,7 @@ class OutboundMessageState { /** * Called from UDPTransport + * @return success */ public boolean initialize(I2NPMessage msg, PeerState peer) { if (msg == null) @@ -81,6 +82,7 @@ class OutboundMessageState { /** * Called from OutboundMessageFragments + * @return success */ public boolean initialize(OutNetMessage m, I2NPMessage msg) { if ( (m == null) || (msg == null) ) @@ -96,6 +98,10 @@ class OutboundMessageState { } } + /** + * Called from OutboundMessageFragments + * @return success + */ private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { _message = m; _peer = peer; diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index cf85a9148a..8b2b3414e8 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -685,12 +685,16 @@ class PacketBuilder { /** * Build a new series of SessionConfirmed packets for the given peer, * encrypting it as necessary. + * + * Note that while a SessionConfirmed could in theory be fragmented, + * in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max, + * so it will never be fragmented. * * @return ready to send packets, or null if there was a problem * * TODO: doesn't really return null, and caller doesn't handle null return * (null SigningPrivateKey should cause this?) - * Should probably return null if buildSessionConfirmedPacket() turns null for any fragment + * Should probably return null if buildSessionConfirmedPacket() returns null for any fragment */ public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state, RouterIdentity ourIdentity) { byte identity[] = ourIdentity.toByteArray(); @@ -793,26 +797,88 @@ class PacketBuilder { * @since 0.8.1 */ public UDPPacket buildSessionDestroyPacket(PeerState peer) { + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("building session destroy packet to " + peer.getRemotePeer()); + } + return buildSessionDestroyPacket(peer.getCurrentCipherKey(), peer.getCurrentMACKey(), + peer.getRemoteIPAddress(), peer.getRemotePort()); + } + + /** + * Build a destroy packet, which contains a header but no body. + * If the keys and ip/port are not yet set, this will return null. + * + * @return packet or null + * @since 0.9.2 + */ + public UDPPacket buildSessionDestroyPacket(OutboundEstablishState peer) { + SessionKey cipherKey = peer.getCipherKey(); + SessionKey macKey = peer.getMACKey(); + byte[] ip = peer.getSentIP(); + int port = peer.getSentPort(); + if (cipherKey == null || macKey == null || ip == null || port <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Cannot send destroy, incomplete " + peer); + return null; + } + InetAddress addr; + try { + addr = InetAddress.getByAddress(ip); + } catch (UnknownHostException uhe) { + return null; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("building session destroy packet to " + peer); + return buildSessionDestroyPacket(cipherKey, macKey, addr, port); + } + + + /** + * Build a destroy packet, which contains a header but no body. + * If the keys and ip/port are not yet set, this will return null. + * + * @return packet or null + * @since 0.9.2 + */ + public UDPPacket buildSessionDestroyPacket(InboundEstablishState peer) { + SessionKey cipherKey = peer.getCipherKey(); + SessionKey macKey = peer.getMACKey(); + byte[] ip = peer.getSentIP(); + int port = peer.getSentPort(); + if (cipherKey == null || macKey == null || ip == null || port <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Cannot send destroy, incomplete " + peer); + return null; + } + InetAddress addr; + try { + addr = InetAddress.getByAddress(ip); + } catch (UnknownHostException uhe) { + return null; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("building session destroy packet to " + peer); + return buildSessionDestroyPacket(cipherKey, macKey, addr, port); + } + + /** + * Build a destroy packet, which contains a header but no body. + * @param cipherKey non-null + * @param macKey non-null + * @since 0.9.2 + */ + private UDPPacket buildSessionDestroyPacket(SessionKey cipherKey, SessionKey macKey, InetAddress addr, int port) { UDPPacket packet = buildPacketHeader((byte)(UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4)); int off = HEADER_SIZE; - StringBuilder msg = null; - if (_log.shouldLog(Log.DEBUG)) { - msg = new StringBuilder(128); - msg.append("building session destroy packet to ").append(peer.getRemotePeer().toBase64().substring(0,6)); - } - // no body in this message - - if (msg != null) - _log.debug(msg.toString()); // pad up so we're on the encryption boundary if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey()); - setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); + authenticate(packet, cipherKey, macKey); + setTo(packet, addr, port); return packet; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 0c67d335a6..572d26152e 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -328,7 +328,7 @@ class PacketHandler { // Note that the vast majority of these are NOT corrupted packets, but // packets for which we don't have the PeerState (i.e. SessionKey) if (_log.shouldLog(Log.WARN)) - _log.warn("Invalid introduction packet received: " + packet, new Exception("path")); + _log.warn("Cannot validate rcvd pkt (path): " + packet); _context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration()); switch (peerType) { case INBOUND_FALLBACK: @@ -465,11 +465,13 @@ class PacketHandler { } /** - * Parse out the interesting bits and honor what it says + * The last step. The packet was decrypted with some key. Now get the message type + * and send it to one of four places: The EstablishmentManager, IntroductionManager, + * PeerTestManager, or InboundMessageFragments. * * @param state non-null if fully established * @param outState non-null if outbound establishing in process - * @param inState unused always null + * @param inState unused always null, TODO use for 48-byte destroys during inbound establishment * @param isAuthenticated true if a state key was used, false if our own intro key was used */ private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index 75f4e9c2e2..2f9f9620a3 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -125,6 +125,11 @@ class UDPPacket { _validateCount = 0; _remoteHost = null; _released = false; + // clear out some values to make debugging easier via toString() + _messageType = -1; + _enqueueTime = 0; + _receivedTime = 0; + _fragmentCount = 0; } /**** @@ -284,12 +289,17 @@ class UDPPacket { buf.append(_packet.getAddress().getHostAddress()).append(":"); buf.append(_packet.getPort()); //buf.append(" id=").append(System.identityHashCode(this)); - buf.append(" msgType=").append(_messageType); - buf.append(" markType=").append(_markedType); - buf.append(" fragCount=").append(_fragmentCount); + if (_messageType >= 0) + buf.append(" msgType=").append(_messageType); + if (_markedType >= 0) + buf.append(" markType=").append(_markedType); + if (_fragmentCount > 0) + buf.append(" fragCount=").append(_fragmentCount); - buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1)); - buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1)); + if (_enqueueTime >= 0) + buf.append(" sinceEnqueued=").append(_context.clock().now() - _enqueueTime); + if (_receivedTime >= 0) + buf.append(" sinceReceived=").append(_context.clock().now() - _receivedTime); //buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1)); //buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1)); //buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength()));