diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java index afb7e69bde5cddcd40d1441846ec22fc15b2aecc..bcf3876aea9c701054b0434f1c0f29af3a2b605e 100644 --- a/core/java/src/net/i2p/client/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java @@ -158,6 +158,26 @@ class I2CPMessageProducer { session.sendMessage(msg); } + /** + * Package up and send the payload to the router for delivery + * @since 0.9.2 + */ + public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, + SendMessageOptions options) throws I2PSessionException { + + long expires = options.getTime(); + if (!updateBps(payload.length, expires)) + // drop the message... send fail notification? + return; + SendMessageMessage msg = new SendMessageExpiresMessage(options); + msg.setDestination(dest); + msg.setSessionId(session.getSessionId()); + msg.setNonce(nonce); + Payload data = createPayload(dest, payload, null, null, null, null); + msg.setPayload(data); + session.sendMessage(msg); + } + /** * Super-simple bandwidth throttler. * We only calculate on a one-second basis, so large messages diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index c273e378489869dd762734dce7ed30a8cdd66645..598ddf1f90f244c9130c7d3455b2ee859c399724 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -97,6 +97,7 @@ public interface I2PSession { * End-to-End Crypto is disabled, tags and keys are ignored. * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. + * @param expire absolute expiration timestamp, NOT interval from now * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException; @@ -116,6 +117,7 @@ public interface I2PSession { * End-to-End Crypto is disabled, tags and keys are ignored. * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. + * @param expire absolute expiration timestamp, NOT interval from now * @since 0.7.1 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, @@ -126,11 +128,20 @@ public interface I2PSession { * End-to-End Crypto is disabled, tags and keys are ignored. * @param keyUsed UNUSED, IGNORED. * @param tagsSent UNUSED, IGNORED. + * @param expire absolute expiration timestamp, NOT interval from now * @since 0.8.4 */ public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire, int proto, int fromport, int toport, int flags) throws I2PSessionException; + /** + * See I2PSessionMuxedImpl for proto/port details. + * See SendMessageOptions for option details. + * @since 0.9.2 + */ + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException; + /** Receive a message that the router has notified the client about, returning * the payload. * This may only be called once for a given msgId (until the counter wraps) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 812119df123d201d701124ec37a768c68633d2c3..02371eea3478222a513094219990e1f2e84ff98b 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -42,7 +42,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { private final static boolean SHOULD_COMPRESS = true; private final static boolean SHOULD_DECOMPRESS = true; /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */ - private boolean _noEffort; + protected boolean _noEffort; /** for extension */ protected I2PSessionImpl2(I2PAppContext context, Properties options) { @@ -139,11 +139,17 @@ class I2PSessionImpl2 extends I2PSessionImpl { int proto, int fromport, int toport, int flags) throws I2PSessionException { throw new IllegalArgumentException("Use MuxedImpl"); } + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException { + throw new IllegalArgumentException("Use MuxedImpl"); + } + /** unused, see MuxedImpl override */ @Override public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException { return sendMessage(dest, payload, 0, payload.length); } + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException { // we don't do end-to-end crypto any more //return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0); @@ -169,6 +175,8 @@ class I2PSessionImpl2 extends I2PSessionImpl { } /** + * Unused? see MuxedImpl override + * * @param keyUsed unused - no end-to-end crypto * @param tagsSent unused - no end-to-end crypto */ @@ -202,7 +210,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { _context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); if (_noEffort) - return sendNoEffort(dest, payload, expires); + return sendNoEffort(dest, payload, expires, 0); else return sendBestEffort(dest, payload, keyUsed, tagsSent, expires); } @@ -407,10 +415,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { * @return true always * @since 0.8.1 */ - protected boolean sendNoEffort(Destination dest, byte payload[], long expires) + protected boolean sendNoEffort(Destination dest, byte payload[], long expires, int flags) throws I2PSessionException { // nonce always 0 - _producer.sendMessage(this, dest, 0, payload, null, null, null, null, expires); + _producer.sendMessage(this, dest, 0, payload, expires, flags); return true; } diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index 45a3510efa4fc70df20b0ec0b7822b579236ad1d..e1f208e5531ac8efda69ca1ad1ae6fea3c8dc9dc 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -63,6 +63,7 @@ import net.i2p.util.SimpleScheduler; * Needs some streaming lib hacking * * @author zzz + * @since 0.7.1 */ class I2PSessionMuxedImpl extends I2PSessionImpl2 { @@ -208,7 +209,64 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 { _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); - return sendBestEffort(dest, payload, expires, flags); + if (_noEffort) + return sendNoEffort(dest, payload, expires, flags); + else + return sendBestEffort(dest, payload, expires, flags); + } + + /** + * See SendMessageOptions for option details. + * + * Always uses sendNoEffort for now. These are presumed to be datagrams. + * SendMessageOptions 16-bit flag field is currently undefined, so + * serialization won't work; therefore this only makes sense in RouterContext, + * for now. + * + * @param proto 1-254 or 0 for unset; recommended: + * I2PSession.PROTO_UNSPECIFIED + * I2PSession.PROTO_STREAMING + * I2PSession.PROTO_DATAGRAM + * 255 disallowed + * @param fromPort 1-65535 or 0 for unset + * @param toPort 1-65535 or 0 for unset + * @param options to be passed to the router + * @since 0.9.2 + */ + @Override + public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, + int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException { + if (isClosed()) throw new I2PSessionException("Already closed"); + updateActivity(); + + boolean sc = shouldCompress(size); + if (sc) + payload = DataHelper.compress(payload, offset, size); + else + payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION); + + setProto(payload, proto); + setFromPort(payload, fromPort); + setToPort(payload, toPort); + + _context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0); + _context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0); + //if (_noEffort) { + sendNoEffort(dest, payload, options); + return true; + //} else { + // unimplemented + //return sendBestEffort(dest, payload, options); + //} + } + + /** + * @since 0.9.2 + */ + private void sendNoEffort(Destination dest, byte payload[], SendMessageOptions options) + throws I2PSessionException { + // nonce always 0 + _producer.sendMessage(this, dest, 0, payload, options); } /** diff --git a/core/java/src/net/i2p/client/SendMessageOptions.java b/core/java/src/net/i2p/client/SendMessageOptions.java new file mode 100644 index 0000000000000000000000000000000000000000..8706a76599b757d70ff18da28a78f0ed9e91c78c --- /dev/null +++ b/core/java/src/net/i2p/client/SendMessageOptions.java @@ -0,0 +1,151 @@ +package net.i2p.client; + +import net.i2p.data.DateAndFlags; + +/** + * Advanced options attached to a single outgoing I2CP message. + * + * Note that the packing of options into the 16-bit flags field is + * is subject to change. Therefore, for now, this is only recommended + * within RouterContext. + * + * Static methods are for OutboundClientMessageOneShotJob to decode the + * flags field on the router side. + * + * @since 0.9.2 + */ +public class SendMessageOptions extends DateAndFlags { + + /** all subject to change */ + + /** + * 1 means don't send, 0 means default + */ + private static final int LS_MASK = 0x0001; + + /** + * Tags to send field: + *<pre> + * 000 - default + * 001 - 2 + * 010 - 4 + * 011 - 8 + * 100 - 16 + * 101 - 32 + * 110 - 64 + * 111 - 128 + *</pre> + */ + private static final int TAGS_SEND_MASK = 0x000e; + private static final int MAX_SEND_TAGS = 128; + + /** + * Tags threshold field: + *<pre> + * 000 - default + * 001 - 1 + * 010 - 2 + * 011 - 4 + * 100 - 8 + * 101 - 16 + * 110 - 32 + * 111 - 64 + *</pre> + */ + private static final int TAGS_REQD_MASK = 0x0070; + private static final int MAX_REQD_TAGS = 64; + + /** default true */ + public void setSendLeaseSet(boolean yes) { + if (yes) + _flags &= ~LS_MASK; + else + _flags |= LS_MASK; + } + + /** default true */ + public boolean getSendLeaseSet() { + return getSendLeaseSet(_flags); + } + + /** default true */ + public static boolean getSendLeaseSet(int flags) { + return (flags & LS_MASK) == 0; + } + + /** + * If we are low on tags, send this many. + * Power of 2 recommended - rounds down. + * default 0, meaning unset + * @param tags 0 or 2 to 128 + */ + public void setTagsToSend(int tags) { + if (tags < 0) + throw new IllegalArgumentException(); + _flags &= ~TAGS_SEND_MASK; + _flags |= linToExp(Math.min(tags, MAX_SEND_TAGS) / 2) << 1; + } + + /** + * If we are low on tags, send this many. + * @return default 0, meaning unset + */ + public int getTagsToSend() { + return getTagsToSend(_flags); + } + + /** + * If we are low on tags, send this many. + * @return default 0, meaning unset + */ + public static int getTagsToSend(int flags) { + int exp = (flags & TAGS_SEND_MASK) >> 1; + return 2 * expToLin(exp); + } + + /** + * Low tag threshold. If less than this many, send more. + * Power of 2 recommended - rounds down. + * default 0, meaning unset + * @param tags 0 to 64 + */ + public void setTagThreshold(int tags) { + if (tags < 0) + throw new IllegalArgumentException(); + _flags &= ~TAGS_REQD_MASK; + _flags |= linToExp(Math.min(tags, MAX_REQD_TAGS)) << 4; + } + + /** + * Low tag threshold. If less than this many, send more. + * @return default 0, meaning unset + */ + public int getTagThreshold() { + return getTagThreshold(_flags); + } + + /** + * Low tag threshold. If less than this many, send more. + * @return default 0, meaning unset + */ + public static int getTagThreshold(int flags) { + int exp = (flags & TAGS_REQD_MASK) >> 4; + return expToLin(exp); + } + + /** rounds down */ + private static int linToExp(int lin) { + int exp = 0; + while (lin > 0) { + exp++; + lin >>= 1; + } + return exp; + } + + private static int expToLin(int exp) { + if (exp <= 0) + return 0; + return 1 << (exp - 1); + } +} diff --git a/core/java/src/net/i2p/data/DateAndFlags.java b/core/java/src/net/i2p/data/DateAndFlags.java index 0810cddca6e9db88719b7ace262f38803c0ca30b..72e33c7079c1cb0f6d028826e7d30649fc5d1755 100644 --- a/core/java/src/net/i2p/data/DateAndFlags.java +++ b/core/java/src/net/i2p/data/DateAndFlags.java @@ -28,7 +28,7 @@ import java.util.Date; * @since 0.8.4 */ public class DateAndFlags extends DataStructureImpl { - private int _flags; + protected int _flags; private long _date; public DateAndFlags() {} diff --git a/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java index 072ab59e3cd5796fae589536c54ee7155146f4c3..c473e44c5f758c2d7f08ef5588ca647a4ed4803c 100644 --- a/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java +++ b/core/java/src/net/i2p/data/i2cp/SendMessageExpiresMessage.java @@ -31,8 +31,13 @@ public class SendMessageExpiresMessage extends SendMessageMessage { private final DateAndFlags _daf; public SendMessageExpiresMessage() { + this(new DateAndFlags()); + } + + /** @since 0.9.2 */ + public SendMessageExpiresMessage(DateAndFlags options) { super(); - _daf = new DateAndFlags(); + _daf = options; } /** diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index 946a2f32a765ec3e75911b8d786ae937a4b24b23..ec34b005aa84df2cb4f92bbe70d9867a4aea2c18 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -33,14 +33,19 @@ import net.i2p.util.Log; */ public class GarlicMessageBuilder { - /** @param local non-null; do not use this method for the router's SessionKeyManager */ - public static boolean needsTags(RouterContext ctx, PublicKey key, Hash local) { + /** + * @param local non-null; do not use this method for the router's SessionKeyManager + * @param minTagOverride 0 for no override, > 0 to override SKM's settings + */ + static boolean needsTags(RouterContext ctx, PublicKey key, Hash local, int minTagOverride) { SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local); if (skm == null) return true; SessionKey curKey = skm.getCurrentKey(key); if (curKey == null) return true; + if (minTagOverride > 0) + return skm.shouldSendTags(key, curKey, minTagOverride); return skm.shouldSendTags(key, curKey); } @@ -141,16 +146,17 @@ public class GarlicMessageBuilder { } if (log.shouldLog(Log.INFO)) - log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration())); + log.info("Encrypted with public key to expire on " + new Date(config.getExpiration())); SessionKey curKey = skm.getCurrentOrNewKey(key); SessionTag curTag = null; curTag = skm.consumeNextAvailableTag(key, curKey); - int availTags = skm.getAvailableTags(key, curKey); - if (log.shouldLog(Log.DEBUG)) - log.debug("Available tags for encryption to " + key + ": " + availTags); + if (log.shouldLog(Log.DEBUG)) { + int availTags = skm.getAvailableTags(key, curKey); + log.debug("Available tags for encryption: " + availTags + " low threshold: " + lowTagsThreshold); + } if (numTagsToDeliver > 0 && skm.shouldSendTags(key, curKey, lowTagsThreshold)) { for (int i = 0; i < numTagsToDeliver; i++) diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java index cbbb834343f89d3be8c196c335f55ddf84397407..820e7d6b12dfbc88d58f5e5f1b7af5626dcc3b08 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java @@ -61,8 +61,8 @@ class OutboundClientMessageJobHelper { SessionKey wrappedKey, Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) { PayloadGarlicConfig dataClove = buildDataClove(ctx, data, dest, expiration); - return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, wrappedKey, - wrappedTags, requireAck, bundledReplyLeaseSet); + return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, + 0, 0, wrappedKey, wrappedTags, requireAck, bundledReplyLeaseSet); } /** * Allow the app to specify the data clove directly, which enables OutboundClientMessage to resend the @@ -70,12 +70,15 @@ class OutboundClientMessageJobHelper { * * This is called from OCMOSJ * + * @param tagsToSendOverride if > 0, use this instead of skm's default + * @param lowTagsOverride if > 0, use this instead of skm's default * @param wrappedKey output parameter that will be filled with the sessionKey used * @param wrappedTags output parameter that will be filled with the sessionTags used * @return garlic, or null if no tunnels were found (or other errors) */ static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, - PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, SessionKey wrappedKey, + PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, + int tagsToSendOverride, int lowTagsOverride, SessionKey wrappedKey, Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) { GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, requireAck, bundledReplyLeaseSet); if (config == null) @@ -84,9 +87,10 @@ class OutboundClientMessageJobHelper { if (skm == null) return null; // no use sending tags unless we have a reply token set up already - int tagsToSend = replyToken >= 0 ? skm.getTagsToSend() : 0; + int tagsToSend = replyToken >= 0 ? (tagsToSendOverride > 0 ? tagsToSendOverride : skm.getTagsToSend()) : 0; + int lowThreshold = lowTagsOverride > 0 ? lowTagsOverride : skm.getLowThreshold(); GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags, - tagsToSend, skm); + tagsToSend, lowThreshold, skm); return msg; } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 67a490d9fa022b2d9b0bb547474a3912888f853c..349dd7a5499a544e6926bc11c923c62a1e1b525c 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Properties; import java.util.Set; +import net.i2p.client.SendMessageOptions; import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.TagSetHandle; import net.i2p.data.Certificate; @@ -135,8 +136,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { // otherwise router config, otherwise default long overallExpiration = msg.getExpiration(); if (overallExpiration > 0) { + if (overallExpiration < 24*60*60*1000l) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Client bug - interval instead of timestamp " + overallExpiration); + overallExpiration += _start; + } // Unless it's already expired, set a min and max expiration - if (overallExpiration <= _start) { + if (overallExpiration > _start) { overallExpiration = Math.max(overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN); overallExpiration = Math.min(overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT); if (_log.shouldLog(Log.INFO)) @@ -430,19 +436,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl { boolean shouldRequestReply = lastReplyRequestSent == null || lastReplyRequestSent.longValue() < now - REPLY_REQUEST_INTERVAL; + int sendFlags = _clientMessage.getFlags(); + // Per-message flag > 0 overrides per-session option + int tagsRequired = SendMessageOptions.getTagThreshold(sendFlags); boolean wantACK = _wantACK || shouldRequestReply || - // TODO: check the per-message flags also - GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(), _from.calculateHash()); + GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(), + _from.calculateHash(), tagsRequired); PublicKey key = _leaseSet.getEncryptionKey(); SessionKey sessKey = new SessionKey(); Set<SessionTag> tags = new HashSet(); LeaseSet replyLeaseSet; - // TODO: check the per-message flags also + // Per-message flag == false overrides session option which is default true String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET); - boolean allowLeaseBundle = allow == null || Boolean.valueOf(allow).booleanValue(); + boolean allowLeaseBundle = SendMessageOptions.getSendLeaseSet(sendFlags) && + (allow == null || Boolean.valueOf(allow).booleanValue()); if (allowLeaseBundle) { // If we want an ack, bundle a leaseSet... //replyLeaseSet = getReplyLeaseSet(wantACK); @@ -473,11 +483,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { //if (_log.shouldLog(Log.DEBUG)) // _log.debug(getJobId() + ": Clove built to " + _toString); long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT; + // Per-message flag > 0 overrides per-session option + int tagsToSend = SendMessageOptions.getTagsToSend(sendFlags); GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token, msgExpiration, key, _clove, _from.calculateHash(), - _to, _inTunnel, - sessKey, tags, + _to, _inTunnel, tagsToSend, + tagsRequired, sessKey, tags, wantACK, replyLeaseSet); if (msg == null) { // set to null if there are no tunnels to ack the reply back through