diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 4980d2c9d..d73ae8c43 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -256,10 +256,14 @@ public class Connection { remaining = 0; if (packet.isFlagSet(Packet.FLAG_CLOSE) || (remaining < 2)) { packet.setOptionalDelay(0); + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); } else { int delay = _options.getRTO() / 2; packet.setOptionalDelay(delay); - _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); + if (delay > 0) + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Requesting ack delay of " + delay + "ms for packet " + packet); } packet.setFlag(Packet.FLAG_DELAY_REQUESTED); @@ -516,17 +520,30 @@ public class Connection { synchronized (_connectLock) { _connectLock.notifyAll(); } } + private boolean _remotePeerSet = false; /** who are we talking with */ public Destination getRemotePeer() { return _remotePeer; } - public void setRemotePeer(Destination peer) { _remotePeer = peer; } + public void setRemotePeer(Destination peer) { + if (_remotePeerSet) throw new RuntimeException("Remote peer already set [" + _remotePeer + ", " + peer + "]"); + _remotePeerSet = true; + _remotePeer = peer; + } + private boolean _sendStreamIdSet = false; /** what stream do we send data to the peer on? */ public long getSendStreamId() { return _sendStreamId; } - public void setSendStreamId(long id) { _sendStreamId = id; } + public void setSendStreamId(long id) { + if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]"); + _sendStreamIdSet = true; + _sendStreamId = id; + } + private boolean _receiveStreamIdSet = false; /** stream the peer sends data to us on. (may be null) */ public long getReceiveStreamId() { return _receiveStreamId; } public void setReceiveStreamId(long id) { + if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]"); + _receiveStreamIdSet = true; _receiveStreamId = id; synchronized (_connectLock) { _connectLock.notifyAll(); } } @@ -909,11 +926,14 @@ public class Connection { } // revamp various fields, in case we need to ack more, etc _inputStream.updateAcks(_packet); - _packet.setOptionalDelay(getOptions().getChoke()); + int choke = getOptions().getChoke(); + _packet.setOptionalDelay(choke); + if (choke > 0) + _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); _packet.setResendDelay(getOptions().getResendDelay()); - _packet.setReceiveStreamId(_receiveStreamId); - _packet.setSendStreamId(_sendStreamId); + //_packet.setReceiveStreamId(_receiveStreamId); + //_packet.setSendStreamId(_sendStreamId); int newWindowSize = getOptions().getWindowSize(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index a1bae2253..d5f278de0 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -143,15 +143,18 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { data.setValid(size); data.setOffset(0); packet.setPayload(data); - if ( (ackOnly && !forceIncrement) && (!isFirst) ) - packet.setSequenceNum(0); + if ( (ackOnly && !forceIncrement) && (!isFirst) ) + packet.setSequenceNum(0); else packet.setSequenceNum(con.getNextOutboundPacketNum()); packet.setSendStreamId(con.getSendStreamId()); packet.setReceiveStreamId(con.getReceiveStreamId()); con.getInputStream().updateAcks(packet); - packet.setOptionalDelay(con.getOptions().getChoke()); + int choke = con.getOptions().getChoke(); + packet.setOptionalDelay(choke); + if (choke > 0) + packet.setFlag(Packet.FLAG_DELAY_REQUESTED); packet.setResendDelay(con.getOptions().getResendDelay()); if (con.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index bdeac2060..06418beae 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -10,6 +10,7 @@ import net.i2p.data.Destination; import net.i2p.data.Signature; import net.i2p.data.SigningPrivateKey; import net.i2p.util.ByteCache; +import net.i2p.util.Log; /** * Contain a single packet transferred as part of a streaming connection. @@ -64,7 +65,6 @@ public class Packet { private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; - private ByteCache _cache; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -146,22 +146,28 @@ public class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; private static final int MAX_DELAY_REQUEST = 65535; - public Packet() { - _cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE); + public Packet() { } + + private boolean _sendStreamIdSet = false; + /** what stream do we send data to the peer on? */ + public long getSendStreamId() { return _sendStreamId; } + public void setSendStreamId(long id) { + if (_sendStreamIdSet) throw new RuntimeException("Send stream ID already set [" + _sendStreamId + ", " + id + "]"); + _sendStreamIdSet = true; + _sendStreamId = id; } - /** what stream is this packet a part of? */ - public long getSendStreamId() { return _sendStreamId; } - public void setSendStreamId(long id) { _sendStreamId = id; } - + private boolean _receiveStreamIdSet = false; /** - * Stream that replies should be sent on. if the - * connection is still being built, this should be - * null. - * + * stream the replies should be sent on. this should be 0 if the + * connection is still being built. */ public long getReceiveStreamId() { return _receiveStreamId; } - public void setReceiveStreamId(long id) { _receiveStreamId = id; } + public void setReceiveStreamId(long id) { + if (_receiveStreamIdSet) throw new RuntimeException("Receive stream ID already set [" + _receiveStreamId + ", " + id + "]"); + _receiveStreamIdSet = true; + _receiveStreamId = id; + } /** 0-indexed sequence number for this Packet in the sendStream */ public long getSequenceNum() { return _sequenceNum; } @@ -208,8 +214,6 @@ public class Packet { /** get the actual payload of the message. may be null */ public ByteArray getPayload() { return _payload; } public void setPayload(ByteArray payload) { - //if ( (_payload != null) && (_payload != payload) ) - // _cache.release(_payload); _payload = payload; if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) throw new IllegalArgumentException("Too large payload: " + payload.getValid()); @@ -218,15 +222,11 @@ public class Packet { return (_payload == null ? 0 : _payload.getValid()); } public void releasePayload() { - //if (_payload != null) - // _cache.release(_payload); _payload = null; } public ByteArray acquirePayload() { ByteArray old = _payload; - _payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); //_cache.acquire(); - //if (old != null) - // _cache.release(old); + _payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); return _payload; } @@ -239,6 +239,7 @@ public class Packet { else _flags &= ~flag; } + public void setFlags(int flags) { _flags = flags; } /** the signature on the packet (only included if the flag for it is set) */ public Signature getOptionalSignature() { return _optionSignature; } @@ -262,7 +263,6 @@ public class Packet { */ public int getOptionalDelay() { return _optionDelay; } public void setOptionalDelay(int delayMs) { - setFlag(FLAG_DELAY_REQUESTED, delayMs > 0); if (delayMs > MAX_DELAY_REQUEST) _optionDelay = MAX_DELAY_REQUEST; else if (delayMs < 0) @@ -418,30 +418,31 @@ public class Packet { if (length < 22) // min header size throw new IllegalArgumentException("Too small: len=" + buffer.length); int cur = offset; - _sendStreamId = DataHelper.fromLong(buffer, cur, 4); + setSendStreamId(DataHelper.fromLong(buffer, cur, 4)); cur += 4; - _receiveStreamId = DataHelper.fromLong(buffer, cur, 4); + setReceiveStreamId(DataHelper.fromLong(buffer, cur, 4)); cur += 4; - _sequenceNum = DataHelper.fromLong(buffer, cur, 4); + setSequenceNum(DataHelper.fromLong(buffer, cur, 4)); cur += 4; - _ackThrough = DataHelper.fromLong(buffer, cur, 4); + setAckThrough(DataHelper.fromLong(buffer, cur, 4)); cur += 4; int numNacks = (int)DataHelper.fromLong(buffer, cur, 1); cur++; if (length < 22 + numNacks*4) throw new IllegalArgumentException("Too small with " + numNacks + " nacks: " + length); if (numNacks > 0) { - _nacks = new long[numNacks]; + long nacks[] = new long[numNacks]; for (int i = 0; i < numNacks; i++) { - _nacks[i] = DataHelper.fromLong(buffer, cur, 4); + nacks[i] = DataHelper.fromLong(buffer, cur, 4); cur += 4; } + setNacks(nacks); } else { - _nacks = null; + setNacks(null); } - _resendDelay = (int)DataHelper.fromLong(buffer, cur, 1); + setResendDelay((int)DataHelper.fromLong(buffer, cur, 1)); cur++; - _flags = (int)DataHelper.fromLong(buffer, cur, 2); + setFlags((int)DataHelper.fromLong(buffer, cur, 2)); cur += 2; int optionSize = (int)DataHelper.fromLong(buffer, cur, 2); @@ -457,33 +458,36 @@ public class Packet { throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); // skip ahead to the payload - _payload = new ByteArray(new byte[payloadSize]); //_cache.acquire(); - System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize); - _payload.setValid(payloadSize); - _payload.setOffset(0); + //_payload = new ByteArray(new byte[payloadSize]); + _payload = new ByteArray(buffer, payloadBegin, payloadSize); + //System.arraycopy(buffer, payloadBegin, _payload.getData(), 0, payloadSize); + //_payload.setValid(payloadSize); + //_payload.setOffset(0); // ok now lets go back and deal with the options if (isFlagSet(FLAG_DELAY_REQUESTED)) { - _optionDelay = (int)DataHelper.fromLong(buffer, cur, 2); + setOptionalDelay((int)DataHelper.fromLong(buffer, cur, 2)); cur += 2; } if (isFlagSet(FLAG_FROM_INCLUDED)) { - _optionFrom = new Destination(); + Destination optionFrom = new Destination(); try { - cur += _optionFrom.readBytes(buffer, cur); + cur += optionFrom.readBytes(buffer, cur); + setOptionalFrom(optionFrom); } catch (DataFormatException dfe) { throw new IllegalArgumentException("Bad from field: " + dfe.getMessage()); } } if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) { - _optionMaxSize = (int)DataHelper.fromLong(buffer, cur, 2); + setOptionalMaxSize((int)DataHelper.fromLong(buffer, cur, 2)); cur += 2; } if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) { - _optionSignature = new Signature(); + Signature optionSignature = new Signature(); byte buf[] = new byte[Signature.SIGNATURE_BYTES]; System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES); - _optionSignature.setData(buf); + optionSignature.setData(buf); + setOptionalSignature(optionSignature); cur += Signature.SIGNATURE_BYTES; } } @@ -509,7 +513,12 @@ public class Packet { } boolean ok = ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); if (!ok) { - ctx.logManager().getLog(Packet.class).error("Signature failed on " + toString(), new Exception("moo")); + Log l = ctx.logManager().getLog(Packet.class); + l.error("Signature failed on " + toString(), new Exception("moo")); + if (false) { + l.error(Base64.encode(buffer, 0, size)); + l.error("Signature: " + Base64.encode(_optionSignature.getData())); + } } return ok; } @@ -524,6 +533,12 @@ public class Packet { setFlag(FLAG_SIGNATURE_INCLUDED); int size = writePacket(buffer, offset, false); _optionSignature = ctx.dsa().sign(buffer, offset, size, key); + if (false) { + Log l = ctx.logManager().getLog(Packet.class); + l.error("Signing: " + toString()); + l.error(Base64.encode(buffer, 0, size)); + l.error("Signature: " + Base64.encode(_optionSignature.getData())); + } // jump into the signed data and inject the signature where we // previously placed a bunch of zeroes int signatureOffset = offset diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 310c5e36b..6c27ebde9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -169,8 +169,8 @@ public class PacketHandler { try { con.getPacketHandler().receivePacket(packet, con); } catch (I2PException ie) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Received forged packet for " + con + ": " + packet, ie); + if (_log.shouldLog(Log.ERROR)) + _log.error("Received forged packet for " + con + "/" + oldId + ": " + packet, ie); con.setSendStreamId(oldId); } } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index fce529e95..9b869fd5c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -5,7 +5,6 @@ import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.data.SessionKey; -import net.i2p.util.ByteCache; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -27,7 +26,6 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat private long _ackOn; private long _cancelledOn; private SimpleTimer.TimedEvent _resendEvent; - private ByteCache _cache = ByteCache.getInstance(128, MAX_PAYLOAD_SIZE); public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); @@ -71,8 +69,11 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public void prepare() { if (_connection != null) _connection.getInputStream().updateAcks(this); - if (_numSends > 0) // so we can debug to differentiate resends + if (_numSends > 0) { + // so we can debug to differentiate resends setOptionalDelay(_numSends * 1000); + setFlag(FLAG_DELAY_REQUESTED); + } } public long getCreatedOn() { return _createdOn; } diff --git a/history.txt b/history.txt index 7cd505071..c4232bb1d 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,8 @@ -$Id: history.txt,v 1.267 2005/09/27 02:17:41 jrandom Exp $ +$Id: history.txt,v 1.268 2005/09/27 17:42:49 jrandom Exp $ + +2005-09-28 jrandom + * Fix for at least some (all?) of the wrong stream errors in the streaming + lib 2005-09-27 jrandom * Properly suggest filenames for attachments in Syndie (thanks all!) diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index ec1b8a9f4..59dbf0996 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.247 $ $Date: 2005/09/27 02:17:40 $"; + public final static String ID = "$Revision: 1.248 $ $Date: 2005/09/27 17:42:49 $"; public final static String VERSION = "0.6.0.6"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);