- Limit UDPSender queue size
   - Increase UDPSender max packet lifetime
   - Clear UDPSender queue before sending destroys to all
   - Increase PeerState queue size so large streaming windows
     don't get dropped right away, especially at slow start
   - Various improvements on iterating over pending outbound
     messages in PeerState
This commit is contained in:
zzz
2012-08-27 20:39:00 +00:00
parent f8bc6f8612
commit d305eb6a9c
8 changed files with 135 additions and 39 deletions

View File

@@ -1,3 +1,17 @@
2012-08-27 zzz
* i2psnark: Notify threads awaiting DHT replies at shutdown
* Reseed: Remove forum.i2p2.de
* Streaming: Limit amount of slow-start exponential growth
* SSU:
- Limit UDPSender queue size
- Increase UDPSender max packet lifetime
- Clear UDPSender queue before sending destroys to all
- Increase PeerState queue size so large streaming windows
don't get dropped right away, especially at slow start
- Various improvements on iterating over pending outbound
messages in PeerState
* Wrapper: Update armv7 to 3.5.15
2012-08-27 kytv
* Update Java Service Wrapper to v3.5.15.
- Windows: Self-compiled with VS2010 in Windows 7. The icon has been

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 19;
public final static long BUILD = 20;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -42,6 +42,8 @@ class OutboundMessageState {
private static final int MAX_ENTRIES = 64;
/** would two caches, one for small and one for large messages, be better? */
private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE);
private static final long EXPIRATION = 10*1000;
public OutboundMessageState(I2PAppContext context) {
_context = context;
@@ -64,6 +66,7 @@ class OutboundMessageState {
/**
* Called from UDPTransport
* TODO make two constructors, remove this, and make more things final
* @return success
*/
public boolean initialize(I2NPMessage msg, PeerState peer) {
@@ -82,6 +85,7 @@ class OutboundMessageState {
/**
* Called from OutboundMessageFragments
* TODO make two constructors, remove this, and make more things final
* @return success
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
@@ -121,7 +125,7 @@ class OutboundMessageState {
_startedOn = _context.clock().now();
_nextSendTime = _startedOn;
_expiration = _startedOn + 10*1000;
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
if (_log.shouldLog(Log.DEBUG))

View File

@@ -38,7 +38,7 @@ class PacketPusher implements Runnable {
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
//_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
// BLOCKING if queue is full
_sender.add(packets[i]);
}
}

View File

@@ -217,6 +217,13 @@ class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/**
* Was 32 before 0.9.2, but since the streaming lib goes up to 128,
* we would just drop our own msgs right away during slow start.
* May need to adjust based on memory.
*/
private static final int MAX_SEND_MSGS_PENDING = 128;
/*
* 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message,
* 522 fragment bytes, which is enough to send a tunnel data message in 2
@@ -1181,6 +1188,14 @@ class PeerState {
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
* TODO should this use a queue, separate from the list of msgs pending an ack?
* TODO bring back tail drop?
* TODO priority queue? (we don't implement priorities in SSU now)
* TODO backlog / pushback / block instead of dropping? Can't really block here.
* TODO SSU does not support isBacklogged() now
* @return total pending messages
*/
public int add(OutboundMessageState state) {
if (_dead) {
_transport.failed(state, false);
@@ -1193,8 +1208,8 @@ class PeerState {
boolean fail = false;
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
if (rv > 32) {
// 32 queued messages? to *one* peer? nuh uh.
if (rv > MAX_SEND_MSGS_PENDING) {
// too many queued messages to one peer? nuh uh.
fail = true;
rv--;
@@ -1240,8 +1255,11 @@ class PeerState {
_outboundMessages.add(state);
}
}
if (fail)
if (fail) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg, OB queue full for " + toString());
_transport.failed(state, false);
}
return rv;
}
@@ -1278,6 +1296,10 @@ class PeerState {
/**
* Expire / complete any outbound messages
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 1st.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return number of active outbound messages remaining
*/
public int finishMessages() {
@@ -1350,14 +1372,20 @@ class PeerState {
/**
* Pick a message we want to send and allocate it out of our window
* @return allocated message to send, or null if no messages or no resources
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return allocated message to send, or null if no messages or no resources
*/
public OutboundMessageState allocateSend() {
if (_dead) return null;
synchronized (_outboundMessages) {
for (OutboundMessageState state : _outboundMessages) {
if (locked_shouldSend(state)) {
// We have 3 return values, because if allocateSendingBytes() returns false,
// then we can stop iterating.
ShouldSend should = locked_shouldSend(state);
if (should == ShouldSend.YES) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
/*
@@ -1369,6 +1397,12 @@ class PeerState {
}
*/
return state;
} else if (should == ShouldSend.NO_BW) {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
break;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
@@ -1382,6 +1416,10 @@ class PeerState {
}
/**
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
*/
@@ -1396,6 +1434,9 @@ class PeerState {
}
for (OutboundMessageState state : _outboundMessages) {
int delay = (int)(state.getNextSendTime() - now);
// short circuit once we hit something ready to go
if (delay <= 0)
return delay;
if (delay < rv)
rv = delay;
}
@@ -1435,7 +1476,13 @@ class PeerState {
return mtu - (PacketBuilder.MIN_DATA_PACKET_OVERHEAD + MIN_ACK_SIZE);
}
private boolean locked_shouldSend(OutboundMessageState state) {
private enum ShouldSend { YES, NO, NO_BW };
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) {
@@ -1465,7 +1512,7 @@ class PeerState {
} else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
//if (state.getMessage() != null)
// state.getMessage().timestamp("choked, with another message retransmitting");
return false;
return ShouldSend.NO;
} else {
//if (state.getMessage() != null)
// state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
@@ -1491,7 +1538,7 @@ class PeerState {
//if (peer.getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
return true;
return ShouldSend.YES;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
//if (state.getMessage() != null)
@@ -1510,15 +1557,16 @@ class PeerState {
// state.getMessage().timestamp("choked, not enough available, wsize="
// + getSendWindowBytes() + " available="
// + getSendWindowBytesRemaining());
return false;
return ShouldSend.NO_BW;
}
} // nextTime <= now
return false;
return ShouldSend.NO;
}
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashSet this would be faster.
*
* @return true if the message was acked for the first time
*/
@@ -1531,6 +1579,11 @@ class PeerState {
if (state.getMessageId() == messageId) {
iter.remove();
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}
@@ -1600,6 +1653,11 @@ class PeerState {
_retransmitter = null;
}
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}

View File

@@ -136,13 +136,10 @@ class UDPEndpoint {
/**
* Add the packet to the outobund queue to be sent ASAP (as allowed by
* the bandwidth limiter)
*
* @return ZERO (used to be number of packets in the queue)
* BLOCKING if queue is full.
*/
public int send(UDPPacket packet) {
if (_sender == null)
return 0;
return _sender.add(packet);
public void send(UDPPacket packet) {
_sender.add(packet);
}
/**
@@ -154,4 +151,12 @@ class UDPEndpoint {
return null;
return _receiver.receiveNext();
}
/**
* Clear outbound queue, probably in preparation for sending destroy() to everybody.
* @since 0.9.2
*/
public void clearOutbound() {
_sender.clear();
}
}

View File

@@ -25,12 +25,17 @@ class UDPSender {
private final Runner _runner;
private static final int TYPE_POISON = 99999;
//private static final int MAX_QUEUED = 4;
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPSender.class);
_outboundQueue = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new LinkedBlockingQueue(qsize);
_socket = socket;
_runner = new Runner();
_name = name;
@@ -81,6 +86,14 @@ class UDPSender {
_outboundQueue.clear();
}
/**
* Clear outbound queue, probably in preparation for sending destroy() to everybody.
* @since 0.9.2
*/
public void clear() {
_outboundQueue.clear();
}
/*********
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
return _runner.updateListeningPort(socket, newPort);
@@ -93,10 +106,9 @@ class UDPSender {
* available, if requested, otherwise it returns immediately
*
* @param blockTime how long to block IGNORED
* @return ZERO (used to be number of packets in the queue)
* @deprecated use add(packet)
*/
public int add(UDPPacket packet, int blockTime) {
public void add(UDPPacket packet, int blockTime) {
/********
//long expiration = _context.clock().now() + blockTime;
int remaining = -1;
@@ -148,31 +160,32 @@ class UDPSender {
_log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime);
return remaining;
********/
return add(packet);
add(packet);
}
private static final int MAX_HEAD_LIFETIME = 1000;
private static final int MAX_HEAD_LIFETIME = 3*1000;
/**
* Put it on the queue
* @return ZERO (used to be number of packets in the queue)
* Put it on the queue.
* BLOCKING if queue is full (backs up PacketPusher thread)
*/
public int add(UDPPacket packet) {
if (packet == null || !_keepRunning) return 0;
int size = 0;
public void add(UDPPacket packet) {
if (packet == null || !_keepRunning) return;
int psz = packet.getPacket().getLength();
if (psz > PeerState.LARGE_MTU) {
_log.error("Dropping large UDP packet " + psz + " bytes: " + packet);
return 0;
return;
}
try {
_outboundQueue.put(packet);
} catch (InterruptedException ie) {
return;
}
_outboundQueue.offer(packet);
//size = _outboundQueue.size();
//_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
if (_log.shouldLog(Log.DEBUG)) {
size = _outboundQueue.size();
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime());
_log.debug("Added the packet onto the queue with a lifetime of " + packet.getLifetime());
}
return size;
}
private class Runner implements Runnable {

View File

@@ -1119,17 +1119,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/**
* This sends it directly out, bypassing OutboundMessageFragments
* and the PacketPusher. The only queueing is for the bandwidth limiter.
*
* @return ZERO (used to be number of packets in the queue)
* BLOCKING if OB queue is full.
*/
int send(UDPPacket packet) {
void send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet);
return _endpoint.send(packet);
_endpoint.send(packet);
}
/**
* Send a session destroy message, bypassing OMF and PacketPusher.
* BLOCKING if OB queue is full.
*
* @since 0.8.9
*/
@@ -1145,10 +1145,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/**
* Send a session destroy message to everybody
* BLOCKING if OB queue is full.
*
* @since 0.8.9
*/
private void destroyAll() {
_endpoint.clearOutbound();
int howMany = _peersByIdent.size();
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + howMany + " peers");