diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index 2f9f9620a38a275d8804a4f54bc79e1147a7f903..958a6d4c9e1582e966cbaf5eb31c5e68b7a09144 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.data.SessionKey; +import net.i2p.router.util.CDQEntry; import net.i2p.util.Log; /** @@ -16,7 +17,7 @@ import net.i2p.util.Log; * of object instances to allow rapid reuse. * */ -class UDPPacket { +class UDPPacket implements CDQEntry { private I2PAppContext _context; private final DatagramPacket _packet; private volatile short _priority; @@ -246,8 +247,12 @@ class UDPPacket { _context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, _ivBuf, len - MAC_SIZE - IV_SIZE); } - /** the UDPReceiver has tossed it onto the inbound queue */ - void enqueue() { _enqueueTime = _context.clock().now(); } + /** + * For CDQ + * @since 0.9.3 + */ + public void setEnqueueTime(long now) { _enqueueTime = now; } + /** a packet handler has pulled it off the inbound queue */ void received() { _receivedTime = _context.clock().now(); } @@ -256,8 +261,11 @@ class UDPPacket { /** a packet handler has finished parsing out the good bits */ //void afterHandling() { _afterHandlingTime = _context.clock().now(); } - /** the UDPReceiver has tossed it onto the inbound queue */ - //long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); } + /** + * For CDQ + * @since 0.9.3 + */ + public long getEnqueueTime() { return _enqueueTime; } /** a packet handler has pulled it off the inbound queue */ long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); } @@ -269,8 +277,6 @@ class UDPPacket { // Following 5: All used only for stats in PacketHandler, commented out - /** when it was added to the endpoint's receive queue */ - //long getEnqueueTime() { return _enqueueTime; } /** when it was pulled off the endpoint receive queue */ //long getReceivedTime() { return _receivedTime; } /** when we began validate() */ @@ -326,6 +332,14 @@ class UDPPacket { return rv; } + /** + * For CDQ + * @since 0.9.3 + */ + public void drop() { + release(); + } + public void release() { verifyNotReleased(); _released = true; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 4500680eab9df75983ed1d5914bb7945a1c03cd5..09fb43822c787e5facf8691cde4b0aedc9a6183d 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -4,10 +4,10 @@ import java.io.IOException; import java.net.DatagramSocket; import java.util.Arrays; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.util.I2PThread; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -47,7 +47,7 @@ class UDPReceiver { if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); - _inboundQueue = new LinkedBlockingQueue(qsize); + _inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize); _socket = socket; _transport = transport; _runner = new Runner(); @@ -177,6 +177,7 @@ class UDPReceiver { return 0; } +/**** packet.enqueue(); boolean rejected = false; int queueSize = 0; @@ -190,6 +191,7 @@ class UDPReceiver { } } if (!rejected) { +****/ try { _inboundQueue.put(packet); } catch (InterruptedException ie) { @@ -198,6 +200,7 @@ class UDPReceiver { } //return queueSize + 1; return 0; +/**** } // rejected @@ -214,6 +217,7 @@ class UDPReceiver { _log.warn(msg.toString()); } return queueSize; +****/ } /**** diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index f8eaaf5f85c939b0ba8fe29bbcfe9bc33a346827..88bdfb5b9857c2676712345682815957eece4e80 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -4,10 +4,10 @@ import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; +import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -35,7 +35,7 @@ class UDPSender { if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024))); - _outboundQueue = new LinkedBlockingQueue(qsize); + _outboundQueue = new CoDelBlockingQueue(ctx, "UDP-Sender", qsize); _socket = socket; _runner = new Runner(); _name = name;