From e254c5f31a40ce1c71f689286b45129e7f63d9f7 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Wed, 7 Apr 2010 23:18:58 +0000
Subject: [PATCH]     * Streaming:       - Detect and drop dup SYNs rather than
 create         a duplicate connection - will hopefully fix         "Received
 a syn with the wrong IDs"       - Send reset for a SYN ACK with the wrong IDs
       - Don't send a reset to a null dest       - Logging tweaks       -
 Cleanups

---
 .../net/i2p/client/streaming/Connection.java  |  2 +-
 .../streaming/ConnectionDataReceiver.java     |  3 +-
 .../client/streaming/ConnectionHandler.java   | 26 ++++++++++++--
 .../client/streaming/ConnectionManager.java   |  4 +--
 .../client/streaming/ConnectionOptions.java   |  4 +++
 .../streaming/ConnectionPacketHandler.java    | 32 +++++++++++------
 .../i2p/client/streaming/PacketHandler.java   | 35 ++++++++++++++-----
 7 files changed, 79 insertions(+), 27 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index f825f25a72..2cc1a0cf1f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -385,7 +385,7 @@ public class Connection {
      *  Process the acks and nacks received in a packet
      *  @return List of packets acked or null
      */
-    List ackPackets(long ackThrough, long nacks[]) {
+    List<PacketLocal> ackPackets(long ackThrough, long nacks[]) {
         if (ackThrough < _highestAckedThrough) {
             // dupack which won't tell us anything
         } else {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index acec982f6f..07247e670e 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -2,7 +2,6 @@ package net.i2p.client.streaming;
 
 import net.i2p.I2PAppContext;
 import net.i2p.data.ByteArray;
-import net.i2p.data.DataHelper;
 import net.i2p.util.Log;
 
 /**
@@ -187,7 +186,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
             packet.setOptionalFrom(con.getSession().getMyDestination());
             packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize());
         }
-        if (DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) {
+        if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) {
             packet.setFlag(Packet.FLAG_NO_ACK);
         }
         
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
index 382c984d9a..ba801ae95b 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
@@ -4,6 +4,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import net.i2p.I2PAppContext;
+import net.i2p.data.Destination;
 import net.i2p.util.Log;
 import net.i2p.util.SimpleScheduler;
 import net.i2p.util.SimpleTimer;
@@ -73,8 +74,8 @@ public class ConnectionHandler {
                 sendReset(packet);
             return;
         }
-        if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
+        if (_log.shouldLog(Log.INFO))
+            _log.info("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout);
         // also check if expiration of the head is long past for overload detection with peek() ?
         boolean success = _synQueue.offer(packet); // fail immediately if full
         if (success) {
@@ -145,10 +146,29 @@ public class ConnectionHandler {
                 if (syn.getOptionalDelay() == PoisonPacket.POISON_MAX_DELAY_REQUEST)
                     return null;
 
-                // deal with forged / invalid syn packets
+                // deal with forged / invalid syn packets in _manager.receiveConnection()
 
                 // Handle both SYN and non-SYN packets in the queue
                 if (syn.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
+                    // We are single-threaded here, so this is
+                    // a good place to check for dup SYNs and drop them
+                    Destination from = syn.getOptionalFrom();
+                    if (from == null) {
+                        if (_log.shouldLog(Log.WARN))
+                            _log.warn("Dropping SYN packet with no FROM: " + syn);
+                        // drop it
+                        continue;
+                    }
+                    Connection oldcon = _manager.getConnectionByOutboundId(syn.getReceiveStreamId());
+                    if (oldcon != null) {
+                        // His ID not guaranteed to be unique to us, but probably is...
+                        // only drop it on a destination match too
+                        if (from.equals(oldcon.getRemotePeer())) {
+                            if (_log.shouldLog(Log.WARN))
+                                _log.warn("Dropping dup SYN: " + syn);
+                            continue;
+                        }
+                    }
                     Connection con = _manager.receiveConnection(syn);
                     if (con != null)
                         return con;
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index 685b87b061..f69a0059cb 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -9,7 +9,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
 import net.i2p.client.I2PSession;
-import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
 import net.i2p.data.SessionKey;
 import net.i2p.util.Log;
@@ -83,7 +82,7 @@ public class ConnectionManager {
      */
     Connection getConnectionByOutboundId(long id) {
             for (Connection con : _connectionByInboundId.values()) {
-                if (DataHelper.eq(con.getSendStreamId(), id))
+                if (con.getSendStreamId() == id)
                     return con;
             }
         return null;
@@ -169,6 +168,7 @@ public class ConnectionManager {
         
         con.setReceiveStreamId(receiveId);
         try {
+            // This validates the packet, and sets the con's SendStreamID and RemotePeer
             con.getPacketHandler().receivePacket(synPacket, con);
         } catch (I2PException ie) {
             _connectionByInboundId.remove(Long.valueOf(receiveId));
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index d8f50fbcf7..a3f691eceb 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -359,7 +359,11 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
         if (_rtt > 60*1000)
             _rtt = 60*1000;
     }
+
     public int getRTO() { return _rto; }
+
+    /** for debugging @since 0.7.13 */
+    int getRTTDev() { return _rttDev; }
     
     /**
      * If we have 3 consecutive rtt increases, we are trending upwards (1), or if we have
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index ba705b2d5c..b9ebbb7d24 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -4,7 +4,6 @@ import java.util.List;
 
 import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
-import net.i2p.data.DataHelper;
 import net.i2p.data.Destination;
 import net.i2p.util.Log;
 import net.i2p.util.SimpleScheduler;
@@ -239,17 +238,17 @@ public class ConnectionPacketHandler {
         boolean firstAck = isNew && con.getHighestAckedThrough() < 0;
 
         int numResends = 0;
-        List acked = null;
+        List<PacketLocal> acked = null;
         // if we don't know the streamIds for both sides of the connection, there's no way we
         // could actually be acking data (this fixes the buggered up ack of packet 0 problem).
         // this is called after packet verification, which places the stream IDs as necessary if
         // the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet)
         if ( (packet != null) && (packet.getSendStreamId() > 0) && (packet.getReceiveStreamId() > 0) &&
              (con != null) && (con.getSendStreamId() > 0) && (con.getReceiveStreamId() > 0) &&
-             (!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
-             (!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
-             (!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
-             (!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) )
+             (packet.getSendStreamId() != Packet.STREAM_ID_UNKNOWN) &&
+             (packet.getReceiveStreamId() != Packet.STREAM_ID_UNKNOWN) &&
+             (con.getSendStreamId() != Packet.STREAM_ID_UNKNOWN) &&
+             (con.getReceiveStreamId() != Packet.STREAM_ID_UNKNOWN) )
             acked = con.ackPackets(ackThrough, nacks);
         else
             return false;
@@ -261,7 +260,7 @@ public class ConnectionPacketHandler {
             // and the highest rtt lets us set our resend delay properly
             int highestRTT = -1;
             for (int i = 0; i < acked.size(); i++) {
-                PacketLocal p = (PacketLocal)acked.get(i);
+                PacketLocal p = acked.get(i);
                 if (p.getAckTime() > highestRTT) {
                     //if (p.getNumSends() <= 1)
                     highestRTT = p.getAckTime();
@@ -282,7 +281,15 @@ public class ConnectionPacketHandler {
                     _log.debug("Packet acked after " + p.getAckTime() + "ms: " + p);
             }
             if (highestRTT > 0) {
+                int oldrtt = con.getOptions().getRTT();
+                int oldrto = con.getOptions().getRTO();
+                int olddev = con.getOptions().getRTTDev();
                 con.getOptions().updateRTT(highestRTT);
+                if (_log.shouldLog(Log.INFO))
+                    _log.info("acked: " + acked.size() + " highestRTT: " + highestRTT +
+                              " RTT: " + oldrtt + " -> " + con.getOptions().getRTT() +
+                              " RTO: " + oldrto + " -> " + con.getOptions().getRTO() +
+                              " Dev: " + olddev + " -> " + con.getOptions().getRTTDev());
                 if (firstAck) {
                     if (con.isInbound())
                         _context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0);
@@ -357,7 +364,7 @@ public class ConnectionPacketHandler {
                     // integers, so lets use a random distribution instead
                     int shouldIncrement = _context.random().nextInt(con.getOptions().getCongestionAvoidanceGrowthRateFactor()*newWindowSize);
                     if (shouldIncrement < acked)
-                        newWindowSize += 1;
+                        newWindowSize++;
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("cong. avoid acks = " + acked + " for " + con);
                 }
@@ -387,6 +394,11 @@ public class ConnectionPacketHandler {
     
     /**
      * Make sure this packet is ok and that we can continue processing its data.
+     *
+     * SIDE EFFECT:
+     * Sets the SendStreamId and RemotePeer for the con,
+     * using the packet's ReceiveStreamId and OptionalFrom,
+     * If this is a SYN packet and the con's SendStreamId is not set.
      * 
      * @return true if the packet is ok for this connection, false if we shouldn't
      *         continue processing.
@@ -415,7 +427,7 @@ public class ConnectionPacketHandler {
                     }
                 }
             } else {
-                if (!DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) {
+                if (con.getSendStreamId() != packet.getReceiveStreamId()) {
                     if (_log.shouldLog(Log.ERROR))
                         _log.error("Packet received with the wrong reply stream id: " 
                                   + con + " / " + packet);
@@ -431,7 +443,7 @@ public class ConnectionPacketHandler {
      * Make sure this RST packet is valid, and if it is, act on it.
      */
     private void verifyReset(Packet packet, Connection con) {
-        if (DataHelper.eq(con.getReceiveStreamId(), packet.getSendStreamId())) {
+        if (con.getReceiveStreamId() == packet.getSendStreamId()) {
             boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null);
             if (!ok) {
                 if (_log.shouldLog(Log.ERROR))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
index 19e62db0c9..95674b433e 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
@@ -7,7 +7,7 @@ import java.util.Set;
 
 import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
-import net.i2p.data.DataHelper;
+import net.i2p.data.Destination;
 import net.i2p.util.Log;
 
 /**
@@ -19,15 +19,15 @@ public class PacketHandler {
     private ConnectionManager _manager;
     private I2PAppContext _context;
     private Log _log;
-    private int _lastDelay;
-    private int _dropped;
+    //private int _lastDelay;
+    //private int _dropped;
     
     public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) {
         _manager = mgr;
         _context = ctx;
-        _dropped = 0;
+        //_dropped = 0;
         _log = ctx.logManager().getLog(PacketHandler.class);
-        _lastDelay = _context.random().nextInt(30*1000);
+        //_lastDelay = _context.random().nextInt(30*1000);
     }
     
 /** what is the point of this ? */
@@ -167,7 +167,7 @@ public class PacketHandler {
                 }
             } else {
                 if ( (con.getSendStreamId() <= 0) || 
-                     (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ||
+                     (con.getSendStreamId() == packet.getReceiveStreamId()) ||
                      (packet.getSequenceNum() <= ConnectionOptions.MIN_WINDOW_SIZE) ) { // its in flight from the first batch
                     long oldId = con.getSendStreamId();
                     if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
@@ -179,6 +179,7 @@ public class PacketHandler {
                         } else {
                             if (_log.shouldLog(Log.ERROR))
                                 _log.error("Received a syn with the wrong IDs, con=" + con + " packet=" + packet);
+                            sendReset(packet);
                             packet.releasePayload();
                             return;
                         }
@@ -199,16 +200,18 @@ public class PacketHandler {
                 } else {
                     if (!con.getResetSent()) {
                         // someone is sending us a packet on the wrong stream 
+                        // It isn't a SYN so it isn't likely to have a FROM to send a reset back to
                         if (_log.shouldLog(Log.ERROR)) {
                             Set cons = _manager.listConnections();
                             StringBuilder buf = new StringBuilder(512);
                             buf.append("Received a packet on the wrong stream: ");
                             buf.append(packet);
-                            buf.append(" connection: ");
+                            buf.append("\nthis connection:\n");
                             buf.append(con);
+                            buf.append("\nall connections:");
                             for (Iterator iter = cons.iterator(); iter.hasNext();) {
                                 Connection cur = (Connection)iter.next();
-                                buf.append(" ").append(cur);
+                                buf.append('\n').append(cur);
                             }
                             _log.error(buf.toString(), new Exception("Wrong stream"));
                         }
@@ -219,8 +222,22 @@ public class PacketHandler {
         }
     }
     
+    /**
+     *  This sends a reset back to the place this packet came from.
+     *  If the packet has no 'optional from' or valid signature, this does nothing.
+     *  This is not associated with a connection, so no con stats are updated.
+     */
     private void sendReset(Packet packet) {
-        PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom());
+        Destination from = packet.getOptionalFrom();
+        if (from == null)
+            return;
+        boolean ok = packet.verifySignature(_context, from, null);
+        if (!ok) {
+            if (_log.shouldLog(Log.WARN))
+                _log.warn("Can't send reset after recv spoofed packet: " + packet);
+            return;
+        }
+        PacketLocal reply = new PacketLocal(_context, from);
         reply.setFlag(Packet.FLAG_RESET);
         reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
         reply.setSendStreamId(packet.getReceiveStreamId());
-- 
GitLab