From 40e820cabba06065fc874e9215fc71bfb639a0e3 Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 9 Mar 2010 20:44:46 +0000 Subject: [PATCH] * UDP: - Big refactor of several classes for concurrent, elimination of several locks - Reduce max number of resent acks in a packet to lower overhead - Take incoming messages from the head of the queue, not sure why taking them from the tail "reduces latency" - Java 5 cleanup --- .../i2p/router/transport/udp/ACKSender.java | 141 +++++++---- .../transport/udp/EstablishmentManager.java | 163 +++++------- .../transport/udp/InboundEstablishState.java | 28 +-- .../udp/InboundMessageFragments.java | 6 +- .../transport/udp/InboundMessageState.java | 4 +- .../transport/udp/IntroductionManager.java | 31 +-- .../router/transport/udp/MessageReceiver.java | 65 ++--- .../transport/udp/OutboundEstablishState.java | 46 ++-- .../udp/OutboundMessageFragments.java | 14 +- .../transport/udp/OutboundMessageState.java | 4 +- .../transport/udp/OutboundRefiller.java | 3 +- .../router/transport/udp/PacketBuilder.java | 64 +++-- .../router/transport/udp/PacketHandler.java | 20 +- .../router/transport/udp/PacketPusher.java | 6 +- .../i2p/router/transport/udp/PeerState.java | 232 +++++++++++------- .../router/transport/udp/PeerTestManager.java | 32 +-- .../router/transport/udp/PeerTestState.java | 78 +++--- .../i2p/router/transport/udp/UDPPacket.java | 33 ++- .../router/transport/udp/UDPPacketReader.java | 2 +- .../i2p/router/transport/udp/UDPReceiver.java | 73 +++--- .../i2p/router/transport/udp/UDPSender.java | 128 ++++------ .../router/transport/udp/UDPTransport.java | 125 +++------- 22 files changed, 653 insertions(+), 645 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index ec2078312..180232d6f 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -1,7 +1,10 @@ package net.i2p.router.transport.udp; -import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; @@ -18,8 +21,9 @@ public class ACKSender implements Runnable { private UDPTransport _transport; private PacketBuilder _builder; /** list of peers (PeerState) who we have received data from but not yet ACKed to */ - private final List _peersToACK; + private final BlockingQueue _peersToACK; private boolean _alive; + private static final long POISON_PS = -9999999999l; /** how frequently do we want to send ACKs to a peer? */ static final int ACK_FREQUENCY = 500; @@ -28,7 +32,7 @@ public class ACKSender implements Runnable { _context = ctx; _log = ctx.logManager().getLog(ACKSender.class); _transport = transport; - _peersToACK = new ArrayList(4); + _peersToACK = new LinkedBlockingQueue(); _builder = new PacketBuilder(_context, transport); _alive = true; _context.statManager().createRateStat("udp.sendACKCount", "how many ack messages were sent to a peer", "udp", UDPTransport.RATES); @@ -37,27 +41,34 @@ public class ACKSender implements Runnable { _context.statManager().createRateStat("udp.abortACK", "How often do we schedule up an ACK send only to find it had already been sent (through piggyback)?", "udp", UDPTransport.RATES); } + /** + * Add to the queue. + * For speed, don't check for duplicates here. + * The runner will remove them in its own thread. + */ public void ackPeer(PeerState peer) { - synchronized (_peersToACK) { - if (!_peersToACK.contains(peer)) - _peersToACK.add(peer); - _peersToACK.notifyAll(); - } + if (_alive) + _peersToACK.offer(peer); } public void startup() { _alive = true; - I2PThread t = new I2PThread(this, "UDP ACK sender"); - t.setDaemon(true); + _peersToACK.clear(); + I2PThread t = new I2PThread(this, "UDP ACK sender", true); t.start(); } public void shutdown() { _alive = false; - synchronized (_peersToACK) { - _peersToACK.clear(); - _peersToACK.notifyAll(); + PeerState poison = new PeerState(_context, _transport); + poison.setTheyRelayToUsAs(POISON_PS); + _peersToACK.offer(poison); + for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} } + _peersToACK.clear(); } private long ackFrequency(long timeSinceACK, long rtt) { @@ -71,47 +82,89 @@ public class ACKSender implements Runnable { } public void run() { + + // we use a Set to strip out dups that come in on the Queue + Set notYet = new HashSet(); while (_alive) { PeerState peer = null; - long now = _context.clock().now(); + long now = 0; long remaining = -1; - try { - synchronized (_peersToACK) { - for (int i = 0; i < _peersToACK.size(); i++) { - PeerState cur = (PeerState)_peersToACK.get(i); - long wanted = cur.getWantedACKSendSince(); - long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now; - if ( ( (wanted > 0) && (delta < 0) ) || (cur.unsentACKThresholdReached()) ) { - _peersToACK.remove(i); - peer = cur; - break; - } - } - - if (peer == null) { - if (_peersToACK.size() <= 0) - _peersToACK.wait(); + long wanted = 0; + + while (_alive) { + // Pull from the queue until we find one ready to ack + // Any that are not ready we will put back on the queue + PeerState cur = null; + try { + if (notYet.isEmpty()) + // wait forever + cur = _peersToACK.take(); else - _peersToACK.wait(50); - } else { - remaining = _peersToACK.size(); - } - } - } catch (InterruptedException ie) {} - + // Don't wait if nothing there, just put everybody back and sleep below + cur = _peersToACK.poll(); + } catch (InterruptedException ie) {} + + if (cur != null) { + if (cur.getTheyRelayToUsAs() == POISON_PS) + return; + wanted = cur.getWantedACKSendSince(); + now = _context.clock().now(); + long delta = wanted + ackFrequency(now-cur.getLastACKSend(), cur.getRTT()) - now; + if (wanted <= 0) { + // it got acked by somebody - discard, remove any dups, and go around again + notYet.remove(cur); + } else if ( (delta <= 0) || (cur.unsentACKThresholdReached()) ) { + // found one to ack + peer = cur; + notYet.remove(cur); // in case a dup + try { + // bulk operations may throw an exception + _peersToACK.addAll(notYet); + } catch (Exception e) {} + notYet.clear(); + break; + } else { + // not yet, go around again + // moving from the Queue to the Set and then back removes duplicates + boolean added = notYet.add(cur); + if (added && _log.shouldLog(Log.DEBUG)) + _log.debug("Pending ACK (delta = " + delta + ") for " + cur); + } + } else if (!notYet.isEmpty()) { + // put them all back and wait a while + try { + // bulk operations may throw an exception + _peersToACK.addAll(notYet); + } catch (Exception e) {} + if (_log.shouldLog(Log.INFO)) + _log.info("sleeping, pending size = " + notYet.size()); + notYet.clear(); + try { + // sleep a little longer than the divided frequency, + // so it will be ready after we circle around a few times + Thread.sleep(5 + (ACK_FREQUENCY / 3)); + } catch (InterruptedException ie) {} + } // else go around again where we will wait at take() + } // inner while() + if (peer != null) { long lastSend = peer.getLastACKSend(); - long wanted = peer.getWantedACKSendSince(); - List ackBitfields = peer.retrieveACKBitfields(false); + // set above before the break + //long wanted = peer.getWantedACKSendSince(); + List ackBitfields = peer.retrieveACKBitfields(false); - if (wanted < 0) - _log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields); + if (wanted < 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields); + continue; + } - if ( (ackBitfields != null) && (ackBitfields.size() > 0) ) { + if ( (ackBitfields != null) && (!ackBitfields.isEmpty()) ) { _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0); if (remaining > 0) _context.statManager().addRateData("udp.sendACKRemaining", remaining, 0); - now = _context.clock().now(); + // set above before the break + //now = _context.clock().now(); if (lastSend < 0) lastSend = now - 1; _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted); @@ -119,7 +172,7 @@ public class ACKSender implements Runnable { UDPPacket ack = _builder.buildACK(peer, ackBitfields); ack.markType(1); ack.setFragmentCount(-1); - ack.setMessageType(42); + ack.setMessageType(PacketBuilder.TYPE_ACK); if (_log.shouldLog(Log.INFO)) _log.info("Sending ACK for " + ackBitfields); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 3b5938918..91c23b444 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -7,6 +7,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.crypto.DHSessionKeyBuilder; import net.i2p.data.Base64; @@ -37,13 +38,13 @@ public class EstablishmentManager { private UDPTransport _transport; private PacketBuilder _builder; /** map of RemoteHostId to InboundEstablishState */ - private final Map _inboundStates; + private final ConcurrentHashMap _inboundStates; /** map of RemoteHostId to OutboundEstablishState */ - private final Map _outboundStates; + private final ConcurrentHashMap _outboundStates; /** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */ - private final Map _queuedOutbound; + private final ConcurrentHashMap> _queuedOutbound; /** map of nonce (Long) to OutboundEstablishState */ - private final Map _liveIntroductions; + private final ConcurrentHashMap _liveIntroductions; private boolean _alive; private final Object _activityLock; private int _activity; @@ -56,10 +57,10 @@ public class EstablishmentManager { _log = ctx.logManager().getLog(EstablishmentManager.class); _transport = transport; _builder = new PacketBuilder(ctx, transport); - _inboundStates = new HashMap(32); - _outboundStates = new HashMap(32); - _queuedOutbound = new HashMap(32); - _liveIntroductions = new HashMap(32); + _inboundStates = new ConcurrentHashMap(); + _outboundStates = new ConcurrentHashMap(); + _queuedOutbound = new ConcurrentHashMap(); + _liveIntroductions = new ConcurrentHashMap(); _activityLock = new Object(); _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES); @@ -74,8 +75,7 @@ public class EstablishmentManager { public void startup() { _alive = true; - I2PThread t = new I2PThread(new Establisher(), "UDP Establisher"); - t.setDaemon(true); + I2PThread t = new I2PThread(new Establisher(), "UDP Establisher", true); t.start(); } public void shutdown() { @@ -87,21 +87,17 @@ public class EstablishmentManager { * Grab the active establishing state */ InboundEstablishState getInboundState(RemoteHostId from) { - synchronized (_inboundStates) { - InboundEstablishState state = (InboundEstablishState)_inboundStates.get(from); + InboundEstablishState state = _inboundStates.get(from); // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) // _log.debug("No inbound states for " + from + ", with remaining: " + _inboundStates); return state; - } } OutboundEstablishState getOutboundState(RemoteHostId from) { - synchronized (_outboundStates) { - OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(from); + OutboundEstablishState state = _outboundStates.get(from); // if ( (state == null) && (_log.shouldLog(Log.DEBUG)) ) // _log.debug("No outbound states for " + from + ", with remaining: " + _outboundStates); return state; - } } private int getMaxConcurrentEstablish() { @@ -163,39 +159,42 @@ public class EstablishmentManager { int deferred = 0; boolean rejected = false; int queueCount = 0; - synchronized (_outboundStates) { - state = (OutboundEstablishState)_outboundStates.get(to); + + state = _outboundStates.get(to); if (state == null) { if (_outboundStates.size() >= getMaxConcurrentEstablish()) { - List queued = (List)_queuedOutbound.get(to); - if (queued == null) { - queued = new ArrayList(1); - if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) { - rejected = true; - } else { - _queuedOutbound.put(to, queued); - } + if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) { + rejected = true; + } else { + List newQueued = new ArrayList(1); + List queued = _queuedOutbound.putIfAbsent(to, newQueued); + if (queued == null) + queued = newQueued; + queueCount = queued.size(); + if (queueCount < MAX_QUEUED_PER_PEER) + queued.add(msg); } - queueCount = queued.size(); - if ( (queueCount < MAX_QUEUED_PER_PEER) && (!rejected) ) - queued.add(msg); deferred = _queuedOutbound.size(); } else { state = new OutboundEstablishState(_context, remAddr, port, msg.getTarget().getIdentity(), new SessionKey(addr.getIntroKey()), addr); - _outboundStates.put(to, state); - SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000); + OutboundEstablishState oldState = _outboundStates.putIfAbsent(to, state); + boolean isNew = oldState == null; + if (!isNew) + // whoops, somebody beat us to it, throw out the state we just created + state = oldState; + else + SimpleScheduler.getInstance().addEvent(new Expire(to, state), 10*1000); } } if (state != null) { state.addMessage(msg); - List queued = (List)_queuedOutbound.remove(to); + List queued = _queuedOutbound.remove(to); if (queued != null) for (int i = 0; i < queued.size(); i++) - state.addMessage((OutNetMessage)queued.get(i)); + state.addMessage(queued.get(i)); } - } if (rejected) { _transport.failed(msg, "Too many pending outbound connections"); @@ -223,17 +222,9 @@ public class EstablishmentManager { _state = state; } public void timeReached() { - Object removed = null; - synchronized (_outboundStates) { - removed = _outboundStates.remove(_to); - if ( (removed != null) && (removed != _state) ) { // oops, we must have failed, then retried - _outboundStates.put(_to, removed); - removed = null; - }/* else { - locked_admitQueued(); - }*/ - } - if (removed != null) { + // remove only if value == state + boolean removed = _outboundStates.remove(_to, _state); + if (removed) { _context.statManager().addRateData("udp.outboundEstablishFailedState", _state.getState(), _state.getLifetime()); if (_log.shouldLog(Log.WARN)) _log.warn("Timing out expired outbound: " + _state); @@ -260,12 +251,11 @@ public class EstablishmentManager { int maxInbound = getMaxInboundEstablishers(); boolean isNew = false; - InboundEstablishState state = null; - synchronized (_inboundStates) { + if (_inboundStates.size() >= maxInbound) return; // drop the packet - state = (InboundEstablishState)_inboundStates.get(from); + InboundEstablishState state = _inboundStates.get(from); if (state == null) { if (_context.blocklist().isBlocklisted(from.getIP())) { if (_log.shouldLog(Log.WARN)) @@ -276,10 +266,13 @@ public class EstablishmentManager { return; // drop the packet state = new InboundEstablishState(_context, from.getIP(), from.getPort(), _transport.getLocalPort()); state.receiveSessionRequest(reader.getSessionRequestReader()); - isNew = true; - _inboundStates.put(from, state); + InboundEstablishState oldState = _inboundStates.putIfAbsent(from, state); + isNew = oldState == null; + if (!isNew) + // whoops, somebody beat us to it, throw out the state we just created + state = oldState; } - } + if (isNew) { // we don't expect inbound connections when hidden, but it could happen // Don't offer if we are approaching max connections. While Relay Intros do not @@ -307,10 +300,7 @@ public class EstablishmentManager { * establishment) */ void receiveSessionConfirmed(RemoteHostId from, UDPPacketReader reader) { - InboundEstablishState state = null; - synchronized (_inboundStates) { - state = (InboundEstablishState)_inboundStates.get(from); - } + InboundEstablishState state = _inboundStates.get(from); if (state != null) { state.receiveSessionConfirmed(reader.getSessionConfirmedReader()); notifyActivity(); @@ -324,10 +314,7 @@ public class EstablishmentManager { * */ void receiveSessionCreated(RemoteHostId from, UDPPacketReader reader) { - OutboundEstablishState state = null; - synchronized (_outboundStates) { - state = (OutboundEstablishState)_outboundStates.get(from); - } + OutboundEstablishState state = _outboundStates.get(from); if (state != null) { state.receiveSessionCreated(reader.getSessionCreatedReader()); notifyActivity(); @@ -346,21 +333,19 @@ public class EstablishmentManager { //int active = 0; //int admitted = 0; //int remaining = 0; - synchronized (_outboundStates) { + //active = _outboundStates.size(); _outboundStates.remove(state.getRemoteHostId()); - if (_queuedOutbound.size() > 0) { // there shouldn't have been queued messages for this active state, but just in case... - List queued = (List)_queuedOutbound.remove(state.getRemoteHostId()); + List queued = _queuedOutbound.remove(state.getRemoteHostId()); if (queued != null) { for (int i = 0; i < queued.size(); i++) - state.addMessage((OutNetMessage)queued.get(i)); + state.addMessage(queued.get(i)); } //admitted = locked_admitQueued(); - } //remaining = _queuedOutbound.size(); - } + //if (admitted > 0) // _log.log(Log.CRIT, "Admitted " + admitted + " with " + remaining + " remaining queued and " + active + " active"); @@ -371,6 +356,7 @@ public class EstablishmentManager { return peer; } +/******** private int locked_admitQueued() { int admitted = 0; while ( (_queuedOutbound.size() > 0) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) { @@ -409,6 +395,7 @@ public class EstablishmentManager { } return admitted; } +*******/ private void notifyActivity() { synchronized (_activityLock) { @@ -596,9 +583,7 @@ public class EstablishmentManager { } catch (DHSessionKeyBuilder.InvalidPublicParameterException ippe) { if (_log.shouldLog(Log.ERROR)) _log.error("Peer " + state.getRemoteHostId() + " sent us an invalid DH parameter (or were spoofed)", ippe); - synchronized (_inboundStates) { - _inboundStates.remove(state.getRemoteHostId()); - } + _inboundStates.remove(state.getRemoteHostId()); return; } _transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey())); @@ -627,14 +612,12 @@ public class EstablishmentManager { private void handlePendingIntro(OutboundEstablishState state) { long nonce = _context.random().nextLong(MAX_NONCE); while (true) { - synchronized (_liveIntroductions) { - OutboundEstablishState old = (OutboundEstablishState)_liveIntroductions.put(new Long(nonce), state); + OutboundEstablishState old = _liveIntroductions.putIfAbsent(new Long(nonce), state); if (old != null) { nonce = _context.random().nextLong(MAX_NONCE); } else { break; } - } } SimpleScheduler.getInstance().addEvent(new FailIntroduction(state, nonce), INTRO_ATTEMPT_TIMEOUT); state.setIntroNonce(nonce); @@ -656,16 +639,9 @@ public class EstablishmentManager { _state = state; } public void timeReached() { - OutboundEstablishState removed = null; - synchronized (_liveIntroductions) { - removed = (OutboundEstablishState)_liveIntroductions.remove(new Long(_nonce)); - if (removed != _state) { - // another one with the same nonce in a very brief time... - _liveIntroductions.put(new Long(_nonce), removed); - removed = null; - } - } - if (removed != null) { + // remove only if value equal to state + boolean removed = _liveIntroductions.remove(new Long(_nonce), _state); + if (removed) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Send intro for " + _state.getRemoteHostId().toString() + " timed out"); _context.statManager().addRateData("udp.sendIntroRelayTimeout", 1, 0); @@ -677,10 +653,7 @@ public class EstablishmentManager { /* FIXME Exporting non-public type through public API FIXME */ public void receiveRelayResponse(RemoteHostId bob, UDPPacketReader reader) { long nonce = reader.getRelayResponseReader().readNonce(); - OutboundEstablishState state = null; - synchronized (_liveIntroductions) { - state = (OutboundEstablishState)_liveIntroductions.remove(new Long(nonce)); - } + OutboundEstablishState state = _liveIntroductions.remove(new Long(nonce)); if (state == null) return; // already established @@ -705,10 +678,8 @@ public class EstablishmentManager { + addr.toString() + ":" + port + " (according to " + bob.toString(true) + ")"); RemoteHostId oldId = state.getRemoteHostId(); state.introduced(addr, ip, port); - synchronized (_outboundStates) { - _outboundStates.remove(oldId); - _outboundStates.put(state.getRemoteHostId(), state); - } + _outboundStates.remove(oldId); + _outboundStates.put(state.getRemoteHostId(), state); notifyActivity(); } @@ -748,11 +719,11 @@ public class EstablishmentManager { long now = _context.clock().now(); long nextSendTime = -1; InboundEstablishState inboundState = null; - synchronized (_inboundStates) { + //if (_log.shouldLog(Log.DEBUG)) // _log.debug("# inbound states: " + _inboundStates.size()); - for (Iterator iter = _inboundStates.values().iterator(); iter.hasNext(); ) { - InboundEstablishState cur = (InboundEstablishState)iter.next(); + for (Iterator iter = _inboundStates.values().iterator(); iter.hasNext(); ) { + InboundEstablishState cur = iter.next(); if (cur.getState() == InboundEstablishState.STATE_CONFIRMED_COMPLETELY) { // completely received (though the signature may be invalid) iter.remove(); @@ -791,7 +762,6 @@ public class EstablishmentManager { } } } - } if (inboundState != null) { //if (_log.shouldLog(Log.DEBUG)) @@ -853,12 +823,12 @@ public class EstablishmentManager { //int admitted = 0; //int remaining = 0; //int active = 0; - synchronized (_outboundStates) { + //active = _outboundStates.size(); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("# outbound states: " + _outboundStates.size()); - for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) { - OutboundEstablishState cur = (OutboundEstablishState)iter.next(); + for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) { + OutboundEstablishState cur = iter.next(); if (cur == null) continue; if (cur.getState() == OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) { // completely received @@ -902,7 +872,6 @@ public class EstablishmentManager { //admitted = locked_admitQueued(); //remaining = _queuedOutbound.size(); - } //if (admitted > 0) // _log.log(Log.CRIT, "Admitted " + admitted + " in push with " + remaining + " remaining queued and " + active + " active"); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index 46085e3b4..829f62ee0 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -21,17 +21,17 @@ import net.i2p.util.Log; * */ public class InboundEstablishState { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; // SessionRequest message private byte _receivedX[]; private byte _bobIP[]; - private int _bobPort; + private final int _bobPort; private DHSessionKeyBuilder _keyBuilder; // SessionCreated message private byte _sentY[]; - private byte _aliceIP[]; - private int _alicePort; + private final byte _aliceIP[]; + private final int _alicePort; private long _sentRelayTag; private long _sentSignedOnTime; private SessionKey _sessionKey; @@ -44,11 +44,11 @@ public class InboundEstablishState { private boolean _verificationAttempted; private RouterIdentity _receivedConfirmedIdentity; // general status - private long _establishBegin; - private long _lastReceive; + private final long _establishBegin; + //private long _lastReceive; // private long _lastSend; private long _nextSend; - private RemoteHostId _remoteHostId; + private final RemoteHostId _remoteHostId; private int _currentState; private boolean _complete; @@ -121,9 +121,10 @@ public class InboundEstablishState { public synchronized SessionKey getMACKey() { return _macKey; } /** what IP do they appear to be on? */ - public synchronized byte[] getSentIP() { return _aliceIP; } + public byte[] getSentIP() { return _aliceIP; } + /** what port number do they appear to be coming from? */ - public synchronized int getSentPort() { return _alicePort; } + public int getSentPort() { return _alicePort; } public synchronized byte[] getBobIP() { return _bobIP; } @@ -205,8 +206,8 @@ public class InboundEstablishState { } /** how long have we been trying to establish this session? */ - public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; } - public synchronized long getEstablishBeginTime() { return _establishBegin; } + public long getLifetime() { return _context.clock().now() - _establishBegin; } + public long getEstablishBeginTime() { return _establishBegin; } public synchronized long getNextSendTime() { return _nextSend; } public synchronized void setNextSendTime(long when) { _nextSend = when; } @@ -328,8 +329,7 @@ public class InboundEstablishState { } private void packetReceived() { - _lastReceive = _context.clock().now(); - _nextSend = _lastReceive; + _nextSend = _context.clock().now(); } @Override diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 04c2a3184..ddd44b143 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -96,8 +96,8 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource if (fragments <= 0) return fragments; Hash fromPeer = from.getRemotePeer(); - Map messages = from.getInboundMessages(); - + Map messages = from.getInboundMessages(); + for (int i = 0; i < fragments; i++) { long mid = data.readMessageId(i); Long messageId = new Long(mid); @@ -122,7 +122,7 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource boolean partialACK = false; synchronized (messages) { - state = (InboundMessageState)messages.get(messageId); + state = messages.get(messageId); if (state == null) { state = new InboundMessageState(_context, mid, fromPeer); messages.put(messageId, state); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index 6a1229579..3159cad87 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -172,8 +172,8 @@ public class InboundMessageState { @Override public String toString() { - StringBuilder buf = new StringBuilder(32); - buf.append("Message: ").append(_messageId); + StringBuilder buf = new StringBuilder(256); + buf.append("IB Message: ").append(_messageId); if (isComplete()) { buf.append(" completely received with "); buf.append(getCompleteSize()).append(" bytes"); diff --git a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java index 0fe2f96a0..6a2707f60 100644 --- a/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java +++ b/router/java/src/net/i2p/router/transport/udp/IntroductionManager.java @@ -6,12 +6,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.Base64; import net.i2p.data.RouterAddress; import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.router.RouterContext; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -23,17 +26,17 @@ public class IntroductionManager { private UDPTransport _transport; private PacketBuilder _builder; /** map of relay tag to PeerState that should receive the introduction */ - private Map _outbound; + private final Map _outbound; /** list of peers (PeerState) who have given us introduction tags */ - private final List _inbound; + private final Set _inbound; public IntroductionManager(RouterContext ctx, UDPTransport transport) { _context = ctx; _log = ctx.logManager().getLog(IntroductionManager.class); _transport = transport; _builder = new PacketBuilder(ctx, transport); - _outbound = Collections.synchronizedMap(new HashMap(128)); - _inbound = new ArrayList(128); + _outbound = new ConcurrentHashMap(128); + _inbound = new ConcurrentHashSet(128); ctx.statManager().createRateStat("udp.receiveRelayIntro", "How often we get a relayed request for us to talk to someone?", "udp", UDPTransport.RATES); ctx.statManager().createRateStat("udp.receiveRelayRequest", "How often we receive a good request to relay to someone else?", "udp", UDPTransport.RATES); ctx.statManager().createRateStat("udp.receiveRelayRequestBadTag", "Received relay requests with bad/expired tag", "udp", UDPTransport.RATES); @@ -52,10 +55,7 @@ public class IntroductionManager { if (peer.getWeRelayToThemAs() > 0) _outbound.put(new Long(peer.getWeRelayToThemAs()), peer); if (peer.getTheyRelayToUsAs() > 0) { - synchronized (_inbound) { - if (!_inbound.contains(peer)) _inbound.add(peer); - } } } @@ -67,9 +67,7 @@ public class IntroductionManager { if (peer.getWeRelayToThemAs() > 0) _outbound.remove(new Long(peer.getWeRelayToThemAs())); if (peer.getTheyRelayToUsAs() > 0) { - synchronized (_inbound) { - _inbound.remove(peer); - } + _inbound.remove(peer); } } @@ -90,14 +88,11 @@ public class IntroductionManager { * and we want to keep our introducers valid. */ public int pickInbound(Properties ssuOptions, int howMany) { - List peers = null; int start = _context.random().nextInt(Integer.MAX_VALUE); - synchronized (_inbound) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Picking inbound out of " + _inbound.size()); - if (_inbound.size() <= 0) return 0; - peers = new ArrayList(_inbound); - } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Picking inbound out of " + _inbound.size()); + if (_inbound.isEmpty()) return 0; + List peers = new ArrayList(_inbound); int sz = peers.size(); start = start % sz; int found = 0; @@ -164,9 +159,7 @@ public class IntroductionManager { * @return number of peers that have volunteerd to introduce us */ int introducerCount() { - synchronized(_inbound) { return _inbound.size(); - } } void receiveRelayIntro(RemoteHostId bob, UDPPacketReader reader) { diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index d7f0421e3..39b4163e7 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -1,7 +1,7 @@ package net.i2p.router.transport.udp; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.Base64; import net.i2p.data.ByteArray; @@ -24,15 +24,17 @@ public class MessageReceiver { private Log _log; private UDPTransport _transport; /** list of messages (InboundMessageState) fully received but not interpreted yet */ - private final List _completeMessages; + private final BlockingQueue _completeMessages; private boolean _alive; private ByteCache _cache; + private static final int THREADS = 5; + private static final long POISON_IMS = -99999999999l; public MessageReceiver(RouterContext ctx, UDPTransport transport) { _context = ctx; _log = ctx.logManager().getLog(MessageReceiver.class); _transport = transport; - _completeMessages = new ArrayList(16); + _completeMessages = new LinkedBlockingQueue(); _cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE); _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES); @@ -46,9 +48,8 @@ public class MessageReceiver { public void startup() { _alive = true; - for (int i = 0; i < 5; i++) { - I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i); - t.setDaemon(true); + for (int i = 0; i < THREADS; i++) { + I2PThread t = new I2PThread(new Runner(), "UDP message receiver " + i + '/' + THREADS, true); t.start(); } } @@ -61,26 +62,31 @@ public class MessageReceiver { public void shutdown() { _alive = false; - synchronized (_completeMessages) { - _completeMessages.clear(); - _completeMessages.notifyAll(); + _completeMessages.clear(); + for (int i = 0; i < THREADS; i++) { + InboundMessageState ims = new InboundMessageState(_context, POISON_IMS, null); + _completeMessages.offer(ims); } + for (int i = 1; i <= 5 && !_completeMessages.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} + } + _completeMessages.clear(); } public void receiveMessage(InboundMessageState state) { - int total = 0; - long lag = -1; - synchronized (_completeMessages) { - _completeMessages.add(state); - total = _completeMessages.size(); - if (total > 1) - lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime(); - _completeMessages.notifyAll(); - } - if (total > 1) - _context.statManager().addRateData("udp.inboundReady", total, 0); - if (lag > 1000) - _context.statManager().addRateData("udp.inboundLag", lag, total); + //int total = 0; + //long lag = -1; + if (_alive) + _completeMessages.offer(state); + //total = _completeMessages.size(); + //if (total > 1) + // lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime(); + //if (total > 1) + // _context.statManager().addRateData("udp.inboundReady", total, 0); + //if (lag > 1000) + // _context.statManager().addRateData("udp.inboundLag", lag, total); } public void loop(I2NPMessageHandler handler) { @@ -91,19 +97,18 @@ public class MessageReceiver { long expiredLifetime = 0; int remaining = 0; try { - synchronized (_completeMessages) { while (message == null) { - if (_completeMessages.size() > 0) // grab the tail for lowest latency - message = (InboundMessageState)_completeMessages.remove(_completeMessages.size()-1); - else - _completeMessages.wait(5000); + message = _completeMessages.take(); + if ( (message != null) && (message.getMessageId() == POISON_IMS) ) { + message = null; + break; + } if ( (message != null) && (message.isExpired()) ) { expiredLifetime += message.getLifetime(); message = null; expired++; } - remaining = _completeMessages.size(); - } + //remaining = _completeMessages.size(); } } catch (InterruptedException ie) {} diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index ece42da7a..fa01bc026 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -1,8 +1,8 @@ package net.i2p.router.transport.udp; import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.crypto.DHSessionKeyBuilder; import net.i2p.data.Base64; @@ -22,8 +22,8 @@ import net.i2p.util.Log; * */ public class OutboundEstablishState { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; // SessionRequest message private byte _sentX[]; private byte _bobIP[]; @@ -44,18 +44,18 @@ public class OutboundEstablishState { private long _sentSignedOnTime; private Signature _sentSignature; // general status - private long _establishBegin; - private long _lastReceive; + private final long _establishBegin; + //private long _lastReceive; private long _lastSend; private long _nextSend; private RemoteHostId _remoteHostId; - private RouterIdentity _remotePeer; + private final RouterIdentity _remotePeer; private SessionKey _introKey; - private final List _queuedMessages; + private final Queue _queuedMessages; private int _currentState; private long _introductionNonce; // intro - private UDPAddress _remoteAddress; + private final UDPAddress _remoteAddress; private boolean _complete; /** nothin sent yet */ @@ -87,7 +87,7 @@ public class OutboundEstablishState { _remotePeer = remotePeer; _introKey = introKey; _keyBuilder = null; - _queuedMessages = new ArrayList(4); + _queuedMessages = new LinkedBlockingQueue(); _currentState = STATE_UNKNOWN; _establishBegin = ctx.clock().now(); _remoteAddress = addr; @@ -113,22 +113,21 @@ public class OutboundEstablishState { public long getIntroNonce() { return _introductionNonce; } public void addMessage(OutNetMessage msg) { - synchronized (_queuedMessages) { - if (!_queuedMessages.contains(msg)) - _queuedMessages.add(msg); - } + // chance of a duplicate here in a race, that's ok + if (!_queuedMessages.contains(msg)) + _queuedMessages.offer(msg); + else if (_log.shouldLog(Log.WARN)) + _log.warn("attempt to add duplicate msg to queue: " + msg); } + public OutNetMessage getNextQueuedMessage() { - synchronized (_queuedMessages) { - if (_queuedMessages.size() > 0) - return (OutNetMessage)_queuedMessages.remove(0); - } - return null; + return _queuedMessages.poll(); } public RouterIdentity getRemoteIdentity() { return _remotePeer; } public SessionKey getIntroKey() { return _introKey; } + /** called from constructor, no need to synch */ private void prepareSessionRequest() { _keyBuilder = new DHSessionKeyBuilder(); byte X[] = _keyBuilder.getMyPublicValue().toByteArray(); @@ -142,7 +141,7 @@ public class OutboundEstablishState { System.arraycopy(X, 0, _sentX, _sentX.length - X.length, X.length); } - public synchronized byte[] getSentX() { return _sentX; } + public byte[] getSentX() { return _sentX; } public synchronized byte[] getSentIP() { return _bobIP; } public synchronized int getSentPort() { return _bobPort; } @@ -403,8 +402,8 @@ public class OutboundEstablishState { } /** how long have we been trying to establish this session? */ - public synchronized long getLifetime() { return _context.clock().now() - _establishBegin; } - public synchronized long getEstablishBeginTime() { return _establishBegin; } + public long getLifetime() { return _context.clock().now() - _establishBegin; } + public long getEstablishBeginTime() { return _establishBegin; } public synchronized long getNextSendTime() { return _nextSend; } public synchronized void setNextSendTime(long when) { _nextSend = when; @@ -422,8 +421,7 @@ public class OutboundEstablishState { } private void packetReceived() { - _lastReceive = _context.clock().now(); - _nextSend = _lastReceive; + _nextSend = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Got a packet, nextSend == now"); } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index f1d10e122..8c1f6904b 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -28,7 +28,7 @@ public class OutboundMessageFragments { private UDPTransport _transport; // private ActiveThrottle _throttle; // LINT not used ?? /** peers we are actively sending messages to */ - private final List _activePeers; + private final List _activePeers; private boolean _alive; /** which peer should we build the next packet out of? */ private int _nextPeer; @@ -207,7 +207,7 @@ public class OutboundMessageFragments { synchronized (_activePeers) { peers = new ArrayList(_activePeers.size()); for (int i = 0; i < _activePeers.size(); i++) { - PeerState state = (PeerState)_activePeers.get(i); + PeerState state = _activePeers.get(i); if (state.getOutboundMessageCount() <= 0) { _activePeers.remove(i); i--; @@ -255,7 +255,7 @@ public class OutboundMessageFragments { if (cycleTime > 1000) _context.statManager().addRateData("udp.sendCycleTimeSlow", cycleTime, _activePeers.size()); } - peer = (PeerState)_activePeers.get(i); + peer = _activePeers.get(i); state = peer.allocateSend(); if (state != null) { _nextPeer = i + 1; @@ -318,12 +318,12 @@ public class OutboundMessageFragments { return null; // ok, simplest possible thing is to always tack on the bitfields if - List msgIds = peer.getCurrentFullACKs(); + List msgIds = peer.getCurrentFullACKs(); if (msgIds == null) msgIds = new ArrayList(); - List partialACKBitfields = new ArrayList(); + List partialACKBitfields = new ArrayList(); peer.fetchPartialACKs(partialACKBitfields); int piggybackedPartialACK = partialACKBitfields.size(); - List remaining = new ArrayList(msgIds); + List remaining = new ArrayList(msgIds); int sparseCount = 0; UDPPacket rv[] = new UDPPacket[fragments]; //sparse for (int i = 0; i < fragments; i++) { @@ -356,7 +356,7 @@ public class OutboundMessageFragments { int piggybackedAck = 0; if (msgIds.size() != remaining.size()) { for (int i = 0; i < msgIds.size(); i++) { - Long id = (Long)msgIds.get(i); + Long id = msgIds.get(i); if (!remaining.contains(id)) { peer.removeACKMessage(id); piggybackedAck++; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 1a9a9c590..877ad10b1 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -342,8 +342,8 @@ public class OutboundMessageState { public String toString() { short sends[] = _fragmentSends; ByteArray messageBuf = _messageBuf; - StringBuilder buf = new StringBuilder(64); - buf.append("Message ").append(_messageId); + StringBuilder buf = new StringBuilder(256); + buf.append("OB Message ").append(_messageId); if (sends != null) buf.append(" with ").append(sends.length).append(" fragments"); if (messageBuf != null) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java b/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java index b2136ac8d..841588ee4 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java @@ -31,8 +31,7 @@ public class OutboundRefiller implements Runnable { public void startup() { _alive = true; - I2PThread t = new I2PThread(this, "UDP outbound refiller"); - t.setDaemon(true); + I2PThread t = new I2PThread(this, "UDP outbound refiller", true); t.start(); } public void shutdown() { _alive = false; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 75258791e..26b82e667 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -2,8 +2,8 @@ package net.i2p.router.transport.udp; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.List; @@ -32,6 +32,25 @@ public class PacketBuilder { private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH); private static final ByteCache _blockCache = ByteCache.getInstance(64, 16); + /** + * For debugging and stats only - does not go out on the wire. + * These are chosen to be higher than the highest I2NP message type, + * as a data packet is set to the underlying I2NP message type. + */ + static final int TYPE_FIRST = 42; + static final int TYPE_ACK = TYPE_FIRST; + static final int TYPE_PUNCH = 43; + static final int TYPE_RESP = 44; + static final int TYPE_INTRO = 45; + static final int TYPE_RREQ = 46; + static final int TYPE_TCB = 47; + static final int TYPE_TBC = 48; + static final int TYPE_TTA = 49; + static final int TYPE_TFA = 50; + static final int TYPE_CONF = 51; + static final int TYPE_SREQ = 52; + static final int TYPE_CREAT = 53; + /** we only talk to people of the right version */ static final int PROTOCOL_VERSION = 0; @@ -58,7 +77,7 @@ public class PacketBuilder { * The list itself is passed by reference, and if a messageId is * included, it should be removed from the list. */ - public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { + public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { UDPPacket packet = UDPPacket.acquire(_context, false); StringBuilder msg = null; @@ -92,18 +111,18 @@ public class PacketBuilder { // is under the MTU, but for now, since the # of packets acked is so few (usually // just one or two), and since the packets are so small anyway, an additional five // or ten bytes doesn't hurt. - if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) + if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) ) data[off] |= UDPPacket.DATA_FLAG_EXPLICIT_ACK; - if ( (partialACKsRemaining != null) && (partialACKsRemaining.size() > 0) ) + if ( (partialACKsRemaining != null) && (!partialACKsRemaining.isEmpty()) ) data[off] |= UDPPacket.DATA_FLAG_ACK_BITFIELDS; off++; - if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) { + if ( (ackIdsRemaining != null) && (!ackIdsRemaining.isEmpty()) ) { DataHelper.toLong(data, off, 1, ackIdsRemaining.size()); off++; for (int i = 0; i < ackIdsRemaining.size(); i++) { //while (ackIdsRemaining.size() > 0) { - Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0); + Long ackId = ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0); DataHelper.toLong(data, off, 4, ackId.longValue()); off += 4; if (msg != null) // logging it @@ -118,7 +137,7 @@ public class PacketBuilder { // leave it blank for now, since we could skip some off++; for (int i = 0; i < partialACKsRemaining.size(); i++) { - ACKBitfield bitfield = (ACKBitfield)partialACKsRemaining.get(i); + ACKBitfield bitfield = partialACKsRemaining.get(i); if (bitfield.receivedComplete()) continue; DataHelper.toLong(data, off, 4, bitfield.getMessageId()); off += 4; @@ -214,15 +233,18 @@ public class PacketBuilder { // We use this for keepalive purposes. // It doesn't generate a reply, but that's ok. public UDPPacket buildPing(PeerState peer) { - return buildACK(peer, new ArrayList(0)); + return buildACK(peer, Collections.EMPTY_LIST); } private static final int ACK_PRIORITY = 1; /** + * Build the ack packet. The list need not be sorted into full and partial; + * this method will put all fulls before the partials in the outgoing packet. + * * @param ackBitfields list of ACKBitfield instances to either fully or partially ACK */ - public UDPPacket buildACK(PeerState peer, List ackBitfields) { + public UDPPacket buildACK(PeerState peer, List ackBitfields) { UDPPacket packet = UDPPacket.acquire(_context, false); StringBuilder msg = null; @@ -263,7 +285,7 @@ public class PacketBuilder { DataHelper.toLong(data, off, 1, fullACKCount); off++; for (int i = 0; i < ackBitfields.size(); i++) { - ACKBitfield bf = (ACKBitfield)ackBitfields.get(i); + ACKBitfield bf = ackBitfields.get(i); if (bf.receivedComplete()) { DataHelper.toLong(data, off, 4, bf.getMessageId()); off += 4; @@ -415,7 +437,7 @@ public class PacketBuilder { authenticate(packet, ourIntroKey, ourIntroKey, iv); setTo(packet, to, state.getSentPort()); _ivCache.release(iv); - packet.setMessageType(53); + packet.setMessageType(TYPE_CREAT); return packet; } @@ -479,7 +501,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, state.getIntroKey(), state.getIntroKey()); setTo(packet, to, state.getSentPort()); - packet.setMessageType(52); + packet.setMessageType(TYPE_SREQ); return packet; } @@ -586,7 +608,7 @@ public class PacketBuilder { } setTo(packet, to, state.getSentPort()); - packet.setMessageType(51); + packet.setMessageType(TYPE_CONF); return packet; } @@ -639,7 +661,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, toCipherKey, toMACKey); setTo(packet, toIP, toPort); - packet.setMessageType(50); + packet.setMessageType(TYPE_TFA); return packet; } @@ -684,7 +706,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, aliceIntroKey, aliceIntroKey); setTo(packet, aliceIP, alicePort); - packet.setMessageType(49); + packet.setMessageType(TYPE_TTA); return packet; } @@ -731,7 +753,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, charlieCipherKey, charlieMACKey); setTo(packet, charlieIP, charliePort); - packet.setMessageType(48); + packet.setMessageType(TYPE_TBC); return packet; } @@ -776,7 +798,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, bobCipherKey, bobMACKey); setTo(packet, bobIP, bobPort); - packet.setMessageType(47); + packet.setMessageType(TYPE_TCB); return packet; } @@ -875,7 +897,7 @@ public class PacketBuilder { if (encrypt) authenticate(packet, new SessionKey(introKey), new SessionKey(introKey)); setTo(packet, introHost, introPort); - packet.setMessageType(46); + packet.setMessageType(TYPE_RREQ); return packet; } @@ -925,7 +947,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, charlie.getCurrentCipherKey(), charlie.getCurrentMACKey()); setTo(packet, charlie.getRemoteIPAddress(), charlie.getRemotePort()); - packet.setMessageType(45); + packet.setMessageType(TYPE_INTRO); return packet; } @@ -986,7 +1008,7 @@ public class PacketBuilder { packet.getPacket().setLength(off); authenticate(packet, aliceIntroKey, aliceIntroKey); setTo(packet, aliceAddr, alice.getPort()); - packet.setMessageType(44); + packet.setMessageType(TYPE_RESP); return packet; } @@ -1019,7 +1041,7 @@ public class PacketBuilder { packet.getPacket().setLength(0); setTo(packet, to, port); - packet.setMessageType(43); + packet.setMessageType(TYPE_PUNCH); return packet; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index e32530809..1a35c5d19 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -1,8 +1,6 @@ package net.i2p.router.transport.udp; -import java.util.ArrayList; import java.util.Date; -import java.util.List; import net.i2p.router.Router; import net.i2p.router.RouterContext; @@ -31,7 +29,7 @@ public class PacketHandler { private PeerTestManager _testManager; private IntroductionManager _introManager; private boolean _keepReading; - private List _handlers; + private final Handler[] _handlers; private static final int NUM_HANDLERS = 5; /** let packets be up to 30s slow */ @@ -46,9 +44,9 @@ public class PacketHandler { _inbound = inbound; _testManager = testManager; _introManager = introManager; - _handlers = new ArrayList(NUM_HANDLERS); + _handlers = new Handler[NUM_HANDLERS]; for (int i = 0; i < NUM_HANDLERS; i++) { - _handlers.add(new Handler()); + _handlers[i] = new Handler(); } _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", UDPTransport.RATES); @@ -81,9 +79,8 @@ public class PacketHandler { public void startup() { _keepReading = true; - for (int i = 0; i < _handlers.size(); i++) { - I2PThread t = new I2PThread((Handler)_handlers.get(i), "UDP Packet handler " + i + "/" + _handlers.size()); - t.setDaemon(true); + for (int i = 0; i < NUM_HANDLERS; i++) { + I2PThread t = new I2PThread(_handlers[i], "UDP Packet handler " + i + '/' + NUM_HANDLERS, true); t.start(); } } @@ -94,10 +91,9 @@ public class PacketHandler { String getHandlerStatus() { StringBuilder rv = new StringBuilder(); - int size = _handlers.size(); - rv.append("Handlers: ").append(size); - for (int i = 0; i < size; i++) { - Handler handler = (Handler)_handlers.get(i); + rv.append("Handlers: ").append(NUM_HANDLERS); + for (int i = 0; i < NUM_HANDLERS; i++) { + Handler handler = _handlers[i]; rv.append(" handler ").append(i).append(" state: ").append(handler._state); } return rv.toString(); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 582eed1c3..271e83597 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -25,8 +25,7 @@ public class PacketPusher implements Runnable { public void startup() { _alive = true; - I2PThread t = new I2PThread(this, "UDP packet pusher"); - t.setDaemon(true); + I2PThread t = new I2PThread(this, "UDP packet pusher", true); t.start(); } @@ -39,7 +38,8 @@ public class PacketPusher implements Runnable { if (packets != null) { for (int i = 0; i < packets.length; i++) { if (packets[i] != null) // null for ACKed fragments - _sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms + //_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms + _sender.add(packets[i]); } } } catch (Exception e) { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index f36e2d95c..06e528a94 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -8,12 +8,16 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.util.Log; +import net.i2p.util.ConcurrentHashSet; /** * Contain all of the state about a UDP connection to a peer. @@ -73,14 +77,22 @@ public class PeerState { private int _consecutiveFailedSends; /** when did we last have a failed send (beginning of period) */ // private long _lastFailedSendPeriod; - /** list of messageIds (Long) that we have received but not yet sent */ - private final List _currentACKs; + + /** + * Set of messageIds (Long) that we have received but not yet sent + * Since even with the smallest MTU we can fit 131 acks in a message, + * we are unlikely to get backed up on acks, so we don't keep + * them in any particular order. + */ + private final Set _currentACKs; + /** * list of the most recent messageIds (Long) that we have received and sent * an ACK for. We keep a few of these around to retransmit with _currentACKs, * hopefully saving some spurious retransmissions */ - private final List _currentACKsResend; + private final Queue _currentACKsResend; + /** when did we last send ACKs to the peer? */ private volatile long _lastACKSend; /** when did we decide we need to ACK to this peer? */ @@ -169,9 +181,9 @@ public class PeerState { private long _packetsReceived; /** list of InboundMessageState for active message */ - private final Map _inboundMessages; + private final Map _inboundMessages; /** list of OutboundMessageState */ - private final List _outboundMessages; + private final List _outboundMessages; /** which outbound message is currently being retransmitted */ private OutboundMessageState _retransmitter; @@ -180,8 +192,10 @@ public class PeerState { /** have we migrated away from this peer to another newer one? */ private volatile boolean _dead; + /** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */ + private static final int MIN_CONCURRENT_MSGS = 8; /** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */ - private volatile int _concurrentMessagesAllowed = 8; + private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS; /** * how many outbound messages are currently being transmitted. Not thread safe, as we're not strict */ @@ -203,6 +217,11 @@ public class PeerState { * we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20 * for UDP, and 8 for IP, giving us 596. round up to mod 16, giving a total * of 608 + * + * Well, we really need to count the acks as well, especially + * 4 * MAX_RESEND_ACKS which can take up a significant amount of space. + * We reduce the max acks when using the small MTU but it may not be enough... + * */ private static final int MIN_MTU = 608;//600; //1500; private static final int DEFAULT_MTU = MIN_MTU; @@ -234,8 +253,8 @@ public class PeerState { _currentReceiveSecond = -1; _lastSendTime = -1; _lastReceiveTime = -1; - _currentACKs = new ArrayList(8); - _currentACKsResend = new ArrayList(8); + _currentACKs = new ConcurrentHashSet(); + _currentACKsResend = new LinkedBlockingQueue(); _currentSecondECNReceived = false; _remoteWantsPreviousACKs = false; _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; @@ -582,12 +601,9 @@ public class PeerState { _context.statManager().addRateData("udp.receiveBps", _receiveBps, 0); } - synchronized (_currentACKs) { - if (_wantACKSendSince <= 0) - _wantACKSendSince = now; - if (!_currentACKs.contains(messageId)) - _currentACKs.add(messageId); - } + if (_wantACKSendSince <= 0) + _wantACKSendSince = now; + _currentACKs.add(messageId); _messagesReceived++; } @@ -600,7 +616,8 @@ public class PeerState { * Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages. * Access to this map must be synchronized explicitly! */ - public Map getInboundMessages() { return _inboundMessages; } + public Map getInboundMessages() { return _inboundMessages; } + /** * Expire partially received inbound messages, returning how many are still pending. * This should probably be fired periodically, in case a peer goes silent and we don't @@ -661,26 +678,36 @@ public class PeerState { * removeACKMessage(Long) should be called. * */ - public List getCurrentFullACKs() { - synchronized (_currentACKs) { - ArrayList rv = new ArrayList(_currentACKs); + public List getCurrentFullACKs() { + ArrayList rv = new ArrayList(_currentACKs); // include some for retransmission rv.addAll(_currentACKsResend); return rv; - } } + public void removeACKMessage(Long messageId) { - synchronized (_currentACKs) { _currentACKs.remove(messageId); - _currentACKsResend.add(messageId); + _currentACKsResend.offer(messageId); // trim down the resends while (_currentACKsResend.size() > MAX_RESEND_ACKS) - _currentACKsResend.remove(0); - } - _lastACKSend = _context.clock().now(); + _currentACKsResend.poll(); + _lastACKSend = _context.clock().now(); } + /** + * The max number of acks we save to send as duplicates + */ private static final int MAX_RESEND_ACKS = 16; + /** + * The number of duplicate acks sent in each messge - + * Warning, this directly affects network overhead + * Was 16 but that's too much (64 bytes in a max 608 byte packet, + * and often much smaller) + * @since 0.7.13 + */ + private static final int MAX_RESEND_ACKS_LARGE = 9; + /** for small MTU */ + private static final int MAX_RESEND_ACKS_SMALL = 4; /** * grab a list of ACKBitfield instances, some of which may fully @@ -691,51 +718,75 @@ public class PeerState { * will be unchanged if there are ACKs remaining. * */ - public List retrieveACKBitfields() { return retrieveACKBitfields(true); } - public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { - List rv = null; + public List retrieveACKBitfields() { return retrieveACKBitfields(true); } + + public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { + List rv = new ArrayList(MAX_RESEND_ACKS); int bytesRemaining = countMaxACKData(); - synchronized (_currentACKs) { - rv = new ArrayList(16); //_currentACKs.size()); - int oldIndex = _currentACKsResend.size(); - while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { - Long val = (Long)_currentACKs.remove(0); + + // Limit the overhead of all the resent acks when using small MTU + // 64 bytes in a 608-byte packet is too much... + // Send a random subset of all the queued resend acks. + int resendSize = _currentACKsResend.size(); + int maxResendAcks; + if (bytesRemaining < MIN_MTU) + maxResendAcks = MAX_RESEND_ACKS_SMALL; + else + maxResendAcks = MAX_RESEND_ACKS_LARGE; + List randomResends = new ArrayList(_currentACKsResend); + + // As explained above, we include the acks in any order + // since we are unlikely to get backed up - + // just take them using the Set iterator. + Iterator iter = _currentACKs.iterator(); + while (bytesRemaining >= 4 && iter.hasNext()) { + Long val = iter.next(); + iter.remove(); long id = val.longValue(); rv.add(new FullACKBitfield(id)); - _currentACKsResend.add(val); + _currentACKsResend.offer(val); bytesRemaining -= 4; } - if (_currentACKs.size() <= 0) + if (_currentACKs.isEmpty()) _wantACKSendSince = -1; if (alwaysIncludeRetransmissions || rv.size() > 0) { // now repeat by putting in some old ACKs - for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) { - Long cur = (Long)_currentACKsResend.get(i); + // randomly selected from the Resend queue. + // Maybe we should only resend each one a certain number of times... + int oldIndex = Math.min(resendSize, maxResendAcks); + if (oldIndex > 0 && oldIndex < resendSize) + Collections.shuffle(randomResends, _context.random()); + iter = randomResends.iterator(); + while (bytesRemaining >= 4 && oldIndex-- > 0 && iter.hasNext()) { + Long cur = iter.next(); long c = cur.longValue(); FullACKBitfield bf = new FullACKBitfield(c); - rv.add(bf); - bytesRemaining -= 4; + // try to avoid duplicates ?? + // ACKsResend is not checked for dups at add time + //if (rv.contains(bf)) { + // iter.remove(); + //} else { + rv.add(bf); + bytesRemaining -= 4; + //} } } // trim down the resends while (_currentACKsResend.size() > MAX_RESEND_ACKS) - _currentACKsResend.remove(0); - } + _currentACKsResend.poll(); int partialIncluded = 0; if (bytesRemaining > 4) { // ok, there's room to *try* to fit in some partial ACKs, so // we should try to find some packets to partially ACK // (preferably the ones which have the most received fragments) - List partial = new ArrayList(); + List partial = new ArrayList(); fetchPartialACKs(partial); // we may not be able to use them all, but lets try... for (int i = 0; (bytesRemaining > 4) && (i < partial.size()); i++) { - ACKBitfield bitfield = (ACKBitfield)partial.get(i); + ACKBitfield bitfield = partial.get(i); int bytes = (bitfield.fragmentCount() / 7) + 1; if (bytesRemaining > bytes + 4) { // msgId + bitfields - if (rv == null) - rv = new ArrayList(partial.size()); rv.add(bitfield); bytesRemaining -= bytes + 4; partialIncluded++; @@ -754,7 +805,7 @@ public class PeerState { return rv; } - void fetchPartialACKs(List rv) { + void fetchPartialACKs(List rv) { InboundMessageState states[] = null; int curState = 0; synchronized (_inboundMessages) { @@ -762,9 +813,8 @@ public class PeerState { if (numMessages <= 0) return; // todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead? - int remaining = _inboundMessages.size(); - for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) { - InboundMessageState state = (InboundMessageState)iter.next(); + for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) { + InboundMessageState state = iter.next(); if (state.isExpired()) { //if (_context instanceof RouterContext) // ((RouterContext)_context).messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired partially received: " + state.toString()); @@ -795,6 +845,13 @@ public class PeerState { public boolean received(int fragmentNum) { return true; } public boolean receivedComplete() { return true; } @Override + public int hashCode() { return (int) _msgId; } + @Override + public boolean equals(Object o) { + if (!(o instanceof FullACKBitfield)) return false; + return _msgId == ((ACKBitfield)o).getMessageId(); + } + @Override public String toString() { return "Full ACK of " + _msgId; } } @@ -825,8 +882,8 @@ public class PeerState { } } else { int allow = _concurrentMessagesAllowed - 1; - if (allow < 8) - allow = 8; + if (allow < MIN_CONCURRENT_MSGS) + allow = MIN_CONCURRENT_MSGS; _concurrentMessagesAllowed = allow; } if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES) @@ -977,10 +1034,10 @@ public class PeerState { public long getWantedACKSendSince() { return _wantACKSendSince; } public boolean unsentACKThresholdReached() { int threshold = countMaxACKData() / 4; - synchronized (_currentACKs) { - return _currentACKs.size() >= threshold; - } + return _currentACKs.size() >= threshold; } + + /** @return MTU - 83 */ private int countMaxACKData() { return _mtu - IP_HEADER_SIZE @@ -1013,7 +1070,7 @@ public class PeerState { state.setPeer(this); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (msgs == null) return 0; int rv = 0; boolean fail = false; @@ -1068,12 +1125,12 @@ public class PeerState { public void dropOutbound() { //if (_dead) return; _dead = true; - List msgs = _outboundMessages; + List msgs = _outboundMessages; //_outboundMessages = null; _retransmitter = null; if (msgs != null) { int sz = 0; - List tempList = null; + List tempList = null; synchronized (msgs) { sz = msgs.size(); if (sz > 0) { @@ -1082,12 +1139,14 @@ public class PeerState { } } for (int i = 0; i < sz; i++) - _transport.failed((OutboundMessageState)tempList.get(i), false); + _transport.failed(tempList.get(i), false); } + // so the ACKSender will drop this peer from its queue + _wantACKSendSince = -1; } public int getOutboundMessageCount() { - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return 0; if (msgs != null) { synchronized (msgs) { @@ -1104,17 +1163,17 @@ public class PeerState { */ public int finishMessages() { int rv = 0; - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) { dropOutbound(); return 0; } - List succeeded = null; - List failed = null; + List succeeded = null; + List failed = null; synchronized (msgs) { int size = msgs.size(); for (int i = 0; i < size; i++) { - OutboundMessageState state = (OutboundMessageState)msgs.get(i); + OutboundMessageState state = msgs.get(i); if (state.isComplete()) { msgs.remove(i); i--; @@ -1147,7 +1206,7 @@ public class PeerState { } for (int i = 0; succeeded != null && i < succeeded.size(); i++) { - OutboundMessageState state = (OutboundMessageState)succeeded.get(i); + OutboundMessageState state = succeeded.get(i); _transport.succeeded(state); state.releaseResources(); OutNetMessage msg = state.getMessage(); @@ -1156,7 +1215,7 @@ public class PeerState { } for (int i = 0; failed != null && i < failed.size(); i++) { - OutboundMessageState state = (OutboundMessageState)failed.get(i); + OutboundMessageState state = failed.get(i); OutNetMessage msg = state.getMessage(); if (msg != null) { msg.timestamp("expired in the active pool"); @@ -1180,12 +1239,12 @@ public class PeerState { */ public OutboundMessageState allocateSend() { int total = 0; - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return null; synchronized (msgs) { int size = msgs.size(); for (int i = 0; i < size; i++) { - OutboundMessageState state = (OutboundMessageState)msgs.get(i); + OutboundMessageState state = msgs.get(i); if (locked_shouldSend(state)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); @@ -1217,7 +1276,7 @@ public class PeerState { public int getNextDelay() { int rv = -1; long now = _context.clock().now(); - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return -1; synchronized (msgs) { if (_retransmitter != null) { @@ -1229,7 +1288,7 @@ public class PeerState { } int size = msgs.size(); for (int i = 0; i < size; i++) { - OutboundMessageState state = (OutboundMessageState)msgs.get(i); + OutboundMessageState state = msgs.get(i); int delay = (int)(state.getNextSendTime() - now); if (delay <= 0) delay = 1; @@ -1346,12 +1405,12 @@ public class PeerState { public int acked(long messageId) { OutboundMessageState state = null; - List msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return 0; synchronized (msgs) { int sz = msgs.size(); for (int i = 0; i < sz; i++) { - state = (OutboundMessageState)msgs.get(i); + state = msgs.get(i); if (state.getMessageId() == messageId) { msgs.remove(i); break; @@ -1407,13 +1466,13 @@ public class PeerState { return; } - List msgs = _outboundMessages; + List msgs = _outboundMessages; OutboundMessageState state = null; boolean isComplete = false; synchronized (msgs) { for (int i = 0; i < msgs.size(); i++) { - state = (OutboundMessageState)msgs.get(i); + state = msgs.get(i); if (state.getMessageId() == bitfield.getMessageId()) { boolean complete = state.acked(bitfield); if (complete) { @@ -1486,26 +1545,23 @@ public class PeerState { _sendWindowBytes = oldPeer._sendWindowBytes; oldPeer._dead = true; - List tmp = new ArrayList(); - synchronized (oldPeer._currentACKs) { - tmp.addAll(oldPeer._currentACKs); - oldPeer._currentACKs.clear(); - } + List tmp = new ArrayList(); + tmp.addAll(oldPeer._currentACKs); + oldPeer._currentACKs.clear(); + if (!_dead) { - synchronized (_currentACKs) { _currentACKs.addAll(tmp); } + _currentACKs.addAll(tmp); } tmp.clear(); - synchronized (oldPeer._currentACKsResend) { - tmp.addAll(oldPeer._currentACKsResend); - oldPeer._currentACKsResend.clear(); - } + tmp.addAll(oldPeer._currentACKsResend); + oldPeer._currentACKsResend.clear(); + if (!_dead) { - synchronized (_currentACKsResend) { _currentACKsResend.addAll(tmp); } + _currentACKsResend.addAll(tmp); } - tmp.clear(); - Map msgs = new HashMap(); + Map msgs = new HashMap(); synchronized (oldPeer._inboundMessages) { msgs.putAll(oldPeer._inboundMessages); oldPeer._inboundMessages.clear(); @@ -1515,20 +1571,20 @@ public class PeerState { } msgs.clear(); + List tmp2 = new ArrayList(); OutboundMessageState retransmitter = null; synchronized (oldPeer._outboundMessages) { - tmp.addAll(oldPeer._outboundMessages); + tmp2.addAll(oldPeer._outboundMessages); oldPeer._outboundMessages.clear(); retransmitter = oldPeer._retransmitter; oldPeer._retransmitter = null; } if (!_dead) { synchronized (_outboundMessages) { - _outboundMessages.addAll(tmp); + _outboundMessages.addAll(tmp2); _retransmitter = retransmitter; } } - tmp.clear(); } /* diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index 9be1c26ad..aa145bbe9 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -2,11 +2,10 @@ package net.i2p.router.transport.udp; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.Base64; import net.i2p.data.DataHelper; @@ -102,7 +101,7 @@ class PeerTestManager { private PeerTestState _currentTest; private boolean _currentTestComplete; /** as Alice */ - private List _recentTests; + private Queue _recentTests; /** longest we will keep track of a Charlie nonce for */ private static final int MAX_CHARLIE_LIFETIME = 10*1000; @@ -116,8 +115,8 @@ class PeerTestManager { _context = context; _transport = transport; _log = context.logManager().getLog(PeerTestManager.class); - _activeTests = new HashMap(64); - _recentTests = Collections.synchronizedList(new ArrayList(16)); + _activeTests = new ConcurrentHashMap(); + _recentTests = new LinkedBlockingQueue(); _packetBuilder = new PacketBuilder(context, transport); _currentTest = null; _currentTestComplete = false; @@ -155,8 +154,8 @@ class PeerTestManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Running test with bob = " + bobIP + ":" + bobPort + " " + test.getNonce()); while (_recentTests.size() > 16) - _recentTests.remove(0); - _recentTests.add(new Long(test.getNonce())); + _recentTests.poll(); + _recentTests.offer(new Long(test.getNonce())); sendTestToBob(); @@ -435,10 +434,7 @@ class PeerTestManager { testInfo.readIP(testIP, 0); } - PeerTestState state = null; - synchronized (_activeTests) { - state = (PeerTestState)_activeTests.get(new Long(nonce)); - } + PeerTestState state = _activeTests.get(new Long(nonce)); if (state == null) { if ( (testIP == null) || (testPort <= 0) ) { @@ -542,9 +538,7 @@ class PeerTestManager { _log.debug("Receive from bob (" + from + ") as charlie, sending back to bob and sending to alice @ " + aliceIP + ":" + alicePort); if (isNew) { - synchronized (_activeTests) { - _activeTests.put(new Long(nonce), state); - } + _activeTests.put(new Long(nonce), state); SimpleScheduler.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME); } @@ -623,9 +617,7 @@ class PeerTestManager { } if (isNew) { - synchronized (_activeTests) { - _activeTests.put(new Long(nonce), state); - } + _activeTests.put(new Long(nonce), state); SimpleScheduler.getInstance().addEvent(new RemoveTest(nonce), MAX_CHARLIE_LIFETIME); } @@ -701,9 +693,7 @@ class PeerTestManager { _nonce = nonce; } public void timeReached() { - synchronized (_activeTests) { _activeTests.remove(new Long(_nonce)); - } } } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestState.java b/router/java/src/net/i2p/router/transport/udp/PeerTestState.java index 16fff5fa8..b5219476c 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestState.java @@ -33,71 +33,71 @@ class PeerTestState { public static final short BOB = 2; public static final short CHARLIE = 3; - public synchronized long getNonce() { return _testNonce; } - public synchronized void setNonce(long nonce) { _testNonce = nonce; } + public long getNonce() { return _testNonce; } + public void setNonce(long nonce) { _testNonce = nonce; } /** Are we Alice, bob, or Charlie. */ - public synchronized short getOurRole() { return _ourRole; } - public synchronized void setOurRole(short role) { _ourRole = role; } + public short getOurRole() { return _ourRole; } + public void setOurRole(short role) { _ourRole = role; } /** * If we are Alice, this will contain the IP that Bob says we * can be reached at - the IP Charlie says we can be reached * at is _aliceIPFromCharlie * */ - public synchronized InetAddress getAliceIP() { return _aliceIP; } - public synchronized void setAliceIP(InetAddress ip) { _aliceIP = ip; } - public synchronized InetAddress getBobIP() { return _bobIP; } - public synchronized void setBobIP(InetAddress ip) { _bobIP = ip; } - public synchronized InetAddress getCharlieIP() { return _charlieIP; } - public synchronized void setCharlieIP(InetAddress ip) { _charlieIP = ip; } - public synchronized InetAddress getAliceIPFromCharlie() { return _aliceIPFromCharlie; } - public synchronized void setAliceIPFromCharlie(InetAddress ip) { _aliceIPFromCharlie = ip; } + public InetAddress getAliceIP() { return _aliceIP; } + public void setAliceIP(InetAddress ip) { _aliceIP = ip; } + public InetAddress getBobIP() { return _bobIP; } + public void setBobIP(InetAddress ip) { _bobIP = ip; } + public InetAddress getCharlieIP() { return _charlieIP; } + public void setCharlieIP(InetAddress ip) { _charlieIP = ip; } + public InetAddress getAliceIPFromCharlie() { return _aliceIPFromCharlie; } + public void setAliceIPFromCharlie(InetAddress ip) { _aliceIPFromCharlie = ip; } /** * If we are Alice, this will contain the port that Bob says we * can be reached at - the port Charlie says we can be reached * at is _alicePortFromCharlie * */ - public synchronized int getAlicePort() { return _alicePort; } - public synchronized void setAlicePort(int alicePort) { _alicePort = alicePort; } - public synchronized int getBobPort() { return _bobPort; } - public synchronized void setBobPort(int bobPort) { _bobPort = bobPort; } - public synchronized int getCharliePort() { return _charliePort; } - public synchronized void setCharliePort(int charliePort) { _charliePort = charliePort; } + public int getAlicePort() { return _alicePort; } + public void setAlicePort(int alicePort) { _alicePort = alicePort; } + public int getBobPort() { return _bobPort; } + public void setBobPort(int bobPort) { _bobPort = bobPort; } + public int getCharliePort() { return _charliePort; } + public void setCharliePort(int charliePort) { _charliePort = charliePort; } - public synchronized int getAlicePortFromCharlie() { return _alicePortFromCharlie; } - public synchronized void setAlicePortFromCharlie(int alicePortFromCharlie) { _alicePortFromCharlie = alicePortFromCharlie; } + public int getAlicePortFromCharlie() { return _alicePortFromCharlie; } + public void setAlicePortFromCharlie(int alicePortFromCharlie) { _alicePortFromCharlie = alicePortFromCharlie; } - public synchronized SessionKey getAliceIntroKey() { return _aliceIntroKey; } - public synchronized void setAliceIntroKey(SessionKey key) { _aliceIntroKey = key; } - public synchronized SessionKey getCharlieIntroKey() { return _charlieIntroKey; } - public synchronized void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; } - public synchronized SessionKey getBobCipherKey() { return _bobCipherKey; } - public synchronized void setBobCipherKey(SessionKey key) { _bobCipherKey = key; } - public synchronized SessionKey getBobMACKey() { return _bobMACKey; } - public synchronized void setBobMACKey(SessionKey key) { _bobMACKey = key; } + public SessionKey getAliceIntroKey() { return _aliceIntroKey; } + public void setAliceIntroKey(SessionKey key) { _aliceIntroKey = key; } + public SessionKey getCharlieIntroKey() { return _charlieIntroKey; } + public void setCharlieIntroKey(SessionKey key) { _charlieIntroKey = key; } + public SessionKey getBobCipherKey() { return _bobCipherKey; } + public void setBobCipherKey(SessionKey key) { _bobCipherKey = key; } + public SessionKey getBobMACKey() { return _bobMACKey; } + public void setBobMACKey(SessionKey key) { _bobMACKey = key; } /** when did this test begin? */ - public synchronized long getBeginTime() { return _beginTime; } - public synchronized void setBeginTime(long when) { _beginTime = when; } + public long getBeginTime() { return _beginTime; } + public void setBeginTime(long when) { _beginTime = when; } /** when did we last send out a packet? */ - public synchronized long getLastSendTime() { return _lastSendTime; } - public synchronized void setLastSendTime(long when) { _lastSendTime = when; } + public long getLastSendTime() { return _lastSendTime; } + public void setLastSendTime(long when) { _lastSendTime = when; } /** when did we last hear from alice? */ - public synchronized long getReceiveAliceTime() { return _receiveAliceTime; } - public synchronized void setReceiveAliceTime(long when) { _receiveAliceTime = when; } + public long getReceiveAliceTime() { return _receiveAliceTime; } + public void setReceiveAliceTime(long when) { _receiveAliceTime = when; } /** when did we last hear from bob? */ - public synchronized long getReceiveBobTime() { return _receiveBobTime; } - public synchronized void setReceiveBobTime(long when) { _receiveBobTime = when; } + public long getReceiveBobTime() { return _receiveBobTime; } + public void setReceiveBobTime(long when) { _receiveBobTime = when; } /** when did we last hear from charlie? */ - public synchronized long getReceiveCharlieTime() { return _receiveCharlieTime; } - public synchronized void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; } + public long getReceiveCharlieTime() { return _receiveCharlieTime; } + public void setReceiveCharlieTime(long when) { _receiveCharlieTime = when; } public int getPacketsRelayed() { return _packetsRelayed; } public void incrementPacketsRelayed() { ++_packetsRelayed; } @Override - public synchronized String toString() { + public String toString() { StringBuilder buf = new StringBuilder(512); buf.append("Role: "); if (_ourRole == ALICE) buf.append("Alice"); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index 97e112df3..5dc001e18 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -2,9 +2,9 @@ package net.i2p.router.transport.udp; import java.net.DatagramPacket; import java.net.InetAddress; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; @@ -40,15 +40,17 @@ public class UDPPacket { private int _validateCount; // private boolean _isInbound; - private static final List _packetCache; + private static final Queue _packetCache; + private static final boolean CACHE = true; + private static final int CACHE_SIZE = 64; static { - _packetCache = new ArrayList(256); + if (CACHE) + _packetCache = new LinkedBlockingQueue(CACHE_SIZE); + else + _packetCache = null; _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); } - private static final boolean CACHE = true; // TODO: support caching to cut churn down a /lot/ - private static final int CACHE_SIZE = 64; - static final int MAX_PACKET_SIZE = 2048; public static final int IV_SIZE = 16; public static final int MAC_SIZE = 16; @@ -121,7 +123,9 @@ public class UDPPacket { private int _messageType; private int _fragmentCount; + /** only for debugging and stats, does not go on the wire */ int getMessageType() { return _messageType; } + /** only for debugging and stats, does not go on the wire */ void setMessageType(int type) { _messageType = type; } int getFragmentCount() { return _fragmentCount; } void setFragmentCount(int count) { _fragmentCount = count; } @@ -238,7 +242,7 @@ public class UDPPacket { @Override public String toString() { verifyNotReleased(); - StringBuilder buf = new StringBuilder(64); + StringBuilder buf = new StringBuilder(256); buf.append(_packet.getLength()); buf.append(" byte packet with "); buf.append(_packet.getAddress().getHostAddress()).append(":"); @@ -256,12 +260,7 @@ public class UDPPacket { public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) { UDPPacket rv = null; if (CACHE) { - synchronized (_packetCache) { - if (_packetCache.size() > 0) { - rv = (UDPPacket)_packetCache.remove(0); - } - } - + rv = _packetCache.poll(); if (rv != null) rv.init(ctx, inbound); } @@ -284,11 +283,7 @@ public class UDPPacket { //_dataCache.release(_dataBuf); if (!CACHE) return; - synchronized (_packetCache) { - if (_packetCache.size() <= CACHE_SIZE) { - _packetCache.add(this); - } - } + _packetCache.offer(this); } private void verifyNotReleased() { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index efb85ac48..395a2fcf0 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -402,7 +402,7 @@ public class UDPPacketReader { @Override public String toString() { - StringBuilder buf = new StringBuilder(256); + StringBuilder buf = new StringBuilder(512); long msAgo = _context.clock().now() - readTimestamp()*1000; buf.append("Data packet sent ").append(msAgo).append("ms ago "); buf.append("IV "); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index c1bd5133a..88cb20779 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -2,8 +2,8 @@ package net.i2p.router.transport.udp; import java.io.IOException; import java.net.DatagramSocket; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; @@ -24,19 +24,20 @@ public class UDPReceiver { private Log _log; private DatagramSocket _socket; private String _name; - private final List _inboundQueue; + private final BlockingQueue _inboundQueue; private boolean _keepRunning; private Runner _runner; private UDPTransport _transport; - // private static int __id; + private static int __id; private int _id; + private static final int TYPE_POISON = -99999; public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPReceiver.class); - _id++; + _id = ++__id; _name = name; - _inboundQueue = new ArrayList(128); + _inboundQueue = new LinkedBlockingQueue(); _socket = socket; _transport = transport; _runner = new Runner(); @@ -50,17 +51,22 @@ public class UDPReceiver { public void startup() { //adjustDropProbability(); _keepRunning = true; - I2PThread t = new I2PThread(_runner, _name + "." + _id); - t.setDaemon(true); + I2PThread t = new I2PThread(_runner, _name + '.' + _id, true); t.start(); } public void shutdown() { _keepRunning = false; - synchronized (_inboundQueue) { - _inboundQueue.clear(); - _inboundQueue.notifyAll(); + _inboundQueue.clear(); + UDPPacket poison = UDPPacket.acquire(_context, false); + poison.setMessageType(TYPE_POISON); + _inboundQueue.offer(poison); + for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} } + _inboundQueue.clear(); } /********* @@ -96,6 +102,7 @@ public class UDPReceiver { private static final int ARTIFICIAL_DELAY_BASE = 0; //600; **********/ + /** @return zero (was queue size) */ private int receive(UDPPacket packet) { /********* //adjustDropProbability(); @@ -126,7 +133,12 @@ public class UDPReceiver { return doReceive(packet); } + + /** @return zero (was queue size) */ private final int doReceive(UDPPacket packet) { + if (!_keepRunning) + return 0; + if (_log.shouldLog(Log.INFO)) _log.info("Received: " + packet); @@ -143,26 +155,25 @@ public class UDPReceiver { boolean rejected = false; int queueSize = 0; long headPeriod = 0; - synchronized (_inboundQueue) { - queueSize = _inboundQueue.size(); - if (queueSize > 0) { - headPeriod = ((UDPPacket)_inboundQueue.get(0)).getLifetime(); + + UDPPacket head = _inboundQueue.peek(); + if (head != null) { + headPeriod = head.getLifetime(); if (headPeriod > MAX_QUEUE_PERIOD) { rejected = true; - _inboundQueue.notifyAll(); } } if (!rejected) { - _inboundQueue.add(packet); - _inboundQueue.notifyAll(); - return queueSize + 1; + _inboundQueue.offer(packet); + //return queueSize + 1; + return 0; } - } // rejected packet.release(); _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod); if (_log.shouldLog(Log.WARN)) { + queueSize = _inboundQueue.size(); StringBuilder msg = new StringBuilder(); msg.append("Dropping inbound packet with "); msg.append(queueSize); @@ -188,21 +199,15 @@ public class UDPReceiver { */ public UDPPacket receiveNext() { UDPPacket rv = null; - int remaining = 0; - while (_keepRunning) { - synchronized (_inboundQueue) { - if (_inboundQueue.size() <= 0) - try { _inboundQueue.wait(); } catch (InterruptedException ie) {} - if (_inboundQueue.size() > 0) { - rv = (UDPPacket)_inboundQueue.remove(0); - remaining = _inboundQueue.size(); - if (remaining > 0) - _inboundQueue.notifyAll(); - break; - } - } + //int remaining = 0; + while (_keepRunning && rv == null) { + try { + rv = _inboundQueue.take(); + } catch (InterruptedException ie) {} + if (rv != null && rv.getMessageType() == TYPE_POISON) + return null; } - _context.statManager().addRateData("udp.receiveRemaining", remaining, 0); + //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0); return rv; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 446fd9d63..6d1cd7c3a 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -3,8 +3,8 @@ package net.i2p.router.transport.udp; import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; -import java.util.ArrayList; -import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; @@ -20,16 +20,17 @@ public class UDPSender { private Log _log; private DatagramSocket _socket; private String _name; - private final List _outboundQueue; + private final BlockingQueue _outboundQueue; private boolean _keepRunning; private Runner _runner; + private static final int TYPE_POISON = 99999; - private static final int MAX_QUEUED = 4; + //private static final int MAX_QUEUED = 4; public UDPSender(RouterContext ctx, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPSender.class); - _outboundQueue = new ArrayList(128); + _outboundQueue = new LinkedBlockingQueue(); _socket = socket; _runner = new Runner(); _name = name; @@ -44,49 +45,40 @@ public class UDPSender { // used in RouterWatchdog _context.statManager().createRateStat("udp.sendException", "How frequently we fail to send a packet (likely due to a windows exception)", "udp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("udp.sendPacketSize.1", "db store message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.2", "db lookup message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.3", "db search reply message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.6", "tunnel create message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.7", "tunnel create status message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.10", "delivery status message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.11", "garlic message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.16", "date message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.18", "tunnel data message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.19", "tunnel gateway message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.20", "data message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.21", "tunnel build", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.22", "tunnel build reply", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.20", "data message size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.42", "ack-only packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.43", "hole punch packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.44", "relay response packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.45", "relay intro packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.46", "relay request packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.47", "peer test charlie to bob packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.48", "peer test bob to charlie packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.49", "peer test to alice packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.50", "peer test from alice packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.51", "session confirmed packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.52", "session request packet size", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.sendPacketSize.53", "session created packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_ACK, "ack-only packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_PUNCH, "hole punch packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_RESP, "relay response packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_INTRO, "relay intro packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_RREQ, "relay request packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TCB, "peer test charlie to bob packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TBC, "peer test bob to charlie packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TTA, "peer test to alice packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_TFA, "peer test from alice packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CONF, "session confirmed packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_SREQ, "session request packet size", "udp", UDPTransport.RATES); + _context.statManager().createRateStat("udp.sendPacketSize." + PacketBuilder.TYPE_CREAT, "session created packet size", "udp", UDPTransport.RATES); } public void startup() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting the runner: " + _name); _keepRunning = true; - I2PThread t = new I2PThread(_runner, _name); - t.setDaemon(true); + I2PThread t = new I2PThread(_runner, _name, true); t.start(); } public void shutdown() { _keepRunning = false; - synchronized (_outboundQueue) { - _outboundQueue.clear(); - _outboundQueue.notifyAll(); + _outboundQueue.clear(); + UDPPacket poison = UDPPacket.acquire(_context, false); + poison.setMessageType(TYPE_POISON); + _outboundQueue.offer(poison); + for (int i = 1; i <= 5 && !_outboundQueue.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} } + _outboundQueue.clear(); } public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { @@ -98,10 +90,12 @@ public class UDPSender { * Add the packet to the queue. This may block until there is space * available, if requested, otherwise it returns immediately * - * @param blockTime how long to block + * @param blockTime how long to block IGNORED * @return number of packets queued + * @deprecated use add(packet) */ public int add(UDPPacket packet, int blockTime) { + /******** //long expiration = _context.clock().now() + blockTime; int remaining = -1; long lifetime = -1; @@ -124,13 +118,12 @@ public class UDPSender { } } - //if (true || (_outboundQueue.size() < MAX_QUEUED)) { + if (true || (_outboundQueue.size() < MAX_QUEUED)) { lifetime = packet.getLifetime(); _outboundQueue.add(packet); added = true; remaining = _outboundQueue.size(); _outboundQueue.notifyAll(); - /***** } else { long remainingTime = expiration - _context.clock().now(); if (remainingTime > 0) { @@ -141,7 +134,6 @@ public class UDPSender { } lifetime = packet.getLifetime(); } - *****/ } //} catch (InterruptedException ie) {} } @@ -153,42 +145,26 @@ public class UDPSender { if (_log.shouldLog(Log.DEBUG)) _log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime); return remaining; + ********/ + return add(packet); } private static final int MAX_HEAD_LIFETIME = 1000; /** - * - * @return number of packets in the queue + * Put it on the queue + * @return ZERO (used to be number of packets in the queue) */ public int add(UDPPacket packet) { - if (packet == null) return 0; + if (packet == null || !_keepRunning) return 0; int size = 0; - long lifetime = -1; - int removed = 0; - synchronized (_outboundQueue) { - lifetime = packet.getLifetime(); - UDPPacket head = null; - if (_outboundQueue.size() > 0) { - head = (UDPPacket)_outboundQueue.get(0); - while (head.getLifetime() > MAX_HEAD_LIFETIME) { - _outboundQueue.remove(0); - removed++; - if (_outboundQueue.size() > 0) - head = (UDPPacket)_outboundQueue.get(0); - else - break; - } - } - _outboundQueue.add(packet); + _outboundQueue.offer(packet); + //size = _outboundQueue.size(); + //_context.statManager().addRateData("udp.sendQueueSize", size, lifetime); + if (_log.shouldLog(Log.DEBUG)) { size = _outboundQueue.size(); - _outboundQueue.notifyAll(); + _log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime()); } - _context.statManager().addRateData("udp.sendQueueSize", size, lifetime); - if (removed > 0) - _context.statManager().addRateData("udp.sendQueueTrimmed", removed, size); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + lifetime); return size; } @@ -227,7 +203,8 @@ public class UDPSender { //_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size)); } - _context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); + if (packet.getMessageType() >= PacketBuilder.TYPE_FIRST) + _context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); //packet.getPacket().setLength(size); try { @@ -267,20 +244,17 @@ public class UDPSender { _log.debug("Stop sending..."); } + /** @return next packet in queue. Will discard any packet older than MAX_HEAD_LIFETIME */ private UDPPacket getNextPacket() { UDPPacket packet = null; - while ( (_keepRunning) && (packet == null) ) { + while ( (_keepRunning) && (packet == null || packet.getLifetime() > MAX_HEAD_LIFETIME) ) { + if (packet != null) + _context.statManager().addRateData("udp.sendQueueTrimmed", 1, 0); try { - synchronized (_outboundQueue) { - if (_outboundQueue.size() <= 0) { - _outboundQueue.notifyAll(); - _outboundQueue.wait(); - } else { - packet = (UDPPacket)_outboundQueue.remove(0); - _outboundQueue.notifyAll(); - } - } + packet = _outboundQueue.take(); } catch (InterruptedException ie) {} + if (packet != null && packet.getMessageType() == TYPE_POISON) + return null; } return packet; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 853d8c18a..fd0e5c9e4 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -13,8 +13,10 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.DataHelper; import net.i2p.data.Hash; @@ -31,6 +33,7 @@ import net.i2p.router.RouterContext; import net.i2p.router.transport.Transport; import net.i2p.router.transport.TransportBid; import net.i2p.router.transport.TransportImpl; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; @@ -75,8 +78,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** introduction key */ private SessionKey _introKey; - /** list of RemoteHostId for peers whose packets we want to drop outright */ - private final List _dropList; + /** + * List of RemoteHostId for peers whose packets we want to drop outright + * This is only for old network IDs (pre-0.6.1.10), so it isn't really used now. + */ + private final Set _dropList; private int _expireTimeout; @@ -167,9 +173,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority super(ctx); _context = ctx; _log = ctx.logManager().getLog(UDPTransport.class); - _peersByIdent = new HashMap(128); - _peersByRemoteHost = new HashMap(128); - _dropList = new ArrayList(256); + _peersByIdent = new ConcurrentHashMap(128); + _peersByRemoteHost = new ConcurrentHashMap(128); + _dropList = new ConcurrentHashSet(2); _endpoint = null; // See comments in DQAT.java @@ -608,9 +614,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * if no state exists */ PeerState getPeerState(RemoteHostId hostInfo) { - synchronized (_peersByRemoteHost) { return _peersByRemoteHost.get(hostInfo); - } } /** @@ -618,9 +622,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * if no state exists */ public PeerState getPeerState(Hash remotePeer) { - synchronized (_peersByIdent) { return _peersByIdent.get(remotePeer); - } } /** @@ -697,14 +699,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long oldEstablishedOn = -1; PeerState oldPeer = null; if (remotePeer != null) { - synchronized (_peersByIdent) { oldPeer = _peersByIdent.put(remotePeer, peer); if ( (oldPeer != null) && (oldPeer != peer) ) { // transfer over the old state/inbound message fragments/etc peer.loadFrom(oldPeer); oldEstablishedOn = oldPeer.getKeyEstablishedTime(); } - } } if (oldPeer != null) { @@ -717,13 +717,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority RemoteHostId remoteId = peer.getRemoteHostId(); if (remoteId == null) return false; - synchronized (_peersByRemoteHost) { oldPeer = _peersByRemoteHost.put(remoteId, peer); if ( (oldPeer != null) && (oldPeer != peer) ) { // transfer over the old state/inbound message fragments/etc peer.loadFrom(oldPeer); oldEstablishedOn = oldPeer.getKeyEstablishedTime(); - } } if (oldPeer != null) { @@ -773,6 +771,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority DatabaseStoreMessage dsm = (DatabaseStoreMessage)inMsg; if ( (dsm.getRouterInfo() != null) && (dsm.getRouterInfo().getNetworkId() != Router.NETWORK_ID) ) { + // this is pre-0.6.1.10, so it isn't going to happen any more + /* if (remoteIdentHash != null) { _context.shitlist().shitlistRouter(remoteIdentHash, "Sent us a peer from the wrong network"); @@ -792,21 +792,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority PeerState peer = getPeerState(peerHash); if (peer != null) { RemoteHostId remote = peer.getRemoteHostId(); - boolean added = false; - int droplistSize = 0; - synchronized (_dropList) { - if (!_dropList.contains(remote)) { - while (_dropList.size() > MAX_DROPLIST_SIZE) - _dropList.remove(0); - _dropList.add(remote); - added = true; - } - droplistSize = _dropList.size(); - } - if (added) { - _context.statManager().addRateData("udp.dropPeerDroplist", droplistSize, 0); - SimpleScheduler.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD); - } + _dropList.add(remote); + _context.statManager().addRateData("udp.dropPeerDroplist", 1, 0); + SimpleScheduler.getInstance().addEvent(new RemoveDropList(remote), DROPLIST_PERIOD); } markUnreachable(peerHash); _context.shitlist().shitlistRouter(peerHash, "Part of the wrong network, version = " + dsm.getRouterInfo().getOption("router.version")); @@ -838,13 +826,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private RemoteHostId _peer; public RemoveDropList(RemoteHostId peer) { _peer = peer; } public void timeReached() { - synchronized (_dropList) { - _dropList.remove(_peer); - } + _dropList.remove(_peer); } } - boolean isInDropList(RemoteHostId peer) { synchronized (_dropList) { return _dropList.contains(peer); } } + boolean isInDropList(RemoteHostId peer) { return _dropList.contains(peer); } void dropPeer(Hash peer, boolean shouldShitlist, String why) { PeerState state = getPeerState(peer); @@ -916,16 +902,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } long now = _context.clock().now(); _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); - synchronized (_peersByIdent) { - altByIdent = _peersByIdent.remove(peer.getRemotePeer()); - } + altByIdent = _peersByIdent.remove(peer.getRemotePeer()); } RemoteHostId remoteId = peer.getRemoteHostId(); if (remoteId != null) { - synchronized (_peersByRemoteHost) { altByHost = _peersByRemoteHost.remove(remoteId); - } } // unchoke 'em, but just because we'll never talk again... @@ -1087,10 +1069,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // also introduce us, also bid aggressively so we are preferred over NTCP. // (Otherwise we only talk UDP to those that are firewalled, and we will // never get any introducers) - int count; - synchronized (_peersByIdent) { - count = _peersByIdent.size(); - } + int count = _peersByIdent.size(); if (alwaysPreferUDP() || count < MIN_PEERS || (introducersRequired() && _introManager.introducerCount() < MIN_INTRODUCER_POOL)) return _cachedBid[SLOW_PREFERRED_BID]; @@ -1474,9 +1453,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority @Override public int countPeers() { - synchronized (_peersByIdent) { return _peersByIdent.size(); - } } @Override @@ -1484,7 +1461,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long now = _context.clock().now(); int active = 0; int inactive = 0; - synchronized (_peersByIdent) { for (Iterator iter = _peersByIdent.values().iterator(); iter.hasNext(); ) { PeerState peer = iter.next(); if (now-peer.getLastReceiveTime() > 5*60*1000) @@ -1492,7 +1468,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority else active++; } - } return active; } @@ -1501,7 +1476,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long now = _context.clock().now(); int active = 0; int inactive = 0; - synchronized (_peersByIdent) { for (Iterator iter = _peersByIdent.values().iterator(); iter.hasNext(); ) { PeerState peer = iter.next(); if (now-peer.getLastSendFullyTime() > 1*60*1000) @@ -1509,7 +1483,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority else active++; } - } return active; } @@ -1519,9 +1492,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } public boolean allowConnection() { - synchronized (_peersByIdent) { return _peersByIdent.size() < getMaxConnections(); - } } /** @@ -1534,9 +1505,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority Vector skews = new Vector(); Vector peers = new Vector(); - synchronized (_peersByIdent) { - peers.addAll(_peersByIdent.values()); - } + peers.addAll(_peersByIdent.values()); // If our clock is way off, we may not have many (or any) successful connections, // so try hard in that case to return good data @@ -1557,15 +1526,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** **internal, do not use** */ public static final UDPTransport _instance() { return __instance; } /** **internal, do not use** return the peers (Hash) of active peers. */ - public List _getActivePeers() { - List peers = new ArrayList(128); - synchronized (_peersByIdent) { - peers.addAll(_peersByIdent.keySet()); - } + public List _getActivePeers() { + List peers = new ArrayList(128); + peers.addAll(_peersByIdent.keySet()); long now = _context.clock().now(); - for (Iterator iter = peers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); + for (Iterator iter = peers.iterator(); iter.hasNext(); ) { + Hash peer = iter.next(); PeerState state = getPeerState(peer); if (now-state.getLastReceiveTime() > 5*60*1000) iter.remove(); // don't include old peers @@ -1886,9 +1853,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority @Override public void renderStatusHTML(Writer out, String urlBase, int sortFlags) throws IOException { TreeSet peers = new TreeSet(getComparator(sortFlags)); - synchronized (_peersByIdent) { - peers.addAll(_peersByIdent.values()); - } + peers.addAll(_peersByIdent.values()); long offsetTotal = 0; int bpsIn = 0; @@ -2205,12 +2170,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } private class ExpirePeerEvent implements SimpleTimer.TimedEvent { - private final List _expirePeers; - private List _expireBuffer; + private final Set _expirePeers; + private final List _expireBuffer; private boolean _alive; public ExpirePeerEvent() { - _expirePeers = new ArrayList(128); - _expireBuffer = new ArrayList(128); + _expirePeers = new ConcurrentHashSet(128); + _expireBuffer = new ArrayList(); } public void timeReached() { // Increase allowed idle time if we are well under allowed connections, otherwise decrease @@ -2222,10 +2187,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long longInactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; long pingCutoff = _context.clock().now() - (2 * 60*60*1000); _expireBuffer.clear(); - synchronized (_expirePeers) { - int sz = _expirePeers.size(); - for (int i = 0; i < sz; i++) { - PeerState peer = (PeerState)_expirePeers.get(i); + + for (Iterator iter = _expirePeers.iterator(); iter.hasNext(); ) { + PeerState peer = iter.next(); long inactivityCutoff; // if we offered to introduce them, or we used them as introducer in last 2 hours if (peer.getWeRelayToThemAs() > 0 || peer.getIntroducerTime() > pingCutoff) @@ -2234,28 +2198,22 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority inactivityCutoff = shortInactivityCutoff; if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { _expireBuffer.add(peer); - _expirePeers.remove(i); - i--; - sz--; + iter.remove(); } } - } + for (int i = 0; i < _expireBuffer.size(); i++) - dropPeer((PeerState)_expireBuffer.get(i), false, "idle too long"); + dropPeer(_expireBuffer.get(i), false, "idle too long"); _expireBuffer.clear(); if (_alive) SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000); } public void add(PeerState peer) { - synchronized (_expirePeers) { _expirePeers.add(peer); - } } public void remove(PeerState peer) { - synchronized (_expirePeers) { _expirePeers.remove(peer); - } } public void setIsAlive(boolean isAlive) { _alive = isAlive; @@ -2263,9 +2221,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000); } else { SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this); - synchronized (_expirePeers) { - _expirePeers.clear(); - } + _expirePeers.clear(); } } } @@ -2348,10 +2304,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } PeerState pickTestPeer(RemoteHostId dontInclude) { - List peers = null; - synchronized (_peersByIdent) { - peers = new ArrayList(_peersByIdent.values()); - } + List peers = new ArrayList(_peersByIdent.values()); Collections.shuffle(peers, _context.random()); for (int i = 0; i < peers.size(); i++) { PeerState peer = peers.get(i);