- Add methods for sending a message with extended options
   - Fix cases where the efficient sendNoEffort() wasn't being used
 * OCMOSJ:
   - Implement per-message overrides for tag threshold,
     tags to send, and bundle leaseset
   - Fix bug adjusting timeouts
   - Warn on client expiration time bugs
This commit is contained in:
zzz
2012-08-24 22:11:02 +00:00
parent 07c21c3bfd
commit ba0408a741
10 changed files with 300 additions and 25 deletions

View File

@@ -158,6 +158,26 @@ class I2CPMessageProducer {
session.sendMessage(msg);
}
/**
* Package up and send the payload to the router for delivery
* @since 0.9.2
*/
public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload,
SendMessageOptions options) throws I2PSessionException {
long expires = options.getTime();
if (!updateBps(payload.length, expires))
// drop the message... send fail notification?
return;
SendMessageMessage msg = new SendMessageExpiresMessage(options);
msg.setDestination(dest);
msg.setSessionId(session.getSessionId());
msg.setNonce(nonce);
Payload data = createPayload(dest, payload, null, null, null, null);
msg.setPayload(data);
session.sendMessage(msg);
}
/**
* Super-simple bandwidth throttler.
* We only calculate on a one-second basis, so large messages

View File

@@ -97,6 +97,7 @@ public interface I2PSession {
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire) throws I2PSessionException;
@@ -116,6 +117,7 @@ public interface I2PSession {
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @since 0.7.1
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
@@ -126,11 +128,20 @@ public interface I2PSession {
* End-to-End Crypto is disabled, tags and keys are ignored.
* @param keyUsed UNUSED, IGNORED.
* @param tagsSent UNUSED, IGNORED.
* @param expire absolute expiration timestamp, NOT interval from now
* @since 0.8.4
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expire,
int proto, int fromport, int toport, int flags) throws I2PSessionException;
/**
* See I2PSessionMuxedImpl for proto/port details.
* See SendMessageOptions for option details.
* @since 0.9.2
*/
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException;
/** Receive a message that the router has notified the client about, returning
* the payload.
* This may only be called once for a given msgId (until the counter wraps)

View File

@@ -42,7 +42,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private final static boolean SHOULD_COMPRESS = true;
private final static boolean SHOULD_DECOMPRESS = true;
/** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */
private boolean _noEffort;
protected boolean _noEffort;
/** for extension */
protected I2PSessionImpl2(I2PAppContext context, Properties options) {
@@ -139,11 +139,17 @@ class I2PSessionImpl2 extends I2PSessionImpl {
int proto, int fromport, int toport, int flags) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
}
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromport, int toport, SendMessageOptions options) throws I2PSessionException {
throw new IllegalArgumentException("Use MuxedImpl");
}
/** unused, see MuxedImpl override */
@Override
public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException {
return sendMessage(dest, payload, 0, payload.length);
}
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size) throws I2PSessionException {
// we don't do end-to-end crypto any more
//return sendMessage(dest, payload, offset, size, new SessionKey(), new HashSet(64), 0);
@@ -169,6 +175,8 @@ class I2PSessionImpl2 extends I2PSessionImpl {
}
/**
* Unused? see MuxedImpl override
*
* @param keyUsed unused - no end-to-end crypto
* @param tagsSent unused - no end-to-end crypto
*/
@@ -202,7 +210,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
_context.statManager().addRateData("i2cp.tx.msgCompressed", compressed, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
if (_noEffort)
return sendNoEffort(dest, payload, expires);
return sendNoEffort(dest, payload, expires, 0);
else
return sendBestEffort(dest, payload, keyUsed, tagsSent, expires);
}
@@ -407,10 +415,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
* @return true always
* @since 0.8.1
*/
protected boolean sendNoEffort(Destination dest, byte payload[], long expires)
protected boolean sendNoEffort(Destination dest, byte payload[], long expires, int flags)
throws I2PSessionException {
// nonce always 0
_producer.sendMessage(this, dest, 0, payload, null, null, null, null, expires);
_producer.sendMessage(this, dest, 0, payload, expires, flags);
return true;
}

View File

@@ -63,6 +63,7 @@ import net.i2p.util.SimpleScheduler;
* Needs some streaming lib hacking
*
* @author zzz
* @since 0.7.1
*/
class I2PSessionMuxedImpl extends I2PSessionImpl2 {
@@ -208,7 +209,64 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 {
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
return sendBestEffort(dest, payload, expires, flags);
if (_noEffort)
return sendNoEffort(dest, payload, expires, flags);
else
return sendBestEffort(dest, payload, expires, flags);
}
/**
* See SendMessageOptions for option details.
*
* Always uses sendNoEffort for now. These are presumed to be datagrams.
* SendMessageOptions 16-bit flag field is currently undefined, so
* serialization won't work; therefore this only makes sense in RouterContext,
* for now.
*
* @param proto 1-254 or 0 for unset; recommended:
* I2PSession.PROTO_UNSPECIFIED
* I2PSession.PROTO_STREAMING
* I2PSession.PROTO_DATAGRAM
* 255 disallowed
* @param fromPort 1-65535 or 0 for unset
* @param toPort 1-65535 or 0 for unset
* @param options to be passed to the router
* @since 0.9.2
*/
@Override
public boolean sendMessage(Destination dest, byte[] payload, int offset, int size,
int proto, int fromPort, int toPort, SendMessageOptions options) throws I2PSessionException {
if (isClosed()) throw new I2PSessionException("Already closed");
updateActivity();
boolean sc = shouldCompress(size);
if (sc)
payload = DataHelper.compress(payload, offset, size);
else
payload = DataHelper.compress(payload, offset, size, DataHelper.NO_COMPRESSION);
setProto(payload, proto);
setFromPort(payload, fromPort);
setToPort(payload, toPort);
_context.statManager().addRateData("i2cp.tx.msgCompressed", payload.length, 0);
_context.statManager().addRateData("i2cp.tx.msgExpanded", size, 0);
//if (_noEffort) {
sendNoEffort(dest, payload, options);
return true;
//} else {
// unimplemented
//return sendBestEffort(dest, payload, options);
//}
}
/**
* @since 0.9.2
*/
private void sendNoEffort(Destination dest, byte payload[], SendMessageOptions options)
throws I2PSessionException {
// nonce always 0
_producer.sendMessage(this, dest, 0, payload, options);
}
/**

View File

@@ -0,0 +1,151 @@
package net.i2p.client;
import net.i2p.data.DateAndFlags;
/**
* Advanced options attached to a single outgoing I2CP message.
*
* Note that the packing of options into the 16-bit flags field is
* is subject to change. Therefore, for now, this is only recommended
* within RouterContext.
*
* Static methods are for OutboundClientMessageOneShotJob to decode the
* flags field on the router side.
*
* @since 0.9.2
*/
public class SendMessageOptions extends DateAndFlags {
/** all subject to change */
/**
* 1 means don't send, 0 means default
*/
private static final int LS_MASK = 0x0001;
/**
* Tags to send field:
*<pre>
* 000 - default
* 001 - 2
* 010 - 4
* 011 - 8
* 100 - 16
* 101 - 32
* 110 - 64
* 111 - 128
*</pre>
*/
private static final int TAGS_SEND_MASK = 0x000e;
private static final int MAX_SEND_TAGS = 128;
/**
* Tags threshold field:
*<pre>
* 000 - default
* 001 - 1
* 010 - 2
* 011 - 4
* 100 - 8
* 101 - 16
* 110 - 32
* 111 - 64
*</pre>
*/
private static final int TAGS_REQD_MASK = 0x0070;
private static final int MAX_REQD_TAGS = 64;
/** default true */
public void setSendLeaseSet(boolean yes) {
if (yes)
_flags &= ~LS_MASK;
else
_flags |= LS_MASK;
}
/** default true */
public boolean getSendLeaseSet() {
return getSendLeaseSet(_flags);
}
/** default true */
public static boolean getSendLeaseSet(int flags) {
return (flags & LS_MASK) == 0;
}
/**
* If we are low on tags, send this many.
* Power of 2 recommended - rounds down.
* default 0, meaning unset
* @param tags 0 or 2 to 128
*/
public void setTagsToSend(int tags) {
if (tags < 0)
throw new IllegalArgumentException();
_flags &= ~TAGS_SEND_MASK;
_flags |= linToExp(Math.min(tags, MAX_SEND_TAGS) / 2) << 1;
}
/**
* If we are low on tags, send this many.
* @return default 0, meaning unset
*/
public int getTagsToSend() {
return getTagsToSend(_flags);
}
/**
* If we are low on tags, send this many.
* @return default 0, meaning unset
*/
public static int getTagsToSend(int flags) {
int exp = (flags & TAGS_SEND_MASK) >> 1;
return 2 * expToLin(exp);
}
/**
* Low tag threshold. If less than this many, send more.
* Power of 2 recommended - rounds down.
* default 0, meaning unset
* @param tags 0 to 64
*/
public void setTagThreshold(int tags) {
if (tags < 0)
throw new IllegalArgumentException();
_flags &= ~TAGS_REQD_MASK;
_flags |= linToExp(Math.min(tags, MAX_REQD_TAGS)) << 4;
}
/**
* Low tag threshold. If less than this many, send more.
* @return default 0, meaning unset
*/
public int getTagThreshold() {
return getTagThreshold(_flags);
}
/**
* Low tag threshold. If less than this many, send more.
* @return default 0, meaning unset
*/
public static int getTagThreshold(int flags) {
int exp = (flags & TAGS_REQD_MASK) >> 4;
return expToLin(exp);
}
/** rounds down */
private static int linToExp(int lin) {
int exp = 0;
while (lin > 0) {
exp++;
lin >>= 1;
}
return exp;
}
private static int expToLin(int exp) {
if (exp <= 0)
return 0;
return 1 << (exp - 1);
}
}

View File

@@ -28,7 +28,7 @@ import java.util.Date;
* @since 0.8.4
*/
public class DateAndFlags extends DataStructureImpl {
private int _flags;
protected int _flags;
private long _date;
public DateAndFlags() {}

View File

@@ -31,8 +31,13 @@ public class SendMessageExpiresMessage extends SendMessageMessage {
private final DateAndFlags _daf;
public SendMessageExpiresMessage() {
this(new DateAndFlags());
}
/** @since 0.9.2 */
public SendMessageExpiresMessage(DateAndFlags options) {
super();
_daf = new DateAndFlags();
_daf = options;
}
/**

View File

@@ -33,14 +33,19 @@ import net.i2p.util.Log;
*/
public class GarlicMessageBuilder {
/** @param local non-null; do not use this method for the router's SessionKeyManager */
public static boolean needsTags(RouterContext ctx, PublicKey key, Hash local) {
/**
* @param local non-null; do not use this method for the router's SessionKeyManager
* @param minTagOverride 0 for no override, > 0 to override SKM's settings
*/
static boolean needsTags(RouterContext ctx, PublicKey key, Hash local, int minTagOverride) {
SessionKeyManager skm = ctx.clientManager().getClientSessionKeyManager(local);
if (skm == null)
return true;
SessionKey curKey = skm.getCurrentKey(key);
if (curKey == null)
return true;
if (minTagOverride > 0)
return skm.shouldSendTags(key, curKey, minTagOverride);
return skm.shouldSendTags(key, curKey);
}
@@ -141,16 +146,17 @@ public class GarlicMessageBuilder {
}
if (log.shouldLog(Log.INFO))
log.info("Encrypted with public key " + key + " to expire on " + new Date(config.getExpiration()));
log.info("Encrypted with public key to expire on " + new Date(config.getExpiration()));
SessionKey curKey = skm.getCurrentOrNewKey(key);
SessionTag curTag = null;
curTag = skm.consumeNextAvailableTag(key, curKey);
int availTags = skm.getAvailableTags(key, curKey);
if (log.shouldLog(Log.DEBUG))
log.debug("Available tags for encryption to " + key + ": " + availTags);
if (log.shouldLog(Log.DEBUG)) {
int availTags = skm.getAvailableTags(key, curKey);
log.debug("Available tags for encryption: " + availTags + " low threshold: " + lowTagsThreshold);
}
if (numTagsToDeliver > 0 && skm.shouldSendTags(key, curKey, lowTagsThreshold)) {
for (int i = 0; i < numTagsToDeliver; i++)

View File

@@ -61,8 +61,8 @@ class OutboundClientMessageJobHelper {
SessionKey wrappedKey, Set<SessionTag> wrappedTags,
boolean requireAck, LeaseSet bundledReplyLeaseSet) {
PayloadGarlicConfig dataClove = buildDataClove(ctx, data, dest, expiration);
return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, wrappedKey,
wrappedTags, requireAck, bundledReplyLeaseSet);
return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel,
0, 0, wrappedKey, wrappedTags, requireAck, bundledReplyLeaseSet);
}
/**
* Allow the app to specify the data clove directly, which enables OutboundClientMessage to resend the
@@ -70,12 +70,15 @@ class OutboundClientMessageJobHelper {
*
* This is called from OCMOSJ
*
* @param tagsToSendOverride if > 0, use this instead of skm's default
* @param lowTagsOverride if > 0, use this instead of skm's default
* @param wrappedKey output parameter that will be filled with the sessionKey used
* @param wrappedTags output parameter that will be filled with the sessionTags used
* @return garlic, or null if no tunnels were found (or other errors)
*/
static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK,
PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, SessionKey wrappedKey,
PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel,
int tagsToSendOverride, int lowTagsOverride, SessionKey wrappedKey,
Set<SessionTag> wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, from, dest, replyTunnel, requireAck, bundledReplyLeaseSet);
if (config == null)
@@ -84,9 +87,10 @@ class OutboundClientMessageJobHelper {
if (skm == null)
return null;
// no use sending tags unless we have a reply token set up already
int tagsToSend = replyToken >= 0 ? skm.getTagsToSend() : 0;
int tagsToSend = replyToken >= 0 ? (tagsToSendOverride > 0 ? tagsToSendOverride : skm.getTagsToSend()) : 0;
int lowThreshold = lowTagsOverride > 0 ? lowTagsOverride : skm.getLowThreshold();
GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags,
tagsToSend, skm);
tagsToSend, lowThreshold, skm);
return msg;
}

View File

@@ -7,6 +7,7 @@ import java.util.List;
import java.util.Properties;
import java.util.Set;
import net.i2p.client.SendMessageOptions;
import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TagSetHandle;
import net.i2p.data.Certificate;
@@ -135,8 +136,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
// otherwise router config, otherwise default
long overallExpiration = msg.getExpiration();
if (overallExpiration > 0) {
if (overallExpiration < 24*60*60*1000l) {
if (_log.shouldLog(Log.WARN))
_log.warn("Client bug - interval instead of timestamp " + overallExpiration);
overallExpiration += _start;
}
// Unless it's already expired, set a min and max expiration
if (overallExpiration <= _start) {
if (overallExpiration > _start) {
overallExpiration = Math.max(overallExpiration, _start + OVERALL_TIMEOUT_MS_MIN);
overallExpiration = Math.min(overallExpiration, _start + OVERALL_TIMEOUT_MS_DEFAULT);
if (_log.shouldLog(Log.INFO))
@@ -430,19 +436,23 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
boolean shouldRequestReply = lastReplyRequestSent == null ||
lastReplyRequestSent.longValue() < now - REPLY_REQUEST_INTERVAL;
int sendFlags = _clientMessage.getFlags();
// Per-message flag > 0 overrides per-session option
int tagsRequired = SendMessageOptions.getTagThreshold(sendFlags);
boolean wantACK = _wantACK ||
shouldRequestReply ||
// TODO: check the per-message flags also
GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(), _from.calculateHash());
GarlicMessageBuilder.needsTags(getContext(), _leaseSet.getEncryptionKey(),
_from.calculateHash(), tagsRequired);
PublicKey key = _leaseSet.getEncryptionKey();
SessionKey sessKey = new SessionKey();
Set<SessionTag> tags = new HashSet();
LeaseSet replyLeaseSet;
// TODO: check the per-message flags also
// Per-message flag == false overrides session option which is default true
String allow = _clientMessage.getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET);
boolean allowLeaseBundle = allow == null || Boolean.valueOf(allow).booleanValue();
boolean allowLeaseBundle = SendMessageOptions.getSendLeaseSet(sendFlags) &&
(allow == null || Boolean.valueOf(allow).booleanValue());
if (allowLeaseBundle) {
// If we want an ack, bundle a leaseSet...
//replyLeaseSet = getReplyLeaseSet(wantACK);
@@ -473,11 +483,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(getJobId() + ": Clove built to " + _toString);
long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT;
// Per-message flag > 0 overrides per-session option
int tagsToSend = SendMessageOptions.getTagsToSend(sendFlags);
GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token,
msgExpiration, key,
_clove, _from.calculateHash(),
_to, _inTunnel,
sessKey, tags,
_to, _inTunnel, tagsToSend,
tagsRequired, sessKey, tags,
wantACK, replyLeaseSet);
if (msg == null) {
// set to null if there are no tunnels to ack the reply back through