* SSU EstablishmentManager:

- Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread
   - Don't change nonces when retransmitting intro packets
   - More synchronization in EstablishmentManager
   - Increase establishment timeouts and implement timeouts for individual phases (ticket #669)
   - Fix bug where InboundEstablishState.createdPacketSent() wasn't being called,
     so SessionCreated packets weren't retransmitted
   - Increase retransmission timeout for SessionCreated and implement backoff
   - Send destroy if establishment times out in the middle
   - Fix code that pulls outbound states off a deferred queue
   - Improve UDPPacket.toString() for debugging
   - More logging of packets dropped in EstablishmentManager
   - Change establish states to enums
This commit is contained in:
zzz
2012-08-13 15:12:33 +00:00
parent e9cc85141c
commit 16a46b3211
9 changed files with 591 additions and 305 deletions

View File

@@ -1,3 +1,18 @@
2012-08-13 zzz
* SSU EstablishmentManager:
- Remove use of outbound timers in EstablishmentManager; drive all events in Establisher thread
- Don't change nonces when retransmitting intro packets
- More synchronization in EstablishmentManager
- Increase establishment timeouts and implement timeouts for individual phases (ticket #669)
- Fix bug where InboundEstablishState.createdPacketSent() wasn't being called,
so SessionCreated packets weren't retransmitted
- Increase retransmission timeout for SessionCreated and implement backoff
- Send destroy if establishment times out in the middle
- Fix code that pulls outbound states off a deferred queue
- Improve UDPPacket.toString() for debugging
- More logging of packets dropped in EstablishmentManager
- Change establish states to enums
2012-08-12 zzz
* Jetty: Don't use direct byte buffers that may be leaking (ticket #679)
* PeerManager: Fix NPE on Android (ticket #687)

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 7;
public final static long BUILD = 8;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -5,6 +5,7 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Base64;
@@ -20,11 +21,11 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.transport.crypto.DHSessionKeyBuilder;
import static net.i2p.router.transport.udp.InboundEstablishState.InboundState.*;
import static net.i2p.router.transport.udp.OutboundEstablishState.OutboundState.*;
import net.i2p.util.Addresses;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Coordinate the establishment of new sessions - both inbound and outbound.
@@ -49,8 +50,8 @@ class EstablishmentManager {
private final Object _activityLock;
private int _activity;
/** max outbound in progress */
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 20;
/** max outbound in progress - max inbound is half of this */
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 30;
private static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
/** max pending outbound connections (waiting because we are at MAX_CONCURRENT_ESTABLISH) */
@@ -58,7 +59,31 @@ class EstablishmentManager {
/** max queued msgs per peer while the peer connection is queued */
private static final int MAX_QUEUED_PER_PEER = 3;
private static final long MAX_NONCE = 0xFFFFFFFFl;
/**
* Kill any outbound that takes more than this.
* Two round trips (Req-Created-Confirmed-Data) for direct;
* 3 1/2 round trips (RReq-RResp+Intro-HolePunch-Req-Created-Confirmed-Data) for indirect.
* Note that this is way too long for us to be able to fall back to NTCP
* for individual messages unless the message timer fires first.
* But SSU probably isn't higher priority than NTCP.
* And it's important to not fail an establishment too soon and waste it.
*/
private static final int MAX_OB_ESTABLISH_TIME = 35*1000;
/**
* Kill any inbound that takes more than this
* One round trip (Created-Confirmed)
*/
private static final int MAX_IB_ESTABLISH_TIME = 20*1000;
/** max before receiving a response to a single message during outbound establishment */
private static final int OB_MESSAGE_TIMEOUT = 15*1000;
/** for the DSM and or netdb store */
private static final int DATA_MESSAGE_TIMEOUT = 10*1000;
public EstablishmentManager(RouterContext ctx, UDPTransport transport) {
_context = ctx;
@@ -72,8 +97,8 @@ class EstablishmentManager {
_activityLock = new Object();
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendIntroRelayRequest", "How often we send a relay request to reach a peer", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendIntroRelayTimeout", "How often a relay request times out before getting a response (due to the target or intro peer being offline)", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", UDPTransport.RATES);
@@ -100,6 +125,7 @@ class EstablishmentManager {
I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true);
t.start();
}
public void shutdown() {
_alive = false;
notifyActivity();
@@ -107,6 +133,7 @@ class EstablishmentManager {
/**
* Grab the active establishing state
* @return null if none
*/
InboundEstablishState getInboundState(RemoteHostId from) {
InboundEstablishState state = _inboundStates.get(from);
@@ -115,6 +142,10 @@ class EstablishmentManager {
return state;
}
/**
* Grab the active establishing state
* @return null if none
*/
OutboundEstablishState getOutboundState(RemoteHostId from) {
OutboundEstablishState state = _outboundStates.get(from);
// if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
@@ -122,6 +153,9 @@ class EstablishmentManager {
return state;
}
/**
* How many concurrent outbound sessions to deal with
*/
private int getMaxConcurrentEstablish() {
return _context.getProperty(PROP_MAX_CONCURRENT_ESTABLISH, DEFAULT_MAX_CONCURRENT_ESTABLISH);
}
@@ -174,9 +208,11 @@ class EstablishmentManager {
state = _outboundStates.get(to);
if (state == null) {
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
if (_queuedOutbound.size() >= MAX_QUEUED_OUTBOUND) {
rejected = true;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Queueing outbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
List<OutNetMessage> newQueued = new ArrayList(MAX_QUEUED_PER_PEER);
List<OutNetMessage> queued = _queuedOutbound.putIfAbsent(to, newQueued);
if (queued == null)
@@ -210,7 +246,7 @@ class EstablishmentManager {
_transport.failed(msg, "Peer has bad key, cannot establish");
return;
}
state = new OutboundEstablishState(_context, remAddr, port,
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
sessionKey, addr, _transport.getDHBuilder());
OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
@@ -218,8 +254,6 @@ class EstablishmentManager {
if (!isNew)
// whoops, somebody beat us to it, throw out the state we just created
state = oldState;
else
_context.simpleScheduler().addEvent(new Expire(to, state), 10*1000);
}
}
if (state != null) {
@@ -249,33 +283,12 @@ class EstablishmentManager {
}
if (deferred > 0)
msg.timestamp("too many deferred establishers: " + deferred);
msg.timestamp("too many deferred establishers");
else if (state != null)
msg.timestamp("establish state already waiting " + state.getLifetime());
msg.timestamp("establish state already waiting");
notifyActivity();
}
private class Expire implements SimpleTimer.TimedEvent {
private final RemoteHostId _to;
private final OutboundEstablishState _state;
public Expire(RemoteHostId to, OutboundEstablishState state) {
_to = to;
_state = state;
}
public void timeReached() {
// remove only if value == state
boolean removed = _outboundStates.remove(_to, _state);
if (removed) {
_context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Timing out expired outbound: " + _state);
processExpired(_state);
}
}
}
/**
* How many concurrent inbound sessions to deal with
*/
@@ -288,22 +301,27 @@ class EstablishmentManager {
*
*/
void receiveSessionRequest(RemoteHostId from, UDPPacketReader reader) {
if (!_transport.isValid(from.getIP()))
if (!_transport.isValid(from.getIP())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from invalid IP: " + from);
return;
}
int maxInbound = getMaxInboundEstablishers();
boolean isNew = false;
if (_inboundStates.size() >= maxInbound) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
InboundEstablishState state = _inboundStates.get(from);
if (state == null) {
// TODO this is insufficient to prevent DoSing, especially if
// IP spoofing is used. For further study.
if (_inboundStates.size() >= maxInbound) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping inbound establish, increase " + PROP_MAX_CONCURRENT_ESTABLISH);
_context.statManager().addRateData("udp.establishDropped", 1);
return; // drop the packet
}
if (_context.blocklist().isBlocklisted(from.getIP())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from blocklisted IP: " + from);
@@ -332,16 +350,16 @@ class EstablishmentManager {
long tag = 1 + _context.random().nextLong(MAX_TAG_VALUE);
state.setSentRelayTag(tag);
if (_log.shouldLog(Log.INFO))
_log.info("Received session request from " + from + ", sending relay tag " + tag);
_log.info("Received NEW session request from " + from + ", sending relay tag " + tag);
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Received session request, but our status is " + _transport.getReachabilityStatus());
}
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive DUP session request from: " + state.getRemoteHostId());
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session request from: " + state.getRemoteHostId().toString());
notifyActivity();
}
@@ -356,6 +374,9 @@ class EstablishmentManager {
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session confirmed from: " + state.getRemoteHostId().toString());
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive (DUP?) session confirmed from: " + from);
}
}
@@ -370,6 +391,9 @@ class EstablishmentManager {
notifyActivity();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive session created from: " + state.getRemoteHostId().toString());
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive (DUP?) session created from: " + from);
}
}
@@ -392,18 +416,19 @@ class EstablishmentManager {
_log.debug("Receive session destroy (OB) from: " + from);
_outboundStates.remove(from);
Hash peer = state.getRemoteIdentity().calculateHash();
_transport.dropPeer(peer, false, "received destroy message");
_transport.dropPeer(peer, false, "received destroy message during OB establish");
}
/**
* Got a SessionDestroy - maybe after an inbound establish?
* Got a SessionDestroy - maybe during an inbound establish?
* TODO - PacketHandler won't look up inbound establishes
* As this packet was essentially unauthenticated (i.e. intro key, not session key)
* we just log it as it could be spoofed.
* @since 0.8.1
*/
void receiveSessionDestroy(RemoteHostId from) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session destroy (IB) from: " + from);
_log.warn("Receive session destroy (none) from: " + from);
//InboundEstablishState state = _inboundStates.remove(from);
//if (state != null) {
// Hash peer = state.getConfirmedIdentity().calculateHash();
@@ -436,7 +461,7 @@ class EstablishmentManager {
}
}
//admitted = locked_admitQueued();
locked_admitQueued();
//remaining = _queuedOutbound.size();
//if (admitted > 0)
@@ -449,24 +474,43 @@ class EstablishmentManager {
return peer;
}
/********
/**
* Move pending OB messages from _queuedOutbound to _outboundStates.
* This isn't so great because _queuedOutbound is not a FIFO.
*/
private int locked_admitQueued() {
if (_queuedOutbound.isEmpty())
return 0;
int admitted = 0;
while ( (!_queuedOutbound.isEmpty()) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) {
// ok, active shrunk, lets let some queued in. duplicate the synchronized
// section from the add(
int max = getMaxConcurrentEstablish();
for (Iterator<Map.Entry<RemoteHostId, List<OutNetMessage>>> iter = _queuedOutbound.entrySet().iterator();
iter.hasNext() && _outboundStates.size() < max; ) {
// ok, active shrunk, lets let some queued in.
RemoteHostId to = (RemoteHostId)_queuedOutbound.keySet().iterator().next();
List queued = (List)_queuedOutbound.remove(to);
Map.Entry<RemoteHostId, List<OutNetMessage>> entry = iter.next();
iter.remove();
RemoteHostId to = entry.getKey();
List<OutNetMessage> allQueued = entry.getValue();
List<OutNetMessage> queued = new ArrayList();
long now = _context.clock().now();
synchronized (allQueued) {
for (OutNetMessage msg : allQueued) {
if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
_transport.failed(msg, "Took too long in est. mgr OB queue");
} else {
queued.add(msg);
}
}
}
if (queued.isEmpty())
continue;
OutNetMessage msg = (OutNetMessage)queued.get(0);
OutNetMessage msg = queued.get(0);
RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
if (ra == null) {
for (int i = 0; i < queued.size(); i++)
_transport.failed((OutNetMessage)queued.get(i), "Cannot admit to the queue, as it has no address");
_transport.failed(queued.get(i), "Cannot admit to the queue, as it has no address");
continue;
}
UDPAddress addr = new UDPAddress(ra);
@@ -475,12 +519,14 @@ class EstablishmentManager {
OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()), addr);
_outboundStates.put(to, qstate);
_context.simpleScheduler().addEvent(new Expire(to, qstate), 10*1000);
new SessionKey(addr.getIntroKey()), addr,
_transport.getDHBuilder());
OutboundEstablishState old = _outboundStates.putIfAbsent(to, qstate);
if (old != null)
qstate = old;
for (int i = 0; i < queued.size(); i++) {
OutNetMessage m = (OutNetMessage)queued.get(i);
OutNetMessage m = queued.get(i);
m.timestamp("no longer deferred... establishing");
qstate.addMessage(m);
}
@@ -488,7 +534,6 @@ class EstablishmentManager {
}
return admitted;
}
*******/
private void notifyActivity() {
synchronized (_activityLock) {
@@ -497,9 +542,6 @@ class EstablishmentManager {
}
}
/** kill any inbound or outbound that takes more than 30s */
private static final int MAX_ESTABLISH_TIME = 30*1000;
/**
* ok, fully received, add it to the established cons and queue up a
* netDb store to them
@@ -553,30 +595,24 @@ class EstablishmentManager {
dsm.setArrival(Router.NETWORK_ID); // overloaded, sure, but future versions can check this
// This causes huge values in the inNetPool.droppedDeliveryStatusDelay stat
// so it needs to be caught in InNetMessagePool.
dsm.setMessageExpiration(_context.clock().now()+10*1000);
dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE));
_transport.send(dsm, peer);
_context.simpleScheduler().addEvent(new PublishToNewInbound(peer), 0);
}
private class PublishToNewInbound implements SimpleTimer.TimedEvent {
private final PeerState _peer;
// just do this inline
//_context.simpleScheduler().addEvent(new PublishToNewInbound(peer), 0);
public PublishToNewInbound(PeerState peer) { _peer = peer; }
public void timeReached() {
Hash peer = _peer.getRemotePeer();
if ((peer != null) && (!_context.shitlist().isShitlisted(peer)) && (!_transport.isUnreachable(peer))) {
Hash hash = peer.getRemotePeer();
if ((hash != null) && (!_context.shitlist().isShitlisted(hash)) && (!_transport.isUnreachable(hash))) {
// ok, we are fine with them, send them our latest info
if (_log.shouldLog(Log.INFO))
_log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer);
sendOurInfo(_peer, true);
//if (_log.shouldLog(Log.INFO))
// _log.info("Publishing to the peer after confirm plus delay (without shitlist): " + peer);
sendOurInfo(peer, true);
} else {
// nuh uh.
if (_log.shouldLog(Log.WARN))
_log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (peer != null ? peer.toString() : "unknown"));
_log.warn("NOT publishing to the peer after confirm plus delay (WITH shitlist): " + (hash != null ? hash.toString() : "unknown"));
}
}
}
/**
@@ -612,19 +648,15 @@ class EstablishmentManager {
_context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer, false);
int i = 0;
while (true) {
OutNetMessage msg = state.getNextQueuedMessage();
if (msg == null)
break;
OutNetMessage msg;
while ((msg = state.getNextQueuedMessage()) != null) {
if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) {
msg.timestamp("took too long but established...");
_transport.failed(msg, "Took too long to establish, but it was established");
} else {
msg.timestamp("session fully established and sent " + i);
msg.timestamp("session fully established and sent");
_transport.send(msg);
}
i++;
}
return peer;
}
@@ -636,7 +668,7 @@ class EstablishmentManager {
DatabaseStoreMessage m = new DatabaseStoreMessage(_context);
m.setEntry(_context.router().getRouterInfo());
m.setMessageExpiration(_context.clock().now() + 10*1000);
m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT);
_transport.send(m, peer);
}
@@ -678,11 +710,12 @@ class EstablishmentManager {
return;
}
_transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey()));
// if they haven't advanced to sending us confirmed packets in 1s,
// repeat
state.setNextSendTime(now + 1000);
state.createdPacketSent();
}
/**
* Caller should probably synch on outboundState
*/
private void sendRequest(OutboundEstablishState state) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send SessionRequest to: " + state.getRemoteHostId());
@@ -696,22 +729,21 @@ class EstablishmentManager {
state.requestSent();
}
private static final long MAX_NONCE = 0xFFFFFFFFl;
/** if we don't get a relayResponse in 3 seconds, try again */
private static final int INTRO_ATTEMPT_TIMEOUT = 3*1000;
/**
* Send RelayRequests to multiple introducers.
* This may be called multiple times, it sets the nonce the first time only
* Caller should probably synch on state.
*/
private void handlePendingIntro(OutboundEstablishState state) {
long nonce = _context.random().nextLong(MAX_NONCE);
while (true) {
OutboundEstablishState old = _liveIntroductions.putIfAbsent(Long.valueOf(nonce), state);
if (old != null) {
nonce = _context.random().nextLong(MAX_NONCE);
} else {
break;
}
long nonce = state.getIntroNonce();
if (nonce < 0) {
OutboundEstablishState old;
do {
nonce = _context.random().nextLong(MAX_NONCE);
old = _liveIntroductions.putIfAbsent(Long.valueOf(nonce), state);
} while (old != null);
state.setIntroNonce(nonce);
}
_context.simpleScheduler().addEvent(new FailIntroduction(state, nonce), INTRO_ATTEMPT_TIMEOUT);
state.setIntroNonce(nonce);
_context.statManager().addRateData("udp.sendIntroRelayRequest", 1, 0);
UDPPacket requests[] = _builder.buildRelayRequest(_transport, state, _transport.getIntroKey());
for (int i = 0; i < requests.length; i++) {
@@ -719,31 +751,10 @@ class EstablishmentManager {
_transport.send(requests[i]);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + state.getRemoteHostId().toString() + " with our intro key as " + _transport.getIntroKey());
_log.debug("Send intro for " + state.getRemoteHostId() + " with our intro key as " + _transport.getIntroKey());
state.introSent();
}
private class FailIntroduction implements SimpleTimer.TimedEvent {
private final long _nonce;
private final OutboundEstablishState _state;
public FailIntroduction(OutboundEstablishState state, long nonce) {
_nonce = nonce;
_state = state;
}
public void timeReached() {
// remove only if value equal to state
boolean removed = _liveIntroductions.remove(Long.valueOf(_nonce), _state);
if (removed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out");
_context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
notifyActivity();
}
}
}
void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) {
long nonce = reader.getRelayResponseReader().readNonce();
OutboundEstablishState state = _liveIntroductions.remove(Long.valueOf(nonce));
@@ -776,6 +787,12 @@ class EstablishmentManager {
notifyActivity();
}
/**
* Note that while a SessionConfirmed could in theory be fragmented,
* in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max,
* so it will never be fragmented.
* Caller should probably synch on state.
*/
private void sendConfirmation(OutboundEstablishState state) {
boolean valid = state.validateSessionCreated();
if (!valid) // validate clears fields on failure
@@ -796,7 +813,7 @@ class EstablishmentManager {
UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state, _context.router().getRouterInfo().getIdentity());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send confirm to: " + state.getRemoteHostId().toString());
_log.debug("Send confirm to: " + state);
for (int i = 0; i < packets.length; i++)
_transport.send(packets[i]);
@@ -804,6 +821,41 @@ class EstablishmentManager {
state.confirmedPacketsSent();
}
/**
* Tell the other side never mind.
* This is only useful after we have received SessionCreated,
* and sent SessionConfirmed, but not yet gotten a data packet as an
* ack to the SessionConfirmed - otherwise we haven't generated the keys.
* Caller should probably synch on state.
*
* @since 0.9.2
*/
private void sendDestroy(OutboundEstablishState state) {
UDPPacket packet = _builder.buildSessionDestroyPacket(state);
if (packet != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send destroy to: " + state);
_transport.send(packet);
}
}
/**
* Tell the other side never mind.
* This is only useful after we have sent SessionCreated,
* but not received SessionConfirmed
* Otherwise we haven't generated the keys.
* Caller should probably synch on state.
*
* @since 0.9.2
*/
private void sendDestroy(InboundEstablishState state) {
UDPPacket packet = _builder.buildSessionDestroyPacket(state);
if (packet != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send destroy to: " + state);
_transport.send(packet);
}
}
/**
* Drive through the inbound establishment states, adjusting one of them
@@ -814,28 +866,29 @@ class EstablishmentManager {
long now = _context.clock().now();
long nextSendTime = -1;
InboundEstablishState inboundState = null;
boolean expired = false;
//int active = _inboundStates.size();
//if (active > 0 && _log.shouldLog(Log.DEBUG))
// _log.debug("# inbound states: " + active);
for (Iterator<InboundEstablishState> iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
InboundEstablishState cur = iter.next();
if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (cur.getState() == IB_STATE_CONFIRMED_COMPLETELY) {
// completely received (though the signature may be invalid)
iter.remove();
inboundState = cur;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing completely confirmed inbound state");
break;
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
} else if (cur.getLifetime() > MAX_IB_ESTABLISH_TIME) {
// took too long
iter.remove();
_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
inboundState = cur;
//_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired inbound state");
} else if (cur.getState() == InboundEstablishState.STATE_FAILED) {
expired = true;
break;
} else if (cur.getState() == IB_STATE_FAILED) {
iter.remove();
_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
//_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
} else {
if (cur.getNextSendTime() <= now) {
// our turn...
@@ -849,7 +902,7 @@ class EstablishmentManager {
// established
long when = -1;
if (cur.getNextSendTime() <= 0) {
when = cur.getEstablishBeginTime() + MAX_ESTABLISH_TIME;
when = cur.getEstablishBeginTime() + MAX_IB_ESTABLISH_TIME;
} else {
when = cur.getNextSendTime();
}
@@ -862,17 +915,23 @@ class EstablishmentManager {
if (inboundState != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Processing for inbound: " + inboundState);
switch (inboundState.getState()) {
case InboundEstablishState.STATE_REQUEST_RECEIVED:
sendCreated(inboundState);
break;
case InboundEstablishState.STATE_CREATED_SENT: // fallthrough
case InboundEstablishState.STATE_CONFIRMED_PARTIALLY:
// if its been 5s since we sent the SessionCreated, resend
if (inboundState.getNextSendTime() <= now)
synchronized (inboundState) {
switch (inboundState.getState()) {
case IB_STATE_REQUEST_RECEIVED:
if (!expired)
sendCreated(inboundState);
break;
case InboundEstablishState.STATE_CONFIRMED_COMPLETELY:
case IB_STATE_CREATED_SENT: // fallthrough
case IB_STATE_CONFIRMED_PARTIALLY:
if (expired) {
sendDestroy(inboundState);
} else if (inboundState.getNextSendTime() <= now) {
sendCreated(inboundState);
}
break;
case IB_STATE_CONFIRMED_COMPLETELY:
RouterIdentity remote = inboundState.getConfirmedIdentity();
if (remote != null) {
if (_context.shitlist().isShitlistedForever(remote.calculateHash())) {
@@ -881,23 +940,24 @@ class EstablishmentManager {
// So next time we will not accept the con, rather than doing the whole handshake
_context.blocklist().add(inboundState.getSentIP());
inboundState.fail();
break;
} else {
handleCompletelyEstablished(inboundState);
}
handleCompletelyEstablished(inboundState);
break;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("confirmed with invalid? " + inboundState);
inboundState.fail();
break;
}
case InboundEstablishState.STATE_FAILED:
break;
case IB_STATE_FAILED:
break; // already removed;
case InboundEstablishState.STATE_UNKNOWN: // fallthrough
default:
// wtf
case IB_STATE_UNKNOWN:
// Can't happen, always call receiveSessionRequest() before putting in map
if (_log.shouldLog(Log.ERROR))
_log.error("hrm, state is unknown for " + inboundState);
}
}
// ok, since there was something to do, we want to loop again
@@ -921,23 +981,20 @@ class EstablishmentManager {
//int remaining = 0;
//int active = 0;
//int active = _outboundStates.size();
//if (active > 0 && _log.shouldLog(Log.DEBUG))
// _log.debug("# outbound states: " + active);
for (Iterator<OutboundEstablishState> iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState cur = iter.next();
if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
if (cur.getState() == OB_STATE_CONFIRMED_COMPLETELY) {
// completely received
iter.remove();
outboundState = cur;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing confirmed outbound: " + cur);
break;
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
} else if (cur.getLifetime() > MAX_OB_ESTABLISH_TIME) {
// took too long
iter.remove();
outboundState = cur;
_context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
//_context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired outbound: " + cur);
break;
@@ -954,7 +1011,7 @@ class EstablishmentManager {
// established
long when = -1;
if (cur.getNextSendTime() <= 0) {
when = cur.getEstablishBeginTime() + MAX_ESTABLISH_TIME;
when = cur.getEstablishBeginTime() + MAX_OB_ESTABLISH_TIME;
} else {
when = cur.getNextSendTime();
}
@@ -975,48 +1032,84 @@ class EstablishmentManager {
if (outboundState != null) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Processing for outbound: " + outboundState);
if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
processExpired(outboundState);
} else {
synchronized (outboundState) {
boolean expired = outboundState.getLifetime() > MAX_OB_ESTABLISH_TIME;
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_UNKNOWN:
sendRequest(outboundState);
break;
case OutboundEstablishState.STATE_REQUEST_SENT:
// no response yet (or it was invalid), lets retry
if (outboundState.getNextSendTime() <= now)
case OB_STATE_UNKNOWN:
if (expired)
processExpired(outboundState);
else
sendRequest(outboundState);
break;
case OutboundEstablishState.STATE_CREATED_RECEIVED: // fallthrough
case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
if (outboundState.getNextSendTime() <= now)
case OB_STATE_REQUEST_SENT:
// no response yet (or it was invalid), lets retry
long rtime = outboundState.getRequestSentTime();
if (expired || (rtime > 0 && rtime + OB_MESSAGE_TIMEOUT < now))
processExpired(outboundState);
else if (outboundState.getNextSendTime() <= now)
sendRequest(outboundState);
break;
case OB_STATE_CREATED_RECEIVED:
if (expired)
processExpired(outboundState);
else if (outboundState.getNextSendTime() <= now)
sendConfirmation(outboundState);
break;
case OutboundEstablishState.STATE_CONFIRMED_COMPLETELY:
handleCompletelyEstablished(outboundState);
case OB_STATE_CONFIRMED_PARTIALLY:
long ctime = outboundState.getConfirmedSentTime();
if (expired || (ctime > 0 && ctime + OB_MESSAGE_TIMEOUT < now)) {
sendDestroy(outboundState);
processExpired(outboundState);
} else if (outboundState.getNextSendTime() <= now) {
sendConfirmation(outboundState);
}
break;
case OutboundEstablishState.STATE_PENDING_INTRO:
handlePendingIntro(outboundState);
case OB_STATE_CONFIRMED_COMPLETELY:
if (expired)
processExpired(outboundState);
else
handleCompletelyEstablished(outboundState);
break;
case OB_STATE_PENDING_INTRO:
long itime = outboundState.getIntroSentTime();
if (expired || (itime > 0 && itime + OB_MESSAGE_TIMEOUT < now))
processExpired(outboundState);
else if (outboundState.getNextSendTime() <= now)
handlePendingIntro(outboundState);
break;
default:
// wtf
}
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Since something happened outbound, next=now");
// ok, since there was something to do, we want to loop again
nextSendTime = now;
} else {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Nothing happened outbound, next is in " + (nextSendTime-now));
}
return nextSendTime;
}
/**
* Caller should probably synch on outboundState
*/
private void processExpired(OutboundEstablishState outboundState) {
if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
long nonce = outboundState.getIntroNonce();
if (nonce >= 0) {
// remove only if value == state
boolean removed = _liveIntroductions.remove(Long.valueOf(nonce), outboundState);
if (removed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + outboundState.getRemoteHostId() + " timed out");
_context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
}
}
// should have already been removed in handleOutbound() above
// remove only if value == state
boolean removed = _outboundStates.remove(outboundState.getRemoteHostId(), outboundState);
if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) {
if (_log.shouldLog(Log.INFO))
_log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime());
while (true) {
@@ -1025,25 +1118,7 @@ class EstablishmentManager {
break;
_transport.failed(msg, "Expired during failed establish");
}
String err = null;
switch (outboundState.getState()) {
case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
err = "Took too long to establish remote connection (confirmed partially)";
break;
case OutboundEstablishState.STATE_CREATED_RECEIVED:
err = "Took too long to establish remote connection (created received)";
break;
case OutboundEstablishState.STATE_REQUEST_SENT:
err = "Took too long to establish remote connection (request sent)";
break;
case OutboundEstablishState.STATE_PENDING_INTRO:
err = "Took too long to establish remote connection (intro failed)";
break;
case OutboundEstablishState.STATE_UNKNOWN: // fallthrough
default:
err = "Took too long to establish remote connection (unknown state)";
}
String err = "Took too long to establish OB connection, state = " + outboundState.getState();
Hash peer = outboundState.getRemoteIdentity().calculateHash();
//_context.shitlist().shitlistRouter(peer, err, UDPTransport.STYLE);
_transport.markUnreachable(peer);
@@ -1074,10 +1149,29 @@ class EstablishmentManager {
_log.log(Log.CRIT, "Error in the establisher", re);
}
}
_inboundStates.clear();
_outboundStates.clear();
_queuedOutbound.clear();
_liveIntroductions.clear();
}
}
// Debugging
private long _lastPrinted;
private static final long PRINT_INTERVAL = 5*1000;
private void doPass() {
if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) {
_lastPrinted = _context.clock().now();
int iactive = _inboundStates.size();
int oactive = _outboundStates.size();
if (iactive > 0 || oactive > 0) {
int queued = _queuedOutbound.size();
int live = _liveIntroductions.size();
_log.debug("OB states: " + oactive + " IB states: " + iactive +
" OB queued: " + queued + " intros: " + live);
}
}
_activity = 0;
long now = _context.clock().now();
long nextSendTime = -1;

View File

@@ -39,7 +39,7 @@ class InboundEstablishState {
private SessionKey _sessionKey;
private SessionKey _macKey;
private Signature _sentSignature;
// SessionConfirmed messages
// SessionConfirmed messages - fragmented in theory but not in practice - see below
private byte _receivedIdentity[][];
private long _receivedSignedOnTime;
private byte _receivedSignature[];
@@ -48,25 +48,36 @@ class InboundEstablishState {
// general status
private final long _establishBegin;
//private long _lastReceive;
// private long _lastSend;
private long _lastSend;
private long _nextSend;
private final RemoteHostId _remoteHostId;
private int _currentState;
private InboundState _currentState;
private boolean _complete;
// count for backoff
private int _createdSentCount;
/** nothin known yet */
public static final int STATE_UNKNOWN = 0;
/** we have received an initial request */
public static final int STATE_REQUEST_RECEIVED = 1;
/** we have sent a signed creation packet */
public static final int STATE_CREATED_SENT = 2;
/** we have received one or more confirmation packets */
public static final int STATE_CONFIRMED_PARTIALLY = 3;
/** we have completely received all of the confirmation packets */
public static final int STATE_CONFIRMED_COMPLETELY = 4;
/** we are explicitly failing it */
public static final int STATE_FAILED = 5;
public enum InboundState {
/** nothin known yet */
IB_STATE_UNKNOWN,
/** we have received an initial request */
IB_STATE_REQUEST_RECEIVED,
/** we have sent a signed creation packet */
IB_STATE_CREATED_SENT,
/** we have received one but not all the confirmation packets
* This never happens in practice - see below. */
IB_STATE_CONFIRMED_PARTIALLY,
/** we have all the confirmation packets */
IB_STATE_CONFIRMED_COMPLETELY,
/** we are explicitly failing it */
IB_STATE_FAILED
}
/** basic delay before backoff */
private static final long RETRANSMIT_DELAY = 1500;
/** max delay including backoff */
private static final long MAX_DELAY = 15*1000;
public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort,
DHSessionKeyBuilder dh) {
_context = ctx;
@@ -75,12 +86,14 @@ class InboundEstablishState {
_alicePort = remotePort;
_remoteHostId = new RemoteHostId(_aliceIP, _alicePort);
_bobPort = localPort;
_currentState = STATE_UNKNOWN;
_currentState = InboundState.IB_STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
_keyBuilder = dh;
}
public synchronized int getState() { return _currentState; }
public synchronized InboundState getState() { return _currentState; }
/** @return if previously complete */
public synchronized boolean complete() {
boolean already = _complete;
_complete = true;
@@ -96,8 +109,8 @@ class InboundEstablishState {
req.readIP(_bobIP, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive sessionRequest, BobIP = " + Addresses.toString(_bobIP));
if (_currentState == STATE_UNKNOWN)
_currentState = STATE_REQUEST_RECEIVED;
if (_currentState == InboundState.IB_STATE_UNKNOWN)
_currentState = InboundState.IB_STATE_REQUEST_RECEIVED;
packetReceived();
}
@@ -105,6 +118,9 @@ class InboundEstablishState {
public synchronized byte[] getReceivedX() { return _receivedX; }
public synchronized byte[] getReceivedOurIP() { return _bobIP; }
/**
* Generates session key and mac key.
*/
public synchronized void generateSessionKey() throws DHSessionKeyBuilder.InvalidPublicParameterException {
if (_sessionKey != null) return;
_keyBuilder.setPeerPublicValue(_receivedX);
@@ -135,7 +151,7 @@ class InboundEstablishState {
}
public synchronized void fail() {
_currentState = STATE_FAILED;
_currentState = InboundState.IB_STATE_FAILED;
}
public synchronized long getSentRelayTag() { return _sentRelayTag; }
@@ -197,20 +213,32 @@ class InboundEstablishState {
/** note that we just sent a SessionCreated packet */
public synchronized void createdPacketSent() {
// _lastSend = _context.clock().now();
if ( (_currentState == STATE_UNKNOWN) || (_currentState == STATE_REQUEST_RECEIVED) )
_currentState = STATE_CREATED_SENT;
_lastSend = _context.clock().now();
long delay;
if (_createdSentCount == 0) {
delay = RETRANSMIT_DELAY;
} else {
delay = Math.min(RETRANSMIT_DELAY << _createdSentCount, MAX_DELAY);
}
_createdSentCount++;
_nextSend = _lastSend + delay;
if ( (_currentState == InboundState.IB_STATE_UNKNOWN) || (_currentState == InboundState.IB_STATE_REQUEST_RECEIVED) )
_currentState = InboundState.IB_STATE_CREATED_SENT;
}
/** how long have we been trying to establish this session? */
public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; }
public synchronized long getNextSendTime() { return _nextSend; }
public synchronized void setNextSendTime(long when) { _nextSend = when; }
/** RemoteHostId, uniquely identifies an attempt */
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
* Note that while a SessionConfirmed could in theory be fragmented,
* in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max,
* so it will never be fragmented.
*/
public synchronized void receiveSessionConfirmed(UDPPacketReader.SessionConfirmedReader conf) {
if (_receivedIdentity == null)
_receivedIdentity = new byte[conf.readTotalFragmentNum()][];
@@ -235,20 +263,23 @@ class InboundEstablishState {
conf.readFinalSignature(_receivedSignature, 0);
}
if ( (_currentState == STATE_UNKNOWN) ||
(_currentState == STATE_REQUEST_RECEIVED) ||
(_currentState == STATE_CREATED_SENT) ) {
if ( (_currentState == InboundState.IB_STATE_UNKNOWN) ||
(_currentState == InboundState.IB_STATE_REQUEST_RECEIVED) ||
(_currentState == InboundState.IB_STATE_CREATED_SENT) ) {
if (confirmedFullyReceived())
_currentState = STATE_CONFIRMED_COMPLETELY;
_currentState = InboundState.IB_STATE_CONFIRMED_COMPLETELY;
else
_currentState = STATE_CONFIRMED_PARTIALLY;
_currentState = InboundState.IB_STATE_CONFIRMED_PARTIALLY;
}
packetReceived();
}
/** have we fully received the SessionConfirmed messages from Alice? */
public synchronized boolean confirmedFullyReceived() {
/**
* Have we fully received the SessionConfirmed messages from Alice?
* Caller must synch on this.
*/
private boolean confirmedFullyReceived() {
if (_receivedIdentity != null) {
for (int i = 0; i < _receivedIdentity.length; i++)
if (_receivedIdentity[i] == null)

View File

@@ -53,7 +53,7 @@ class OutboundEstablishState {
private final RouterIdentity _remotePeer;
private final SessionKey _introKey;
private final Queue<OutNetMessage> _queuedMessages;
private int _currentState;
private OutboundState _currentState;
private long _introductionNonce;
// intro
private final UDPAddress _remoteAddress;
@@ -62,19 +62,25 @@ class OutboundEstablishState {
private int _confirmedSentCount;
private int _requestSentCount;
private int _introSentCount;
// Times for timeout
private long _confirmedSentTime;
private long _requestSentTime;
private long _introSentTime;
/** nothin sent yet */
public static final int STATE_UNKNOWN = 0;
/** we have sent an initial request */
public static final int STATE_REQUEST_SENT = 1;
/** we have received a signed creation packet */
public static final int STATE_CREATED_RECEIVED = 2;
/** we have sent one or more confirmation packets */
public static final int STATE_CONFIRMED_PARTIALLY = 3;
/** we have received a data packet */
public static final int STATE_CONFIRMED_COMPLETELY = 4;
/** we need to have someone introduce us to the peer, but haven't received a RelayResponse yet */
public static final int STATE_PENDING_INTRO = 5;
public enum OutboundState {
/** nothin sent yet */
OB_STATE_UNKNOWN,
/** we have sent an initial request */
OB_STATE_REQUEST_SENT,
/** we have received a signed creation packet */
OB_STATE_CREATED_RECEIVED,
/** we have sent one or more confirmation packets */
OB_STATE_CONFIRMED_PARTIALLY,
/** we have received a data packet */
OB_STATE_CONFIRMED_COMPLETELY,
/** we need to have someone introduce us to the peer, but haven't received a RelayResponse yet */
OB_STATE_PENDING_INTRO
}
/** basic delay before backoff */
private static final long RETRANSMIT_DELAY = 1500;
@@ -99,7 +105,7 @@ class OutboundEstablishState {
_remotePeer = remotePeer;
_introKey = introKey;
_queuedMessages = new LinkedBlockingQueue();
_currentState = STATE_UNKNOWN;
_currentState = OutboundState.OB_STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
_remoteAddress = addr;
_introductionNonce = -1;
@@ -109,11 +115,13 @@ class OutboundEstablishState {
if ( (addr != null) && (addr.getIntroducerCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr);
_currentState = STATE_PENDING_INTRO;
_currentState = OutboundState.OB_STATE_PENDING_INTRO;
}
}
public synchronized int getState() { return _currentState; }
public synchronized OutboundState getState() { return _currentState; }
/** @return if previously complete */
public synchronized boolean complete() {
boolean already = _complete;
_complete = true;
@@ -122,6 +130,8 @@ class OutboundEstablishState {
public UDPAddress getRemoteAddress() { return _remoteAddress; }
public void setIntroNonce(long nonce) { _introductionNonce = nonce; }
/** @return -1 if unset */
public long getIntroNonce() { return _introductionNonce; }
public void addMessage(OutNetMessage msg) {
@@ -132,6 +142,7 @@ class OutboundEstablishState {
_log.warn("attempt to add duplicate msg to queue: " + msg);
}
/** @return null if none */
public OutNetMessage getNextQueuedMessage() {
return _queuedMessages.poll();
}
@@ -151,7 +162,9 @@ class OutboundEstablishState {
}
public byte[] getSentX() { return _sentX; }
/** the remote side (Bob) */
public synchronized byte[] getSentIP() { return _bobIP; }
/** the remote side (Bob) */
public synchronized int getSentPort() { return _bobPort; }
public synchronized void receiveSessionCreated(UDPPacketReader.SessionCreatedReader reader) {
@@ -181,8 +194,8 @@ class OutboundEstablishState {
+ " SignedOn: " + _receivedSignedOnTime
+ "\nthis: " + this.toString());
if ( (_currentState == STATE_UNKNOWN) || (_currentState == STATE_REQUEST_SENT) )
_currentState = STATE_CREATED_RECEIVED;
if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) || (_currentState == OutboundState.OB_STATE_REQUEST_SENT) )
_currentState = OutboundState.OB_STATE_CREATED_RECEIVED;
packetReceived();
}
@@ -191,6 +204,8 @@ class OutboundEstablishState {
* session was created properly. If it wasn't, all the SessionCreated
* remnants are dropped (perhaps they were spoofed, etc) so that we can
* receive another one
*
* Generates session key and mac key.
*/
public synchronized boolean validateSessionCreated() {
if (_receivedSignature != null) {
@@ -231,14 +246,18 @@ class OutboundEstablishState {
_receivedIV = null;
_receivedSignature = null;
if ( (_currentState == STATE_UNKNOWN) ||
(_currentState == STATE_REQUEST_SENT) ||
(_currentState == STATE_CREATED_RECEIVED) )
_currentState = STATE_REQUEST_SENT;
if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) ||
(_currentState == OutboundState.OB_STATE_REQUEST_SENT) ||
(_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) )
_currentState = OutboundState.OB_STATE_REQUEST_SENT;
_nextSend = _context.clock().now();
}
/**
* Generates session key and mac key.
* Caller must synch on this.
*/
private void generateSessionKey() throws DHSessionKeyBuilder.InvalidPublicParameterException {
if (_sessionKey != null) return;
_keyBuilder.setPeerPublicValue(_receivedY);
@@ -247,14 +266,15 @@ class OutboundEstablishState {
_macKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
System.arraycopy(extra.getData(), 0, _macKey.getData(), 0, SessionKey.KEYSIZE_BYTES);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Established outbound keys. cipher: " + Base64.encode(_sessionKey.getData())
+ " mac: " + Base64.encode(_macKey.getData()));
_log.debug("Established outbound keys. cipher: " + _sessionKey
+ " mac: " + _macKey);
}
/**
* decrypt the signature (and subsequent pad bytes) with the
* additional layer of encryption using the negotiated key along side
* the packet's IV
* Caller must synch on this.
*/
private void decryptSignature() {
if (_receivedEncryptedSignature == null) throw new NullPointerException("encrypted signature is null! this=" + this.toString());
@@ -272,6 +292,7 @@ class OutboundEstablishState {
/**
* Verify: Alice's IP + Alice's port + Bob's IP + Bob's port + Alice's
* new relay tag + Bob's signed on time
* Caller must synch on this.
*/
private boolean verifySessionCreated() {
byte signed[] = new byte[256+256 // X + Y
@@ -324,8 +345,11 @@ class OutboundEstablishState {
public synchronized int getReceivedPort() { return _alicePort; }
/**
* Lets sign everything so we can fragment properly
* Let's sign everything so we can fragment properly.
*
* Note that while a SessionConfirmed could in theory be fragmented,
* in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max,
* so it will never be fragmented.
*/
public synchronized void prepareSessionConfirmed() {
if (_sentSignedOnTime > 0)
@@ -365,50 +389,93 @@ class OutboundEstablishState {
/** note that we just sent the SessionConfirmed packet */
public synchronized void confirmedPacketsSent() {
_lastSend = _context.clock().now();
long delay = Math.min(RETRANSMIT_DELAY << (_confirmedSentCount++), MAX_DELAY);
long delay;
if (_confirmedSentCount == 0) {
delay = RETRANSMIT_DELAY;
_confirmedSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _confirmedSentCount, MAX_DELAY);
}
_confirmedSentCount++;
_nextSend = _lastSend + delay;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send confirm packets, nextSend in " + delay);
if ( (_currentState == STATE_UNKNOWN) ||
(_currentState == STATE_REQUEST_SENT) ||
(_currentState == STATE_CREATED_RECEIVED) )
_currentState = STATE_CONFIRMED_PARTIALLY;
if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) ||
(_currentState == OutboundState.OB_STATE_REQUEST_SENT) ||
(_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) )
_currentState = OutboundState.OB_STATE_CONFIRMED_PARTIALLY;
}
/**
* @return when we sent the first SessionConfirmed packet, or 0
* @since 0.9.2
*/
public long getConfirmedSentTime() { return _confirmedSentTime; }
/** note that we just sent the SessionRequest packet */
public synchronized void requestSent() {
_lastSend = _context.clock().now();
long delay = Math.min(RETRANSMIT_DELAY << (_requestSentCount++), MAX_DELAY);
long delay;
if (_requestSentCount == 0) {
delay = RETRANSMIT_DELAY;
_requestSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _requestSentCount, MAX_DELAY);
}
_requestSentCount++;
_nextSend = _lastSend + delay;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send a request packet, nextSend in " + delay);
if (_currentState == STATE_UNKNOWN)
_currentState = STATE_REQUEST_SENT;
if (_currentState == OutboundState.OB_STATE_UNKNOWN)
_currentState = OutboundState.OB_STATE_REQUEST_SENT;
}
/**
* @return when we sent the first SessionRequest packet, or 0
* @since 0.9.2
*/
public long getRequestSentTime() { return _requestSentTime; }
/** note that we just sent the RelayRequest packet */
public synchronized void introSent() {
_lastSend = _context.clock().now();
long delay = Math.min(RETRANSMIT_DELAY << (_introSentCount++), MAX_DELAY);
long delay;
if (_introSentCount == 0) {
delay = RETRANSMIT_DELAY;
_introSentTime = _lastSend;
} else {
delay = Math.min(RETRANSMIT_DELAY << _introSentCount, MAX_DELAY);
}
_introSentCount++;
_nextSend = _lastSend + delay;
if (_currentState == STATE_UNKNOWN)
_currentState = STATE_PENDING_INTRO;
if (_currentState == OutboundState.OB_STATE_UNKNOWN)
_currentState = OutboundState.OB_STATE_PENDING_INTRO;
}
/**
* @return when we sent the first RelayRequest packet, or 0
* @since 0.9.2
*/
public long getIntroSentTime() { return _introSentTime; }
public synchronized void introductionFailed() {
_nextSend = _context.clock().now();
// keep the state as STATE_PENDING_INTRO, so next time the EstablishmentManager asks us
// keep the state as OB_STATE_PENDING_INTRO, so next time the EstablishmentManager asks us
// whats up, it'll try a new random intro peer
}
/**
* This changes the remoteHostId from a hash-based one to a IP/Port one
*/
public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) {
if (_currentState != STATE_PENDING_INTRO)
if (_currentState != OutboundState.OB_STATE_PENDING_INTRO)
return; // we've already successfully been introduced, so don't overwrite old settings
_nextSend = _context.clock().now() + 500; // wait briefly for the hole punching
if (_currentState == STATE_PENDING_INTRO) {
// STATE_UNKNOWN will probe the EstablishmentManager to send a new
if (_currentState == OutboundState.OB_STATE_PENDING_INTRO) {
// OB_STATE_UNKNOWN will probe the EstablishmentManager to send a new
// session request to this newly known address
_currentState = STATE_UNKNOWN;
_currentState = OutboundState.OB_STATE_UNKNOWN;
}
_bobIP = bobIP;
_bobPort = bobPort;
@@ -421,11 +488,6 @@ class OutboundEstablishState {
public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; }
public synchronized long getNextSendTime() { return _nextSend; }
public synchronized void setNextSendTime(long when) {
_nextSend = when;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Explicit nextSend=" + (_nextSend-_context.clock().now()), new Exception("Set by"));
}
/** uniquely identifies an attempt */
RemoteHostId getRemoteHostId() { return _remoteHostId; }
@@ -433,7 +495,7 @@ class OutboundEstablishState {
/** we have received a real data packet, so we're done establishing */
public synchronized void dataReceived() {
packetReceived();
_currentState = STATE_CONFIRMED_COMPLETELY;
_currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY;
}
private void packetReceived() {

View File

@@ -64,6 +64,7 @@ class OutboundMessageState {
/**
* Called from UDPTransport
* @return success
*/
public boolean initialize(I2NPMessage msg, PeerState peer) {
if (msg == null)
@@ -81,6 +82,7 @@ class OutboundMessageState {
/**
* Called from OutboundMessageFragments
* @return success
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
if ( (m == null) || (msg == null) )
@@ -96,6 +98,10 @@ class OutboundMessageState {
}
}
/**
* Called from OutboundMessageFragments
* @return success
*/
private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
_message = m;
_peer = peer;

View File

@@ -685,12 +685,16 @@ class PacketBuilder {
/**
* Build a new series of SessionConfirmed packets for the given peer,
* encrypting it as necessary.
*
* Note that while a SessionConfirmed could in theory be fragmented,
* in practice a RouterIdentity is 387 bytes and a single fragment is 512 bytes max,
* so it will never be fragmented.
*
* @return ready to send packets, or null if there was a problem
*
* TODO: doesn't really return null, and caller doesn't handle null return
* (null SigningPrivateKey should cause this?)
* Should probably return null if buildSessionConfirmedPacket() turns null for any fragment
* Should probably return null if buildSessionConfirmedPacket() returns null for any fragment
*/
public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state, RouterIdentity ourIdentity) {
byte identity[] = ourIdentity.toByteArray();
@@ -793,26 +797,88 @@ class PacketBuilder {
* @since 0.8.1
*/
public UDPPacket buildSessionDestroyPacket(PeerState peer) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("building session destroy packet to " + peer.getRemotePeer());
}
return buildSessionDestroyPacket(peer.getCurrentCipherKey(), peer.getCurrentMACKey(),
peer.getRemoteIPAddress(), peer.getRemotePort());
}
/**
* Build a destroy packet, which contains a header but no body.
* If the keys and ip/port are not yet set, this will return null.
*
* @return packet or null
* @since 0.9.2
*/
public UDPPacket buildSessionDestroyPacket(OutboundEstablishState peer) {
SessionKey cipherKey = peer.getCipherKey();
SessionKey macKey = peer.getMACKey();
byte[] ip = peer.getSentIP();
int port = peer.getSentPort();
if (cipherKey == null || macKey == null || ip == null || port <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cannot send destroy, incomplete " + peer);
return null;
}
InetAddress addr;
try {
addr = InetAddress.getByAddress(ip);
} catch (UnknownHostException uhe) {
return null;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("building session destroy packet to " + peer);
return buildSessionDestroyPacket(cipherKey, macKey, addr, port);
}
/**
* Build a destroy packet, which contains a header but no body.
* If the keys and ip/port are not yet set, this will return null.
*
* @return packet or null
* @since 0.9.2
*/
public UDPPacket buildSessionDestroyPacket(InboundEstablishState peer) {
SessionKey cipherKey = peer.getCipherKey();
SessionKey macKey = peer.getMACKey();
byte[] ip = peer.getSentIP();
int port = peer.getSentPort();
if (cipherKey == null || macKey == null || ip == null || port <= 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Cannot send destroy, incomplete " + peer);
return null;
}
InetAddress addr;
try {
addr = InetAddress.getByAddress(ip);
} catch (UnknownHostException uhe) {
return null;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("building session destroy packet to " + peer);
return buildSessionDestroyPacket(cipherKey, macKey, addr, port);
}
/**
* Build a destroy packet, which contains a header but no body.
* @param cipherKey non-null
* @param macKey non-null
* @since 0.9.2
*/
private UDPPacket buildSessionDestroyPacket(SessionKey cipherKey, SessionKey macKey, InetAddress addr, int port) {
UDPPacket packet = buildPacketHeader((byte)(UDPPacket.PAYLOAD_TYPE_SESSION_DESTROY << 4));
int off = HEADER_SIZE;
StringBuilder msg = null;
if (_log.shouldLog(Log.DEBUG)) {
msg = new StringBuilder(128);
msg.append("building session destroy packet to ").append(peer.getRemotePeer().toBase64().substring(0,6));
}
// no body in this message
if (msg != null)
_log.debug(msg.toString());
// pad up so we're on the encryption boundary
if ( (off % 16) != 0)
off += 16 - (off % 16);
packet.getPacket().setLength(off);
authenticate(packet, peer.getCurrentCipherKey(), peer.getCurrentMACKey());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
authenticate(packet, cipherKey, macKey);
setTo(packet, addr, port);
return packet;
}

View File

@@ -328,7 +328,7 @@ class PacketHandler {
// Note that the vast majority of these are NOT corrupted packets, but
// packets for which we don't have the PeerState (i.e. SessionKey)
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid introduction packet received: " + packet, new Exception("path"));
_log.warn("Cannot validate rcvd pkt (path): " + packet);
_context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration());
switch (peerType) {
case INBOUND_FALLBACK:
@@ -465,11 +465,13 @@ class PacketHandler {
}
/**
* Parse out the interesting bits and honor what it says
* The last step. The packet was decrypted with some key. Now get the message type
* and send it to one of four places: The EstablishmentManager, IntroductionManager,
* PeerTestManager, or InboundMessageFragments.
*
* @param state non-null if fully established
* @param outState non-null if outbound establishing in process
* @param inState unused always null
* @param inState unused always null, TODO use for 48-byte destroys during inbound establishment
* @param isAuthenticated true if a state key was used, false if our own intro key was used
*/
private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state,

View File

@@ -125,6 +125,11 @@ class UDPPacket {
_validateCount = 0;
_remoteHost = null;
_released = false;
// clear out some values to make debugging easier via toString()
_messageType = -1;
_enqueueTime = 0;
_receivedTime = 0;
_fragmentCount = 0;
}
/****
@@ -284,12 +289,17 @@ class UDPPacket {
buf.append(_packet.getAddress().getHostAddress()).append(":");
buf.append(_packet.getPort());
//buf.append(" id=").append(System.identityHashCode(this));
buf.append(" msgType=").append(_messageType);
buf.append(" markType=").append(_markedType);
buf.append(" fragCount=").append(_fragmentCount);
if (_messageType >= 0)
buf.append(" msgType=").append(_messageType);
if (_markedType >= 0)
buf.append(" markType=").append(_markedType);
if (_fragmentCount > 0)
buf.append(" fragCount=").append(_fragmentCount);
buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1));
buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1));
if (_enqueueTime >= 0)
buf.append(" sinceEnqueued=").append(_context.clock().now() - _enqueueTime);
if (_receivedTime >= 0)
buf.append(" sinceReceived=").append(_context.clock().now() - _receivedTime);
//buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1));
//buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1));
//buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength()));