diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index a123708e4a515efd022d81ee12546620e62cd952..d468372ec578fe72929fa5f299d2213f8cfe19c4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,5 +1,7 @@ package net.i2p.client.streaming; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.ArrayList; import java.util.List; @@ -10,24 +12,36 @@ import net.i2p.util.SimpleTimer; /** * Receive new connection attempts + * + * Use a bounded queue to limit the damage from SYN floods, + * router overload, or a slow client + * + * @author zzz modded to use concurrent and bound queue size */ class ConnectionHandler { private I2PAppContext _context; private Log _log; private ConnectionManager _manager; - private List _synQueue; + private LinkedBlockingQueue<Packet> _synQueue; private boolean _active; private int _acceptTimeout; /** max time after receiveNewSyn() and before the matched accept() */ private static final int DEFAULT_ACCEPT_TIMEOUT = 3*1000; + + /** + * This is both SYNs and subsequent packets, and with an initial window size of 12, + * this is a backlog of 5 to 64 Syns, which seems like plenty for now + * Don't make this too big because the removal by all the TimeoutSyns is O(n**2) - sortof. + */ + private static final int MAX_QUEUE_SIZE = 64; /** Creates a new instance of ConnectionHandler */ public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) { _context = context; _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; - _synQueue = new ArrayList(5); + _synQueue = new LinkedBlockingQueue(MAX_QUEUE_SIZE); _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } @@ -35,9 +49,11 @@ class ConnectionHandler { public void setActive(boolean active) { if (_log.shouldLog(Log.DEBUG)) _log.debug("setActive(" + active + ") called"); - synchronized (_synQueue) { - _active = active; - _synQueue.notifyAll(); // so we break from the accept() + _active = active; + if (!active) { + try { + _synQueue.put(new PoisonPacket()); // so we break from the accept() - waits until space is available + } catch (InterruptedException ie) {} } } public boolean getActive() { return _active; } @@ -45,6 +61,11 @@ class ConnectionHandler { /** * Non-SYN packets with a zero SendStreamID may also be queued here so * that they don't get thrown away while the SYN packet before it is queued. + * + * Additional overload protection may be required here... + * We don't have a 3-way handshake, so the SYN fully opens a connection. + * Does that make us more or less vulnerable to SYN flooding? + * */ public void receiveNewSyn(Packet packet) { if (!_active) { @@ -55,10 +76,15 @@ class ConnectionHandler { } if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout); - SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); - synchronized (_synQueue) { - _synQueue.add(packet); - _synQueue.notifyAll(); + // also check if expiration of the head is long past for overload detection with peek() ? + boolean success = _synQueue.offer(packet); // fail immediately if full + if (success) { + SimpleScheduler.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping new SYN request, as the queue is full"); + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + sendReset(packet); } } @@ -82,41 +108,44 @@ class ConnectionHandler { return null; if (!_active) { // fail all the ones we had queued up - synchronized (_synQueue) { - for (int i = 0; i < _synQueue.size(); i++) { - Packet packet = (Packet)_synQueue.get(i); - sendReset(packet); - } - _synQueue.clear(); + while(true) { + Packet packet = _synQueue.poll(); // fails immediately if empty + if (packet == null || packet.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST) + break; + sendReset(packet); } return null; } Packet syn = null; - synchronized (_synQueue) { - while ( _active && (_synQueue.size() <= 0) ) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " - + _synQueue.size()); - if (timeoutMs <= 0) { - try { _synQueue.wait(); } catch (InterruptedException ie) {} - } else { - long remaining = expiration - _context.clock().now(); -// BUGFIX -// The specified amount of real time has elapsed, more or less. -// If timeout is zero, however, then real time is not taken into consideration -// and the thread simply waits until notified. - if (remaining < 1) - break; - try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} - } - } - if (_active && _synQueue.size() > 0) { - syn = (Packet)_synQueue.remove(0); + while ( _active && syn == null) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + + _synQueue.size()); + if (timeoutMs <= 0) { + try { + syn = _synQueue.take(); // waits forever + } catch (InterruptedException ie) {} + } else { + long remaining = expiration - _context.clock().now(); + // (dont think this applies anymore for LinkedBlockingQueue) + // BUGFIX + // The specified amount of real time has elapsed, more or less. + // If timeout is zero, however, then real time is not taken into consideration + // and the thread simply waits until notified. + if (remaining < 1) + break; + try { + syn = _synQueue.poll(remaining, TimeUnit.MILLISECONDS); // waits the specified time max + } catch (InterruptedException ie) {} + break; } } if (syn != null) { + if (syn.getOptionalDelay() == PoisonPacket.MAX_DELAY_REQUEST) + return null; + // deal with forged / invalid syn packets // Handle both SYN and non-SYN packets in the queue @@ -179,10 +208,7 @@ class ConnectionHandler { } public void timeReached() { - boolean removed = false; - synchronized (_synQueue) { - removed = _synQueue.remove(_synPacket); - } + boolean removed = _synQueue.remove(_synPacket); if (removed) { if (_synPacket.isFlagSet(Packet.FLAG_SYNCHRONIZE)) @@ -196,4 +222,17 @@ class ConnectionHandler { } } } + + /** + * Simple end-of-queue marker. + * The standard class limits the delay to MAX_DELAY_REQUEST so + * an evil user can't use this to shut us down + */ + private static class PoisonPacket extends Packet { + public static final int MAX_DELAY_REQUEST = Packet.MAX_DELAY_REQUEST + 1; + + public PoisonPacket() { + setOptionalDelay(MAX_DELAY_REQUEST); + } + } }