diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 339e41caa5bbeabc1ea097c9ba48319f03ee38fe..a9708be19f82e17b3a47ff1deb0f5710e4868f5f 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -751,7 +751,7 @@ class EstablishmentManager { int port = reader.getRelayResponseReader().readCharliePort(); if (_log.shouldLog(Log.INFO)) _log.info("Received relay intro for " + state.getRemoteIdentity().calculateHash().toBase64() + " - they are on " - + addr.toString() + ":" + port + " (according to " + bob.toString(true) + ")"); + + addr.toString() + ":" + port + " (according to " + bob + ")"); RemoteHostId oldId = state.getRemoteHostId(); state.introduced(addr, ip, port); _outboundStates.remove(oldId); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 484e302a5660d9e5b09903ac0ad503e902a676b3..b45444379f21afd968a44554e79d1f3e6da2dad2 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -51,9 +51,8 @@ class OutboundMessageFragments { */ private boolean _isWaiting; - private boolean _alive; + private volatile boolean _alive; private final PacketBuilder _builder; - private long _lastCycleTime = System.currentTimeMillis(); /** if we can handle more messages explicitly, set this to true */ // private boolean _allowExcess; // LINT not used?? @@ -205,8 +204,6 @@ class OutboundMessageFragments { if (added) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Add a new message to a new peer " + peer.getRemotePeer().toBase64()); - if (wasEmpty) - _lastCycleTime = System.currentTimeMillis(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Add a new message to an existing peer " + peer.getRemotePeer().toBase64()); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 064fe4c403e5a5aba7a6f7331ca67837a53425aa..d2039559b82e55c14ba2ce4ce366e583c51e1ee7 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -28,7 +28,7 @@ class PacketHandler { private final InboundMessageFragments _inbound; private final PeerTestManager _testManager; private final IntroductionManager _introManager; - private boolean _keepReading; + private volatile boolean _keepReading; private final Handler[] _handlers; private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB @@ -73,11 +73,11 @@ class PacketHandler { _context.statManager().createRateStat("udp.droppedInvalidEstablish.new", "How old the packet we dropped due to invalidity (even though we do not have any active establishment with the peer) was", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES); - _context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.packetDequeueTime", "How long it takes the UDPReader to pull a packet off the inbound packet queue (when its slow)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.packetVerifyTime", "How long it takes the PacketHandler to verify a data packet after dequeueing (period is dequeue time)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.packetVerifyTimeSlow", "How long it takes the PacketHandler to verify a data packet after dequeueing when its slow (period is dequeue time)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.packetValidateMultipleCount", "How many times we validate a packet, if done more than once (period = afterValidate-enqueue)", "udp", UDPTransport.RATES); + //_context.statManager().createRateStat("udp.packetNoValidationLifetime", "How long packets that are never validated are around for", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionRequest", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionConfirmed", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); _context.statManager().createRateStat("udp.receivePacketSize.sessionCreated", "Packet size of the given inbound packet type (period is the packet's lifetime)", "udp", UDPTransport.RATES); @@ -158,7 +158,7 @@ class PacketHandler { _log.error("Crazy error handling a packet: " + packet, e); } long handleTime = _context.clock().now() - handleStart; - packet.afterHandling(); + //packet.afterHandling(); _context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime()); _context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime()); _state = 8; @@ -166,6 +166,7 @@ class PacketHandler { if (_log.shouldLog(Log.INFO)) _log.info("Done receiving the packet " + packet); + /******** if (handleTime > 1000) { if (_log.shouldLog(Log.WARN)) _log.warn("Took " + handleTime + " to process the packet " @@ -198,6 +199,7 @@ class PacketHandler { _context.statManager().addRateData("udp.packetValidateMultipleCount", validateCount, timeToValidate); else if (validateCount <= 0) _context.statManager().addRateData("udp.packetNoValidationLifetime", packet.getLifetime(), 0); + ********/ // back to the cache with thee! packet.release(); @@ -211,7 +213,6 @@ class PacketHandler { * Find the state and call the correct receivePacket() variant */ private void handlePacket(UDPPacketReader reader, UDPPacket packet) { - if (packet == null) return; _state = 10; @@ -538,7 +539,7 @@ class PacketHandler { msg.append(": ").append(dr.toString()); _log.info(msg.toString()); } - packet.beforeReceiveFragments(); + //packet.beforeReceiveFragments(); _inbound.receiveData(state, dr); _context.statManager().addRateData("udp.receivePacketSize.dataKnown", packet.getPacket().getLength(), packet.getLifetime()); if (dr.readFragmentCount() <= 0) diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index 752c5d37bebabdd780b0a8e6621fa946c2033957..b7544467b33356fd742d37c50cba9bc3f138f33d 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -13,6 +13,7 @@ import net.i2p.data.RouterInfo; import net.i2p.data.SessionKey; import net.i2p.router.CommSystemFacade; import net.i2p.router.RouterContext; +import net.i2p.util.Addresses; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; @@ -446,7 +447,7 @@ class PeerTestManager { // initiated test } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("We are charlie, as the testIP/port is " + RemoteHostId.toString(testIP) + ":" + testPort + " and the state is unknown for " + nonce); + _log.debug("We are charlie, as the testIP/port is " + Addresses.toString(testIP, testPort) + " and the state is unknown for " + nonce); // we are charlie, since alice never sends us her IP and port, only bob does (and, // erm, we're not alice, since it isn't our nonce) receiveFromBobAsCharlie(from, testInfo, nonce, null); diff --git a/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java b/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java index 837a50268e62d94a68855eb59bf7232498a4f57b..399b82eec81b29552b29a65a9e597ed1e7118213 100644 --- a/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java +++ b/router/java/src/net/i2p/router/transport/udp/RemoteHostId.java @@ -2,33 +2,49 @@ package net.i2p.router.transport.udp; import net.i2p.data.Base64; import net.i2p.data.DataHelper; +import net.i2p.util.Addresses; /** * Unique ID for a peer - its IP + port, all bundled into a tidy obj. - * If the remote peer is not reachabe through an IP+port, this contains + * If the remote peer is not reachable through an IP+port, this contains * the hash of their identity. * */ final class RemoteHostId { - private byte _ip[]; - private int _port; - private byte _peerHash[]; + private final byte _ip[]; + private final int _port; + private final byte _peerHash[]; + private final int _hashCode; + /** direct */ public RemoteHostId(byte ip[], int port) { - _ip = ip; - _port = port; + this(ip, port, null); } + + /** indirect */ public RemoteHostId(byte peerHash[]) { - _peerHash = peerHash; + this(null, 0, peerHash); } + private RemoteHostId(byte ip[], int port, byte peerHash[]) { + _ip = ip; + _port = port; + _peerHash = peerHash; + _hashCode = DataHelper.hashCode(_ip) ^ DataHelper.hashCode(_peerHash) ^ _port; + } + + /** @return null if indirect */ public byte[] getIP() { return _ip; } + + /** @return 0 if indirect */ public int getPort() { return _port; } + + /** @return null if direct */ public byte[] getPeerHash() { return _peerHash; } @Override public int hashCode() { - return DataHelper.hashCode(_ip) ^ DataHelper.hashCode(_peerHash) ^ _port; + return _hashCode; } @Override @@ -38,29 +54,22 @@ final class RemoteHostId { if (!(obj instanceof RemoteHostId)) return false; RemoteHostId id = (RemoteHostId)obj; - return (_port == id.getPort()) && DataHelper.eq(_ip, id.getIP()) && DataHelper.eq(_peerHash, id.getPeerHash()); + return (_port == id._port) && DataHelper.eq(_ip, id._ip) && DataHelper.eq(_peerHash, id._peerHash); } @Override public String toString() { return toString(true); } - public String toString(boolean includePort) { + + private String toString(boolean includePort) { if (_ip != null) { if (includePort) - return toString(_ip) + ':' + _port; + return Addresses.toString(_ip, _port); else - return toString(_ip); + return Addresses.toString(_ip); } else { return Base64.encode(_peerHash); } } - public static String toString(byte ip[]) { - StringBuilder buf = new StringBuilder(ip.length+5); - for (int i = 0; i < ip.length; i++) { - buf.append(ip[i]&0xFF); - if (i + 1 < ip.length) - buf.append('.'); - } - return buf.toString(); - } + public String toHostString() { return toString(false); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 6017ebcf8eb44ed2a956d622d8ddafb9e44ea6d7..9531335b42a19172d0c2a0cbc966f21994c40979 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -12,14 +12,14 @@ import net.i2p.util.Log; * UDPReceiver */ class UDPEndpoint { - private RouterContext _context; - private Log _log; + private final RouterContext _context; + private final Log _log; private int _listenPort; - private UDPTransport _transport; + private final UDPTransport _transport; private UDPSender _sender; private UDPReceiver _receiver; private DatagramSocket _socket; - private InetAddress _bindAddress; + private final InetAddress _bindAddress; /** * @param listenPort -1 or the requested port, may not be honored @@ -34,7 +34,7 @@ class UDPEndpoint { } /** caller should call getListenPort() after this to get the actual bound port and determine success */ - public void startup() { + public synchronized void startup() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up the UDP endpoint"); shutdown(); @@ -49,7 +49,7 @@ class UDPEndpoint { _receiver.startup(); } - public void shutdown() { + public synchronized void shutdown() { if (_sender != null) { _sender.shutdown(); _receiver.shutdown(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index eb4c3e23c50d28d2aa029c8c2a968d0c2a6ece18..d20af43387b3000a35c2c97cc3c5403e546a32f2 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -32,13 +32,14 @@ class UDPPacket { private volatile Exception _acquiredBy; private long _enqueueTime; private long _receivedTime; - private long _beforeValidate; - private long _afterValidate; - private long _beforeReceiveFragments; - private long _afterHandlingTime; + //private long _beforeValidate; + //private long _afterValidate; + //private long _beforeReceiveFragments; + //private long _afterHandlingTime; private int _validateCount; // private boolean _isInbound; + // Warning - this mixes contexts in a multi-router JVM private static final Queue<UDPPacket> _packetCache; private static final boolean CACHE = true; private static final int CACHE_SIZE = 64; @@ -92,7 +93,7 @@ class UDPPacket { private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; private UDPPacket(I2PAppContext ctx) { - ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES); + //ctx.statManager().createRateStat("udp.fetchRemoteSlow", "How long it takes to grab the remote ip info", "udp", UDPTransport.RATES); // the data buffer is clobbered on init(..), but we need it to bootstrap _data = new byte[MAX_PACKET_SIZE]; _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); @@ -134,7 +135,7 @@ class UDPPacket { public short getPriority() { verifyNotReleased(); return _priority; } public long getExpiration() { verifyNotReleased(); return _expiration; } public long getBegin() { verifyNotReleased(); return _initializeTime; } - public long getLifetime() { verifyNotReleased(); return _context.clock().now() - _initializeTime; } + public long getLifetime() { /** verifyNotReleased(); */ return _context.clock().now() - _initializeTime; } public void resetBegin() { _initializeTime = _context.clock().now(); } /** flag this packet as a particular type for accounting purposes */ public void markType(int type) { verifyNotReleased(); _markedType = type; } @@ -156,14 +157,14 @@ class UDPPacket { RemoteHostId getRemoteHost() { if (_remoteHost == null) { - long before = System.currentTimeMillis(); + //long before = System.currentTimeMillis(); InetAddress addr = _packet.getAddress(); byte ip[] = addr.getAddress(); int port = _packet.getPort(); _remoteHost = new RemoteHostId(ip, port); - long timeToFetch = System.currentTimeMillis() - before; - if (timeToFetch > 50) - _context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime()); + //long timeToFetch = System.currentTimeMillis() - before; + //if (timeToFetch > 50) + // _context.statManager().addRateData("udp.fetchRemoteSlow", timeToFetch, getLifetime()); } return _remoteHost; } @@ -175,7 +176,7 @@ class UDPPacket { */ public boolean validate(SessionKey macKey) { verifyNotReleased(); - _beforeValidate = _context.clock().now(); + //_beforeValidate = _context.clock().now(); boolean eq = false; Arrays.fill(_validateBuf, (byte)0); @@ -216,7 +217,7 @@ class UDPPacket { // _log.warn("Payload length is " + payloadLength); } - _afterValidate = _context.clock().now(); + //_afterValidate = _context.clock().now(); _validateCount++; return eq; } @@ -238,30 +239,35 @@ class UDPPacket { void enqueue() { _enqueueTime = _context.clock().now(); } /** a packet handler has pulled it off the inbound queue */ void received() { _receivedTime = _context.clock().now(); } + /** a packet handler has decrypted and verified the packet and is about to parse out the good bits */ - void beforeReceiveFragments() { _beforeReceiveFragments = _context.clock().now(); } + //void beforeReceiveFragments() { _beforeReceiveFragments = _context.clock().now(); } /** a packet handler has finished parsing out the good bits */ - void afterHandling() { _afterHandlingTime = _context.clock().now(); } + //void afterHandling() { _afterHandlingTime = _context.clock().now(); } /** the UDPReceiver has tossed it onto the inbound queue */ - long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); } + //long getTimeSinceEnqueue() { return (_enqueueTime > 0 ? _context.clock().now() - _enqueueTime : 0); } + /** a packet handler has pulled it off the inbound queue */ long getTimeSinceReceived() { return (_receivedTime > 0 ? _context.clock().now() - _receivedTime : 0); } + /** a packet handler has decrypted and verified the packet and is about to parse out the good bits */ - long getTimeSinceReceiveFragments() { return (_beforeReceiveFragments > 0 ? _context.clock().now() - _beforeReceiveFragments : 0); } + //long getTimeSinceReceiveFragments() { return (_beforeReceiveFragments > 0 ? _context.clock().now() - _beforeReceiveFragments : 0); } /** a packet handler has finished parsing out the good bits */ - long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); } + //long getTimeSinceHandling() { return (_afterHandlingTime > 0 ? _context.clock().now() - _afterHandlingTime : 0); } + // Following 5: All used only for stats in PacketHandler, commented out + /** when it was added to the endpoint's receive queue */ - long getEnqueueTime() { return _enqueueTime; } + //long getEnqueueTime() { return _enqueueTime; } /** when it was pulled off the endpoint receive queue */ - long getReceivedTime() { return _receivedTime; } + //long getReceivedTime() { return _receivedTime; } /** when we began validate() */ - long getBeforeValidate() { return _beforeValidate; } + //long getBeforeValidate() { return _beforeValidate; } /** when we finished validate() */ - long getAfterValidate() { return _afterValidate; } + //long getAfterValidate() { return _afterValidate; } /** how many times we tried to validate the packet */ - int getValidateCount() { return _validateCount; } + //int getValidateCount() { return _validateCount; } @Override public String toString() { @@ -278,8 +284,8 @@ class UDPPacket { buf.append(" sinceEnqueued=").append((_enqueueTime > 0 ? _context.clock().now()-_enqueueTime : -1)); buf.append(" sinceReceived=").append((_receivedTime > 0 ? _context.clock().now()-_receivedTime : -1)); - buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1)); - buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1)); + //buf.append(" beforeReceiveFragments=").append((_beforeReceiveFragments > 0 ? _context.clock().now()-_beforeReceiveFragments : -1)); + //buf.append(" sinceHandled=").append((_afterHandlingTime > 0 ? _context.clock().now()-_afterHandlingTime : -1)); //buf.append("\ndata=").append(Base64.encode(_packet.getData(), _packet.getOffset(), _packet.getLength())); return buf.toString(); } @@ -316,13 +322,22 @@ class UDPPacket { _packetCache.offer(this); } + /** + * Call at shutdown/startup to not hold ctx refs + * @since 0.9.2 + */ + public static void clearCache() { + if (CACHE) + _packetCache.clear(); + } + private void verifyNotReleased() { - if (CACHE) return; + if (!CACHE) return; if (_released) { - Log log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); - log.log(Log.CRIT, "Already released. current stack trace is:", new Exception()); - log.log(Log.CRIT, "Released by: ", _releasedBy); - log.log(Log.CRIT, "Acquired by: ", _acquiredBy); + Log log = _context.logManager().getLog(UDPPacket.class); + log.error("Already released", new Exception()); + //log.log(Log.CRIT, "Released by: ", _releasedBy); + //log.log(Log.CRIT, "Acquired by: ", _acquiredBy); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 1491030bb6f0c500b888cb15c7ba493cf06f6326..46db0aca32a976fdaab93b8bcf62d96e12cd8881 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -22,10 +22,10 @@ import net.i2p.util.SimpleTimer; class UDPReceiver { private final RouterContext _context; private final Log _log; - private DatagramSocket _socket; + private final DatagramSocket _socket; private String _name; private final BlockingQueue<UDPPacket> _inboundQueue; - private boolean _keepRunning; + private volatile boolean _keepRunning; private final Runner _runner; private final UDPTransport _transport; private static int __id; @@ -90,9 +90,11 @@ class UDPReceiver { * NOTE: this closes the old socket so that blocking calls unblock! * */ +/********* public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { return _runner.updateListeningPort(socket, newPort); } +**********/ /** if a packet been sitting in the queue for a full second (meaning the handlers are overwhelmed), drop subsequent packets */ private static final long MAX_QUEUE_PERIOD = 2*1000; @@ -215,26 +217,27 @@ class UDPReceiver { } private class Runner implements Runnable { - private boolean _socketChanged; + //private volatile boolean _socketChanged; + public void run() { - _socketChanged = false; + //_socketChanged = false; FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); while (_keepRunning) { - if (_socketChanged) { - Thread.currentThread().setName(_name + "." + _id); - _socketChanged = false; - } + //if (_socketChanged) { + // Thread.currentThread().setName(_name + "." + _id); + // _socketChanged = false; + //} UDPPacket packet = UDPPacket.acquire(_context, true); // block before we read... - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Before throttling receive"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Before throttling receive"); while (!_context.throttle().acceptNetworkMessage()) try { Thread.sleep(10); } catch (InterruptedException ie) {} try { - if (_log.shouldLog(Log.INFO)) - _log.info("Before blocking socket.receive on " + System.identityHashCode(packet)); + //if (_log.shouldLog(Log.INFO)) + // _log.info("Before blocking socket.receive on " + System.identityHashCode(packet)); synchronized (Runner.this) { _socket.receive(packet.getPacket()); } @@ -263,15 +266,16 @@ class UDPReceiver { // nat hole punch packets are 0 bytes if (_log.shouldLog(Log.INFO)) _log.info("Received a 0 byte udp packet from " + packet.getPacket().getAddress() + ":" + packet.getPacket().getPort()); + packet.release(); } } catch (IOException ioe) { - if (_socketChanged) { - if (_log.shouldLog(Log.INFO)) - _log.info("Changing ports..."); - } else { + //if (_socketChanged) { + // if (_log.shouldLog(Log.INFO)) + // _log.info("Changing ports..."); + //} else { if (_log.shouldLog(Log.WARN)) _log.warn("Error receiving", ioe); - } + //} packet.release(); } } @@ -279,6 +283,7 @@ class UDPReceiver { _log.debug("Stop receiving..."); } + /****** public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { _name = "UDPReceive on " + newPort; DatagramSocket old = null; @@ -291,6 +296,6 @@ class UDPReceiver { old.close(); return old; } + *****/ } - } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index b431ab4c50c2d6e34e873be0f68d31ca45d0eed4..202609fab3f83a524cc0ae1eae4b4a0af9879beb 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -18,10 +18,10 @@ import net.i2p.util.Log; class UDPSender { private final RouterContext _context; private final Log _log; - private DatagramSocket _socket; + private final DatagramSocket _socket; private String _name; private final BlockingQueue<UDPPacket> _outboundQueue; - private boolean _keepRunning; + private volatile boolean _keepRunning; private final Runner _runner; private static final int TYPE_POISON = 99999; @@ -81,9 +81,11 @@ class UDPSender { _outboundQueue.clear(); } +/********* public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { return _runner.updateListeningPort(socket, newPort); } +**********/ /** @@ -172,17 +174,18 @@ class UDPSender { } private class Runner implements Runnable { - private boolean _socketChanged; + //private volatile boolean _socketChanged; + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); public void run() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Running the UDP sender"); - _socketChanged = false; + //_socketChanged = false; while (_keepRunning) { - if (_socketChanged) { - Thread.currentThread().setName(_name); - _socketChanged = false; - } + //if (_socketChanged) { + // Thread.currentThread().setName(_name); + // _socketChanged = false; + //} UDPPacket packet = getNextPacket(); if (packet != null) { @@ -200,11 +203,11 @@ class UDPSender { long afterBW = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) { + //if (_log.shouldLog(Log.DEBUG)) { //if (len > 128) // len = 128; //_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size)); - } + //} if (packet.getMessageType() >= PacketBuilder.TYPE_FIRST) _context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); @@ -216,11 +219,11 @@ class UDPSender { // synchronization lets us update safely //_log.debug("Break out datagram for " + packet); DatagramPacket dp = packet.getPacket(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Just before socket.send of " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Just before socket.send of " + packet); _socket.send(dp); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Just after socket.send of " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Just after socket.send of " + packet); } long sendTime = _context.clock().now() - before; _context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime()); @@ -251,8 +254,10 @@ class UDPSender { private UDPPacket getNextPacket() { UDPPacket packet = null; while ( (_keepRunning) && (packet == null || packet.getLifetime() > MAX_HEAD_LIFETIME) ) { - if (packet != null) + if (packet != null) { _context.statManager().addRateData("udp.sendQueueTrimmed", 1, 0); + packet.release(); + } try { packet = _outboundQueue.take(); } catch (InterruptedException ie) {} @@ -261,6 +266,8 @@ class UDPSender { } return packet; } + + /****** public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) { _name = "UDPSend on " + newPort; DatagramSocket old = null; @@ -271,5 +278,6 @@ class UDPSender { _socketChanged = true; return old; } + *****/ } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index ef1e381cf5af11539ca39074736eda9fbd371e8e..2eab61fce8a8771ef454ebc11770b0ac63d36bd3 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -247,6 +247,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_flooder != null) _flooder.shutdown(); _introManager.reset(); + UDPPacket.clearCache(); _introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES); @@ -379,6 +380,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _peersByIdent.clear(); _dropList.clear(); _introManager.reset(); + UDPPacket.clearCache(); } /** @@ -421,9 +423,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ @Override public void externalAddressReceived(String source, byte[] ip, int port) { - String s = RemoteHostId.toString(ip); if (_log.shouldLog(Log.WARN)) - _log.warn("Received address: " + s + " port: " + port + " from: " + source); + _log.warn("Received address: " + Addresses.toString(ip, port) + " from: " + source); if (explicitAddressSpecified()) return; String sources = _context.getProperty(PROP_SOURCES, DEFAULT_SOURCES);