diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index e841160d6bf96eb7b11d728b58869d35d3dd8bb7..6dd63a6abdea5d4b67e679d161b2cf0129c111bf 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -11,8 +11,8 @@ package net.i2p.data.i2np; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.I2PAppContext; import net.i2p.data.DataFormatException; @@ -39,10 +39,11 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM private static final boolean RAW_FULL_SIZE = false; - /** unsynchronized as its pretty much read only (except at startup) */ - private static final Map _builders = new HashMap(8); + /** unused */ + private static final Map<Integer, Builder> _builders = new ConcurrentHashMap(1); + /** @deprecated unused */ public static final void registerBuilder(Builder builder, int type) { _builders.put(Integer.valueOf(type), builder); } - /** interface for extending the types of messages handled */ + /** interface for extending the types of messages handled - unused */ public interface Builder { /** instantiate a new I2NPMessage to be populated shortly */ public I2NPMessage build(I2PAppContext ctx); @@ -385,20 +386,22 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM case DataMessage.MESSAGE_TYPE: return new DataMessage(context); // unused since 0.6.1.10 - //case TunnelCreateMessage.MESSAGE_TYPE: - // return new TunnelCreateMessage(context); - //case TunnelCreateStatusMessage.MESSAGE_TYPE: - // return new TunnelCreateStatusMessage(context); case TunnelBuildMessage.MESSAGE_TYPE: return new TunnelBuildMessage(context); case TunnelBuildReplyMessage.MESSAGE_TYPE: return new TunnelBuildReplyMessage(context); + // since 0.7.10 + case VariableTunnelBuildMessage.MESSAGE_TYPE: + return new VariableTunnelBuildMessage(context); + // since 0.7.10 + case VariableTunnelBuildReplyMessage.MESSAGE_TYPE: + return new VariableTunnelBuildReplyMessage(context); default: - Builder builder = (Builder)_builders.get(Integer.valueOf(type)); - if (builder == null) - return null; - else + // unused + Builder builder = _builders.get(Integer.valueOf(type)); + if (builder != null) return builder.build(context); + return new UnknownI2NPMessage(context, type); } } } diff --git a/router/java/src/net/i2p/data/i2np/TunnelBuildMessage.java b/router/java/src/net/i2p/data/i2np/TunnelBuildMessage.java index 17bada1c196a18c9eef6c75219cca1c53ba9980d..ab880031f340a224425d26cad0776b3425e102ca 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelBuildMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelBuildMessage.java @@ -9,18 +9,30 @@ import net.i2p.data.ByteArray; * */ public class TunnelBuildMessage extends I2NPMessageImpl { - private ByteArray _records[]; + protected ByteArray _records[]; + protected int RECORD_COUNT; + public static final int MAX_RECORD_COUNT = 8; public static final int MESSAGE_TYPE = 21; - public static final int RECORD_COUNT = 8; public TunnelBuildMessage(I2PAppContext context) { + this(context, MAX_RECORD_COUNT); + } + + /** @since 0.7.10 */ + protected TunnelBuildMessage(I2PAppContext context, int records) { super(context); - _records = new ByteArray[RECORD_COUNT]; + if (records > 0) { + RECORD_COUNT = records; + _records = new ByteArray[records]; + } + // else will be initialized by readMessage() in VTBM } public void setRecord(int index, ByteArray record) { _records[index] = record; } public ByteArray getRecord(int index) { return _records[index]; } + /** @since 0.7.10 */ + public int getRecordCount() { return RECORD_COUNT; } public static final int RECORD_SIZE = 512+16; @@ -50,4 +62,9 @@ public class TunnelBuildMessage extends I2NPMessageImpl { } return curIndex; } + + @Override + public String toString() { + return "[TunnelBuildMessage]"; + } } diff --git a/router/java/src/net/i2p/data/i2np/TunnelBuildReplyMessage.java b/router/java/src/net/i2p/data/i2np/TunnelBuildReplyMessage.java index 737acd69a704e24182c9c300e21bd5b5025ce194..7a0fc8563794b8df81f1329e6d056909e80c86b8 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelBuildReplyMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelBuildReplyMessage.java @@ -10,18 +10,30 @@ import net.i2p.data.ByteArray; * reply tunnel */ public class TunnelBuildReplyMessage extends I2NPMessageImpl { - private ByteArray _records[]; + protected ByteArray _records[]; + protected int RECORD_COUNT; + public static final int MAX_RECORD_COUNT = TunnelBuildMessage.MAX_RECORD_COUNT; public static final int MESSAGE_TYPE = 22; - public static final int RECORD_COUNT = TunnelBuildMessage.RECORD_COUNT; public TunnelBuildReplyMessage(I2PAppContext context) { + this(context, MAX_RECORD_COUNT); + } + + /** @since 0.7.10 */ + protected TunnelBuildReplyMessage(I2PAppContext context, int records) { super(context); - _records = new ByteArray[RECORD_COUNT]; + if (records > 0) { + RECORD_COUNT = records; + _records = new ByteArray[records]; + } + // else will be initialized by readMessage() in VTBRM } public void setRecord(int index, ByteArray record) { _records[index] = record; } public ByteArray getRecord(int index) { return _records[index]; } + /** @since 0.7.10 */ + public int getRecordCount() { return RECORD_COUNT; } public static final int RECORD_SIZE = TunnelBuildMessage.RECORD_SIZE; @@ -53,4 +65,9 @@ public class TunnelBuildReplyMessage extends I2NPMessageImpl { } return curIndex; } + + @Override + public String toString() { + return "[TunnelBuildReplyMessage]"; + } } diff --git a/router/java/src/net/i2p/data/i2np/UnknownI2NPMessage.java b/router/java/src/net/i2p/data/i2np/UnknownI2NPMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..dc18cfaa9815449653254dc3ddc3cafdc2a766aa --- /dev/null +++ b/router/java/src/net/i2p/data/i2np/UnknownI2NPMessage.java @@ -0,0 +1,114 @@ +package net.i2p.data.i2np; +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; + +/** + * This is the same as DataMessage but with a variable message type. + * This is defined so routers can route messages they don't know about. + * We don't extend DataMessage so that any code that does (instanceof DataMessage) + * won't return true for this type. Load tests use DataMessage, for example. + * See InboundMessageDistributor. + * + * There is no setData() method, the only way to create one of these is to + * read it with readMessage() (i.e., it came from some other router) + * + * @since 0.7.12 + */ +public class UnknownI2NPMessage extends I2NPMessageImpl { + private byte _data[]; + private int _type; + + /** @param type 0-255 */ + public UnknownI2NPMessage(I2PAppContext context, int type) { + super(context); + _type = type; + } + + /** warning - only public for equals() */ + public byte[] getData() { + verifyUnwritten(); + return _data; + } + + public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException { + if (type != _type) throw new I2NPMessageException("Message type is incorrect for this message"); + int curIndex = offset; + long size = DataHelper.fromLong(data, curIndex, 4); + curIndex += 4; + if (size > MAX_SIZE) + throw new I2NPMessageException("wtf, size=" + size); + _data = new byte[(int)size]; + System.arraycopy(data, curIndex, _data, 0, (int)size); + } + + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + if (_data == null) + return 4; + else + return 4 + _data.length; + } + + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) { + verifyUnwritten(); + if (_data == null) { + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + } else { + byte len[] = DataHelper.toLong(4, _data.length); + System.arraycopy(len, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_data, 0, out, curIndex, _data.length); + curIndex += _data.length; + } + return curIndex; + } + + @Override + protected void written() { + super.written(); + _data = null; + } + + /** @return 0-255 */ + public int getType() { return _type; } + + @Override + public int hashCode() { + return _type + DataHelper.hashCode(getData()); + } + + @Override + public boolean equals(Object object) { + if ( (object != null) && (object instanceof UnknownI2NPMessage) ) { + UnknownI2NPMessage msg = (UnknownI2NPMessage)object; + return _type == msg.getType() && DataHelper.eq(getData(), msg.getData()); + } else { + return false; + } + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("[UnknownI2NPMessage: "); + buf.append("\n\tType: ").append(_type); + buf.append("\n\tLength: ").append(calculateWrittenLength() - 4); + buf.append("]"); + return buf.toString(); + } +} diff --git a/router/java/src/net/i2p/data/i2np/VariableTunnelBuildMessage.java b/router/java/src/net/i2p/data/i2np/VariableTunnelBuildMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..cd196c2011dacb95b8dde0f731fffeaa15138924 --- /dev/null +++ b/router/java/src/net/i2p/data/i2np/VariableTunnelBuildMessage.java @@ -0,0 +1,69 @@ +package net.i2p.data.i2np; + +import java.io.IOException; + +import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; + +/** + * @since 0.7.12 + */ +public class VariableTunnelBuildMessage extends TunnelBuildMessage { + public static final int MESSAGE_TYPE = 23; + + /** zero record count, will be set with readMessage() */ + public VariableTunnelBuildMessage(I2PAppContext context) { + super(context, 0); + } + + public VariableTunnelBuildMessage(I2PAppContext context, int records) { + super(context, records); + } + + @Override + protected int calculateWrittenLength() { return 1 + super.calculateWrittenLength(); } + + @Override + public int getType() { return MESSAGE_TYPE; } + + @Override + public void readMessage(byte[] data, int offset, int dataSize, int type) throws I2NPMessageException, IOException { + if (type != MESSAGE_TYPE) + throw new I2NPMessageException("Message type is incorrect for this message"); + int r = (int)DataHelper.fromLong(data, offset, 1); + if (r <= 0 || r > MAX_RECORD_COUNT) + throw new I2NPMessageException("Bad record count " + r); + RECORD_COUNT = r; + if (dataSize != calculateWrittenLength()) + throw new I2NPMessageException("Wrong length (expects " + calculateWrittenLength() + ", recv " + dataSize + ")"); + _records = new ByteArray[RECORD_COUNT]; + super.readMessage(data, offset + 1, dataSize, TunnelBuildMessage.MESSAGE_TYPE); + } + + @Override + protected int writeMessageBody(byte[] out, int curIndex) throws I2NPMessageException { + int remaining = out.length - (curIndex + calculateWrittenLength()); + if (remaining < 0) + throw new I2NPMessageException("Not large enough (too short by " + remaining + ")"); + if (RECORD_COUNT <= 0 || RECORD_COUNT > MAX_RECORD_COUNT) + throw new I2NPMessageException("Bad record count " + RECORD_COUNT); + DataHelper.toLong(out, curIndex++, 1, RECORD_COUNT); + // can't call super, written length check will fail + //return super.writeMessageBody(out, curIndex + 1); + for (int i = 0; i < RECORD_COUNT; i++) { + System.arraycopy(_records[i].getData(), _records[i].getOffset(), out, curIndex, RECORD_SIZE); + curIndex += RECORD_SIZE; + } + return curIndex; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(64); + buf.append("[VariableTunnelBuildMessage: " + + "\n\tRecords: ").append(getRecordCount()) + .append(']'); + return buf.toString(); + } +} diff --git a/router/java/src/net/i2p/data/i2np/VariableTunnelBuildReplyMessage.java b/router/java/src/net/i2p/data/i2np/VariableTunnelBuildReplyMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..bd256f35ce1d78f875262b432715a3a521383d39 --- /dev/null +++ b/router/java/src/net/i2p/data/i2np/VariableTunnelBuildReplyMessage.java @@ -0,0 +1,70 @@ +package net.i2p.data.i2np; + +import java.io.IOException; + +import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; + +/** + * Transmitted from the new outbound endpoint to the creator through a + * reply tunnel + * + * @since 0.7.12 + */ +public class VariableTunnelBuildReplyMessage extends TunnelBuildReplyMessage { + public static final int MESSAGE_TYPE = 24; + + /** zero record count, will be set with readMessage() */ + public VariableTunnelBuildReplyMessage(I2PAppContext context) { + super(context, 0); + } + + public VariableTunnelBuildReplyMessage(I2PAppContext context, int records) { + super(context, records); + } + + @Override + protected int calculateWrittenLength() { return 1 + super.calculateWrittenLength(); } + + @Override + public int getType() { return MESSAGE_TYPE; } + + public void readMessage(byte[] data, int offset, int dataSize, int type) throws I2NPMessageException, IOException { + if (type != MESSAGE_TYPE) + throw new I2NPMessageException("Message type is incorrect for this message"); + int r = (int)DataHelper.fromLong(data, offset, 1); + if (r <= 0 || r > MAX_RECORD_COUNT) + throw new I2NPMessageException("Bad record count " + r); + RECORD_COUNT = r; + if (dataSize != calculateWrittenLength()) + throw new I2NPMessageException("Wrong length (expects " + calculateWrittenLength() + ", recv " + dataSize + ")"); + _records = new ByteArray[RECORD_COUNT]; + super.readMessage(data, offset + 1, dataSize, TunnelBuildReplyMessage.MESSAGE_TYPE); + } + + protected int writeMessageBody(byte[] out, int curIndex) throws I2NPMessageException { + int remaining = out.length - (curIndex + calculateWrittenLength()); + if (remaining < 0) + throw new I2NPMessageException("Not large enough (too short by " + remaining + ")"); + if (RECORD_COUNT <= 0 || RECORD_COUNT > MAX_RECORD_COUNT) + throw new I2NPMessageException("Bad record count " + RECORD_COUNT); + DataHelper.toLong(out, curIndex++, 1, RECORD_COUNT); + // can't call super, written length check will fail + //return super.writeMessageBody(out, curIndex + 1); + for (int i = 0; i < RECORD_COUNT; i++) { + System.arraycopy(_records[i].getData(), _records[i].getOffset(), out, curIndex, RECORD_SIZE); + curIndex += RECORD_SIZE; + } + return curIndex; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(64); + buf.append("[VariableTunnelBuildReplyMessage: " + + "\n\tRecords: ").append(getRecordCount()) + .append(']'); + return buf.toString(); + } +} diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index d3194095669c27ef3ed0160bb8398222b8a1a1af..267556b36b33ba9fb82c5a1b0cb37fb300a667ed 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -42,6 +42,9 @@ public class InNetMessagePool implements Service { private boolean _alive; private boolean _dispatchThreaded; + /** Make this >= the max I2NP message type number (currently 24) */ + private static final int MAX_I2NP_MESSAGE_TYPE = 31; + /** * If set to true, we will have two additional threads - one for dispatching * tunnel data messages, and another for dispatching tunnel gateway messages. @@ -62,8 +65,7 @@ public class InNetMessagePool implements Service { public InNetMessagePool(RouterContext context) { _context = context; - // 32 is greater than the max I2NP message type number (currently 22) + 1 - _handlerJobBuilders = new HandlerJobBuilder[32]; + _handlerJobBuilders = new HandlerJobBuilder[MAX_I2NP_MESSAGE_TYPE + 1]; if (DISPATCH_DIRECT) { // keep the compiler happy since they are final _pendingDataMessages = null; @@ -160,6 +162,7 @@ public class InNetMessagePool implements Service { shortCircuitTunnelData(messageBody, fromRouterHash); allowMatches = false; } else { + // why don't we allow type 0? There used to be a message of type 0 long ago... if ( (type > 0) && (type < _handlerJobBuilders.length) ) { HandlerJobBuilder builder = _handlerJobBuilders[type]; diff --git a/router/java/src/net/i2p/router/tunnel/BuildMessageGenerator.java b/router/java/src/net/i2p/router/tunnel/BuildMessageGenerator.java index ec33678e9a05eb76f79512a591d6ab3af7f0e372..da169db7517f3125aa930af617244f605ac2dfbc 100644 --- a/router/java/src/net/i2p/router/tunnel/BuildMessageGenerator.java +++ b/router/java/src/net/i2p/router/tunnel/BuildMessageGenerator.java @@ -22,18 +22,24 @@ import net.i2p.util.Log; */ public class BuildMessageGenerator { // cached, rather than creating lots of temporary Integer objects whenever we build a tunnel - public static final Integer ORDER[] = new Integer[TunnelBuildMessage.RECORD_COUNT]; + public static final Integer ORDER[] = new Integer[TunnelBuildMessage.MAX_RECORD_COUNT]; static { for (int i = 0; i < ORDER.length; i++) ORDER[i] = Integer.valueOf(i); } /** return null if it is unable to find a router's public key (etc) */ +/**** public TunnelBuildMessage createInbound(RouterContext ctx, TunnelCreatorConfig cfg) { return create(ctx, cfg, null, -1); } +****/ + /** return null if it is unable to find a router's public key (etc) */ +/**** public TunnelBuildMessage createOutbound(RouterContext ctx, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel) { return create(ctx, cfg, replyRouter, replyTunnel); } +****/ +/**** private TunnelBuildMessage create(RouterContext ctx, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel) { TunnelBuildMessage msg = new TunnelBuildMessage(ctx); List order = new ArrayList(ORDER.length); @@ -50,14 +56,15 @@ public class BuildMessageGenerator { layeredEncrypt(ctx, msg, cfg, order); return msg; } +****/ /** * Place the asymmetrically encrypted record in the specified record slot, * containing the hop's configuration (as well as the reply info, if it is an outbound endpoint) */ - public void createRecord(int recordNum, int hop, TunnelBuildMessage msg, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel, I2PAppContext ctx, PublicKey peerKey) { + public static void createRecord(int recordNum, int hop, TunnelBuildMessage msg, TunnelCreatorConfig cfg, Hash replyRouter, long replyTunnel, I2PAppContext ctx, PublicKey peerKey) { byte encrypted[] = new byte[TunnelBuildMessage.RECORD_SIZE]; - Log log = ctx.logManager().getLog(getClass()); + Log log = ctx.logManager().getLog(BuildMessageGenerator.class); if (peerKey != null) { BuildRequestRecord req = null; if ( (!cfg.isInbound()) && (hop + 1 == cfg.getLength()) ) //outbound endpoint @@ -79,7 +86,7 @@ public class BuildMessageGenerator { msg.setRecord(recordNum, new ByteArray(encrypted)); } - private BuildRequestRecord createUnencryptedRecord(I2PAppContext ctx, TunnelCreatorConfig cfg, int hop, Hash replyRouter, long replyTunnel) { + private static BuildRequestRecord createUnencryptedRecord(I2PAppContext ctx, TunnelCreatorConfig cfg, int hop, Hash replyRouter, long replyTunnel) { Log log = ctx.logManager().getLog(BuildMessageGenerator.class); if (hop < cfg.getLength()) { // ok, now lets fill in some data @@ -143,10 +150,10 @@ public class BuildMessageGenerator { * Encrypt the records so their hop ident is visible at the appropriate times * @param order list of hop #s as Integers. For instance, if (order.get(1) is 4), it is peer cfg.getPeer(4) */ - public void layeredEncrypt(I2PAppContext ctx, TunnelBuildMessage msg, TunnelCreatorConfig cfg, List order) { + public static void layeredEncrypt(I2PAppContext ctx, TunnelBuildMessage msg, TunnelCreatorConfig cfg, List order) { Log log = ctx.logManager().getLog(BuildMessageGenerator.class); // encrypt the records so that the right elements will be visible at the right time - for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) { + for (int i = 0; i < msg.getRecordCount(); i++) { ByteArray rec = msg.getRecord(i); Integer hopNum = (Integer)order.get(i); int hop = hopNum.intValue(); diff --git a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java index ab6aa4cab35814637a92dc7c08cab8562ec8581c..fddccd52f308cfbcbddbd61a3e4c44cb428b4037 100644 --- a/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java +++ b/router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java @@ -43,7 +43,7 @@ public class BuildMessageProcessor { long totalEq = 0; long totalDup = 0; long beforeLoop = System.currentTimeMillis(); - for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) { + for (int i = 0; i < msg.getRecordCount(); i++) { ByteArray rec = msg.getRecord(i); int off = rec.getOffset(); int len = BuildRequestRecord.PEER_SIZE; @@ -87,7 +87,7 @@ public class BuildMessageProcessor { SessionKey replyKey = rv.readReplyKey(); byte iv[] = rv.readReplyIV(); int ivOff = 0; - for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) { + for (int i = 0; i < msg.getRecordCount(); i++) { if (i != ourHop) { ByteArray data = msg.getRecord(i); if (log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/tunnel/BuildReplyHandler.java b/router/java/src/net/i2p/router/tunnel/BuildReplyHandler.java index 539d2d01ee525ea3286eef7ca2ef76ff6dd4eca3..e2c4653ee47d3a869458b3457ae6202ae2e5da02 100644 --- a/router/java/src/net/i2p/router/tunnel/BuildReplyHandler.java +++ b/router/java/src/net/i2p/router/tunnel/BuildReplyHandler.java @@ -17,7 +17,6 @@ import net.i2p.util.Log; * */ public class BuildReplyHandler { - public BuildReplyHandler() {} /** * Decrypt the tunnel build reply records. This overwrites the contents of the reply @@ -25,11 +24,16 @@ public class BuildReplyHandler { * @return status for the records (in record order), or null if the replies were not valid. Fake records * always have 0 as their value */ - public int[] decrypt(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List recordOrder) { - Log log = ctx.logManager().getLog(getClass()); - int rv[] = new int[TunnelBuildReplyMessage.RECORD_COUNT]; + public static int[] decrypt(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, List<Integer> recordOrder) { + Log log = ctx.logManager().getLog(BuildReplyHandler.class); + if (reply.getRecordCount() != recordOrder.size()) { + // somebody messed with us + log.error("Corrupted build reply, expected " + recordOrder.size() + " records, got " + reply.getRecordCount()); + return null; + } + int rv[] = new int[reply.getRecordCount()]; for (int i = 0; i < rv.length; i++) { - int hop = ((Integer)recordOrder.get(i)).intValue(); + int hop = recordOrder.get(i).intValue(); if (BuildMessageGenerator.isBlank(cfg, hop)) { // self... if (log.shouldLog(Log.DEBUG)) @@ -56,8 +60,8 @@ public class BuildReplyHandler { * * @return -1 on decrypt failure */ - private int decryptRecord(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) { - Log log = ctx.logManager().getLog(getClass()); + private static int decryptRecord(I2PAppContext ctx, TunnelBuildReplyMessage reply, TunnelCreatorConfig cfg, int recordNum, int hop) { + Log log = ctx.logManager().getLog(BuildReplyHandler.class); if (BuildMessageGenerator.isBlank(cfg, hop)) { if (log.shouldLog(Log.DEBUG)) log.debug(reply.getUniqueId() + ": Record " + recordNum + "/" + hop + " is fake, so consider it valid..."); diff --git a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java index 4a9c48dc22ab1c0ce3e15b0c7316638b139feba5..33e9067096b771d9ee2bb9ec1894135a0eaee37f 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/InboundMessageDistributor.java @@ -11,6 +11,7 @@ import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.data.i2np.GarlicMessage; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.TunnelBuildReplyMessage; +import net.i2p.data.i2np.VariableTunnelBuildReplyMessage; import net.i2p.router.ClientMessage; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; @@ -83,7 +84,8 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec // as long as there's no reply token (FVSJ will never set a reply token but an attacker might) ((msg.getType() != DatabaseStoreMessage.MESSAGE_TYPE) || (!_client.equals(((DatabaseStoreMessage)msg).getKey())) || (((DatabaseStoreMessage)msg).getReplyToken() != 0)) && - (msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE)) { + (msg.getType() != TunnelBuildReplyMessage.MESSAGE_TYPE) && + (msg.getType() != VariableTunnelBuildReplyMessage.MESSAGE_TYPE)) { // drop it, since we should only get tunnel test messages and garlic messages down // client tunnels _context.statManager().addRateData("tunnel.dropDangerousClientTunnelMessage", 1, msg.getType()); @@ -204,6 +206,7 @@ public class InboundMessageDistributor implements GarlicMessageReceiver.CloveRec return; } case DeliveryInstructions.DELIVERY_MODE_DESTINATION: + // Can we route UnknownI2NPMessages to a destination too? if (!(data instanceof DataMessage)) { if (_log.shouldLog(Log.ERROR)) _log.error("cant send a " + data.getClass().getName() + " to a destination"); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index c91af69293b3458975dcc56c618626dfb124c9b9..99b612dd271770bb6a668a3f1ce45888ece70395 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -26,7 +26,7 @@ public class TunnelCreatorConfig implements TunnelInfo { /** gateway first */ private Hash _peers[]; private long _expiration; - private List _order; + private List<Integer> _order; private long _replyMessageId; private boolean _isInbound; private long _messagesProcessed; @@ -54,7 +54,11 @@ public class TunnelCreatorConfig implements TunnelInfo { _failures = 0; } - /** how many hops are there in the tunnel? */ + /** + * How many hops are there in the tunnel? + * INCLUDING US. + * i.e. one more than the TunnelCreatorConfig length. + */ public int getLength() { return _config.length; } public Properties getOptions() { return null; } @@ -91,8 +95,8 @@ public class TunnelCreatorConfig implements TunnelInfo { public void setExpiration(long when) { _expiration = when; } /** component ordering in the new style request */ - public List getReplyOrder() { return _order; } - public void setReplyOrder(List order) { _order = order; } + public List<Integer> getReplyOrder() { return _order; } + public void setReplyOrder(List<Integer> order) { _order = order; } /** new style reply message id */ public long getReplyMessageId() { return _replyMessageId; } public void setReplyMessageId(long id) { _replyMessageId = id; } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java index 5e4456490f201bf51ef2e2f7b1aea631243ee0de..15ffd117012493f1800c5b5dbb56fbc13d703adc 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildExecutor.java @@ -3,9 +3,12 @@ package net.i2p.router.tunnel.pool; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; +import java.util.Iterator; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.Hash; +import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.RouterInfo; import net.i2p.router.RouterContext; import net.i2p.router.TunnelManagerFacade; @@ -28,17 +31,25 @@ class BuildExecutor implements Runnable { private Log _log; private TunnelPoolManager _manager; /** list of TunnelCreatorConfig elements of tunnels currently being built */ - private final List<PooledTunnelCreatorConfig> _currentlyBuilding; + private final Object _currentlyBuilding; + /** indexed by ptcc.getReplyMessageId() */ + private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _currentlyBuildingMap; + /** indexed by ptcc.getReplyMessageId() */ + private final ConcurrentHashMap<Long, PooledTunnelCreatorConfig> _recentlyBuildingMap; private boolean _isRunning; private BuildHandler _handler; private boolean _repoll; private static final int MAX_CONCURRENT_BUILDS = 10; + /** accept replies up to a minute after we gave up on them */ + private static final long GRACE_PERIOD = 60*1000; public BuildExecutor(RouterContext ctx, TunnelPoolManager mgr) { _context = ctx; _log = ctx.logManager().getLog(getClass()); _manager = mgr; - _currentlyBuilding = new ArrayList(MAX_CONCURRENT_BUILDS); + _currentlyBuilding = new Object(); + _currentlyBuildingMap = new ConcurrentHashMap(MAX_CONCURRENT_BUILDS); + _recentlyBuildingMap = new ConcurrentHashMap(4 * MAX_CONCURRENT_BUILDS); _context.statManager().createRateStat("tunnel.concurrentBuilds", "How many builds are going at once", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _context.statManager().createRateStat("tunnel.concurrentBuildsLagged", "How many builds are going at once when we reject further builds, due to job lag (period is lag)", "Tunnels", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _context.statManager().createRateStat("tunnel.buildExploratoryExpire", "How often an exploratory tunnel times out during creation", "Tunnels", new long[] { 10*60*1000, 60*60*1000 }); @@ -51,6 +62,7 @@ class BuildExecutor implements Runnable { _context.statManager().createRateStat("tunnel.buildRequestZeroHopTime", "How long it takes to build a zero hop tunnel", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.pendingRemaining", "How many inbound requests are pending after a pass (period is how long the pass takes)?", "Tunnels", new long[] { 60*1000, 10*60*1000 }); _context.statManager().createRateStat("tunnel.buildFailFirstHop", "How often we fail to build a OB tunnel because we can't contact the first hop", "Tunnels", new long[] { 60*1000, 10*60*1000 }); + _context.statManager().createRateStat("tunnel.buildReplySlow", "Build reply late, but not too late", "Tunnels", new long[] { 10*60*1000 }); // Get stat manager, get recognized bandwidth tiers StatManager statMgr = _context.statManager(); @@ -78,25 +90,33 @@ class BuildExecutor implements Runnable { if (allowed > MAX_CONCURRENT_BUILDS) allowed = MAX_CONCURRENT_BUILDS; // Never go beyond 10, that is uncharted territory (old limit was 5) allowed = _context.getProperty("router.tunnelConcurrentBuilds", allowed); + // expire any REALLY old requests + long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT - GRACE_PERIOD; + for (Iterator<PooledTunnelCreatorConfig> iter = _recentlyBuildingMap.values().iterator(); iter.hasNext(); ) { + PooledTunnelCreatorConfig cfg = iter.next(); + if (cfg.getExpiration() <= expireBefore) { + iter.remove(); + } + } + + // expire any old requests List<PooledTunnelCreatorConfig> expired = null; int concurrent = 0; // Todo: Make expiration variable - long expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT; - synchronized (_currentlyBuilding) { - // expire any old requests - for (int i = 0; i < _currentlyBuilding.size(); i++) { - PooledTunnelCreatorConfig cfg = _currentlyBuilding.get(i); - if (cfg.getExpiration() <= expireBefore) { - _currentlyBuilding.remove(i); - i--; - if (expired == null) - expired = new ArrayList(); - expired.add(cfg); - } + expireBefore = _context.clock().now() + 10*60*1000 - BuildRequestor.REQUEST_TIMEOUT; + for (Iterator<PooledTunnelCreatorConfig> iter = _currentlyBuildingMap.values().iterator(); iter.hasNext(); ) { + PooledTunnelCreatorConfig cfg = iter.next(); + if (cfg.getExpiration() <= expireBefore) { + // save them for another minute + _recentlyBuildingMap.putIfAbsent(Long.valueOf(cfg.getReplyMessageId()), cfg); + iter.remove(); + if (expired == null) + expired = new ArrayList(); + expired.add(cfg); } - concurrent = _currentlyBuilding.size(); - allowed -= concurrent; } + concurrent = _currentlyBuildingMap.size(); + allowed -= concurrent; if (expired != null) { for (int i = 0; i < expired.size(); i++) { @@ -111,7 +131,7 @@ class BuildExecutor implements Runnable { // Look up peer Hash peer = cfg.getPeer(iPeer); // Avoid recording ourselves - if (peer.toBase64().equals(_context.routerHash().toBase64())) + if (peer.equals(_context.routerHash())) continue; // Look up routerInfo RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer); @@ -303,9 +323,6 @@ class BuildExecutor implements Runnable { } if (_log.shouldLog(Log.DEBUG)) _log.debug("Configuring new tunnel " + i + " for " + pool + ": " + cfg); - synchronized (_currentlyBuilding) { - _currentlyBuilding.add(cfg); - } buildTunnel(pool, cfg); realBuilt++; @@ -400,9 +417,6 @@ class BuildExecutor implements Runnable { if (cfg != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Configuring short tunnel " + i + " for " + pool + ": " + cfg); - synchronized (_currentlyBuilding) { - _currentlyBuilding.add(cfg); - } buildTunnel(pool, cfg); if (cfg.getLength() > 1) { allowed--; // oops... shouldn't have done that, but hey, its not that bad... @@ -422,6 +436,15 @@ class BuildExecutor implements Runnable { void buildTunnel(TunnelPool pool, PooledTunnelCreatorConfig cfg) { long beforeBuild = System.currentTimeMillis(); + if (cfg.getLength() > 1) { + // should we allow an ID of 0? + cfg.setReplyMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + if (addToBuilding(cfg)) { + _log.error("Dup reply ID: " + cfg.getReplyMessageId()); + // fail + return; + } + } BuildRequestor.request(_context, pool, cfg, this); long buildTime = System.currentTimeMillis() - beforeBuild; if (cfg.getLength() <= 1) @@ -445,8 +468,9 @@ class BuildExecutor implements Runnable { if (_log.shouldLog(Log.DEBUG)) _log.debug("Build complete for " + cfg); pool.buildComplete(cfg); + if (cfg.getLength() > 1) + removeFromBuilding(cfg.getReplyMessageId()); synchronized (_currentlyBuilding) { - _currentlyBuilding.remove(cfg); _currentlyBuilding.notifyAll(); } @@ -479,6 +503,41 @@ class BuildExecutor implements Runnable { _log.info(tunnel + ": Peer " + peer.toBase64() + " did not reply to the tunnel join request"); } - List locked_getCurrentlyBuilding() { return _currentlyBuilding; } + /** + * Only do this for non-fallback tunnels. + * @return true if refused because of a duplicate key + */ + private boolean addToBuilding(PooledTunnelCreatorConfig cfg) { + //_log.error("Adding ID: " + cfg.getReplyMessageId() + "; size was: " + _currentlyBuildingMap.size()); + return _currentlyBuildingMap.putIfAbsent(Long.valueOf(cfg.getReplyMessageId()), cfg) != null; + } + + /** + * This returns the PTCC up to a minute after it 'expired', thus allowing us to + * still use a tunnel if it was accepted, and to update peer stats. + * This means that manager.buildComplete() could be called more than once, and + * a build can be failed or successful after it was timed out, + * which will affect the stats and profiles. + * But that's ok. A peer that rejects slowly gets penalized twice, for example. + * + * @return ptcc or null + */ + PooledTunnelCreatorConfig removeFromBuilding(long id) { + //_log.error("Removing ID: " + id + "; size was: " + _currentlyBuildingMap.size()); + Long key = Long.valueOf(id); + PooledTunnelCreatorConfig rv = _currentlyBuildingMap.remove(key); + if (rv != null) + return rv; + rv = _recentlyBuildingMap.remove(key); + if (rv != null) { + long requestedOn = rv.getExpiration() - 10*60*1000; + long rtt = _context.clock().now() - requestedOn; + _context.statManager().addRateData("tunnel.buildReplySlow", rtt, 0); + if (_log.shouldLog(Log.WARN)) + _log.warn("Got reply late (rtt = " + rtt + ") for: " + rv); + } + return rv; + } + public int getInboundBuildQueueSize() { return _handler.getInboundBuildQueueSize(); } } diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index 971b20a5455487dc6dee0b9984c7181d2ae99b53..01eb674f4dbaad73883f716936c557f673f9da54 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -16,6 +16,8 @@ import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.TunnelBuildMessage; import net.i2p.data.i2np.TunnelBuildReplyMessage; import net.i2p.data.i2np.TunnelGatewayMessage; +import net.i2p.data.i2np.VariableTunnelBuildMessage; +import net.i2p.data.i2np.VariableTunnelBuildReplyMessage; import net.i2p.router.HandlerJobBuilder; import net.i2p.router.Job; import net.i2p.router.JobImpl; @@ -87,12 +89,18 @@ class BuildHandler { _context.statManager().createRateStat("tunnel.receiveRejectionTransient", "How often we are rejected due to transient overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.receiveRejectionBandwidth", "How often we are rejected due to bandwidth overload?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); _context.statManager().createRateStat("tunnel.receiveRejectionCritical", "How often we are rejected due to critical failure?", "Tunnels", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + + _context.statManager().createRateStat("tunnel.corruptBuildReply", "", "Tunnels", new long[] { 24*60*60*1000l }); _processor = new BuildMessageProcessor(ctx); _buildMessageHandlerJob = new TunnelBuildMessageHandlerJob(ctx); _buildReplyMessageHandlerJob = new TunnelBuildReplyMessageHandlerJob(ctx); - ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildMessage.MESSAGE_TYPE, new TunnelBuildMessageHandlerJobBuilder()); - ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, new TunnelBuildReplyMessageHandlerJobBuilder()); + TunnelBuildMessageHandlerJobBuilder tbmhjb = new TunnelBuildMessageHandlerJobBuilder(); + TunnelBuildReplyMessageHandlerJobBuilder tbrmhjb = new TunnelBuildReplyMessageHandlerJobBuilder(); + ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildMessage.MESSAGE_TYPE, tbmhjb); + ctx.inNetMessagePool().registerHandlerJobBuilder(TunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb); + ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildMessage.MESSAGE_TYPE, tbmhjb); + ctx.inNetMessagePool().registerHandlerJobBuilder(VariableTunnelBuildReplyMessage.MESSAGE_TYPE, tbrmhjb); } private static final int MAX_HANDLE_AT_ONCE = 2; @@ -219,28 +227,13 @@ class BuildHandler { private void handleReply(BuildReplyMessageState state) { // search through the tunnels for a reply long replyMessageId = state.msg.getUniqueId(); - PooledTunnelCreatorConfig cfg = null; - List building = _exec.locked_getCurrentlyBuilding(); + PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(replyMessageId); StringBuilder buf = null; - synchronized (building) { - for (int i = 0; i < building.size(); i++) { - PooledTunnelCreatorConfig cur = (PooledTunnelCreatorConfig)building.get(i); - if (cur.getReplyMessageId() == replyMessageId) { - building.remove(i); - cfg = cur; - break; - } - } - if ( (cfg == null) && (_log.shouldLog(Log.DEBUG)) ) - buf = new StringBuilder(building.toString()); - } if (cfg == null) { // cannot handle - not pending... took too long? if (_log.shouldLog(Log.WARN)) _log.warn("The reply " + replyMessageId + " did not match any pending tunnels"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Pending tunnels: " + buf.toString()); _context.statManager().addRateData("tunnel.buildReplyTooSlow", 1, 0); } else { handleReply(state.msg, cfg, System.currentTimeMillis()-state.recvTime); @@ -253,14 +246,19 @@ class BuildHandler { if (_log.shouldLog(Log.INFO)) _log.info(msg.getUniqueId() + ": Handling the reply after " + rtt + ", delayed " + delay + " waiting for " + cfg); - BuildReplyHandler handler = new BuildReplyHandler(); - List order = cfg.getReplyOrder(); - int statuses[] = handler.decrypt(_context, msg, cfg, order); + List<Integer> order = cfg.getReplyOrder(); + int statuses[] = BuildReplyHandler.decrypt(_context, msg, cfg, order); if (statuses != null) { boolean allAgree = true; // For each peer in the tunnel for (int i = 0; i < cfg.getLength(); i++) { Hash peer = cfg.getPeer(i); + // If this tunnel member is us, skip this record, don't update profile or stats + // for ourselves, we always agree + // Why must we save a slot for ourselves anyway? + if (peer.equals(_context.routerHash())) + continue; + int record = order.indexOf(Integer.valueOf(i)); if (record < 0) { _log.error("Bad status index " + i); @@ -268,9 +266,9 @@ class BuildHandler { _exec.buildComplete(cfg, cfg.getTunnelPool()); return; } + int howBad = statuses[record]; - // If this tunnel member isn't ourselves - if (!peer.toBase64().equals(_context.routerHash().toBase64())) { + // Look up routerInfo RouterInfo ri = _context.netDb().lookupRouterInfoLocally(peer); // Default and detect bandwidth tier @@ -285,7 +283,6 @@ class BuildHandler { } if (_log.shouldLog(Log.INFO)) _log.info(msg.getUniqueId() + ": Peer " + peer.toBase64() + " replied with status " + howBad); - } if (howBad == 0) { // w3wt @@ -338,6 +335,7 @@ class BuildHandler { } else { if (_log.shouldLog(Log.WARN)) _log.warn(msg.getUniqueId() + ": Tunnel reply could not be decrypted for tunnel " + cfg); + _context.statManager().addRateData("tunnel.corruptBuildReply", 1, 0); // don't leak _exec.buildComplete(cfg, cfg.getTunnelPool()); } @@ -403,8 +401,13 @@ class BuildHandler { * This request is actually a reply, process it as such */ private void handleRequestAsInboundEndpoint(BuildEndMessageState state) { - TunnelBuildReplyMessage msg = new TunnelBuildReplyMessage(_context); - for (int i = 0; i < TunnelBuildMessage.RECORD_COUNT; i++) + int records = state.msg.getRecordCount(); + TunnelBuildReplyMessage msg; + if (records == TunnelBuildMessage.MAX_RECORD_COUNT) + msg = new TunnelBuildReplyMessage(_context); + else + msg = new VariableTunnelBuildReplyMessage(_context, records); + for (int i = 0; i < records; i++) msg.setRecord(i, state.msg.getRecord(i)); msg.setUniqueId(state.msg.getUniqueId()); handleReply(msg, state.cfg, System.currentTimeMillis() - state.recvTime); @@ -490,7 +493,6 @@ class BuildHandler { * If we did credit the reply to the tunnel, it would * prevent the classification of the tunnel as 'inactive' on tunnels.jsp. */ - @SuppressWarnings("static-access") private void handleReq(RouterInfo nextPeerInfo, BuildMessageState state, BuildRequestRecord req, Hash nextPeer) { long ourId = req.readReceiveTunnelId(); long nextId = req.readNextTunnelId(); @@ -613,7 +615,8 @@ class BuildHandler { } byte reply[] = BuildResponseRecord.create(_context, response, req.readReplyKey(), req.readReplyIV(), state.msg.getUniqueId()); - for (int j = 0; j < TunnelBuildMessage.RECORD_COUNT; j++) { + int records = state.msg.getRecordCount(); + for (int j = 0; j < records; j++) { if (state.msg.getRecord(j) == null) { ourSlot = j; state.msg.setRecord(j, new ByteArray(reply)); @@ -648,9 +651,12 @@ class BuildHandler { } else { // send it to the reply tunnel on the reply peer within a new TunnelBuildReplyMessage // (enough layers jrandom?) - TunnelBuildReplyMessage replyMsg = new TunnelBuildReplyMessage(_context); - /* FIXME Accessing static field "RECORD_COUNT" FIXME */ - for (int i = 0; i < state.msg.RECORD_COUNT; i++) + TunnelBuildReplyMessage replyMsg; + if (records == TunnelBuildMessage.MAX_RECORD_COUNT) + replyMsg = new TunnelBuildReplyMessage(_context); + else + replyMsg = new VariableTunnelBuildReplyMessage(_context, records); + for (int i = 0; i < records; i++) replyMsg.setRecord(i, state.msg.getRecord(i)); replyMsg.setUniqueId(req.readReplyMessageId()); replyMsg.setMessageExpiration(_context.clock().now() + 10*1000); @@ -693,28 +699,16 @@ class BuildHandler { // need to figure out if this is a reply to an inbound tunnel request (where we are the // endpoint, receiving the request at the last hop) long reqId = receivedMessage.getUniqueId(); - PooledTunnelCreatorConfig cfg = null; - List building = _exec.locked_getCurrentlyBuilding(); - List ids = new ArrayList(); - synchronized (building) { - for (int i = 0; i < building.size(); i++) { - PooledTunnelCreatorConfig cur = (PooledTunnelCreatorConfig)building.get(i); - ids.add(new Long(cur.getReplyMessageId())); - if ( (cur.isInbound()) && (cur.getReplyMessageId() == reqId) ) { - building.remove(i); - cfg = cur; - break; - } else if (cur.getReplyMessageId() == reqId) { - _log.error("received it, but its not inbound? " + cur); - } - } - } + PooledTunnelCreatorConfig cfg = _exec.removeFromBuilding(reqId); if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive tunnel build message " + reqId + " from " + (from != null ? from.calculateHash().toBase64() : fromHash != null ? fromHash.toBase64() : "tunnels") - + ", waiting ids: " + ids + ", found matching tunnel? " + (cfg != null), - null);//new Exception("source")); + + ", found matching tunnel? " + (cfg != null)); if (cfg != null) { + if (!cfg.isInbound()) { + // shouldnt happen - should we put it back? + _log.error("received it, but its not inbound? " + cfg); + } BuildEndMessageState state = new BuildEndMessageState(cfg, receivedMessage); if (HANDLE_REPLIES_INLINE) { handleRequestAsInboundEndpoint(state); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java index 5340dfa1afd390d59faa92ddc31aa1641079926f..d6c3e0a5f3f276847152be2bf46b5d44837b72ca 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java @@ -12,18 +12,20 @@ import net.i2p.data.RouterInfo; import net.i2p.data.TunnelId; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.TunnelBuildMessage; +import net.i2p.data.i2np.VariableTunnelBuildMessage; import net.i2p.router.JobImpl; import net.i2p.router.OutNetMessage; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; import net.i2p.router.tunnel.BuildMessageGenerator; import net.i2p.util.Log; +import net.i2p.util.VersionComparator; /** * */ class BuildRequestor { - private static final List ORDER = new ArrayList(BuildMessageGenerator.ORDER.length); + private static final List<Integer> ORDER = new ArrayList(BuildMessageGenerator.ORDER.length); static { for (int i = 0; i < BuildMessageGenerator.ORDER.length; i++) ORDER.add(Integer.valueOf(i)); @@ -40,7 +42,13 @@ class BuildRequestor { * */ static final int REQUEST_TIMEOUT = 13*1000; + + /** make this shorter than REQUEST_TIMEOUT */ + private static final int FIRST_HOP_TIMEOUT = 10*1000; + /** some randomization is added on to this */ + private static final int BUILD_MSG_TIMEOUT = 60*1000; + private static boolean usePairedTunnels(RouterContext ctx) { String val = ctx.getProperty("router.usePairedTunnels"); if ( (val == null) || (Boolean.valueOf(val).booleanValue()) ) @@ -50,7 +58,7 @@ class BuildRequestor { } /** new style requests need to fill in the tunnel IDs before hand */ - public static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) { + private static void prepare(RouterContext ctx, PooledTunnelCreatorConfig cfg) { for (int i = 0; i < cfg.getLength(); i++) { if ( (!cfg.isInbound()) && (i == 0) ) { // outbound gateway (us) doesn't receive on a tunnel id @@ -67,8 +75,14 @@ class BuildRequestor { cfg.getConfig(i).setReplyIV(new ByteArray(iv)); cfg.getConfig(i).setReplyKey(ctx.keyGenerator().generateSessionKey()); } - cfg.setReplyMessageId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + // This is in BuildExecutor.buildTunnel() now + // And it was overwritten by the one in createTunnelBuildMessage() anyway! + //cfg.setReplyMessageId(ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE)); } + + /** + * @param cfg ReplyMessageId must be set + */ public static void request(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, BuildExecutor exec) { // new style crypto fills in all the blanks, while the old style waits for replies to fill in the next hop, etc prepare(ctx, cfg); @@ -136,8 +150,12 @@ class BuildRequestor { + " with msgId=" + msg.getUniqueId()); // send it directly to the first hop OutNetMessage outMsg = new OutNetMessage(ctx); - // Todo: add some fuzz to the expiration to make it harder to guess how many hops? - outMsg.setExpiration(msg.getMessageExpiration()); + // Add some fuzz to the TBM expiration to make it harder to guess how many hops + // or placement in the tunnel + msg.setMessageExpiration(ctx.clock().now() + BUILD_MSG_TIMEOUT + ctx.random().nextLong(20*1000)); + // We set the OutNetMessage expiration much shorter, so that the + // TunnelBuildFirstHopFailJob fires before the 13s build expiration. + outMsg.setExpiration(ctx.clock().now() + FIRST_HOP_TIMEOUT); outMsg.setMessage(msg); outMsg.setPriority(PRIORITY); RouterInfo peer = ctx.netDb().lookupRouterInfoLocally(cfg.getPeer(1)); @@ -156,33 +174,97 @@ class BuildRequestor { + "ms and dispatched in " + (System.currentTimeMillis()-beforeDispatch)); } + private static final String MIN_VARIABLE_VERSION = "0.7.12"; + /** change this to true in 0.7.13 if testing goes well */ + private static final boolean SEND_VARIABLE = false; + /** 5 (~2600 bytes) fits nicely in 3 tunnel messages */ + private static final int SHORT_RECORDS = 5; + private static final int LONG_RECORDS = TunnelBuildMessage.MAX_RECORD_COUNT; + private static final VersionComparator _versionComparator = new VersionComparator(); + private static final List<Integer> SHORT_ORDER = new ArrayList(SHORT_RECORDS); + static { + for (int i = 0; i < SHORT_RECORDS; i++) + SHORT_ORDER.add(Integer.valueOf(i)); + } + + private static boolean supportsVariable(RouterContext ctx, Hash h) { + RouterInfo ri = ctx.netDb().lookupRouterInfoLocally(h); + if (ri == null) + return false; + String v = ri.getOption("router.version"); + if (v == null) + return false; + return _versionComparator.compare(v, MIN_VARIABLE_VERSION) >= 0; + } + + /** + * If the tunnel is short enough, and everybody in the tunnel, and the + * OBEP or IBGW for the paired tunnel, all support the new variable-sized tunnel build message, + * then use that, otherwise the old 8-entry version. + */ private static TunnelBuildMessage createTunnelBuildMessage(RouterContext ctx, TunnelPool pool, PooledTunnelCreatorConfig cfg, TunnelInfo pairedTunnel, BuildExecutor exec) { Log log = ctx.logManager().getLog(BuildRequestor.class); long replyTunnel = 0; Hash replyRouter = null; + boolean useVariable = SEND_VARIABLE && cfg.getLength() <= SHORT_RECORDS; if (cfg.isInbound()) { - replyTunnel = 0; + //replyTunnel = 0; // as above replyRouter = ctx.routerHash(); + if (useVariable) { + // check the reply OBEP and all the tunnel peers except ourselves + if (!supportsVariable(ctx, pairedTunnel.getPeer(pairedTunnel.getLength() - 1))) { + useVariable = false; + } else { + for (int i = 0; i < cfg.getLength() - 1; i++) { + if (!supportsVariable(ctx, cfg.getPeer(i))) { + useVariable = false; + break; + } + } + } + } } else { replyTunnel = pairedTunnel.getReceiveTunnelId(0).getTunnelId(); replyRouter = pairedTunnel.getPeer(0); + if (useVariable) { + // check the reply IBGW and all the tunnel peers except ourselves + if (!supportsVariable(ctx, replyRouter)) { + useVariable = false; + } else { + for (int i = 1; i < cfg.getLength() - 1; i++) { + if (!supportsVariable(ctx, cfg.getPeer(i))) { + useVariable = false; + break; + } + } + } + } } // populate and encrypt the message - BuildMessageGenerator gen = new BuildMessageGenerator(); - TunnelBuildMessage msg = new TunnelBuildMessage(ctx); + TunnelBuildMessage msg; + List<Integer> order; + if (useVariable) { + msg = new VariableTunnelBuildMessage(ctx, SHORT_RECORDS); + order = new ArrayList(SHORT_ORDER); + if (log.shouldLog(Log.INFO)) + log.info("Using new VTBM"); + } else { + msg = new TunnelBuildMessage(ctx); + order = new ArrayList(ORDER); + } - long replyMessageId = ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE); - cfg.setReplyMessageId(replyMessageId); + // This is in BuildExecutor.buildTunnel() now + //long replyMessageId = ctx.random().nextLong(I2NPMessage.MAX_ID_VALUE); + //cfg.setReplyMessageId(replyMessageId); - List order = new ArrayList(ORDER); Collections.shuffle(order, ctx.random()); // randomized placement within the message cfg.setReplyOrder(order); if (log.shouldLog(Log.DEBUG)) log.debug("Build order: " + order + " for " + cfg); - for (int i = 0; i < BuildMessageGenerator.ORDER.length; i++) { + for (int i = 0; i < msg.getRecordCount(); i++) { int hop = ((Integer)order.get(i)).intValue(); PublicKey key = null; @@ -202,9 +284,9 @@ class BuildRequestor { } if (log.shouldLog(Log.DEBUG)) log.debug(cfg.getReplyMessageId() + ": record " + i + "/" + hop + " has key " + key); - gen.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel, ctx, key); + BuildMessageGenerator.createRecord(i, hop, msg, cfg, replyRouter, replyTunnel, ctx, key); } - gen.layeredEncrypt(ctx, msg, cfg, order); + BuildMessageGenerator.layeredEncrypt(ctx, msg, cfg, order); return msg; }