OutNetMessage:
- Centralize priority definitions
- Raise netdb store and reply priority
GarlicMessage:
- Add notes about GarlicMessageHandler and HandleGarlicMessageJob
being unused in practice
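
In practice the change replaces bare numbers at each call site with the named constants defined below; the netdb-related values also move up the scale (StoreJob's store priority goes from 100 to PRIORITY_MY_NETDB_STORE = 400, the lookup-reply send in HandleDatabaseLookupMessageJob from 100 to MESSAGE_PRIORITY = PRIORITY_NETDB_REPLY = 300, and NTCPConnection's own-RouterInfo store from 100 to PRIORITY_MY_NETDB_STORE_LOW = 300). A minimal sketch, not part of the commit, of what a call site looks like afterwards, using only calls that appear in the hunks below:

    import net.i2p.data.RouterInfo;
    import net.i2p.data.i2np.I2NPMessage;
    import net.i2p.router.OutNetMessage;
    import net.i2p.router.RouterContext;

    class PrioritySketch {
        void send(RouterContext ctx, I2NPMessage m, RouterInfo target) {
            OutNetMessage out = new OutNetMessage(ctx);
            out.setMessage(m);
            out.setExpiration(m.getMessageExpiration());
            out.setTarget(target);
            // was: out.setPriority(300);
            out.setPriority(OutNetMessage.PRIORITY_BUILD_REPLY);
            ctx.outNetMessagePool().add(out);
        }
    }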
@@ -58,6 +58,26 @@ public class OutNetMessage {
     private List<String> _timestampOrder;
     private Object _preparationBuf;
 
+    /**
+     * Priorities, higher is higher priority.
+     * @since 0.9.3
+     */
+    public static final int PRIORITY_HIGHEST = 1000;
+    public static final int PRIORITY_MY_BUILD_REQUEST = 500;
+    public static final int PRIORITY_MY_NETDB_LOOKUP = 500;
+    public static final int PRIORITY_MY_NETDB_STORE = 400;
+    public static final int PRIORITY_MY_DATA = 400;
+    public static final int PRIORITY_MY_NETDB_STORE_LOW = 300;
+    public static final int PRIORITY_HIS_BUILD_REQUEST = 300;
+    public static final int PRIORITY_BUILD_REPLY = 300;
+    public static final int PRIORITY_NETDB_REPLY = 300;
+    public static final int PRIORITY_HIS_NETDB_STORE = 200;
+    public static final int PRIORITY_NETDB_FLOOD = 200;
+    public static final int PRIORITY_PARTICIPATING = 200;
+    public static final int PRIORITY_NETDB_EXPLORE = 100;
+    public static final int PRIORITY_NETDB_HARVEST = 100;
+    public static final int PRIORITY_LOWEST = 100;
+
     public OutNetMessage(RouterContext context) {
         _context = context;
         _log = context.logManager().getLog(OutNetMessage.class);
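
The constants are plain ints rather than an enum, so ordering is purely numeric. A hypothetical consumer, assuming a getPriority() accessor pairing with the setPriority() used throughout the hunks below (the getter is not shown in this commit), would order descending because "higher is higher priority":

    import java.util.Comparator;
    import java.util.PriorityQueue;
    import net.i2p.router.OutNetMessage;

    class PriorityOrderSketch {
        // Assumption: getPriority() is the getter matching setPriority().
        static PriorityQueue<OutNetMessage> newQueue() {
            return new PriorityQueue<>(
                    Comparator.comparingInt(OutNetMessage::getPriority).reversed());
        }
    }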
@@ -19,6 +19,9 @@ import net.i2p.router.RouterContext;
 /**
  * HandlerJobBuilder to build jobs to handle GarlicMessages
  *
+ * This is essentially unused, as InNetMessagePool short circuits tunnel messages,
+ * and the garlics are handled in InboundMessageDistributor.
+ * Unless we get a garlic message not down a tunnel?
  */
 public class GarlicMessageHandler implements HandlerJobBuilder {
     private final RouterContext _context;
@@ -97,10 +97,12 @@ public class GarlicMessageReceiver {
      */
     private void handleClove(GarlicClove clove) {
         if (!isValid(clove)) {
-            if (_log.shouldLog(Log.WARN))
+            if (_log.shouldLog(Log.DEBUG))
                 _log.warn("Invalid clove " + clove);
+            //if (_log.shouldLog(Log.WARN))
+            //    _log.warn("Invalid clove " + clove);
             return;
         }
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("valid clove " + clove);
+        //if (_log.shouldLog(Log.DEBUG))
+        //    _log.debug("valid clove " + clove);
         _receiver.handleClove(clove.getInstructions(), clove.getData());
     }
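
Note the gate in the hunk above: the check is against Log.DEBUG but the call is warn(). The effect, presumably intentional, is that the line is invisible at default log levels yet still stands out at WARN severity once debug logging is enabled:

    if (_log.shouldLog(Log.DEBUG))
        _log.warn("Invalid clove " + clove);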
@@ -15,6 +15,7 @@ import net.i2p.data.i2np.GarlicMessage;
 import net.i2p.data.i2np.I2NPMessage;
 import net.i2p.data.i2np.TunnelGatewayMessage;
 import net.i2p.router.JobImpl;
+import net.i2p.router.OutNetMessage;
 import net.i2p.router.RouterContext;
 import net.i2p.util.Log;
@@ -24,6 +25,9 @@ import net.i2p.util.Log;
  * as if they arrived locally. Other instructions are not yet implemented (but
  * need to be. soon)
  *
+ * This is essentially unused, as InNetMessagePool short circuits tunnel messages,
+ * and the garlics are handled in InboundMessageDistributor.
+ * Unless we get a garlic message not down a tunnel?
  */
 class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.CloveReceiver {
     private final Log _log;
@@ -34,6 +38,9 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
     //private MessageHandler _handler;
     //private GarlicMessageParser _parser;
 
+    private final static int ROUTER_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
+    private final static int TUNNEL_PRIORITY = OutNetMessage.PRIORITY_LOWEST;
+
     /**
      * @param from ignored
      * @param fromHash ignored
@@ -42,8 +49,8 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
         super(context);
         _log = context.logManager().getLog(HandleGarlicMessageJob.class);
         getContext().statManager().createRateStat("crypto.garlic.decryptFail", "How often garlic messages are undecryptable", "Encryption", new long[] { 5*60*1000, 60*60*1000, 24*60*60*1000 });
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("New handle garlicMessageJob called w/ message from [" + from + "]", new Exception("Debug"));
+        if (_log.shouldLog(Log.WARN))
+            _log.warn("Garlic Message not down a tunnel??? from [" + from + "]", new Exception("I did it"));
         _message = msg;
         //_from = from;
         //_fromHash = fromHash;
@@ -78,10 +85,10 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
             } else {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("router delivery instructions targetting "
-                               + instructions.getRouter().toBase64().substring(0,4));
+                               + instructions.getRouter().toBase64().substring(0,4) + " for " + data);
                 SendMessageDirectJob j = new SendMessageDirectJob(getContext(), data,
                                                                   instructions.getRouter(),
-                                                                  10*1000, 100);
+                                                                  10*1000, ROUTER_PRIORITY);
                 // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
                 j.runJob();
                 //getContext().jobQueue().addJob(j);
@@ -92,9 +99,12 @@ class HandleGarlicMessageJob extends JobImpl implements GarlicMessageReceiver.Cl
                 gw.setMessage(data);
                 gw.setTunnelId(instructions.getTunnelId());
                 gw.setMessageExpiration(data.getMessageExpiration());
+                if (_log.shouldLog(Log.DEBUG))
+                    _log.debug("tunnel delivery instructions targetting "
+                               + instructions.getRouter().toBase64().substring(0,4) + " for " + data);
                 SendMessageDirectJob job = new SendMessageDirectJob(getContext(), gw,
                                                                     instructions.getRouter(),
-                                                                    10*1000, 100);
+                                                                    10*1000, TUNNEL_PRIORITY);
                 // run it inline (adds to the outNetPool if it has the router info, otherwise queue a lookup)
                 job.runJob();
                 // getContext().jobQueue().addJob(job);
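
Both delivery branches above run SendMessageDirectJob inline rather than queueing it; the queued alternative is kept commented out in the source. Side by side (lines taken from the hunks above):

    j.runJob();                           // inline: adds to the outNetPool if the
                                          // router info is cached, else queues a lookup
    //getContext().jobQueue().addJob(j);  // queued alternative, left disabled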
@@ -24,6 +24,7 @@ import net.i2p.data.i2np.I2NPMessage;
 import net.i2p.data.i2np.TunnelGatewayMessage;
 import net.i2p.router.Job;
 import net.i2p.router.JobImpl;
+import net.i2p.router.OutNetMessage;
 import net.i2p.router.Router;
 import net.i2p.router.RouterContext;
 import net.i2p.router.message.SendMessageDirectJob;
@@ -40,7 +41,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
     private final static int MAX_ROUTERS_RETURNED = 3;
     private final static int CLOSENESS_THRESHOLD = 8; // FNDF.MAX_TO_FLOOD + 1
     private final static int REPLY_TIMEOUT = 60*1000;
-    private final static int MESSAGE_PRIORITY = 300;
+    private final static int MESSAGE_PRIORITY = OutNetMessage.PRIORITY_NETDB_REPLY;
 
     /**
      * If a routerInfo structure isn't this recent, don't send it out.
@@ -283,7 +284,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl {
             m.setMessage(message);
             m.setMessageExpiration(message.getMessageExpiration());
             m.setTunnelId(replyTunnel);
-            SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, 100);
+            SendMessageDirectJob j = new SendMessageDirectJob(getContext(), m, toPeer, 10*1000, MESSAGE_PRIORITY);
             j.runJob();
             //getContext().jobQueue().addJob(j);
         }
@@ -41,6 +41,9 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
      */
     private static final int MAX_TO_FLOOD = 4;
 
+    private static final int FLOOD_PRIORITY = OutNetMessage.PRIORITY_NETDB_FLOOD;
+    private static final int FLOOD_TIMEOUT = 30*1000;
+
     public FloodfillNetworkDatabaseFacade(RouterContext context) {
         super(context);
         _activeFloodQueries = new HashMap();
@@ -224,9 +227,6 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad
         }
     }
 
-    private static final int FLOOD_PRIORITY = 200;
-    private static final int FLOOD_TIMEOUT = 30*1000;
-
     @Override
     protected PeerSelector createPeerSelector() { return new FloodfillPeerSelector(_context); }
 
@@ -11,6 +11,7 @@ import net.i2p.data.Hash;
 import net.i2p.data.RouterInfo;
 import net.i2p.data.i2np.DatabaseLookupMessage;
 import net.i2p.router.JobImpl;
+import net.i2p.router.OutNetMessage;
 import net.i2p.router.RouterContext;
 import net.i2p.router.TunnelInfo;
 import net.i2p.router.message.SendMessageDirectJob;
@@ -39,7 +40,7 @@ class HarvesterJob extends JobImpl {
     /** don't try to update more than 5 peers during each run */
     private static final int MAX_PER_RUN = 5;
     /** background job, who cares */
-    private static final int PRIORITY = 100;
+    private static final int PRIORITY = OutNetMessage.PRIORITY_NETDB_HARVEST;
 
     public static final String PROP_ENABLED = "netDb.shouldHarvest";
 
@@ -25,6 +25,7 @@ import net.i2p.data.i2np.DatabaseSearchReplyMessage;
 import net.i2p.data.i2np.DatabaseStoreMessage;
 import net.i2p.router.Job;
 import net.i2p.router.JobImpl;
+import net.i2p.router.OutNetMessage;
 import net.i2p.router.RouterContext;
 import net.i2p.router.TunnelInfo;
 import net.i2p.util.Log;
@@ -78,6 +79,10 @@ class SearchJob extends JobImpl {
      *
      */
     private static final long REQUEUE_DELAY = 1000;
 
+    // TODO pass to the tunnel dispatcher
+    //private final static int LOOKUP_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_LOOKUP;
+    //private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_HIS_NETDB_STORE;
+
     /**
      * Create a new search for the routingKey specified
@@ -445,6 +450,7 @@ class SearchJob extends JobImpl {
         if (FloodfillNetworkDatabaseFacade.isFloodfill(router))
             _floodfillSearchesOutstanding++;
         getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout);
+        // TODO pass a priority to the dispatcher
         getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, to);
     }
@@ -652,6 +658,7 @@ class SearchJob extends JobImpl {
         if (outTunnel != null) {
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("resending leaseSet out to " + to + " through " + outTunnel + ": " + msg);
+            // TODO pass a priority to the dispatcher
             getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, to);
             return true;
         } else {
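
The TODOs above exist because tunnelDispatcher().dispatchOutbound() has no priority parameter, so tunnel-routed lookups and stores cannot yet carry the new constants (hence LOOKUP_PRIORITY and STORE_PRIORITY staying commented out). A purely hypothetical signature for what "pass a priority to the dispatcher" might look like; this overload does not exist in this commit:

    // Hypothetical overload, not real I2P API; argument order mirrors the
    // existing dispatchOutbound(msg, outTunnelId, to) calls shown above.
    void dispatchOutbound(I2NPMessage msg, TunnelId outTunnelId, Hash to,
                          int priority);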
@@ -43,7 +43,7 @@ class StoreJob extends JobImpl {
 
     private final static int PARALLELIZATION = 4; // how many sent at a time
     private final static int REDUNDANCY = 4; // we want the data sent to 6 peers
-    private final static int STORE_PRIORITY = 100;
+    private final static int STORE_PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE;
 
     /**
      * Send a data structure to the floodfills
@@ -137,6 +137,8 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
     /** 2 bytes for length and 4 for CRC */
     public static final int MAX_MSG_SIZE = BUFFER_SIZE - (2 + 4);
 
+    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_NETDB_STORE_LOW;
+
     /**
      * Create an inbound connected (though not established) NTCP connection
      *
@@ -397,7 +399,7 @@ class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener {
         DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context);
         dsm.setEntry(_context.router().getRouterInfo());
         infoMsg.setMessage(dsm);
-        infoMsg.setPriority(100);
+        infoMsg.setPriority(PRIORITY);
         RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash());
         if (target != null) {
             infoMsg.setTarget(target);
@@ -15,6 +15,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
     private RouterInfo _target;
 
     private static final long MAX_LOOKUP_TIME = 15*1000;
+    private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
 
     public InboundGatewayReceiver(RouterContext ctx, HopConfig cfg) {
         _context = ctx;
@@ -58,7 +59,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver {
         out.setMessage(msg);
         out.setTarget(_target);
         out.setExpiration(msg.getMessageExpiration());
-        out.setPriority(200);
+        out.setPriority(PRIORITY);
         _context.outNetMessagePool().add(out);
         return msg.getUniqueId();
     }
@@ -20,6 +20,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
     private RouterInfo _nextHopCache;
 
     private static final long MAX_LOOKUP_TIME = 15*1000;
+    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_DATA;
 
     public OutboundReceiver(RouterContext ctx, TunnelCreatorConfig cfg) {
         _context = ctx;
@@ -61,7 +62,7 @@ class OutboundReceiver implements TunnelGateway.Receiver {
         m.setMessage(msg);
         m.setExpiration(msg.getMessageExpiration());
         m.setTarget(ri);
-        m.setPriority(400);
+        m.setPriority(PRIORITY);
         _context.outNetMessagePool().add(m);
         _config.incrementProcessedMessages();
     }
@@ -30,6 +30,7 @@ class TunnelParticipant {
     private static final long MAX_LOOKUP_TIME = 15*1000;
     /** for next hop when a tunnel is first created */
     private static final long LONG_MAX_LOOKUP_TIME = 30*1000;
+    private static final int PRIORITY = OutNetMessage.PRIORITY_PARTICIPATING;
 
     /** not an inbound endpoint */
     public TunnelParticipant(RouterContext ctx, HopConfig config, HopProcessor processor) {
@@ -196,7 +197,7 @@ class TunnelParticipant {
             m.setMessage(msg);
             m.setExpiration(msg.getMessageExpiration());
             m.setTarget(ri);
-            m.setPriority(200);
+            m.setPriority(PRIORITY);
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Forward on from " + _config + ": " + msg);
             _context.outNetMessagePool().add(m);
||||
@@ -61,6 +61,7 @@ class BuildHandler implements Runnable {
|
||||
private static final int MAX_QUEUE = 192;
|
||||
|
||||
private static final int NEXT_HOP_LOOKUP_TIMEOUT = 15*1000;
|
||||
private static final int PRIORITY = OutNetMessage.PRIORITY_BUILD_REPLY;
|
||||
|
||||
/**
|
||||
* This must be high, as if we timeout the send we remove the tunnel from
|
||||
@@ -689,7 +690,7 @@ class BuildHandler implements Runnable {
             OutNetMessage msg = new OutNetMessage(_context);
             msg.setMessage(state.msg);
             msg.setExpiration(state.msg.getMessageExpiration());
-            msg.setPriority(300);
+            msg.setPriority(PRIORITY);
             msg.setTarget(nextPeerInfo);
             if (response == 0)
                 msg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
@@ -722,7 +723,7 @@ class BuildHandler implements Runnable {
             OutNetMessage outMsg = new OutNetMessage(_context);
             outMsg.setExpiration(m.getMessageExpiration());
             outMsg.setMessage(m);
-            outMsg.setPriority(300);
+            outMsg.setPriority(PRIORITY);
             outMsg.setTarget(nextPeerInfo);
             if (response == 0)
                 outMsg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg));
@@ -29,7 +29,9 @@ abstract class BuildRequestor {
         for (int i = 0; i < TunnelBuildMessage.MAX_RECORD_COUNT; i++)
             ORDER.add(Integer.valueOf(i));
     }
-    private static final int PRIORITY = 500;
+
+    private static final int PRIORITY = OutNetMessage.PRIORITY_MY_BUILD_REQUEST;
+
     /**
      * At 10 seconds, we were receiving about 20% of replies after expiration
      * Todo: make this variable on a per-request basis, to account for tunnel length,