propagate from branch 'i2p.i2p' (head 86f3e7e668b7ec9f2ddf75be7586719944bbc37f)

to branch 'i2p.i2p.zzz.test' (head da9536c250bc4c0b7523ed748574de1cc97f3028)
This commit is contained in:
zzz
2012-09-08 12:57:09 +00:00
88 changed files with 2243 additions and 643 deletions

View File

@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.TreeSet;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
@@ -438,14 +439,8 @@ public class Blocklist {
* of IP ranges read in from the file.
*/
public void add(String ip) {
InetAddress pi;
try {
pi = InetAddress.getByName(ip);
} catch (UnknownHostException uhe) {
return;
}
if (pi == null) return;
byte[] pib = pi.getAddress();
byte[] pib = Addresses.getIP(ip);
if (pib == null) return;
add(pib);
}
@@ -478,21 +473,13 @@ public class Blocklist {
List<byte[]> rv = new ArrayList(1);
RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer);
if (pinfo == null) return rv;
String oldphost = null;
byte[] oldpib = null;
// for each peer address
for (RouterAddress pa : pinfo.getAddresses()) {
String phost = pa.getOption("host");
if (phost == null) continue;
if (oldphost != null && oldphost.equals(phost)) continue;
oldphost = phost;
InetAddress pi;
try {
pi = InetAddress.getByName(phost);
} catch (UnknownHostException uhe) {
continue;
}
if (pi == null) continue;
byte[] pib = pi.getAddress();
byte[] pib = pa.getIP();
if (pib == null) continue;
if (DataHelper.eq(oldpib, pib)) continue;
oldpib = pib;
rv.add(pib);
}
return rv;
@@ -520,14 +507,8 @@ public class Blocklist {
* calling this externally won't shitlist the peer, this is just an IP check
*/
public boolean isBlocklisted(String ip) {
InetAddress pi;
try {
pi = InetAddress.getByName(ip);
} catch (UnknownHostException uhe) {
return false;
}
if (pi == null) return false;
byte[] pib = pi.getAddress();
byte[] pib = Addresses.getIP(ip);
if (pib == null) return false;
return isBlocklisted(pib);
}

View File

@@ -20,6 +20,7 @@ import java.util.Set;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.Log;
/**
@@ -27,7 +28,7 @@ import net.i2p.util.Log;
* delivery and jobs to be fired off if particular events occur.
*
*/
public class OutNetMessage {
public class OutNetMessage implements CDPQEntry {
private final Log _log;
private final RouterContext _context;
private RouterInfo _target;
@@ -47,6 +48,8 @@ public class OutNetMessage {
private long _sendBegin;
//private Exception _createdBy;
private final long _created;
private long _enqueueTime;
private long _seqNum;
/** for debugging, contains a mapping of even name to Long (e.g. "begin sending", "handleOutbound", etc) */
private HashMap<String, Long> _timestamps;
/**
@@ -56,6 +59,26 @@ public class OutNetMessage {
private List<String> _timestampOrder;
private Object _preparationBuf;
/**
* Priorities, higher is higher priority.
* @since 0.9.3
*/
public static final int PRIORITY_HIGHEST = 1000;
public static final int PRIORITY_MY_BUILD_REQUEST = 500;
public static final int PRIORITY_MY_NETDB_LOOKUP = 500;
public static final int PRIORITY_MY_NETDB_STORE = 400;
public static final int PRIORITY_MY_DATA = 400;
public static final int PRIORITY_MY_NETDB_STORE_LOW = 300;
public static final int PRIORITY_HIS_BUILD_REQUEST = 300;
public static final int PRIORITY_BUILD_REPLY = 300;
public static final int PRIORITY_NETDB_REPLY = 300;
public static final int PRIORITY_HIS_NETDB_STORE = 200;
public static final int PRIORITY_NETDB_FLOOD = 200;
public static final int PRIORITY_PARTICIPATING = 200;
public static final int PRIORITY_NETDB_EXPLORE = 100;
public static final int PRIORITY_NETDB_HARVEST = 100;
public static final int PRIORITY_LOWEST = 100;
public OutNetMessage(RouterContext context) {
_context = context;
_log = context.logManager().getLog(OutNetMessage.class);
@@ -264,6 +287,45 @@ public class OutNetMessage {
/** time the transport tries to send the message (including any queueing) */
public long getSendTime() { return _context.clock().now() - _sendBegin; }
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
}
/**
* For CDPQ
* @since 0.9.3
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
* @since 0.9.3
*/
public long getSeqNum() {
return _seqNum;
}
/**
* We've done what we need to do with the data from this message, though
* we may keep the object around for a while to use its ID, jobs, etc.

View File

@@ -40,6 +40,7 @@ import net.i2p.router.startup.WorkingDir;
import net.i2p.router.tasks.*;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.transport.udp.UDPTransport;
import net.i2p.router.util.EventLog;
import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
@@ -77,6 +78,7 @@ public class Router implements RouterClock.ClockShiftListener {
private I2PThread _gracefulShutdownDetector;
private RouterWatchdog _watchdog;
private Thread _watchdogThread;
private final EventLog _eventLog;
public final static String PROP_CONFIG_FILE = "router.configLocation";
@@ -100,6 +102,7 @@ public class Router implements RouterClock.ClockShiftListener {
public final static String PROP_KEYS_FILENAME_DEFAULT = "router.keys";
public final static String PROP_SHUTDOWN_IN_PROGRESS = "__shutdownInProgress";
public final static String DNS_CACHE_TIME = "" + (5*60);
private static final String EVENTLOG = "eventlog.txt";
private static final String originalTimeZoneID;
static {
@@ -219,12 +222,14 @@ public class Router implements RouterClock.ClockShiftListener {
// i2p.dir.pid defaults to i2p.dir.router
// i2p.dir.base defaults to user.dir == $CWD
_context = new RouterContext(this, envProps);
_eventLog = new EventLog(_context, new File(_context.getRouterDir(), EVENTLOG));
// This is here so that we can get the directory location from the context
// for the ping file
// Check for other router but do not start a thread yet so the update doesn't cause
// a NCDFE
if (!isOnlyRouterRunning()) {
_eventLog.addEvent(EventLog.ABORTED, "Another router running");
System.err.println("ERROR: There appears to be another router already running!");
System.err.println(" Please make sure to shut down old instances before starting up");
System.err.println(" a new one. If you are positive that no other instance is running,");
@@ -410,6 +415,12 @@ public class Router implements RouterClock.ClockShiftListener {
public void runRouter() {
if (_isAlive)
throw new IllegalStateException();
String last = _config.get("router.previousFullVersion");
if (last != null) {
_eventLog.addEvent(EventLog.UPDATED, "from " + last + " to " + RouterVersion.FULL_VERSION);
saveConfig("router.previousFullVersion", null);
}
_eventLog.addEvent(EventLog.STARTED, RouterVersion.FULL_VERSION);
startupStuff();
_isAlive = true;
_started = _context.clock().now();
@@ -631,6 +642,13 @@ public class Router implements RouterClock.ClockShiftListener {
return Certificate.NULL_CERT;
}
/**
* @since 0.9.3
*/
public EventLog eventLog() {
return _eventLog;
}
/**
* Ugly list of files that we need to kill if we are building a new identity
*
@@ -646,7 +664,6 @@ public class Router implements RouterClock.ClockShiftListener {
"sessionKeys.dat" // no longer used
};
static final String IDENTLOG = "identlog.txt";
public void killKeys() {
//new Exception("Clearing identity files").printStackTrace();
int remCount = 0;
@@ -671,18 +688,10 @@ public class Router implements RouterClock.ClockShiftListener {
}
if (remCount > 0) {
FileOutputStream log = null;
try {
log = new FileOutputStream(new File(_context.getRouterDir(), IDENTLOG), true);
log.write((new Date() + ": Old router identity keys cleared\n").getBytes());
} catch (IOException ioe) {
// ignore
} finally {
if (log != null)
try { log.close(); } catch (IOException ioe) {}
}
_eventLog.addEvent(EventLog.REKEYED);
}
}
/**
* Rebuild a new identity the hard way - delete all of our old identity
* files, then reboot the router.
@@ -836,6 +845,7 @@ public class Router implements RouterClock.ClockShiftListener {
// logManager shut down in finalShutdown()
_watchdog.shutdown();
_watchdogThread.interrupt();
_eventLog.addEvent(EventLog.STOPPED, Integer.toString(exitCode));
finalShutdown(exitCode);
}
@@ -1139,6 +1149,7 @@ public class Router implements RouterClock.ClockShiftListener {
_config.put("router.updateLastInstalled", "" + System.currentTimeMillis());
// Set the last version to the current version, since 0.8.13
_config.put("router.previousVersion", RouterVersion.VERSION);
_config.put("router.previousFullVersion", RouterVersion.FULL_VERSION);
saveConfig();
ok = FileUtil.extractZip(updateFile, _context.getBaseDir());
}

View File

@@ -52,6 +52,8 @@ class ClientManager {
/** SSL interface (only) @since 0.8.3 */
private static final String PROP_ENABLE_SSL = "i2cp.SSL";
private static final int INTERNAL_QUEUE_SIZE = 256;
public ClientManager(RouterContext context, int port) {
_ctx = context;
_log = context.logManager().getLog(ClientManager.class);
@@ -125,9 +127,8 @@ class ClientManager {
public I2CPMessageQueue internalConnect() throws I2PSessionException {
if (!_isStarted)
throw new I2PSessionException("Router client manager is shut down");
// for now we make these unlimited size
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(INTERNAL_QUEUE_SIZE);
I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue);

View File

@@ -1,6 +1,7 @@
package net.i2p.router.client;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.internal.I2CPMessageQueue;
@@ -32,6 +33,16 @@ class I2CPMessageQueueImpl extends I2CPMessageQueue {
return _out.offer(msg);
}
/**
* Send a message, blocking.
* @param timeout how long to wait for space (ms)
* @return success (false if no space available or if timed out)
* @since 0.9.3
*/
public boolean offer(I2CPMessage msg, long timeout) throws InterruptedException {
return _out.offer(msg, timeout, TimeUnit.MILLISECONDS);
}
/**
* Receive a message, nonblocking
* @return message or null if none available

View File

@@ -19,6 +19,9 @@ import net.i2p.router.RouterContext;
/**
* HandlerJobBuilder to build jobs to handle GarlicMessages
*
* This is essentially unused, as InNetMessagePool short circuits tunnel messages,
* and the garlics are handled in InboundMessageDistributor.
* Unless we get a garlic message not down a tunnel?
*/
public class GarlicMessageHandler implements HandlerJobBuilder {
private final RouterContext _context;

View File

@@ -97,10 +97,12 @@ public class GarlicMessageReceiver {
*/
private void handleClove(GarlicClove clove) {
if (!isValid(clove)) {
if (_log.shouldLog(Log.DEBUG))
_log.warn("Invalid clove " + clove);
//if (_log.shouldLog(Log.WARN))
// _log.warn("Invalid clove " + clove);
return;
}
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("valid clove " + clove);
_receiver.handleClove(clove.getInstructions(), clove.getData());
}

View File

@@ -15,6 +15,7 @@ import net.i2p.data.i2np.GarlicMessage;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
@@ -24,6 +25,9 @@ import net.i2p.util.Log;
* as if they arrived locally. Other instructions are not yet implemented (but
* need to be. soon)
*
* This is essentially unused, as InNetMessagePool short circuits tunnel messages,
* and the garlics are handled in InboundMessageDistributor.
* Unless we get a garlic message not down a tunnel?
*/
class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.CloveReceiver {
private final Log _log;
@@ -34,6 +38,9 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
//private MessageHandler _handler;
//private GarlicMessageParser _parser;
private final static int ROUTER_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
private final static int TUNNEL_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
/**
* @param from ignored
* @param fromHash ignored
@@ -42,8 +49,8 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
super(context);
_log = context.logManager().getLog(HandleGarlicMessageJob.class);
getContext().statManager().createRateStat("crypto.garlic.decryptFail", "How often garlic messages are undecryptable", "Encryption", new long[] { 5*60*1000, 60*60*1000, 24*60*60*1000 });
if (_log.shouldLog(Log.DEBUG))
_log.debug("New handle garlicMessageJob called w/ message from [" + from + "]", new Exception("Debug"));
if (_log.shouldLog(Log.WARN))
_log.warn("Garlic Message not down a tunnel??? from [" + from + "]", new Exception("I did it"));
_message = msg;
//_from = from;
//_fromHash = fromHash;
@@ -78,10 +85,10 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("router delivery instructions targetting "
+ instructions.getRouter().toBase64().substring(0,4));
+ instructions.getRouter().toBase64().substring(0,4) + " for " + data);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data,
instructions.getRouter(),
10*1000, 100);
10*1000, ROUTER_PRIORITY);
// run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
j.runJob();
//getContext().jobQueue().addJob(j);
@@ -92,9 +99,12 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
gw.setMessage(data);
gw.setTunnelId(instructions.getTunnelId());
gw.setMessageExpiration(data.getMessageExpiration());
if (_log.shouldLog(Log.DEBUG))
_log.debug("tunnel delivery instructions targetting "
+ instructions.getRouter().toBase64().substring(0,4) + " for " + data);
SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw,
instructions.getRouter(),
10*1000, 100);
10*1000, TUNNEL_PRIORITY);
// run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
job.runJob();
// getContext().jobQueue().addJob(job);

View File

@@ -24,6 +24,7 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.message.SendMessageDirectJob;
@@ -40,7 +41,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
private final static int MAX_ROUTERS_RETURNED = 3;
private final static int CLOSENESS_THRESHOLD = 8; // FNDF.MAX_TO_FLOOD + 1
private final static int REPLY_TIMEOUT = 60*1000;
private final static int MESSAGE_PRIORITY = 300;
private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
/**
* If a routerInfo structure isn't this recent, don't send it out.
@@ -283,7 +284,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
m.setMessage(message);
m.setMessageExpiration(message.getMessageExpiration());
m.setTunnelId(replyTunnel);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, 100);
SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY);
j.runJob();
//getContext().jobQueue().addJob(j);
}

View File

@@ -41,6 +41,9 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
*/
private static final int MAX_TO_FLOOD = 4;
private static final int FLOOD_PRIORITY = OutNetMessage.PRIORITY_NETDB_FLOOD;
private static final int FLOOD_TIMEOUT = 30*1000;
public FloodfillNetworkDatabaseFacade(RouterContext context) {
super(context);
_activeFloodQueries = new HashMap();
@@ -224,9 +227,6 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
}
}
private static final int FLOOD_PRIORITY = 200;
private static final int FLOOD_TIMEOUT = 30*1000;
@Override
protected PeerSelector createPeerSelector() { return new FloodfillPeerSelector(_context); }

View File

@@ -11,6 +11,7 @@ import net.i2p.data.Hash;
import net.i2p.data.RouterInfo;
import net.i2p.data.i2np.DatabaseLookupMessage;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.router.message.SendMessageDirectJob;
@@ -39,7 +40,7 @@ class HarvesterJob extends JobImpl {
/** don't try to update more than 5 peers during each run */
private static final int MAX_PER_RUN = 5;
/** background job, who cares */
private static final int PRIORITY = 100;
private static final int PRIORITY = OutNetMessage.PRIORITY_NETDB_HARVEST;
public static final String PROP_ENABLED = "netDb.shouldHarvest";

View File

@@ -25,6 +25,7 @@ import net.i2p.data.i2np.DatabaseSearchReplyMessage;
import net.i2p.data.i2np.DatabaseStoreMessage;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.TunnelInfo;
import net.i2p.util.Log;
@@ -78,6 +79,10 @@ class SearchJob extends JobImpl {
*
*/
private static final long REQUEUE_DELAY = 1000;
// TODO pass to the tunnel dispatcher
//private final static int LOOKUP_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_LOOKUP;
//private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_HIS_NETDB_STORE;
/**
* Create a new search for the routingKey specified
@@ -445,6 +450,7 @@ class SearchJob extends JobImpl {
if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
_floodfillSearchesOutstanding++;
getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
// TODO pass a priority to the dispatcher
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, to);
}
@@ -652,6 +658,7 @@ class SearchJob extends JobImpl {
if (outTunnel != null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("resending leaseSet out to " + to + " through " + outTunnel + ": " + msg);
// TODO pass a priority to the dispatcher
getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
return true;
} else {

View File

@@ -43,7 +43,7 @@ class StoreJob extends JobImpl {
private final static int PARALLELIZATION = 4; // how many sent at a time
private final static int REDUNDANCY = 4; // we want the data sent to 6 peers
private final static int STORE_PRIORITY = 100;
private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE;
/**
* Send a data structure to the floodfills

View File

@@ -1261,16 +1261,8 @@ public class ProfileOrganizer {
if (paddr == null)
return rv;
for (RouterAddress pa : paddr) {
String phost = pa.getOption("host");
if (phost == null) continue;
InetAddress pi;
try {
pi = InetAddress.getByName(phost);
} catch (UnknownHostException uhe) {
continue;
}
if (pi == null) continue;
byte[] pib = pi.getAddress();
byte[] pib = pa.getIP();
if (pib == null) continue;
rv.add(maskedIP(pib, mask));
}
return rv;

View File

@@ -6,6 +6,7 @@ import net.i2p.data.DataHelper;
import net.i2p.router.Job;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.util.EventLog;
import net.i2p.stat.Rate;
import net.i2p.stat.RateStat;
import net.i2p.util.ShellCommand;
@@ -107,6 +108,7 @@ public class RouterWatchdog implements Runnable {
_log.error("Memory: " + DataHelper.formatSize(used) + '/' + DataHelper.formatSize(max));
if (_consecutiveErrors == 1) {
_log.log(Log.CRIT, "Router appears hung, or there is severe network congestion. Watchdog starts barking!");
_context.router().eventLog().addEvent(EventLog.WATCHDOG);
// This works on linux...
// It won't on windows, and we can't call i2prouter.bat either, it does something
// completely different...

View File

@@ -10,6 +10,8 @@ package net.i2p.router.tasks;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.RouterVersion;
import net.i2p.router.util.EventLog;
import net.i2p.util.Log;
/**
@@ -35,6 +37,7 @@ public class ShutdownHook extends Thread {
// Needed to make the wrapper happy, otherwise it gets confused
// and thinks we haven't shut down, possibly because it
// prevents other shutdown hooks from running
_context.router().eventLog().addEvent(EventLog.CRASHED, RouterVersion.FULL_VERSION);
_context.router().setKillVMOnEnd(false);
_context.router().shutdown2(Router.EXIT_HARD);
}

View File

@@ -62,6 +62,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
public void shutdown() {
if (_manager != null)
_manager.shutdown();
_geoIP.shutdown();
}
public void restart() {
@@ -250,7 +251,7 @@ public class CommSystemFacadeImpl extends CommSystemFacade {
props.setProperty(NTCPAddress.PROP_PORT, port);
RouterAddress addr = new RouterAddress();
addr.setCost(NTCPAddress.DEFAULT_COST);
addr.setExpiration(null);
//addr.setExpiration(null);
addr.setOptions(props);
addr.setTransportStyle(NTCPTransport.STYLE);
//if (isNew) {

View File

@@ -21,6 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import net.i2p.data.Hash;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Addresses;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
@@ -72,6 +73,17 @@ class GeoIP {
static final String COUNTRY_FILE_DEFAULT = "countries.txt";
public static final String PROP_IP_COUNTRY = "i2np.lastCountry";
/**
* @since 0.9.3
*/
public void shutdown() {
_codeToName.clear();
_codeCache.clear();
_IPToCountry.clear();
_pendingSearch.clear();
_notFound.clear();
}
/**
* Fire off a thread to lookup all pending IPs.
* There is no indication of completion.
@@ -297,14 +309,8 @@ class GeoIP {
* Add to the list needing lookup
*/
public void add(String ip) {
InetAddress pi;
try {
pi = InetAddress.getByName(ip);
} catch (UnknownHostException uhe) {
return;
}
if (pi == null) return;
byte[] pib = pi.getAddress();
byte[] pib = Addresses.getIP(ip);
if (pib == null) return;
add(pib);
}
@@ -325,14 +331,8 @@ class GeoIP {
* @return lower-case code, generally two letters, or null.
*/
public String get(String ip) {
InetAddress pi;
try {
pi = InetAddress.getByName(ip);
} catch (UnknownHostException uhe) {
return null;
}
if (pi == null) return null;
byte[] pib = pi.getAddress();
byte[] pib = Addresses.getIP(ip);
if (pib == null) return null;
return get(pib);
}

View File

@@ -25,7 +25,7 @@ import net.i2p.router.ReplyJob;
import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Tracks outbound messages.
@@ -254,10 +254,11 @@ public class OutboundMessageRegistry {
/** @deprecated unused */
public void renderStatusHTML(Writer out) throws IOException {}
private class CleanupTask implements SimpleTimer.TimedEvent {
private class CleanupTask extends SimpleTimer2.TimedEvent {
private long _nextExpire;
public CleanupTask() {
super(_context.simpleTimer2());
_nextExpire = -1;
}
@@ -312,14 +313,14 @@ public class OutboundMessageRegistry {
if (_nextExpire <= now)
_nextExpire = now + 10*1000;
SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
schedule(_nextExpire - now);
}
public void scheduleExpiration(MessageSelector sel) {
long now = _context.clock().now();
if ( (_nextExpire <= now) || (sel.getExpiration() < _nextExpire) ) {
_nextExpire = sel.getExpiration();
SimpleTimer.getInstance().addEvent(CleanupTask.this, _nextExpire - now);
reschedule(_nextExpire - now);
}
}
}

View File

@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import net.i2p.data.DataHelper;
import net.i2p.data.Hash;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterIdentity;
@@ -35,6 +36,7 @@ import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.LHMCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
@@ -53,7 +55,18 @@ public abstract class TransportImpl implements Transport {
private final Map<Hash, Long> _unreachableEntries;
private final Set<Hash> _wasUnreachableEntries;
/** global router ident -> IP */
private static final Map<Hash, byte[]> _IPMap = new ConcurrentHashMap(128);
private static final Map<Hash, byte[]> _IPMap;
static {
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
long min = 512;
long max = 4096;
// 1024 nominal for 128 MB
int size = (int) Math.max(min, Math.min(max, 1 + (maxMemory / (128*1024))));
_IPMap = new LHMCache(size);
}
/**
* Initialize the new transport
@@ -585,12 +598,27 @@ public abstract class TransportImpl implements Transport {
}
public void setIP(Hash peer, byte[] ip) {
_IPMap.put(peer, ip);
_context.commSystem().queueLookup(ip);
byte[] old;
synchronized (_IPMap) {
old = _IPMap.put(peer, ip);
}
if (!DataHelper.eq(old, ip))
_context.commSystem().queueLookup(ip);
}
public static byte[] getIP(Hash peer) {
return _IPMap.get(peer);
synchronized (_IPMap) {
return _IPMap.get(peer);
}
}
/**
* @since 0.9.3
*/
static void clearCaches() {
synchronized(_IPMap) {
_IPMap.clear();
}
}
/** @param addr non-null */

View File

@@ -185,6 +185,8 @@ public class TransportManager implements TransportEventListener {
public void shutdown() {
stopListening();
_dhThread.shutdown();
Addresses.clearCaches();
TransportImpl.clearCaches();
}
public Transport getTransport(String style) {

View File

@@ -8,13 +8,13 @@ package net.i2p.router.transport.ntcp;
*
*/
import java.net.InetAddress;
import java.util.Properties;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.router.transport.TransportImpl;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
/**
@@ -25,9 +25,9 @@ public class NTCPAddress {
private final String _host;
//private InetAddress _addr;
/** Port number used in RouterAddress definitions */
public final static String PROP_PORT = "port";
public final static String PROP_PORT = RouterAddress.PROP_PORT;
/** Host name used in RouterAddress definitions */
public final static String PROP_HOST = "host";
public final static String PROP_HOST = RouterAddress.PROP_HOST;
public static final int DEFAULT_COST = 10;
public NTCPAddress(String host, int port) {
@@ -59,23 +59,8 @@ public class NTCPAddress {
_port = -1;
return;
}
String host = addr.getOption(PROP_HOST);
int iport = -1;
if (host == null) {
_host = null;
} else {
_host = host.trim();
String port = addr.getOption(PROP_PORT);
if ( (port != null) && (port.trim().length() > 0) && !("null".equals(port)) ) {
try {
iport = Integer.parseInt(port.trim());
} catch (NumberFormatException nfe) {
Log log = I2PAppContext.getGlobalContext().logManager().getLog(NTCPAddress.class);
log.error("Invalid port [" + port + "]", nfe);
}
}
}
_port = iport;
_host = addr.getOption(PROP_HOST);
_port = addr.getPort();
}
public RouterAddress toRouterAddress() {
@@ -85,7 +70,7 @@ public class NTCPAddress {
RouterAddress addr = new RouterAddress();
addr.setCost(DEFAULT_COST);
addr.setExpiration(null);
//addr.setExpiration(null);
Properties props = new Properties();
props.setProperty(PROP_HOST, _host);
@@ -106,24 +91,11 @@ public class NTCPAddress {
public boolean isPubliclyRoutable() {
return isPubliclyRoutable(_host);
}
public static boolean isPubliclyRoutable(String host) {
if (host == null) return false;
try {
InetAddress addr = InetAddress.getByName(host);
byte quad[] = addr.getAddress();
// allow ipv6 for ntcpaddress, since we've still got ssu
//if (quad.length != 4) {
// if (_log.shouldLog(Log.ERROR))
// _log.error("Refusing IPv6 address (" + host + " / " + addr.getHostAddress() + ") "
// + " since not all peers support it, and we don't support restricted routes");
// return false;
//}
return TransportImpl.isPubliclyRoutable(quad);
} catch (Throwable t) {
//if (_log.shouldLog(Log.WARN))
// _log.warn("Error checking routability", t);
return false;
}
byte quad[] = Addresses.getIP(host);
return TransportImpl.isPubliclyRoutable(quad);
}
@Override

View File

@@ -4,7 +4,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -24,6 +26,7 @@ import net.i2p.router.OutNetMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
@@ -83,7 +86,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
/**
* pending unprepared OutNetMessage instances
*/
private final Queue<OutNetMessage> _outbound;
private final CoDelPriorityBlockingQueue<OutNetMessage> _outbound;
/**
* current prepared OutNetMessage, or null - synchronize on _outbound to modify
* FIXME why do we need this???
@@ -136,6 +139,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
public static final int BUFFER_SIZE = 16*1024;
/** 2 bytes for length and 4 for CRC */
public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
/**
* Create an inbound connected (though not established) NTCP connection
@@ -150,8 +155,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(2);
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = true;
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
@@ -175,8 +179,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_readBufs = new ConcurrentLinkedQueue();
_writeBufs = new ConcurrentLinkedQueue();
_bwRequests = new ConcurrentHashSet(8);
// TODO possible switch to CLQ but beware non-constant size() - see below
_outbound = new LinkedBlockingQueue();
_outbound = new CoDelPriorityBlockingQueue(ctx, "NTCP-Connection", 32);
_isInbound = false;
_decryptBlockBuf = new byte[BLOCK_SIZE];
_curReadState = new ReadState();
@@ -295,15 +298,16 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
EventPumper.releaseBuf(bb);
}
OutNetMessage msg;
while ((msg = _outbound.poll()) != null) {
List<OutNetMessage> pending = new ArrayList();
_outbound.drainAllTo(pending);
for (OutNetMessage msg : pending) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
releaseBuf((PrepBuffer)buf);
_transport.afterSend(msg, false, allowRequeue, msg.getLifetime());
}
msg = _currentOutbound;
OutNetMessage msg = _currentOutbound;
if (msg != null) {
Object buf = msg.releasePreparationBuffer();
if (buf != null)
@@ -316,6 +320,9 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
* toss the message onto the connection's send queue
*/
public void send(OutNetMessage msg) {
/****
always enqueue, let the queue do the dropping
if (tooBacklogged()) {
boolean allowRequeue = false; // if we are too backlogged in tcp, don't try ssu
boolean successful = false;
@@ -335,20 +342,20 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
return;
}
_consecutiveBacklog = 0;
int enqueued = 0;
****/
//if (FAST_LARGE)
bufferedPrepare(msg);
boolean noOutbound = false;
_outbound.offer(msg);
enqueued = _outbound.size();
//int enqueued = _outbound.size();
// although stat description says ahead of this one, not including this one...
_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
noOutbound = (_currentOutbound == null);
if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
//_context.statManager().addRateData("ntcp.sendQueueSize", enqueued);
boolean noOutbound = (_currentOutbound == null);
//if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType());
if (_established && noOutbound)
_transport.getWriter().wantsWrite(this, "enqueued");
}
/****
private long queueTime() {
OutNetMessage msg = _currentOutbound;
if (msg == null) {
@@ -358,29 +365,31 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
}
return msg.getSendTime(); // does not include any of the pre-send(...) preparation
}
****/
public boolean tooBacklogged() {
long queueTime = queueTime();
if (queueTime <= 0) return false;
boolean currentOutboundSet = _currentOutbound != null;
//long queueTime = queueTime();
//if (queueTime <= 0) return false;
// perhaps we could take into account the size of the queued messages too, our
// current transmission rate, and how much time is left before the new message's expiration?
// ok, maybe later...
if (getUptime() < 10*1000) // allow some slack just after establishment
return false;
if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
//if (queueTime > 5*1000) { // bloody arbitrary. well, its half the average message lifetime...
if (_outbound.isBacklogged()) { // bloody arbitrary. well, its half the average message lifetime...
int size = _outbound.size();
if (_log.shouldLog(Log.WARN)) {
int writeBufs = _writeBufs.size();
boolean currentOutboundSet = _currentOutbound != null;
try {
_log.warn("Too backlogged: queue time " + queueTime + " and the size is " + size
_log.warn("Too backlogged: size is " + size
+ ", wantsWrite? " + (0 != (_conKey.interestOps()&SelectionKey.OP_WRITE))
+ ", currentOut set? " + currentOutboundSet
+ ", writeBufs: " + writeBufs + " on " + toString());
} catch (Exception e) {} // java.nio.channels.CancelledKeyException
}
_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
//_context.statManager().addRateData("ntcp.sendBacklogTime", queueTime);
return true;
//} else if (size > 32) { // another arbitrary limit.
// if (_log.shouldLog(Log.ERROR))
@@ -397,7 +406,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
dsm.setEntry(_context.router().getRouterInfo());
infoMsg.setMessage(dsm);
infoMsg.setPriority(100);
infoMsg.setPriority(PRIORITY);
RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
if (target != null) {
infoMsg.setTarget(target);
@@ -649,11 +658,14 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
_log.info("attempt for multiple outbound messages with " + System.identityHashCode(_currentOutbound) + " already waiting and " + _outbound.size() + " queued");
return;
}
/****
//throw new RuntimeException("We should not be preparing a write while we still have one pending");
if (queueTime() > 3*1000) { // don't stall low-priority messages
****/
msg = _outbound.poll();
if (msg == null)
return;
/****
} else {
// FIXME
// This is a linear search to implement a priority queue, O(n**2)
@@ -679,6 +691,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
if ((!removed) && _log.shouldLog(Log.WARN))
_log.warn("Already removed??? " + msg.getMessage().getType());
}
****/
_currentOutbound = msg;
}

View File

@@ -298,8 +298,8 @@ public class NTCPTransport extends TransportImpl {
_log.debug("no bid when trying to send to " + peer.toBase64() + " as they don't have an ntcp address");
return null;
}
NTCPAddress naddr = new NTCPAddress(addr);
if ( (naddr.getPort() <= 0) || (naddr.getHost() == null) ) {
byte[] ip = addr.getIP();
if ( (addr.getPort() <= 0) || (ip == null) ) {
_context.statManager().addRateData("ntcp.connectFailedInvalidPort", 1);
markUnreachable(peer);
//_context.shitlist().shitlistRouter(toAddress.getIdentity().calculateHash(), "Invalid NTCP address", STYLE);
@@ -307,7 +307,7 @@ public class NTCPTransport extends TransportImpl {
_log.debug("no bid when trying to send to " + peer.toBase64() + " as they don't have a valid ntcp address");
return null;
}
if (!naddr.isPubliclyRoutable()) {
if (!isPubliclyRoutable(ip)) {
if (! _context.getProperty("i2np.ntcp.allowLocal", "false").equals("true")) {
_context.statManager().addRateData("ntcp.bidRejectedLocalAddress", 1);
markUnreachable(peer);

View File

@@ -3,6 +3,7 @@ package net.i2p.router.transport.udp;
import net.i2p.data.ByteArray;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@@ -12,7 +13,7 @@ import net.i2p.util.Log;
* Warning - there is no synchronization in this class, take care in
* InboundMessageFragments to avoid use-after-release, etc.
*/
class InboundMessageState {
class InboundMessageState implements CDQEntry {
private final RouterContext _context;
private final Log _log;
private final long _messageId;
@@ -29,6 +30,7 @@ class InboundMessageState {
*/
private int _lastFragment;
private final long _receiveBegin;
private long _enqueueTime;
private int _completeSize;
private boolean _released;
@@ -138,6 +140,30 @@ class InboundMessageState {
return _context.clock().now() - _receiveBegin;
}
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
releaseResources();
}
public Hash getFrom() { return _from; }
public long getMessageId() { return _messageId; }

View File

@@ -1,7 +1,6 @@
package net.i2p.router.transport.udp;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
@@ -10,6 +9,7 @@ import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageHandler;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue;
//import net.i2p.util.ByteCache;
import net.i2p.util.HexDump;
import net.i2p.util.I2PThread;
@@ -55,7 +55,7 @@ class MessageReceiver {
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
}
_completeMessages = new LinkedBlockingQueue(qsize);
_completeMessages = new CoDelBlockingQueue(ctx, "UDP-MessageReceiver", qsize);
// the runners run forever, no need to have a cache
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);

View File

@@ -165,9 +165,9 @@ class OutboundMessageFragments {
state.releaseResources();
return;
}
int active = peer.add(state);
peer.add(state);
add(peer);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Error initializing " + msg);
@@ -182,9 +182,9 @@ class OutboundMessageFragments {
PeerState peer = state.getPeer();
if (peer == null)
throw new RuntimeException("wtf, null peer for " + state);
int active = peer.add(state);
peer.add(state);
add(peer);
_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
//_context.statManager().addRateData("udp.outboundActiveCount", active, 0);
}
/**

View File

@@ -7,6 +7,7 @@ import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.OutNetMessage;
import net.i2p.router.util.CDPQEntry;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@@ -14,7 +15,7 @@ import net.i2p.util.Log;
* Maintain the outbound fragmentation for resending, for a single message.
*
*/
class OutboundMessageState {
class OutboundMessageState implements CDPQEntry {
private final I2PAppContext _context;
private final Log _log;
/** may be null if we are part of the establishment */
@@ -36,6 +37,9 @@ class OutboundMessageState {
/** for tracking use-after-free bugs */
private boolean _released;
private Exception _releasedBy;
// we can't use the ones in _message since it is null for injections
private long _enqueueTime;
private long _seqNum;
public static final int MAX_MSG_SIZE = 32 * 1024;
/** is this enough for a high-bandwidth router? */
@@ -104,6 +108,7 @@ class OutboundMessageState {
/**
* Called from OutboundMessageFragments
* @param m null if msg is "injected"
* @return success
*/
private boolean initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) {
@@ -128,8 +133,8 @@ class OutboundMessageState {
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len));
return true;
} catch (IllegalStateException ise) {
_cache.release(_messageBuf);
@@ -368,6 +373,56 @@ class OutboundMessageState {
}
}
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
_peer.getTransport().failed(this, false);
releaseResources();
}
/**
* For CDPQ
* @since 0.9.3
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
* @since 0.9.3
*/
public long getSeqNum() {
return _seqNum;
}
/**
* For CDPQ
* @return OutNetMessage priority or 1000 for injected
* @since 0.9.3
*/
public int getPriority() {
return _message != null ? _message.getPriority() : 1000;
}
@Override
public String toString() {
short sends[] = _fragmentSends;

View File

@@ -16,9 +16,9 @@ import net.i2p.data.Hash;
import net.i2p.data.RouterIdentity;
import net.i2p.data.SessionKey;
import net.i2p.data.Signature;
import net.i2p.util.ByteCache;
import net.i2p.util.Addresses;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Big ol' class to do all our packet formatting. The UDPPackets generated are
@@ -102,9 +102,6 @@ class PacketBuilder {
private final Log _log;
private final UDPTransport _transport;
private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE);
private static final ByteCache _hmacCache = ByteCache.getInstance(64, Hash.HASH_LENGTH);
/**
* For debugging and stats only - does not go out on the wire.
* These are chosen to be higher than the highest I2NP message type,
@@ -607,12 +604,12 @@ class PacketBuilder {
// ok, now the full data is in there, but we also need to encrypt
// the signature, which means we need the IV
ByteArray iv = _ivCache.acquire();
_context.random().nextBytes(iv.getData());
byte[] iv = SimpleByteCache.acquire(UDPPacket.IV_SIZE);
_context.random().nextBytes(iv);
int encrWrite = Signature.SIGNATURE_BYTES + 8;
int sigBegin = off - encrWrite;
_context.aes().encrypt(data, sigBegin, data, sigBegin, state.getCipherKey(), iv.getData(), encrWrite);
_context.aes().encrypt(data, sigBegin, data, sigBegin, state.getCipherKey(), iv, encrWrite);
// pad up so we're on the encryption boundary
if ( (off % 16) != 0)
@@ -620,7 +617,7 @@ class PacketBuilder {
packet.getPacket().setLength(off);
authenticate(packet, ourIntroKey, ourIntroKey, iv);
setTo(packet, to, state.getSentPort());
_ivCache.release(iv);
SimpleByteCache.release(iv);
packet.setMessageType(TYPE_CREAT);
return packet;
}
@@ -1290,10 +1287,10 @@ class PacketBuilder {
* @param macKey key to generate the, er, MAC
*/
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey) {
ByteArray iv = _ivCache.acquire();
_context.random().nextBytes(iv.getData());
byte[] iv = SimpleByteCache.acquire(UDPPacket.IV_SIZE);
_context.random().nextBytes(iv);
authenticate(packet, cipherKey, macKey, iv);
_ivCache.release(iv);
SimpleByteCache.release(iv);
}
/**
@@ -1308,38 +1305,38 @@ class PacketBuilder {
* @param macKey key to generate the, er, MAC
* @param iv IV to deliver
*/
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, ByteArray iv) {
private void authenticate(UDPPacket packet, SessionKey cipherKey, SessionKey macKey, byte[] iv) {
long before = System.currentTimeMillis();
int encryptOffset = packet.getPacket().getOffset() + UDPPacket.IV_SIZE + UDPPacket.MAC_SIZE;
int encryptSize = packet.getPacket().getLength() - UDPPacket.IV_SIZE - UDPPacket.MAC_SIZE - packet.getPacket().getOffset();
byte data[] = packet.getPacket().getData();
_context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv.getData(), encryptSize);
_context.aes().encrypt(data, encryptOffset, data, encryptOffset, cipherKey, iv, encryptSize);
// ok, now we need to prepare things for the MAC, which requires reordering
int off = packet.getPacket().getOffset();
System.arraycopy(data, encryptOffset, data, off, encryptSize);
off += encryptSize;
System.arraycopy(iv.getData(), 0, data, off, UDPPacket.IV_SIZE);
System.arraycopy(iv, 0, data, off, UDPPacket.IV_SIZE);
off += UDPPacket.IV_SIZE;
DataHelper.toLong(data, off, 2, encryptSize ^ PROTOCOL_VERSION);
int hmacOff = packet.getPacket().getOffset();
int hmacLen = encryptSize + UDPPacket.IV_SIZE + 2;
//Hash hmac = _context.hmac().calculate(macKey, data, hmacOff, hmacLen);
ByteArray ba = _hmacCache.acquire();
_context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba.getData(), 0);
byte[] ba = SimpleByteCache.acquire(Hash.HASH_LENGTH);
_context.hmac().calculate(macKey, data, hmacOff, hmacLen, ba, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Authenticating " + packet.getPacket().getLength() +
"\nIV: " + Base64.encode(iv.getData()) +
"\nraw mac: " + Base64.encode(ba.getData()) +
"\nIV: " + Base64.encode(iv) +
"\nraw mac: " + Base64.encode(ba) +
"\nMAC key: " + macKey);
// ok, now lets put it back where it belongs...
System.arraycopy(data, hmacOff, data, encryptOffset, encryptSize);
//System.arraycopy(hmac.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(ba.getData(), 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(iv.getData(), 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
_hmacCache.release(ba);
System.arraycopy(ba, 0, data, hmacOff, UDPPacket.MAC_SIZE);
System.arraycopy(iv, 0, data, hmacOff + UDPPacket.MAC_SIZE, UDPPacket.IV_SIZE);
SimpleByteCache.release(ba);
long timeToAuth = System.currentTimeMillis() - before;
_context.statManager().addRateData("udp.packetAuthTime", timeToAuth, timeToAuth);
if (timeToAuth > 100)

View File

@@ -16,6 +16,7 @@ import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
import net.i2p.router.OutNetMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.Log;
import net.i2p.util.ConcurrentHashSet;
@@ -188,8 +189,19 @@ class PeerState {
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
/** list of OutboundMessageState */
/**
* Mostly messages that have been transmitted and are awaiting acknowledgement,
* although there could be some that have not been sent yet.
*/
private final List<OutboundMessageState> _outboundMessages;
/**
* Priority queue of messages that have not yet been sent.
* They are taken from here and put in _outboundMessages.
*/
private final CoDelPriorityBlockingQueue<OutboundMessageState> _outboundQueue;
/** which outbound message is currently being retransmitted */
private OutboundMessageState _retransmitter;
@@ -298,6 +310,7 @@ class PeerState {
_rttDeviation = _rtt;
_inboundMessages = new HashMap(8);
_outboundMessages = new ArrayList(32);
_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
// all createRateStat() moved to EstablishmentManager
_remoteIP = remoteIP;
_remotePeer = remotePeer;
@@ -726,8 +739,8 @@ class PeerState {
public List<Long> getCurrentFullACKs() {
// no such element exception seen here
List<Long> rv = new ArrayList(_currentACKs);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Returning " + _currentACKs.size() + " current acks");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Returning " + _currentACKs.size() + " current acks");
return rv;
}
@@ -748,8 +761,8 @@ class PeerState {
public List<Long> getCurrentResendACKs() {
List<Long> randomResends = new ArrayList(_currentACKsResend);
Collections.shuffle(randomResends, _context.random());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Returning " + randomResends.size() + " resend acks");
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Returning " + randomResends.size() + " resend acks");
return randomResends;
}
@@ -1194,24 +1207,26 @@ class PeerState {
* TODO priority queue? (we don't implement priorities in SSU now)
* TODO backlog / pushback / block instead of dropping? Can't really block here.
* TODO SSU does not support isBacklogged() now
* @return total pending messages
*/
public int add(OutboundMessageState state) {
public void add(OutboundMessageState state) {
if (_dead) {
_transport.failed(state, false);
return 0;
return;
}
state.setPeer(this);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
int rv = 0;
boolean fail = false;
// will never fail for CDPQ
boolean fail = !_outboundQueue.offer(state);
/****
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
if (rv > MAX_SEND_MSGS_PENDING) {
// too many queued messages to one peer? nuh uh.
fail = true;
rv--;
****/
/******* proactive tail drop disabled by jr 2006-04-19 so all this is pointless
@@ -1250,17 +1265,17 @@ class PeerState {
}
*******/
/****
} else {
_outboundMessages.add(state);
}
}
****/
if (fail) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg, OB queue full for " + toString());
_transport.failed(state, false);
}
return rv;
}
/** drop all outbound messages */
@@ -1268,19 +1283,17 @@ class PeerState {
//if (_dead) return;
_dead = true;
//_outboundMessages = null;
_retransmitter = null;
int sz = 0;
List<OutboundMessageState> tempList = null;
List<OutboundMessageState> tempList;
synchronized (_outboundMessages) {
sz = _outboundMessages.size();
if (sz > 0) {
_retransmitter = null;
tempList = new ArrayList(_outboundMessages);
_outboundMessages.clear();
}
}
for (int i = 0; i < sz; i++)
_transport.failed(tempList.get(i), false);
_outboundQueue.drainAllTo(tempList);
for (OutboundMessageState oms : tempList) {
_transport.failed(oms, false);
}
// so the ACKSender will drop this peer from its queue
_wantACKSendSince = -1;
@@ -1291,7 +1304,7 @@ class PeerState {
*/
public int getOutboundMessageCount() {
if (_dead) return 0;
return _outboundMessages.size();
return _outboundMessages.size() + _outboundQueue.size();
}
/**
@@ -1305,7 +1318,7 @@ class PeerState {
public int finishMessages() {
// short circuit, unsynchronized
if (_outboundMessages.isEmpty())
return 0;
return _outboundQueue.size();
if (_dead) {
dropOutbound();
@@ -1367,7 +1380,7 @@ class PeerState {
state.releaseResources();
}
return rv;
return rv + _outboundQueue.size();
}
/**
@@ -1387,7 +1400,7 @@ class PeerState {
ShouldSend should = locked_shouldSend(state);
if (should == ShouldSend.YES) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
_log.debug("Allocate sending (OLD) to " + _remotePeer + ": " + state.getMessageId());
/*
while (iter.hasNext()) {
OutboundMessageState later = (OutboundMessageState)iter.next();
@@ -1402,16 +1415,37 @@ class PeerState {
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
break;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
} */
}
// Peek at head of _outboundQueue and see if we can send it.
// If so, pull it off, put it in _outbundMessages, test
// again for bandwidth if necessary, and return it.
OutboundMessageState state = _outboundQueue.peek();
if (state != null && ShouldSend.YES == locked_shouldSend(state)) {
// we could get a different state, or null, when we poll,
// due to AQM drops, so we test again if necessary
OutboundMessageState dequeuedState = _outboundQueue.poll();
if (dequeuedState != null) {
_outboundMessages.add(dequeuedState);
if (dequeuedState == state || ShouldSend.YES == locked_shouldSend(dequeuedState)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending (NEW) to " + _remotePeer + ": " + dequeuedState.getMessageId());
return dequeuedState;
}
}
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() + " remaining");
_log.debug("Nothing to send to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
return null;
}
@@ -1441,9 +1475,19 @@ class PeerState {
rv = delay;
}
}
// failsafe... is this OK?
if (rv > 100 && !_outboundQueue.isEmpty())
rv = 100;
return rv;
}
/**
* @since 0.9.3
*/
public boolean isBacklogged() {
return _dead || _outboundQueue.isBacklogged();
}
/**
* If set to true, we should throttle retransmissions of all but the first message in
* flight to a peer. If set to false, we will only throttle the initial flight of a
@@ -1521,8 +1565,8 @@ class PeerState {
int size = state.getUnackedSize();
if (allocateSendingBytes(size, state.getPushCount())) {
if (_log.shouldLog(Log.INFO))
_log.info("Allocation of " + size + " allowed with "
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocation of " + size + " allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
+ " remaining"
@@ -1566,7 +1610,7 @@ class PeerState {
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashSet this would be faster.
* TODO if messages awaiting ack were a HashMap<Long, OutboundMessageState> this would be faster.
*
* @return true if the message was acked for the first time
*/
@@ -1620,8 +1664,8 @@ class PeerState {
state.releaseResources();
} else {
// dupack, likely
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received an ACK for a message not pending: " + messageId);
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Received an ACK for a message not pending: " + messageId);
}
return state != null;
}
@@ -1767,6 +1811,14 @@ class PeerState {
}
}
/**
* Convenience for OutboundMessageState so it can fail itself
* @since 0.9.3
*/
public UDPTransport getTransport() {
return _transport;
}
// why removed? Some risk of dups in OutboundMessageFragments._activePeers ???
/*

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Base64;
import net.i2p.data.DataHelper;
import net.i2p.data.RouterAddress;
import net.i2p.data.RouterInfo;
import net.i2p.data.SessionKey;
import net.i2p.router.CommSystemFacade;
@@ -584,7 +585,13 @@ class PeerTestManager {
aliceIntroKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]);
testInfo.readIntroKey(aliceIntroKey.getData(), 0);
UDPAddress addr = new UDPAddress(charlieInfo.getTargetAddress(UDPTransport.STYLE));
RouterAddress raddr = charlieInfo.getTargetAddress(UDPTransport.STYLE);
if (raddr == null) {
if (_log.shouldLog(Log.WARN))
_log.warn("Unable to pick a charlie");
return;
}
UDPAddress addr = new UDPAddress(raddr);
SessionKey charlieIntroKey = new SessionKey(addr.getIntroKey());
//UDPPacket packet = _packetBuilder.buildPeerTestToAlice(aliceIP, from.getPort(), aliceIntroKey, charlieIntroKey, nonce);

View File

@@ -12,9 +12,9 @@ import net.i2p.data.SessionKey;
* FIXME public for ConfigNetHelper
*/
public class UDPAddress {
private String _host;
private final String _host;
private InetAddress _hostAddress;
private int _port;
private final int _port;
private byte[] _introKey;
private String _introHosts[];
private InetAddress _introAddresses[];
@@ -23,8 +23,8 @@ public class UDPAddress {
private long _introTags[];
private int _mtu;
public static final String PROP_PORT = "port";
public static final String PROP_HOST = "host";
public static final String PROP_PORT = RouterAddress.PROP_PORT;
public static final String PROP_HOST = RouterAddress.PROP_HOST;
public static final String PROP_INTRO_KEY = "key";
public static final String PROP_MTU = "mtu";
@@ -40,16 +40,13 @@ public class UDPAddress {
public UDPAddress(RouterAddress addr) {
// TODO make everything final
if (addr == null) return;
_host = addr.getOption(PROP_HOST);
if (_host != null) _host = _host.trim();
try {
String port = addr.getOption(PROP_PORT);
if (port != null)
_port = Integer.parseInt(port);
} catch (NumberFormatException nfe) {
_port = -1;
if (addr == null) {
_host = null;
_port = 0;
return;
}
_host = addr.getOption(PROP_HOST);
_port = addr.getPort();
try {
String mtu = addr.getOption(PROP_MTU);
if (mtu != null)
@@ -146,7 +143,7 @@ public class UDPAddress {
}
/**
* @return 0 if unset; -1 if invalid
* @return 0 if unset or invalid
*/
public int getPort() { return _port; }

View File

@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.data.SessionKey;
import net.i2p.router.util.CDQEntry;
import net.i2p.util.Log;
/**
@@ -16,7 +17,7 @@ import net.i2p.util.Log;
* of object instances to allow rapid reuse.
*
*/
class UDPPacket {
class UDPPacket implements CDQEntry {
private I2PAppContext _context;
private final DatagramPacket _packet;
private volatile short _priority;
@@ -246,8 +247,12 @@ class UDPPacket {
_context.aes().decrypt(_data, _packet.getOffset() + MAC_SIZE + IV_SIZE, _data, _packet.getOffset() + MAC_SIZE + IV_SIZE, cipherKey, _ivBuf, len - MAC_SIZE - IV_SIZE);
}
/** the UDPReceiver has tossed it onto the inbound queue */
void enqueue() { _enqueueTime = _context.clock().now(); }
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) { _enqueueTime = now; }
/** a packet handler has pulled it off the inbound queue */
void received() { _receivedTime = _context.clock().now(); }
@@ -256,8 +261,11 @@ class UDPPacket {
/** a packet handler has finished parsing out the good bits */
//void afterHandling() { _afterHandlingTime = _context.clock().now(); }
/** the UDPReceiver has tossed it onto the inbound queue */
//long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); }
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() { return _enqueueTime; }
/** a packet handler has pulled it off the inbound queue */
long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); }
@@ -269,8 +277,6 @@ class UDPPacket {
// Following 5: All used only for stats in PacketHandler, commented out
/** when it was added to the endpoint's receive queue */
//long getEnqueueTime() { return _enqueueTime; }
/** when it was pulled off the endpoint receive queue */
//long getReceivedTime() { return _receivedTime; }
/** when we began validate() */
@@ -326,6 +332,14 @@ class UDPPacket {
return rv;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
release();
}
public void release() {
verifyNotReleased();
_released = true;

View File

@@ -4,10 +4,10 @@ import java.io.IOException;
import java.net.DatagramSocket;
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
@@ -47,7 +47,7 @@ class UDPReceiver {
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
_inboundQueue = new LinkedBlockingQueue(qsize);
_inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize);
_socket = socket;
_transport = transport;
_runner = new Runner();
@@ -177,6 +177,7 @@ class UDPReceiver {
return 0;
}
/****
packet.enqueue();
boolean rejected = false;
int queueSize = 0;
@@ -190,6 +191,7 @@ class UDPReceiver {
}
}
if (!rejected) {
****/
try {
_inboundQueue.put(packet);
} catch (InterruptedException ie) {
@@ -198,6 +200,7 @@ class UDPReceiver {
}
//return queueSize + 1;
return 0;
/****
}
// rejected
@@ -214,6 +217,7 @@ class UDPReceiver {
_log.warn(msg.toString());
}
return queueSize;
****/
}
/****

View File

@@ -4,10 +4,10 @@ import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.FIFOBandwidthLimiter;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -35,7 +35,7 @@ class UDPSender {
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new LinkedBlockingQueue(qsize);
_outboundQueue = new CoDelBlockingQueue(ctx, "UDP-Sender", qsize);
_socket = socket;
_runner = new Runner();
_name = name;

View File

@@ -42,6 +42,7 @@ import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
import net.i2p.util.Translate;
/**
@@ -369,7 +370,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
// _flooder.startup();
_expireEvent.setIsAlive(true);
_testEvent.setIsAlive(true); // this queues it for 3-6 minutes in the future...
SimpleTimer.getInstance().addEvent(_testEvent, 10*1000); // lets requeue it for Real Soon
_testEvent.reschedule(10*1000); // lets requeue it for Real Soon
}
public void shutdown() {
@@ -681,7 +682,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_context.router().rebuildRouterInfo();
}
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 5*1000);
_testEvent.reschedule(5*1000);
return updated;
}
@@ -859,7 +860,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
if (getReachabilityStatus() != CommSystemFacade.STATUS_OK) {
_testEvent.forceRun();
SimpleTimer.getInstance().addEvent(_testEvent, 0);
_testEvent.reschedule(0);
}
return true;
}
@@ -933,7 +934,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
}
private class RemoveDropList implements SimpleTimer.TimedEvent {
private RemoteHostId _peer;
private final RemoteHostId _peer;
public RemoveDropList(RemoteHostId peer) { _peer = peer; }
public void timeReached() {
_dropList.remove(_peer);
@@ -1202,27 +1203,14 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//UDPAddress ua = new UDPAddress(addr);
//if (ua.getIntroducerCount() <= 0) {
if (addr.getOption("ihost0") == null) {
String host = addr.getOption(UDPAddress.PROP_HOST);
String port = addr.getOption(UDPAddress.PROP_PORT);
if (host == null || port == null) {
byte[] ip = addr.getIP();
int port = addr.getPort();
if (ip == null || port <= 0 ||
(!isValid(ip)) ||
Arrays.equals(ip, getExternalIP())) {
markUnreachable(to);
return null;
}
try {
InetAddress ia = InetAddress.getByName(host);
int iport = Integer.parseInt(port);
if (iport <= 0 || iport > 65535 || (!isValid(ia.getAddress())) ||
Arrays.equals(ia.getAddress(), getExternalIP())) {
markUnreachable(to);
return null;
}
} catch (UnknownHostException uhe) {
markUnreachable(to);
return null;
} catch (NumberFormatException nfe) {
markUnreachable(to);
return null;
}
}
if (!allowConnection())
return _cachedBid[TRANSIENT_FAIL_BID];
@@ -1337,6 +1325,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_fragments.add(msg);
}
/**
* "injected" message from the EstablishmentManager
*/
void send(I2NPMessage msg, PeerState peer) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Injecting a data message to a new peer: " + peer);
@@ -1446,7 +1437,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
addr.setCost(DEFAULT_COST + 1);
else
addr.setCost(DEFAULT_COST);
addr.setExpiration(null);
//addr.setExpiration(null);
addr.setTransportStyle(STYLE);
addr.setOptions(options);
@@ -1687,7 +1678,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
return getPeerState(dest) != null;
}
/**
* @since 0.9.3
*/
@Override
public boolean isBacklogged(Hash dest) {
PeerState peer = _peersByIdent.get(dest);
return peer != null && peer.isBacklogged();
}
public boolean allowConnection() {
return _peersByIdent.size() < getMaxConnections();
}
@@ -2196,6 +2197,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
buf.append(THINSP).append(peer.getConcurrentSends());
buf.append(THINSP).append(peer.getConcurrentSendWindow());
buf.append(THINSP).append(peer.getConsecutiveSendRejections());
if (peer.isBacklogged())
buf.append(' ').append(_("backlogged"));
buf.append("</td>");
buf.append("<td class=\"cells\" align=\"right\">");
@@ -2361,12 +2364,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public String toString() { return "UDP bid @ " + getLatencyMs(); }
}
private class ExpirePeerEvent implements SimpleTimer.TimedEvent {
private class ExpirePeerEvent extends SimpleTimer2.TimedEvent {
private final Set<PeerState> _expirePeers;
private final List<PeerState> _expireBuffer;
private volatile boolean _alive;
public ExpirePeerEvent() {
super(_context.simpleTimer2());
_expirePeers = new ConcurrentHashSet(128);
_expireBuffer = new ArrayList();
}
@@ -2403,7 +2407,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_expireBuffer.clear();
if (_alive)
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
schedule(30*1000);
}
public void add(PeerState peer) {
_expirePeers.add(peer);
@@ -2414,9 +2418,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
public void setIsAlive(boolean isAlive) {
_alive = isAlive;
if (isAlive) {
SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000);
reschedule(30*1000);
} else {
SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this);
cancel();
_expirePeers.clear();
}
}
@@ -2515,12 +2519,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
//return ( (val != null) && ("true".equals(val)) );
}
private class PeerTestEvent implements SimpleTimer.TimedEvent {
private class PeerTestEvent extends SimpleTimer2.TimedEvent {
private volatile boolean _alive;
/** when did we last test our reachability */
private long _lastTested;
private boolean _forceRun;
PeerTestEvent() {
super(_context.simpleTimer2());
}
public void timeReached() {
if (shouldTest()) {
long now = _context.clock().now();
@@ -2532,7 +2540,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long delay = (TEST_FREQUENCY / 2) + _context.random().nextInt(TEST_FREQUENCY);
if (delay <= 0)
throw new RuntimeException("wtf, delay is " + delay);
SimpleTimer.getInstance().addEvent(PeerTestEvent.this, delay);
schedule(delay);
}
}
@@ -2558,9 +2566,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_alive = isAlive;
if (isAlive) {
long delay = _context.random().nextInt(2*TEST_FREQUENCY);
SimpleTimer.getInstance().addEvent(PeerTestEvent.this, delay);
reschedule(delay);
} else {
SimpleTimer.getInstance().removeEvent(PeerTestEvent.this);
cancel();
}
}
}

View File

@@ -104,7 +104,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
/* See TunnelGateway.QueuePreprocessor for Javadoc */
@Override
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.INFO))
display(0, pending, "Starting");
StringBuilder timingBuf = null;
@@ -131,7 +131,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// loop until we fill up a single message
for (int i = 0; i < pending.size(); i++) {
long pendingStart = System.currentTimeMillis();
TunnelGateway.Pending msg = pending.get(i);
PendingGatewayMessage msg = pending.get(i);
int instructionsSize = getInstructionsSize(msg);
instructionsSize += getInstructionAugmentationSize(msg, allocated, instructionsSize);
int curWanted = msg.getData().length - msg.getOffset() + instructionsSize;
@@ -169,7 +169,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// Remove what we sent from the pending queue
for (int j = 0; j < i; j++) {
TunnelGateway.Pending cur = pending.remove(0);
PendingGatewayMessage cur = pending.remove(0);
if (cur.getOffset() < cur.getData().length)
throw new IllegalArgumentException("i=" + i + " j=" + j + " off=" + cur.getOffset()
+ " len=" + cur.getData().length + " alloc=" + allocated);
@@ -181,7 +181,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
}
if (msg.getOffset() >= msg.getData().length) {
// ok, this last message fit perfectly, remove it too
TunnelGateway.Pending cur = pending.remove(0);
PendingGatewayMessage cur = pending.remove(0);
if (timingBuf != null)
timingBuf.append(" sent perfect fit " + cur).append(".");
notePreprocessing(cur.getMessageId(), cur.getFragmentNumber(), msg.getData().length, msg.getMessageIds(), "flushed tail, remaining: " + pending);
@@ -230,7 +230,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
// Remove everything in the outgoing message from the pending queue
int beforeSize = pending.size();
for (int i = 0; i < beforeSize; i++) {
TunnelGateway.Pending cur = pending.get(0);
PendingGatewayMessage cur = pending.get(0);
if (cur.getOffset() < cur.getData().length)
break;
pending.remove(0);
@@ -316,7 +316,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
*
* title: allocated: X pending: X (delay: X) [0]:offset/length/lifetime [1]:etc.
*/
private void display(long allocated, List<TunnelGateway.Pending> pending, String title) {
private void display(long allocated, List<PendingGatewayMessage> pending, String title) {
if (_log.shouldLog(Log.INFO)) {
long highestDelay = 0;
StringBuilder buf = new StringBuilder(128);
@@ -327,7 +327,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
if (_pendingSince > 0)
buf.append(" delay: ").append(getDelayAmount(false));
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending curPending = pending.get(i);
PendingGatewayMessage curPending = pending.get(i);
buf.append(" [").append(i).append("]:");
buf.append(curPending.getOffset()).append('/').append(curPending.getData().length).append('/');
buf.append(curPending.getLifetime());
@@ -347,7 +347,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
* @param startAt first index in pending to send (inclusive)
* @param sendThrough last index in pending to send (inclusive)
*/
protected void send(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
protected void send(List<PendingGatewayMessage> pending, int startAt, int sendThrough, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + startAt + ":" + sendThrough + " out of " + pending);
@@ -384,7 +384,7 @@ class BatchedPreprocessor extends TrivialPreprocessor {
long msgId = sender.sendPreprocessed(preprocessed, rec);
for (int i = 0; i < pending.size(); i++) {
TunnelGateway.Pending cur = pending.get(i);
PendingGatewayMessage cur = pending.get(i);
cur.addMessageId(msgId);
}
if (_log.shouldLog(Log.DEBUG))
@@ -397,9 +397,9 @@ class BatchedPreprocessor extends TrivialPreprocessor {
*
* @return new offset into the target for further bytes to be written
*/
private int writeFragments(List<TunnelGateway.Pending> pending, int startAt, int sendThrough, byte target[], int offset) {
private int writeFragments(List<PendingGatewayMessage> pending, int startAt, int sendThrough, byte target[], int offset) {
for (int i = startAt; i <= sendThrough; i++) {
TunnelGateway.Pending msg = pending.get(i);
PendingGatewayMessage msg = pending.get(i);
int prevOffset = offset;
if (msg.getOffset() == 0) {
offset = writeFirstFragment(msg, target, offset);

View File

@@ -5,7 +5,7 @@ import net.i2p.data.DataHelper;
import net.i2p.router.RouterContext;
import net.i2p.router.util.DecayingBloomFilter;
import net.i2p.router.util.DecayingHashSet;
import net.i2p.util.ByteCache;
import net.i2p.util.SimpleByteCache;
/**
* Manage the IV validation for all of the router's tunnels by way of a big
@@ -15,7 +15,6 @@ import net.i2p.util.ByteCache;
class BloomFilterIVValidator implements IVValidator {
private final RouterContext _context;
private final DecayingBloomFilter _filter;
private final ByteCache _ivXorCache = ByteCache.getInstance(32, HopProcessor.IV_LENGTH);
/**
* After 2*halflife, an entry is completely forgotten from the bloom filter.
@@ -57,10 +56,10 @@ class BloomFilterIVValidator implements IVValidator {
}
public boolean receiveIV(byte ivData[], int ivOffset, byte payload[], int payloadOffset) {
ByteArray buf = _ivXorCache.acquire();
DataHelper.xor(ivData, ivOffset, payload, payloadOffset, buf.getData(), 0, HopProcessor.IV_LENGTH);
boolean dup = _filter.add(buf.getData());
_ivXorCache.release(buf);
byte[] buf = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
DataHelper.xor(ivData, ivOffset, payload, payloadOffset, buf, 0, HopProcessor.IV_LENGTH);
boolean dup = _filter.add(buf);
SimpleByteCache.release(buf);
if (dup) _context.statManager().addRateData("tunnel.duplicateIV", 1);
return !dup; // return true if it is OK, false if it isn't
}

View File

@@ -16,7 +16,7 @@ import net.i2p.util.ByteCache;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Handle fragments at the endpoint of a tunnel, peeling off fully completed
@@ -369,7 +369,7 @@ class FragmentHandler {
_fragmentedMessages.remove(Long.valueOf(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
msg.getExpireEvent().cancel();
receiveComplete(msg);
} else {
noteReception(msg.getMessageId(), 0, msg);
@@ -378,7 +378,7 @@ class FragmentHandler {
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + messageId);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
evt.schedule(MAX_DEFRAGMENT_TIME);
}
}
}
@@ -437,7 +437,7 @@ class FragmentHandler {
_fragmentedMessages.remove(Long.valueOf(messageId));
}
if (msg.getExpireEvent() != null)
SimpleTimer.getInstance().removeEvent(msg.getExpireEvent());
msg.getExpireEvent().cancel();
_context.statManager().addRateData("tunnel.fragmentedComplete", msg.getFragmentCount(), msg.getLifetime());
receiveComplete(msg);
} else {
@@ -447,7 +447,7 @@ class FragmentHandler {
msg.setExpireEvent(evt);
if (_log.shouldLog(Log.DEBUG))
_log.debug("In " + MAX_DEFRAGMENT_TIME + " dropping " + msg.getMessageId() + "/" + fragmentNum);
SimpleTimer.getInstance().addEvent(evt, MAX_DEFRAGMENT_TIME);
evt.schedule(MAX_DEFRAGMENT_TIME);
}
}
}
@@ -548,10 +548,11 @@ class FragmentHandler {
public void receiveComplete(I2NPMessage msg, Hash toRouter, TunnelId toTunnel);
}
private class RemoveFailed implements SimpleTimer.TimedEvent {
private class RemoveFailed extends SimpleTimer2.TimedEvent {
private final FragmentedMessage _msg;
public RemoveFailed(FragmentedMessage msg) {
super(_context.simpleTimer2());
_msg = msg;
}

View File

@@ -7,7 +7,7 @@ import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Gather fragments of I2NPMessages at a tunnel endpoint, making them available
@@ -28,7 +28,7 @@ class FragmentedMessage {
private final long _createdOn;
private boolean _completed;
private long _releasedAfter;
private SimpleTimer.TimedEvent _expireEvent;
private SimpleTimer2.TimedEvent _expireEvent;
private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
// 64 is pretty absurd, 32 is too, most likely
@@ -160,9 +160,11 @@ class FragmentedMessage {
found++;
return found;
}
/** used in the fragment handler so we can cancel the expire event on success */
SimpleTimer.TimedEvent getExpireEvent() { return _expireEvent; }
void setExpireEvent(SimpleTimer.TimedEvent evt) { _expireEvent = evt; }
public SimpleTimer2.TimedEvent getExpireEvent() { return _expireEvent; }
public void setExpireEvent(SimpleTimer2.TimedEvent evt) { _expireEvent = evt; }
/** have we received all of the fragments? */
public boolean isComplete() {

View File

@@ -2,7 +2,6 @@ package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
import net.i2p.data.Hash;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
/**
@@ -29,7 +28,6 @@ class HopProcessor {
*/
static final boolean USE_DOUBLE_IV_ENCRYPTION = true;
static final int IV_LENGTH = 16;
private static final ByteCache _cache = ByteCache.getInstance(128, IV_LENGTH);
/** @deprecated unused */
public HopProcessor(I2PAppContext ctx, HopConfig config) {

View File

@@ -3,8 +3,8 @@ package net.i2p.router.tunnel;
import net.i2p.data.ByteArray;
import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Receive the inbound tunnel message, removing all of the layers
@@ -21,7 +21,6 @@ class InboundEndpointProcessor {
private final IVValidator _validator;
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
private static final ByteCache _cache = ByteCache.getInstance(128, HopProcessor.IV_LENGTH);
/** @deprecated unused */
public InboundEndpointProcessor(RouterContext ctx, TunnelCreatorConfig cfg) {
@@ -54,8 +53,7 @@ class InboundEndpointProcessor {
return false;
}
ByteArray ba = _cache.acquire();
byte iv[] = ba.getData(); //new byte[HopProcessor.IV_LENGTH];
byte iv[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
System.arraycopy(orig, offset, iv, 0, iv.length);
//if (_config.getLength() > 1)
// _log.debug("IV at inbound endpoint before decrypt: " + Base64.encode(iv));
@@ -64,7 +62,7 @@ class InboundEndpointProcessor {
if (!ok) {
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV, dropping at IBEP " + _config);
_cache.release(ba);
SimpleByteCache.release(iv);
return false;
}
@@ -72,7 +70,7 @@ class InboundEndpointProcessor {
if (USE_ENCRYPTION)
decrypt(_context, _config, iv, orig, offset, length);
_cache.release(ba);
SimpleByteCache.release(iv);
if (_config.getLength() > 0) {
int rtt = 0; // dunno... may not be related to an rtt
@@ -91,8 +89,7 @@ class InboundEndpointProcessor {
*/
private void decrypt(RouterContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
//Log log = ctx.logManager().getLog(OutboundGatewayProcessor.class);
ByteArray ba = _cache.acquire();
byte cur[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH]; // so we dont malloc
byte cur[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
for (int i = cfg.getLength()-2; i >= 0; i--) { // dont include the endpoint, since that is the creator
OutboundGatewayProcessor.decrypt(ctx, iv, orig, offset, length, cur, cfg.getConfig(i));
//if (log.shouldLog(Log.DEBUG)) {
@@ -100,7 +97,7 @@ class InboundEndpointProcessor {
//log.debug("hop " + i + ": " + Base64.encode(orig, offset + HopProcessor.IV_LENGTH, length - HopProcessor.IV_LENGTH));
//}
}
_cache.release(ba);
SimpleByteCache.release(cur);
}
}

View File

@@ -15,6 +15,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
private RouterInfo _target;
private static final long MAX_LOOKUP_TIME = 15*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
public InboundGatewayReceiver(RouterContext ctx, HopConfig cfg) {
_context = ctx;
@@ -58,7 +59,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
out.setMessage(msg);
out.setTarget(_target);
out.setExpiration(msg.getMessageExpiration());
out.setPriority(200);
out.setPriority(PRIORITY);
_context.outNetMessagePool().add(out);
return msg.getUniqueId();
}

View File

@@ -0,0 +1,91 @@
package net.i2p.router.tunnel;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.*;
import net.i2p.router.util.CDPQEntry;
/**
* Stores all the state for an unsent or partially-sent message
*
* @since 0.9.3
*/
class OutboundGatewayMessage extends PendingGatewayMessage implements CDPQEntry {
private long _seqNum;
private final int _priority;
public OutboundGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel);
_priority = getPriority(message);
}
/**
* For CDPQ
*/
public void setSeqNum(long num) {
_seqNum = num;
}
/**
* For CDPQ
*/
public long getSeqNum() {
return _seqNum;
}
/**
* For CDPQ
*/
public int getPriority() {
return _priority;
}
/**
* This is just for priority in the queue waiting for the fragmenter.
* After the fragmenter, they will be OutNetMessages with priority 400.
* We use the same 100-500 priority as OutNetMessage so the stats
* in CoDelPriorityBlockingQueue work.
*
* We could - perhaps - have BatchedPreprocessor pass the max priority of
* any message fragment in a TunnelDataMessage to the OutboundReceiver, to
* set the OutNetMessage priority - but that may just make more of an
* out-of-order mess and failed reconstruction of fragments.
*/
private static int getPriority(I2NPMessage message) {
switch (message.getType()) {
// tagset/LS reply
case DeliveryStatusMessage.MESSAGE_TYPE:
return 1000;
// building new IB tunnel
case TunnelBuildMessage.MESSAGE_TYPE:
case VariableTunnelBuildMessage.MESSAGE_TYPE:
return 500;
// LS store
case DatabaseStoreMessage.MESSAGE_TYPE:
return 400;
// LS verify
case DatabaseLookupMessage.MESSAGE_TYPE:
return 300;
// regular data
case GarlicMessage.MESSAGE_TYPE:
return 200;
// these shouldn't go into a OBGW
case DatabaseSearchReplyMessage.MESSAGE_TYPE:
case DataMessage.MESSAGE_TYPE:
case TunnelBuildReplyMessage.MESSAGE_TYPE:
case TunnelDataMessage.MESSAGE_TYPE:
case TunnelGatewayMessage.MESSAGE_TYPE:
case VariableTunnelBuildReplyMessage.MESSAGE_TYPE:
default:
return 100;
}
}
}

View File

@@ -3,8 +3,8 @@ package net.i2p.router.tunnel;
import net.i2p.I2PAppContext;
import net.i2p.data.Base64;
import net.i2p.data.ByteArray;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Turn the preprocessed tunnel data into something that can be delivered to the
@@ -18,7 +18,6 @@ class OutboundGatewayProcessor {
private final TunnelCreatorConfig _config;
static final boolean USE_ENCRYPTION = HopProcessor.USE_ENCRYPTION;
private static final ByteCache _cache = ByteCache.getInstance(128, HopProcessor.IV_LENGTH);
public OutboundGatewayProcessor(I2PAppContext ctx, TunnelCreatorConfig cfg) {
_context = ctx;
@@ -35,8 +34,7 @@ class OutboundGatewayProcessor {
* @param length how much of orig can we write to (must be a multiple of 16).
*/
public void process(byte orig[], int offset, int length) {
ByteArray ba = _cache.acquire();
byte iv[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH];
byte iv[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
//_context.random().nextBytes(iv);
//System.arraycopy(iv, 0, orig, offset, HopProcessor.IV_LENGTH);
System.arraycopy(orig, offset, iv, 0, HopProcessor.IV_LENGTH);
@@ -49,7 +47,7 @@ class OutboundGatewayProcessor {
decrypt(_context, _config, iv, orig, offset, length);
if (_log.shouldLog(Log.DEBUG))
_log.debug("finished processing the preprocessed data");
_cache.release(ba);
SimpleByteCache.release(iv);
}
/**
@@ -58,8 +56,7 @@ class OutboundGatewayProcessor {
*/
private void decrypt(I2PAppContext ctx, TunnelCreatorConfig cfg, byte iv[], byte orig[], int offset, int length) {
Log log = ctx.logManager().getLog(OutboundGatewayProcessor.class);
ByteArray ba = _cache.acquire();
byte cur[] = ba.getData(); // new byte[HopProcessor.IV_LENGTH]; // so we dont malloc
byte cur[] = SimpleByteCache.acquire(HopProcessor.IV_LENGTH);
for (int i = cfg.getLength()-1; i >= 1; i--) { // dont include hop 0, since that is the creator
decrypt(ctx, iv, orig, offset, length, cur, cfg.getConfig(i));
if (log.shouldLog(Log.DEBUG)) {
@@ -67,7 +64,7 @@ class OutboundGatewayProcessor {
//log.debug("hop " + i + ": " + Base64.encode(orig, offset + HopProcessor.IV_LENGTH, length - HopProcessor.IV_LENGTH));
}
}
_cache.release(ba);
SimpleByteCache.release(cur);
}
/**

View File

@@ -20,6 +20,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
private RouterInfo _nextHopCache;
private static final long MAX_LOOKUP_TIME = 15*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_DATA;
public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) {
_context = ctx;
@@ -61,7 +62,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
m.setTarget(ri);
m.setPriority(400);
m.setPriority(PRIORITY);
_context.outNetMessagePool().add(m);
_config.incrementProcessedMessages();
}

View File

@@ -0,0 +1,134 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.List;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CDQEntry;
/**
* Stores all the state for an unsent or partially-sent message
*
* @since 0.9.3 refactored from TunnelGateway.Pending
*/
class PendingGatewayMessage implements CDQEntry {
protected final Hash _toRouter;
protected final TunnelId _toTunnel;
protected final long _messageId;
protected final long _expiration;
protected final byte _remaining[];
protected int _offset;
protected int _fragmentNumber;
protected final long _created;
private List<Long> _messageIds;
private long _enqueueTime;
public PendingGatewayMessage(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
_toRouter = toRouter;
_toTunnel = toTunnel;
_messageId = message.getUniqueId();
_expiration = message.getMessageExpiration();
_remaining = message.toByteArray();
_created = System.currentTimeMillis();
}
/** may be null */
public Hash getToRouter() { return _toRouter; }
/** may be null */
public TunnelId getToTunnel() { return _toTunnel; }
public long getMessageId() { return _messageId; }
public long getExpiration() { return _expiration; }
/** raw unfragmented message to send */
public byte[] getData() { return _remaining; }
/** index into the data to be sent */
public int getOffset() { return _offset; }
/** move the offset */
public void setOffset(int offset) { _offset = offset; }
public long getLifetime() { return System.currentTimeMillis()-_created; }
/** which fragment are we working on (0 for the first fragment) */
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) {
synchronized (this) {
if (_messageIds == null)
_messageIds = new ArrayList();
_messageIds.add(Long.valueOf(id));
}
}
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() {
synchronized (this) {
if (_messageIds != null)
return new ArrayList(_messageIds);
else
return new ArrayList();
}
}
/**
* For CDQ
* @since 0.9.3
*/
public void setEnqueueTime(long now) {
_enqueueTime = now;
}
/**
* For CDQ
* @since 0.9.3
*/
public long getEnqueueTime() {
return _enqueueTime;
}
/**
* For CDQ
* @since 0.9.3
*/
public void drop() {
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Message ").append(_messageId); //.append(" on ");
//buf.append(TunnelGateway.this.toString());
if (_toRouter != null) {
buf.append(" targetting ");
buf.append(_toRouter.toBase64()).append(" ");
if (_toTunnel != null)
buf.append(_toTunnel.getTunnelId());
}
buf.append(" actual lifetime ");
buf.append(getLifetime()).append("ms");
buf.append(" potential lifetime ");
buf.append(_expiration - _created).append("ms");
buf.append(" size ").append(_remaining.length);
buf.append(" offset ").append(_offset);
buf.append(" frag ").append(_fragmentNumber);
return buf.toString();
}
}

View File

@@ -2,13 +2,14 @@ package net.i2p.router.tunnel;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.router.util.CoDelBlockingQueue;
import net.i2p.router.util.CoDelPriorityBlockingQueue;
import net.i2p.util.Log;
/**
@@ -35,9 +36,15 @@ import net.i2p.util.Log;
*
*/
class PumpedTunnelGateway extends TunnelGateway {
private final BlockingQueue<Pending> _prequeue;
private final BlockingQueue<PendingGatewayMessage> _prequeue;
private final TunnelGatewayPumper _pumper;
private final boolean _isInbound;
private static final int MAX_OB_MSGS_PER_PUMP = 16;
private static final int MAX_IB_MSGS_PER_PUMP = 8;
private static final int INITIAL_OB_QUEUE = 64;
private static final int MAX_IB_QUEUE = 1024;
/**
* @param preprocessor this pulls Pending messages off a list, builds some
* full preprocessed messages, and pumps those into the sender
@@ -48,7 +55,15 @@ class PumpedTunnelGateway extends TunnelGateway {
*/
public PumpedTunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver, TunnelGatewayPumper pumper) {
super(context, preprocessor, sender, receiver);
_prequeue = new LinkedBlockingQueue();
if (getClass() == PumpedTunnelGateway.class) {
// Unbounded priority queue for outbound
_prequeue = new CoDelPriorityBlockingQueue(context, "OBGW", INITIAL_OB_QUEUE);
_isInbound = false;
} else { // extended by ThrottledPTG for IB
// Bounded non-priority queue for inbound
_prequeue = new CoDelBlockingQueue(context, "IBGW", MAX_IB_QUEUE);
_isInbound = true;
}
_pumper = pumper;
}
@@ -57,16 +72,26 @@ class PumpedTunnelGateway extends TunnelGateway {
* coallesced with other pending messages) or after a brief pause (_flushFrequency).
* If it is queued up past its expiration, it is silently dropped
*
* This is only for OBGWs. See TPTG override for IBGWs.
*
* @param msg message to be sent through the tunnel
* @param toRouter router to send to after the endpoint (or null for endpoint processing)
* @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
*/
@Override
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
OutboundGatewayMessage cur = new OutboundGatewayMessage(msg, toRouter, toTunnel);
if (_log.shouldLog(Log.DEBUG))
_log.debug("OB PTG add type " + msg.getType() + " pri " + cur.getPriority());
add(cur);
}
protected void add(PendingGatewayMessage cur) {
_messagesSent++;
Pending cur = new PendingImpl(msg, toRouter, toTunnel);
_prequeue.offer(cur);
_pumper.wantsPumping(this);
if (_prequeue.offer(cur))
_pumper.wantsPumping(this);
else
_context.statManager().addRateData("tunnel.dropGatewayOverflow", 1);
}
/**
@@ -79,8 +104,12 @@ class PumpedTunnelGateway extends TunnelGateway {
* @param queueBuf Empty list for convenience, to use as a temporary buffer.
* Must be empty when called; will always be emptied before return.
*/
void pump(List<Pending> queueBuf) {
_prequeue.drainTo(queueBuf);
void pump(List<PendingGatewayMessage> queueBuf) {
// TODO if an IBGW, and the next hop is backlogged,
// drain less or none... better to let things back up here.
// Don't do this for OBGWs?
int max = _isInbound ? MAX_IB_MSGS_PER_PUMP : MAX_OB_MSGS_PER_PUMP;
_prequeue.drainTo(queueBuf, max);
if (queueBuf.isEmpty())
return;
@@ -105,7 +134,7 @@ class PumpedTunnelGateway extends TunnelGateway {
// expire any as necessary, even if its framented
for (int i = 0; i < _queue.size(); i++) {
Pending m = _queue.get(i);
PendingGatewayMessage m = _queue.get(i);
if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
@@ -120,18 +149,21 @@ class PumpedTunnelGateway extends TunnelGateway {
}
if (delayedFlush) {
_context.simpleTimer().addEvent(_delayedFlush, delayAmount);
_delayedFlush.reschedule(delayAmount);
}
_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
long complete = System.currentTimeMillis();
if (_log.shouldLog(Log.DEBUG))
//_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
if (_log.shouldLog(Log.DEBUG)) {
long complete = System.currentTimeMillis();
_log.debug("Time to add " + queueBuf.size() + " messages to " + toString() + ": " + (complete-startAdd)
+ " delayed? " + delayedFlush + " remaining: " + remaining
+ " add: " + (afterAdded-beforeLock)
+ " preprocess: " + (afterPreprocess-afterAdded)
+ " expire: " + (afterExpire-afterPreprocess)
+ " queue flush: " + (complete-afterExpire));
}
queueBuf.clear();
if (!_prequeue.isEmpty())
_pumper.wantsPumping(this);
}
}

View File

@@ -42,6 +42,6 @@ class ThrottledPumpedTunnelGateway extends PumpedTunnelGateway {
_config.incrementProcessedMessages();
return;
}
super.add(msg, toRouter, toTunnel);
add(new PendingGatewayMessage(msg, toRouter, toTunnel));
}
}

View File

@@ -9,6 +9,7 @@ import net.i2p.data.Hash;
import net.i2p.router.RouterContext;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
import net.i2p.util.SimpleByteCache;
/**
* Do the simplest thing possible for preprocessing - for each message available,
@@ -33,9 +34,6 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
*/
protected static final ByteCache _dataCache = ByteCache.getInstance(32, PREPROCESSED_SIZE);
private static final ByteCache _ivCache = ByteCache.getInstance(128, IV_SIZE);
private static final ByteCache _hashCache = ByteCache.getInstance(128, Hash.HASH_LENGTH);
public TrivialPreprocessor(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
@@ -50,7 +48,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
*
* NOTE: Unused here, see BatchedPreprocessor override, super is not called.
*/
public boolean preprocessQueue(List<TunnelGateway.Pending> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
public boolean preprocessQueue(List<PendingGatewayMessage> pending, TunnelGateway.Sender sender, TunnelGateway.Receiver rec) {
throw new IllegalArgumentException("unused, right?");
}
@@ -63,16 +61,15 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* @param fragmentLength fragments[0:fragmentLength] is used
*/
protected void preprocess(byte fragments[], int fragmentLength) {
ByteArray ivBuf = _ivCache.acquire();
byte iv[] = ivBuf.getData(); // new byte[IV_SIZE];
byte iv[] = SimpleByteCache.acquire(IV_SIZE);
_context.random().nextBytes(iv);
// payload ready, now H(instructions+payload+IV)
System.arraycopy(iv, 0, fragments, fragmentLength, IV_SIZE);
ByteArray hashBuf = _hashCache.acquire();
byte[] hashBuf = SimpleByteCache.acquire(Hash.HASH_LENGTH);
//Hash h = _context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE);
_context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, hashBuf.getData(), 0);
_context.sha().calculateHash(fragments, 0, fragmentLength + IV_SIZE, hashBuf, 0);
//Hash h = _context.sha().calculateHash(target, 0, offset + IV_SIZE);
//_log.debug("before shift: " + Base64.encode(target));
@@ -91,12 +88,12 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
System.arraycopy(iv, 0, fragments, offset, IV_SIZE);
offset += IV_SIZE;
//System.arraycopy(h.getData(), 0, fragments, offset, 4);
System.arraycopy(hashBuf.getData(), 0, fragments, offset, 4);
System.arraycopy(hashBuf, 0, fragments, offset, 4);
offset += 4;
//_log.debug("before pad : " + Base64.encode(target));
_hashCache.release(hashBuf);
_ivCache.release(ivBuf);
SimpleByteCache.release(hashBuf);
SimpleByteCache.release(iv);
// fits in a single message, so may be smaller than the full size
int numPadBytes = PREPROCESSED_SIZE // max
@@ -155,7 +152,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
private static final byte MASK_TUNNEL = (byte)(FragmentHandler.TYPE_TUNNEL << 5);
private static final byte MASK_ROUTER = (byte)(FragmentHandler.TYPE_ROUTER << 5);
protected int writeFirstFragment(TunnelGateway.Pending msg, byte target[], int offset) {
protected int writeFirstFragment(PendingGatewayMessage msg, byte target[], int offset) {
boolean fragmented = false;
int instructionsLength = getInstructionsSize(msg);
int payloadLength = msg.getData().length - msg.getOffset();
@@ -221,7 +218,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
return offset;
}
protected int writeSubsequentFragment(TunnelGateway.Pending msg, byte target[], int offset) {
protected int writeSubsequentFragment(PendingGatewayMessage msg, byte target[], int offset) {
boolean isLast = true;
int instructionsLength = getInstructionsSize(msg);
@@ -269,7 +266,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
* Does NOT include 4 for the message ID if the message will be fragmented;
* call getInstructionAugmentationSize() for that.
*/
protected int getInstructionsSize(TunnelGateway.Pending msg) {
protected int getInstructionsSize(PendingGatewayMessage msg) {
if (msg.getFragmentNumber() > 0)
return 7;
// control byte
@@ -287,7 +284,7 @@ class TrivialPreprocessor implements TunnelGateway.QueuePreprocessor {
}
/** @return 0 or 4 */
protected int getInstructionAugmentationSize(TunnelGateway.Pending msg, int offset, int instructionsSize) {
protected int getInstructionAugmentationSize(PendingGatewayMessage msg, int offset, int instructionsSize) {
int payloadLength = msg.getData().length - msg.getOffset();
if (offset + payloadLength + instructionsSize + IV_SIZE + 1 + 4 > PREPROCESSED_SIZE) {
// requires fragmentation, so include the messageId

View File

@@ -29,6 +29,43 @@ import net.i2p.util.Log;
* Handle the actual processing and forwarding of messages through the
* various tunnels.
*
*<pre>
* For each type of tunnel, it creates a chain of handlers, as follows:
*
* Following tunnels are created by us:
*
* Outbound Gateway > 0 hops:
* PumpedTunnelGateway
* BatchedRouterPreprocessor -> OutboundSender -> OutboundReceiver -> OutNetMessagePool
*
* Outbound zero-hop Gateway+Endpoint:
* TunnelGatewayZeroHop
* OutboundMessageDistributor -> OutNetMessagePool
*
* Inbound Endpoint > 0 hops:
* TunnelParticipant
* RouterFragmentHandler -> InboundEndpointProcessor -> InboundMessageDistributor -> InNetMessagePool
*
* Inbound zero-hop Gateway+Endpoint:
* TunnelGatewayZeroHop
* InboundMessageDistributor -> InNetMessagePool
*
*
* Following tunnels are NOT created by us:
*
* Participant (not gateway or endpoint)
* TunnelParticipant
* HopProcessor -> OutNetMessagePool
*
* Outbound Endpoint > 0 hops:
* OutboundTunnelEndpoint
* RouterFragmentHandler -> HopProcessor -> OutboundMessageDistributor -> OutNetMessagePool
*
* Inbound Gateway > 0 hops:
* ThrottledPumpedTunnelGateway
* BatchedRouterPreprocessor -> InboundSender -> InboundGatewayReceiver -> OutNetMessagePool
*
*</pre>
*/
public class TunnelDispatcher implements Service {
private final RouterContext _context;
@@ -174,6 +211,8 @@ public class TunnelDispatcher implements Service {
// following are for InboundMessageDistributor
ctx.statManager().createRateStat("tunnel.dropDangerousClientTunnelMessage", "How many tunnel messages come down a client tunnel that we shouldn't expect (lifetime is the 'I2NP type')", "Tunnels", new long[] { 60*60*1000 });
ctx.statManager().createRateStat("tunnel.handleLoadClove", "When do we receive load test cloves", "Tunnels", new long[] { 60*60*1000 });
// following is for PumpedTunnelGateway
ctx.statManager().createRateStat("tunnel.dropGatewayOverflow", "Dropped message at GW, queue full", "Tunnels", new long[] { 60*60*1000 });
}
/** for IBGW */
@@ -770,7 +809,7 @@ public class TunnelDispatcher implements Service {
}
******/
public void startup() {
public synchronized void startup() {
// Note that we only use the validator for participants and OBEPs, not IBGWs, so
// this BW estimate will be high by about 33% assuming 2-hop tunnels average
_validator = new BloomFilterIVValidator(_context, getShareBandwidth(_context));
@@ -784,7 +823,7 @@ public class TunnelDispatcher implements Service {
return (int) (pct * Math.min(irateKBps, orateKBps));
}
public void shutdown() {
public synchronized void shutdown() {
if (_validator != null)
_validator.destroy();
_validator = null;
@@ -794,6 +833,7 @@ public class TunnelDispatcher implements Service {
_participants.clear();
_inboundGateways.clear();
_participatingConfig.clear();
_leaveJob.clear();
}
public void restart() {
@@ -827,6 +867,10 @@ public class TunnelDispatcher implements Service {
public void add(HopConfig cfg) {
_configs.offer(cfg);
}
public void clear() {
_configs.clear();
}
public String getName() { return "Expire participating tunnels"; }
public void runJob() {

View File

@@ -10,7 +10,7 @@ import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
/**
* Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
@@ -32,11 +32,12 @@ import net.i2p.util.SimpleTimer;
* or if debugging, verify that it can be decrypted properly)</li>
* </ol>
*
* Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides.
*/
class TunnelGateway {
protected final RouterContext _context;
protected final Log _log;
protected final List<Pending> _queue;
protected final List<PendingGatewayMessage> _queue;
protected final QueuePreprocessor _preprocessor;
protected final Sender _sender;
protected final Receiver _receiver;
@@ -53,7 +54,7 @@ class TunnelGateway {
* @param receiver this receives the encrypted message and forwards it off
* to the first hop
*/
public TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
protected TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
_context = context;
_log = context.logManager().getLog(getClass());
_queue = new ArrayList(4);
@@ -63,8 +64,8 @@ class TunnelGateway {
//_flushFrequency = 500;
_delayedFlush = new DelayedFlush();
_lastFlush = _context.clock().now();
_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
//_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
}
/**
@@ -81,11 +82,15 @@ class TunnelGateway {
* coallesced with other pending messages) or after a brief pause (_flushFrequency).
* If it is queued up past its expiration, it is silently dropped
*
* UNUSED - see overrides
*
* @param msg message to be sent through the tunnel
* @param toRouter router to send to after the endpoint (or null for endpoint processing)
* @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
*/
public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
throw new UnsupportedOperationException("unused, right?");
/****
_messagesSent++;
long startAdd = System.currentTimeMillis();
boolean delayedFlush = false;
@@ -124,7 +129,7 @@ class TunnelGateway {
}
if (delayedFlush) {
_context.simpleTimer().addEvent(_delayedFlush, delayAmount);
_delayedFlush.reschedule(delayAmount);
}
_context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
if (_log.shouldLog(Log.DEBUG)) {
@@ -137,6 +142,7 @@ class TunnelGateway {
+ " expire: " + (afterExpire-afterPreprocess)
+ " queue flush: " + (complete-afterExpire));
}
****/
}
public int getMessagesSent() { return _messagesSent; }
@@ -165,7 +171,7 @@ class TunnelGateway {
*
* @return true if we should delay before preprocessing again
*/
public boolean preprocessQueue(List<Pending> pending, Sender sender, Receiver receiver);
public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver);
/** how long do we want to wait before flushing */
public long getDelayAmount();
@@ -178,137 +184,43 @@ class TunnelGateway {
*/
public long receiveEncrypted(byte encrypted[]);
}
/**
* Stores all the state for an unsent or partially-sent message
*/
public static class Pending {
protected final Hash _toRouter;
protected final TunnelId _toTunnel;
protected final long _messageId;
protected final long _expiration;
protected final byte _remaining[];
protected int _offset;
protected int _fragmentNumber;
protected final long _created;
private List<Long> _messageIds;
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
this(message, toRouter, toTunnel, System.currentTimeMillis());
}
public Pending(I2NPMessage message, Hash toRouter, TunnelId toTunnel, long now) {
_toRouter = toRouter;
_toTunnel = toTunnel;
_messageId = message.getUniqueId();
_expiration = message.getMessageExpiration();
_remaining = message.toByteArray();
_created = now;
}
/** may be null */
public Hash getToRouter() { return _toRouter; }
/** may be null */
public TunnelId getToTunnel() { return _toTunnel; }
public long getMessageId() { return _messageId; }
public long getExpiration() { return _expiration; }
/** raw unfragmented message to send */
public byte[] getData() { return _remaining; }
/** index into the data to be sent */
public int getOffset() { return _offset; }
/** move the offset */
public void setOffset(int offset) { _offset = offset; }
public long getLifetime() { return System.currentTimeMillis()-_created; }
/** which fragment are we working on (0 for the first fragment) */
public int getFragmentNumber() { return _fragmentNumber; }
/** ok, fragment sent, increment what the next will be */
public void incrementFragmentNumber() { _fragmentNumber++; }
/**
* Add an ID to the list of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public void addMessageId(long id) {
synchronized (Pending.this) {
if (_messageIds == null)
_messageIds = new ArrayList();
_messageIds.add(Long.valueOf(id));
}
}
/**
* The IDs of the TunnelDataMssages this message was fragmented into.
* Unused except in notePreprocessing() calls for debugging
*/
public List<Long> getMessageIds() {
synchronized (Pending.this) {
if (_messageIds != null)
return new ArrayList(_messageIds);
else
return new ArrayList();
}
}
}
/** Extend for debugging */
class PendingImpl extends Pending {
public PendingImpl(I2NPMessage message, Hash toRouter, TunnelId toTunnel) {
super(message, toRouter, toTunnel, _context.clock().now());
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(64);
buf.append("Message ").append(_messageId).append(" on ");
buf.append(TunnelGateway.this.toString());
if (_toRouter != null) {
buf.append(" targetting ");
buf.append(_toRouter.toBase64()).append(" ");
if (_toTunnel != null)
buf.append(_toTunnel.getTunnelId());
}
long now = _context.clock().now();
buf.append(" actual lifetime ");
buf.append(now - _created).append("ms");
buf.append(" potential lifetime ");
buf.append(_expiration - _created).append("ms");
buf.append(" size ").append(_remaining.length);
buf.append(" offset ").append(_offset);
buf.append(" frag ").append(_fragmentNumber);
return buf.toString();
}
protected class DelayedFlush extends SimpleTimer2.TimedEvent {
DelayedFlush() {
super(_context.simpleTimer2());
}
@Override
public long getLifetime() { return _context.clock().now()-_created; }
}
private class DelayedFlush implements SimpleTimer.TimedEvent {
public void timeReached() {
boolean wantRequeue = false;
int remaining = 0;
long beforeLock = _context.clock().now();
long afterChecked = -1;
//int remaining = 0;
//long beforeLock = _context.clock().now();
//long afterChecked = -1;
long delayAmount = -1;
//if (_queue.size() > 10000) // stay out of the synchronized block
// System.out.println("foo!");
synchronized (_queue) {
//if (_queue.size() > 10000) // stay in the synchronized block
// System.out.println("foo!");
afterChecked = _context.clock().now();
//afterChecked = _context.clock().now();
if (!_queue.isEmpty()) {
if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
_log.debug("Remaining before delayed flush preprocessing: " + _queue);
//if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
// _log.debug("Remaining before delayed flush preprocessing: " + _queue);
wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
if (wantRequeue)
if (wantRequeue) {
delayAmount = _preprocessor.getDelayAmount();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Remaining after delayed flush preprocessing (requeue? " + wantRequeue + "): " + _queue);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Remaining after delayed flush preprocessing: " + _queue);
}
}
remaining = _queue.size();
//remaining = _queue.size();
}
if (wantRequeue)
_context.simpleTimer().addEvent(_delayedFlush, delayAmount);
schedule(delayAmount);
else
_lastFlush = _context.clock().now();
_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
//_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
}
}
}

View File

@@ -1,7 +1,10 @@
package net.i2p.router.tunnel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -9,13 +12,17 @@ import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
/**
* run through the tunnel gateways that have had messages added to them and push
* those messages through the preprocessing and sending process
* Run through the tunnel gateways that have had messages added to them and push
* those messages through the preprocessing and sending process.
*
* TODO do we need this many threads?
* TODO this combines IBGWs and OBGWs, do we wish to separate the two
* and/or prioritize OBGWs (i.e. our outbound traffic) over IBGWs (participating)?
*/
class TunnelGatewayPumper implements Runnable {
private final RouterContext _context;
private final BlockingQueue<PumpedTunnelGateway> _wantsPumping;
private boolean _stop;
private final Set<PumpedTunnelGateway> _wantsPumping;
private volatile boolean _stop;
private static final int MIN_PUMPERS = 1;
private static final int MAX_PUMPERS = 4;
private final int _pumpers;
@@ -23,7 +30,7 @@ class TunnelGatewayPumper implements Runnable {
/** Creates a new instance of TunnelGatewayPumper */
public TunnelGatewayPumper(RouterContext ctx) {
_context = ctx;
_wantsPumping = new LinkedBlockingQueue();
_wantsPumping = new LinkedHashSet(16);
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
@@ -35,9 +42,10 @@ class TunnelGatewayPumper implements Runnable {
public void stopPumping() {
_stop=true;
_wantsPumping.clear();
PumpedTunnelGateway poison = new PoisonPTG(_context);
for (int i = 0; i < _pumpers; i++)
_wantsPumping.offer(poison);
for (int i = 0; i < _pumpers; i++) {
PumpedTunnelGateway poison = new PoisonPTG(_context);
wantsPumping(poison);
}
for (int i = 1; i <= 5 && !_wantsPumping.isEmpty(); i++) {
try {
Thread.sleep(i * 50);
@@ -47,16 +55,28 @@ class TunnelGatewayPumper implements Runnable {
}
public void wantsPumping(PumpedTunnelGateway gw) {
if (!_stop)
_wantsPumping.offer(gw);
if (!_stop) {
synchronized (_wantsPumping) {
if (_wantsPumping.add(gw))
_wantsPumping.notify();
}
}
}
public void run() {
PumpedTunnelGateway gw = null;
List<TunnelGateway.Pending> queueBuf = new ArrayList(32);
List<PendingGatewayMessage> queueBuf = new ArrayList(32);
while (!_stop) {
try {
gw = _wantsPumping.take();
synchronized (_wantsPumping) {
if (_wantsPumping.isEmpty()) {
_wantsPumping.wait();
} else {
Iterator<PumpedTunnelGateway> iter = _wantsPumping.iterator();
gw = iter.next();
iter.remove();
}
}
} catch (InterruptedException ie) {}
if (gw != null) {
if (gw.getMessagesSent() == POISON_PTG)

View File

@@ -30,6 +30,7 @@ class TunnelParticipant {
private static final long MAX_LOOKUP_TIME = 15*1000;
/** for next hop when a tunnel is first created */
private static final long LONG_MAX_LOOKUP_TIME = 30*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
/** not an inbound endpoint */
public TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor) {
@@ -196,7 +197,7 @@ class TunnelParticipant {
m.setMessage(msg);
m.setExpiration(msg.getMessageExpiration());
m.setTarget(ri);
m.setPriority(200);
m.setPriority(PRIORITY);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Forward on from " + _config + ": " + msg);
_context.outNetMessagePool().add(m);

View File

@@ -61,6 +61,7 @@ class BuildHandler implements Runnable {
private static final int MAX_QUEUE = 192;
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
private static final int PRIORITY = OutNetMessage.PRIORITY_BUILD_REPLY;
/**
* This must be high, as if we timeout the send we remove the tunnel from
@@ -689,7 +690,7 @@ class BuildHandler implements Runnable {
OutNetMessage msg = new OutNetMessage(_context);
msg.setMessage(state.msg);
msg.setExpiration(state.msg.getMessageExpiration());
msg.setPriority(300);
msg.setPriority(PRIORITY);
msg.setTarget(nextPeerInfo);
if (response == 0)
msg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
@@ -722,7 +723,7 @@ class BuildHandler implements Runnable {
OutNetMessage outMsg = new OutNetMessage(_context);
outMsg.setExpiration(m.getMessageExpiration());
outMsg.setMessage(m);
outMsg.setPriority(300);
outMsg.setPriority(PRIORITY);
outMsg.setTarget(nextPeerInfo);
if (response == 0)
outMsg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));

View File

@@ -29,7 +29,9 @@ abstract class BuildRequestor {
for (int i = 0; i < TunnelBuildMessage.MAX_RECORD_COUNT; i++)
ORDER.add(Integer.valueOf(i));
}
private static final int PRIORITY = 500;
private static final int PRIORITY = OutNetMessage.PRIORITY_MY_BUILD_REQUEST;
/**
* At 10 seconds, we were receiving about 20% of replies after expiration
* Todo: make this variable on a per-request basis, to account for tunnel length,

View File

@@ -0,0 +1,9 @@
package net.i2p.router.util;
/**
* For CoDelPriorityQueue
* @since 0.9.3
*/
public interface CDPQEntry extends CDQEntry, PQEntry {
}

View File

@@ -0,0 +1,20 @@
package net.i2p.router.util;
/**
* For CoDelQueue
* @since 0.9.3
*/
public interface CDQEntry {
/**
* To be set by the queue
*/
public void setEnqueueTime(long time);
public long getEnqueueTime();
/**
* Implement any reclaimation of resources here
*/
public void drop();
}

View File

@@ -0,0 +1,317 @@
package net.i2p.router.util;
import java.util.Collection;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* CoDel implementation of Active Queue Management.
* Ref: http://queue.acm.org/detail.cfm?id=2209336
* Ref: http://queue.acm.org/appendices/codel.html
*
* Code and comments are directly from appendix above, apparently public domain.
*
* Input: add(), offer(), and put() are overridden to add a timestamp.
*
* Output : take(), poll(), and drainTo() are overridden to implement AQM and drop entries
* if necessary. peek(), and remove() are NOT overridden, and do
* NOT implement AQM or update stats.
*
* @since 0.9.3
*/
public class CoDelBlockingQueue<E extends CDQEntry> extends LinkedBlockingQueue<E> {
private final I2PAppContext _context;
private final Log _log;
private final String _name;
private final int _capacity;
// following 4 are state variables defined by sample code, locked by this
/** Time when we'll declare we're above target (0 if below) */
private long _first_above_time;
/** Time to drop next packet */
private long _drop_next;
/** Packets dropped since going into drop state */
private int _count;
/** true if in drop state */
private boolean _dropping;
/** following is a per-request global for ease of use, locked by this */
private long _now;
/** debugging */
private static final AtomicLong __id = new AtomicLong();
private final long _id;
/**
* Quote:
* Below a target of 5 ms, utilization suffers for some conditions and traffic loads;
* above 5 ms there is very little or no improvement in utilization.
*
* I2P: Raise to 15 due to multithreading environment
*
* Maybe need to make configurable per-instance.
*/
private static final long TARGET = 15;
/**
* Quote:
* A setting of 100 ms works well across a range of RTTs from 10 ms to 1 second
*
* Maybe need to make configurable per-instance.
*/
private static final long INTERVAL = 100;
//private static final int MAXPACKET = 512;
private final String STAT_DROP;
private final String STAT_DELAY;
private static final long[] RATES = {5*60*1000, 60*60*1000};
private static final long BACKLOG_TIME = 2*1000;
/**
* @param name for stats
*/
public CoDelBlockingQueue(I2PAppContext ctx, String name, int capacity) {
super(capacity);
_context = ctx;
_log = ctx.logManager().getLog(CoDelBlockingQueue.class);
_name = name;
_capacity = capacity;
STAT_DROP = "codel." + name + ".drop";
STAT_DELAY = "codel." + name + ".delay";
ctx.statManager().createRequiredRateStat(STAT_DROP, "queue delay of dropped items", "Router", RATES);
ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES);
_id = __id.incrementAndGet();
}
@Override
public boolean add(E o) {
o.setEnqueueTime(_context.clock().now());
return super.add(o);
}
@Override
public boolean offer(E o) {
o.setEnqueueTime(_context.clock().now());
return super.offer(o);
}
@Override
public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
o.setEnqueueTime(_context.clock().now());
return super.offer(o, timeout, unit);
}
@Override
public void put(E o) throws InterruptedException {
o.setEnqueueTime(_context.clock().now());
super.put(o);
}
@Override
public void clear() {
super.clear();
synchronized(this) {
_first_above_time = 0;
_drop_next = 0;
_count = 0;
_dropping = false;
}
}
@Override
public E take() throws InterruptedException {
E rv;
do {
rv = deque();
} while (rv == null);
return rv;
}
@Override
public E poll() {
E rv = super.poll();
return codel(rv);
}
/**
* Updates stats and possibly drops while draining.
*/
@Override
public int drainTo(Collection<? super E> c) {
int rv = 0;
E e;
while ((e = poll()) != null) {
c.add(e);
rv++;
}
return rv;
}
/**
* Updates stats and possibly drops while draining.
*/
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
int rv = 0;
E e;
while ((e = poll()) != null && rv++ < maxElements) {
c.add(e);
}
return rv;
}
/**
* Drains all, without updating stats or dropping.
*/
public int drainAllTo(Collection<? super E> c) {
return super.drainTo(c);
}
/**
* Has the head of the queue been waiting too long,
* or is the queue almost full?
*/
public boolean isBacklogged() {
E e = peek();
if (e == null)
return false;
return _dropping ||
_context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
remainingCapacity() < _capacity / 4;
}
/////// private below here
/**
* Caller must synch on this
* @param entry may be null
*/
private boolean updateVars(E entry) {
// This is a helper routine that tracks whether the sojourn time
// is above or below target and, if above, if it has remained above continuously for at least interval.
// It returns a boolean indicating whether it is OK to drop (sojourn time above target
// for at least interval)
if (entry == null) {
_first_above_time = 0;
return false;
}
_now = _context.clock().now();
boolean ok_to_drop = false;
long sojurn = _now - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DELAY, sojurn);
// I2P use isEmpty instead of size() < MAXPACKET
if (sojurn < TARGET || isEmpty()) {
_first_above_time = 0;
} else {
if (_first_above_time == 0) {
// just went above from below. if we stay above
// for at least INTERVAL we'll say it's ok to drop
_first_above_time = _now + INTERVAL;
} else if (_now >= _first_above_time) {
ok_to_drop = true;
}
}
return ok_to_drop;
}
/**
* @return if null, call again
*/
private E deque() throws InterruptedException {
E rv = super.take();
return codel(rv);
}
/**
* @param rv may be null
* @return rv or a subequent entry or null if dropped
*/
private E codel(E rv) {
synchronized (this) {
// non-blocking inside this synchronized block
boolean ok_to_drop = updateVars(rv);
// All of the work of CoDel is done here.
// There are two branches: if we're in packet-dropping state (meaning that the queue-sojourn
// time has gone above target and hasn't come down yet), then we need to check if it's time
// to leave or if it's time for the next drop(s); if we're not in dropping state, then we need
// to decide if it's time to enter and do the initial drop.
if (_dropping) {
if (!ok_to_drop) {
// sojurn time below target - leave dropping state
_dropping = false;
} else if (_now >= _drop_next) {
// It's time for the next drop. Drop the current packet and dequeue the next.
// The dequeue might take us out of dropping state. If not, schedule the next drop.
// A large backlog might result in drop rates so high that the next drop should happen now;
// hence, the while loop.
while (_now >= _drop_next && _dropping) {
drop(rv);
_count++;
// I2P - we poll here instead of lock so we don't get stuck
// inside the lock. If empty, deque() will be called again.
rv = super.poll();
ok_to_drop = updateVars(rv);
if (!ok_to_drop) {
// leave dropping state
_dropping = false;
} else {
// schedule the next drop
control_law(_drop_next);
}
}
}
} else if (ok_to_drop &&
(_now - _drop_next < INTERVAL || _now - _first_above_time >= INTERVAL)) {
// If we get here, then we're not in dropping state. If the sojourn time has been above
// target for interval, then we decide whether it's time to enter dropping state.
// We do so if we've been either in dropping state recently or above target for a relatively
// long time. The "recently" check helps ensure that when we're successfully controlling
// the queue we react quickly (in one interval) and start with the drop rate that controlled
// the queue last time rather than relearn the correct rate from scratch. If we haven't been
// dropping recently, the "long time above" check adds some hysteresis to the state entry
// so we don't drop on a slightly bigger-than-normal traffic pulse into an otherwise quiet queue.
drop(rv);
// I2P - we poll here instead of lock so we don't get stuck
// inside the lock. If empty, deque() will be called again.
rv = super.poll();
updateVars(rv);
_dropping = true;
// If we're in a drop cycle, the drop rate that controlled the queue
// on the last cycle is a good starting point to control it now.
if (_now - _drop_next < INTERVAL)
_count = _count > 2 ? _count - 2 : 1;
else
_count = 1;
control_law(_now);
}
}
return rv;
}
private void drop(E entry) {
long delay = _context.clock().now() - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DROP, delay);
if (_log.shouldLog(Log.WARN))
_log.warn("CDQ #" + _id + ' ' + _name + " dropped item with delay " + delay + ", " +
DataHelper.formatDuration(_context.clock().now() - _first_above_time) + " since first above, " +
DataHelper.formatDuration(_context.clock().now() - _drop_next) + " since drop next, " +
(_count+1) + " dropped in this phase, " +
size() + " remaining in queue: " + entry);
entry.drop();
}
/**
* Caller must synch on this
*/
private void control_law(long t) {
_drop_next = t + (long) (INTERVAL / Math.sqrt(_count));
}
}

View File

@@ -0,0 +1,316 @@
package net.i2p.router.util;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
/**
* CoDel implementation of Active Queue Management.
* Ref: http://queue.acm.org/detail.cfm?id=2209336
* Ref: http://queue.acm.org/appendices/codel.html
*
* Code and comments are directly from appendix above, apparently public domain.
*
* Input: add(), offer(), and put() are overridden to add a timestamp.
*
* Output : take(), poll(), and drainTo() are overridden to implement AQM and drop entries
* if necessary. peek(), and remove() are NOT overridden, and do
* NOT implement AQM or update stats.
*
* @since 0.9.3
*/
public class CoDelPriorityBlockingQueue<E extends CDPQEntry> extends PriBlockingQueue<E> {
private final I2PAppContext _context;
private final Log _log;
private final String _name;
private final AtomicLong _seqNum = new AtomicLong();
// following 4 are state variables defined by sample code, locked by this
/** Time when we'll declare we're above target (0 if below) */
private long _first_above_time;
/** Time to drop next packet */
private long _drop_next;
/** Packets dropped since going into drop state */
private int _count;
/** true if in drop state */
private boolean _dropping;
/** following is a per-request global for ease of use, locked by this */
private long _now;
private int _lastDroppedPriority;
/** debugging */
static final AtomicLong __id = new AtomicLong();
private final long _id;
/**
* Quote:
* Below a target of 5 ms, utilization suffers for some conditions and traffic loads;
* above 5 ms there is very little or no improvement in utilization.
*
* I2P: Raise to 15 due to multithreading environment
*
* Maybe need to make configurable per-instance.
*/
private static final long TARGET = 15;
/**
* Quote:
* A setting of 100 ms works well across a range of RTTs from 10 ms to 1 second
*
* I2P: Raise to 300 due to longer end-to-end RTTs
*
* Maybe need to make configurable per-instance.
*/
private static final long INTERVAL = 300;
//private static final int MAXPACKET = 512;
private final String STAT_DROP;
private final String STAT_DELAY;
private static final long[] RATES = {5*60*1000, 60*60*1000};
public static final int MIN_PRIORITY = 100;
private static final int[] PRIORITIES = {MIN_PRIORITY, 200, 300, 400, 500};
/** if priority is >= this, never drop */
public static final int DONT_DROP_PRIORITY = 1000;
private static final long BACKLOG_TIME = 2*1000;
/**
* @param name for stats
*/
public CoDelPriorityBlockingQueue(I2PAppContext ctx, String name, int initialCapacity) {
super(initialCapacity);
_context = ctx;
_log = ctx.logManager().getLog(CoDelPriorityBlockingQueue.class);
_name = name;
STAT_DROP = "codel." + name + ".drop.";
STAT_DELAY = "codel." + name + ".delay";
for (int i = 0; i < PRIORITIES.length; i++) {
ctx.statManager().createRequiredRateStat(STAT_DROP + PRIORITIES[i], "queue delay of dropped items by priority", "Router", RATES);
}
ctx.statManager().createRequiredRateStat(STAT_DELAY, "average queue delay", "Router", RATES);
_id = __id.incrementAndGet();
}
@Override
public void clear() {
super.clear();
synchronized(this) {
_first_above_time = 0;
_drop_next = 0;
_count = 0;
_dropping = false;
}
}
@Override
public E take() throws InterruptedException {
E rv;
do {
rv = deque();
} while (rv == null);
return rv;
}
@Override
public E poll() {
E rv = super.poll();
return codel(rv);
}
/**
* Updates stats and possibly drops while draining.
*/
@Override
public int drainTo(Collection<? super E> c) {
int rv = 0;
E e;
while ((e = poll()) != null) {
c.add(e);
rv++;
}
return rv;
}
/**
* Updates stats and possibly drops while draining.
*/
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
int rv = 0;
E e;
while ((e = poll()) != null && rv++ < maxElements) {
c.add(e);
}
return rv;
}
/**
* Drains all, without updating stats or dropping.
*/
public int drainAllTo(Collection<? super E> c) {
return super.drainTo(c);
}
/**
* Has the head of the queue been waiting too long,
* or is the queue too big?
*/
@Override
public boolean isBacklogged() {
E e = peek();
if (e == null)
return false;
return _dropping ||
_context.clock().now() - e.getEnqueueTime() >= BACKLOG_TIME ||
size() >= BACKLOG_SIZE;
}
/////// private below here
@Override
protected void timestamp(E o) {
o.setSeqNum(_seqNum.incrementAndGet());
o.setEnqueueTime(_context.clock().now());
if (o.getPriority() < MIN_PRIORITY && _log.shouldLog(Log.WARN))
_log.warn(_name + " added item with low priority " + o.getPriority() +
": " + o);
}
/**
* Caller must synch on this
* @param entry may be null
*/
private boolean updateVars(E entry) {
// This is a helper routine that tracks whether the sojourn time
// is above or below target and, if above, if it has remained above continuously for at least interval.
// It returns a boolean indicating whether it is OK to drop (sojourn time above target
// for at least interval)
if (entry == null) {
_first_above_time = 0;
return false;
}
_now = _context.clock().now();
boolean ok_to_drop = false;
long sojurn = _now - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DELAY, sojurn);
// I2P use isEmpty instead of size() < MAXPACKET
if (sojurn < TARGET || isEmpty()) {
_first_above_time = 0;
} else {
if (_first_above_time == 0) {
// just went above from below. if we stay above
// for at least INTERVAL we'll say it's ok to drop
_first_above_time = _now + INTERVAL;
} else if (_now >= _first_above_time) {
ok_to_drop = true;
}
}
return ok_to_drop;
}
/**
* @return if null, call again
*/
private E deque() throws InterruptedException {
E rv = super.take();
return codel(rv);
}
/**
* @param rv may be null
* @return rv or a subequent entry or null if dropped
*/
private E codel(E rv) {
synchronized (this) {
// non-blocking inside this synchronized block
boolean ok_to_drop = updateVars(rv);
// All of the work of CoDel is done here.
// There are two branches: if we're in packet-dropping state (meaning that the queue-sojourn
// time has gone above target and hasn't come down yet), then we need to check if it's time
// to leave or if it's time for the next drop(s); if we're not in dropping state, then we need
// to decide if it's time to enter and do the initial drop.
if (_dropping) {
if (!ok_to_drop) {
// sojurn time below target - leave dropping state
_dropping = false;
} else if (_now >= _drop_next) {
// It's time for the next drop. Drop the current packet and dequeue the next.
// The dequeue might take us out of dropping state. If not, schedule the next drop.
// A large backlog might result in drop rates so high that the next drop should happen now;
// hence, the while loop.
while (_now >= _drop_next && _dropping && rv.getPriority() <= _lastDroppedPriority) {
drop(rv);
_count++;
// I2P - we poll here instead of lock so we don't get stuck
// inside the lock. If empty, deque() will be called again.
rv = super.poll();
ok_to_drop = updateVars(rv);
if (!ok_to_drop) {
// leave dropping state
_dropping = false;
} else {
// schedule the next drop
control_law(_drop_next);
}
}
}
} else if (ok_to_drop &&
rv.getPriority() < DONT_DROP_PRIORITY &&
(_now - _drop_next < INTERVAL || _now - _first_above_time >= INTERVAL)) {
// If we get here, then we're not in dropping state. If the sojourn time has been above
// target for interval, then we decide whether it's time to enter dropping state.
// We do so if we've been either in dropping state recently or above target for a relatively
// long time. The "recently" check helps ensure that when we're successfully controlling
// the queue we react quickly (in one interval) and start with the drop rate that controlled
// the queue last time rather than relearn the correct rate from scratch. If we haven't been
// dropping recently, the "long time above" check adds some hysteresis to the state entry
// so we don't drop on a slightly bigger-than-normal traffic pulse into an otherwise quiet queue.
drop(rv);
_lastDroppedPriority = rv.getPriority();
// I2P - we poll here instead of lock so we don't get stuck
// inside the lock. If empty, deque() will be called again.
rv = super.poll();
updateVars(rv);
_dropping = true;
// If we're in a drop cycle, the drop rate that controlled the queue
// on the last cycle is a good starting point to control it now.
if (_now - _drop_next < INTERVAL)
_count = _count > 2 ? _count - 2 : 1;
else
_count = 1;
control_law(_now);
}
}
return rv;
}
private void drop(E entry) {
long delay = _context.clock().now() - entry.getEnqueueTime();
_context.statManager().addRateData(STAT_DROP + entry.getPriority(), delay);
if (_log.shouldLog(Log.WARN))
_log.warn("CDPQ #" + _id + ' ' + _name + " dropped item with delay " + delay + ", priority " +
entry.getPriority() + ", seq " +
entry.getSeqNum() + ", " +
DataHelper.formatDuration(_context.clock().now() - _first_above_time) + " since first above, " +
DataHelper.formatDuration(_context.clock().now() - _drop_next) + " since drop next, " +
(_count+1) + " dropped in this phase, " +
size() + " remaining in queue: " + entry);
entry.drop();
}
/**
* Caller must synch on this
*/
private void control_law(long t) {
_drop_next = t + (long) (INTERVAL / Math.sqrt(_count));
}
}

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
import net.i2p.util.SimpleTimer2;
import org.xlattice.crypto.filters.BloomSHA1;
@@ -38,7 +38,7 @@ public class DecayingBloomFilter {
private final long _longToEntryMask;
protected long _currentDuplicates;
protected volatile boolean _keepDecaying;
protected final SimpleTimer.TimedEvent _decayEvent;
protected final SimpleTimer2.TimedEvent _decayEvent;
/** just for logging */
protected final String _name;
/** synchronize against this lock when switching double buffers */
@@ -64,7 +64,7 @@ public class DecayingBloomFilter {
context.addShutdownTask(new Shutdown());
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
_decayEvent.schedule(_durationMs);
}
/**
@@ -118,7 +118,7 @@ public class DecayingBloomFilter {
}
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
_decayEvent.schedule(_durationMs);
if (_log.shouldLog(Log.WARN))
_log.warn("New DBF " + name + " m = " + m + " k = " + k + " entryBytes = " + entryBytes +
" numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
@@ -274,7 +274,7 @@ public class DecayingBloomFilter {
public void stopDecaying() {
_keepDecaying = false;
SimpleTimer.getInstance().removeEvent(_decayEvent);
_decayEvent.cancel();
}
protected void decay() {
@@ -310,11 +310,15 @@ public class DecayingBloomFilter {
}
}
private class DecayEvent implements SimpleTimer.TimedEvent {
private class DecayEvent extends SimpleTimer2.TimedEvent {
DecayEvent() {
super(_context.simpleTimer2());
}
public void timeReached() {
if (_keepDecaying) {
decay();
SimpleTimer.getInstance().addEvent(DecayEvent.this, _durationMs);
schedule(_durationMs);
}
}
}

View File

@@ -0,0 +1,145 @@
package net.i2p.router.util;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import net.i2p.I2PAppContext;
import net.i2p.util.SecureFileOutputStream;
/**
* Simple event logger for occasional events,
* with caching for reads.
* Does not keep the file open.
* @since 0.9.3
*/
public class EventLog {
private final I2PAppContext _context;
private final File _file;
/** event to cached map */
private final Map<String, SortedMap<Long, String>> _cache;
/** event to starting time of cached map */
private final Map<String, Long> _cacheTime;
/** for convenience, not required */
public static final String ABORTED = "aborted";
public static final String CHANGE_IP = "changeIP";
public static final String CHANGE_PORT = "changePort";
public static final String CLOCK_SHIFT = "clockShift";
public static final String CRASHED = "crashed";
public static final String INSTALLED = "installed";
public static final String INSTALL_FAILED = "intallFailed";
public static final String NEW_IDENT = "newIdent";
public static final String REKEYED = "rekeyed";
public static final String SOFT_RESTART = "softRestart";
public static final String STARTED = "started";
public static final String STOPPED = "stopped";
public static final String UPDATED = "updated";
public static final String WATCHDOG = "watchdog";
/**
* @param file must be absolute
* @throws IllegalArgumentException if not absolute
*/
public EventLog(I2PAppContext ctx, File file) {
if (!file.isAbsolute())
throw new IllegalArgumentException();
_context = ctx;
_file = file;
_cache = new HashMap(4);
_cacheTime = new HashMap(4);
}
/**
* Append an event. Fails silently.
* @param event no spaces, e.g. "started"
* @throws IllegalArgumentException if event contains a space or newline
*/
public void addEvent(String event) {
addEvent(event, null);
}
/**
* Append an event. Fails silently.
* @param event no spaces or newlines, e.g. "started"
* @param info no newlines, may be blank or null
* @throws IllegalArgumentException if event contains a space or either contains a newline
*/
public synchronized void addEvent(String event, String info) {
if (event.contains(" ") || event.contains("\n") ||
(info != null && info.contains("\n")))
throw new IllegalArgumentException();
_cache.remove(event);
_cacheTime.remove(event);
OutputStream out = null;
try {
out = new SecureFileOutputStream(_file, true);
StringBuilder buf = new StringBuilder(128);
buf.append(_context.clock().now()).append(' ').append(event);
if (info != null && info.length() > 0)
buf.append(' ').append(info);
buf.append('\n');
out.write(buf.toString().getBytes("UTF-8"));
} catch (IOException ioe) {
} finally {
if (out != null) try { out.close(); } catch (IOException ioe) {}
}
}
/**
* Caches.
* Fails silently.
* @param event matching this event only, case sensitive
* @param since since this time, 0 for all
* @return non-null, Map of times to (possibly empty) info strings, sorted, earliest first, unmodifiable
*/
public synchronized SortedMap<Long, String> getEvents(String event, long since) {
SortedMap<Long, String> rv = _cache.get(event);
if (rv != null) {
Long cacheTime = _cacheTime.get(event);
if (cacheTime != null) {
if (since >= cacheTime.longValue())
return rv.tailMap(Long.valueOf(since));
}
}
rv = new TreeMap();
InputStream in = null;
try {
in = new FileInputStream(_file);
BufferedReader br = new BufferedReader(new InputStreamReader(in, "UTF-8"));
String line = null;
while ( (line = br.readLine()) != null) {
try {
String[] s = line.split(" ", 3);
if (!s[1].equals(event))
continue;
long time = Long.parseLong(s[0]);
if (time <= since)
continue;
Long ltime = Long.valueOf(time);
String info = s.length > 2 ? s[2] : "";
rv.put(time, info);
} catch (IndexOutOfBoundsException ioobe) {
} catch (NumberFormatException nfe) {
}
}
rv = Collections.unmodifiableSortedMap(rv);
_cache.put(event, rv);
_cacheTime.put(event, Long.valueOf(since));
} catch (IOException ioe) {
} finally {
if (in != null) try { in.close(); } catch (IOException ioe) {}
}
return rv;
}
}

View File

@@ -0,0 +1,23 @@
package net.i2p.router.util;
/**
* For PriBlockingQueue
* @since 0.9.3
*/
public interface PQEntry {
/**
* Higher is higher priority
*/
public int getPriority();
/**
* To be set by the queue
*/
public void setSeqNum(long num);
/**
* Needed to ensure FIFO ordering within a single priority
*/
public long getSeqNum();
}

View File

@@ -0,0 +1,76 @@
package net.i2p.router.util;
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* Priority Blocking Queue using methods in the entries,
* as definied in PQEntry, to store priority and sequence number,
* ensuring FIFO order within a priority.
*
* Input: add(), offer(), and put() are overridden to add a sequence number.
*
* @since 0.9.3
*/
public class PriBlockingQueue<E extends PQEntry> extends PriorityBlockingQueue<E> {
private final AtomicLong _seqNum = new AtomicLong();
protected static final int BACKLOG_SIZE = 256;
public PriBlockingQueue(int initialCapacity) {
super(initialCapacity, new PriorityComparator());
}
@Override
public boolean add(E o) {
timestamp(o);
return super.add(o);
}
@Override
public boolean offer(E o) {
timestamp(o);
return super.offer(o);
}
@Override
public boolean offer(E o, long timeout, TimeUnit unit) {
timestamp(o);
return super.offer(o, timeout, unit);
}
@Override
public void put(E o) {
timestamp(o);
super.put(o);
}
/**
* Is the queue too big?
*/
public boolean isBacklogged() {
return size() >= BACKLOG_SIZE;
}
/////// private below here
protected void timestamp(E o) {
o.setSeqNum(_seqNum.incrementAndGet());
}
/**
* highest priority first, then lowest sequence number first
*/
private static class PriorityComparator<E extends PQEntry> implements Comparator<E> {
public int compare(E l, E r) {
int d = r.getPriority() - l.getPriority();
if (d != 0)
return d;
long ld = l.getSeqNum() - r.getSeqNum();
return ld > 0 ? 1 : -1;
}
}
}