- Add peer test throttling
   - Peer test packet count fixes
   - Adjust peer test timeouts and add backoff
   - Reject relays and peer tests from same /16
   - More peer test cleanup and log tweaks
 * Transports:
   - Enforce minimum peer port
   - Warn on low router ports
This commit is contained in:
zzz
2012-10-09 10:20:45 +00:00
parent f10478ceef
commit c419016a12
10 changed files with 331 additions and 124 deletions

View File

@@ -1,3 +1,17 @@
2012-10-09 zzz
* Console, i2ptunnel: Warn on low ports
* NetDB: Increase floodfills again
* RouterInfo: Exit 1 on error in main()
* SSU:
- Add peer test throttling
- Peer test packet count fixes
- Adjust peer test timeouts and add backoff
- Reject relays and peer tests from same /16
- More peer test cleanup and log tweaks
* Transports:
- Enforce minimum peer port
- Warn on low router ports
2012-10-08 zzz
* SSU:
- Fix relay request handling bug from -10

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 12;
public final static long BUILD = 13;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -69,6 +69,13 @@ public class NTCPTransport extends TransportImpl {
private long _lastBadSkew;
private static final long[] RATES = { 10*60*1000 };
/**
* To prevent trouble. To be raised to 1024 in 0.9.4.
*
* @since 0.9.3
*/
private static final int MIN_PEER_PORT = 500;
// Opera doesn't have the char, TODO check UA
//private static final String THINSP = " / ";
private static final String THINSP = " / ";
@@ -301,7 +308,7 @@ public class NTCPTransport extends TransportImpl {
return null;
}
byte[] ip = addr.getIP();
if ( (addr.getPort() <= 0) || (ip == null) ) {
if ( (addr.getPort() < MIN_PEER_PORT) || (ip == null) ) {
_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
markUnreachable(peer);
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
@@ -541,11 +548,14 @@ public class NTCPTransport extends TransportImpl {
ServerSocketChannel chan = ServerSocketChannel.open();
chan.configureBlocking(false);
int port = _myAddress.getPort();
if (port > 0 && port < 1024)
_log.logAlways(Log.WARN, "Specified NTCP port is " + port + ", ports lower than 1024 not recommended");
InetSocketAddress addr = null;
if(bindToAddr==null) {
addr = new InetSocketAddress(_myAddress.getPort());
addr = new InetSocketAddress(port);
} else {
addr = new InetSocketAddress(bindToAddr, _myAddress.getPort());
addr = new InetSocketAddress(bindToAddr, port);
if (_log.shouldLog(Log.WARN))
_log.warn("Binding only to " + bindToAddr);
}

View File

@@ -10,6 +10,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
@@ -419,9 +420,9 @@ class EstablishmentManager {
*
*/
void receiveSessionRequest(RemoteHostId from, UDPPacketReader reader) {
if (!_transport.isValid(from.getIP())) {
if (from.getPort() < UDPTransport.MIN_PEER_PORT || !_transport.isValid(from.getIP())) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive session request from invalid IP: " + from);
_log.warn("Receive session request from invalid: " + from);
return;
}
@@ -936,13 +937,14 @@ class EstablishmentManager {
/**
* Are IP and port valid?
* Refuse anybody in the same /16
* @since 0.9.3
*/
private boolean isValid(byte[] ip, int port) {
return port >= 1024 &&
port <= 65535 &&
_transport.isValid(ip) &&
(!Arrays.equals(ip, _transport.getExternalIP())) &&
(!DataHelper.eq(ip, 0, _transport.getExternalIP(), 0, 2)) &&
(!_context.blocklist().isBlocklisted(ip));
}

View File

@@ -0,0 +1,44 @@
package net.i2p.router.transport.udp;
import net.i2p.util.ObjectCounter;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Count IPs
*
* @since 0.9.3
*/
class IPThrottler {
private ObjectCounter<Integer> _counter;
private final int _max;
public IPThrottler(int max, long time) {
_max = max;
_counter = new ObjectCounter();
SimpleScheduler.getInstance().addPeriodicEvent(new Cleaner(), time);
}
/**
* Increments before checking
* @return true if ip.length != 4
*/
public boolean shouldThrottle(byte[] ip) {
if (ip.length != 4)
return true;
return _counter.increment(toInt(ip)) > _max;
}
private static Integer toInt(byte ip[]) {
int rv = 0;
for (int i = 0; i < 4; i++)
rv |= (ip[i] & 0xff) << ((3-i)*8);
return Integer.valueOf(rv);
}
private class Cleaner implements SimpleTimer.TimedEvent {
public void timeReached() {
_counter.clear();
}
}
}

View File

@@ -3,7 +3,6 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -12,6 +11,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
@@ -368,13 +368,14 @@ class IntroductionManager {
/**
* Are IP and port valid?
* Refuse anybody in the same /16
* @since 0.9.3
*/
private boolean isValid(byte[] ip, int port) {
return port >= 1024 &&
port <= 65535 &&
_transport.isValid(ip) &&
(!Arrays.equals(ip, _transport.getExternalIP())) &&
(!DataHelper.eq(ip, 0, _transport.getExternalIP(), 0, 2)) &&
(!_context.blocklist().isBlocklisted(ip));
}
}

View File

@@ -2,7 +2,6 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -15,12 +14,16 @@ import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.router.CommSystemFacade;
import net.i2p.router.RouterContext;
import static net.i2p.router.transport.udp.PeerTestState.Role.*;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
/**
* Entry points are runTest() to start a new test as Alice,
* and receiveTest() for all received test packets.
*
* From udp.html on the website:
<p>The automation of collaborative reachability testing for peers is
@@ -31,12 +34,27 @@ quite simple:</p>
<pre>
Alice Bob Charlie
runTest()
sendTestToBob() receiveFromAliceAsBob()
PeerTest -------------------&gt;
sendTestToCharlie() receiveFromBobAsCharlie()
PeerTest--------------------&gt;
receiveFromCharlieAsBob()
&lt;-------------------PeerTest
receiveTestReply()
&lt;-------------------PeerTest
receiveTestReply()
&lt;------------------------------------------PeerTest
receiveFromAliceAsCharlie()
PeerTest------------------------------------------&gt;
receiveTestReply()
&lt;------------------------------------------PeerTest
</pre>
@@ -105,9 +123,27 @@ class PeerTestManager {
private boolean _currentTestComplete;
/** as Alice */
private final Queue<Long> _recentTests;
private final IPThrottler _throttle;
private static final int MAX_RELAYED_PER_TEST_ALICE = 9;
private static final int MAX_RELAYED_PER_TEST_BOB = 6;
private static final int MAX_RELAYED_PER_TEST_CHARLIE = 6;
/** longest we will keep track of a Charlie nonce for */
private static final int MAX_CHARLIE_LIFETIME = 10*1000;
private static final int MAX_CHARLIE_LIFETIME = 15*1000;
/** as Bob/Charlie */
private static final int MAX_ACTIVE_TESTS = 20;
private static final int MAX_RECENT_TESTS = 40;
/** for the throttler */
private static final int MAX_PER_IP = 12;
private static final long THROTTLE_CLEAN_TIME = 10*60*1000;
/** initial - ContinueTest adds backoff */
private static final int RESEND_TIMEOUT = 4*1000;
private static final int MAX_TEST_TIME = 30*1000;
private static final long MAX_NONCE = (1l << 32) - 1l;
/**
* Have seen peer tests (as Alice) get stuck (_currentTest != null)
@@ -121,16 +157,12 @@ class PeerTestManager {
_activeTests = new ConcurrentHashMap();
_recentTests = new LinkedBlockingQueue();
_packetBuilder = new PacketBuilder(context, transport);
_throttle = new IPThrottler(MAX_PER_IP, THROTTLE_CLEAN_TIME);
_context.statManager().createRateStat("udp.statusKnownCharlie", "How often the bob we pick passes us to a charlie we already have a session with?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveTestReply", "How often we get a reply to our peer test?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.receiveTest", "How often we get a packet requesting us to participate in a peer test?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.testBadIP", "Received IP or port was bad", "udp", UDPTransport.RATES);
}
private static final int RESEND_TIMEOUT = 5*1000;
private static final int MAX_TEST_TIME = 30*1000;
private static final long MAX_NONCE = (1l << 32) - 1l;
//public void runTest(InetAddress bobIP, int bobPort, SessionKey bobIntroKey) {
/**
* The next few methods are for when we are Alice
@@ -138,42 +170,60 @@ class PeerTestManager {
public synchronized void runTest(InetAddress bobIP, int bobPort, SessionKey bobCipherKey, SessionKey bobMACKey) {
if (_currentTest != null) {
if (_log.shouldLog(Log.WARN))
_log.warn("We are already running a test with bob = " + _currentTest.getBobIP() + ", aborting test with bob = " + bobIP);
_log.warn("We are already running a test: " + _currentTest + ", aborting test with bob = " + bobIP);
return;
}
PeerTestState test = new PeerTestState();
test.setNonce(_context.random().nextLong(MAX_NONCE));
if (DataHelper.eq(bobIP.getAddress(), 0, _transport.getExternalIP(), 0, 2)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Not running test with Bob too close to us " + bobIP);
return;
}
PeerTestState test = new PeerTestState(ALICE,
_context.random().nextLong(MAX_NONCE),
_context.clock().now());
test.setBobIP(bobIP);
test.setBobPort(bobPort);
test.setBobCipherKey(bobCipherKey);
test.setBobMACKey(bobMACKey);
test.setBeginTime(_context.clock().now());
test.setLastSendTime(test.getBeginTime());
test.setOurRole(PeerTestState.ALICE);
_currentTest = test;
_currentTestComplete = false;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running test with bob = " + bobIP + ":" + bobPort + " " + test.getNonce());
while (_recentTests.size() > 16)
_log.debug("Start new test: " + test);
while (_recentTests.size() > MAX_RECENT_TESTS)
_recentTests.poll();
_recentTests.offer(Long.valueOf(test.getNonce()));
test.incrementPacketsRelayed();
sendTestToBob();
_context.simpleScheduler().addEvent(new ContinueTest(), RESEND_TIMEOUT);
_context.simpleScheduler().addEvent(new ContinueTest(test.getNonce()), RESEND_TIMEOUT);
}
private class ContinueTest implements SimpleTimer.TimedEvent {
private final long _nonce;
public ContinueTest(long nonce) {
_nonce = nonce;
}
public void timeReached() {
synchronized (PeerTestManager.this) {
PeerTestState state = _currentTest;
if (state == null) {
// already completed
if (state == null || state.getNonce() != _nonce) {
// already completed, possibly on to the next test
return;
} else if (expired()) {
testComplete(true);
} else if (_context.clock().now() - state.getLastSendTime() >= RESEND_TIMEOUT) {
int sent = state.incrementPacketsRelayed();
if (sent > MAX_RELAYED_PER_TEST_ALICE) {
testComplete(false);
if (_log.shouldLog(Log.WARN))
_log.warn("Sent too many packets: " + state);
return;
}
if (state.getReceiveBobTime() <= 0) {
// no message from Bob yet, send it again
sendTestToBob();
@@ -186,7 +236,8 @@ class PeerTestManager {
// second message from Charlie yet
sendTestToCharlie();
}
_context.simpleScheduler().addEvent(ContinueTest.this, RESEND_TIMEOUT);
// retx at 4, 10, 17, 25 elapsed time
_context.simpleScheduler().addEvent(ContinueTest.this, RESEND_TIMEOUT + (sent*1000));
}
}
}
@@ -206,19 +257,20 @@ class PeerTestManager {
PeerTestState test = _currentTest;
if (!expired()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to bob: " + test.getBobIP() + ":" + test.getBobPort());
_log.debug("Sending test to Bob: " + test);
_transport.send(_packetBuilder.buildPeerTestFromAlice(test.getBobIP(), test.getBobPort(), test.getBobCipherKey(), test.getBobMACKey(), //_bobIntroKey,
test.getNonce(), _transport.getIntroKey()));
} else {
_currentTest = null;
}
}
/** call from a synchronized method */
private void sendTestToCharlie() {
PeerTestState test = _currentTest;
if (!expired()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending test to charlie: " + test.getCharlieIP() + ":" + test.getCharliePort());
_log.debug("Sending test to Charlie: " + test);
_transport.send(_packetBuilder.buildPeerTestFromAlice(test.getCharlieIP(), test.getCharliePort(), test.getCharlieIntroKey(),
test.getNonce(), _transport.getIntroKey()));
} else {
@@ -278,7 +330,7 @@ class PeerTestManager {
test.setAlicePort(testPort);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test reply from bob @ " + from + " via our " + test.getAlicePort() + "/" + test.getAlicePortFromCharlie());
_log.debug("Receive test reply from Bob: " + test);
if (test.getAlicePortFromCharlie() > 0)
testComplete(false);
} catch (UnknownHostException uhe) {
@@ -318,8 +370,7 @@ class PeerTestManager {
InetAddress addr = InetAddress.getByAddress(ip);
test.setAliceIPFromCharlie(addr);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test reply from charlie @ " + test.getCharlieIP() + " via our "
+ test.getAlicePort() + "/" + test.getAlicePortFromCharlie());
_log.debug("Receive test reply from Charlie: " + test);
if (test.getReceiveBobTime() > 0)
testComplete(true);
} catch (UnknownHostException uhe) {
@@ -328,10 +379,10 @@ class PeerTestManager {
_context.statManager().addRateData("udp.testBadIP", 1);
}
} else {
if (test.getPacketsRelayed() > MAX_RELAYED_PER_TEST) {
if (test.incrementPacketsRelayed() > MAX_RELAYED_PER_TEST_ALICE) {
testComplete(false);
if (_log.shouldLog(Log.WARN))
_log.warn("Received too many packets on the test: " + test);
_log.warn("Sent too many packets on the test: " + test);
return;
}
@@ -348,7 +399,7 @@ class PeerTestManager {
test.setCharlieIP(InetAddress.getByAddress(from.getIP()));
test.setCharliePort(from.getPort());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test from charlie @ " + from);
_log.debug("Receive test from Charlie: " + test);
sendTestToCharlie();
} catch (UnknownHostException uhe) {
if (_log.shouldLog(Log.WARN))
@@ -420,9 +471,12 @@ class PeerTestManager {
}
/**
* Entry point for all incoming packets. Most of the source and dest validation is here.
*
* Receive a test message of some sort from the given peer, queueing up any packet
* that should be sent in response, or if its a reply to our own current testing,
* adjusting our test state.
*
* We could be Alice, Bob, or Charlie.
*/
public void receiveTest(RemoteHostId from, UDPPacketReader reader) {
@@ -431,6 +485,7 @@ class PeerTestManager {
int fromPort = from.getPort();
if (fromPort < 1024 || fromPort > 65535 ||
(!_transport.isValid(fromIP)) ||
DataHelper.eq(fromIP, 0, _transport.getExternalIP(), 0, 2) ||
_context.blocklist().isBlocklisted(fromIP)) {
// spoof check, and don't respond to privileged ports
if (_log.shouldLog(Log.WARN))
@@ -466,36 +521,69 @@ class PeerTestManager {
long nonce = testInfo.readNonce();
PeerTestState test = _currentTest;
if ( (test != null) && (test.getNonce() == nonce) ) {
// we are Alice
// we are Alice, we initiated the test
receiveTestReply(from, testInfo);
return;
}
// we are Bob or Charlie
// we are Bob or Charlie, we are helping Alice
if (testIP != null && Arrays.equals(testIP, _transport.getExternalIP())) {
// spoof check - have to do this after receiveTestReply(), since
// the field should be us there
if (_throttle.shouldThrottle(fromIP)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid address in PeerTest: " + Addresses.toString(testIP, testPort));
_context.statManager().addRateData("udp.testBadIP", 1);
_log.warn("PeerTest throttle from " + Addresses.toString(fromIP, fromPort));
return;
}
PeerTestState state = _activeTests.get(Long.valueOf(nonce));
// use the same counter for both from and to IPs
if (testIP != null && _throttle.shouldThrottle(testIP)) {
if (_log.shouldLog(Log.WARN))
_log.warn("PeerTest throttle to " + Addresses.toString(testIP, testPort));
return;
}
Long lNonce = Long.valueOf(nonce);
PeerTestState state = _activeTests.get(lNonce);
if (testIP != null && DataHelper.eq(testIP, 0, _transport.getExternalIP(), 0, 2)) {
// spoof check - have to do this after receiveTestReply(), since
// the field should be us there.
// Let's also eliminate anybody in the same /16
if (_recentTests.contains(lNonce)) {
if (_log.shouldLog(Log.INFO))
_log.info("Got delayed reply on nonce " + nonce +
" from: " + Addresses.toString(fromIP, fromPort));
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Nearby address in PeerTest: " + Addresses.toString(testIP, testPort) +
" from: " + Addresses.toString(fromIP, fromPort) +
" state? " + state);
_context.statManager().addRateData("udp.testBadIP", 1);
}
return;
}
if (state == null) {
// NEW TEST
if ( (testIP == null) || (testPort <= 0) ) {
// we are bob, since we haven't seen this nonce before AND its coming from alice
if (_activeTests.size() >= MAX_ACTIVE_TESTS) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too many active tests, droppping from Alice " + Addresses.toString(fromIP, fromPort));
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("test IP/port are blank coming from " + from + ", assuming we are Bob and they are alice");
receiveFromAliceAsBob(from, testInfo, nonce, null);
} else {
if (_recentTests.contains(Long.valueOf(nonce))) {
if (_recentTests.contains(lNonce)) {
// ignore the packet, as its a holdover from a recently completed locally
// initiated test
} else {
if (_activeTests.size() >= MAX_ACTIVE_TESTS) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too many active tests, droppping from Bob " + Addresses.toString(fromIP, fromPort));
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("We are charlie, as the testIP/port is " + Addresses.toString(testIP, testPort) + " and the state is unknown for " + nonce);
// we are charlie, since alice never sends us her IP and port, only bob does (and,
@@ -505,7 +593,7 @@ class PeerTestManager {
}
} else {
// EXISTING TEST
if (state.getOurRole() == PeerTestState.BOB) {
if (state.getOurRole() == BOB) {
if (DataHelper.eq(fromIP, state.getAliceIP().getAddress()) &&
(fromPort == state.getAlicePort()) ) {
receiveFromAliceAsBob(from, testInfo, nonce, state);
@@ -516,9 +604,9 @@ class PeerTestManager {
if (_log.shouldLog(Log.WARN))
_log.warn("Received from a fourth party as bob! alice: " + state.getAliceIP() + ", charlie: " + state.getCharlieIP() + ", dave: " + from);
}
} else if (state.getOurRole() == PeerTestState.CHARLIE) {
} else if (state.getOurRole() == CHARLIE) {
if ( (testIP == null) || (testPort <= 0) ) {
receiveFromAliceAsCharlie(from, testInfo, nonce);
receiveFromAliceAsCharlie(from, testInfo, nonce, state);
} else {
receiveFromBobAsCharlie(from, testInfo, nonce, state);
}
@@ -528,24 +616,27 @@ class PeerTestManager {
// Below here are methods for when we are Bob or Charlie
private static final int MAX_RELAYED_PER_TEST = 5;
/**
* The packet's IP/port does not match the IP/port included in the message,
* so we must be Charlie receiving a PeerTest from Bob.
*
* @param state null if new
*/
private void receiveFromBobAsCharlie(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce, PeerTestState state) {
long now = _context.clock().now();
boolean isNew = false;
if (state == null) {
isNew = true;
state = new PeerTestState();
state.setOurRole(PeerTestState.CHARLIE);
state = new PeerTestState(CHARLIE, nonce, now);
} else {
if (state.getReceiveBobTime() > now - (RESEND_TIMEOUT / 2)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too soon, not retransmitting: " + state);
return;
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive test as charlie nonce " + nonce);
// TODO should only do most of this if isNew
int sz = testInfo.readIPSize();
byte aliceIPData[] = new byte[sz];
try {
@@ -561,12 +652,10 @@ class PeerTestManager {
state.setAliceIP(aliceIP);
state.setAlicePort(alicePort);
state.setAliceIntroKey(aliceIntroKey);
state.setNonce(nonce);
state.setBobIP(bobIP);
state.setBobPort(from.getPort());
state.setLastSendTime(_context.clock().now());
state.setOurRole(PeerTestState.CHARLIE);
state.setReceiveBobTime(_context.clock().now());
state.setLastSendTime(now);
state.setReceiveBobTime(now);
PeerState bob = _transport.getPeerState(from);
if (bob == null) {
@@ -578,16 +667,15 @@ class PeerTestManager {
state.setBobMACKey(bob.getCurrentMACKey());
}
state.incrementPacketsRelayed();
if (state.getPacketsRelayed() > MAX_RELAYED_PER_TEST) {
// we send two packets below, but increment just once
if (state.incrementPacketsRelayed() > MAX_RELAYED_PER_TEST_CHARLIE) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive from bob (" + from + ") as charlie with alice @ " + aliceIP + ":" + alicePort
+ ", but we've already relayed too many packets to that test, so we're dropping it");
_log.warn("Too many, not retransmitting: " + state);
return;
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from bob (" + from + ") as charlie, sending back to bob and sending to alice @ " + aliceIP + ":" + alicePort);
_log.debug("Receive from Bob: " + state);
if (isNew) {
_activeTests.put(Long.valueOf(nonce), state);
@@ -611,20 +699,19 @@ class PeerTestManager {
* any info in the message), plus we are not acting as Charlie (so we've got to be Bob).
*
* testInfo IP/port ignored
* @param state null if new
*/
private void receiveFromAliceAsBob(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce, PeerTestState state) {
// we are Bob, so pick a (potentially) Charlie and send Charlie Alice's info
PeerState charlie = null;
PeerState charlie;
RouterInfo charlieInfo = null;
if (state == null) { // pick a new charlie
charlie = _transport.pickTestPeer(from);
if (charlie != null)
charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
} else {
charlie = _transport.getPeerState(new RemoteHostId(state.getCharlieIP().getAddress(), state.getCharliePort()));
if (charlie != null)
charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
}
if (charlie != null)
charlieInfo = _context.netDb().lookupRouterInfoLocally(charlie.getRemotePeer());
if ( (charlie == null) || (charlieInfo == null) ) {
if (_log.shouldLog(Log.WARN))
@@ -632,6 +719,7 @@ class PeerTestManager {
return;
}
// TODO should only do most of this if isNew
InetAddress aliceIP = null;
SessionKey aliceIntroKey = null;
try {
@@ -651,28 +739,30 @@ class PeerTestManager {
//UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, charlieIntroKey, nonce);
//_transport.send(packet);
long now = _context.clock().now();
boolean isNew = false;
if (state == null) {
isNew = true;
state = new PeerTestState();
state.setBeginTime(_context.clock().now());
state = new PeerTestState(BOB, nonce, now);
} else {
if (state.getReceiveAliceTime() > now - (RESEND_TIMEOUT / 2)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too soon, not retransmitting: " + state);
return;
}
}
state.setAliceIP(aliceIP);
state.setAlicePort(from.getPort());
state.setAliceIntroKey(aliceIntroKey);
state.setNonce(nonce);
state.setCharlieIP(charlie.getRemoteIPAddress());
state.setCharliePort(charlie.getRemotePort());
state.setCharlieIntroKey(charlieIntroKey);
state.setLastSendTime(_context.clock().now());
state.setOurRole(PeerTestState.BOB);
state.setReceiveAliceTime(_context.clock().now());
state.setLastSendTime(now);
state.setReceiveAliceTime(now);
state.incrementPacketsRelayed();
if (state.getPacketsRelayed() > MAX_RELAYED_PER_TEST) {
if (state.incrementPacketsRelayed() > MAX_RELAYED_PER_TEST_BOB) {
if (_log.shouldLog(Log.WARN))
_log.warn("Receive from alice (" + aliceIP + ":" + from.getPort()
+ ") as bob, but we've already relayed too many packets to that test, so we're dropping it");
_log.warn("Too many, not retransmitting: " + state);
return;
}
@@ -688,8 +778,7 @@ class PeerTestManager {
charlie.getCurrentMACKey());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from alice as bob for " + nonce + ", picking charlie @ " + charlie.getRemoteIPAddress() + ":"
+ charlie.getRemotePort() + " for alice @ " + aliceIP + ":" + from.getPort());
_log.debug("Receive from Alice: " + state);
_transport.send(packet);
} catch (UnknownHostException uhe) {
@@ -704,23 +793,29 @@ class PeerTestManager {
* packet verifying participation.
*
* testInfo IP/port ignored
* @param state non-null
*/
private void receiveFromCharlieAsBob(RemoteHostId from, PeerTestState state) {
state.setReceiveCharlieTime(_context.clock().now());
state.incrementPacketsRelayed();
if (state.getPacketsRelayed() > MAX_RELAYED_PER_TEST) {
long now = _context.clock().now();
if (state.getReceiveCharlieTime() > now - (RESEND_TIMEOUT / 2)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Received from charlie (" + from + ") as bob (" + state + "), but we've already relayed too many, so drop it");
_log.warn("Too soon, not retransmitting: " + state);
return;
}
if (state.incrementPacketsRelayed() > MAX_RELAYED_PER_TEST_BOB) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too many, not retransmitting: " + state);
return;
}
state.setReceiveCharlieTime(now);
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(state.getAliceIP(), state.getAlicePort(),
state.getAliceIntroKey(), state.getCharlieIntroKey(),
state.getNonce());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from charlie @ " + from + " as bob, sending alice back the ok @ " + state.getAliceIP() + ":" + state.getAlicePort());
_log.debug("Receive from Charlie, sending Alice back the OK: " + state);
_transport.send(packet);
}
@@ -729,8 +824,24 @@ class PeerTestManager {
* We are charlie, so send Alice her PeerTest message
*
* testInfo IP/port ignored
* @param state non-null
*/
private void receiveFromAliceAsCharlie(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo, long nonce) {
private void receiveFromAliceAsCharlie(RemoteHostId from, UDPPacketReader.PeerTestReader testInfo,
long nonce, PeerTestState state) {
long now = _context.clock().now();
if (state.getReceiveAliceTime() > now - (RESEND_TIMEOUT / 2)) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too soon, not retransmitting: " + state);
return;
}
if (state.incrementPacketsRelayed() > MAX_RELAYED_PER_TEST_CHARLIE) {
if (_log.shouldLog(Log.WARN))
_log.warn("Too many, not retransmitting: " + state);
return;
}
state.setReceiveAliceTime(now);
try {
InetAddress aliceIP = InetAddress.getByAddress(from.getIP());
SessionKey aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
@@ -738,7 +849,7 @@ class PeerTestManager {
UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, _transport.getIntroKey(), nonce);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive from alice as charlie, w/ alice @ " + aliceIP + ":" + from.getPort() + " and nonce " + nonce);
_log.debug("Receive from Alice: " + state);
_transport.send(packet);
} catch (UnknownHostException uhe) {
@@ -749,13 +860,15 @@ class PeerTestManager {
}
/**
* forget about charlie's nonce after 60s.
* forget about charlie's nonce after a short while.
*/
private class RemoveTest implements SimpleTimer.TimedEvent {
private long _nonce;
private final long _nonce;
public RemoveTest(long nonce) {
_nonce = nonce;
}
public void timeReached() {
_activeTests.remove(Long.valueOf(_nonce));
}

View File

@@ -1,15 +1,17 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.SessionKey;
/**
*
* Track the state of a peer test.
* Used only by PeerTestManager.
*/
class PeerTestState {
private long _testNonce;
private short _ourRole;
private final long _testNonce;
private final Role _ourRole;
private InetAddress _aliceIP;
private int _alicePort;
private InetAddress _bobIP;
@@ -22,22 +24,26 @@ class PeerTestState {
private SessionKey _charlieIntroKey;
private SessionKey _bobCipherKey;
private SessionKey _bobMACKey;
private long _beginTime;
private final long _beginTime;
private long _lastSendTime;
private long _receiveAliceTime;
private long _receiveBobTime;
private long _receiveCharlieTime;
private int _packetsRelayed;
private final AtomicInteger _packetsRelayed = new AtomicInteger();
public static final short ALICE = 1;
public static final short BOB = 2;
public static final short CHARLIE = 3;
public enum Role {ALICE, BOB, CHARLIE};
public PeerTestState(Role role, long nonce, long now) {
_ourRole = role;
_testNonce = nonce;
_beginTime = now;
}
public long getNonce() { return _testNonce; }
public void setNonce(long nonce) { _testNonce = nonce; }
/** Are we Alice, bob, or Charlie. */
public short getOurRole() { return _ourRole; }
public void setOurRole(short role) { _ourRole = role; }
public Role getOurRole() { return _ourRole; }
/**
* If we are Alice, this will contain the IP that Bob says we
* can be reached at - the IP Charlie says we can be reached
@@ -79,47 +85,49 @@ class PeerTestState {
/** when did this test begin? */
public long getBeginTime() { return _beginTime; }
public void setBeginTime(long when) { _beginTime = when; }
/** when did we last send out a packet? */
public long getLastSendTime() { return _lastSendTime; }
public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last hear from alice? */
/**
* when did we last hear from alice?
*/
public long getReceiveAliceTime() { return _receiveAliceTime; }
public void setReceiveAliceTime(long when) { _receiveAliceTime = when; }
/** when did we last hear from bob? */
public long getReceiveBobTime() { return _receiveBobTime; }
public void setReceiveBobTime(long when) { _receiveBobTime = when; }
/** when did we last hear from charlie? */
public long getReceiveCharlieTime() { return _receiveCharlieTime; }
public void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; }
public int getPacketsRelayed() { return _packetsRelayed; }
public void incrementPacketsRelayed() { ++_packetsRelayed; }
/** @return new value */
public int incrementPacketsRelayed() { return _packetsRelayed.incrementAndGet(); }
@Override
public String toString() {
StringBuilder buf = new StringBuilder(512);
buf.append("Role: ");
if (_ourRole == ALICE) buf.append("Alice");
else if (_ourRole == BOB) buf.append("Bob");
else if (_ourRole == CHARLIE) buf.append("Charlie");
else buf.append("unkown!");
StringBuilder buf = new StringBuilder(256);
buf.append("PeerTest ").append(_testNonce)
.append(" as ").append(_ourRole.toString());
if (_aliceIP != null)
buf.append(" alice: ").append(_aliceIP).append(':').append(_alicePort);
buf.append("; Alice: ").append(_aliceIP).append(':').append(_alicePort);
if (_aliceIPFromCharlie != null)
buf.append(" (fromCharlie ").append(_aliceIPFromCharlie).append(':').append(_alicePortFromCharlie).append(')');
if (_bobIP != null)
buf.append(" bob: ").append(_bobIP).append(':').append(_bobPort);
buf.append("; Bob: ").append(_bobIP).append(':').append(_bobPort);
if (_charlieIP != null)
buf.append(" charlie: ").append(_charlieIP).append(':').append(_charliePort);
buf.append(" last send after ").append(_lastSendTime - _beginTime).append("ms");
buf.append(" Charlie: ").append(_charlieIP).append(':').append(_charliePort);
buf.append("; last send after ").append(_lastSendTime - _beginTime);
if (_receiveAliceTime > 0)
buf.append(" receive from alice after ").append(_receiveAliceTime - _beginTime).append("ms");
buf.append("; rcvd from Alice after ").append(_receiveAliceTime - _beginTime);
if (_receiveBobTime > 0)
buf.append(" receive from bob after ").append(_receiveBobTime - _beginTime).append("ms");
buf.append("; rcvd from Bob after ").append(_receiveBobTime - _beginTime);
if (_receiveCharlieTime > 0)
buf.append(" receive from charlie after ").append(_receiveCharlieTime - _beginTime).append("ms");
buf.append(" packets relayed: ").append(_packetsRelayed);
buf.append("; rcvd from Charlie after ").append(_receiveCharlieTime - _beginTime);
buf.append("; pkts relayed: ").append(_packetsRelayed.get());
return buf.toString();
}
}

View File

@@ -96,6 +96,8 @@ class UDPEndpoint {
private DatagramSocket getSocket() {
DatagramSocket socket = null;
int port = _listenPort;
if (port > 0 && port < 1024)
_log.logAlways(Log.WARN, "Specified UDP port is " + port + ", ports lower than 1024 not recommended");
for (int i = 0; i < MAX_PORT_RETRIES; i++) {
if (port <= 0) {
@@ -113,7 +115,7 @@ class UDPEndpoint {
break;
} catch (SocketException se) {
if (_log.shouldLog(Log.WARN))
_log.warn("Binding to port " + port + " failed: " + se);
_log.warn("Binding to port " + port + " failed", se);
}
port = -1;
}

View File

@@ -112,6 +112,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
*/
public static final int DEFAULT_INTERNAL_PORT = 8887;
/**
* To prevent trouble. To be raised to 1024 in 0.9.4.
*
* @since 0.9.3
*/
static final int MIN_PEER_PORT = 500;
/** Limits on port told to us by others,
* We should have an exception if it matches the existing low port.
*/
@@ -1288,7 +1295,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (addr.getOption("ihost0") == null) {
byte[] ip = addr.getIP();
int port = addr.getPort();
if (ip == null || port <= 0 ||
if (ip == null || port < MIN_PEER_PORT ||
(!isValid(ip)) ||
Arrays.equals(ip, getExternalIP())) {
markUnreachable(to);
@@ -2646,8 +2653,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (peerInfo == null)
continue;
RouterAddress addr = peerInfo.getTargetAddress(STYLE);
if (addr != null)
return peer;
if (addr == null)
continue;
byte[] ip = addr.getIP();
if (ip == null)
continue;
if (DataHelper.eq(ip, 0, getExternalIP(), 0, 2))
continue;
return peer;
}
return null;
}