diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 393cf1ae216690d37b9903b1772607fb2ec7a902..34dc8ac593d0d1c33c9202d71a1eb0f7a354b5e8 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -130,16 +130,19 @@ public class I2PSocketManagerFactory { if (!opts.containsKey(name)) opts.setProperty(name, System.getProperty(name)); } - boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER)); - if (oldLib && false) { + //boolean oldLib = DEFAULT_MANAGER.equals(opts.getProperty(PROP_MANAGER, DEFAULT_MANAGER)); + //if (oldLib && false) { // for the old streaming lib - opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); + // opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); //opts.setProperty("tunnels.depthInbound", "0"); - } else { + //} else { // for new streaming lib: - opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); + //opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_BEST_EFFORT); + // as of 0.8.1 (I2CP default is BestEffort) + if (!opts.containsKey(I2PClient.PROP_RELIABILITY)) + opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_NONE); //p.setProperty("tunnels.depthInbound", "0"); - } + //} if (i2cpHost != null) opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); diff --git a/core/java/src/net/i2p/client/I2PClient.java b/core/java/src/net/i2p/client/I2PClient.java index 9a732c3dba0a20fdf9d36560937c8886c254de1f..941549d5372bf453f949fd7af1c62b9a22079f3f 100644 --- a/core/java/src/net/i2p/client/I2PClient.java +++ b/core/java/src/net/i2p/client/I2PClient.java @@ -34,6 +34,8 @@ public interface I2PClient { public final static String PROP_RELIABILITY_BEST_EFFORT = "BestEffort"; /** Reliability value: guaranteed */ public final static String PROP_RELIABILITY_GUARANTEED = "Guaranteed"; + /** @since 0.8.1 */ + public final static String PROP_RELIABILITY_NONE = "none"; /** protocol flag that must be sent when opening the i2cp connection to the router */ public final static int PROTOCOL_BYTE = 0x2A; @@ -64,4 +66,4 @@ public interface I2PClient { * @return newly created destination */ public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException; -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index d8d8c8dfce202f8dd09c4dc37b1b4a6f42f0521d..62b0d3d08b9fcd1d398ad5c9fe2628881b7b7513 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -33,12 +33,14 @@ import net.i2p.util.Log; class I2PSessionImpl2 extends I2PSessionImpl { /** set of MessageState objects, representing all of the messages in the process of being sent */ - private /* FIXME final FIXME */ Set _sendingStates; + private /* FIXME final FIXME */ Set<MessageState> _sendingStates; /** max # seconds to wait for confirmation of the message send */ private final static long SEND_TIMEOUT = 60 * 1000; // 60 seconds to send /** should we gzip each payload prior to sending it? */ private final static boolean SHOULD_COMPRESS = true; private final static boolean SHOULD_DECOMPRESS = true; + /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */ + private boolean _noEffort; /** for extension */ public I2PSessionImpl2() {} @@ -53,6 +55,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { super(ctx, destKeyStream, options); _log = ctx.logManager().getLog(I2PSessionImpl2.class); _sendingStates = new HashSet(32); + // default is BestEffort + _noEffort = "none".equalsIgnoreCase(options.getProperty(I2PClient.PROP_RELIABILITY)); ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); @@ -60,15 +64,16 @@ class I2PSessionImpl2 extends I2PSessionImpl { //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage3", "fourth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); //ctx.statManager().createRateStat("i2cp.sendBestEffortStage4", "fifth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); - _context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 60*1000, 10*60*1000 }); - _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 60*1000, 30*60*1000 }); - _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 60*1000, 30*60*1000 }); + //_context.statManager().createRateStat("i2cp.receiveStatusTime.0", "How long it took to get status=0 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.1", "How long it took to get status=1 back", "i2cp", new long[] { 10*60*1000 }); + // best effort codes unused + //_context.statManager().createRateStat("i2cp.receiveStatusTime.2", "How long it took to get status=2 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); + //_context.statManager().createRateStat("i2cp.receiveStatusTime.3", "How long it took to get status=3 back", "i2cp", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.4", "How long it took to get status=4 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime.5", "How long it took to get status=5 back", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.receiveStatusTime", "How long it took to get any status", "i2cp", new long[] { 10*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgCompressed", "compressed size transferred", "i2cp", new long[] { 30*60*1000 }); + _context.statManager().createRateStat("i2cp.tx.msgExpanded", "size before compression", "i2cp", new long[] { 30*60*1000 }); } protected long getTimeout() { @@ -186,7 +191,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { } _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); - return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); + if (_noEffort) + return sendNoEffort(dest, payload, expires); + else + return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); } /** @@ -213,6 +221,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { private static final int NUM_TAGS = 50; /** + * TODO - Don't need to save MessageState since actuallyWait is false... + * But for now just use sendNoEffort() instead. + * * @param keyUsed unused - no end-to-end crypto * @param tagsSent unused - no end-to-end crypto */ @@ -257,7 +268,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { + "ms left, " + oldTags + " tags known and " + (tag == null ? "no tag" : " a valid tag")); } - + if (false) // rekey newKey = _context.keyGenerator().generateSessionKey(); @@ -371,6 +382,37 @@ class I2PSessionImpl2 extends I2PSessionImpl { return found; } + /** + * Same as sendBestEffort(), except we do not expect any MessageStatusMessage responses - + * not for accepted, or success, or failure. + * So we don't create a MessageState and save it on the _sendingStates HashSet + * + * @return true always + * @since 0.8.1 + */ + protected boolean sendNoEffort(Destination dest, byte payload[], long expires) + throws I2PSessionException { + // nonce always 0 + _producer.sendMessage(this, dest, 0, payload, null, null, null, null, expires); + return true; + } + + /** + * Only call this with nonzero status, i.e. for outbound messages + * whose MessageState may be queued on _sendingStates. + * + * Even when using sendBestEffort(), this is a waste, because the + * MessageState is removed from _sendingStates immediately and + * so the lookup here fails. + * And iterating through the HashSet instead of having a map + * is bad too. + * + * This is now pretty much avoided since streaming now sets + * i2cp.messageReliability = none, which forces sendNoEffort() instead of sendBestEffort(), + * so the router won't send us any MSM's for outbound traffic. + * + * @param status != 0 + */ @Override public void receiveStatus(int msgId, long nonce, int status) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); @@ -413,12 +455,13 @@ class I2PSessionImpl2 extends I2PSessionImpl { case 1: _context.statManager().addRateData("i2cp.receiveStatusTime.1", lifetime, 0); break; - case 2: - _context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0); - break; - case 3: - _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0); - break; + // best effort codes unused + //case 2: + // _context.statManager().addRateData("i2cp.receiveStatusTime.2", lifetime, 0); + // break; + //case 3: + // _context.statManager().addRateData("i2cp.receiveStatusTime.3", lifetime, 0); + // break; case 4: _context.statManager().addRateData("i2cp.receiveStatusTime.4", lifetime, 0); break; diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index e5aa1b5abb4d2a9af84ff6f2d6a57c2178581ed5..f363bd27814dfd7600f4a85126ffa42e214c3cf6 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -17,6 +17,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import net.i2p.client.I2PClient; import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.TransientSessionKeyManager; import net.i2p.data.Destination; @@ -76,11 +77,13 @@ public class ClientConnectionRunner { * This contains the last 10 MessageIds that have had their (non-ack) status * delivered to the client (so that we can be sure only to update when necessary) */ - private final List _alreadyProcessed; + private final List<MessageId> _alreadyProcessed; private ClientWriterRunner _writer; private Hash _destHashCache; /** are we, uh, dead */ private boolean _dead; + /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ + private boolean _dontSendMSM; /** * Create a new runner against the given socket @@ -91,11 +94,9 @@ public class ClientConnectionRunner { _log = _context.logManager().getLog(ClientConnectionRunner.class); _manager = manager; _socket = socket; - _config = null; _messages = new ConcurrentHashMap(); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); - _dead = false; } private static volatile int __id = 0; @@ -189,6 +190,9 @@ public class ClientConnectionRunner { if (_log.shouldLog(Log.DEBUG)) _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); _config = config; + // This is the only option that is interpreted here, not at the tunnel manager + if (config.getOptions() != null) + _dontSendMSM = "none".equalsIgnoreCase(config.getOptions().getProperty(I2PClient.PROP_RELIABILITY)); // per-destination session key manager to prevent rather easy correlation if (_sessionKeyManager == null) _sessionKeyManager = new TransientSessionKeyManager(_context); @@ -197,10 +201,18 @@ public class ClientConnectionRunner { _manager.destinationEstablished(this); } + /** + * Send a notification to the client that their message (id specified) was + * delivered (or failed delivery) + * Note that this sends the Guaranteed status codes, even though we only support best effort. + * Doesn't do anything if i2cp.messageReliability = "none" + */ void updateMessageDeliveryStatus(MessageId id, boolean delivered) { - if (_dead) return; + if (_dead || _dontSendMSM) + return; _context.jobQueue().addJob(new MessageDeliveryStatusUpdate(id, delivered)); } + /** * called after a new leaseSet is granted by the client, the NetworkDb has been * updated. This takes care of all the LeaseRequestState stuff (including firing any jobs) @@ -254,7 +266,8 @@ public class ClientConnectionRunner { long expiration = 0; if (message instanceof SendMessageExpiresMessage) expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); - _acceptedPending.add(id); + if (!_dontSendMSM) + _acceptedPending.add(id); if (_log.shouldLog(Log.DEBUG)) _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" @@ -276,9 +289,11 @@ public class ClientConnectionRunner { /** * Send a notification to the client that their message (id specified) was accepted * for delivery (but not necessarily delivered) - * + * Doesn't do anything if i2cp.messageReliability = "none" */ void ackSendMessage(MessageId id, long nonce) { + if (_dontSendMSM) + return; SessionId sid = _sessionId; if (sid == null) return; if (_log.shouldLog(Log.DEBUG)) @@ -517,12 +532,17 @@ public class ClientConnectionRunner { } public String getName() { return "Update Delivery Status"; } + + /** + * Note that this sends the Guaranteed status codes, even though we only support best effort. + */ public void runJob() { if (_dead) return; MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(_messageId.getMessageId()); msg.setSessionId(_sessionId.getSessionId()); + // has to be >= 0, it is initialized to -1 msg.setNonce(2); msg.setSize(0); if (_success)