- Increase max outbound establishments based on bandwidth
   - Synchronization fix for Java 5
   - Use multiple buffer sizes in OutboundMessageState to
     reduce memory usage
   - Adjust skew calculation, synchronize too
   - Ping loop improvements
This commit is contained in:
zzz
2012-10-03 19:05:56 +00:00
parent 13ef00cb2e
commit 4d1ea6e4cd
7 changed files with 125 additions and 27 deletions

View File

@@ -1,3 +1,13 @@
2012-10-03 zzz
* NTCP: Reduce conLock contention
* SSU:
- Increase max outbound establishments based on bandwidth
- Synchronization fix for Java 5
- Use multiple buffer sizes in OutboundMessageState to
reduce memory usage
- Adjust skew calculation, synchronize too
- Ping loop improvements
2012-10-02 zzz
* I2CP: Delay after sending disconnect message to
help it get through

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 5;
public final static long BUILD = 6;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -86,7 +86,9 @@ class EstablishmentManager {
private int _activity;
/** max outbound in progress - max inbound is half of this */
private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 30;
private final int DEFAULT_MAX_CONCURRENT_ESTABLISH;
private static final int DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH = 20;
private static final int DEFAULT_HIGH_MAX_CONCURRENT_ESTABLISH = 150;
private static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
/** max pending outbound connections (waiting because we are at MAX_CONCURRENT_ESTABLISH) */
@@ -132,6 +134,9 @@ class EstablishmentManager {
_outboundByClaimedAddress = new ConcurrentHashMap();
_outboundByHash = new ConcurrentHashMap();
_activityLock = new Object();
DEFAULT_MAX_CONCURRENT_ESTABLISH = Math.max(DEFAULT_LOW_MAX_CONCURRENT_ESTABLISH,
Math.min(DEFAULT_HIGH_MAX_CONCURRENT_ESTABLISH,
ctx.bandwidthLimiter().getOutboundKBytesPerSecond() / 2));
_context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", UDPTransport.RATES);
@@ -573,7 +578,14 @@ class EstablishmentManager {
}
}
if (_outboundStates.size() < getMaxConcurrentEstablish() && !_queuedOutbound.isEmpty()) {
// in theory shouldn't need locking, but
// getting IllegalStateExceptions on old Java 5,
// which hoses this state.
synchronized(_queuedOutbound) {
locked_admitQueued();
}
}
//remaining = _queuedOutbound.size();
//if (admitted > 0)
@@ -600,6 +612,7 @@ class EstablishmentManager {
// ok, active shrunk, lets let some queued in.
Map.Entry<RemoteHostId, List<OutNetMessage>> entry = iter.next();
// java 5 IllegalStateException here
iter.remove();
RemoteHostId to = entry.getKey();
List<OutNetMessage> allQueued = entry.getValue();
@@ -709,7 +722,7 @@ class EstablishmentManager {
private void sendInboundComplete(PeerState peer) {
// SimpleTimer.getInstance().addEvent(new PublishToNewInbound(peer), 10*1000);
if (_log.shouldLog(Log.INFO))
_log.info("Completing to the peer after confirm: " + peer);
_log.info("Completing to the peer after IB confirm: " + peer);
DeliveryStatusMessage dsm = new DeliveryStatusMessage(_context);
dsm.setArrival(Router.NETWORK_ID); // overloaded, sure, but future versions can check this
// This causes huge values in the inNetPool.droppedDeliveryStatusDelay stat

View File

@@ -61,10 +61,10 @@ class MessageReceiver {
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
_context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
_context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
//_context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
_alive = true;
}

View File

@@ -42,10 +42,20 @@ class OutboundMessageState implements CDPQEntry {
private long _seqNum;
public static final int MAX_MSG_SIZE = 32 * 1024;
/** is this enough for a high-bandwidth router? */
private static final int MAX_ENTRIES = 64;
/** would two caches, one for small and one for large messages, be better? */
private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE);
private static final int CACHE4_BYTES = MAX_MSG_SIZE;
private static final int CACHE3_BYTES = CACHE4_BYTES / 4;
private static final int CACHE2_BYTES = CACHE3_BYTES / 4;
private static final int CACHE1_BYTES = CACHE2_BYTES / 4;
private static final int CACHE1_MAX = 256;
private static final int CACHE2_MAX = CACHE1_MAX / 4;
private static final int CACHE3_MAX = CACHE2_MAX / 4;
private static final int CACHE4_MAX = CACHE3_MAX / 4;
private static final ByteCache _cache1 = ByteCache.getInstance(CACHE1_MAX, CACHE1_BYTES);
private static final ByteCache _cache2 = ByteCache.getInstance(CACHE2_MAX, CACHE2_BYTES);
private static final ByteCache _cache3 = ByteCache.getInstance(CACHE3_MAX, CACHE3_BYTES);
private static final ByteCache _cache4 = ByteCache.getInstance(CACHE4_MAX, CACHE4_BYTES);
private static final long EXPIRATION = 10*1000;
@@ -72,6 +82,7 @@ class OutboundMessageState implements CDPQEntry {
* Called from UDPTransport
* TODO make two constructors, remove this, and make more things final
* @return success
* @throws IAE if too big
*/
public boolean initialize(I2NPMessage msg, PeerState peer) {
if (msg == null)
@@ -91,6 +102,7 @@ class OutboundMessageState implements CDPQEntry {
* Called from OutboundMessageFragments
* TODO make two constructors, remove this, and make more things final
* @return success
* @throws IAE if too big
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
if ( (m == null) || (msg == null) )
@@ -110,19 +122,13 @@ class OutboundMessageState implements CDPQEntry {
* Called from OutboundMessageFragments
* @param m null if msg is "injected"
* @return success
* @throws IAE if too big
*/
private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
_message = m;
_peer = peer;
if (_messageBuf != null) {
_cache.release(_messageBuf);
_messageBuf = null;
}
_messageBuf = _cache.acquire();
int size = msg.getRawMessageSize();
if (size > _messageBuf.getData().length)
throw new IllegalArgumentException("Size too large! " + size + " in " + msg);
acquireBuf(size);
try {
int len = msg.toRawByteArray(_messageBuf.getData());
_messageBuf.setValid(len);
@@ -137,13 +143,49 @@ class OutboundMessageState implements CDPQEntry {
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
return true;
} catch (IllegalStateException ise) {
_cache.release(_messageBuf);
_messageBuf = null;
_released = true;
releaseBuf();
return false;
}
}
/**
* @throws IAE if too big
* @since 0.9.3
*/
private void acquireBuf(int size) {
if (_messageBuf != null)
releaseBuf();
if (size <= CACHE1_BYTES)
_messageBuf = _cache1.acquire();
else if (size <= CACHE2_BYTES)
_messageBuf = _cache2.acquire();
else if (size <= CACHE3_BYTES)
_messageBuf = _cache3.acquire();
else if (size <= CACHE4_BYTES)
_messageBuf = _cache4.acquire();
else
throw new IllegalArgumentException("Size too large! " + size);
}
/**
* @since 0.9.3
*/
private void releaseBuf() {
if (_messageBuf == null)
return;
int size = _messageBuf.getData().length;
if (size == CACHE1_BYTES)
_cache1.release(_messageBuf);
else if (size == CACHE2_BYTES)
_cache2.release(_messageBuf);
else if (size == CACHE3_BYTES)
_cache3.release(_messageBuf);
else if (size == CACHE4_BYTES)
_cache4.release(_messageBuf);
_messageBuf = null;
_released = true;
}
/**
* This is synchronized with writeFragment(),
* so we do not release (probably due to an ack) while we are retransmitting.
@@ -151,8 +193,7 @@ class OutboundMessageState implements CDPQEntry {
*/
public synchronized void releaseResources() {
if (_messageBuf != null && !_released) {
_cache.release(_messageBuf);
_released = true;
releaseBuf();
if (_log.shouldLog(Log.WARN))
_releasedBy = new Exception ("Released on " + new Date() + " by:");
}

View File

@@ -72,6 +72,7 @@ class PeerState {
* A positive number means our clock is ahead of theirs.
*/
private long _clockSkew;
private final Object _clockSkewLock = new Object();
/** what is the current receive second, for congestion control? */
private long _currentReceiveSecond;
@@ -79,6 +80,8 @@ class PeerState {
private long _lastSendTime;
/** when did we last send them a message that was ACKed */
private long _lastSendFullyTime;
/** when did we last send them a ping? */
private long _lastPingTime;
/** when did we last receive a packet from them? */
private long _lastReceiveTime;
/** how many consecutive messages have we sent and not received an ACK to */
@@ -289,6 +292,7 @@ class PeerState {
private static final int INIT_RTO = 3*1000;
public static final int INIT_RTT = INIT_RTO / 2;
private static final int MAX_RTO = 15*1000;
private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3;
public PeerState(RouterContext ctx, UDPTransport transport,
byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound) {
@@ -522,7 +526,12 @@ class PeerState {
* A positive number means our clock is ahead of theirs.
*/
public void adjustClockSkew(long skew) {
_clockSkew = (long) (0.9*_clockSkew + 0.1*(skew - (_rtt / 2)));
// the real one-way delay is much less than RTT / 2, due to ack delays,
// so add a fudge factor
double adj = 0.1 * (skew + CLOCK_SKEW_FUDGE - (_rtt / 2));
synchronized(_clockSkewLock) {
_clockSkew = (long) (0.9*_clockSkew + adj);
}
}
/** what is the current receive second, for congestion control? */
@@ -531,6 +540,19 @@ class PeerState {
public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
/**
* Note ping sent. Does not update last send time.
* @since 0.9.3
*/
public void setLastPingTime(long when) { _lastPingTime = when; }
/**
* Latest of last sent and last ping
* @since 0.9.3
*/
public long getLastSendOrPingTime() { return Math.max(_lastSendTime, _lastPingTime); }
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
public int getReceiveBps() { return _receiveBps; }

View File

@@ -2440,6 +2440,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
private final Set<PeerState> _expirePeers;
private final List<PeerState> _expireBuffer;
private volatile boolean _alive;
private int _runCount;
// we've seen firewalls change ports after 40 seconds
private static final long PING_FIREWALL_TIME = 30*1000;
private static final long PING_FIREWALL_CUTOFF = PING_FIREWALL_TIME / 2;
// ping 1/4 of the peers every loop
private static final int SLICES = 4;
private static final long SHORT_LOOP_TIME = PING_FIREWALL_CUTOFF / (SLICES + 1);
private static final long LONG_LOOP_TIME = 25*1000;
public ExpirePeerEvent() {
super(_context.simpleTimer2());
@@ -2457,10 +2465,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long shortInactivityCutoff = now - _expireTimeout;
long longInactivityCutoff = now - EXPIRE_TIMEOUT;
long pingCutoff = now - (2 * 60*60*1000);
long pingFirewallCutoff = now - (60 * 1000);
long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF;
boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK;
boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort();
boolean shortLoop = shouldPingFirewall;
_expireBuffer.clear();
_runCount++;
for (Iterator<PeerState> iter = _expirePeers.iterator(); iter.hasNext(); ) {
PeerState peer = iter.next();
@@ -2474,7 +2484,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireBuffer.add(peer);
iter.remove();
} else if (shouldPingFirewall &&
peer.getLastSendTime() < pingFirewallCutoff &&
((_runCount ^ peer.hashCode()) & (SLICES - 1)) == 0 &&
peer.getLastSendOrPingTime() < pingFirewallCutoff &&
peer.getLastReceiveTime() < pingFirewallCutoff) {
// ping if firewall is mapping the port to keep port the same...
// if the port changes we are screwed
@@ -2485,6 +2496,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// or else session will stay open forever?
//peer.setLastSendTime(now);
send(_destroyBuilder.buildPing(peer));
peer.setLastPingTime(now);
// If external port is different, it may be changing the port for every
// session, so ping all of them. Otherwise only one.
if (pingOneOnly)
@@ -2499,7 +2511,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireBuffer.clear();
if (_alive)
schedule(30*1000);
schedule(shortLoop ? SHORT_LOOP_TIME : LONG_LOOP_TIME);
}
public void add(PeerState peer) {
@@ -2513,7 +2525,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {
reschedule(30*1000);
reschedule(LONG_LOOP_TIME);
} else {
cancel();
_expirePeers.clear();