I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 3b2aa946 authored by zzz's avatar zzz
Browse files

* I2CP:

      - Add new option i2cp.messageReliability=none, which prevents the
        router from sending MessageStatusMessages back in reply to an
        outbound SendMessageMessage. Since the streaming lib always ignored
        the MSMs anyway, make it the default for streaming.
        This will reduce the I2CP traffic significantly.
        MSM handling now avoided, but it is still fairly broken, see
        comments in I2PSessionImpl2.
      - Cleanups, javadoc, rate reduction
parent a687180d
No related branches found
No related tags found
No related merge requests found
...@@ -130,16 +130,19 @@ public class I2PSocketManagerFactory { ...@@ -130,16 +130,19 @@ public class I2PSocketManagerFactory {
if (!opts.containsKey(name)) if (!opts.containsKey(name))
opts.setProperty(name, System.getProperty(name)); opts.setProperty(name, System.getProperty(name));
} }
boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER)); //boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER));
if (oldLib && false) { //if (oldLib && false) {
// for the old streaming lib // for the old streaming lib
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); // opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED);
//opts.setProperty("tunnels.depthInbound", "0"); //opts.setProperty("tunnels.depthInbound", "0");
} else { //} else {
// for new streaming lib: // for new streaming lib:
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); //opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT);
// as of 0.8.1 (I2CP default is BestEffort)
if (!opts.containsKey(I2PClient.PROP_RELIABILITY))
opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE);
//p.setProperty("tunnels.depthInbound", "0"); //p.setProperty("tunnels.depthInbound", "0");
} //}
if (i2cpHost != null) if (i2cpHost != null)
opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost);
......
...@@ -34,6 +34,8 @@ public interface I2PClient { ...@@ -34,6 +34,8 @@ public interface I2PClient {
public final static String PROP_RELIABILITY_BEST_EFFORT = "BestEffort"; public final static String PROP_RELIABILITY_BEST_EFFORT = "BestEffort";
/** Reliability value: guaranteed */ /** Reliability value: guaranteed */
public final static String PROP_RELIABILITY_GUARANTEED = "Guaranteed"; public final static String PROP_RELIABILITY_GUARANTEED = "Guaranteed";
/** @since 0.8.1 */
public final static String PROP_RELIABILITY_NONE = "none";
/** protocol flag that must be sent when opening the i2cp connection to the router */ /** protocol flag that must be sent when opening the i2cp connection to the router */
public final static int PROTOCOL_BYTE = 0x2A; public final static int PROTOCOL_BYTE = 0x2A;
...@@ -64,4 +66,4 @@ public interface I2PClient { ...@@ -64,4 +66,4 @@ public interface I2PClient {
* @return newly created destination * @return newly created destination
*/ */
public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException; public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException;
} }
\ No newline at end of file
...@@ -33,12 +33,14 @@ import net.i2p.util.Log; ...@@ -33,12 +33,14 @@ import net.i2p.util.Log;
class I2PSessionImpl2 extends I2PSessionImpl { class I2PSessionImpl2 extends I2PSessionImpl {
/** set of MessageState objects, representing all of the messages in the process of being sent */ /** set of MessageState objects, representing all of the messages in the process of being sent */
private /* FIXME final FIXME */ Set _sendingStates; private /* FIXME final FIXME */ Set<MessageState> _sendingStates;
/** max # seconds to wait for confirmation of the message send */ /** max # seconds to wait for confirmation of the message send */
private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send
/** should we gzip each payload prior to sending it? */ /** should we gzip each payload prior to sending it? */
private final static boolean SHOULD_COMPRESS = true; private final static boolean SHOULD_COMPRESS = true;
private final static boolean SHOULD_DECOMPRESS = true; private final static boolean SHOULD_DECOMPRESS = true;
/** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
private boolean _noEffort;
/** for extension */ /** for extension */
public I2PSessionImpl2() {} public I2PSessionImpl2() {}
...@@ -53,6 +55,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -53,6 +55,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
super(ctx, destKeyStream, options); super(ctx, destKeyStream, options);
_log = ctx.logManager().getLog(I2PSessionImpl2.class); _log = ctx.logManager().getLog(I2PSessionImpl2.class);
_sendingStates = new HashSet(32); _sendingStates = new HashSet(32);
// default is BestEffort
_noEffort = "none".equalsIgnoreCase(options.getProperty(I2PClient.PROP_RELIABILITY));
ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } ); ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
...@@ -60,15 +64,16 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -60,15 +64,16 @@ class I2PSessionImpl2 extends I2PSessionImpl {
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage3", "fourth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage3", "fourth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
//ctx.statManager().createRateStat("i2cp.sendBestEffortStage4", "fifth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage4", "fifth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } );
_context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); //_context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); // best effort codes unused
_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); //_context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); //_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 60*1000, 30*60*1000 }); _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 60*1000, 30*60*1000 }); _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 });
_context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 });
} }
protected long getTimeout() { protected long getTimeout() {
...@@ -186,7 +191,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -186,7 +191,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
_context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0); _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); if (_noEffort)
return sendNoEffort(dest, payload, expires);
else
return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
} }
/** /**
...@@ -213,6 +221,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -213,6 +221,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private static final int NUM_TAGS = 50; private static final int NUM_TAGS = 50;
/** /**
* TODO - Don't need to save MessageState since actuallyWait is false...
* But for now just use sendNoEffort() instead.
*
* @param keyUsed unused - no end-to-end crypto * @param keyUsed unused - no end-to-end crypto
* @param tagsSent unused - no end-to-end crypto * @param tagsSent unused - no end-to-end crypto
*/ */
...@@ -257,7 +268,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -257,7 +268,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
+ "ms left, " + oldTags + " tags known and " + "ms left, " + oldTags + " tags known and "
+ (tag == null ? "no tag" : " a valid tag")); + (tag == null ? "no tag" : " a valid tag"));
} }
if (false) // rekey if (false) // rekey
newKey = _context.keyGenerator().generateSessionKey(); newKey = _context.keyGenerator().generateSessionKey();
...@@ -371,6 +382,37 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -371,6 +382,37 @@ class I2PSessionImpl2 extends I2PSessionImpl {
return found; return found;
} }
/**
* Same as sendBestEffort(), except we do not expect any MessageStatusMessage responses -
* not for accepted, or success, or failure.
* So we don't create a MessageState and save it on the _sendingStates HashSet
*
* @return true always
* @since 0.8.1
*/
protected boolean sendNoEffort(Destination dest, byte payload[], long expires)
throws I2PSessionException {
// nonce always 0
_producer.sendMessage(this, dest, 0, payload, null, null, null, null, expires);
return true;
}
/**
* Only call this with nonzero status, i.e. for outbound messages
* whose MessageState may be queued on _sendingStates.
*
* Even when using sendBestEffort(), this is a waste, because the
* MessageState is removed from _sendingStates immediately and
* so the lookup here fails.
* And iterating through the HashSet instead of having a map
* is bad too.
*
* This is now pretty much avoided since streaming now sets
* i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(),
* so the router won't send us any MSM's for outbound traffic.
*
* @param status != 0
*/
@Override @Override
public void receiveStatus(int msgId, long nonce, int status) { public void receiveStatus(int msgId, long nonce, int status) {
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce);
...@@ -413,12 +455,13 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -413,12 +455,13 @@ class I2PSessionImpl2 extends I2PSessionImpl {
case 1: case 1:
_context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0); _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0);
break; break;
case 2: // best effort codes unused
_context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0); //case 2:
break; // _context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0);
case 3: // break;
_context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0); //case 3:
break; // _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0);
// break;
case 4: case 4:
_context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0); _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0);
break; break;
......
...@@ -17,6 +17,7 @@ import java.util.List; ...@@ -17,6 +17,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import net.i2p.client.I2PClient;
import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TransientSessionKeyManager; import net.i2p.crypto.TransientSessionKeyManager;
import net.i2p.data.Destination; import net.i2p.data.Destination;
...@@ -76,11 +77,13 @@ public class ClientConnectionRunner { ...@@ -76,11 +77,13 @@ public class ClientConnectionRunner {
* This contains the last 10 MessageIds that have had their (non-ack) status * This contains the last 10 MessageIds that have had their (non-ack) status
* delivered to the client (so that we can be sure only to update when necessary) * delivered to the client (so that we can be sure only to update when necessary)
*/ */
private final List _alreadyProcessed; private final List<MessageId> _alreadyProcessed;
private ClientWriterRunner _writer; private ClientWriterRunner _writer;
private Hash _destHashCache; private Hash _destHashCache;
/** are we, uh, dead */ /** are we, uh, dead */
private boolean _dead; private boolean _dead;
/** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */
private boolean _dontSendMSM;
/** /**
* Create a new runner against the given socket * Create a new runner against the given socket
...@@ -91,11 +94,9 @@ public class ClientConnectionRunner { ...@@ -91,11 +94,9 @@ public class ClientConnectionRunner {
_log = _context.logManager().getLog(ClientConnectionRunner.class); _log = _context.logManager().getLog(ClientConnectionRunner.class);
_manager = manager; _manager = manager;
_socket = socket; _socket = socket;
_config = null;
_messages = new ConcurrentHashMap(); _messages = new ConcurrentHashMap();
_alreadyProcessed = new ArrayList(); _alreadyProcessed = new ArrayList();
_acceptedPending = new ConcurrentHashSet(); _acceptedPending = new ConcurrentHashSet();
_dead = false;
} }
private static volatile int __id = 0; private static volatile int __id = 0;
...@@ -189,6 +190,9 @@ public class ClientConnectionRunner { ...@@ -189,6 +190,9 @@ public class ClientConnectionRunner {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64());
_config = config; _config = config;
// This is the only option that is interpreted here, not at the tunnel manager
if (config.getOptions() != null)
_dontSendMSM = "none".equalsIgnoreCase(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY));
// per-destination session key manager to prevent rather easy correlation // per-destination session key manager to prevent rather easy correlation
if (_sessionKeyManager == null) if (_sessionKeyManager == null)
_sessionKeyManager = new TransientSessionKeyManager(_context); _sessionKeyManager = new TransientSessionKeyManager(_context);
...@@ -197,10 +201,18 @@ public class ClientConnectionRunner { ...@@ -197,10 +201,18 @@ public class ClientConnectionRunner {
_manager.destinationEstablished(this); _manager.destinationEstablished(this);
} }
/**
* Send a notification to the client that their message (id specified) was
* delivered (or failed delivery)
* Note that this sends the Guaranteed status codes, even though we only support best effort.
* Doesn't do anything if i2cp.messageReliability = "none"
*/
void updateMessageDeliveryStatus(MessageId id, boolean delivered) { void updateMessageDeliveryStatus(MessageId id, boolean delivered) {
if (_dead) return; if (_dead || _dontSendMSM)
return;
_context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, delivered)); _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, delivered));
} }
/** /**
* called after a new leaseSet is granted by the client, the NetworkDb has been * called after a new leaseSet is granted by the client, the NetworkDb has been
* updated. This takes care of all the LeaseRequestState stuff (including firing any jobs) * updated. This takes care of all the LeaseRequestState stuff (including firing any jobs)
...@@ -254,7 +266,8 @@ public class ClientConnectionRunner { ...@@ -254,7 +266,8 @@ public class ClientConnectionRunner {
long expiration = 0; long expiration = 0;
if (message instanceof SendMessageExpiresMessage) if (message instanceof SendMessageExpiresMessage)
expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime();
_acceptedPending.add(id); if (!_dontSendMSM)
_acceptedPending.add(id);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size ["
...@@ -276,9 +289,11 @@ public class ClientConnectionRunner { ...@@ -276,9 +289,11 @@ public class ClientConnectionRunner {
/** /**
* Send a notification to the client that their message (id specified) was accepted * Send a notification to the client that their message (id specified) was accepted
* for delivery (but not necessarily delivered) * for delivery (but not necessarily delivered)
* * Doesn't do anything if i2cp.messageReliability = "none"
*/ */
void ackSendMessage(MessageId id, long nonce) { void ackSendMessage(MessageId id, long nonce) {
if (_dontSendMSM)
return;
SessionId sid = _sessionId; SessionId sid = _sessionId;
if (sid == null) return; if (sid == null) return;
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
...@@ -517,12 +532,17 @@ public class ClientConnectionRunner { ...@@ -517,12 +532,17 @@ public class ClientConnectionRunner {
} }
public String getName() { return "Update Delivery Status"; } public String getName() { return "Update Delivery Status"; }
/**
* Note that this sends the Guaranteed status codes, even though we only support best effort.
*/
public void runJob() { public void runJob() {
if (_dead) return; if (_dead) return;
MessageStatusMessage msg = new MessageStatusMessage(); MessageStatusMessage msg = new MessageStatusMessage();
msg.setMessageId(_messageId.getMessageId()); msg.setMessageId(_messageId.getMessageId());
msg.setSessionId(_sessionId.getSessionId()); msg.setSessionId(_sessionId.getSessionId());
// has to be >= 0, it is initialized to -1
msg.setNonce(2); msg.setNonce(2);
msg.setSize(0); msg.setSize(0);
if (_success) if (_success)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment