From 3a546612d9e8311446cd196d9a1f75b188d510db Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 17 Aug 2012 14:15:01 +0000 Subject: [PATCH] * SSU: - Use remote MTU when published (ticket #687) - Queue outbound msgs during inbound establish - IntroManager cleanups - More synchronization - More log tweaks --- history.txt | 16 +++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../transport/udp/EstablishmentManager.java | 132 +++++++++++++++--- .../transport/udp/InboundEstablishState.java | 56 ++++++-- .../transport/udp/IntroductionManager.java | 38 ++++- .../transport/udp/OutboundEstablishState.java | 26 ++-- .../i2p/router/transport/udp/PeerState.java | 31 +++- .../i2p/router/transport/udp/UDPAddress.java | 2 +- .../router/transport/udp/UDPTransport.java | 17 +-- 9 files changed, 256 insertions(+), 64 deletions(-) diff --git a/history.txt b/history.txt index 4da7fbd43..738f706a6 100644 --- a/history.txt +++ b/history.txt @@ -1,7 +1,23 @@ +2012-08-17 zzz + * i2psnark: + - Adjust DHT timeouts + - Add max peers per-torrent in tracker + - Remove duplicate clean task for nodes + - Fix another DHT warning message + * SSU: + - Use remote MTU when published (ticket #687) + - Queue outbound msgs during inbound establish + - IntroManager cleanups + - More synchronization + 2012-08-17 sponge * BOB: just some cleanup of old, dead meaningless commentedout code and a little reformatting. +2012-08-16 zzz + * Utils: Drop unused BufferedRandomSource, PooledRandomSource, + EepGetScheduler, EepPost and HTTPSendData, moved to i2p.scripts + 2012-08-15 zzz * i2psnark: - Fix bug preventing completion announcement, broken in 0.9.1 diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5a58b19bc..e36cd3b14 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 11; + public final static long BUILD = 12; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 0f7cbe830..a31b4d38c 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -12,6 +12,7 @@ import net.i2p.data.Base64; import net.i2p.data.Hash; import net.i2p.data.RouterAddress; import net.i2p.data.RouterIdentity; +import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.data.i2np.DeliveryStatusMessage; @@ -191,9 +192,39 @@ class EstablishmentManager { //_context.shitlist().shitlistRouter(msg.getTarget().getIdentity().calculateHash(), "Invalid SSU address", UDPTransport.STYLE); return; } - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Add outbound establish state to: " + to); + + InboundEstablishState inState = _inboundStates.get(to); + if (inState != null) { + // we have an inbound establishment in progress, queue it there instead + synchronized (inState) { + switch (inState.getState()) { + case IB_STATE_UNKNOWN: + case IB_STATE_REQUEST_RECEIVED: + case IB_STATE_CREATED_SENT: + case IB_STATE_CONFIRMED_PARTIALLY: + case IB_STATE_CONFIRMED_COMPLETELY: + // queue it + inState.addMessage(msg); + if (_log.shouldLog(Log.WARN)) + _log.debug("OB msg queued to IES"); + break; + + case IB_STATE_COMPLETE: + // race, send it out (but don't call _transport.send() again and risk a loop) + _transport.sendIfEstablished(msg); + break; + + case IB_STATE_FAILED: + // race, failed + _transport.failed(msg, "OB msg failed during IB establish"); + break; + } + } + return; + } + + + } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Add indirect outbound establish state to: " + addr); @@ -251,9 +282,13 @@ class EstablishmentManager { sessionKey, addr, _transport.getDHBuilder()); OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state); boolean isNew = oldState == null; - if (!isNew) + if (isNew) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Adding new " + state); + } else { // whoops, somebody beat us to it, throw out the state we just created state = oldState; + } } } if (state != null) { @@ -548,7 +583,7 @@ class EstablishmentManager { * */ private void handleCompletelyEstablished(InboundEstablishState state) { - if (state.complete()) return; + if (state.isComplete()) return; RouterIdentity remote = state.getConfirmedIdentity(); PeerState peer = new PeerState(_context, _transport, @@ -556,6 +591,22 @@ class EstablishmentManager { peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); peer.setWeRelayToThemAs(state.getSentRelayTag()); + // Lookup the peer's MTU from the netdb, since it isn't included in the protocol setup (yet) + // TODO if we don't have RI then we will get it shortly, but too late. + // Perhaps netdb should notify transport when it gets a new RI... + RouterInfo info = _context.netDb().lookupRouterInfoLocally(remote.calculateHash()); + if (info != null) { + RouterAddress addr = info.getTargetAddress(UDPTransport.STYLE); + if (addr != null) { + String smtu = addr.getOption(UDPAddress.PROP_MTU); + if (smtu != null) { + try { + int mtu = MTU.rectify(Integer.parseInt(smtu)); + peer.setHisMTU(mtu); + } catch (NumberFormatException nfe) {} + } + } + } // 0 is the default //peer.setTheyRelayToUsAs(0); @@ -573,6 +624,17 @@ class EstablishmentManager { _context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0); sendInboundComplete(peer); + OutNetMessage msg; + while ((msg = state.getNextQueuedMessage()) != null) { + if (_context.clock().now() - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { + msg.timestamp("took too long but established..."); + _transport.failed(msg, "Took too long to establish, but it was established"); + } else { + msg.timestamp("session fully established and sent"); + _transport.send(msg); + } + } + state.complete(); } /** @@ -634,6 +696,9 @@ class EstablishmentManager { peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); peer.setTheyRelayToUsAs(state.getReceivedRelayTag()); + int mtu = state.getRemoteAddress().getMTU(); + if (mtu > 0) + peer.setHisMTU(mtu); // 0 is the default //peer.setWeRelayToThemAs(0); @@ -780,15 +845,22 @@ class EstablishmentManager { if (_log.shouldLog(Log.INFO)) _log.info("Received RelayResponse for " + state.getRemoteIdentity().calculateHash() + " - they are on " + addr.toString() + ":" + port + " (according to " + bob + ")"); - RemoteHostId oldId = state.getRemoteHostId(); - state.introduced(addr, ip, port); - RemoteHostId newId = state.getRemoteHostId(); - // Swap out the RemoteHostId the state is indexed under - // TODO only if !oldId.equals(newId) ? synch? - OutboundEstablishState oldState = _outboundStates.remove(oldId); - _outboundStates.put(newId, state); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("RR replaced " + oldId + " -> " + oldState + " with " + newId + " -> " + state); + synchronized (state) { + RemoteHostId oldId = state.getRemoteHostId(); + state.introduced(addr, ip, port); + RemoteHostId newId = state.getRemoteHostId(); + // Swap out the RemoteHostId the state is indexed under + // TODO only if !oldId.equals(newId) ? synch? + // FIXME if the RemoteHostIDs aren't the same we have problems + // FIXME if the RemoteHostIDs aren't the same the SessionCreated signature is probably going to fail + // Common occurrence - port changes + if (!oldId.equals(newId)) { + _outboundStates.remove(oldId); + _outboundStates.put(newId, state); + if (_log.shouldLog(Log.WARN)) + _log.warn("RR replaced " + oldId + " with " + newId + " -> " + state); + } + } notifyActivity(); } @@ -923,7 +995,9 @@ class EstablishmentManager { synchronized (inboundState) { switch (inboundState.getState()) { case IB_STATE_REQUEST_RECEIVED: - if (!expired) + if (expired) + processExpired(inboundState); + else sendCreated(inboundState); break; @@ -931,6 +1005,7 @@ class EstablishmentManager { case IB_STATE_CONFIRMED_PARTIALLY: if (expired) { sendDestroy(inboundState); + processExpired(inboundState); } else if (inboundState.getNextSendTime() <= now) { sendCreated(inboundState); } @@ -945,6 +1020,7 @@ class EstablishmentManager { // So next time we will not accept the con, rather than doing the whole handshake _context.blocklist().add(inboundState.getSentIP()); inboundState.fail(); + processExpired(inboundState); } else { handleCompletelyEstablished(inboundState); } @@ -952,9 +1028,11 @@ class EstablishmentManager { if (_log.shouldLog(Log.WARN)) _log.warn("confirmed with invalid? " + inboundState); inboundState.fail(); + processExpired(inboundState); } break; + case IB_STATE_COMPLETE: // fall through case IB_STATE_FAILED: break; // already removed; @@ -1118,10 +1196,8 @@ class EstablishmentManager { if (outboundState.getState() != OB_STATE_CONFIRMED_COMPLETELY) { if (_log.shouldLog(Log.INFO)) _log.info("Lifetime of expired outbound establish: " + outboundState.getLifetime()); - while (true) { - OutNetMessage msg = outboundState.getNextQueuedMessage(); - if (msg == null) - break; + OutNetMessage msg; + while ((msg = outboundState.getNextQueuedMessage()) != null) { _transport.failed(msg, "Expired during failed establish"); } String err = "Took too long to establish OB connection, state = " + outboundState.getState(); @@ -1131,15 +1207,25 @@ class EstablishmentManager { _transport.dropPeer(peer, false, err); //_context.profileManager().commErrorOccurred(peer); } else { - while (true) { - OutNetMessage msg = outboundState.getNextQueuedMessage(); - if (msg == null) - break; + OutNetMessage msg; + while ((msg = outboundState.getNextQueuedMessage()) != null) { _transport.send(msg); } } } + + /** + * Caller should probably synch on inboundState + * @since 0.9.2 + */ + private void processExpired(InboundEstablishState inboundState) { + OutNetMessage msg; + while ((msg = inboundState.getNextQueuedMessage()) != null) { + _transport.failed(msg, "Expired during failed establish"); + } + } + /** * Driving thread, processing up to one step for an inbound peer and up to * one step for an outbound peer. This is prodded whenever any peer's state diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index add2ffa5a..7412562f8 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -2,6 +2,8 @@ package net.i2p.router.transport.udp; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.Base64; import net.i2p.data.ByteArray; @@ -10,6 +12,7 @@ import net.i2p.data.DataHelper; import net.i2p.data.RouterIdentity; import net.i2p.data.SessionKey; import net.i2p.data.Signature; +import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.transport.crypto.DHSessionKeyBuilder; import net.i2p.util.Addresses; @@ -52,7 +55,7 @@ class InboundEstablishState { private long _nextSend; private final RemoteHostId _remoteHostId; private InboundState _currentState; - private boolean _complete; + private final Queue _queuedMessages; // count for backoff private int _createdSentCount; @@ -69,7 +72,9 @@ class InboundEstablishState { /** we have all the confirmation packets */ IB_STATE_CONFIRMED_COMPLETELY, /** we are explicitly failing it */ - IB_STATE_FAILED + IB_STATE_FAILED, + /** Successful completion, PeerState created and added to transport */ + IB_STATE_COMPLETE } /** basic delay before backoff */ @@ -89,17 +94,44 @@ class InboundEstablishState { _currentState = InboundState.IB_STATE_UNKNOWN; _establishBegin = ctx.clock().now(); _keyBuilder = dh; + _queuedMessages = new LinkedBlockingQueue(); } public synchronized InboundState getState() { return _currentState; } /** @return if previously complete */ - public synchronized boolean complete() { - boolean already = _complete; - _complete = true; - return already; + public synchronized boolean isComplete() { + return _currentState == InboundState.IB_STATE_COMPLETE || + _currentState == InboundState.IB_STATE_FAILED; + } + + /** Notify successful completion */ + public synchronized void complete() { + _currentState = InboundState.IB_STATE_COMPLETE; } + /** + * Queue a message to be sent after the session is established. + * This will only happen if we decide to send something during establishment + * @since 0.9.2 + */ + public void addMessage(OutNetMessage msg) { + // chance of a duplicate here in a race, that's ok + if (!_queuedMessages.contains(msg)) + _queuedMessages.offer(msg); + else if (_log.shouldLog(Log.WARN)) + _log.warn("attempt to add duplicate msg to queue: " + msg); + } + + /** + * Pull from the message queue + * @return null if none + * @since 0.9.2 + */ + public OutNetMessage getNextQueuedMessage() { + return _queuedMessages.poll(); + } + public synchronized void receiveSessionRequest(UDPPacketReader.SessionRequestReader req) { if (_receivedX == null) _receivedX = new byte[UDPPacketReader.SessionRequestReader.X_LENGTH]; @@ -200,8 +232,8 @@ class InboundEstablishState { if (_log.shouldLog(Log.DEBUG)) { StringBuilder buf = new StringBuilder(128); buf.append("Signing sessionCreated:"); - buf.append(" ReceivedX: ").append(Base64.encode(_receivedX)); - buf.append(" SentY: ").append(Base64.encode(_sentY)); + //buf.append(" ReceivedX: ").append(Base64.encode(_receivedX)); + //buf.append(" SentY: ").append(Base64.encode(_sentY)); buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort)); buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort)); buf.append(" RelayTag: ").append(_sentRelayTag); @@ -370,15 +402,15 @@ class InboundEstablishState { @Override public String toString() { StringBuilder buf = new StringBuilder(128); - buf.append("IES ").append(super.toString()); + buf.append("IES "); + buf.append(Addresses.toString(_aliceIP, _alicePort)); if (_receivedX != null) buf.append(" ReceivedX: ").append(Base64.encode(_receivedX, 0, 4)); if (_sentY != null) buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4)); - buf.append(" Alice: ").append(Addresses.toString(_aliceIP, _alicePort)); - buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort)); + //buf.append(" Bob: ").append(Addresses.toString(_bobIP, _bobPort)); buf.append(" RelayTag: ").append(_sentRelayTag); - buf.append(" SignedOn: ").append(_sentSignedOnTime); + //buf.append(" SignedOn: ").append(_sentSignedOnTime); buf.append(' ').append(_currentState); return buf.toString(); } diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index 20f7d1e55..af3d28f44 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -81,7 +81,7 @@ class IntroductionManager { } } - public PeerState get(long id) { + private PeerState get(long id) { return _outbound.get(Long.valueOf(id)); } @@ -187,28 +187,54 @@ class IntroductionManager { return _inbound.size(); } + /** + * We are Charlie and we got this from Bob. + * Send a HolePunch to Alice, who will soon be sending us a RelayRequest. + * We should already have a session with Bob, but probably not with Alice. + * + * We do some throttling here. + */ void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) { if (_context.router().isHidden()) return; if (_log.shouldLog(Log.INFO)) _log.info("Receive relay intro from " + bob); _context.statManager().addRateData("udp.receiveRelayIntro", 1, 0); + + if (!_transport.allowConnection()) + return; + + // TODO throttle + // TODO IB req limits + // TODO check if already have a session or in progress state. + _transport.send(_builder.buildHolePunch(reader)); } + /** + * We are Bob and we got this from Alice. + * Send a RelayIntro to Charlie and a RelayResponse to Alice. + * We should already have a session with Charlie, but not necessarily with Alice. + */ void receiveRelayRequest(RemoteHostId alice, UDPPacketReader reader) { if (_context.router().isHidden()) return; long tag = reader.getRelayRequestReader().readTag(); - PeerState charlie = _transport.getPeerState(tag); + PeerState charlie = get(tag); + if (charlie == null) { + if (_log.shouldLog(Log.INFO)) + _log.info("Receive relay request from " + alice + + " with unknown tag"); + _context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0); + return; + } if (_log.shouldLog(Log.INFO)) _log.info("Receive relay request from " + alice + " for tag " + tag + " and relaying with " + charlie); - if (charlie == null) { - _context.statManager().addRateData("udp.receiveRelayRequestBadTag", 1, 0); - return; - } + + // TODO throttle based on alice identity and/or intro tag? + _context.statManager().addRateData("udp.receiveRelayRequest", 1, 0); byte key[] = new byte[SessionKey.KEYSIZE_BYTES]; reader.getRelayRequestReader().readAliceIntroKey(key, 0); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index 2d3395cf5..c67a7e3ba 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -90,6 +90,9 @@ class OutboundEstablishState { /** max delay including backoff */ private static final long MAX_DELAY = 15*1000; + /** + * @param addr non-null + */ public OutboundEstablishState(RouterContext ctx, InetAddress remoteHost, int remotePort, RouterIdentity remotePeer, SessionKey introKey, UDPAddress addr, DHSessionKeyBuilder dh) { @@ -113,7 +116,7 @@ class OutboundEstablishState { _keyBuilder = dh; _sentX = new byte[UDPPacketReader.SessionRequestReader.X_LENGTH]; prepareSessionRequest(); - if ( (addr != null) && (addr.getIntroducerCount() > 0) ) { + if (addr.getIntroducerCount() > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("new outbound establish to " + remotePeer.calculateHash() + ", with address: " + addr); _currentState = OutboundState.OB_STATE_PENDING_INTRO; @@ -131,12 +134,17 @@ class OutboundEstablishState { return already; } + /** @return non-null */ public UDPAddress getRemoteAddress() { return _remoteAddress; } + public void setIntroNonce(long nonce) { _introductionNonce = nonce; } /** @return -1 if unset */ public long getIntroNonce() { return _introductionNonce; } + /** + * Queue a message to be sent after the session is established. + */ public void addMessage(OutNetMessage msg) { // chance of a duplicate here in a race, that's ok if (!_queuedMessages.contains(msg)) @@ -190,12 +198,12 @@ class OutboundEstablishState { reader.readIV(_receivedIV, 0); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Receive session created:\neSig: " + Base64.encode(_receivedEncryptedSignature) - + "\nreceivedIV: " + Base64.encode(_receivedIV) - + "\nAliceIP: " + Addresses.toString(_aliceIP) + _log.debug("Receive session created:Sig: " + Base64.encode(_receivedEncryptedSignature) + + "receivedIV: " + Base64.encode(_receivedIV) + + "AliceIP: " + Addresses.toString(_aliceIP) + " RelayTag: " + _receivedRelayTag + " SignedOn: " + _receivedSignedOnTime - + "\nthis: " + this.toString()); + + ' ' + this.toString()); if (_currentState == OutboundState.OB_STATE_UNKNOWN || _currentState == OutboundState.OB_STATE_REQUEST_SENT || @@ -212,6 +220,8 @@ class OutboundEstablishState { * receive another one * * Generates session key and mac key. + * + * @return true if valid */ public synchronized boolean validateSessionCreated() { if (_receivedSignature != null) { @@ -253,7 +263,6 @@ class OutboundEstablishState { _receivedSignature = null; if ( (_currentState == OutboundState.OB_STATE_UNKNOWN) || - (_currentState == OutboundState.OB_STATE_REQUEST_SENT) || (_currentState == OutboundState.OB_STATE_CREATED_RECEIVED) ) _currentState = OutboundState.OB_STATE_REQUEST_SENT; @@ -292,7 +301,7 @@ class OutboundEstablishState { System.arraycopy(_receivedEncryptedSignature, 0, signatureBytes, 0, Signature.SIGNATURE_BYTES); _receivedSignature = new Signature(signatureBytes); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Decrypted received signature: \n" + Base64.encode(signatureBytes)); + _log.debug("Decrypted received signature: " + Base64.encode(signatureBytes)); } /** @@ -475,7 +484,8 @@ class OutboundEstablishState { } /** - * This changes the remoteHostId from a hash-based one to a IP/Port one + * This changes the remoteHostId from a hash-based one to a IP/Port one, + * OR the IP or port could change. */ public synchronized void introduced(InetAddress bob, byte bobIP[], int bobPort) { if (_currentState != OutboundState.OB_STATE_PENDING_INTRO) diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index a42f2bff5..96b40db43 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -156,7 +156,7 @@ class PeerState { private int _mtu; private int _mtuReceive; /** what is the largest packet we will ever send to the peer? */ - private final int _largeMTU; + private int _largeMTU; /* how many consecutive packets at or under the min MTU have been received */ private long _consecutiveSmall; /** when did we last check the MTU? */ @@ -987,8 +987,10 @@ class PeerState { _messagesSent++; if (numSends < 2) { - recalculateTimeouts(lifetime); - adjustMTU(); + synchronized (this) { + recalculateTimeouts(lifetime); + adjustMTU(); + } } else if (_log.shouldLog(Log.INFO)) _log.info("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); @@ -996,7 +998,10 @@ class PeerState { _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); } - /** adjust the tcp-esque timeouts */ + /** + * Adjust the tcp-esque timeouts. + * Caller should synch on this + */ private void recalculateTimeouts(long lifetime) { _rttDeviation = _rttDeviation + (int)(0.25d*(Math.abs(lifetime-_rtt)-_rttDeviation)); @@ -1017,6 +1022,9 @@ class PeerState { _rto = MAX_RTO; } + /** + * Caller should synch on this + */ private void adjustMTU() { double retransPct = 0; if (_packetsTransmitted > 10) { @@ -1037,6 +1045,17 @@ class PeerState { _mtu = DEFAULT_MTU; } } + + /** + * @since 0.9.2 + */ + public synchronized void setHisMTU(int mtu) { + if (mtu <= MIN_MTU || mtu >= _largeMTU) + return; + _largeMTU = mtu; + if (mtu < _mtu) + _mtu = mtu; + } /** we are resending a packet, so lets jack up the rto */ public void messageRetransmitted(int packets) { @@ -1054,7 +1073,9 @@ class PeerState { *****/ congestionOccurred(); _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); - adjustMTU(); + synchronized (this) { + adjustMTU(); + } //_rto *= 2; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPAddress.java b/router/java/src/net/i2p/router/transport/udp/UDPAddress.java index 945dff269..07a844783 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPAddress.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPAddress.java @@ -62,7 +62,7 @@ public class UDPAddress { } return rv.toString(); } - + private void parse(RouterAddress addr) { if (addr == null) return; _host = addr.getOption(PROP_HOST); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index f1d1fd9b1..11719a3e6 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -697,14 +697,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return _peersByIdent.get(remotePeer); } - /** - * get the state for the peer being introduced, or null if we aren't - * offering to introduce anyone with that tag. - */ - PeerState getPeerState(long relayTag) { - return _introManager.get(relayTag); - } - /** * Intercept RouterInfo entries received directly from a peer to inject them into * the PeersByCapacity listing. @@ -1278,6 +1270,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + /** + * Send only if established, otherwise fail immediately. + * Never queue with the establisher. + * @since 0.9.2 + */ + void sendIfEstablished(OutNetMessage msg) { + _fragments.add(msg); + } + void send(I2NPMessage msg, PeerState peer) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Injecting a data message to a new peer: " + peer);