diff --git a/history.txt b/history.txt index b37ffd1a3..5ab60ed87 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,17 @@ +2012-08-27 zzz + * i2psnark: Notify threads awaiting DHT replies at shutdown + * Reseed: Remove forum.i2p2.de + * Streaming: Limit amount of slow-start exponential growth + * SSU: + - Limit UDPSender queue size + - Increase UDPSender max packet lifetime + - Clear UDPSender queue before sending destroys to all + - Increase PeerState queue size so large streaming windows + don't get dropped right away, especially at slow start + - Various improvements on iterating over pending outbound + messages in PeerState + * Wrapper: Update armv7 to 3.5.15 + 2012-08-27 kytv * Update Java Service Wrapper to v3.5.15. - Windows: Self-compiled with VS2010 in Windows 7. The icon has been diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5b1ba79e3..f5c926454 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 19; + public final static long BUILD = 20; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 1a637695d..4dc7c87fa 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -42,6 +42,8 @@ class OutboundMessageState { private static final int MAX_ENTRIES = 64; /** would two caches, one for small and one for large messages, be better? */ private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE); + + private static final long EXPIRATION = 10*1000; public OutboundMessageState(I2PAppContext context) { _context = context; @@ -64,6 +66,7 @@ class OutboundMessageState { /** * Called from UDPTransport + * TODO make two constructors, remove this, and make more things final * @return success */ public boolean initialize(I2NPMessage msg, PeerState peer) { @@ -82,6 +85,7 @@ class OutboundMessageState { /** * Called from OutboundMessageFragments + * TODO make two constructors, remove this, and make more things final * @return success */ public boolean initialize(OutNetMessage m, I2NPMessage msg) { @@ -121,7 +125,7 @@ class OutboundMessageState { _startedOn = _context.clock().now(); _nextSendTime = _startedOn; - _expiration = _startedOn + 10*1000; + _expiration = _startedOn + EXPIRATION; //_expiration = msg.getExpiration(); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 7b4d7f3e2..959c7a07c 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -38,7 +38,7 @@ class PacketPusher implements Runnable { if (packets != null) { for (int i = 0; i < packets.length; i++) { if (packets[i] != null) // null for ACKed fragments - //_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms + // BLOCKING if queue is full _sender.add(packets[i]); } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 96b40db43..e93859949 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -217,6 +217,13 @@ class PeerState { private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; + /** + * Was 32 before 0.9.2, but since the streaming lib goes up to 128, + * we would just drop our own msgs right away during slow start. + * May need to adjust based on memory. + */ + private static final int MAX_SEND_MSGS_PENDING = 128; + /* * 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message, * 522 fragment bytes, which is enough to send a tunnel data message in 2 @@ -1181,6 +1188,14 @@ class PeerState { RemoteHostId getRemoteHostId() { return _remoteHostId; } + /** + * TODO should this use a queue, separate from the list of msgs pending an ack? + * TODO bring back tail drop? + * TODO priority queue? (we don't implement priorities in SSU now) + * TODO backlog / pushback / block instead of dropping? Can't really block here. + * TODO SSU does not support isBacklogged() now + * @return total pending messages + */ public int add(OutboundMessageState state) { if (_dead) { _transport.failed(state, false); @@ -1193,8 +1208,8 @@ class PeerState { boolean fail = false; synchronized (_outboundMessages) { rv = _outboundMessages.size() + 1; - if (rv > 32) { - // 32 queued messages? to *one* peer? nuh uh. + if (rv > MAX_SEND_MSGS_PENDING) { + // too many queued messages to one peer? nuh uh. fail = true; rv--; @@ -1240,8 +1255,11 @@ class PeerState { _outboundMessages.add(state); } } - if (fail) + if (fail) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping msg, OB queue full for " + toString()); _transport.failed(state, false); + } return rv; } @@ -1278,6 +1296,10 @@ class PeerState { /** * Expire / complete any outbound messages + * High usage - + * OutboundMessageFragments.getNextVolley() calls this 1st. + * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times. + * * @return number of active outbound messages remaining */ public int finishMessages() { @@ -1350,14 +1372,20 @@ class PeerState { /** * Pick a message we want to send and allocate it out of our window - * @return allocated message to send, or null if no messages or no resources + * High usage - + * OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0. + * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times. * + * @return allocated message to send, or null if no messages or no resources */ public OutboundMessageState allocateSend() { if (_dead) return null; synchronized (_outboundMessages) { for (OutboundMessageState state : _outboundMessages) { - if (locked_shouldSend(state)) { + // We have 3 return values, because if allocateSendingBytes() returns false, + // then we can stop iterating. + ShouldSend should = locked_shouldSend(state); + if (should == ShouldSend.YES) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId()); /* @@ -1369,6 +1397,12 @@ class PeerState { } */ return state; + } else if (should == ShouldSend.NO_BW) { + // no more bandwidth available + // we don't bother looking for a smaller msg that would fit. + // By not looking further, we keep strict sending order, and that allows + // some efficiency in acked() below. + break; } /* else { OutNetMessage msg = state.getMessage(); if (msg != null) @@ -1382,6 +1416,10 @@ class PeerState { } /** + * High usage - + * OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null. + * TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times. + * * @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send. * If ready now, will return 0 or a negative value. */ @@ -1396,6 +1434,9 @@ class PeerState { } for (OutboundMessageState state : _outboundMessages) { int delay = (int)(state.getNextSendTime() - now); + // short circuit once we hit something ready to go + if (delay <= 0) + return delay; if (delay < rv) rv = delay; } @@ -1435,7 +1476,13 @@ class PeerState { return mtu - (PacketBuilder.MIN_DATA_PACKET_OVERHEAD + MIN_ACK_SIZE); } - private boolean locked_shouldSend(OutboundMessageState state) { + private enum ShouldSend { YES, NO, NO_BW }; + + /** + * Have 3 return values, because if allocateSendingBytes() returns false, + * then allocateSend() can stop iterating + */ + private ShouldSend locked_shouldSend(OutboundMessageState state) { long now = _context.clock().now(); if (state.getNextSendTime() <= now) { if (!state.isFragmented()) { @@ -1465,7 +1512,7 @@ class PeerState { } else if ( (max <= 0) || (THROTTLE_RESENDS) ) { //if (state.getMessage() != null) // state.getMessage().timestamp("choked, with another message retransmitting"); - return false; + return ShouldSend.NO; } else { //if (state.getMessage() != null) // state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); @@ -1491,7 +1538,7 @@ class PeerState { //if (peer.getSendWindowBytesRemaining() > 0) // _throttle.unchoke(peer.getRemotePeer()); - return true; + return ShouldSend.YES; } else { _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); //if (state.getMessage() != null) @@ -1510,15 +1557,16 @@ class PeerState { // state.getMessage().timestamp("choked, not enough available, wsize=" // + getSendWindowBytes() + " available=" // + getSendWindowBytesRemaining()); - return false; + return ShouldSend.NO_BW; } } // nextTime <= now - return false; + return ShouldSend.NO; } /** * A full ACK was received. + * TODO if messages awaiting ack were a HashSet this would be faster. * * @return true if the message was acked for the first time */ @@ -1531,6 +1579,11 @@ class PeerState { if (state.getMessageId() == messageId) { iter.remove(); break; + } else if (state.getPushCount() <= 0) { + // _outboundMessages is ordered, so once we get to a msg that + // hasn't been transmitted yet, we can stop + state = null; + break; } else { state = null; } @@ -1600,6 +1653,11 @@ class PeerState { _retransmitter = null; } break; + } else if (state.getPushCount() <= 0) { + // _outboundMessages is ordered, so once we get to a msg that + // hasn't been transmitted yet, we can stop + state = null; + break; } else { state = null; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 9531335b4..7bdf756f4 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -136,13 +136,10 @@ class UDPEndpoint { /** * Add the packet to the outobund queue to be sent ASAP (as allowed by * the bandwidth limiter) - * - * @return ZERO (used to be number of packets in the queue) + * BLOCKING if queue is full. */ - public int send(UDPPacket packet) { - if (_sender == null) - return 0; - return _sender.add(packet); + public void send(UDPPacket packet) { + _sender.add(packet); } /** @@ -154,4 +151,12 @@ class UDPEndpoint { return null; return _receiver.receiveNext(); } + + /** + * Clear outbound queue, probably in preparation for sending destroy() to everybody. + * @since 0.9.2 + */ + public void clearOutbound() { + _sender.clear(); + } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index df017fedf..f8eaaf5f8 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -25,12 +25,17 @@ class UDPSender { private final Runner _runner; private static final int TYPE_POISON = 99999; - //private static final int MAX_QUEUED = 4; + private static final int MIN_QUEUE_SIZE = 64; + private static final int MAX_QUEUE_SIZE = 384; public UDPSender(RouterContext ctx, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPSender.class); - _outboundQueue = new LinkedBlockingQueue(); + long maxMemory = Runtime.getRuntime().maxMemory(); + if (maxMemory == Long.MAX_VALUE) + maxMemory = 96*1024*1024l; + int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024))); + _outboundQueue = new LinkedBlockingQueue(qsize); _socket = socket; _runner = new Runner(); _name = name; @@ -81,6 +86,14 @@ class UDPSender { _outboundQueue.clear(); } + /** + * Clear outbound queue, probably in preparation for sending destroy() to everybody. + * @since 0.9.2 + */ + public void clear() { + _outboundQueue.clear(); + } + /********* public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { return _runner.updateListeningPort(socket, newPort); @@ -93,10 +106,9 @@ class UDPSender { * available, if requested, otherwise it returns immediately * * @param blockTime how long to block IGNORED - * @return ZERO (used to be number of packets in the queue) * @deprecated use add(packet) */ - public int add(UDPPacket packet, int blockTime) { + public void add(UDPPacket packet, int blockTime) { /******** //long expiration = _context.clock().now() + blockTime; int remaining = -1; @@ -148,31 +160,32 @@ class UDPSender { _log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime); return remaining; ********/ - return add(packet); + add(packet); } - private static final int MAX_HEAD_LIFETIME = 1000; + private static final int MAX_HEAD_LIFETIME = 3*1000; /** - * Put it on the queue - * @return ZERO (used to be number of packets in the queue) + * Put it on the queue. + * BLOCKING if queue is full (backs up PacketPusher thread) */ - public int add(UDPPacket packet) { - if (packet == null || !_keepRunning) return 0; - int size = 0; + public void add(UDPPacket packet) { + if (packet == null || !_keepRunning) return; int psz = packet.getPacket().getLength(); if (psz > PeerState.LARGE_MTU) { _log.error("Dropping large UDP packet " + psz + " bytes: " + packet); - return 0; + return; + } + try { + _outboundQueue.put(packet); + } catch (InterruptedException ie) { + return; } - _outboundQueue.offer(packet); //size = _outboundQueue.size(); //_context.statManager().addRateData("udp.sendQueueSize", size, lifetime); if (_log.shouldLog(Log.DEBUG)) { - size = _outboundQueue.size(); - _log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime()); + _log.debug("Added the packet onto the queue with a lifetime of " + packet.getLifetime()); } - return size; } private class Runner implements Runnable { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 970801cb6..20f08eb41 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1119,17 +1119,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** * This sends it directly out, bypassing OutboundMessageFragments * and the PacketPusher. The only queueing is for the bandwidth limiter. - * - * @return ZERO (used to be number of packets in the queue) + * BLOCKING if OB queue is full. */ - int send(UDPPacket packet) { + void send(UDPPacket packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending packet " + packet); - return _endpoint.send(packet); + _endpoint.send(packet); } /** * Send a session destroy message, bypassing OMF and PacketPusher. + * BLOCKING if OB queue is full. * * @since 0.8.9 */ @@ -1145,10 +1145,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** * Send a session destroy message to everybody + * BLOCKING if OB queue is full. * * @since 0.8.9 */ private void destroyAll() { + _endpoint.clearOutbound(); int howMany = _peersByIdent.size(); if (_log.shouldLog(Log.WARN)) _log.warn("Sending destroy to : " + howMany + " peers");