diff --git a/history.txt b/history.txt index 8e23ebbea60869f0213357174a5c2a75a4f4533a..c5d894765dea1eebaf7967291023a30cf5311133 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,9 @@ -$Id: history.txt,v 1.36 2004/10/06 08:23:38 jrandom Exp $ +$Id: history.txt,v 1.37 2004/10/06 16:03:52 jrandom Exp $ + +2004-10-07 jrandom + * Expire queued messages even when the writer is blocked. + * Reimplement most of the I2NP writing with less temporary memory + allocations (I2NP reading still gobbles memory). 2004-10-06 jrandom * Implement an active queue management scheme on the TCP transports, diff --git a/router/java/src/net/i2p/data/i2np/DataMessage.java b/router/java/src/net/i2p/data/i2np/DataMessage.java index d55ce8a06d970781cd8fc8fef40199242a18cc90..99a9d45e7f37d75bec27cef338f4595c6713e8a8 100644 --- a/router/java/src/net/i2p/data/i2np/DataMessage.java +++ b/router/java/src/net/i2p/data/i2np/DataMessage.java @@ -54,15 +54,28 @@ public class DataMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { - ByteArrayOutputStream os = new ByteArrayOutputStream((_data != null ? _data.length + 4 : 4)); - try { - DataHelper.writeLong(os, 4, (_data != null ? _data.length : 0)); - os.write(_data); - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + if (_data == null) + return 4; + else + return 4 + _data.length; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) { + if (_data == null) { + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + } else { + byte len[] = DataHelper.toLong(4, _data.length); + System.arraycopy(len, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_data, 0, out, curIndex, _data.length); + curIndex += _data.length; } - return os.toByteArray(); + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java index e2e9514b69591b958e5528cfb468c620c9508686..f1e1805c9da02baa797266dc31663e74053b6b02 100644 --- a/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java +++ b/router/java/src/net/i2p/data/i2np/DatabaseLookupMessage.java @@ -154,33 +154,48 @@ public class DatabaseLookupMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { + protected int calculateWrittenLength() { + int totalLength = 0; + totalLength += Hash.HASH_LENGTH*2; // key+fromHash + totalLength += 1; // hasTunnel? + if (_replyTunnel != null) + totalLength += 4; + totalLength += 2; // numPeers + if (_dontIncludePeers != null) + totalLength += Hash.HASH_LENGTH * _dontIncludePeers.size(); + return totalLength; + } + + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { if (_key == null) throw new I2NPMessageException("Key being searched for not specified"); if (_fromHash == null) throw new I2NPMessageException("From address not specified"); - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - _key.writeBytes(os); - _fromHash.writeBytes(os); - if (_replyTunnel != null) { - DataHelper.writeBoolean(os, Boolean.TRUE); - _replyTunnel.writeBytes(os); - } else { - DataHelper.writeBoolean(os, Boolean.FALSE); - } - if ( (_dontIncludePeers == null) || (_dontIncludePeers.size() <= 0) ) { - DataHelper.writeLong(os, 2, 0); - } else { - DataHelper.writeLong(os, 2, _dontIncludePeers.size()); - for (Iterator iter = _dontIncludePeers.iterator(); iter.hasNext(); ) { - Hash peer = (Hash)iter.next(); - peer.writeBytes(os); - } + System.arraycopy(_key.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + System.arraycopy(_fromHash.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + if (_replyTunnel != null) { + out[curIndex++] = DataHelper.BOOLEAN_TRUE; + byte id[] = DataHelper.toLong(4, _replyTunnel.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + } else { + out[curIndex++] = DataHelper.BOOLEAN_FALSE; + } + if ( (_dontIncludePeers == null) || (_dontIncludePeers.size() <= 0) ) { + out[curIndex++] = 0x0; + out[curIndex++] = 0x0; + } else { + byte len[] = DataHelper.toLong(2, _dontIncludePeers.size()); + out[curIndex++] = len[0]; + out[curIndex++] = len[1]; + for (Iterator iter = _dontIncludePeers.iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + System.arraycopy(peer.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; } - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); } - return os.toByteArray(); + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java index 6bb332f329a24b08da277f54b6058328bea57873..c842c8f5dc0cb853e9174baa4a92e21cb431f067 100644 --- a/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java +++ b/router/java/src/net/i2p/data/i2np/DatabaseSearchReplyMessage.java @@ -80,33 +80,30 @@ public class DatabaseSearchReplyMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + return Hash.HASH_LENGTH + 1 + getNumReplies()*Hash.HASH_LENGTH + Hash.HASH_LENGTH; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { if (_key == null) throw new I2NPMessageException("Key in reply to not specified"); if (_peerHashes == null) throw new I2NPMessageException("Peer replies are null"); if (_from == null) throw new I2NPMessageException("No 'from' address specified!"); - - byte rv[] = null; - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - _key.writeBytes(os); - - DataHelper.writeLong(os, 1, _peerHashes.size()); - for (int i = 0; i < getNumReplies(); i++) { - Hash peer = getReply(i); - peer.writeBytes(os); - } - - _from.writeBytes(os); - rv = os.toByteArray(); - _context.statManager().addRateData("netDb.searchReplyMessageSendSize", rv.length, 1); - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); + System.arraycopy(_key.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + byte len[] = DataHelper.toLong(1, _peerHashes.size()); + out[curIndex++] = len[0]; + for (int i = 0; i < getNumReplies(); i++) { + System.arraycopy(getReply(i).getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; } - return rv; + System.arraycopy(_from.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java b/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java index aedba86136d0e135eaf236c02a2dc9525bcc2fee..5398447c858b5ef382a51720ade20c1238c9d7b9 100644 --- a/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java +++ b/router/java/src/net/i2p/data/i2np/DatabaseStoreMessage.java @@ -35,6 +35,8 @@ public class DatabaseStoreMessage extends I2NPMessageImpl { private int _type; private LeaseSet _leaseSet; private RouterInfo _info; + private byte[] _leaseSetCache; + private byte[] _routerInfoCache; private long _replyToken; private TunnelId _replyTunnel; private Hash _replyGateway; @@ -156,37 +158,57 @@ public class DatabaseStoreMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + int len = Hash.HASH_LENGTH + 1 + 4; // key+type+replyToken + if (_replyToken > 0) + len += 4 + Hash.HASH_LENGTH; // replyTunnel+replyGateway + if (_type == KEY_TYPE_LEASESET) { + _leaseSetCache = _leaseSet.toByteArray(); + len += _leaseSetCache.length; + } else if (_type == KEY_TYPE_ROUTERINFO) { + byte uncompressed[] = _info.toByteArray(); + byte compressed[] = DataHelper.compress(uncompressed); + _routerInfoCache = compressed; + len += compressed.length + 2; + } + return len; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { if (_key == null) throw new I2NPMessageException("Invalid key"); if ( (_type != KEY_TYPE_LEASESET) && (_type != KEY_TYPE_ROUTERINFO) ) throw new I2NPMessageException("Invalid key type"); if ( (_type == KEY_TYPE_LEASESET) && (_leaseSet == null) ) throw new I2NPMessageException("Missing lease set"); if ( (_type == KEY_TYPE_ROUTERINFO) && (_info == null) ) throw new I2NPMessageException("Missing router info"); - ByteArrayOutputStream os = new ByteArrayOutputStream(256); - try { - _key.writeBytes(os); - DataHelper.writeLong(os, 1, _type); - DataHelper.writeLong(os, 4, _replyToken); - if (_replyToken > 0) { - _replyTunnel.writeBytes(os); - _replyGateway.writeBytes(os); - } else { - // noop - } - if (_type == KEY_TYPE_LEASESET) { - _leaseSet.writeBytes(os); - } else if (_type == KEY_TYPE_ROUTERINFO) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(4*1024); - _info.writeBytes(baos); - byte uncompressed[] = baos.toByteArray(); - byte compressed[] = DataHelper.compress(uncompressed); - DataHelper.writeLong(os, 2, compressed.length); - os.write(compressed); - } - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); + System.arraycopy(_key.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + byte type[] = DataHelper.toLong(1, _type); + out[curIndex++] = type[0]; + byte tok[] = DataHelper.toLong(4, _replyToken); + System.arraycopy(tok, 0, out, curIndex, 4); + curIndex += 4; + + if (_replyToken > 0) { + byte id[] = DataHelper.toLong(4, _replyTunnel.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_replyGateway.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + } + + if (_type == KEY_TYPE_LEASESET) { + // initialized in calculateWrittenLength + System.arraycopy(_leaseSetCache, 0, out, curIndex, _leaseSetCache.length); + curIndex += _leaseSetCache.length; + } else if (_type == KEY_TYPE_ROUTERINFO) { + byte len[] = DataHelper.toLong(2, _routerInfoCache.length); + out[curIndex++] = len[0]; + out[curIndex++] = len[1]; + System.arraycopy(_routerInfoCache, 0, out, curIndex, _routerInfoCache.length); + curIndex += _routerInfoCache.length; } - return os.toByteArray(); + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/DeliveryStatusMessage.java b/router/java/src/net/i2p/data/i2np/DeliveryStatusMessage.java index 41514cdf543fcd3dda3032e87b882ad3cbfe1548..4626e449b820dbd15777eef513d80b62f5d236fa 100644 --- a/router/java/src/net/i2p/data/i2np/DeliveryStatusMessage.java +++ b/router/java/src/net/i2p/data/i2np/DeliveryStatusMessage.java @@ -52,17 +52,21 @@ public class DeliveryStatusMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + return 4 + DataHelper.DATE_LENGTH; // id + arrival + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { if ( (_id < 0) || (_arrival == null) ) throw new I2NPMessageException("Not enough data to write out"); - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - DataHelper.writeLong(os, 4, _id); - DataHelper.writeDate(os, _arrival); - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); - } - return os.toByteArray(); + byte id[] = DataHelper.toLong(4, _id); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + byte date[] = DataHelper.toDate(_arrival); + System.arraycopy(date, 0, out, curIndex, DataHelper.DATE_LENGTH); + curIndex += DataHelper.DATE_LENGTH; + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/GarlicClove.java b/router/java/src/net/i2p/data/i2np/GarlicClove.java index 32d2463d6b654288adf507eab7e0ea1d1a48dc6e..6fb6e24c819e5b971822d3fff58cbbab088846fd 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicClove.java +++ b/router/java/src/net/i2p/data/i2np/GarlicClove.java @@ -98,7 +98,7 @@ public class GarlicClove extends DataStructureImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Wrote instructions: " + _instructions); - _msg.writeBytes(out); + out.write(_msg.toByteArray()); DataHelper.writeLong(out, 4, _cloveId); DataHelper.writeDate(out, _expiration); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/data/i2np/GarlicMessage.java b/router/java/src/net/i2p/data/i2np/GarlicMessage.java index d5dbf5abfb4597cb53dffe12733591eb9b1e5f81..537bf24fd8c71efaf2fd5fabeb4a5143c88f3630 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicMessage.java +++ b/router/java/src/net/i2p/data/i2np/GarlicMessage.java @@ -48,17 +48,18 @@ public class GarlicMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { - if ( (_data == null) || (_data.length <= 0) ) throw new I2NPMessageException("Not enough data to write out"); - - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - DataHelper.writeLong(os, 4, _data.length); - os.write(_data); - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); - } - return os.toByteArray(); + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + return 4 + _data.length; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { + byte len[] = DataHelper.toLong(4, _data.length); + System.arraycopy(len, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_data, 0, out, curIndex, _data.length); + curIndex += _data.length; + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessage.java b/router/java/src/net/i2p/data/i2np/I2NPMessage.java index e4e54757f886401f4996aec4b3b67a551d95ccd9..a581ac9552b8e3fa233b31f9bbe0a504af412585 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessage.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessage.java @@ -52,5 +52,8 @@ public interface I2NPMessage extends DataStructure { public Date getMessageExpiration(); /** How large the message is, including any checksums */ - public int getSize(); + public int getMessageSize(); + + /** write the message to the buffer, returning the number of bytes written */ + public int toByteArray(byte buffer[]); } diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index faf69a6474f9882b77c191e7f4318b6552443bf4..9c182ea1b17f5ebf3d5d72a14be28db7d320fda0 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -31,6 +31,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM protected I2PAppContext _context; private Date _expiration; private long _uniqueId; + private byte _data[]; public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default @@ -39,15 +40,10 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM _log = context.logManager().getLog(I2NPMessageImpl.class); _expiration = new Date(_context.clock().now() + DEFAULT_EXPIRATION_MS); _uniqueId = _context.random().nextLong(MAX_ID_VALUE); + _context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); } - /** - * Write out the payload part of the message (not including the initial - * 1 byte type) - * - */ - protected abstract byte[] writeMessage() throws I2NPMessageException, IOException; - /** * Read the body into the data structures, after the initial type byte and * the uniqueId / expiration, using the current class's format as defined by @@ -70,6 +66,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM } public void readBytes(InputStream in, int type) throws I2NPMessageException, IOException { try { + long start = _context.clock().now(); if (type < 0) type = (int)DataHelper.readLong(in, 1); _uniqueId = DataHelper.readLong(in, 4); @@ -88,25 +85,20 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM if (_log.shouldLog(Log.DEBUG)) _log.debug("Reading bytes: type = " + type + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration); readMessage(new ByteArrayInputStream(data), type); + long time = _context.clock().now() - start; + if (time > 50) + _context.statManager().addRateData("i2np.readTime", time, time); } catch (DataFormatException dfe) { throw new I2NPMessageException("Error reading the message header", dfe); } } public void writeBytes(OutputStream out) throws DataFormatException, IOException { - try { - DataHelper.writeLong(out, 1, getType()); - DataHelper.writeLong(out, 4, _uniqueId); - DataHelper.writeDate(out, _expiration); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing bytes: type = " + getType() + " / uniqueId : " + _uniqueId + " / expiration : " + _expiration); - byte[] data = writeMessage(); - DataHelper.writeLong(out, 2, data.length); - Hash h = _context.sha().calculateHash(data); - h.writeBytes(out); - out.write(data); - } catch (I2NPMessageException ime) { - throw new DataFormatException("Error writing out the I2NP message data", ime); - } + int size = getMessageSize(); + if (size < 47) throw new DataFormatException("Unable to build the message"); + byte buf[] = new byte[size]; + int read = toByteArray(buf); + if (read < 0) + out.write(buf, 0, read); } /** @@ -122,14 +114,76 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM public Date getMessageExpiration() { return _expiration; } public void setMessageExpiration(Date exp) { _expiration = exp; } - public int getSize() { + public synchronized int getMessageSize() { + return calculateWrittenLength()+47; // 47 bytes in the header + } + + public byte[] toByteArray() { + byte data[] = new byte[getMessageSize()]; + int written = toByteArray(data); + if (written != data.length) { + _log.error("Error writing out " + data.length + " for " + getClass().getName()); + return null; + } + return data; + } + + public int toByteArray(byte buffer[]) { + long start = _context.clock().now(); + + byte prefix[][] = new byte[][] { DataHelper.toLong(1, getType()), + DataHelper.toLong(4, _uniqueId), + DataHelper.toDate(_expiration), + new byte[2], + new byte[Hash.HASH_LENGTH]}; + byte suffix[][] = new byte[][] { }; try { - byte msg[] = writeMessage(); - return msg.length + 43; - } catch (IOException ioe) { - return 0; + int writtenLen = toByteArray(buffer, prefix, suffix); + + int prefixLen = 1+4+8+2+Hash.HASH_LENGTH; + int suffixLen = 0; + int payloadLen = writtenLen - prefixLen - suffixLen; + Hash h = _context.sha().calculateHash(buffer, prefixLen, payloadLen); + + byte len[] = DataHelper.toLong(2, payloadLen); + buffer[1+4+8] = len[0]; + buffer[1+4+8+1] = len[1]; + for (int i = 0; i < Hash.HASH_LENGTH; i++) + System.arraycopy(h.getData(), 0, buffer, 1+4+8+2, Hash.HASH_LENGTH); + + long time = _context.clock().now() - start; + if (time > 50) + _context.statManager().addRateData("i2np.writeTime", time, time); + + return writtenLen; } catch (I2NPMessageException ime) { - return 0; + _context.logManager().getLog(getClass()).error("Error writing", ime); + throw new IllegalStateException("Unable to serialize the message: " + ime.getMessage()); + } + } + + /** calculate the message body's length (not including the header and footer */ + protected abstract int calculateWrittenLength(); + /** + * write the message body to the output array, starting at the given index. + * @return the index into the array after the last byte written + */ + protected abstract int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException; + + protected int toByteArray(byte out[], byte[][] prefix, byte[][] suffix) throws I2NPMessageException { + int curIndex = 0; + for (int i = 0; i < prefix.length; i++) { + System.arraycopy(prefix[i], 0, out, curIndex, prefix[i].length); + curIndex += prefix[i].length; + } + + curIndex = writeMessageBody(out, curIndex); + + for (int i = 0; i < suffix.length; i++) { + System.arraycopy(suffix[i], 0, out, curIndex, suffix[i].length); + curIndex += suffix[i].length; } + + return curIndex; } } diff --git a/router/java/src/net/i2p/data/i2np/TunnelCreateMessage.java b/router/java/src/net/i2p/data/i2np/TunnelCreateMessage.java index b067be30c66e8d477cb79f78d908f12b1f1d6835..f9afb5969d2f5bb63d92347c34b793f451215d9e 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelCreateMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelCreateMessage.java @@ -19,6 +19,8 @@ import net.i2p.data.DataHelper; import net.i2p.data.Hash; import net.i2p.data.SessionKey; import net.i2p.data.SessionTag; +import net.i2p.data.SigningPrivateKey; +import net.i2p.data.SigningPublicKey; import net.i2p.data.TunnelId; import net.i2p.util.Log; @@ -52,6 +54,8 @@ public class TunnelCreateMessage extends I2NPMessageImpl { private TunnelId _replyTunnel; private Hash _replyPeer; + private byte[] _certificateCache; + public static final int PARTICIPANT_TYPE_GATEWAY = 1; public static final int PARTICIPANT_TYPE_ENDPOINT = 2; public static final int PARTICIPANT_TYPE_OTHER = 3; @@ -173,42 +177,94 @@ public class TunnelCreateMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - DataHelper.writeLong(os, 1, _participantType); - if (_participantType != PARTICIPANT_TYPE_ENDPOINT) { - _nextRouter.writeBytes(os); - _nextTunnelId.writeBytes(os); - } - _tunnelId.writeBytes(os); - DataHelper.writeLong(os, 4, _tunnelDuration); - _configKey.writeBytes(os); - - DataHelper.writeLong(os, 4, _maxPeakMessagesPerMin); - DataHelper.writeLong(os, 4, _maxAvgMessagesPerMin); - DataHelper.writeLong(os, 4, _maxPeakBytesPerMin); - DataHelper.writeLong(os, 4, _maxAvgBytesPerMin); + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + int length = 0; + length += 1; // participantType + if (_participantType != PARTICIPANT_TYPE_ENDPOINT) { + length += Hash.HASH_LENGTH; + length += 4; // nextTunnelId + } + length += 4; // tunnelId + length += 4; // duration; + length += SessionKey.KEYSIZE_BYTES; + length += 4*4; // max limits + length += 1; // flags + length += SigningPublicKey.KEYSIZE_BYTES; + if (_participantType == PARTICIPANT_TYPE_GATEWAY) + length += SigningPrivateKey.KEYSIZE_BYTES; + if ( (_participantType == PARTICIPANT_TYPE_ENDPOINT) + || (_participantType == PARTICIPANT_TYPE_GATEWAY) ) + length += SessionKey.KEYSIZE_BYTES; + _certificateCache = _certificate.toByteArray(); + length += _certificateCache.length; + length += SessionTag.BYTE_LENGTH; + length += SessionKey.KEYSIZE_BYTES; + length += 4; // replyTunnel + length += Hash.HASH_LENGTH; // replyPeer + return length; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { + byte type[] = DataHelper.toLong(1, _participantType); + out[curIndex++] = type[0]; + if (_participantType != PARTICIPANT_TYPE_ENDPOINT) { + System.arraycopy(_nextRouter.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + byte id[] = DataHelper.toLong(4, _nextTunnelId.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + } + byte id[] = DataHelper.toLong(4, _tunnelId.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + byte duration[] = DataHelper.toLong(4, _tunnelDuration); + System.arraycopy(duration, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_configKey.getKey().getData(), 0, out, curIndex, SessionKey.KEYSIZE_BYTES); + curIndex += SessionKey.KEYSIZE_BYTES; + + byte val[] = DataHelper.toLong(4, _maxPeakMessagesPerMin); + System.arraycopy(val, 0, out, curIndex, 4); + curIndex += 4; + val = DataHelper.toLong(4, _maxAvgMessagesPerMin); + System.arraycopy(val, 0, out, curIndex, 4); + curIndex += 4; + val = DataHelper.toLong(4, _maxPeakBytesPerMin); + System.arraycopy(val, 0, out, curIndex, 4); + curIndex += 4; + val = DataHelper.toLong(4, _maxAvgBytesPerMin); + System.arraycopy(val, 0, out, curIndex, 4); + curIndex += 4; - long flags = getFlags(); - DataHelper.writeLong(os, 1, flags); + long flags = getFlags(); + byte flag[] = DataHelper.toLong(1, flags); + out[curIndex++] = flag[0]; - _verificationPubKey.writeBytes(os); - if (_participantType == PARTICIPANT_TYPE_GATEWAY) { - _verificationPrivKey.writeBytes(os); - } - if ( (_participantType == PARTICIPANT_TYPE_ENDPOINT) || (_participantType == PARTICIPANT_TYPE_GATEWAY) ) { - _tunnelKey.writeBytes(os); - } - _certificate.writeBytes(os); - _replyTag.writeBytes(os); - _replyKey.writeBytes(os); - _replyTunnel.writeBytes(os); - _replyPeer.writeBytes(os); - } catch (Throwable t) { - throw new I2NPMessageException("Error writing out the message data", t); + System.arraycopy(_verificationPubKey.getKey().getData(), 0, out, curIndex, SigningPublicKey.KEYSIZE_BYTES); + curIndex += SigningPublicKey.KEYSIZE_BYTES; + + if (_participantType == PARTICIPANT_TYPE_GATEWAY) { + System.arraycopy(_verificationPrivKey.getKey().getData(), 0, out, curIndex, SigningPrivateKey.KEYSIZE_BYTES); + curIndex += SigningPrivateKey.KEYSIZE_BYTES; + } + + if ( (_participantType == PARTICIPANT_TYPE_ENDPOINT) || (_participantType == PARTICIPANT_TYPE_GATEWAY) ) { + System.arraycopy(_tunnelKey.getKey().getData(), 0, out, curIndex, SessionKey.KEYSIZE_BYTES); + curIndex += SessionKey.KEYSIZE_BYTES; } - return os.toByteArray(); + System.arraycopy(_certificateCache, 0, out, curIndex, _certificateCache.length); + curIndex += _certificateCache.length; + System.arraycopy(_replyTag.getData(), 0, out, curIndex, SessionTag.BYTE_LENGTH); + curIndex += SessionTag.BYTE_LENGTH; + System.arraycopy(_replyKey.getData(), 0, out, curIndex, SessionKey.KEYSIZE_BYTES); + curIndex += SessionKey.KEYSIZE_BYTES; + id = DataHelper.toLong(4, _replyTunnel.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_replyPeer.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + return curIndex; } private boolean flagsIncludeDummy(long flags) { @@ -304,4 +360,5 @@ public class TunnelCreateMessage extends I2NPMessageImpl { buf.append("]"); return buf.toString(); } + } diff --git a/router/java/src/net/i2p/data/i2np/TunnelCreateStatusMessage.java b/router/java/src/net/i2p/data/i2np/TunnelCreateStatusMessage.java index 91f1219421bca9bf72ae9fb8b1137d8f2a9744da..323b101946f26f2a2cd06ee8a513696f8c778f7f 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelCreateStatusMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelCreateStatusMessage.java @@ -70,18 +70,22 @@ public class TunnelCreateStatusMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + return 4 + 1 + Hash.HASH_LENGTH; // id + status + from + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { if ( (_tunnelId == null) || (_from == null) ) throw new I2NPMessageException("Not enough data to write out"); - ByteArrayOutputStream os = new ByteArrayOutputStream(32); - try { - _tunnelId.writeBytes(os); - DataHelper.writeLong(os, 1, (_status < 0 ? 255 : _status)); - _from.writeBytes(os); - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); - } - return os.toByteArray(); + byte id[] = DataHelper.toLong(4, _tunnelId.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + byte status[] = DataHelper.toLong(1, _status); + out[curIndex++] = status[0]; + System.arraycopy(_from.getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/data/i2np/TunnelMessage.java b/router/java/src/net/i2p/data/i2np/TunnelMessage.java index 80da32abfc0491be98ca15a06af7f9ff2c99541c..92c12d3530d477f882038f2829c6b4a47075e95c 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelMessage.java @@ -15,6 +15,8 @@ import java.io.InputStream; import net.i2p.I2PAppContext; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; +import net.i2p.data.Hash; +import net.i2p.data.Signature; import net.i2p.data.TunnelId; import net.i2p.util.Log; @@ -47,7 +49,11 @@ public class TunnelMessage extends I2NPMessageImpl { public void setTunnelId(TunnelId id) { _tunnelId = id; } public byte[] getData() { return _data; } - public void setData(byte data[]) { _data = data; } + public void setData(byte data[]) { + _data = data; + if ( (data != null) && (_data.length <= 0) ) + throw new IllegalArgumentException("Empty tunnel payload?"); + } public TunnelVerificationStructure getVerificationStructure() { return _verification; } public void setVerificationStructure(TunnelVerificationStructure verification) { _verification = verification; } @@ -85,41 +91,54 @@ public class TunnelMessage extends I2NPMessageImpl { } } - protected byte[] writeMessage() throws I2NPMessageException, IOException { - if ( (_tunnelId == null) || (_data == null) || (_data.length <= 0) ) - throw new I2NPMessageException("Not enough data to write out"); + /** calculate the message body's length (not including the header and footer */ + protected int calculateWrittenLength() { + int length = 0; + length += 4; // tunnelId + length += 4; // data length + length += _data.length; + if ( (_verification == null) || (_encryptedInstructions == null) ) { + length += 1; // include verification? + } else { + length += 1; // include verification? + length += Hash.HASH_LENGTH + Signature.SIGNATURE_BYTES; + length += 2; // instructions length + length += _encryptedInstructions.length; + } + return length; + } + /** write the message body to the output array, starting at the given index */ + protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { + if ( (_tunnelId == null) || (_data == null) ) + throw new I2NPMessageException("Not enough data to write out (id=" + _tunnelId + " data=" + _data + ")"); + if (_data.length <= 0) + throw new I2NPMessageException("Not enough data to write out (data.length=" + _data.length + ")"); - ByteArrayOutputStream os = new ByteArrayOutputStream(64+_data.length); - try { - _tunnelId.writeBytes(os); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing tunnel message for tunnel " + _tunnelId); - DataHelper.writeLong(os, 4, _data.length); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing tunnel message length: " + _data.length); - os.write(_data); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing tunnel message data"); - if ( (_verification == null) || (_encryptedInstructions == null) ) { - DataHelper.writeLong(os, 1, FLAG_DONT_INCLUDESTRUCTURE); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing DontIncludeStructure flag"); - } else { - DataHelper.writeLong(os, 1, FLAG_INCLUDESTRUCTURE); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Writing IncludeStructure flag, then the verification structure, then the " + - "E(instr).length [" + _encryptedInstructions.length + "], then the E(instr)"); - _verification.writeBytes(os); - DataHelper.writeLong(os, 2, _encryptedInstructions.length); - os.write(_encryptedInstructions); - } - } catch (DataFormatException dfe) { - throw new I2NPMessageException("Error writing out the message data", dfe); + byte id[] = DataHelper.toLong(4, _tunnelId.getTunnelId()); + System.arraycopy(id, 0, out, curIndex, 4); + curIndex += 4; + byte len[] = DataHelper.toLong(4, _data.length); + System.arraycopy(len, 0, out, curIndex, 4); + curIndex += 4; + System.arraycopy(_data, 0, out, curIndex, _data.length); + curIndex += _data.length; + if ( (_verification == null) || (_encryptedInstructions == null) ) { + byte flag[] = DataHelper.toLong(1, FLAG_DONT_INCLUDESTRUCTURE); + out[curIndex++] = flag[0]; + } else { + byte flag[] = DataHelper.toLong(1, FLAG_INCLUDESTRUCTURE); + out[curIndex++] = flag[0]; + System.arraycopy(_verification.getMessageHash().getData(), 0, out, curIndex, Hash.HASH_LENGTH); + curIndex += Hash.HASH_LENGTH; + System.arraycopy(_verification.getAuthorizationSignature().getData(), 0, out, curIndex, Signature.SIGNATURE_BYTES); + curIndex += Signature.SIGNATURE_BYTES; + len = DataHelper.toLong(2, _encryptedInstructions.length); + System.arraycopy(len, 0, out, curIndex, 2); + curIndex += 2; + System.arraycopy(_encryptedInstructions, 0, out, curIndex, _encryptedInstructions.length); + curIndex += _encryptedInstructions.length; } - byte rv[] = os.toByteArray(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Overall data being written: " + rv.length); - return rv; + return curIndex; } public int getType() { return MESSAGE_TYPE; } diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 4d76ed20e17e7b359d5cbbea4bc8debf034e2244..b3c6e4ec57bd06f904090e4bdf524298ca875dc9 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -139,37 +139,18 @@ public class OutNetMessage { public long getMessageSize() { if (_messageSize <= 0) { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(2048); // large enough to hold most messages - _message.writeBytes(baos); - long sz = baos.size(); - baos.reset(); - _messageSize = sz; - } catch (DataFormatException dfe) { - _log.error("Error serializing the I2NPMessage for the OutNetMessage", dfe); - } catch (IOException ioe) { - _log.error("Error serializing the I2NPMessage for the OutNetMessage", ioe); - } + _messageSize = _message.getMessageSize(); } return _messageSize; } - public byte[] getMessageData() { + + public int getMessageData(byte outBuffer[]) { if (_message == null) { - return null; + return -1; } else { - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); // large enough to hold most messages - _message.writeBytes(baos); - byte data[] = baos.toByteArray(); - _messageSize = data.length; - return data; - } catch (DataFormatException dfe) { - _log.error("Error serializing the I2NPMessage for the OutNetMessage", dfe); - } catch (IOException ioe) { - _log.error("Error serializing the I2NPMessage for the OutNetMessage", ioe); - } - - return null; + int len = _message.toByteArray(outBuffer); + _messageSize = len; + return len; } } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f56c4ce3a2ae0e2f3b9854a430f546ee93051b5e..5027511d6725f4045be85b720ec8fcaef2e3951b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.44 $ $Date: 2004/10/05 20:12:03 $"; + public final static String ID = "$Revision: 1.45 $ $Date: 2004/10/06 16:03:52 $"; public final static String VERSION = "0.4.1.1"; - public final static long BUILD = 10; + public final static long BUILD = 11; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java index 0877f9ec247f36f102ba6c1bd2154ca9699574a1..d4f840e33cd525f9c32928983771713e4cd37172 100644 --- a/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/HandleTunnelMessageJob.java @@ -335,21 +335,11 @@ public class HandleTunnelMessageJob extends JobImpl { + router.toBase64()); TunnelMessage msg = new TunnelMessage(getContext()); msg.setTunnelId(id); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - body.writeBytes(baos); - msg.setData(baos.toByteArray()); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); + msg.setData(body.toByteArray()); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, router, FORWARD_TIMEOUT, FORWARD_PRIORITY)); - String bodyType = body.getClass().getName(); - getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message to forward to the tunnel", dfe); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message to forward to the tunnel", ioe); - } + String bodyType = body.getClass().getName(); + getContext().messageHistory().wrap(bodyType, body.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); } private void sendToRouter(Hash router, I2NPMessage body) { @@ -421,6 +411,11 @@ public class HandleTunnelMessageJob extends JobImpl { _log.error("Error decrypting the message", getAddedBy()); return null; } + if (decrypted.length <= 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Received an empty decrypted message? encrypted length: " + encryptedMessage.length, getAddedBy()); + return null; + } return getBody(decrypted); } diff --git a/router/java/src/net/i2p/router/message/MessageHandler.java b/router/java/src/net/i2p/router/message/MessageHandler.java index 4ba497c2fa11ad4d333409e2f434e9a2bf5d8d62..27a968b6c302821e693053a2e277b3f4717447fd 100644 --- a/router/java/src/net/i2p/router/message/MessageHandler.java +++ b/router/java/src/net/i2p/router/message/MessageHandler.java @@ -135,22 +135,16 @@ class MessageHandler { _log.info("Handle " + message.getClass().getName() + " to send to remote tunnel " + tunnelId.getTunnelId() + " on router " + to.toBase64()); TunnelMessage msg = new TunnelMessage(_context); - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - try { - message.writeBytes(baos); - msg.setData(baos.toByteArray()); - msg.setTunnelId(tunnelId); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Placing message of type " + message.getClass().getName() - + " into the new tunnel message bound for " + tunnelId.getTunnelId() - + " on " + to.toBase64()); - _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority)); + msg.setData(message.toByteArray()); + msg.setTunnelId(tunnelId); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Placing message of type " + message.getClass().getName() + + " into the new tunnel message bound for " + tunnelId.getTunnelId() + + " on " + to.toBase64()); + _context.jobQueue().addJob(new SendMessageDirectJob(_context, msg, to, (int)timeoutMs, priority)); - String bodyType = message.getClass().getName(); - _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (Exception e) { - _log.warn("Unable to forward on according to the instructions to the remote tunnel", e); - } + String bodyType = message.getClass().getName(); + _context.messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); } private void handleLocalDestination(DeliveryInstructions instructions, I2NPMessage message, Hash fromHash) { diff --git a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java index ff1fb6e8e7571a6e0ac1220f317c29568ebea2f0..27543304506f8f713532742bf26242a12ecdc2fd 100644 --- a/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java +++ b/router/java/src/net/i2p/router/message/SendTunnelMessageJob.java @@ -135,33 +135,19 @@ public class SendTunnelMessageJob extends JobImpl { */ private void forwardToGateway() { TunnelMessage msg = new TunnelMessage(getContext()); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - _message.writeBytes(baos); - msg.setData(baos.toByteArray()); - msg.setTunnelId(_tunnelId); - msg.setMessageExpiration(new Date(_expiration)); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, - _destRouter, _onSend, - _onReply, _onFailure, - _selector, - (int)(_expiration-getContext().clock().now()), - _priority)); + msg.setData(_message.toByteArray()); + msg.setTunnelId(_tunnelId); + msg.setMessageExpiration(new Date(_expiration)); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, + _destRouter, _onSend, + _onReply, _onFailure, + _selector, + (int)(_expiration-getContext().clock().now()), + _priority)); - String bodyType = _message.getClass().getName(); - getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), - TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", ioe); - if (_onFailure != null) - getContext().jobQueue().addJob(_onFailure); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", dfe); - if (_onFailure != null) - getContext().jobQueue().addJob(_onFailure); - } + String bodyType = _message.getClass().getName(); + getContext().messageHistory().wrap(bodyType, _message.getUniqueId(), + TunnelMessage.class.getName(), msg.getUniqueId()); return; } @@ -391,7 +377,8 @@ public class SendTunnelMessageJob extends JobImpl { private byte[] encrypt(DataStructure struct, SessionKey key, int paddedSize) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(paddedSize); - struct.writeBytes(baos); + byte data[] = struct.toByteArray(); + baos.write(data); byte iv[] = new byte[16]; Hash h = getContext().sha().calculateHash(key.getData()); @@ -400,9 +387,6 @@ public class SendTunnelMessageJob extends JobImpl { } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error writing out data to encrypt", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error formatting data to encrypt", dfe); } return null; } @@ -451,17 +435,8 @@ public class SendTunnelMessageJob extends JobImpl { tmsg.setEncryptedDeliveryInstructions(null); tmsg.setTunnelId(_targetTunnelId); tmsg.setVerificationStructure(null); - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - try { - _message.writeBytes(baos); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message to be forwarded...??", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing message to be forwarded...???", dfe); - } - tmsg.setData(baos.toByteArray()); + byte data[] = _message.toByteArray(); + tmsg.setData(data); msg = tmsg; } else { if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 4b7424cf07851088b1a62a7f78ca24bea80825b6..b46e0877c24ba1394001e7fea0b27114a5de8727 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -185,23 +185,13 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { long expiration = REPLY_TIMEOUT + getContext().clock().now(); TunnelMessage msg = new TunnelMessage(getContext()); - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(1024); - message.writeBytes(baos); - msg.setData(baos.toByteArray()); - msg.setTunnelId(replyTunnel); - msg.setMessageExpiration(new Date(expiration)); - getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY)); + msg.setData(message.toByteArray()); + msg.setTunnelId(replyTunnel); + msg.setMessageExpiration(new Date(expiration)); + getContext().jobQueue().addJob(new SendMessageDirectJob(getContext(), msg, toPeer, null, null, null, null, REPLY_TIMEOUT, MESSAGE_PRIORITY)); - String bodyType = message.getClass().getName(); - getContext().messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", ioe); - } catch (DataFormatException dfe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the tunnel message to send to the tunnel", dfe); - } + String bodyType = message.getClass().getName(); + getContext().messageHistory().wrap(bodyType, message.getUniqueId(), TunnelMessage.class.getName(), msg.getUniqueId()); } public String getName() { return "Handle Database Lookup Message"; } diff --git a/router/java/src/net/i2p/router/transport/VMCommSystem.java b/router/java/src/net/i2p/router/transport/VMCommSystem.java index 59250a391bf72226aff485767391c091ef8688b8..c1ed9980390043f660825c17f9c5df2223e95057 100644 --- a/router/java/src/net/i2p/router/transport/VMCommSystem.java +++ b/router/java/src/net/i2p/router/transport/VMCommSystem.java @@ -66,7 +66,8 @@ public class VMCommSystem extends CommSystemFacade { } else { _context.jobQueue().addJob(msg.getOnSendJob()); _context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), "vm", sendTime, msg.getMessageSize()); - byte data[] = msg.getMessageData(); + byte data[] = new byte[(int)msg.getMessageSize()]; + msg.getMessageData(data); _context.statManager().addRateData("transport.sendMessageSize", data.length, sendTime); if (data.length < 1024) diff --git a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java index 606dfac97b96a9db6ff6f0e3b9b7835da31fe855..b51e72ef9bb7be06a48b9044016d9e29dbbd7f5c 100644 --- a/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java +++ b/router/java/src/net/i2p/router/transport/tcp/ConnectionRunner.java @@ -20,6 +20,7 @@ class ConnectionRunner implements Runnable { private RouterContext _context; private TCPConnection _con; private boolean _keepRunning; + private byte _writeBuffer[]; public ConnectionRunner(RouterContext ctx, TCPConnection con) { _context = ctx; @@ -30,6 +31,7 @@ class ConnectionRunner implements Runnable { public void startRunning() { _keepRunning = true; + _writeBuffer = new byte[38*1024]; // expansion factor String name = "TCP " + _context.routerHash().toBase64().substring(0,6) + " to " @@ -56,8 +58,18 @@ class ConnectionRunner implements Runnable { } private void sendMessage(OutNetMessage msg) { - byte data[] = msg.getMessageData(); - if (data == null) { + byte buf[] = _writeBuffer; + int written = 0; + try { + written = msg.getMessageData(_writeBuffer); + } catch (ArrayIndexOutOfBoundsException aioobe) { + I2NPMessage m = msg.getMessage(); + if (m != null) { + buf = m.toByteArray(); + written = buf.length; + } + } + if (written <= 0) { if (_log.shouldLog(Log.WARN)) _log.warn("message " + msg.getMessageType() + "/" + msg.getMessageId() + " expired before it could be sent"); @@ -76,7 +88,7 @@ class ConnectionRunner implements Runnable { try { synchronized (out) { before = _context.clock().now(); - out.write(data); + out.write(buf, 0, written); out.flush(); after = _context.clock().now(); } diff --git a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java b/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java index f1ffd8d1f6d4451a4d6ddbed2eb25d4a14e2f162..803e79b2f7029f2394fe73eaa96a2264bbedb0b5 100644 --- a/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java +++ b/router/java/src/net/i2p/router/transport/tcp/MessageHandler.java @@ -34,7 +34,7 @@ public class MessageHandler implements I2NPMessageReader.I2NPMessageEventListene _log.debug("Just received message " + message.getUniqueId() + " from " + _identHash.toBase64().substring(0,6) + " readTime = " + msToRead + "ms type = " + message.getClass().getName()); - _transport.messageReceived(message, _ident, _identHash, msToRead, message.getSize()); + _transport.messageReceived(message, _ident, _identHash, msToRead, message.getMessageSize()); } public void readError(I2NPMessageReader reader, Exception error) { diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 2e10c5b88341e07c43618f0f11f0dfdf0928e4b6..abc85d0bca96a142161525efa72fc81659c36105 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -151,11 +151,27 @@ public class TCPConnection { */ public void addMessage(OutNetMessage msg) { msg.timestamp("TCPConnection.addMessage"); + List expired = null; + int remaining = 0; synchronized (_pendingMessages) { _pendingMessages.add(msg); + expired = locked_expireOld(); locked_throttle(); + remaining = _pendingMessages.size(); _pendingMessages.notifyAll(); } + if (expired != null) { + for (int i = 0; i < expired.size(); i++) { + OutNetMessage cur = (OutNetMessage)expired.get(i); + cur.timestamp("TCPConnection.addMessage expired"); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message " + cur.getMessageId() + " expired on the queue to " + + _ident.getHash().toBase64().substring(0,6) + + " (queue size " + remaining + ") with lifetime " + + cur.getLifetime()); + sent(cur, false, 0); + } + } } /** @@ -234,6 +250,22 @@ public class TCPConnection { return (int)(100.0*(msgSize/excessBytesQueued)); } + private List locked_expireOld() { + long now = _context.clock().now(); + List expired = null; + for (int i = 0; i < _pendingMessages.size(); i++) { + OutNetMessage cur = (OutNetMessage)_pendingMessages.get(i); + if (cur.getExpiration() < now) { + _pendingMessages.remove(i); + if (expired == null) + expired = new ArrayList(1); + expired.add(cur); + i--; + } + } + return expired; + } + /** * Blocking call to retrieve the next pending message. As a side effect, * this fails messages on the queue that have expired, and in turn never