diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 96ceec9c0..e901ac859 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -3,9 +3,11 @@ package net.i2p.router.transport.udp; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import net.i2p.router.Router; import net.i2p.router.RouterContext; +import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.data.DataHelper; import net.i2p.util.I2PThread; import net.i2p.util.LHMCache; @@ -26,7 +28,6 @@ class PacketHandler { private final RouterContext _context; private final Log _log; private final UDPTransport _transport; - private final UDPEndpoint _endpoint; private final EstablishmentManager _establisher; private final InboundMessageFragments _inbound; private final PeerTestManager _testManager; @@ -34,19 +35,22 @@ class PacketHandler { private volatile boolean _keepReading; private final Handler[] _handlers; private final Map _failCache; + private final BlockingQueue _inboundQueue; private static final Object DUMMY = new Object(); + private static final int TYPE_POISON = -99999; + private static final int MIN_QUEUE_SIZE = 16; + private static final int MAX_QUEUE_SIZE = 192; private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB private static final int MAX_NUM_HANDLERS = 1; /** let packets be up to 30s slow */ private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000; - PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, + PacketHandler(RouterContext ctx, UDPTransport transport, EstablishmentManager establisher, InboundMessageFragments inbound, PeerTestManager testManager, IntroductionManager introManager) { _context = ctx; _log = ctx.logManager().getLog(PacketHandler.class); _transport = transport; - _endpoint = endpoint; _establisher = establisher; _inbound = inbound; _testManager = testManager; @@ -56,6 +60,8 @@ class PacketHandler { long maxMemory = Runtime.getRuntime().maxMemory(); if (maxMemory == Long.MAX_VALUE) maxMemory = 96*1024*1024l; + int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); + _inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize); int num_handlers; if (maxMemory < 32*1024*1024) num_handlers = 1; @@ -107,6 +113,7 @@ class PacketHandler { public synchronized void shutdown() { _keepReading = false; + stopQueue(); } String getHandlerStatus() { @@ -119,9 +126,55 @@ class PacketHandler { return rv.toString(); } - /** @since 0.8.8 */ - int getHandlerCount() { - return _handlers.length; + /** + * Blocking call to retrieve the next inbound packet, or null if we have + * shut down. + * + * @since IPv6 moved from UDPReceiver + */ + public void queueReceived(UDPPacket packet) throws InterruptedException { + _inboundQueue.put(packet); + } + + + /** + * Blocking for a while + * + * @since IPv6 moved from UDPReceiver + */ + private void stopQueue() { + _inboundQueue.clear(); + for (int i = 0; i < _handlers.length; i++) { + UDPPacket poison = UDPPacket.acquire(_context, false); + poison.setMessageType(TYPE_POISON); + _inboundQueue.offer(poison); + } + for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) { + try { + Thread.sleep(i * 50); + } catch (InterruptedException ie) {} + } + _inboundQueue.clear(); + } + + /** + * Blocking call to retrieve the next inbound packet, or null if we have + * shut down. + * + * @since IPv6 moved from UDPReceiver + */ + public UDPPacket receiveNext() { + UDPPacket rv = null; + //int remaining = 0; + while (_keepReading && rv == null) { + try { + rv = _inboundQueue.take(); + } catch (InterruptedException ie) {} + if (rv != null && rv.getMessageType() == TYPE_POISON) + return null; + } + //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0); + return rv; } /** the packet is from a peer we are establishing an outbound con to, but failed validation, so fallback */ @@ -144,7 +197,7 @@ class PacketHandler { _state = 1; while (_keepReading) { _state = 2; - UDPPacket packet = _endpoint.receive(); + UDPPacket packet = receiveNext(); _state = 3; if (packet == null) break; // keepReading is probably false, or bind failed... diff --git a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java index cf7ce2192..f40e05e08 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketPusher.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketPusher.java @@ -1,26 +1,29 @@ package net.i2p.router.transport.udp; +import java.util.List; + import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; /** * Blocking thread to grab new packets off the outbound fragment - * pool and toss 'em onto the outbound packet queue + * pool and toss 'em onto the outbound packet queues. * + * Here we select which UDPEndpoint/UDPSender to send it out. */ class PacketPusher implements Runnable { // private RouterContext _context; private final Log _log; private final OutboundMessageFragments _fragments; - private final UDPSender _sender; + private final List _endpoints; private volatile boolean _alive; - public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, UDPSender sender) { + public PacketPusher(RouterContext ctx, OutboundMessageFragments fragments, List endpoints) { // _context = ctx; _log = ctx.logManager().getLog(PacketPusher.class); _fragments = fragments; - _sender = sender; + _endpoints = endpoints; } public synchronized void startup() { @@ -38,8 +41,7 @@ class PacketPusher implements Runnable { if (packets != null) { for (int i = 0; i < packets.length; i++) { if (packets[i] != null) // null for ACKed fragments - // BLOCKING if queue is full - _sender.add(packets[i]); + send(packets[i]); } } } catch (Exception e) { @@ -47,4 +49,41 @@ class PacketPusher implements Runnable { } } } + + /** + * This sends it directly out, bypassing OutboundMessageFragments + * and the PacketPusher. The only queueing is for the bandwidth limiter. + * BLOCKING if OB queue is full. + * + * @param packet non-null + * @since IPv6 + */ + public void send(UDPPacket packet) { + boolean handled = false; + boolean isIPv4 = packet.getPacket().getAddress().getAddress().length == 4; + for (int j = 0; j < _endpoints.size(); j++) { + // Find the best endpoint (socket) to send this out. + // TODO if we have multiple IPv4, or multiple IPv6 endpoints, + // we have to track which one we're using in the PeerState and + // somehow set that in the UDPPacket so we're consistent + UDPEndpoint ep; + try { + ep = _endpoints.get(j); + } catch (IndexOutOfBoundsException ioobe) { + // whups, list changed + break; + } + if ((isIPv4 && ep.isIPv4()) || + ((!isIPv4) && ep.isIPv6())) { + // BLOCKING if queue is full + ep.getSender().add(packet); + handled = true; + break; + } + } + if (!handled) { + _log.error("No endpoint to send " + packet); + packet.release(); + } + } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java index 179649de5..2822fe8fb 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpoint.java @@ -2,14 +2,16 @@ package net.i2p.router.transport.udp; import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.SocketException; import net.i2p.router.RouterContext; import net.i2p.util.Log; /** - * Coordinate the low level datagram socket, managing the UDPSender and - * UDPReceiver + * Coordinate the low-level datagram socket, creating and managing the UDPSender and + * UDPReceiver. */ class UDPEndpoint { private final RouterContext _context; @@ -20,6 +22,7 @@ class UDPEndpoint { private UDPReceiver _receiver; private DatagramSocket _socket; private final InetAddress _bindAddress; + private final boolean _isIPv4, _isIPv6; /** * @param listenPort -1 or the requested port, may not be honored @@ -31,17 +34,19 @@ class UDPEndpoint { _transport = transport; _bindAddress = bindAddress; _listenPort = listenPort; + _isIPv4 = bindAddress == null || bindAddress instanceof Inet4Address; + _isIPv6 = bindAddress == null || bindAddress instanceof Inet6Address; } /** caller should call getListenPort() after this to get the actual bound port and determine success */ - public synchronized void startup() { + public synchronized void startup() throws SocketException { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up the UDP endpoint"); shutdown(); _socket = getSocket(); if (_socket == null) { _log.log(Log.CRIT, "UDP Unable to open a port"); - return; + throw new SocketException("SSU Unable to bind to a port on " + _bindAddress); } _sender = new UDPSender(_context, _socket, "UDPSender"); _receiver = new UDPReceiver(_context, _transport, _socket, "UDPReceiver"); @@ -144,16 +149,6 @@ class UDPEndpoint { _sender.add(packet); } - /** - * Blocking call to receive the next inbound UDP packet from any peer. - * @return null if we have shut down - */ - public UDPPacket receive() { - if (_receiver == null) - return null; - return _receiver.receiveNext(); - } - /** * Clear outbound queue, probably in preparation for sending destroy() to everybody. * @since 0.9.2 @@ -162,4 +157,20 @@ class UDPEndpoint { if (_sender != null) _sender.clear(); } + + /** + * @return true for wildcard too + * @since IPv6 + */ + public boolean isIPv4() { + return _isIPv4; + } + + /** + * @return true for wildcard too + * @since IPv6 + */ + public boolean isIPv6() { + return _isIPv6; + } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index b7b06bb8d..1807af308 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -3,11 +3,9 @@ package net.i2p.router.transport.udp; import java.io.IOException; import java.net.DatagramSocket; import java.util.Arrays; -import java.util.concurrent.BlockingQueue; import net.i2p.router.RouterContext; import net.i2p.router.transport.FIFOBandwidthLimiter; -import net.i2p.router.util.CoDelBlockingQueue; import net.i2p.util.I2PThread; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -20,37 +18,34 @@ import net.i2p.util.SystemVersion; * waiting around too long, they are dropped. Packets should be pulled off * from the queue ASAP by a {@link PacketHandler} * + * There is a UDPReceiver for each UDPEndpoint. + * It contains a thread but no queue. Received packets are queued + * in the common PacketHandler queue. */ class UDPReceiver { private final RouterContext _context; private final Log _log; private final DatagramSocket _socket; private String _name; - private final BlockingQueue _inboundQueue; private volatile boolean _keepRunning; private final Runner _runner; private final UDPTransport _transport; + private final PacketHandler _handler; private static int __id; private final int _id; private static final boolean _isAndroid = SystemVersion.isAndroid(); - private static final int TYPE_POISON = -99999; - private static final int MIN_QUEUE_SIZE = 16; - private static final int MAX_QUEUE_SIZE = 192; - public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) { _context = ctx; _log = ctx.logManager().getLog(UDPReceiver.class); _id = ++__id; _name = name; - long maxMemory = Runtime.getRuntime().maxMemory(); - if (maxMemory == Long.MAX_VALUE) - maxMemory = 96*1024*1024l; - int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024))); - _inboundQueue = new CoDelBlockingQueue(ctx, "UDP-Receiver", qsize); _socket = socket; _transport = transport; + _handler = transport.getPacketHandler(); + if (_handler == null) + throw new IllegalStateException(); _runner = new Runner(); //_context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", UDPTransport.RATES); //_context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", UDPTransport.RATES); @@ -68,18 +63,6 @@ class UDPReceiver { public synchronized void shutdown() { _keepRunning = false; - _inboundQueue.clear(); - for (int i = 0; i < _transport.getPacketHandlerCount(); i++) { - UDPPacket poison = UDPPacket.acquire(_context, false); - poison.setMessageType(TYPE_POISON); - _inboundQueue.offer(poison); - } - for (int i = 1; i <= 5 && !_inboundQueue.isEmpty(); i++) { - try { - Thread.sleep(i * 50); - } catch (InterruptedException ie) {} - } - _inboundQueue.clear(); } /********* @@ -194,7 +177,7 @@ class UDPReceiver { if (!rejected) { ****/ try { - _inboundQueue.put(packet); + _handler.queueReceived(packet); } catch (InterruptedException ie) { packet.release(); _keepRunning = false; @@ -229,24 +212,6 @@ class UDPReceiver { } ****/ - /** - * Blocking call to retrieve the next inbound packet, or null if we have - * shut down. - * - */ - public UDPPacket receiveNext() { - UDPPacket rv = null; - //int remaining = 0; - while (_keepRunning && rv == null) { - try { - rv = _inboundQueue.take(); - } catch (InterruptedException ie) {} - if (rv != null && rv.getMessageType() == TYPE_POISON) - return null; - } - //_context.statManager().addRateData("udp.receiveRemaining", remaining, 0); - return rv; - } private class Runner implements Runnable { //private volatile boolean _socketChanged; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 885cc7299..923dcba61 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -14,6 +14,9 @@ import net.i2p.util.Log; /** * Lowest level packet sender, pushes anything on its queue ASAP. * + * There is a UDPSender for each UDPEndpoint. + * It contains a thread and a queue. Packet to be sent are queued + * by the PacketPusher. */ class UDPSender { private final RouterContext _context; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index ba69faf99..47c822025 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -3,6 +3,7 @@ package net.i2p.router.transport.udp; import java.io.IOException; import java.io.Writer; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; import java.text.DecimalFormat; import java.util.ArrayList; @@ -18,6 +19,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.Vector; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import net.i2p.data.DatabaseEntry; import net.i2p.data.DataHelper; @@ -51,7 +53,7 @@ import net.i2p.util.Translate; */ public class UDPTransport extends TransportImpl implements TimedWeightedPriorityMessageQueue.FailedListener { private final Log _log; - private UDPEndpoint _endpoint; + private final List _endpoints; private final Object _addDropLock = new Object(); /** Peer (Hash) to PeerState */ private final Map _peersByIdent; @@ -206,6 +208,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _peersByIdent = new ConcurrentHashMap(128); _peersByRemoteHost = new ConcurrentHashMap(128); _dropList = new ConcurrentHashSet(2); + _endpoints = new CopyOnWriteArrayList(); // See comments in DummyThrottle.java if (USE_PRIORITY) { @@ -263,8 +266,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _pusher.shutdown(); if (_handler != null) _handler.shutdown(); - if (_endpoint != null) - _endpoint.shutdown(); + for (UDPEndpoint endpoint : _endpoints) { + endpoint.shutdown(); + // should we remove? + _endpoints.remove(endpoint); + } if (_establisher != null) _establisher.shutdown(); if (_refiller != null) @@ -327,14 +333,24 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _log.warn("Binding only to " + bindToAddr); if (_log.shouldLog(Log.INFO)) _log.info("Binding to the port: " + port); - if (_endpoint == null) { - _endpoint = new UDPEndpoint(_context, this, port, bindToAddr); + if (_endpoints.isEmpty()) { + // will always be empty since we are removing them above + UDPEndpoint endpoint = new UDPEndpoint(_context, this, port, bindToAddr); + _endpoints.add(endpoint); + // TODO add additional endpoints for additional addresses/ports } else { - // todo, set bind address too - _endpoint.setListenPort(port); + // unused for now + for (UDPEndpoint endpoint : _endpoints) { + if (endpoint.isIPv4()) { + // hack, first IPv4 endpoint, FIXME + // todo, set bind address too + endpoint.setListenPort(port); + break; + } + } } setMTU(bindToAddr); - + if (_establisher == null) _establisher = new EstablishmentManager(_context, this); @@ -342,7 +358,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _testManager = new PeerTestManager(_context, this); if (_handler == null) - _handler = new PacketHandler(_context, this, _endpoint, _establisher, _inboundFragments, _testManager, _introManager); + _handler = new PacketHandler(_context, this, _establisher, _inboundFragments, _testManager, _introManager); // See comments in DummyThrottle.java if (USE_PRIORITY && _refiller == null) @@ -353,15 +369,26 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // Startup the endpoint with the requested port, check the actual port, and // take action if it failed or was different than requested or it needs to be saved - _endpoint.startup(); - int newPort = _endpoint.getListenPort(); - _externalListenPort = newPort; - if (newPort <= 0) { + int newPort = -1; + for (UDPEndpoint endpoint : _endpoints) { + try { + endpoint.startup(); + // hack, first IPv4 endpoint, FIXME + if (newPort < 0 && endpoint.isIPv4()) { + newPort = endpoint.getListenPort(); + _externalListenPort = newPort; + } + } catch (SocketException se) { + _endpoints.remove(endpoint); + } + } + if (_endpoints.isEmpty()) { _log.log(Log.CRIT, "Unable to open UDP port"); setReachabilityStatus(CommSystemFacade.STATUS_HOSED); return; } - if (newPort != port || newPort != oldIPort || newPort != oldEPort) { + if (newPort > 0 && + (newPort != port || newPort != oldIPort || newPort != oldEPort)) { // attempt to use it as our external port - this will be overridden by // externalAddressReceived(...) Map changes = new HashMap(); @@ -374,7 +401,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _handler.startup(); _fragments.startup(); _inboundFragments.startup(); - _pusher = new PacketPusher(_context, _fragments, _endpoint.getSender()); + _pusher = new PacketPusher(_context, _fragments, _endpoints); _pusher.startup(); if (USE_PRIORITY) _refiller.startup(); @@ -387,8 +414,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public synchronized void shutdown() { destroyAll(); - if (_endpoint != null) - _endpoint.shutdown(); + for (UDPEndpoint endpoint : _endpoints) { + endpoint.shutdown(); + // should we remove? + _endpoints.remove(endpoint); + } //if (_flooder != null) // _flooder.shutdown(); if (_refiller != null) @@ -416,11 +446,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority */ SessionKey getIntroKey() { return _introKey; } - /** @deprecated unused */ - public int getLocalPort() { - return _endpoint != null ? _endpoint.getListenPort() : -1; - } - public InetAddress getLocalAddress() { return _externalListenHost; } public int getExternalPort() { return _externalListenPort; } @@ -1205,7 +1230,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority void send(UDPPacket packet) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending packet " + packet); - _endpoint.send(packet); + _pusher.send(packet); } /** @@ -1231,7 +1256,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority * @since 0.8.9 */ private void destroyAll() { - _endpoint.clearOutbound(); + for (UDPEndpoint endpoint : _endpoints) { + endpoint.clearOutbound(); + } int howMany = _peersByIdent.size(); // use no more than 1/4 of configured bandwidth final int burst = 8; @@ -1662,13 +1689,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority return ""; } - /** @since 0.8.8 */ - int getPacketHandlerCount() { - PacketHandler handler = _handler; - if (handler != null) - return handler.getHandlerCount(); - else - return 0; + /** @since IPv6 */ + PacketHandler getPacketHandler() { + return _handler; } public void failed(OutboundMessageState msg) { failed(msg, true); } @@ -2521,7 +2544,15 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long pingCutoff = now - (2 * 60*60*1000); long pingFirewallCutoff = now - PING_FIREWALL_CUTOFF; boolean shouldPingFirewall = _reachabilityStatus != CommSystemFacade.STATUS_OK; - boolean pingOneOnly = shouldPingFirewall && _externalListenPort == _endpoint.getListenPort(); + int currentListenPort = -1; + for (UDPEndpoint endpoint : _endpoints) { + // hack, first IPv4 endpoint, FIXME + if (endpoint.isIPv4()) { + currentListenPort = endpoint.getListenPort(); + break; + } + } + boolean pingOneOnly = shouldPingFirewall && _externalListenPort == currentListenPort; boolean shortLoop = shouldPingFirewall; _expireBuffer.clear(); _runCount++;