forked from I2P_Developers/i2p.i2p
* SSU: Move UDPSender and UDPReceiver queues to CoDel
This commit is contained in:
@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||
import net.i2p.I2PAppContext;
|
||||
import net.i2p.data.DataHelper;
|
||||
import net.i2p.data.SessionKey;
|
||||
import net.i2p.router.util.CDQEntry;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
/**
|
||||
@@ -16,7 +17,7 @@ import net.i2p.util.Log;
|
||||
* of object instances to allow rapid reuse.
|
||||
*
|
||||
*/
|
||||
class UDPPacket {
|
||||
class UDPPacket implements CDQEntry {
|
||||
private I2PAppContext _context;
|
||||
private final DatagramPacket _packet;
|
||||
private volatile short _priority;
|
||||
@@ -246,8 +247,12 @@ class UDPPacket {
|
||||
_context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, _ivBuf, len - MAC_SIZE - IV_SIZE);
|
||||
}
|
||||
|
||||
/** the UDPReceiver has tossed it onto the inbound queue */
|
||||
void enqueue() { _enqueueTime = _context.clock().now(); }
|
||||
/**
|
||||
* For CDQ
|
||||
* @since 0.9.3
|
||||
*/
|
||||
public void setEnqueueTime(long now) { _enqueueTime = now; }
|
||||
|
||||
/** a packet handler has pulled it off the inbound queue */
|
||||
void received() { _receivedTime = _context.clock().now(); }
|
||||
|
||||
@@ -256,8 +261,11 @@ class UDPPacket {
|
||||
/** a packet handler has finished parsing out the good bits */
|
||||
//void afterHandling() { _afterHandlingTime = _context.clock().now(); }
|
||||
|
||||
/** the UDPReceiver has tossed it onto the inbound queue */
|
||||
//long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); }
|
||||
/**
|
||||
* For CDQ
|
||||
* @since 0.9.3
|
||||
*/
|
||||
public long getEnqueueTime() { return _enqueueTime; }
|
||||
|
||||
/** a packet handler has pulled it off the inbound queue */
|
||||
long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); }
|
||||
@@ -269,8 +277,6 @@ class UDPPacket {
|
||||
|
||||
// Following 5: All used only for stats in PacketHandler, commented out
|
||||
|
||||
/** when it was added to the endpoint's receive queue */
|
||||
//long getEnqueueTime() { return _enqueueTime; }
|
||||
/** when it was pulled off the endpoint receive queue */
|
||||
//long getReceivedTime() { return _receivedTime; }
|
||||
/** when we began validate() */
|
||||
@@ -326,6 +332,14 @@ class UDPPacket {
|
||||
return rv;
|
||||
}
|
||||
|
||||
/**
|
||||
* For CDQ
|
||||
* @since 0.9.3
|
||||
*/
|
||||
public void drop() {
|
||||
release();
|
||||
}
|
||||
|
||||
public void release() {
|
||||
verifyNotReleased();
|
||||
_released = true;
|
||||
|
||||
@@ -4,10 +4,10 @@ import java.io.IOException;
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.util.CoDelBlockingQueue;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
import net.i2p.util.SimpleTimer;
|
||||
@@ -47,7 +47,7 @@ class UDPReceiver {
|
||||
if (maxMemory == Long.MAX_VALUE)
|
||||
maxMemory = 96*1024*1024l;
|
||||
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
|
||||
_inboundQueue = new LinkedBlockingQueue(qsize);
|
||||
_inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize);
|
||||
_socket = socket;
|
||||
_transport = transport;
|
||||
_runner = new Runner();
|
||||
@@ -177,6 +177,7 @@ class UDPReceiver {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/****
|
||||
packet.enqueue();
|
||||
boolean rejected = false;
|
||||
int queueSize = 0;
|
||||
@@ -190,6 +191,7 @@ class UDPReceiver {
|
||||
}
|
||||
}
|
||||
if (!rejected) {
|
||||
****/
|
||||
try {
|
||||
_inboundQueue.put(packet);
|
||||
} catch (InterruptedException ie) {
|
||||
@@ -198,6 +200,7 @@ class UDPReceiver {
|
||||
}
|
||||
//return queueSize + 1;
|
||||
return 0;
|
||||
/****
|
||||
}
|
||||
|
||||
// rejected
|
||||
@@ -214,6 +217,7 @@ class UDPReceiver {
|
||||
_log.warn(msg.toString());
|
||||
}
|
||||
return queueSize;
|
||||
****/
|
||||
}
|
||||
|
||||
/****
|
||||
|
||||
@@ -4,10 +4,10 @@ import java.io.IOException;
|
||||
import java.net.DatagramPacket;
|
||||
import java.net.DatagramSocket;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import net.i2p.router.RouterContext;
|
||||
import net.i2p.router.transport.FIFOBandwidthLimiter;
|
||||
import net.i2p.router.util.CoDelBlockingQueue;
|
||||
import net.i2p.util.I2PThread;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
@@ -35,7 +35,7 @@ class UDPSender {
|
||||
if (maxMemory == Long.MAX_VALUE)
|
||||
maxMemory = 96*1024*1024l;
|
||||
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
|
||||
_outboundQueue = new LinkedBlockingQueue(qsize);
|
||||
_outboundQueue = new CoDelBlockingQueue(ctx, "UDP-Sender", qsize);
|
||||
_socket = socket;
|
||||
_runner = new Runner();
|
||||
_name = name;
|
||||
|
||||
Reference in New Issue
Block a user