From a69267dc87626d999fcf182ffb2ee4f10bdbf3e6 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Thu, 1 Sep 2011 13:24:31 +0000 Subject: [PATCH] UDP cleanups --- .../i2p/router/transport/udp/ACKSender.java | 2 +- .../transport/udp/EstablishmentManager.java | 34 ++--- .../router/transport/udp/PacketBuilder.java | 7 +- .../i2p/router/transport/udp/PeerState.java | 124 +++++++++++------- .../router/transport/udp/UDPPacketReader.java | 2 +- 5 files changed, 94 insertions(+), 75 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 7136f8a9de..3c56d11ab7 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -62,7 +62,7 @@ class ACKSender implements Runnable { public void shutdown() { _alive = false; - PeerState poison = new PeerState(_context, _transport); + PeerState poison = new PeerState(_context, _transport, null, 0, null, false); poison.setTheyRelayToUsAs(POISON_PS); _peersToACK.offer(poison); for (int i = 1; i <= 5 && !_peersToACK.isEmpty(); i++) { diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 5d5021b597..d5febc514d 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -285,7 +285,8 @@ class EstablishmentManager { // count as connections, we have to keep the connection to this peer up longer if // we are offering introductions. if ((!_context.router().isHidden()) && (!_transport.introducersRequired()) && _transport.haveCapacity()) { - long tag = _context.random().nextLong(MAX_TAG_VALUE); + // ensure > 0 + long tag = 1 + _context.random().nextLong(MAX_TAG_VALUE); state.setSentRelayTag(tag); if (_log.shouldLog(Log.INFO)) _log.info("Received session request from " + from + ", sending relay tag " + tag); @@ -460,18 +461,13 @@ class EstablishmentManager { long now = _context.clock().now(); RouterIdentity remote = state.getConfirmedIdentity(); - PeerState peer = new PeerState(_context, _transport); + PeerState peer = new PeerState(_context, _transport, + state.getSentIP(), state.getSentPort(), remote.calculateHash(), true); peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); - peer.setCurrentReceiveSecond(now - (now % 1000)); - peer.setKeyEstablishedTime(now); - peer.setLastReceiveTime(now); - peer.setLastSendTime(now); - peer.setRemoteAddress(state.getSentIP(), state.getSentPort()); - peer.setRemotePeer(remote.calculateHash()); peer.setWeRelayToThemAs(state.getSentRelayTag()); - peer.setTheyRelayToUsAs(0); - peer.setInbound(); + // 0 is the default + //peer.setTheyRelayToUsAs(0); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handle completely established (inbound): " + state.getRemoteHostId().toString() @@ -547,17 +543,13 @@ class EstablishmentManager { long now = _context.clock().now(); RouterIdentity remote = state.getRemoteIdentity(); - PeerState peer = new PeerState(_context, _transport); + PeerState peer = new PeerState(_context, _transport, + state.getSentIP(), state.getSentPort(), remote.calculateHash(), false); peer.setCurrentCipherKey(state.getCipherKey()); peer.setCurrentMACKey(state.getMACKey()); - peer.setCurrentReceiveSecond(now - (now % 1000)); - peer.setKeyEstablishedTime(now); - peer.setLastReceiveTime(now); - peer.setLastSendTime(now); - peer.setRemoteAddress(state.getSentIP(), state.getSentPort()); - peer.setRemotePeer(remote.calculateHash()); peer.setTheyRelayToUsAs(state.getReceivedRelayTag()); - peer.setWeRelayToThemAs(0); + // 0 is the default + //peer.setWeRelayToThemAs(0); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handle completely established (outbound): " + state.getRemoteHostId().toString() @@ -598,6 +590,7 @@ class EstablishmentManager { _transport.send(m, peer); } + /** the relay tag is a 4-byte field in the protocol */ public static final long MAX_TAG_VALUE = 0xFFFFFFFFl; private void sendCreated(InboundEstablishState state) { @@ -610,8 +603,9 @@ class EstablishmentManager { // offer to relay // (perhaps we should check our bw usage and/or how many peers we are // already offering introducing?) - if (state.getSentRelayTag() < 0) { - state.setSentRelayTag(_context.random().nextLong(MAX_TAG_VALUE)); + if (state.getSentRelayTag() == 0) { + // ensure > 0 + state.setSentRelayTag(1 + _context.random().nextLong(MAX_TAG_VALUE)); } else { // don't change it, since we've already prepared our sig } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index a96c9d1cf4..7db9bc3feb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -634,8 +634,7 @@ class PacketBuilder { // BUG: NPE here if null signature System.arraycopy(state.getSentSignature().getData(), 0, data, off, Signature.SIGNATURE_BYTES); - packet.getPacket().setLength(off + Signature.SIGNATURE_BYTES); - authenticate(packet, state.getCipherKey(), state.getMACKey()); + off += Signature.SIGNATURE_BYTES; } else { // nothing more to add beyond the identity fragment, though we can // pad here if we want. maybe randomized? @@ -644,9 +643,9 @@ class PacketBuilder { // TODO: why not random data? if ( (off % 16) != 0) off += 16 - (off % 16); - packet.getPacket().setLength(off); - authenticate(packet, state.getCipherKey(), state.getMACKey()); } + packet.getPacket().setLength(off); + authenticate(packet, state.getCipherKey(), state.getMACKey()); setTo(packet, to, state.getSentPort()); packet.setMessageType(TYPE_CONF); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index fdc8aa17cb..3cbfcac6bf 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -21,7 +21,7 @@ import net.i2p.util.ConcurrentHashSet; /** * Contain all of the state about a UDP connection to a peer. - * + * This is instantiated only after a connection is fully established. */ class PeerState { private final RouterContext _context; @@ -32,7 +32,7 @@ class PeerState { * receiving the connection this will be set only after the connection * is established. */ - private Hash _remotePeer; + private final Hash _remotePeer; /** * The AES key used to verify packets, set only after the connection is * established. @@ -118,10 +118,10 @@ class PeerState { private int _sendBytes; private int _receiveBps; private int _receiveBytes; - private int _sendACKBps; - private int _sendACKBytes; - private int _receiveACKBps; - private int _receiveACKBytes; + //private int _sendACKBps; + //private int _sendZACKBytes; + //private int _receiveACKBps; + //private int _receiveACKBytes; private long _receivePeriodBegin; private volatile long _lastCongestionOccurred; /** @@ -131,13 +131,14 @@ class PeerState { */ private volatile int _slowStartThreshold; /** what IP is the peer sending and receiving packets on? */ - private byte[] _remoteIP; + private final byte[] _remoteIP; /** cached IP address */ private transient InetAddress _remoteIPAddress; /** what port is the peer sending and receiving packets on? */ - private int _remotePort; + private final int _remotePort; /** cached RemoteHostId, used to find the peerState by remote info */ - private RemoteHostId _remoteHostId; + private final RemoteHostId _remoteHostId; + /** if we need to contact them, do we need to talk to an introducer? */ private boolean _remoteRequiresIntroduction; /** @@ -208,7 +209,7 @@ class PeerState { /** how many concurrency rejections have we had in a row */ private volatile int _consecutiveRejections = 0; /** is it inbound? **/ - private boolean _isInbound; + private final boolean _isInbound; /** Last time it was made an introducer **/ private long _lastIntroducerTime; @@ -248,23 +249,25 @@ class PeerState { /** override the default MTU */ private static final String PROP_DEFAULT_MTU = "i2np.udp.mtu"; - public PeerState(RouterContext ctx, UDPTransport transport) { + public PeerState(RouterContext ctx, UDPTransport transport, + byte[] remoteIP, int remotePort, Hash remotePeer, boolean isInbound) { _context = ctx; _log = ctx.logManager().getLog(PeerState.class); _transport = transport; - _keyEstablishedTime = -1; - _currentReceiveSecond = -1; - _lastSendTime = -1; - _lastReceiveTime = -1; + long now = ctx.clock().now(); + _keyEstablishedTime = now; + _currentReceiveSecond = now - (now % 1000); + _lastSendTime = now; + _lastReceiveTime = now; _currentACKs = new ConcurrentHashSet(); _currentACKsResend = new LinkedBlockingQueue(); _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES; _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2; - _lastSendRefill = _context.clock().now(); - _receivePeriodBegin = _lastSendRefill; + _lastSendRefill = now; + _receivePeriodBegin = now; _lastCongestionOccurred = -1; - _remotePort = -1; + _remotePort = remotePort; _mtu = getDefaultMTU(); _mtuReceive = _mtu; //_mtuLastChecked = -1; @@ -275,6 +278,10 @@ class PeerState { _inboundMessages = new HashMap(8); _outboundMessages = new ArrayList(32); // all createRateStat() moved to EstablishmentManager + _remoteIP = remoteIP; + _remotePeer = remotePeer; + _isInbound = isInbound; + _remoteHostId = new RemoteHostId(remoteIP, remotePort); } private int getDefaultMTU() { @@ -303,17 +310,21 @@ class PeerState { * connection, or null if we are not in the process of rekeying. */ public SessionKey getNextMACKey() { return _nextMACKey; } + /** * The pending AES key for encrypting/decrypting packets if we are * rekeying the connection, or null if we are not in the process * of rekeying. */ public SessionKey getNextCipherKey() { return _nextCipherKey; } + /** * The keying material used for the rekeying, or null if we are not in * the process of rekeying. + * @deprecated unused */ public byte[] getNextKeyingMaterial() { return _nextKeyingMaterial; } + /** true if we began the current rekeying, false otherwise */ public boolean getRekeyBeganLocally() { return _rekeyBeganLocally; } /** when were the current cipher and MAC keys established/rekeyed? */ @@ -348,6 +359,10 @@ class PeerState { public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; } /** what IP is the peer sending and receiving packets on? */ public byte[] getRemoteIP() { return _remoteIP; } + + /** + * @return may be null if IP is invalid + */ public InetAddress getRemoteIPAddress() { if (_remoteIPAddress == null) { try { @@ -359,21 +374,28 @@ class PeerState { } return _remoteIPAddress; } + /** what port is the peer sending and receiving packets on? */ public int getRemotePort() { return _remotePort; } + /** if we need to contact them, do we need to talk to an introducer? */ public boolean getRemoteRequiresIntroduction() { return _remoteRequiresIntroduction; } + /** * if we are serving as an introducer to them, this is the the tag that * they can publish that, when presented to us, will cause us to send * a relay introduction to the current peer + * @return 0 (no relay) if unset previously */ public long getWeRelayToThemAs() { return _weRelayToThemAs; } + /** * If they have offered to serve as an introducer to us, this is the tag * we can use to publish that fact. + * @return 0 (no relay) if unset previously */ public long getTheyRelayToUsAs() { return _theyRelayToUsAs; } + /** what is the largest packet we can send to the peer? */ public int getMTU() { return _mtu; } @@ -391,42 +413,50 @@ class PeerState { public long getMTUDecreases() { return _mtuDecreases; } ****/ - /** - * The peer are we talking to. This should be set as soon as this - * state is created if we are initiating a connection, but if we are - * receiving the connection this will be set only after the connection - * is established. - */ - public void setRemotePeer(Hash peer) { _remotePeer = peer; } /** * The AES key used to verify packets, set only after the connection is * established. */ public void setCurrentMACKey(SessionKey key) { _currentMACKey = key; } + /** * The AES key used to encrypt/decrypt packets, set only after the * connection is established. */ public void setCurrentCipherKey(SessionKey key) { _currentCipherKey = key; } + /** * The pending AES key for verifying packets if we are rekeying the * connection, or null if we are not in the process of rekeying. + * @deprecated unused */ public void setNextMACKey(SessionKey key) { _nextMACKey = key; } + /** * The pending AES key for encrypting/decrypting packets if we are * rekeying the connection, or null if we are not in the process * of rekeying. + * @deprecated unused */ public void setNextCipherKey(SessionKey key) { _nextCipherKey = key; } + /** * The keying material used for the rekeying, or null if we are not in * the process of rekeying. + * @deprecated unused */ public void setNextKeyingMaterial(byte data[]) { _nextKeyingMaterial = data; } - /** true if we began the current rekeying, false otherwise */ + + /** + * @param local true if we began the current rekeying, false otherwise + * @deprecated unused + */ public void setRekeyBeganLocally(boolean local) { _rekeyBeganLocally = local; } - /** when were the current cipher and MAC keys established/rekeyed? */ + + /** + * when were the current cipher and MAC keys established/rekeyed? + * @deprecated unused + */ public void setKeyEstablishedTime(long when) { _keyEstablishedTime = when; } /** @@ -469,8 +499,8 @@ class PeerState { } /** how fast we are sending *ack* packets */ - public int getSendACKBps() { return _sendACKBps; } - public int getReceiveACKBps() { return _receiveACKBps; } + //public int getSendACKBps() { return _sendACKBps; } + //public int getReceiveACKBps() { return _receiveACKBps; } /** * have all of the packets received in the current second requested that @@ -498,12 +528,12 @@ class PeerState { _sendWindowBytesRemaining = _sendWindowBytes; _sendBytes += size; _sendBps = (int)(0.9f*(float)_sendBps + 0.1f*((float)_sendBytes * (1000f/(float)duration))); - if (isForACK) { - _sendACKBytes += size; - _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration))); - } + //if (isForACK) { + // _sendACKBytes += size; + // _sendACKBps = (int)(0.9f*(float)_sendACKBps + 0.1f*((float)_sendACKBytes * (1000f/(float)duration))); + //} _sendBytes = 0; - _sendACKBytes = 0; + //_sendACKBytes = 0; _lastSendRefill = now; } //if (true) return true; @@ -522,32 +552,29 @@ class PeerState { _sendWindowBytesRemaining -= size; _sendBytes += size; _lastSendTime = now; - if (isForACK) - _sendACKBytes += size; + //if (isForACK) + // _sendACKBytes += size; return true; } else { return false; } } - /** what IP+port is the peer sending and receiving packets on? */ - public void setRemoteAddress(byte ip[], int port) { - _remoteIP = ip; - _remoteIPAddress = null; - _remotePort = port; - _remoteHostId = new RemoteHostId(ip, port); - } /** if we need to contact them, do we need to talk to an introducer? */ public void setRemoteRequiresIntroduction(boolean required) { _remoteRequiresIntroduction = required; } + /** * if we are serving as an introducer to them, this is the the tag that * they can publish that, when presented to us, will cause us to send * a relay introduction to the current peer + * @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled */ public void setWeRelayToThemAs(long tag) { _weRelayToThemAs = tag; } + /** * If they have offered to serve as an introducer to us, this is the tag * we can use to publish that fact. + * @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled */ public void setTheyRelayToUsAs(long tag) { _theyRelayToUsAs = tag; } @@ -564,7 +591,6 @@ class PeerState { public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; } public int getConsecutiveSendRejections() { return _consecutiveRejections; } public boolean isInbound() { return _isInbound; } - public void setInbound() { _isInbound = true; } public long getIntroducerTime() { return _lastIntroducerTime; } public void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); } @@ -573,8 +599,8 @@ class PeerState { public void messageFullyReceived(Long messageId, int bytes, boolean isForACK) { if (bytes > 0) { _receiveBytes += bytes; - if (isForACK) - _receiveACKBytes += bytes; + //if (isForACK) + // _receiveACKBytes += bytes; } else { if (true || _retransmissionPeriodStart + 1000 < _context.clock().now()) { _packetsReceivedDuplicate++; @@ -588,9 +614,9 @@ class PeerState { long duration = now - _receivePeriodBegin; if (duration >= 1000) { _receiveBps = (int)(0.9f*(float)_receiveBps + 0.1f*((float)_receiveBytes * (1000f/(float)duration))); - if (isForACK) - _receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration))); - _receiveACKBytes = 0; + //if (isForACK) + // _receiveACKBps = (int)(0.9f*(float)_receiveACKBps + 0.1f*((float)_receiveACKBytes * (1000f/(float)duration))); + //_receiveACKBytes = 0; _receiveBytes = 0; _receivePeriodBegin = now; _context.statManager().addRateData("udp.receiveBps", _receiveBps, 0); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index 6421ed93ef..2455da2861 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -189,7 +189,7 @@ class UDPPacketReader { return (int)DataHelper.fromLong(_message, offset, 2); } - /** write out the 4 byte relayAs tag */ + /** read in the 4 byte relayAs tag */ public long readRelayTag() { int offset = readBodyOffset() + Y_LENGTH + 1 + readIPSize() + 2; return DataHelper.fromLong(_message, offset, 4); -- GitLab