diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
index 7b344b496..aaa99447b 100644
--- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
@@ -22,7 +22,7 @@ public class ACKSender implements Runnable {
private boolean _alive;
/** how frequently do we want to send ACKs to a peer? */
- private static final int ACK_FREQUENCY = 400;
+ static final int ACK_FREQUENCY = 400;
public ACKSender(RouterContext ctx, UDPTransport transport) {
_context = ctx;
diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
index e4d7e7d0f..b030a8b6e 100644
--- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
+++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
@@ -42,6 +42,10 @@ public class EstablishmentManager {
_inboundStates = new HashMap(32);
_outboundStates = new HashMap(32);
_activityLock = new Object();
+ _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("udp.inboundEstablishFailedState", "What state a failed inbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
+ _context.statManager().createRateStat("udp.outboundEstablishFailedState", "What state a failed outbound establishment request fails in", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@@ -221,6 +225,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
+ _context.statManager().addRateData("udp.inboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer);
}
@@ -250,6 +255,7 @@ public class EstablishmentManager {
_transport.addRemotePeerState(peer);
+ _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0);
sendOurInfo(peer);
while (true) {
@@ -343,6 +349,7 @@ public class EstablishmentManager {
} else if (cur.getLifetime() > MAX_ESTABLISH_TIME) {
// took too long, fuck 'em
iter.remove();
+ _context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired inbound state");
} else {
@@ -430,6 +437,7 @@ public class EstablishmentManager {
// took too long, fuck 'em
iter.remove();
outboundState = cur;
+ _context.statManager().addRateData("udp.outboundEstablishFailedState", cur.getState(), cur.getLifetime());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing expired outbound: " + cur);
break;
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
index cbb75c91a..eb952eac6 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java
@@ -36,7 +36,6 @@ public class InboundMessageFragments {
private MessageReceiver _messageReceiver;
private boolean _alive;
- private static final int RECENTLY_COMPLETED_SIZE = 100;
/** decay the recently completed every 2 minutes */
private static final int DECAY_PERIOD = 120*1000;
@@ -113,7 +112,7 @@ public class InboundMessageFragments {
if (_recentlyCompletedMessages.isKnown(messageId.longValue())) {
_context.statManager().addRateData("udp.ignoreRecentDuplicate", 1, 0);
- from.messageFullyReceived(messageId);
+ from.messageFullyReceived(messageId, -1);
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.WARN))
_log.warn("Message received is a dup: " + messageId + " dups: "
@@ -143,7 +142,7 @@ public class InboundMessageFragments {
_messageReceiver.receiveMessage(state);
- from.messageFullyReceived(messageId);
+ from.messageFullyReceived(messageId, state.getCompleteSize());
_ackSender.ackPeer(from);
if (_log.shouldLog(Log.INFO))
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
index a03bf47eb..03173dd3e 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
@@ -25,6 +25,7 @@ public class InboundMessageState {
*/
private int _lastFragment;
private long _receiveBegin;
+ private int _completeSize;
/** expire after 30s */
private static final long MAX_RECEIVE_TIME = 10*1000;
@@ -39,6 +40,7 @@ public class InboundMessageState {
_from = from;
_fragments = new ByteArray[MAX_FRAGMENTS];
_lastFragment = -1;
+ _completeSize = -1;
_receiveBegin = ctx.clock().now();
}
@@ -86,10 +88,13 @@ public class InboundMessageState {
public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; }
public synchronized int getCompleteSize() {
- int size = 0;
- for (int i = 0; i <= _lastFragment; i++)
- size += _fragments[i].getValid();
- return size;
+ if (_completeSize < 0) {
+ int size = 0;
+ for (int i = 0; i <= _lastFragment; i++)
+ size += _fragments[i].getValid();
+ _completeSize = size;
+ }
+ return _completeSize;
}
public void releaseResources() {
diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
index de2330f04..4d0fc2571 100644
--- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
+++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java
@@ -83,7 +83,7 @@ public class OutboundMessageFragments {
else if (_allowExcess)
return true;
else
- _activeMessages.wait();
+ _activeMessages.wait(1000);
}
} catch (InterruptedException ie) {}
}
@@ -177,9 +177,10 @@ public class OutboundMessageFragments {
_nextPacketMessage = 0;
}
i--;
- }
- }
- }
+ } // end (pushCount > maxVolleys)
+ } // end iterating over active
+ _activeMessages.notifyAll();
+ } // end synchronized
}
private static final long SECOND_MASK = 1023l;
@@ -286,6 +287,7 @@ public class OutboundMessageFragments {
if (currentFragment < 0) {
if (nextSend <= 0) {
try {
+ _activeMessages.notifyAll();
_activeMessages.wait();
} catch (InterruptedException ie) {}
} else {
@@ -301,6 +303,8 @@ public class OutboundMessageFragments {
_activeMessages.wait(delay);
} catch (InterruptedException ie) {}
}
+ } else {
+ _activeMessages.notifyAll();
}
_allowExcess = false;
} // end of the synchronized block
@@ -344,6 +348,7 @@ public class OutboundMessageFragments {
Hash expectedBy = msg.getTarget().getIdentity().getHash();
if (!expectedBy.equals(ackedBy)) {
state = null;
+ _activeMessages.notifyAll();
return 0;
}
}
@@ -355,12 +360,12 @@ public class OutboundMessageFragments {
if (_nextPacketMessage < 0)
_nextPacketMessage = 0;
}
- _activeMessages.notifyAll();
break;
} else {
state = null;
}
}
+ _activeMessages.notifyAll();
}
if (state != null) {
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
index 392f70299..7ac819991 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java
@@ -37,7 +37,7 @@ public class PacketPusher implements Runnable {
while (_alive) {
UDPPacket packet = _fragments.getNextPacket();
if (packet != null)
- _sender.add(packet, true); // blocks
+ _sender.add(packet, 1000); // blocks for up to a second
}
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index 5c4e55b57..e16024576 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -85,6 +85,11 @@ public class PeerState {
/** how many bytes can we send to the peer in the current second */
private volatile int _sendWindowBytesRemaining;
private long _lastSendRefill;
+ private int _sendBps;
+ private int _sendBytes;
+ private int _receiveBps;
+ private int _receiveBytes;
+ private long _receivePeriodBegin;
private volatile long _lastCongestionOccurred;
/**
* when sendWindowBytes is below this, grow the window size quickly,
@@ -131,6 +136,8 @@ public class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
private static final int DEFAULT_MTU = 1472;
+ private static final int MIN_RTO = ACKSender.ACK_FREQUENCY + 100;
+ private static final int MAX_RTO = 5000;
public PeerState(I2PAppContext ctx) {
_context = ctx;
@@ -154,6 +161,10 @@ public class PeerState {
_sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES;
_slowStartThreshold = MAX_SEND_WINDOW_BYTES/2;
_lastSendRefill = _context.clock().now();
+ _receivePeriodBegin = _lastSendRefill;
+ _sendBps = 0;
+ _sendBytes = 0;
+ _receiveBps = 0;
_lastCongestionOccurred = -1;
_remoteIP = null;
_remotePort = -1;
@@ -309,6 +320,9 @@ public class PeerState {
public void setLastSendTime(long when) { _lastSendTime = when; }
/** when did we last receive a packet from them? */
public void setLastReceiveTime(long when) { _lastReceiveTime = when; }
+ /** return the smoothed send transfer rate */
+ public int getSendBps() { return _sendBps; }
+ public int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() {
long now = _context.clock().now()/(10*1000);
if (_lastFailedSendPeriod >= now) {
@@ -333,13 +347,18 @@ public class PeerState {
*/
public boolean allocateSendingBytes(int size) {
long now = _context.clock().now();
- if (_lastSendRefill + 1000 <= now) {
+ long duration = now - _lastSendRefill;
+ if (duration >= 1000) {
_sendWindowBytesRemaining = _sendWindowBytes;
+ _sendBytes += size;
+ _sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration)));
+ _sendBytes = 0;
_lastSendRefill = now;
}
//if (true) return true;
if (size <= _sendWindowBytesRemaining) {
_sendWindowBytesRemaining -= size;
+ _sendBytes += size;
_lastSendTime = now;
return true;
} else {
@@ -374,10 +393,21 @@ public class PeerState {
public int getSlowStartThreshold() { return _slowStartThreshold; }
/** we received the message specified completely */
- public void messageFullyReceived(Long messageId) {
+ public void messageFullyReceived(Long messageId, int bytes) {
+ if (bytes > 0)
+ _receiveBytes += bytes;
+
+ long now = _context.clock().now();
+ long duration = now - _receivePeriodBegin;
+ if (duration >= 1000) {
+ _receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration)));
+ _receiveBytes = 0;
+ _receivePeriodBegin = now;
+ }
+
synchronized (_currentACKs) {
if (_wantACKSendSince <= 0)
- _wantACKSendSince = _context.clock().now();
+ _wantACKSendSince = now;
if (!_currentACKs.contains(messageId))
_currentACKs.add(messageId);
}
@@ -454,10 +484,10 @@ public class PeerState {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
+ " rttDev=" + _rttDeviation + " rto=" + _rto);
- if (_rto < 1000)
- _rto = 1000;
- if (_rto > 5000)
- _rto = 5000;
+ if (_rto < MIN_RTO)
+ _rto = MIN_RTO;
+ if (_rto > MAX_RTO)
+ _rto = MAX_RTO;
}
/** we are resending a packet, so lets jack up the rto */
public void messageRetransmitted() {
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
index 144ae839d..3931815ee 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java
@@ -66,7 +66,16 @@ public class UDPReceiver {
/** if a packet been sitting in the queue for 2 seconds, drop subsequent packets */
private static final long MAX_QUEUE_PERIOD = 2*1000;
+ private static final float ARTIFICIAL_DROP_PROBABILITY = 0f; //0.02f;
+
private void receive(UDPPacket packet) {
+ if (ARTIFICIAL_DROP_PROBABILITY > 0) {
+ // the first check is to let the compiler optimize away this
+ // random block on the live system when the probability is == 0
+ if (_context.random().nextFloat() <= ARTIFICIAL_DROP_PROBABILITY)
+ return;
+ }
+
synchronized (_inboundQueue) {
int queueSize = _inboundQueue.size();
if (queueSize > 0) {
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
index c7b5fcaee..9511ca7f9 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java
@@ -66,9 +66,11 @@ public class UDPSender {
* Add the packet to the queue. This may block until there is space
* available, if requested, otherwise it returns immediately
*
+ * @param blockTime how long to block
* @return number of packets queued
*/
- public int add(UDPPacket packet, boolean blocking) {
+ public int add(UDPPacket packet, int blockTime) {
+ long expiration = _context.clock().now() + blockTime;
int remaining = -1;
while ( (_keepRunning) && (remaining < 0) ) {
try {
@@ -78,10 +80,12 @@ public class UDPSender {
remaining = _outboundQueue.size();
_outboundQueue.notifyAll();
} else {
- if (blocking) {
- _outboundQueue.wait();
+ long remainingTime = expiration - _context.clock().now();
+ if (remainingTime > 0) {
+ _outboundQueue.wait(remainingTime);
} else {
remaining = _outboundQueue.size();
+ _outboundQueue.notifyAll();
}
}
}
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 6536a2375..c46457a8f 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -7,6 +7,8 @@ import java.net.UnknownHostException;
import java.io.IOException;
import java.io.Writer;
+import java.text.DecimalFormat;
+
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -108,6 +110,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments = new OutboundMessageFragments(_context, this);
_inboundFragments = new InboundMessageFragments(_context, _fragments, this);
_flooder = new UDPFlooder(_context, this);
+
+ _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
}
public void startup() {
@@ -322,6 +326,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (_log.shouldLog(Log.WARN))
_log.debug("Dropping remote peer: " + peer);
if (peer.getRemotePeer() != null) {
+ long now = _context.clock().now();
+ _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime());
_context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries");
synchronized (_peersByIdent) {
_peersByIdent.remove(peer.getRemotePeer());
@@ -531,7 +537,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
StringBuffer buf = new StringBuffer(512);
buf.append("UDP connections: ").append(peers.size()).append("
\n");
buf.append("
| peer | activity (in/out) | \n"); + buf.append("|||||||||||
| peer | activity (in/out) | "); + buf.append("transfer (in/out) | \n"); buf.append("uptime | skew | \n"); buf.append("cwnd | ssthresh | \n"); buf.append("rtt | dev | rto | \n"); @@ -567,10 +574,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(""); buf.append(""); - buf.append(DataHelper.formatDuration(now-peer.getLastReceiveTime())); - buf.append("/"); - buf.append(DataHelper.formatDuration(now-peer.getLastSendTime())); - buf.append(" | "); + buf.append((now-peer.getLastReceiveTime())/1000); + buf.append("s/"); + buf.append((now-peer.getLastSendTime())/1000); + buf.append("s"); + + buf.append(""); + buf.append(formatKBps(peer.getReceiveBps())); + buf.append("KBps/"); + buf.append(formatKBps(peer.getSendBps())); + buf.append("KBps | "); buf.append(""); buf.append(DataHelper.formatDuration(now-peer.getKeyEstablishedTime())); @@ -616,6 +629,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority out.write(" |