diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index b74bdb538a999a1990fc32756736a6079bdb92cc..f0ef8df08586a9aac1a6020bc9fe3e919ced9630 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -65,6 +65,7 @@ public class Connection { private Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; + private ConEvent _connectionEvent; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -116,6 +117,7 @@ public class Connection { _connectLock = new Object(); _activeResends = 0; _resetSentOn = -1; + _connectionEvent = new ConEvent(); _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -806,6 +808,24 @@ public class Connection { buf.append("]"); return buf.toString(); } + + public SimpleTimer.TimedEvent getConnectionEvent() { return _connectionEvent; } + + /** + * fired to reschedule event notification + */ + class ConEvent implements SimpleTimer.TimedEvent { + private Exception _addedBy; + public ConEvent() { + //_addedBy = new Exception("added by"); + } + public void timeReached() { + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("firing event on " + _connection, _addedBy); + eventOccurred(); + } + public String toString() { return "event on " + Connection.this.toString(); } + } /** * Coordinate the resends of a given packet diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index f907bc173005a1e920bac6b274a92ee470ccee4e..4ee5440a8520623455d619d305bf84e99d2f8bba 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -17,21 +17,6 @@ abstract class SchedulerImpl implements TaskScheduler { } protected void reschedule(long msToWait, Connection con) { - SimpleTimer.getInstance().addEvent(new ConEvent(con), msToWait); - } - - private class ConEvent implements SimpleTimer.TimedEvent { - private Connection _connection; - private Exception _addedBy; - public ConEvent(Connection con) { - _connection = con; - //_addedBy = new Exception("added by"); - } - public void timeReached() { - //if (_log.shouldLog(Log.DEBUG)) - // _log.debug("firing event on " + _connection, _addedBy); - _connection.eventOccurred(); - } - public String toString() { return "event on " + _connection; } + SimpleTimer.getInstance().addEvent(con.getConnectionEvent(), msToWait); } } diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 13931314ade06cda78b2da2144c888dee8e4323c..0c3c809c2e671082e0cdd13016489f574c63fe73 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -45,7 +45,10 @@ public class SimpleTimer { } /** - * Queue up the given event to be fired no sooner than timeoutMs from now + * Queue up the given event to be fired no sooner than timeoutMs from now. + * However, if this event is already scheduled, the event will be scheduled + * for the earlier of the two timeouts, which may be before this stated + * timeout. If this is not the desired behavior, call removeEvent first. * */ public void addEvent(TimedEvent event, long timeoutMs) { @@ -55,8 +58,15 @@ public class SimpleTimer { Long time = new Long(eventTime); synchronized (_events) { // remove the old scheduled position, then reinsert it - if (_eventTimes.containsKey(event)) - _events.remove(_eventTimes.get(event)); + Long oldTime = (Long)_eventTimes.get(event); + if (oldTime != null) { + if (oldTime.longValue() < eventTime) { + _events.notifyAll(); + return; // already scheduled for sooner than requested + } else { + _events.remove(oldTime); + } + } while (_events.containsKey(time)) time = new Long(time.longValue() + 1); _events.put(time, event); diff --git a/history.txt b/history.txt index e79dc3f269a30950978576832fa2b25cd914dae2..0a219297fbfd96c472301cda457afb87b26ad8bf 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,7 @@ -$Id: history.txt,v 1.200 2005/04/28 16:54:27 jrandom Exp $ +$Id: history.txt,v 1.201 2005/04/29 01:24:15 jrandom Exp $ + +2005-04-30 jrandom + * Reduced some SimpleTimer churn 2005-04-29 jrandom * Reduce the peer profile stat coallesce overhead by inlining it with the diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 60d318e5e4786964cfa92b79f3b20000b82ee1f5..6d6fde89f3f5d7a4bb8d8e665932dda0c8ec6892 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.191 $ $Date: 2005/04/28 16:54:28 $"; + public final static String ID = "$Revision: 1.192 $ $Date: 2005/04/29 01:24:15 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index aaa99447bd87580d0c32e5d29896fb15f03f1780..ff9fc431de75b4c260aa3b1389363afa29e4b8c3 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -22,7 +22,7 @@ public class ACKSender implements Runnable { private boolean _alive; /** how frequently do we want to send ACKs to a peer? */ - static final int ACK_FREQUENCY = 400; + static final int ACK_FREQUENCY = 200; public ACKSender(RouterContext ctx, UDPTransport transport) { _context = ctx; diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 4d0fc2571403b725bb2d759b69d6db4f67b4d880..d411df370c209551d19b74dbf3a030fcd32a2e73 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -25,6 +25,7 @@ public class OutboundMessageFragments { private RouterContext _context; private Log _log; private UDPTransport _transport; + private ActiveThrottle _throttle; /** OutboundMessageState for messages being sent */ private List _activeMessages; private boolean _alive; @@ -38,10 +39,11 @@ public class OutboundMessageFragments { // don't send a packet more than 10 times static final int MAX_VOLLEYS = 10; - public OutboundMessageFragments(RouterContext ctx, UDPTransport transport) { + public OutboundMessageFragments(RouterContext ctx, UDPTransport transport, ActiveThrottle throttle) { _context = ctx; _log = ctx.logManager().getLog(OutboundMessageFragments.class); _transport = transport; + _throttle = throttle; _activeMessages = new ArrayList(MAX_ACTIVE); _nextPacketMessage = 0; _builder = new PacketBuilder(ctx); @@ -130,6 +132,8 @@ public class OutboundMessageFragments { if (state.isComplete()) { _activeMessages.remove(i); _transport.succeeded(state.getMessage()); + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); if (i < _nextPacketMessage) { _nextPacketMessage--; @@ -149,6 +153,8 @@ public class OutboundMessageFragments { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send an expired direct message: " + state); } + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); if (i < _nextPacketMessage) { _nextPacketMessage--; @@ -170,6 +176,8 @@ public class OutboundMessageFragments { if (_log.shouldLog(Log.WARN)) _log.warn("Unable to send a direct message after too many volleys: " + state); } + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); if (i < _nextPacketMessage) { _nextPacketMessage--; @@ -185,17 +193,17 @@ public class OutboundMessageFragments { private static final long SECOND_MASK = 1023l; - /** - * Grab the next packet that we want to send, blocking until one is ready. - * This is the main driver for the packet scheduler + * Fetch all the packets for a message volley, blocking until there is a + * message which can be fully transmitted (or the transport is shut down). + * The returned array may be sparse, with null packets taking the place of + * already ACKed fragments. * */ - public UDPPacket getNextPacket() { + public UDPPacket[] getNextVolley() { PeerState peer = null; OutboundMessageState state = null; - int currentFragment = -1; - while (_alive && (currentFragment < 0) ) { + while (_alive && (state == null) ) { long now = _context.clock().now(); long nextSend = -1; finishMessages(); @@ -203,88 +211,36 @@ public class OutboundMessageFragments { for (int i = 0; i < _activeMessages.size(); i++) { int cur = (i + _nextPacketMessage) % _activeMessages.size(); state = (OutboundMessageState)_activeMessages.get(cur); - if (state.getNextSendTime() <= now) { - peer = state.getPeer(); // known if this is immediately after establish - if (peer == null) - peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); - + peer = state.getPeer(); // known if this is immediately after establish + if (peer == null) + peer = _transport.getPeerState(state.getMessage().getTarget().getIdentity().calculateHash()); + + if ((peer != null) && locked_shouldSend(state, peer)) { + // for fairness, we move on in a round robin + _nextPacketMessage = i + 1; + break; + } else { if (peer == null) { // peer disconnected _activeMessages.remove(cur); _transport.failed(state); if (_log.shouldLog(Log.WARN)) _log.warn("Peer disconnected for " + state); + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); i--; - } else { - if (!state.isFragmented()) { - state.fragment(fragmentSize(peer.getMTU())); - - if (_log.shouldLog(Log.INFO)) - _log.info("Fragmenting " + state); - } - - int oldVolley = state.getPushCount(); - // pickNextFragment increments the pushCount every - // time we cycle through all of the packets - currentFragment = state.pickNextFragment(); - - int fragmentSize = state.fragmentSize(currentFragment); - if (peer.allocateSendingBytes(fragmentSize)) { - state.incrementCurrentFragment(); - if (_log.shouldLog(Log.INFO)) - _log.info("Allocation of " + fragmentSize + " allowed with " - + peer.getSendWindowBytesRemaining() - + "/" + peer.getSendWindowBytes() - + " remaining" - + " for message " + state.getMessageId() + ": " + state); - - if (state.justBeganVolley() && (state.getPushCount() > 0) && (state.getFragmentCount() > 1)) { - peer.messageRetransmitted(); - if (_log.shouldLog(Log.WARN)) - _log.warn("Retransmitting " + state + " to " + peer); - } - - // for fairness, we move on in a round robin - //_nextPacketMessage = i + 1; - - if (currentFragment >= state.getFragmentCount() - 1) { - // this is the last fragment - _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); - if (state.getPeer() != null) { - int rto = state.getPeer().getRTO() * state.getPushCount(); - state.setNextSendTime(now + rto); - } else { - if (_log.shouldLog(Log.ERROR)) - _log.error("changed volley, unknown peer"); - state.setNextSendTime(now + 1000 + _context.random().nextInt(2000)); - } - // only move on in round robin after sending a full volley - _nextPacketMessage = (i + 1) % _activeMessages.size(); - } else { - if (peer.getSendWindowBytesRemaining() > 0) - state.setNextSendTime(now); - else - state.setNextSendTime((now + 1024) & ~SECOND_MASK); - } - break; - } else { - _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes() - + " available=" + peer.getSendWindowBytesRemaining() - + " for message " + state.getMessageId() + ": " + state); - state.setNextSendTime((now + 1024) & ~SECOND_MASK); - currentFragment = -1; - } } - } - long time = state.getNextSendTime(); - if ( (nextSend < 0) || (time < nextSend) ) - nextSend = time; + + long time = state.getNextSendTime(); + if ( (nextSend < 0) || (time < nextSend) ) + nextSend = time; + state = null; + peer = null; + } } // end of the for(activeMessages) - if (currentFragment < 0) { + if (state == null) { if (nextSend <= 0) { try { _activeMessages.notifyAll(); @@ -309,12 +265,73 @@ public class OutboundMessageFragments { _allowExcess = false; } // end of the synchronized block } // end of the while (alive && !found) + + return preparePackets(state, peer); + } + + private boolean locked_shouldSend(OutboundMessageState state, PeerState peer) { + long now = _context.clock().now(); + if (state.getNextSendTime() <= now) { + if (!state.isFragmented()) { + state.fragment(fragmentSize(peer.getMTU())); + + if (_log.shouldLog(Log.INFO)) + _log.info("Fragmenting " + state); + } + + int size = state.getUnackedSize(); + if (peer.allocateSendingBytes(size)) { + if (_log.shouldLog(Log.INFO)) + _log.info("Allocation of " + size + " allowed with " + + peer.getSendWindowBytesRemaining() + + "/" + peer.getSendWindowBytes() + + " remaining" + + " for message " + state.getMessageId() + ": " + state); + + if (state.getPushCount() > 0) { + peer.messageRetransmitted(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Retransmitting " + state + " to " + peer); + _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); + } + + state.push(); + + int rto = peer.getRTO() * state.getPushCount(); + state.setNextSendTime(now + rto); + + if (peer.getSendWindowBytesRemaining() > 0) + _throttle.unchoke(peer.getRemotePeer()); + return true; + } else { + _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes() + + " available=" + peer.getSendWindowBytesRemaining() + + " for message " + state.getMessageId() + ": " + state); + state.setNextSendTime((now + 1024) & ~SECOND_MASK); + if (_log.shouldLog(Log.WARN)) + _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); + _throttle.choke(peer.getRemotePeer()); + } + } // nextTime <= now - if (currentFragment >= 0) { + return false; + } + + private UDPPacket[] preparePackets(OutboundMessageState state, PeerState peer) { + if (state != null) { + int fragments = state.getFragmentCount(); + if (fragments < 0) + return null; + if (_log.shouldLog(Log.INFO)) - _log.info("Building packet for fragment " + currentFragment - + " of " + state + " to " + peer); - UDPPacket rv = _builder.buildPacket(state, currentFragment, peer); + _log.info("Building packet for " + state + " to " + peer); + UDPPacket rv[] = new UDPPacket[fragments]; //sparse + for (int i = 0; i < fragments; i++) { + if (state.needsSending(i)) + rv[i] = _builder.buildPacket(state, i, peer); + } return rv; } else { // !alive @@ -384,6 +401,8 @@ public class OutboundMessageFragments { } else { _log.warn("message acked, but no peer attacked: " + state); } + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); return numFragments; } else { @@ -434,7 +453,15 @@ public class OutboundMessageFragments { _context.statManager().addRateData("udp.sendConfirmTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); _transport.succeeded(state.getMessage()); + if (state.getPeer().getSendWindowBytesRemaining() > 0) + _throttle.unchoke(state.getPeer().getRemotePeer()); state.releaseResources(); } } + + public interface ActiveThrottle { + public void choke(Hash peer); + public void unchoke(Hash peer); + public boolean isChoked(Hash peer); + } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 221032357e9821e3a14fa2cd138f6fa1f60b5c10..be99b325775b2b0934f65fa14ba817a0556b5df4 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -114,6 +114,29 @@ public class OutboundMessageState { // nothing else pending ack return true; } + public synchronized int getUnackedSize() { + int rv = 0; + if ( (_messageBuf != null) && (_fragmentSends != null) ) { + int totalSize = _messageBuf.getValid(); + int lastSize = totalSize % _fragmentSize; + if (lastSize == 0) + lastSize = _fragmentSize; + for (int i = 0; i < _fragmentSends.length; i++) { + if (_fragmentSends[i] >= (short)0) { + if (i + 1 == _fragmentSends.length) + rv += lastSize; + else + rv += _fragmentSize; + } + } + } + return rv; + } + public synchronized boolean needsSending(int fragment) { + if ( (_fragmentSends == null) || (fragment >= _fragmentSends.length) || (fragment < 0) ) + return false; + return (_fragmentSends[fragment] >= (short)0); + } public long getLifetime() { return _context.clock().now() - _startedOn; } /** @@ -133,7 +156,16 @@ public class OutboundMessageState { public int getMaxSends() { return _maxSends; } public int getPushCount() { return _pushCount; } /** note that we have pushed the message fragments */ - public void push() { _pushCount++; } + public synchronized void push() { + _pushCount++; + if (_pushCount > _maxSends) + _maxSends = (short)_pushCount; + if (_fragmentSends != null) + for (int i = 0; i < _fragmentSends.length; i++) + if (_fragmentSends[i] >= (short)0) + _fragmentSends[i] = (short)(1 + _fragmentSends[i]); + + } public boolean isFragmented() { return _fragmentSends != null; } /** * Prepare the message for fragmented delivery, using no more than diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index 7ac81999183c0f33145ed17bab02c6daefd8e54d..b4e620983d6ded054f2ede5073ba6c450e60f037 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -35,9 +35,13 @@ public class PacketPusher implements Runnable { public void run() { while (_alive) { - UDPPacket packet = _fragments.getNextPacket(); - if (packet != null) - _sender.add(packet, 1000); // blocks for up to a second + UDPPacket packets[] = _fragments.getNextVolley(); + if (packets != null) { + for (int i = 0; i < packets.length; i++) { + if (packets[i] != null) // null for ACKed fragments + _sender.add(packets[i], 1000); // blocks for up to a second + } + } } } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index e160245764ee1488a203b58e041d755199466924..e84c1f39a63792323a9decb72d5d3de3dd3e5fab 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -136,7 +136,7 @@ public class PeerState { private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; private static final int DEFAULT_MTU = 1472; - private static final int MIN_RTO = ACKSender.ACK_FREQUENCY + 100; + private static final int MIN_RTO = 600; private static final int MAX_RTO = 5000; public PeerState(I2PAppContext ctx) { @@ -179,6 +179,8 @@ public class PeerState { _rto = 6000; _messagesReceived = 0; _messagesSent = 0; + _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); } /** @@ -425,6 +427,8 @@ public class PeerState { return false; // only shrink once every 10 seconds _lastCongestionOccurred = now; + _context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes, _sendBps); + //if (true) // _sendWindowBytes -= 10000; //else @@ -469,6 +473,12 @@ public class PeerState { if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES) _sendWindowBytes = MAX_SEND_WINDOW_BYTES; _lastReceiveTime = _context.clock().now(); + + if (_sendWindowBytesRemaining + bytesACKed <= _sendWindowBytes) + _sendWindowBytesRemaining += bytesACKed; + else + _sendWindowBytesRemaining = _sendWindowBytes; + _messagesSent++; if (numSends <= 2) recalculateTimeouts(lifetime); @@ -492,6 +502,7 @@ public class PeerState { /** we are resending a packet, so lets jack up the rto */ public void messageRetransmitted() { congestionOccurred(); + _context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation); //_rto *= 2; } /** how long does it usually take to get a message ACKed? */ diff --git a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java index 335dd548c62d32419ca484834a88ffd71bf9bae6..6334ac303e40b3a7520929a25c291025a68c1582 100644 --- a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java +++ b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java @@ -2,8 +2,11 @@ package net.i2p.router.transport.udp; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import net.i2p.data.Hash; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; @@ -14,7 +17,7 @@ import net.i2p.util.Log; * with code to fail messages that expire. * */ -public class TimedWeightedPriorityMessageQueue implements MessageQueue { +public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle { private RouterContext _context; private Log _log; /** FIFO queue of messages in a particular priority */ @@ -39,6 +42,8 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue { private volatile boolean _addedSincePassBegan; private Expirer _expirer; private FailedListener _listener; + /** set of peers (Hash) whose congestion window is exceeded in the active queue */ + private Set _chokedPeers; /** * Build up a new queue @@ -69,6 +74,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue { _alive = true; _nextLock = this; _nextQueue = 0; + _chokedPeers = new HashSet(16); _listener = lsnr; _context.statManager().createRateStat("udp.timeToEntrance", "Message lifetime until it reaches the UDP system", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.messageQueueSize", "How many messages are on the current class queue at removal", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -114,8 +120,16 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue { for (int i = 0; i < _queue.length; i++) { int currentQueue = (_nextQueue + i) % _queue.length; synchronized (_queue[currentQueue]) { - if (_queue[currentQueue].size() > 0) { - OutNetMessage msg = (OutNetMessage)_queue[currentQueue].remove(0); + for (int j = 0; j < _queue[currentQueue].size(); j++) { + OutNetMessage msg = (OutNetMessage)_queue[currentQueue].get(j); + Hash to = msg.getTarget().getIdentity().getHash(); + synchronized (_nextLock) { // yikes! + if (_chokedPeers.contains(to)) + continue; + } + // not choked, lets push it to active + _queue[currentQueue].remove(j); + long size = msg.getMessageSize(); _bytesQueued[currentQueue] -= size; _bytesTransferred[currentQueue] += size; @@ -129,12 +143,12 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue { _log.debug("Pulling a message off queue " + currentQueue + " with " + _queue[currentQueue].size() + " remaining"); return msg; - } else { - // nothing waiting - _messagesFlushed[currentQueue] = 0; - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Nothing on queue " + currentQueue); } + + // nothing waiting, or only choked peers + _messagesFlushed[currentQueue] = 0; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Nothing available on queue " + currentQueue); } } @@ -173,6 +187,26 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue { } } + public void choke(Hash peer) { + if (true) return; + synchronized (_nextLock) { + _chokedPeers.add(peer); + _nextLock.notifyAll(); + } + } + public void unchoke(Hash peer) { + if (true) return; + synchronized (_nextLock) { + _chokedPeers.remove(peer); + _nextLock.notifyAll(); + } + } + public boolean isChoked(Hash peer) { + synchronized (_nextLock) { + return _chokedPeers.contains(peer); + } + } + private int pickQueue(OutNetMessage message) { int target = message.getPriority(); for (int i = 0; i < _priorityLimits.length; i++) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index c46457a8f01e918afb9aaa63dce97fa7b93abcfb..8e026eacd72029050351b8df3602f9d23f850f77 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -48,6 +48,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private EstablishmentManager _establisher; private MessageQueue _outboundMessages; private OutboundMessageFragments _fragments; + private OutboundMessageFragments.ActiveThrottle _activeThrottle; private OutboundRefiller _refiller; private PacketPusher _pusher; private InboundMessageFragments _inboundFragments; @@ -101,13 +102,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _peersByRelayTag = new HashMap(128); _endpoint = null; - _outboundMessages = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); + TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); + _outboundMessages = mq; + _activeThrottle = mq; _relayPeers = new ArrayList(1); _fastBid = new SharedBid(50); _slowBid = new SharedBid(1000); - _fragments = new OutboundMessageFragments(_context, this); + _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); _flooder = new UDPFlooder(_context, this); @@ -314,6 +317,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + _activeThrottle.unchoke(peer.getRemotePeer()); _context.shitlist().unshitlistRouter(peer.getRemotePeer()); if (SHOULD_FLOOD_PEERS) @@ -341,6 +345,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + // unchoke 'em, but just because we'll never talk again... + _activeThrottle.unchoke(peer.getRemotePeer()); + if (SHOULD_FLOOD_PEERS) _flooder.removePeer(peer); } @@ -569,6 +576,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } buf.append(':').append(peer.getRemotePort()); buf.append("</a>"); + if (_activeThrottle.isChoked(peer.getRemotePeer())) + buf.append(" [choked]"); if (peer.getConsecutiveFailedSends() > 0) buf.append(" [").append(peer.getConsecutiveFailedSends()).append(" failures]"); buf.append("</td>");