- Big refactor of several classes for concurrent,
        elimination of several locks
      - Reduce max number of resent acks in a packet to
        lower overhead
      - Take incoming messages from the head of the queue,
        not sure why taking them from the tail "reduces latency"
      - Java 5 cleanup
This commit is contained in:
zzz
2010-03-09 20:44:46 +00:00
parent d79387bd92
commit 40e820cabb
22 changed files with 653 additions and 645 deletions

View File

@@ -1,7 +1,10 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
@@ -18,8 +21,9 @@ public class ACKSender implements Runnable {
private UDPTransport _transport;
private PacketBuilder _builder;
/** list of peers (PeerState) who we have received data from but not yet ACKed to */
private final List _peersToACK;
private final BlockingQueue<PeerState> _peersToACK;
private boolean _alive;
private static final long POISON_PS = -9999999999l;
/** how frequently do we want to send ACKs to a peer? */
static final int ACK_FREQUENCY = 500;
@@ -28,7 +32,7 @@ public class ACKSender implements Runnable {
_context = ctx;
_log = ctx.logManager().getLog(ACKSender.class);
_transport = transport;
_peersToACK = new ArrayList(4);
_peersToACK = new LinkedBlockingQueue();
_builder = new PacketBuilder(_context, transport);
_alive = true;
_context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", UDPTransport.RATES);
@@ -37,27 +41,34 @@ public class ACKSender implements Runnable {
_context.statManager().createRateStat("udp.abortACK", "How often do we schedule up an ACK send only to find it had already been sent (through piggyback)?", "udp", UDPTransport.RATES);
}
/**
* Add to the queue.
* For speed, don't check for duplicates here.
* The runner will remove them in its own thread.
*/
public void ackPeer(PeerState peer) {
synchronized (_peersToACK) {
if (!_peersToACK.contains(peer))
_peersToACK.add(peer);
_peersToACK.notifyAll();
}
if (_alive)
_peersToACK.offer(peer);
}
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP ACK sender");
t.setDaemon(true);
_peersToACK.clear();
I2PThread t = new I2PThread(this, "UDP ACK sender", true);
t.start();
}
public void shutdown() {
_alive = false;
synchronized (_peersToACK) {
_peersToACK.clear();
_peersToACK.notifyAll();
PeerState poison = new PeerState(_context, _transport);
poison.setTheyRelayToUsAs(POISON_PS);
_peersToACK.offer(poison);
for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_peersToACK.clear();
}
private long ackFrequency(long timeSinceACK, long rtt) {
@@ -71,47 +82,89 @@ public class ACKSender implements Runnable {
}
public void run() {
// we use a Set to strip out dups that come in on the Queue
Set<PeerState> notYet = new HashSet();
while (_alive) {
PeerState peer = null;
long now = _context.clock().now();
long now = 0;
long remaining = -1;
try {
synchronized (_peersToACK) {
for (int i = 0; i < _peersToACK.size(); i++) {
PeerState cur = (PeerState)_peersToACK.get(i);
long wanted = cur.getWantedACKSendSince();
long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now;
if ( ( (wanted > 0) && (delta < 0) ) || (cur.unsentACKThresholdReached()) ) {
_peersToACK.remove(i);
peer = cur;
break;
}
}
if (peer == null) {
if (_peersToACK.size() <= 0)
_peersToACK.wait();
long wanted = 0;
while (_alive) {
// Pull from the queue until we find one ready to ack
// Any that are not ready we will put back on the queue
PeerState cur = null;
try {
if (notYet.isEmpty())
// wait forever
cur = _peersToACK.take();
else
_peersToACK.wait(50);
} else {
remaining = _peersToACK.size();
}
}
} catch (InterruptedException ie) {}
// Don't wait if nothing there, just put everybody back and sleep below
cur = _peersToACK.poll();
} catch (InterruptedException ie) {}
if (cur != null) {
if (cur.getTheyRelayToUsAs() == POISON_PS)
return;
wanted = cur.getWantedACKSendSince();
now = _context.clock().now();
long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now;
if (wanted <= 0) {
// it got acked by somebody - discard, remove any dups, and go around again
notYet.remove(cur);
} else if ( (delta <= 0) || (cur.unsentACKThresholdReached()) ) {
// found one to ack
peer = cur;
notYet.remove(cur); // in case a dup
try {
// bulk operations may throw an exception
_peersToACK.addAll(notYet);
} catch (Exception e) {}
notYet.clear();
break;
} else {
// not yet, go around again
// moving from the Queue to the Set and then back removes duplicates
boolean added = notYet.add(cur);
if (added && _log.shouldLog(Log.DEBUG))
_log.debug("Pending ACK (delta = " + delta + ") for " + cur);
}
} else if (!notYet.isEmpty()) {
// put them all back and wait a while
try {
// bulk operations may throw an exception
_peersToACK.addAll(notYet);
} catch (Exception e) {}
if (_log.shouldLog(Log.INFO))
_log.info("sleeping, pending size = " + notYet.size());
notYet.clear();
try {
// sleep a little longer than the divided frequency,
// so it will be ready after we circle around a few times
Thread.sleep(5 + (ACK_FREQUENCY / 3));
} catch (InterruptedException ie) {}
} // else go around again where we will wait at take()
} // inner while()
if (peer != null) {
long lastSend = peer.getLastACKSend();
long wanted = peer.getWantedACKSendSince();
List ackBitfields = peer.retrieveACKBitfields(false);
// set above before the break
//long wanted = peer.getWantedACKSendSince();
List<ACKBitfield> ackBitfields = peer.retrieveACKBitfields(false);
if (wanted < 0)
_log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
if (wanted < 0) {
if (_log.shouldLog(Log.WARN))
_log.warn("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields);
continue;
}
if ( (ackBitfields != null) && (ackBitfields.size() > 0) ) {
if ( (ackBitfields != null) && (!ackBitfields.isEmpty()) ) {
_context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0);
if (remaining > 0)
_context.statManager().addRateData("udp.sendACKRemaining", remaining, 0);
now = _context.clock().now();
// set above before the break
//now = _context.clock().now();
if (lastSend < 0)
lastSend = now - 1;
_context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted);
@@ -119,7 +172,7 @@ public class ACKSender implements Runnable {
UDPPacket ack = _builder.buildACK(peer, ackBitfields);
ack.markType(1);
ack.setFragmentCount(-1);
ack.setMessageType(42);
ack.setMessageType(PacketBuilder.TYPE_ACK);
if (_log.shouldLog(Log.INFO))
_log.info("Sending ACK for " + ackBitfields);

View File

@@ -7,6 +7,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.Base64;
@@ -37,13 +38,13 @@ public class EstablishmentManager {
private UDPTransport _transport;
private PacketBuilder _builder;
/** map of RemoteHostId to InboundEstablishState */
private final Map _inboundStates;
private final ConcurrentHashMap<RemoteHostId, InboundEstablishState> _inboundStates;
/** map of RemoteHostId to OutboundEstablishState */
private final Map _outboundStates;
private final ConcurrentHashMap<RemoteHostId, OutboundEstablishState> _outboundStates;
/** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
private final Map _queuedOutbound;
private final ConcurrentHashMap<RemoteHostId, List<OutNetMessage>> _queuedOutbound;
/** map of nonce (Long) to OutboundEstablishState */
private final Map _liveIntroductions;
private final ConcurrentHashMap<Long, OutboundEstablishState> _liveIntroductions;
private boolean _alive;
private final Object _activityLock;
private int _activity;
@@ -56,10 +57,10 @@ public class EstablishmentManager {
_log = ctx.logManager().getLog(EstablishmentManager.class);
_transport = transport;
_builder = new PacketBuilder(ctx, transport);
_inboundStates = new HashMap(32);
_outboundStates = new HashMap(32);
_queuedOutbound = new HashMap(32);
_liveIntroductions = new HashMap(32);
_inboundStates = new ConcurrentHashMap();
_outboundStates = new ConcurrentHashMap();
_queuedOutbound = new ConcurrentHashMap();
_liveIntroductions = new ConcurrentHashMap();
_activityLock = new Object();
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES);
@@ -74,8 +75,7 @@ public class EstablishmentManager {
public void startup() {
_alive = true;
I2PThread t = new I2PThread(new Establisher(), "UDP Establisher");
t.setDaemon(true);
I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true);
t.start();
}
public void shutdown() {
@@ -87,21 +87,17 @@ public class EstablishmentManager {
* Grab the active establishing state
*/
InboundEstablishState getInboundState(RemoteHostId from) {
synchronized (_inboundStates) {
InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from);
InboundEstablishState state = _inboundStates.get(from);
// if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("No inbound states for " + from + ", with remaining: " + _inboundStates);
return state;
}
}
OutboundEstablishState getOutboundState(RemoteHostId from) {
synchronized (_outboundStates) {
OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from);
OutboundEstablishState state = _outboundStates.get(from);
// if ( (state == null) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates);
return state;
}
}
private int getMaxConcurrentEstablish() {
@@ -163,39 +159,42 @@ public class EstablishmentManager {
int deferred = 0;
boolean rejected = false;
int queueCount = 0;
synchronized (_outboundStates) {
state = (OutboundEstablishState)_outboundStates.get(to);
state = _outboundStates.get(to);
if (state == null) {
if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
List queued = (List)_queuedOutbound.get(to);
if (queued == null) {
queued = new ArrayList(1);
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
rejected = true;
} else {
_queuedOutbound.put(to, queued);
}
if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) {
rejected = true;
} else {
List<OutNetMessage> newQueued = new ArrayList(1);
List<OutNetMessage> queued = _queuedOutbound.putIfAbsent(to, newQueued);
if (queued == null)
queued = newQueued;
queueCount = queued.size();
if (queueCount < MAX_QUEUED_PER_PEER)
queued.add(msg);
}
queueCount = queued.size();
if ( (queueCount < MAX_QUEUED_PER_PEER) && (!rejected) )
queued.add(msg);
deferred = _queuedOutbound.size();
} else {
state = new OutboundEstablishState(_context, remAddr, port,
msg.getTarget().getIdentity(),
new SessionKey(addr.getIntroKey()), addr);
_outboundStates.put(to, state);
SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000);
OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state);
boolean isNew = oldState == null;
if (!isNew)
// whoops, somebody beat us to it, throw out the state we just created
state = oldState;
else
SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000);
}
}
if (state != null) {
state.addMessage(msg);
List queued = (List)_queuedOutbound.remove(to);
List<OutNetMessage> queued = _queuedOutbound.remove(to);
if (queued != null)
for (int i = 0; i < queued.size(); i++)
state.addMessage((OutNetMessage)queued.get(i));
state.addMessage(queued.get(i));
}
}
if (rejected) {
_transport.failed(msg, "Too many pending outbound connections");
@@ -223,17 +222,9 @@ public class EstablishmentManager {
_state = state;
}
public void timeReached() {
Object removed = null;
synchronized (_outboundStates) {
removed = _outboundStates.remove(_to);
if ( (removed != null) && (removed != _state) ) { // oops, we must have failed, then retried
_outboundStates.put(_to, removed);
removed = null;
}/* else {
locked_admitQueued();
}*/
}
if (removed != null) {
// remove only if value == state
boolean removed = _outboundStates.remove(_to, _state);
if (removed) {
_context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime());
if (_log.shouldLog(Log.WARN))
_log.warn("Timing out expired outbound: " + _state);
@@ -260,12 +251,11 @@ public class EstablishmentManager {
int maxInbound = getMaxInboundEstablishers();
boolean isNew = false;
InboundEstablishState state = null;
synchronized (_inboundStates) {
if (_inboundStates.size() >= maxInbound)
return; // drop the packet
state = (InboundEstablishState)_inboundStates.get(from);
InboundEstablishState state = _inboundStates.get(from);
if (state == null) {
if (_context.blocklist().isBlocklisted(from.getIP())) {
if (_log.shouldLog(Log.WARN))
@@ -276,10 +266,13 @@ public class EstablishmentManager {
return; // drop the packet
state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort());
state.receiveSessionRequest(reader.getSessionRequestReader());
isNew = true;
_inboundStates.put(from, state);
InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state);
isNew = oldState == null;
if (!isNew)
// whoops, somebody beat us to it, throw out the state we just created
state = oldState;
}
}
if (isNew) {
// we don't expect inbound connections when hidden, but it could happen
// Don't offer if we are approaching max connections. While Relay Intros do not
@@ -307,10 +300,7 @@ public class EstablishmentManager {
* establishment)
*/
void receiveSessionConfirmed(RemoteHostId from, UDPPacketReader reader) {
InboundEstablishState state = null;
synchronized (_inboundStates) {
state = (InboundEstablishState)_inboundStates.get(from);
}
InboundEstablishState state = _inboundStates.get(from);
if (state != null) {
state.receiveSessionConfirmed(reader.getSessionConfirmedReader());
notifyActivity();
@@ -324,10 +314,7 @@ public class EstablishmentManager {
*
*/
void receiveSessionCreated(RemoteHostId from, UDPPacketReader reader) {
OutboundEstablishState state = null;
synchronized (_outboundStates) {
state = (OutboundEstablishState)_outboundStates.get(from);
}
OutboundEstablishState state = _outboundStates.get(from);
if (state != null) {
state.receiveSessionCreated(reader.getSessionCreatedReader());
notifyActivity();
@@ -346,21 +333,19 @@ public class EstablishmentManager {
//int active = 0;
//int admitted = 0;
//int remaining = 0;
synchronized (_outboundStates) {
//active = _outboundStates.size();
_outboundStates.remove(state.getRemoteHostId());
if (_queuedOutbound.size() > 0) {
// there shouldn't have been queued messages for this active state, but just in case...
List queued = (List)_queuedOutbound.remove(state.getRemoteHostId());
List<OutNetMessage> queued = _queuedOutbound.remove(state.getRemoteHostId());
if (queued != null) {
for (int i = 0; i < queued.size(); i++)
state.addMessage((OutNetMessage)queued.get(i));
state.addMessage(queued.get(i));
}
//admitted = locked_admitQueued();
}
//remaining = _queuedOutbound.size();
}
//if (admitted > 0)
// _log.log(Log.CRIT, "Admitted " + admitted + " with " + remaining + " remaining queued and " + active + " active");
@@ -371,6 +356,7 @@ public class EstablishmentManager {
return peer;
}
/********
private int locked_admitQueued() {
int admitted = 0;
while ( (_queuedOutbound.size() > 0) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) {
@@ -409,6 +395,7 @@ public class EstablishmentManager {
}
return admitted;
}
*******/
private void notifyActivity() {
synchronized (_activityLock) {
@@ -596,9 +583,7 @@ public class EstablishmentManager {
} catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe);
synchronized (_inboundStates) {
_inboundStates.remove(state.getRemoteHostId());
}
_inboundStates.remove(state.getRemoteHostId());
return;
}
_transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey()));
@@ -627,14 +612,12 @@ public class EstablishmentManager {
private void handlePendingIntro(OutboundEstablishState state) {
long nonce = _context.random().nextLong(MAX_NONCE);
while (true) {
synchronized (_liveIntroductions) {
OutboundEstablishState old = (OutboundEstablishState)_liveIntroductions.put(new Long(nonce), state);
OutboundEstablishState old = _liveIntroductions.putIfAbsent(new Long(nonce), state);
if (old != null) {
nonce = _context.random().nextLong(MAX_NONCE);
} else {
break;
}
}
}
SimpleScheduler.getInstance().addEvent(new FailIntroduction(state, nonce), INTRO_ATTEMPT_TIMEOUT);
state.setIntroNonce(nonce);
@@ -656,16 +639,9 @@ public class EstablishmentManager {
_state = state;
}
public void timeReached() {
OutboundEstablishState removed = null;
synchronized (_liveIntroductions) {
removed = (OutboundEstablishState)_liveIntroductions.remove(new Long(_nonce));
if (removed != _state) {
// another one with the same nonce in a very brief time...
_liveIntroductions.put(new Long(_nonce), removed);
removed = null;
}
}
if (removed != null) {
// remove only if value equal to state
boolean removed = _liveIntroductions.remove(new Long(_nonce), _state);
if (removed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out");
_context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0);
@@ -677,10 +653,7 @@ public class EstablishmentManager {
/* FIXME Exporting non-public type through public API FIXME */
public void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) {
long nonce = reader.getRelayResponseReader().readNonce();
OutboundEstablishState state = null;
synchronized (_liveIntroductions) {
state = (OutboundEstablishState)_liveIntroductions.remove(new Long(nonce));
}
OutboundEstablishState state = _liveIntroductions.remove(new Long(nonce));
if (state == null)
return; // already established
@@ -705,10 +678,8 @@ public class EstablishmentManager {
+ addr.toString() + ":" + port + " (according to " + bob.toString(true) + ")");
RemoteHostId oldId = state.getRemoteHostId();
state.introduced(addr, ip, port);
synchronized (_outboundStates) {
_outboundStates.remove(oldId);
_outboundStates.put(state.getRemoteHostId(), state);
}
_outboundStates.remove(oldId);
_outboundStates.put(state.getRemoteHostId(), state);
notifyActivity();
}
@@ -748,11 +719,11 @@ public class EstablishmentManager {
long now = _context.clock().now();
long nextSendTime = -1;
InboundEstablishState inboundState = null;
synchronized (_inboundStates) {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("# inbound states: " + _inboundStates.size());
for (Iterator iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
InboundEstablishState cur = (InboundEstablishState)iter.next();
for (Iterator<InboundEstablishState> iter = _inboundStates.values().iterator(); iter.hasNext(); ) {
InboundEstablishState cur = iter.next();
if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
// completely received (though the signature may be invalid)
iter.remove();
@@ -791,7 +762,6 @@ public class EstablishmentManager {
}
}
}
}
if (inboundState != null) {
//if (_log.shouldLog(Log.DEBUG))
@@ -853,12 +823,12 @@ public class EstablishmentManager {
//int admitted = 0;
//int remaining = 0;
//int active = 0;
synchronized (_outboundStates) {
//active = _outboundStates.size();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("# outbound states: " + _outboundStates.size());
for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState cur = (OutboundEstablishState)iter.next();
for (Iterator<OutboundEstablishState> iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
OutboundEstablishState cur = iter.next();
if (cur == null) continue;
if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
// completely received
@@ -902,7 +872,6 @@ public class EstablishmentManager {
//admitted = locked_admitQueued();
//remaining = _queuedOutbound.size();
}
//if (admitted > 0)
// _log.log(Log.CRIT, "Admitted " + admitted + " in push with " + remaining + " remaining queued and " + active + " active");

View File

@@ -21,17 +21,17 @@ import net.i2p.util.Log;
*
*/
public class InboundEstablishState {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
// SessionRequest message
private byte _receivedX[];
private byte _bobIP[];
private int _bobPort;
private final int _bobPort;
private DHSessionKeyBuilder _keyBuilder;
// SessionCreated message
private byte _sentY[];
private byte _aliceIP[];
private int _alicePort;
private final byte _aliceIP[];
private final int _alicePort;
private long _sentRelayTag;
private long _sentSignedOnTime;
private SessionKey _sessionKey;
@@ -44,11 +44,11 @@ public class InboundEstablishState {
private boolean _verificationAttempted;
private RouterIdentity _receivedConfirmedIdentity;
// general status
private long _establishBegin;
private long _lastReceive;
private final long _establishBegin;
//private long _lastReceive;
// private long _lastSend;
private long _nextSend;
private RemoteHostId _remoteHostId;
private final RemoteHostId _remoteHostId;
private int _currentState;
private boolean _complete;
@@ -121,9 +121,10 @@ public class InboundEstablishState {
public synchronized SessionKey getMACKey() { return _macKey; }
/** what IP do they appear to be on? */
public synchronized byte[] getSentIP() { return _aliceIP; }
public byte[] getSentIP() { return _aliceIP; }
/** what port number do they appear to be coming from? */
public synchronized int getSentPort() { return _alicePort; }
public int getSentPort() { return _alicePort; }
public synchronized byte[] getBobIP() { return _bobIP; }
@@ -205,8 +206,8 @@ public class InboundEstablishState {
}
/** how long have we been trying to establish this session? */
public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; }
public synchronized long getEstablishBeginTime() { return _establishBegin; }
public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; }
public synchronized long getNextSendTime() { return _nextSend; }
public synchronized void setNextSendTime(long when) { _nextSend = when; }
@@ -328,8 +329,7 @@ public class InboundEstablishState {
}
private void packetReceived() {
_lastReceive = _context.clock().now();
_nextSend = _lastReceive;
_nextSend = _context.clock().now();
}
@Override

View File

@@ -96,8 +96,8 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
if (fragments <= 0) return fragments;
Hash fromPeer = from.getRemotePeer();
Map messages = from.getInboundMessages();
Map<Long, InboundMessageState> messages = from.getInboundMessages();
for (int i = 0; i < fragments; i++) {
long mid = data.readMessageId(i);
Long messageId = new Long(mid);
@@ -122,7 +122,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource
boolean partialACK = false;
synchronized (messages) {
state = (InboundMessageState)messages.get(messageId);
state = messages.get(messageId);
if (state == null) {
state = new InboundMessageState(_context, mid, fromPeer);
messages.put(messageId, state);

View File

@@ -172,8 +172,8 @@ public class InboundMessageState {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(32);
buf.append("Message: ").append(_messageId);
StringBuilder buf = new StringBuilder(256);
buf.append("IB Message: ").append(_messageId);
if (isComplete()) {
buf.append(" completely received with ");
buf.append(getCompleteSize()).append(" bytes");

View File

@@ -6,12 +6,15 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.Base64;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
/**
@@ -23,17 +26,17 @@ public class IntroductionManager {
private UDPTransport _transport;
private PacketBuilder _builder;
/** map of relay tag to PeerState that should receive the introduction */
private Map<Long, PeerState> _outbound;
private final Map<Long, PeerState> _outbound;
/** list of peers (PeerState) who have given us introduction tags */
private final List<PeerState> _inbound;
private final Set<PeerState> _inbound;
public IntroductionManager(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(IntroductionManager.class);
_transport = transport;
_builder = new PacketBuilder(ctx, transport);
_outbound = Collections.synchronizedMap(new HashMap(128));
_inbound = new ArrayList(128);
_outbound = new ConcurrentHashMap(128);
_inbound = new ConcurrentHashSet(128);
ctx.statManager().createRateStat("udp.receiveRelayIntro", "How often we get a relayed request for us to talk to someone?", "udp", UDPTransport.RATES);
ctx.statManager().createRateStat("udp.receiveRelayRequest", "How often we receive a good request to relay to someone else?", "udp", UDPTransport.RATES);
ctx.statManager().createRateStat("udp.receiveRelayRequestBadTag", "Received relay requests with bad/expired tag", "udp", UDPTransport.RATES);
@@ -52,10 +55,7 @@ public class IntroductionManager {
if (peer.getWeRelayToThemAs() > 0)
_outbound.put(new Long(peer.getWeRelayToThemAs()), peer);
if (peer.getTheyRelayToUsAs() > 0) {
synchronized (_inbound) {
if (!_inbound.contains(peer))
_inbound.add(peer);
}
}
}
@@ -67,9 +67,7 @@ public class IntroductionManager {
if (peer.getWeRelayToThemAs() > 0)
_outbound.remove(new Long(peer.getWeRelayToThemAs()));
if (peer.getTheyRelayToUsAs() > 0) {
synchronized (_inbound) {
_inbound.remove(peer);
}
_inbound.remove(peer);
}
}
@@ -90,14 +88,11 @@ public class IntroductionManager {
* and we want to keep our introducers valid.
*/
public int pickInbound(Properties ssuOptions, int howMany) {
List<PeerState> peers = null;
int start = _context.random().nextInt(Integer.MAX_VALUE);
synchronized (_inbound) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking inbound out of " + _inbound.size());
if (_inbound.size() <= 0) return 0;
peers = new ArrayList(_inbound);
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Picking inbound out of " + _inbound.size());
if (_inbound.isEmpty()) return 0;
List<PeerState> peers = new ArrayList(_inbound);
int sz = peers.size();
start = start % sz;
int found = 0;
@@ -164,9 +159,7 @@ public class IntroductionManager {
* @return number of peers that have volunteerd to introduce us
*/
int introducerCount() {
synchronized(_inbound) {
return _inbound.size();
}
}
void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) {

View File

@@ -1,7 +1,7 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@@ -24,15 +24,17 @@ public class MessageReceiver {
private Log _log;
private UDPTransport _transport;
/** list of messages (InboundMessageState) fully received but not interpreted yet */
private final List _completeMessages;
private final BlockingQueue<InboundMessageState> _completeMessages;
private boolean _alive;
private ByteCache _cache;
private static final int THREADS = 5;
private static final long POISON_IMS = -99999999999l;
public MessageReceiver(RouterContext ctx, UDPTransport transport) {
_context = ctx;
_log = ctx.logManager().getLog(MessageReceiver.class);
_transport = transport;
_completeMessages = new ArrayList(16);
_completeMessages = new LinkedBlockingQueue();
_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
@@ -46,9 +48,8 @@ public class MessageReceiver {
public void startup() {
_alive = true;
for (int i = 0; i < 5; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i);
t.setDaemon(true);
for (int i = 0; i < THREADS; i++) {
I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true);
t.start();
}
}
@@ -61,26 +62,31 @@ public class MessageReceiver {
public void shutdown() {
_alive = false;
synchronized (_completeMessages) {
_completeMessages.clear();
_completeMessages.notifyAll();
_completeMessages.clear();
for (int i = 0; i < THREADS; i++) {
InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null);
_completeMessages.offer(ims);
}
for (int i = 1; i <= 5 && !_completeMessages.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_completeMessages.clear();
}
public void receiveMessage(InboundMessageState state) {
int total = 0;
long lag = -1;
synchronized (_completeMessages) {
_completeMessages.add(state);
total = _completeMessages.size();
if (total > 1)
lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
_completeMessages.notifyAll();
}
if (total > 1)
_context.statManager().addRateData("udp.inboundReady", total, 0);
if (lag > 1000)
_context.statManager().addRateData("udp.inboundLag", lag, total);
//int total = 0;
//long lag = -1;
if (_alive)
_completeMessages.offer(state);
//total = _completeMessages.size();
//if (total > 1)
// lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
//if (total > 1)
// _context.statManager().addRateData("udp.inboundReady", total, 0);
//if (lag > 1000)
// _context.statManager().addRateData("udp.inboundLag", lag, total);
}
public void loop(I2NPMessageHandler handler) {
@@ -91,19 +97,18 @@ public class MessageReceiver {
long expiredLifetime = 0;
int remaining = 0;
try {
synchronized (_completeMessages) {
while (message == null) {
if (_completeMessages.size() > 0) // grab the tail for lowest latency
message = (InboundMessageState)_completeMessages.remove(_completeMessages.size()-1);
else
_completeMessages.wait(5000);
message = _completeMessages.take();
if ( (message != null) && (message.getMessageId() == POISON_IMS) ) {
message = null;
break;
}
if ( (message != null) && (message.isExpired()) ) {
expiredLifetime += message.getLifetime();
message = null;
expired++;
}
remaining = _completeMessages.size();
}
//remaining = _completeMessages.size();
}
} catch (InterruptedException ie) {}

View File

@@ -1,8 +1,8 @@
package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.crypto.DHSessionKeyBuilder;
import net.i2p.data.Base64;
@@ -22,8 +22,8 @@ import net.i2p.util.Log;
*
*/
public class OutboundEstablishState {
private RouterContext _context;
private Log _log;
private final RouterContext _context;
private final Log _log;
// SessionRequest message
private byte _sentX[];
private byte _bobIP[];
@@ -44,18 +44,18 @@ public class OutboundEstablishState {
private long _sentSignedOnTime;
private Signature _sentSignature;
// general status
private long _establishBegin;
private long _lastReceive;
private final long _establishBegin;
//private long _lastReceive;
private long _lastSend;
private long _nextSend;
private RemoteHostId _remoteHostId;
private RouterIdentity _remotePeer;
private final RouterIdentity _remotePeer;
private SessionKey _introKey;
private final List _queuedMessages;
private final Queue<OutNetMessage> _queuedMessages;
private int _currentState;
private long _introductionNonce;
// intro
private UDPAddress _remoteAddress;
private final UDPAddress _remoteAddress;
private boolean _complete;
/** nothin sent yet */
@@ -87,7 +87,7 @@ public class OutboundEstablishState {
_remotePeer = remotePeer;
_introKey = introKey;
_keyBuilder = null;
_queuedMessages = new ArrayList(4);
_queuedMessages = new LinkedBlockingQueue();
_currentState = STATE_UNKNOWN;
_establishBegin = ctx.clock().now();
_remoteAddress = addr;
@@ -113,22 +113,21 @@ public class OutboundEstablishState {
public long getIntroNonce() { return _introductionNonce; }
public void addMessage(OutNetMessage msg) {
synchronized (_queuedMessages) {
if (!_queuedMessages.contains(msg))
_queuedMessages.add(msg);
}
// chance of a duplicate here in a race, that's ok
if (!_queuedMessages.contains(msg))
_queuedMessages.offer(msg);
else if (_log.shouldLog(Log.WARN))
_log.warn("attempt to add duplicate msg to queue: " + msg);
}
public OutNetMessage getNextQueuedMessage() {
synchronized (_queuedMessages) {
if (_queuedMessages.size() > 0)
return (OutNetMessage)_queuedMessages.remove(0);
}
return null;
return _queuedMessages.poll();
}
public RouterIdentity getRemoteIdentity() { return _remotePeer; }
public SessionKey getIntroKey() { return _introKey; }
/** called from constructor, no need to synch */
private void prepareSessionRequest() {
_keyBuilder = new DHSessionKeyBuilder();
byte X[] = _keyBuilder.getMyPublicValue().toByteArray();
@@ -142,7 +141,7 @@ public class OutboundEstablishState {
System.arraycopy(X, 0, _sentX, _sentX.length - X.length, X.length);
}
public synchronized byte[] getSentX() { return _sentX; }
public byte[] getSentX() { return _sentX; }
public synchronized byte[] getSentIP() { return _bobIP; }
public synchronized int getSentPort() { return _bobPort; }
@@ -403,8 +402,8 @@ public class OutboundEstablishState {
}
/** how long have we been trying to establish this session? */
public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; }
public synchronized long getEstablishBeginTime() { return _establishBegin; }
public long getLifetime() { return _context.clock().now() - _establishBegin; }
public long getEstablishBeginTime() { return _establishBegin; }
public synchronized long getNextSendTime() { return _nextSend; }
public synchronized void setNextSendTime(long when) {
_nextSend = when;
@@ -422,8 +421,7 @@ public class OutboundEstablishState {
}
private void packetReceived() {
_lastReceive = _context.clock().now();
_nextSend = _lastReceive;
_nextSend = _context.clock().now();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Got a packet, nextSend == now");
}

View File

@@ -28,7 +28,7 @@ public class OutboundMessageFragments {
private UDPTransport _transport;
// private ActiveThrottle _throttle; // LINT not used ??
/** peers we are actively sending messages to */
private final List _activePeers;
private final List<PeerState> _activePeers;
private boolean _alive;
/** which peer should we build the next packet out of? */
private int _nextPeer;
@@ -207,7 +207,7 @@ public class OutboundMessageFragments {
synchronized (_activePeers) {
peers = new ArrayList(_activePeers.size());
for (int i = 0; i < _activePeers.size(); i++) {
PeerState state = (PeerState)_activePeers.get(i);
PeerState state = _activePeers.get(i);
if (state.getOutboundMessageCount() <= 0) {
_activePeers.remove(i);
i--;
@@ -255,7 +255,7 @@ public class OutboundMessageFragments {
if (cycleTime > 1000)
_context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size());
}
peer = (PeerState)_activePeers.get(i);
peer = _activePeers.get(i);
state = peer.allocateSend();
if (state != null) {
_nextPeer = i + 1;
@@ -318,12 +318,12 @@ public class OutboundMessageFragments {
return null;
// ok, simplest possible thing is to always tack on the bitfields if
List msgIds = peer.getCurrentFullACKs();
List<Long> msgIds = peer.getCurrentFullACKs();
if (msgIds == null) msgIds = new ArrayList();
List partialACKBitfields = new ArrayList();
List<ACKBitfield> partialACKBitfields = new ArrayList();
peer.fetchPartialACKs(partialACKBitfields);
int piggybackedPartialACK = partialACKBitfields.size();
List remaining = new ArrayList(msgIds);
List<Long> remaining = new ArrayList(msgIds);
int sparseCount = 0;
UDPPacket rv[] = new UDPPacket[fragments]; //sparse
for (int i = 0; i < fragments; i++) {
@@ -356,7 +356,7 @@ public class OutboundMessageFragments {
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int i = 0; i < msgIds.size(); i++) {
Long id = (Long)msgIds.get(i);
Long id = msgIds.get(i);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;

View File

@@ -342,8 +342,8 @@ public class OutboundMessageState {
public String toString() {
short sends[] = _fragmentSends;
ByteArray messageBuf = _messageBuf;
StringBuilder buf = new StringBuilder(64);
buf.append("Message ").append(_messageId);
StringBuilder buf = new StringBuilder(256);
buf.append("OB Message ").append(_messageId);
if (sends != null)
buf.append(" with ").append(sends.length).append(" fragments");
if (messageBuf != null)

View File

@@ -31,8 +31,7 @@ public class OutboundRefiller implements Runnable {
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP outbound refiller");
t.setDaemon(true);
I2PThread t = new I2PThread(this, "UDP outbound refiller", true);
t.start();
}
public void shutdown() { _alive = false; }

View File

@@ -2,8 +2,8 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -32,6 +32,25 @@ public class PacketBuilder {
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
private static final ByteCache _blockCache = ByteCache.getInstance(64, 16);
/**
* For debugging and stats only - does not go out on the wire.
* These are chosen to be higher than the highest I2NP message type,
* as a data packet is set to the underlying I2NP message type.
*/
static final int TYPE_FIRST = 42;
static final int TYPE_ACK = TYPE_FIRST;
static final int TYPE_PUNCH = 43;
static final int TYPE_RESP = 44;
static final int TYPE_INTRO = 45;
static final int TYPE_RREQ = 46;
static final int TYPE_TCB = 47;
static final int TYPE_TBC = 48;
static final int TYPE_TTA = 49;
static final int TYPE_TFA = 50;
static final int TYPE_CONF = 51;
static final int TYPE_SREQ = 52;
static final int TYPE_CREAT = 53;
/** we only talk to people of the right version */
static final int PROTOCOL_VERSION = 0;
@@ -58,7 +77,7 @@ public class PacketBuilder {
* The list itself is passed by reference, and if a messageId is
* included, it should be removed from the list.
*/
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) {
public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List<Long> ackIdsRemaining, List<ACKBitfield> partialACKsRemaining) {
UDPPacket packet = UDPPacket.acquire(_context, false);
StringBuilder msg = null;
@@ -92,18 +111,18 @@ public class PacketBuilder {
// is under the MTU, but for now, since the # of packets acked is so few (usually
// just one or two), and since the packets are so small anyway, an additional five
// or ten bytes doesn't hurt.
if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) )
if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) )
data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK;
if ( (partialACKsRemaining != null) && (partialACKsRemaining.size() > 0) )
if ( (partialACKsRemaining != null) && (!partialACKsRemaining.isEmpty()) )
data[off] |= UDPPacket.DATA_FLAG_ACK_BITFIELDS;
off++;
if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) {
if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) ) {
DataHelper.toLong(data, off, 1, ackIdsRemaining.size());
off++;
for (int i = 0; i < ackIdsRemaining.size(); i++) {
//while (ackIdsRemaining.size() > 0) {
Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0);
Long ackId = ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0);
DataHelper.toLong(data, off, 4, ackId.longValue());
off += 4;
if (msg != null) // logging it
@@ -118,7 +137,7 @@ public class PacketBuilder {
// leave it blank for now, since we could skip some
off++;
for (int i = 0; i < partialACKsRemaining.size(); i++) {
ACKBitfield bitfield = (ACKBitfield)partialACKsRemaining.get(i);
ACKBitfield bitfield = partialACKsRemaining.get(i);
if (bitfield.receivedComplete()) continue;
DataHelper.toLong(data, off, 4, bitfield.getMessageId());
off += 4;
@@ -214,15 +233,18 @@ public class PacketBuilder {
// We use this for keepalive purposes.
// It doesn't generate a reply, but that's ok.
public UDPPacket buildPing(PeerState peer) {
return buildACK(peer, new ArrayList(0));
return buildACK(peer, Collections.EMPTY_LIST);
}
private static final int ACK_PRIORITY = 1;
/**
* Build the ack packet. The list need not be sorted into full and partial;
* this method will put all fulls before the partials in the outgoing packet.
*
* @param ackBitfields list of ACKBitfield instances to either fully or partially ACK
*/
public UDPPacket buildACK(PeerState peer, List ackBitfields) {
public UDPPacket buildACK(PeerState peer, List<ACKBitfield> ackBitfields) {
UDPPacket packet = UDPPacket.acquire(_context, false);
StringBuilder msg = null;
@@ -263,7 +285,7 @@ public class PacketBuilder {
DataHelper.toLong(data, off, 1, fullACKCount);
off++;
for (int i = 0; i < ackBitfields.size(); i++) {
ACKBitfield bf = (ACKBitfield)ackBitfields.get(i);
ACKBitfield bf = ackBitfields.get(i);
if (bf.receivedComplete()) {
DataHelper.toLong(data, off, 4, bf.getMessageId());
off += 4;
@@ -415,7 +437,7 @@ public class PacketBuilder {
authenticate(packet, ourIntroKey, ourIntroKey, iv);
setTo(packet, to, state.getSentPort());
_ivCache.release(iv);
packet.setMessageType(53);
packet.setMessageType(TYPE_CREAT);
return packet;
}
@@ -479,7 +501,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, state.getIntroKey(), state.getIntroKey());
setTo(packet, to, state.getSentPort());
packet.setMessageType(52);
packet.setMessageType(TYPE_SREQ);
return packet;
}
@@ -586,7 +608,7 @@ public class PacketBuilder {
}
setTo(packet, to, state.getSentPort());
packet.setMessageType(51);
packet.setMessageType(TYPE_CONF);
return packet;
}
@@ -639,7 +661,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, toCipherKey, toMACKey);
setTo(packet, toIP, toPort);
packet.setMessageType(50);
packet.setMessageType(TYPE_TFA);
return packet;
}
@@ -684,7 +706,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, aliceIntroKey, aliceIntroKey);
setTo(packet, aliceIP, alicePort);
packet.setMessageType(49);
packet.setMessageType(TYPE_TTA);
return packet;
}
@@ -731,7 +753,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, charlieCipherKey, charlieMACKey);
setTo(packet, charlieIP, charliePort);
packet.setMessageType(48);
packet.setMessageType(TYPE_TBC);
return packet;
}
@@ -776,7 +798,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, bobCipherKey, bobMACKey);
setTo(packet, bobIP, bobPort);
packet.setMessageType(47);
packet.setMessageType(TYPE_TCB);
return packet;
}
@@ -875,7 +897,7 @@ public class PacketBuilder {
if (encrypt)
authenticate(packet, new SessionKey(introKey), new SessionKey(introKey));
setTo(packet, introHost, introPort);
packet.setMessageType(46);
packet.setMessageType(TYPE_RREQ);
return packet;
}
@@ -925,7 +947,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey());
setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort());
packet.setMessageType(45);
packet.setMessageType(TYPE_INTRO);
return packet;
}
@@ -986,7 +1008,7 @@ public class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, aliceIntroKey, aliceIntroKey);
setTo(packet, aliceAddr, alice.getPort());
packet.setMessageType(44);
packet.setMessageType(TYPE_RESP);
return packet;
}
@@ -1019,7 +1041,7 @@ public class PacketBuilder {
packet.getPacket().setLength(0);
setTo(packet, to, port);
packet.setMessageType(43);
packet.setMessageType(TYPE_PUNCH);
return packet;
}

View File

@@ -1,8 +1,6 @@
package net.i2p.router.transport.udp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
@@ -31,7 +29,7 @@ public class PacketHandler {
private PeerTestManager _testManager;
private IntroductionManager _introManager;
private boolean _keepReading;
private List _handlers;
private final Handler[] _handlers;
private static final int NUM_HANDLERS = 5;
/** let packets be up to 30s slow */
@@ -46,9 +44,9 @@ public class PacketHandler {
_inbound = inbound;
_testManager = testManager;
_introManager = introManager;
_handlers = new ArrayList(NUM_HANDLERS);
_handlers = new Handler[NUM_HANDLERS];
for (int i = 0; i < NUM_HANDLERS; i++) {
_handlers.add(new Handler());
_handlers[i] = new Handler();
}
_context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES);
@@ -81,9 +79,8 @@ public class PacketHandler {
public void startup() {
_keepReading = true;
for (int i = 0; i < _handlers.size(); i++) {
I2PThread t = new I2PThread((Handler)_handlers.get(i), "UDP Packet handler " + i + "/" + _handlers.size());
t.setDaemon(true);
for (int i = 0; i < NUM_HANDLERS; i++) {
I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true);
t.start();
}
}
@@ -94,10 +91,9 @@ public class PacketHandler {
String getHandlerStatus() {
StringBuilder rv = new StringBuilder();
int size = _handlers.size();
rv.append("Handlers: ").append(size);
for (int i = 0; i < size; i++) {
Handler handler = (Handler)_handlers.get(i);
rv.append("Handlers: ").append(NUM_HANDLERS);
for (int i = 0; i < NUM_HANDLERS; i++) {
Handler handler = _handlers[i];
rv.append(" handler ").append(i).append(" state: ").append(handler._state);
}
return rv.toString();

View File

@@ -25,8 +25,7 @@ public class PacketPusher implements Runnable {
public void startup() {
_alive = true;
I2PThread t = new I2PThread(this, "UDP packet pusher");
t.setDaemon(true);
I2PThread t = new I2PThread(this, "UDP packet pusher", true);
t.start();
}
@@ -39,7 +38,8 @@ public class PacketPusher implements Runnable {
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
//_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
_sender.add(packets[i]);
}
}
} catch (Exception e) {

View File

@@ -8,12 +8,16 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.ConcurrentHashSet;
/**
* Contain all of the state about a UDP connection to a peer.
@@ -73,14 +77,22 @@ public class PeerState {
private int _consecutiveFailedSends;
/** when did we last have a failed send (beginning of period) */
// private long _lastFailedSendPeriod;
/** list of messageIds (Long) that we have received but not yet sent */
private final List _currentACKs;
/**
* Set of messageIds (Long) that we have received but not yet sent
* Since even with the smallest MTU we can fit 131 acks in a message,
* we are unlikely to get backed up on acks, so we don't keep
* them in any particular order.
*/
private final Set<Long> _currentACKs;
/**
* list of the most recent messageIds (Long) that we have received and sent
* an ACK for. We keep a few of these around to retransmit with _currentACKs,
* hopefully saving some spurious retransmissions
*/
private final List _currentACKsResend;
private final Queue<Long> _currentACKsResend;
/** when did we last send ACKs to the peer? */
private volatile long _lastACKSend;
/** when did we decide we need to ACK to this peer? */
@@ -169,9 +181,9 @@ public class PeerState {
private long _packetsReceived;
/** list of InboundMessageState for active message */
private final Map _inboundMessages;
private final Map<Long, InboundMessageState> _inboundMessages;
/** list of OutboundMessageState */
private final List _outboundMessages;
private final List<OutboundMessageState> _outboundMessages;
/** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter;
@@ -180,8 +192,10 @@ public class PeerState {
/** have we migrated away from this peer to another newer one? */
private volatile boolean _dead;
/** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */
private static final int MIN_CONCURRENT_MSGS = 8;
/** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
private volatile int _concurrentMessagesAllowed = 8;
private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
/**
* how many outbound messages are currently being transmitted. Not thread safe, as we're not strict
*/
@@ -203,6 +217,11 @@ public class PeerState {
* we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20
* for UDP, and 8 for IP, giving us 596. round up to mod 16, giving a total
* of 608
*
* Well, we really need to count the acks as well, especially
* 4 * MAX_RESEND_ACKS which can take up a significant amount of space.
* We reduce the max acks when using the small MTU but it may not be enough...
*
*/
private static final int MIN_MTU = 608;//600; //1500;
private static final int DEFAULT_MTU = MIN_MTU;
@@ -234,8 +253,8 @@ public class PeerState {
_currentReceiveSecond = -1;
_lastSendTime = -1;
_lastReceiveTime = -1;
_currentACKs = new ArrayList(8);
_currentACKsResend = new ArrayList(8);
_currentACKs = new ConcurrentHashSet();
_currentACKsResend = new LinkedBlockingQueue();
_currentSecondECNReceived = false;
_remoteWantsPreviousACKs = false;
_sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES;
@@ -582,12 +601,9 @@ public class PeerState {
_context.statManager().addRateData("udp.receiveBps", _receiveBps, 0);
}
synchronized (_currentACKs) {
if (_wantACKSendSince <= 0)
_wantACKSendSince = now;
if (!_currentACKs.contains(messageId))
_currentACKs.add(messageId);
}
if (_wantACKSendSince <= 0)
_wantACKSendSince = now;
_currentACKs.add(messageId);
_messagesReceived++;
}
@@ -600,7 +616,8 @@ public class PeerState {
* Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages.
* Access to this map must be synchronized explicitly!
*/
public Map getInboundMessages() { return _inboundMessages; }
public Map<Long, InboundMessageState> getInboundMessages() { return _inboundMessages; }
/**
* Expire partially received inbound messages, returning how many are still pending.
* This should probably be fired periodically, in case a peer goes silent and we don't
@@ -661,26 +678,36 @@ public class PeerState {
* removeACKMessage(Long) should be called.
*
*/
public List getCurrentFullACKs() {
synchronized (_currentACKs) {
ArrayList rv = new ArrayList(_currentACKs);
public List<Long> getCurrentFullACKs() {
ArrayList<Long> rv = new ArrayList(_currentACKs);
// include some for retransmission
rv.addAll(_currentACKsResend);
return rv;
}
}
public void removeACKMessage(Long messageId) {
synchronized (_currentACKs) {
_currentACKs.remove(messageId);
_currentACKsResend.add(messageId);
_currentACKsResend.offer(messageId);
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.remove(0);
}
_lastACKSend = _context.clock().now();
_currentACKsResend.poll();
_lastACKSend = _context.clock().now();
}
/**
* The max number of acks we save to send as duplicates
*/
private static final int MAX_RESEND_ACKS = 16;
/**
* The number of duplicate acks sent in each messge -
* Warning, this directly affects network overhead
* Was 16 but that's too much (64 bytes in a max 608 byte packet,
* and often much smaller)
* @since 0.7.13
*/
private static final int MAX_RESEND_ACKS_LARGE = 9;
/** for small MTU */
private static final int MAX_RESEND_ACKS_SMALL = 4;
/**
* grab a list of ACKBitfield instances, some of which may fully
@@ -691,51 +718,75 @@ public class PeerState {
* will be unchanged if there are ACKs remaining.
*
*/
public List retrieveACKBitfields() { return retrieveACKBitfields(true); }
public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
List rv = null;
public List<ACKBitfield> retrieveACKBitfields() { return retrieveACKBitfields(true); }
public List<ACKBitfield> retrieveACKBitfields(boolean alwaysIncludeRetransmissions) {
List<ACKBitfield> rv = new ArrayList(MAX_RESEND_ACKS);
int bytesRemaining = countMaxACKData();
synchronized (_currentACKs) {
rv = new ArrayList(16); //_currentACKs.size());
int oldIndex = _currentACKsResend.size();
while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) {
Long val = (Long)_currentACKs.remove(0);
// Limit the overhead of all the resent acks when using small MTU
// 64 bytes in a 608-byte packet is too much...
// Send a random subset of all the queued resend acks.
int resendSize = _currentACKsResend.size();
int maxResendAcks;
if (bytesRemaining < MIN_MTU)
maxResendAcks = MAX_RESEND_ACKS_SMALL;
else
maxResendAcks = MAX_RESEND_ACKS_LARGE;
List<Long> randomResends = new ArrayList(_currentACKsResend);
// As explained above, we include the acks in any order
// since we are unlikely to get backed up -
// just take them using the Set iterator.
Iterator<Long> iter = _currentACKs.iterator();
while (bytesRemaining >= 4 && iter.hasNext()) {
Long val = iter.next();
iter.remove();
long id = val.longValue();
rv.add(new FullACKBitfield(id));
_currentACKsResend.add(val);
_currentACKsResend.offer(val);
bytesRemaining -= 4;
}
if (_currentACKs.size() <= 0)
if (_currentACKs.isEmpty())
_wantACKSendSince = -1;
if (alwaysIncludeRetransmissions || rv.size() > 0) {
// now repeat by putting in some old ACKs
for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) {
Long cur = (Long)_currentACKsResend.get(i);
// randomly selected from the Resend queue.
// Maybe we should only resend each one a certain number of times...
int oldIndex = Math.min(resendSize, maxResendAcks);
if (oldIndex > 0 && oldIndex < resendSize)
Collections.shuffle(randomResends, _context.random());
iter = randomResends.iterator();
while (bytesRemaining >= 4 && oldIndex-- > 0 && iter.hasNext()) {
Long cur = iter.next();
long c = cur.longValue();
FullACKBitfield bf = new FullACKBitfield(c);
rv.add(bf);
bytesRemaining -= 4;
// try to avoid duplicates ??
// ACKsResend is not checked for dups at add time
//if (rv.contains(bf)) {
// iter.remove();
//} else {
rv.add(bf);
bytesRemaining -= 4;
//}
}
}
// trim down the resends
while (_currentACKsResend.size() > MAX_RESEND_ACKS)
_currentACKsResend.remove(0);
}
_currentACKsResend.poll();
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
// we should try to find some packets to partially ACK
// (preferably the ones which have the most received fragments)
List partial = new ArrayList();
List<ACKBitfield> partial = new ArrayList();
fetchPartialACKs(partial);
// we may not be able to use them all, but lets try...
for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) {
ACKBitfield bitfield = (ACKBitfield)partial.get(i);
ACKBitfield bitfield = partial.get(i);
int bytes = (bitfield.fragmentCount() / 7) + 1;
if (bytesRemaining > bytes + 4) { // msgId + bitfields
if (rv == null)
rv = new ArrayList(partial.size());
rv.add(bitfield);
bytesRemaining -= bytes + 4;
partialIncluded++;
@@ -754,7 +805,7 @@ public class PeerState {
return rv;
}
void fetchPartialACKs(List rv) {
void fetchPartialACKs(List<ACKBitfield> rv) {
InboundMessageState states[] = null;
int curState = 0;
synchronized (_inboundMessages) {
@@ -762,9 +813,8 @@ public class PeerState {
if (numMessages <= 0)
return;
// todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead?
int remaining = _inboundMessages.size();
for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) {
InboundMessageState state = (InboundMessageState)iter.next();
for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = iter.next();
if (state.isExpired()) {
//if (_context instanceof RouterContext)
// ((RouterContext)_context).messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired partially received: " + state.toString());
@@ -795,6 +845,13 @@ public class PeerState {
public boolean received(int fragmentNum) { return true; }
public boolean receivedComplete() { return true; }
@Override
public int hashCode() { return (int) _msgId; }
@Override
public boolean equals(Object o) {
if (!(o instanceof FullACKBitfield)) return false;
return _msgId == ((ACKBitfield)o).getMessageId();
}
@Override
public String toString() { return "Full ACK of " + _msgId; }
}
@@ -825,8 +882,8 @@ public class PeerState {
}
} else {
int allow = _concurrentMessagesAllowed - 1;
if (allow < 8)
allow = 8;
if (allow < MIN_CONCURRENT_MSGS)
allow = MIN_CONCURRENT_MSGS;
_concurrentMessagesAllowed = allow;
}
if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES)
@@ -977,10 +1034,10 @@ public class PeerState {
public long getWantedACKSendSince() { return _wantACKSendSince; }
public boolean unsentACKThresholdReached() {
int threshold = countMaxACKData() / 4;
synchronized (_currentACKs) {
return _currentACKs.size() >= threshold;
}
return _currentACKs.size() >= threshold;
}
/** @return MTU - 83 */
private int countMaxACKData() {
return _mtu
- IP_HEADER_SIZE
@@ -1013,7 +1070,7 @@ public class PeerState {
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (msgs == null) return 0;
int rv = 0;
boolean fail = false;
@@ -1068,12 +1125,12 @@ public class PeerState {
public void dropOutbound() {
//if (_dead) return;
_dead = true;
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
//_outboundMessages = null;
_retransmitter = null;
if (msgs != null) {
int sz = 0;
List tempList = null;
List<OutboundMessageState> tempList = null;
synchronized (msgs) {
sz = msgs.size();
if (sz > 0) {
@@ -1082,12 +1139,14 @@ public class PeerState {
}
}
for (int i = 0; i < sz; i++)
_transport.failed((OutboundMessageState)tempList.get(i), false);
_transport.failed(tempList.get(i), false);
}
// so the ACKSender will drop this peer from its queue
_wantACKSendSince = -1;
}
public int getOutboundMessageCount() {
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return 0;
if (msgs != null) {
synchronized (msgs) {
@@ -1104,17 +1163,17 @@ public class PeerState {
*/
public int finishMessages() {
int rv = 0;
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) {
dropOutbound();
return 0;
}
List succeeded = null;
List failed = null;
List<OutboundMessageState> succeeded = null;
List<OutboundMessageState> failed = null;
synchronized (msgs) {
int size = msgs.size();
for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(i);
OutboundMessageState state = msgs.get(i);
if (state.isComplete()) {
msgs.remove(i);
i--;
@@ -1147,7 +1206,7 @@ public class PeerState {
}
for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
OutboundMessageState state = (OutboundMessageState)succeeded.get(i);
OutboundMessageState state = succeeded.get(i);
_transport.succeeded(state);
state.releaseResources();
OutNetMessage msg = state.getMessage();
@@ -1156,7 +1215,7 @@ public class PeerState {
}
for (int i = 0; failed != null && i < failed.size(); i++) {
OutboundMessageState state = (OutboundMessageState)failed.get(i);
OutboundMessageState state = failed.get(i);
OutNetMessage msg = state.getMessage();
if (msg != null) {
msg.timestamp("expired in the active pool");
@@ -1180,12 +1239,12 @@ public class PeerState {
*/
public OutboundMessageState allocateSend() {
int total = 0;
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return null;
synchronized (msgs) {
int size = msgs.size();
for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(i);
OutboundMessageState state = msgs.get(i);
if (locked_shouldSend(state)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@@ -1217,7 +1276,7 @@ public class PeerState {
public int getNextDelay() {
int rv = -1;
long now = _context.clock().now();
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return -1;
synchronized (msgs) {
if (_retransmitter != null) {
@@ -1229,7 +1288,7 @@ public class PeerState {
}
int size = msgs.size();
for (int i = 0; i < size; i++) {
OutboundMessageState state = (OutboundMessageState)msgs.get(i);
OutboundMessageState state = msgs.get(i);
int delay = (int)(state.getNextSendTime() - now);
if (delay <= 0)
delay = 1;
@@ -1346,12 +1405,12 @@ public class PeerState {
public int acked(long messageId) {
OutboundMessageState state = null;
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
if (_dead) return 0;
synchronized (msgs) {
int sz = msgs.size();
for (int i = 0; i < sz; i++) {
state = (OutboundMessageState)msgs.get(i);
state = msgs.get(i);
if (state.getMessageId() == messageId) {
msgs.remove(i);
break;
@@ -1407,13 +1466,13 @@ public class PeerState {
return;
}
List msgs = _outboundMessages;
List<OutboundMessageState> msgs = _outboundMessages;
OutboundMessageState state = null;
boolean isComplete = false;
synchronized (msgs) {
for (int i = 0; i < msgs.size(); i++) {
state = (OutboundMessageState)msgs.get(i);
state = msgs.get(i);
if (state.getMessageId() == bitfield.getMessageId()) {
boolean complete = state.acked(bitfield);
if (complete) {
@@ -1486,26 +1545,23 @@ public class PeerState {
_sendWindowBytes = oldPeer._sendWindowBytes;
oldPeer._dead = true;
List tmp = new ArrayList();
synchronized (oldPeer._currentACKs) {
tmp.addAll(oldPeer._currentACKs);
oldPeer._currentACKs.clear();
}
List<Long> tmp = new ArrayList();
tmp.addAll(oldPeer._currentACKs);
oldPeer._currentACKs.clear();
if (!_dead) {
synchronized (_currentACKs) { _currentACKs.addAll(tmp); }
_currentACKs.addAll(tmp);
}
tmp.clear();
synchronized (oldPeer._currentACKsResend) {
tmp.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear();
}
tmp.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear();
if (!_dead) {
synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); }
_currentACKsResend.addAll(tmp);
}
tmp.clear();
Map msgs = new HashMap();
Map<Long, InboundMessageState> msgs = new HashMap();
synchronized (oldPeer._inboundMessages) {
msgs.putAll(oldPeer._inboundMessages);
oldPeer._inboundMessages.clear();
@@ -1515,20 +1571,20 @@ public class PeerState {
}
msgs.clear();
List<OutboundMessageState> tmp2 = new ArrayList();
OutboundMessageState retransmitter = null;
synchronized (oldPeer._outboundMessages) {
tmp.addAll(oldPeer._outboundMessages);
tmp2.addAll(oldPeer._outboundMessages);
oldPeer._outboundMessages.clear();
retransmitter = oldPeer._retransmitter;
oldPeer._retransmitter = null;
}
if (!_dead) {
synchronized (_outboundMessages) {
_outboundMessages.addAll(tmp);
_outboundMessages.addAll(tmp2);
_retransmitter = retransmitter;
}
}
tmp.clear();
}
/*

View File

@@ -2,11 +2,10 @@ package net.i2p.router.transport.udp;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
@@ -102,7 +101,7 @@ class PeerTestManager {
private PeerTestState _currentTest;
private boolean _currentTestComplete;
/** as Alice */
private List<Long> _recentTests;
private Queue<Long> _recentTests;
/** longest we will keep track of a Charlie nonce for */
private static final int MAX_CHARLIE_LIFETIME = 10*1000;
@@ -116,8 +115,8 @@ class PeerTestManager {
_context = context;
_transport = transport;
_log = context.logManager().getLog(PeerTestManager.class);
_activeTests = new HashMap(64);
_recentTests = Collections.synchronizedList(new ArrayList(16));
_activeTests = new ConcurrentHashMap();
_recentTests = new LinkedBlockingQueue();
_packetBuilder = new PacketBuilder(context, transport);
_currentTest = null;
_currentTestComplete = false;
@@ -155,8 +154,8 @@ class PeerTestManager {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Running test with bob = " + bobIP + ":" + bobPort + " " + test.getNonce());
while (_recentTests.size() > 16)
_recentTests.remove(0);
_recentTests.add(new Long(test.getNonce()));
_recentTests.poll();
_recentTests.offer(new Long(test.getNonce()));
sendTestToBob();
@@ -435,10 +434,7 @@ class PeerTestManager {
testInfo.readIP(testIP, 0);
}
PeerTestState state = null;
synchronized (_activeTests) {
state = (PeerTestState)_activeTests.get(new Long(nonce));
}
PeerTestState state = _activeTests.get(new Long(nonce));
if (state == null) {
if ( (testIP == null) || (testPort <= 0) ) {
@@ -542,9 +538,7 @@ class PeerTestManager {
_log.debug("Receive from bob (" + from + ") as charlie, sending back to bob and sending to alice @ " + aliceIP + ":" + alicePort);
if (isNew) {
synchronized (_activeTests) {
_activeTests.put(new Long(nonce), state);
}
_activeTests.put(new Long(nonce), state);
SimpleScheduler.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME);
}
@@ -623,9 +617,7 @@ class PeerTestManager {
}
if (isNew) {
synchronized (_activeTests) {
_activeTests.put(new Long(nonce), state);
}
_activeTests.put(new Long(nonce), state);
SimpleScheduler.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME);
}
@@ -701,9 +693,7 @@ class PeerTestManager {
_nonce = nonce;
}
public void timeReached() {
synchronized (_activeTests) {
_activeTests.remove(new Long(_nonce));
}
}
}
}

View File

@@ -33,71 +33,71 @@ class PeerTestState {
public static final short BOB = 2;
public static final short CHARLIE = 3;
public synchronized long getNonce() { return _testNonce; }
public synchronized void setNonce(long nonce) { _testNonce = nonce; }
public long getNonce() { return _testNonce; }
public void setNonce(long nonce) { _testNonce = nonce; }
/** Are we Alice, bob, or Charlie. */
public synchronized short getOurRole() { return _ourRole; }
public synchronized void setOurRole(short role) { _ourRole = role; }
public short getOurRole() { return _ourRole; }
public void setOurRole(short role) { _ourRole = role; }
/**
* If we are Alice, this will contain the IP that Bob says we
* can be reached at - the IP Charlie says we can be reached
* at is _aliceIPFromCharlie
*
*/
public synchronized InetAddress getAliceIP() { return _aliceIP; }
public synchronized void setAliceIP(InetAddress ip) { _aliceIP = ip; }
public synchronized InetAddress getBobIP() { return _bobIP; }
public synchronized void setBobIP(InetAddress ip) { _bobIP = ip; }
public synchronized InetAddress getCharlieIP() { return _charlieIP; }
public synchronized void setCharlieIP(InetAddress ip) { _charlieIP = ip; }
public synchronized InetAddress getAliceIPFromCharlie() { return _aliceIPFromCharlie; }
public synchronized void setAliceIPFromCharlie(InetAddress ip) { _aliceIPFromCharlie = ip; }
public InetAddress getAliceIP() { return _aliceIP; }
public void setAliceIP(InetAddress ip) { _aliceIP = ip; }
public InetAddress getBobIP() { return _bobIP; }
public void setBobIP(InetAddress ip) { _bobIP = ip; }
public InetAddress getCharlieIP() { return _charlieIP; }
public void setCharlieIP(InetAddress ip) { _charlieIP = ip; }
public InetAddress getAliceIPFromCharlie() { return _aliceIPFromCharlie; }
public void setAliceIPFromCharlie(InetAddress ip) { _aliceIPFromCharlie = ip; }
/**
* If we are Alice, this will contain the port that Bob says we
* can be reached at - the port Charlie says we can be reached
* at is _alicePortFromCharlie
*
*/
public synchronized int getAlicePort() { return _alicePort; }
public synchronized void setAlicePort(int alicePort) { _alicePort = alicePort; }
public synchronized int getBobPort() { return _bobPort; }
public synchronized void setBobPort(int bobPort) { _bobPort = bobPort; }
public synchronized int getCharliePort() { return _charliePort; }
public synchronized void setCharliePort(int charliePort) { _charliePort = charliePort; }
public int getAlicePort() { return _alicePort; }
public void setAlicePort(int alicePort) { _alicePort = alicePort; }
public int getBobPort() { return _bobPort; }
public void setBobPort(int bobPort) { _bobPort = bobPort; }
public int getCharliePort() { return _charliePort; }
public void setCharliePort(int charliePort) { _charliePort = charliePort; }
public synchronized int getAlicePortFromCharlie() { return _alicePortFromCharlie; }
public synchronized void setAlicePortFromCharlie(int alicePortFromCharlie) { _alicePortFromCharlie = alicePortFromCharlie; }
public int getAlicePortFromCharlie() { return _alicePortFromCharlie; }
public void setAlicePortFromCharlie(int alicePortFromCharlie) { _alicePortFromCharlie = alicePortFromCharlie; }
public synchronized SessionKey getAliceIntroKey() { return _aliceIntroKey; }
public synchronized void setAliceIntroKey(SessionKey key) { _aliceIntroKey = key; }
public synchronized SessionKey getCharlieIntroKey() { return _charlieIntroKey; }
public synchronized void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; }
public synchronized SessionKey getBobCipherKey() { return _bobCipherKey; }
public synchronized void setBobCipherKey(SessionKey key) { _bobCipherKey = key; }
public synchronized SessionKey getBobMACKey() { return _bobMACKey; }
public synchronized void setBobMACKey(SessionKey key) { _bobMACKey = key; }
public SessionKey getAliceIntroKey() { return _aliceIntroKey; }
public void setAliceIntroKey(SessionKey key) { _aliceIntroKey = key; }
public SessionKey getCharlieIntroKey() { return _charlieIntroKey; }
public void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; }
public SessionKey getBobCipherKey() { return _bobCipherKey; }
public void setBobCipherKey(SessionKey key) { _bobCipherKey = key; }
public SessionKey getBobMACKey() { return _bobMACKey; }
public void setBobMACKey(SessionKey key) { _bobMACKey = key; }
/** when did this test begin? */
public synchronized long getBeginTime() { return _beginTime; }
public synchronized void setBeginTime(long when) { _beginTime = when; }
public long getBeginTime() { return _beginTime; }
public void setBeginTime(long when) { _beginTime = when; }
/** when did we last send out a packet? */
public synchronized long getLastSendTime() { return _lastSendTime; }
public synchronized void setLastSendTime(long when) { _lastSendTime = when; }
public long getLastSendTime() { return _lastSendTime; }
public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last hear from alice? */
public synchronized long getReceiveAliceTime() { return _receiveAliceTime; }
public synchronized void setReceiveAliceTime(long when) { _receiveAliceTime = when; }
public long getReceiveAliceTime() { return _receiveAliceTime; }
public void setReceiveAliceTime(long when) { _receiveAliceTime = when; }
/** when did we last hear from bob? */
public synchronized long getReceiveBobTime() { return _receiveBobTime; }
public synchronized void setReceiveBobTime(long when) { _receiveBobTime = when; }
public long getReceiveBobTime() { return _receiveBobTime; }
public void setReceiveBobTime(long when) { _receiveBobTime = when; }
/** when did we last hear from charlie? */
public synchronized long getReceiveCharlieTime() { return _receiveCharlieTime; }
public synchronized void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; }
public long getReceiveCharlieTime() { return _receiveCharlieTime; }
public void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; }
public int getPacketsRelayed() { return _packetsRelayed; }
public void incrementPacketsRelayed() { ++_packetsRelayed; }
@Override
public synchronized String toString() {
public String toString() {
StringBuilder buf = new StringBuilder(512);
buf.append("Role: ");
if (_ourRole == ALICE) buf.append("Alice");

View File

@@ -2,9 +2,9 @@ package net.i2p.router.transport.udp;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@@ -40,15 +40,17 @@ public class UDPPacket {
private int _validateCount;
// private boolean _isInbound;
private static final List _packetCache;
private static final Queue<UDPPacket> _packetCache;
private static final boolean CACHE = true;
private static final int CACHE_SIZE = 64;
static {
_packetCache = new ArrayList(256);
if (CACHE)
_packetCache = new LinkedBlockingQueue(CACHE_SIZE);
else
_packetCache = null;
_log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class);
}
private static final boolean CACHE = true; // TODO: support caching to cut churn down a /lot/
private static final int CACHE_SIZE = 64;
static final int MAX_PACKET_SIZE = 2048;
public static final int IV_SIZE = 16;
public static final int MAC_SIZE = 16;
@@ -121,7 +123,9 @@ public class UDPPacket {
private int _messageType;
private int _fragmentCount;
/** only for debugging and stats, does not go on the wire */
int getMessageType() { return _messageType; }
/** only for debugging and stats, does not go on the wire */
void setMessageType(int type) { _messageType = type; }
int getFragmentCount() { return _fragmentCount; }
void setFragmentCount(int count) { _fragmentCount = count; }
@@ -238,7 +242,7 @@ public class UDPPacket {
@Override
public String toString() {
verifyNotReleased();
StringBuilder buf = new StringBuilder(64);
StringBuilder buf = new StringBuilder(256);
buf.append(_packet.getLength());
buf.append(" byte packet with ");
buf.append(_packet.getAddress().getHostAddress()).append(":");
@@ -256,12 +260,7 @@ public class UDPPacket {
public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) {
UDPPacket rv = null;
if (CACHE) {
synchronized (_packetCache) {
if (_packetCache.size() > 0) {
rv = (UDPPacket)_packetCache.remove(0);
}
}
rv = _packetCache.poll();
if (rv != null)
rv.init(ctx, inbound);
}
@@ -284,11 +283,7 @@ public class UDPPacket {
//_dataCache.release(_dataBuf);
if (!CACHE)
return;
synchronized (_packetCache) {
if (_packetCache.size() <= CACHE_SIZE) {
_packetCache.add(this);
}
}
_packetCache.offer(this);
}
private void verifyNotReleased() {

View File

@@ -402,7 +402,7 @@ public class UDPPacketReader {
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);
StringBuilder buf = new StringBuilder(512);
long msAgo = _context.clock().now() - readTimestamp()*1000;
buf.append("Data packet sent ").append(msAgo).append("ms ago ");
buf.append("IV ");

View File

@@ -2,8 +2,8 @@ package net.i2p.router.transport.udp;
import java.io.IOException;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
@@ -24,19 +24,20 @@ public class UDPReceiver {
private Log _log;
private DatagramSocket _socket;
private String _name;
private final List _inboundQueue;
private final BlockingQueue<UDPPacket> _inboundQueue;
private boolean _keepRunning;
private Runner _runner;
private UDPTransport _transport;
// private static int __id;
private static int __id;
private int _id;
private static final int TYPE_POISON = -99999;
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPReceiver.class);
_id++;
_id = ++__id;
_name = name;
_inboundQueue = new ArrayList(128);
_inboundQueue = new LinkedBlockingQueue();
_socket = socket;
_transport = transport;
_runner = new Runner();
@@ -50,17 +51,22 @@ public class UDPReceiver {
public void startup() {
//adjustDropProbability();
_keepRunning = true;
I2PThread t = new I2PThread(_runner, _name + "." + _id);
t.setDaemon(true);
I2PThread t = new I2PThread(_runner, _name + '.' + _id, true);
t.start();
}
public void shutdown() {
_keepRunning = false;
synchronized (_inboundQueue) {
_inboundQueue.clear();
_inboundQueue.notifyAll();
_inboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_inboundQueue.offer(poison);
for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_inboundQueue.clear();
}
/*********
@@ -96,6 +102,7 @@ public class UDPReceiver {
private static final int ARTIFICIAL_DELAY_BASE = 0; //600;
**********/
/** @return zero (was queue size) */
private int receive(UDPPacket packet) {
/*********
//adjustDropProbability();
@@ -126,7 +133,12 @@ public class UDPReceiver {
return doReceive(packet);
}
/** @return zero (was queue size) */
private final int doReceive(UDPPacket packet) {
if (!_keepRunning)
return 0;
if (_log.shouldLog(Log.INFO))
_log.info("Received: " + packet);
@@ -143,26 +155,25 @@ public class UDPReceiver {
boolean rejected = false;
int queueSize = 0;
long headPeriod = 0;
synchronized (_inboundQueue) {
queueSize = _inboundQueue.size();
if (queueSize > 0) {
headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime();
UDPPacket head = _inboundQueue.peek();
if (head != null) {
headPeriod = head.getLifetime();
if (headPeriod > MAX_QUEUE_PERIOD) {
rejected = true;
_inboundQueue.notifyAll();
}
}
if (!rejected) {
_inboundQueue.add(packet);
_inboundQueue.notifyAll();
return queueSize + 1;
_inboundQueue.offer(packet);
//return queueSize + 1;
return 0;
}
}
// rejected
packet.release();
_context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod);
if (_log.shouldLog(Log.WARN)) {
queueSize = _inboundQueue.size();
StringBuilder msg = new StringBuilder();
msg.append("Dropping inbound packet with ");
msg.append(queueSize);
@@ -188,21 +199,15 @@ public class UDPReceiver {
*/
public UDPPacket receiveNext() {
UDPPacket rv = null;
int remaining = 0;
while (_keepRunning) {
synchronized (_inboundQueue) {
if (_inboundQueue.size() <= 0)
try { _inboundQueue.wait(); } catch (InterruptedException ie) {}
if (_inboundQueue.size() > 0) {
rv = (UDPPacket)_inboundQueue.remove(0);
remaining = _inboundQueue.size();
if (remaining > 0)
_inboundQueue.notifyAll();
break;
}
}
//int remaining = 0;
while (_keepRunning && rv == null) {
try {
rv = _inboundQueue.take();
} catch (InterruptedException ie) {}
if (rv != null && rv.getMessageType() == TYPE_POISON)
return null;
}
_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
//_context.statManager().addRateData("udp.receiveRemaining", remaining, 0);
return rv;
}

View File

@@ -3,8 +3,8 @@ package net.i2p.router.transport.udp;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
@@ -20,16 +20,17 @@ public class UDPSender {
private Log _log;
private DatagramSocket _socket;
private String _name;
private final List _outboundQueue;
private final BlockingQueue<UDPPacket> _outboundQueue;
private boolean _keepRunning;
private Runner _runner;
private static final int TYPE_POISON = 99999;
private static final int MAX_QUEUED = 4;
//private static final int MAX_QUEUED = 4;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPSender.class);
_outboundQueue = new ArrayList(128);
_outboundQueue = new LinkedBlockingQueue();
_socket = socket;
_runner = new Runner();
_name = name;
@@ -44,49 +45,40 @@ public class UDPSender {
// used in RouterWatchdog
_context.statManager().createRateStat("udp.sendException", "How frequently we fail to send a packet (likely due to a windows exception)", "udp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("udp.sendPacketSize.1", "db store message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.2", "db lookup message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.3", "db search reply message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.6", "tunnel create message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.7", "tunnel create status message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.10", "delivery status message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.11", "garlic message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.16", "date message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.18", "tunnel data message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.19", "tunnel gateway message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.20", "data message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.21", "tunnel build", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.22", "tunnel build reply", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.20", "data message size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.42", "ack-only packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.43", "hole punch packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.44", "relay response packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.45", "relay intro packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.46", "relay request packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.47", "peer test charlie to bob packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.48", "peer test bob to charlie packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.49", "peer test to alice packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.50", "peer test from alice packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.51", "session confirmed packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.52", "session request packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize.53", "session created packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_ACK, "ack-only packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_PUNCH, "hole punch packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_RESP, "relay response packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_INTRO, "relay intro packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_RREQ, "relay request packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TCB, "peer test charlie to bob packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TBC, "peer test bob to charlie packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TTA, "peer test to alice packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TFA, "peer test from alice packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CONF, "session confirmed packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_SREQ, "session request packet size", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CREAT, "session created packet size", "udp", UDPTransport.RATES);
}
public void startup() {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Starting the runner: " + _name);
_keepRunning = true;
I2PThread t = new I2PThread(_runner, _name);
t.setDaemon(true);
I2PThread t = new I2PThread(_runner, _name, true);
t.start();
}
public void shutdown() {
_keepRunning = false;
synchronized (_outboundQueue) {
_outboundQueue.clear();
_outboundQueue.notifyAll();
_outboundQueue.clear();
UDPPacket poison = UDPPacket.acquire(_context, false);
poison.setMessageType(TYPE_POISON);
_outboundQueue.offer(poison);
for (int i = 1; i <= 5 && !_outboundQueue.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
} catch (InterruptedException ie) {}
}
_outboundQueue.clear();
}
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
@@ -98,10 +90,12 @@ public class UDPSender {
* Add the packet to the queue. This may block until there is space
* available, if requested, otherwise it returns immediately
*
* @param blockTime how long to block
* @param blockTime how long to block IGNORED
* @return number of packets queued
* @deprecated use add(packet)
*/
public int add(UDPPacket packet, int blockTime) {
/********
//long expiration = _context.clock().now() + blockTime;
int remaining = -1;
long lifetime = -1;
@@ -124,13 +118,12 @@ public class UDPSender {
}
}
//if (true || (_outboundQueue.size() < MAX_QUEUED)) {
if (true || (_outboundQueue.size() < MAX_QUEUED)) {
lifetime = packet.getLifetime();
_outboundQueue.add(packet);
added = true;
remaining = _outboundQueue.size();
_outboundQueue.notifyAll();
/*****
} else {
long remainingTime = expiration - _context.clock().now();
if (remainingTime > 0) {
@@ -141,7 +134,6 @@ public class UDPSender {
}
lifetime = packet.getLifetime();
}
*****/
}
//} catch (InterruptedException ie) {}
}
@@ -153,42 +145,26 @@ public class UDPSender {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime);
return remaining;
********/
return add(packet);
}
private static final int MAX_HEAD_LIFETIME = 1000;
/**
*
* @return number of packets in the queue
* Put it on the queue
* @return ZERO (used to be number of packets in the queue)
*/
public int add(UDPPacket packet) {
if (packet == null) return 0;
if (packet == null || !_keepRunning) return 0;
int size = 0;
long lifetime = -1;
int removed = 0;
synchronized (_outboundQueue) {
lifetime = packet.getLifetime();
UDPPacket head = null;
if (_outboundQueue.size() > 0) {
head = (UDPPacket)_outboundQueue.get(0);
while (head.getLifetime() > MAX_HEAD_LIFETIME) {
_outboundQueue.remove(0);
removed++;
if (_outboundQueue.size() > 0)
head = (UDPPacket)_outboundQueue.get(0);
else
break;
}
}
_outboundQueue.add(packet);
_outboundQueue.offer(packet);
//size = _outboundQueue.size();
//_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
if (_log.shouldLog(Log.DEBUG)) {
size = _outboundQueue.size();
_outboundQueue.notifyAll();
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime());
}
_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
if (removed > 0)
_context.statManager().addRateData("udp.sendQueueTrimmed", removed, size);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + lifetime);
return size;
}
@@ -227,7 +203,8 @@ public class UDPSender {
//_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size));
}
_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount());
if (packet.getMessageType() >= PacketBuilder.TYPE_FIRST)
_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount());
//packet.getPacket().setLength(size);
try {
@@ -267,20 +244,17 @@ public class UDPSender {
_log.debug("Stop sending...");
}
/** @return next packet in queue. Will discard any packet older than MAX_HEAD_LIFETIME */
private UDPPacket getNextPacket() {
UDPPacket packet = null;
while ( (_keepRunning) && (packet == null) ) {
while ( (_keepRunning) && (packet == null || packet.getLifetime() > MAX_HEAD_LIFETIME) ) {
if (packet != null)
_context.statManager().addRateData("udp.sendQueueTrimmed", 1, 0);
try {
synchronized (_outboundQueue) {
if (_outboundQueue.size() <= 0) {
_outboundQueue.notifyAll();
_outboundQueue.wait();
} else {
packet = (UDPPacket)_outboundQueue.remove(0);
_outboundQueue.notifyAll();
}
}
packet = _outboundQueue.take();
} catch (InterruptedException ie) {}
if (packet != null && packet.getMessageType() == TYPE_POISON)
return null;
}
return packet;
}

View File

@@ -13,8 +13,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
@@ -31,6 +33,7 @@ import net.i2p.router.RouterContext;
import net.i2p.router.transport.Transport;
import net.i2p.router.transport.TransportBid;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@@ -75,8 +78,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** introduction key */
private SessionKey _introKey;
/** list of RemoteHostId for peers whose packets we want to drop outright */
private final List<RemoteHostId> _dropList;
/**
* List of RemoteHostId for peers whose packets we want to drop outright
* This is only for old network IDs (pre-0.6.1.10), so it isn't really used now.
*/
private final Set<RemoteHostId> _dropList;
private int _expireTimeout;
@@ -167,9 +173,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
super(ctx);
_context = ctx;
_log = ctx.logManager().getLog(UDPTransport.class);
_peersByIdent = new HashMap(128);
_peersByRemoteHost = new HashMap(128);
_dropList = new ArrayList(256);
_peersByIdent = new ConcurrentHashMap(128);
_peersByRemoteHost = new ConcurrentHashMap(128);
_dropList = new ConcurrentHashSet(2);
_endpoint = null;
// See comments in DQAT.java
@@ -608,9 +614,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* if no state exists
*/
PeerState getPeerState(RemoteHostId hostInfo) {
synchronized (_peersByRemoteHost) {
return _peersByRemoteHost.get(hostInfo);
}
}
/**
@@ -618,9 +622,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
* if no state exists
*/
public PeerState getPeerState(Hash remotePeer) {
synchronized (_peersByIdent) {
return _peersByIdent.get(remotePeer);
}
}
/**
@@ -697,14 +699,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long oldEstablishedOn = -1;
PeerState oldPeer = null;
if (remotePeer != null) {
synchronized (_peersByIdent) {
oldPeer = _peersByIdent.put(remotePeer, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
oldEstablishedOn = oldPeer.getKeyEstablishedTime();
}
}
}
if (oldPeer != null) {
@@ -717,13 +717,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId == null) return false;
synchronized (_peersByRemoteHost) {
oldPeer = _peersByRemoteHost.put(remoteId, peer);
if ( (oldPeer != null) && (oldPeer != peer) ) {
// transfer over the old state/inbound message fragments/etc
peer.loadFrom(oldPeer);
oldEstablishedOn = oldPeer.getKeyEstablishedTime();
}
}
if (oldPeer != null) {
@@ -773,6 +771,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
DatabaseStoreMessage dsm = (DatabaseStoreMessage)inMsg;
if ( (dsm.getRouterInfo() != null) &&
(dsm.getRouterInfo().getNetworkId() != Router.NETWORK_ID) ) {
// this is pre-0.6.1.10, so it isn't going to happen any more
/*
if (remoteIdentHash != null) {
_context.shitlist().shitlistRouter(remoteIdentHash, "Sent us a peer from the wrong network");
@@ -792,21 +792,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
PeerState peer = getPeerState(peerHash);
if (peer != null) {
RemoteHostId remote = peer.getRemoteHostId();
boolean added = false;
int droplistSize = 0;
synchronized (_dropList) {
if (!_dropList.contains(remote)) {
while (_dropList.size() > MAX_DROPLIST_SIZE)
_dropList.remove(0);
_dropList.add(remote);
added = true;
}
droplistSize = _dropList.size();
}
if (added) {
_context.statManager().addRateData("udp.dropPeerDroplist", droplistSize, 0);
SimpleScheduler.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD);
}
_dropList.add(remote);
_context.statManager().addRateData("udp.dropPeerDroplist", 1, 0);
SimpleScheduler.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD);
}
markUnreachable(peerHash);
_context.shitlist().shitlistRouter(peerHash, "Part of the wrong network, version = " + dsm.getRouterInfo().getOption("router.version"));
@@ -838,13 +826,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private RemoteHostId _peer;
public RemoveDropList(RemoteHostId peer) { _peer = peer; }
public void timeReached() {
synchronized (_dropList) {
_dropList.remove(_peer);
}
_dropList.remove(_peer);
}
}
boolean isInDropList(RemoteHostId peer) { synchronized (_dropList) { return _dropList.contains(peer); } }
boolean isInDropList(RemoteHostId peer) { return _dropList.contains(peer); }
void dropPeer(Hash peer, boolean shouldShitlist, String why) {
PeerState state = getPeerState(peer);
@@ -916,16 +902,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
long now = _context.clock().now();
_context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
synchronized (_peersByIdent) {
altByIdent = _peersByIdent.remove(peer.getRemotePeer());
}
altByIdent = _peersByIdent.remove(peer.getRemotePeer());
}
RemoteHostId remoteId = peer.getRemoteHostId();
if (remoteId != null) {
synchronized (_peersByRemoteHost) {
altByHost = _peersByRemoteHost.remove(remoteId);
}
}
// unchoke 'em, but just because we'll never talk again...
@@ -1087,10 +1069,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// also introduce us, also bid aggressively so we are preferred over NTCP.
// (Otherwise we only talk UDP to those that are firewalled, and we will
// never get any introducers)
int count;
synchronized (_peersByIdent) {
count = _peersByIdent.size();
}
int count = _peersByIdent.size();
if (alwaysPreferUDP() || count < MIN_PEERS ||
(introducersRequired() && _introManager.introducerCount() < MIN_INTRODUCER_POOL))
return _cachedBid[SLOW_PREFERRED_BID];
@@ -1474,9 +1453,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
@Override
public int countPeers() {
synchronized (_peersByIdent) {
return _peersByIdent.size();
}
}
@Override
@@ -1484,7 +1461,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long now = _context.clock().now();
int active = 0;
int inactive = 0;
synchronized (_peersByIdent) {
for (Iterator<PeerState> iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if (now-peer.getLastReceiveTime() > 5*60*1000)
@@ -1492,7 +1468,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
else
active++;
}
}
return active;
}
@@ -1501,7 +1476,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long now = _context.clock().now();
int active = 0;
int inactive = 0;
synchronized (_peersByIdent) {
for (Iterator<PeerState> iter = _peersByIdent.values().iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
if (now-peer.getLastSendFullyTime() > 1*60*1000)
@@ -1509,7 +1483,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
else
active++;
}
}
return active;
}
@@ -1519,9 +1492,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
public boolean allowConnection() {
synchronized (_peersByIdent) {
return _peersByIdent.size() < getMaxConnections();
}
}
/**
@@ -1534,9 +1505,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
Vector<Long> skews = new Vector();
Vector<PeerState> peers = new Vector();
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.values());
}
peers.addAll(_peersByIdent.values());
// If our clock is way off, we may not have many (or any) successful connections,
// so try hard in that case to return good data
@@ -1557,15 +1526,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/** **internal, do not use** */
public static final UDPTransport _instance() { return __instance; }
/** **internal, do not use** return the peers (Hash) of active peers. */
public List _getActivePeers() {
List peers = new ArrayList(128);
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.keySet());
}
public List<Hash> _getActivePeers() {
List<Hash> peers = new ArrayList(128);
peers.addAll(_peersByIdent.keySet());
long now = _context.clock().now();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
for (Iterator<Hash> iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = iter.next();
PeerState state = getPeerState(peer);
if (now-state.getLastReceiveTime() > 5*60*1000)
iter.remove(); // don't include old peers
@@ -1886,9 +1853,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
@Override
public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException {
TreeSet<PeerState> peers = new TreeSet(getComparator(sortFlags));
synchronized (_peersByIdent) {
peers.addAll(_peersByIdent.values());
}
peers.addAll(_peersByIdent.values());
long offsetTotal = 0;
int bpsIn = 0;
@@ -2205,12 +2170,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private final List _expirePeers;
private List _expireBuffer;
private final Set<PeerState> _expirePeers;
private final List<PeerState> _expireBuffer;
private boolean _alive;
public ExpirePeerEvent() {
_expirePeers = new ArrayList(128);
_expireBuffer = new ArrayList(128);
_expirePeers = new ConcurrentHashSet(128);
_expireBuffer = new ArrayList();
}
public void timeReached() {
// Increase allowed idle time if we are well under allowed connections, otherwise decrease
@@ -2222,10 +2187,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT;
long pingCutoff = _context.clock().now() - (2 * 60*60*1000);
_expireBuffer.clear();
synchronized (_expirePeers) {
int sz = _expirePeers.size();
for (int i = 0; i < sz; i++) {
PeerState peer = (PeerState)_expirePeers.get(i);
for (Iterator<PeerState> iter = _expirePeers.iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
long inactivityCutoff;
// if we offered to introduce them, or we used them as introducer in last 2 hours
if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff)
@@ -2234,28 +2198,22 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
inactivityCutoff = shortInactivityCutoff;
if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) {
_expireBuffer.add(peer);
_expirePeers.remove(i);
i--;
sz--;
iter.remove();
}
}
}
for (int i = 0; i < _expireBuffer.size(); i++)
dropPeer((PeerState)_expireBuffer.get(i), false, "idle too long");
dropPeer(_expireBuffer.get(i), false, "idle too long");
_expireBuffer.clear();
if (_alive)
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
}
public void add(PeerState peer) {
synchronized (_expirePeers) {
_expirePeers.add(peer);
}
}
public void remove(PeerState peer) {
synchronized (_expirePeers) {
_expirePeers.remove(peer);
}
}
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
@@ -2263,9 +2221,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
} else {
SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this);
synchronized (_expirePeers) {
_expirePeers.clear();
}
_expirePeers.clear();
}
}
}
@@ -2348,10 +2304,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
PeerState pickTestPeer(RemoteHostId dontInclude) {
List<PeerState> peers = null;
synchronized (_peersByIdent) {
peers = new ArrayList(_peersByIdent.values());
}
List<PeerState> peers = new ArrayList(_peersByIdent.values());
Collections.shuffle(peers, _context.random());
for (int i = 0; i < peers.size(); i++) {
PeerState peer = peers.get(i);