From 734444e183bcbe7ac3238d9a73617f8423530546 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 24 Jul 2011 18:57:51 +0000 Subject: [PATCH] * UDP: - Implement destroy message sending (receiving was implemented in 0.8.1) - More cleanups at shutdown - Log tweaks --- .../transport/udp/EstablishmentManager.java | 3 ++ .../router/transport/udp/PacketBuilder.java | 2 + .../router/transport/udp/PacketHandler.java | 12 ++--- .../i2p/router/transport/udp/PeerState.java | 8 +-- .../i2p/router/transport/udp/UDPEndpoint.java | 2 +- .../i2p/router/transport/udp/UDPSender.java | 2 +- .../router/transport/udp/UDPTransport.java | 54 ++++++++++++++++++- 7 files changed, 69 insertions(+), 14 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 55b0c60742..5d5021b597 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -331,6 +331,7 @@ class EstablishmentManager { /** * Got a SessionDestroy on an established conn + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from, PeerState state) { if (_log.shouldLog(Log.DEBUG)) @@ -340,6 +341,7 @@ class EstablishmentManager { /** * Got a SessionDestroy during outbound establish + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from, OutboundEstablishState state) { if (_log.shouldLog(Log.DEBUG)) @@ -351,6 +353,7 @@ class EstablishmentManager { /** * Got a SessionDestroy - maybe after an inbound establish + * @since 0.8.1 */ void receiveSessionDestroy(RemoteHostId from) { if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 441c5cb7fd..a96c9d1cf4 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -656,6 +656,8 @@ class PacketBuilder { /** * Build a destroy packet, which contains a header but no body. + * Session must be established or this will NPE in authenticate(). + * Unused until 0.8.9. * * @since 0.8.1 */ diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 1355f09e90..972632077a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -372,8 +372,8 @@ class PacketHandler { if (state.getMACKey() != null) { isValid = packet.validate(state.getMACKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for inbound con: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for inbound con: " + packet); _state = 32; packet.decrypt(state.getCipherKey()); @@ -418,8 +418,8 @@ class PacketHandler { _state = 36; isValid = packet.validate(state.getMACKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for outbound established con: " + packet); _state = 37; packet.decrypt(state.getCipherKey()); @@ -432,8 +432,8 @@ class PacketHandler { // keys not yet exchanged, lets try it with the peer's intro key isValid = packet.validate(state.getIntroKey()); if (isValid) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); + if (_log.shouldLog(Log.INFO)) + _log.info("Valid introduction packet received for outbound established con with old intro key: " + packet); _state = 39; packet.decrypt(state.getIntroKey()); handlePacket(reader, packet, null, state, null); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index dbfa8263cf..fb42ea023d 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1380,14 +1380,14 @@ class PeerState { _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); //if (state.getMessage() != null) // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); - if (_log.shouldLog(Log.WARN)) - _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + if (_log.shouldLog(Log.INFO)) + _log.info("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + " available=" + getSendWindowBytesRemaining() + " for message " + state.getMessageId() + ": " + state); state.setNextSendTime(now + (ACKSender.ACK_FREQUENCY / 2) + _context.random().nextInt(ACKSender.ACK_FREQUENCY)); //(now + 1024) & ~SECOND_MASK); - if (_log.shouldLog(Log.WARN)) - _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); + if (_log.shouldLog(Log.INFO)) + _log.info("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); //_throttle.choke(peer.getRemotePeer()); //if (state.getMessage() != null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 1773095125..6017ebcf8e 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -137,7 +137,7 @@ class UDPEndpoint { * Add the packet to the outobund queue to be sent ASAP (as allowed by * the bandwidth limiter) * - * @return number of packets in the send queue + * @return ZERO (used to be number of packets in the queue) */ public int send(UDPPacket packet) { if (_sender == null) diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index bd6bb204c8..b39e78f453 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -91,7 +91,7 @@ class UDPSender { * available, if requested, otherwise it returns immediately * * @param blockTime how long to block IGNORED - * @return number of packets queued + * @return ZERO (used to be number of packets in the queue) * @deprecated use add(packet) */ public int add(UDPPacket packet, int blockTime) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index fbb1e68066..ec3e69482c 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -64,6 +64,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private final IntroductionManager _introManager; private final ExpirePeerEvent _expireEvent; private final PeerTestEvent _testEvent; + private final PacketBuilder _destroyBuilder; private short _reachabilityStatus; private long _reachabilityStatusLastUpdated; private long _introducersSelectedOn; @@ -200,6 +201,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _cachedBid[i] = new SharedBid(BID_VALUES[i]); } + _destroyBuilder = new PacketBuilder(_context, this); _fragments = new OutboundMessageFragments(_context, this, _activeThrottle); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); if (SHOULD_FLOOD_PEERS) @@ -337,6 +339,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } public void shutdown() { + destroyAll(); if (_endpoint != null) _endpoint.shutdown(); if (_flooder != null) @@ -353,6 +356,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _inboundFragments.shutdown(); _expireEvent.setIsAlive(false); _testEvent.setIsAlive(false); + _peersByRemoteHost.clear(); + _peersByIdent.clear(); + _dropList.clear(); + _introManager.reset(); } /** @@ -1011,12 +1018,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ } + /** + * This sends it directly out, bypassing OutboundMessageFragments + * and the PacketPusher. The only queueing is for the bandwidth limiter. + * + * @return ZERO (used to be number of packets in the queue) + */ int send(UDPPacket packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending packet " + packet); return _endpoint.send(packet); } + /** + * Send a session destroy message, bypassing OMF and PacketPusher. + * + * @since 0.8.9 + */ + private void sendDestroy(PeerState peer) { + // peer must be fully established + if (peer.getCurrentCipherKey() == null) + return; + UDPPacket pkt = _destroyBuilder.buildSessionDestroyPacket(peer); + if (_log.shouldLog(Log.WARN)) + _log.warn("Sending destroy to : " + peer); + send(pkt); + } + + /** + * Send a session destroy message to everybody + * + * @since 0.8.9 + */ + private void destroyAll() { + int howMany = _peersByIdent.size(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Sending destroy to : " + howMany + " peers"); + for (PeerState peer : _peersByIdent.values()) { + sendDestroy(peer); + } + int toSleep = Math.min(howMany / 3, 750); + if (toSleep > 0) { + try { + Thread.sleep(toSleep); + } catch (InterruptedException ie) {} + } + } + /** minimum active peers to maintain IP detection, etc. */ private static final int MIN_PEERS = 3; /** minimum peers volunteering to be introducers if we need that */ @@ -2236,8 +2284,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } - for (int i = 0; i < _expireBuffer.size(); i++) - dropPeer(_expireBuffer.get(i), false, "idle too long"); + for (PeerState peer : _expireBuffer) { + sendDestroy(peer); + dropPeer(peer, false, "idle too long"); + } _expireBuffer.clear(); if (_alive) -- GitLab