From 7b70210c9a040ffdbfe6ac5ba56a3904fa32d7db Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 26 Feb 2010 16:54:41 +0000 Subject: [PATCH] * UDP Transport: - Replace the unused-since-2006 TimedWeightedPriorityMessageQueue with DummyThrottle - Don't instantiate and start TWPMQ Cleaner and OutboundRefiller threads, part of priority queues unused since 0.6.1.11 - Don't instantiate and start UDPFlooder, it is for testing only --- .../router/transport/udp/DummyThrottle.java | 32 +++++++++++++++ .../transport/udp/OutboundRefiller.java | 2 + .../TimedWeightedPriorityMessageQueue.java | 3 ++ .../i2p/router/transport/udp/UDPFlooder.java | 3 +- .../router/transport/udp/UDPTransport.java | 39 +++++++++++++------ 5 files changed, 66 insertions(+), 13 deletions(-) create mode 100644 router/java/src/net/i2p/router/transport/udp/DummyThrottle.java diff --git a/router/java/src/net/i2p/router/transport/udp/DummyThrottle.java b/router/java/src/net/i2p/router/transport/udp/DummyThrottle.java new file mode 100644 index 0000000000..383f807c59 --- /dev/null +++ b/router/java/src/net/i2p/router/transport/udp/DummyThrottle.java @@ -0,0 +1,32 @@ +package net.i2p.router.transport.udp; + +import net.i2p.data.Hash; +import net.i2p.router.OutNetMessage; + +/** + * Since the TimedWeightedPriorityMessageQueue.add() + * was disabled by jrandom in UDPTransport.java + * on 2006-02-19, and the choke/unchoke was disabled at the same time, + * all of TWPMQ is pointless, so just do this for now. + * + * It appears from his comments that it was a lock contention issue, + * so perhaps TWPMQ can be converted to concurrent and re-enabled. + * + * @since 0.7.12 + */ +public class DummyThrottle implements OutboundMessageFragments.ActiveThrottle { + + public DummyThrottle() { + } + + public void choke(Hash peer) { + } + + public void unchoke(Hash peer) { + } + + public boolean isChoked(Hash peer) { + return false; + } +} + diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java b/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java index 1f61b896c5..b2136ac8d6 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundRefiller.java @@ -9,6 +9,8 @@ import net.i2p.util.Log; * Blocking thread to grab new messages off the outbound queue and * plopping them into our active pool. * + * WARNING - UNUSED since 0.6.1.11 + * */ public class OutboundRefiller implements Runnable { private RouterContext _context; diff --git a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java index 44a26d4a4d..acd0bd0b53 100644 --- a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java +++ b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java @@ -16,6 +16,9 @@ import net.i2p.util.Log; * Weighted priority queue implementation for the outbound messages, coupled * with code to fail messages that expire. * + * WARNING - UNUSED since 0.6.1.11 + * See comments in DQAT.java and mtn history ca. 2006-02-19 + * */ public class TimedWeightedPriorityMessageQueue implements MessageQueue, OutboundMessageFragments.ActiveThrottle { private RouterContext _context; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java index ef091ff53d..38bf62246e 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java @@ -12,7 +12,8 @@ import net.i2p.util.I2PThread; // import net.i2p.util.Log; /** - * + * This sends random data to all UDP peers at a specified rate. + * It is for testing only! */ class UDPFlooder implements Runnable { private RouterContext _context; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index a7733edeef..81b2e209b9 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -135,12 +135,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority /** how many relays offered to us will we use at a time? */ public static final int PUBLIC_RELAY_COUNT = 3; + private static final boolean USE_PRIORITY = false; + /** configure the priority queue with the given split points */ private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 }; /** configure the priority queue with the given weighting per priority group */ private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; - /** should we flood all UDP peers with the configured rate? */ + /** should we flood all UDP peers with the configured rate? This is for testing only! */ private static final boolean SHOULD_FLOOD_PEERS = false; private static final int MAX_CONSECUTIVE_FAILED = 5; @@ -170,9 +172,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _dropList = new ArrayList(256); _endpoint = null; - TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); - _outboundMessages = mq; - _activeThrottle = mq; + // See comments in DQAT.java + if (USE_PRIORITY) { + TimedWeightedPriorityMessageQueue mq = new TimedWeightedPriorityMessageQueue(ctx, PRIORITY_LIMITS, PRIORITY_WEIGHT, this); + _outboundMessages = mq; + _activeThrottle = mq; + } else { + DummyThrottle mq = new DummyThrottle(); + _outboundMessages = null; + _activeThrottle = mq; + } _cachedBid = new SharedBid[BID_VALUES.length]; for (int i = 0; i < BID_VALUES.length; i++) { @@ -181,7 +190,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); - _flooder = new UDPFlooder(_context, this); + if (SHOULD_FLOOD_PEERS) + _flooder = new UDPFlooder(_context, this); _expireTimeout = EXPIRE_TIMEOUT; _expireEvent = new ExpirePeerEvent(); _testEvent = new PeerTestEvent(); @@ -276,10 +286,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_handler == null) _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager); - if (_refiller == null) + // See comments in DQAT.java + if (USE_PRIORITY && _refiller == null) _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); - if (_flooder == null) + if (SHOULD_FLOOD_PEERS && _flooder == null) _flooder = new UDPFlooder(_context, this); // Startup the endpoint with the requested port, check the actual port, and @@ -306,8 +317,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _inboundFragments.startup(); _pusher = new PacketPusher(_context, _fragments, _endpoint.getSender()); _pusher.startup(); - _refiller.startup(); - _flooder.startup(); + if (USE_PRIORITY) + _refiller.startup(); + if (SHOULD_FLOOD_PEERS) + _flooder.startup(); _expireEvent.setIsAlive(true); _testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future... SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon @@ -1150,10 +1163,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority msg.timestamp("enqueueing for an already established peer"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Add to fragments for " + to.toBase64()); - if (true) // skip the priority queue and go straight to the active pool - _fragments.add(msg); - else + + // See comments in DQAT.java + if (USE_PRIORITY) _outboundMessages.add(msg); + else // skip the priority queue and go straight to the active pool + _fragments.add(msg); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Establish new connection to " + to.toBase64()); -- GitLab