From 6a19501214b3f1c26f1f015332a7e4a13106599f Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Mon, 8 Aug 2005 20:35:50 +0000
Subject: [PATCH] 2005-08-08  jrandom     * Add a configurable throttle to the
 number of concurrent outbound SSU       connection negotiations (via
 i2np.udp.maxConcurrentEstablish=4).  This       may help those with slow
 connections to get integrated at the start.     * Further fixlets to the
 streaming lib

---
 .../net/i2p/client/streaming/Connection.java  |   2 +-
 .../client/streaming/ConnectionOptions.java   |   4 +-
 .../streaming/ConnectionPacketHandler.java    |  19 ++-
 .../client/streaming/MessageInputStream.java  |   2 +-
 core/java/src/net/i2p/CoreVersion.java        |   4 +-
 history.txt                                   |  10 +-
 .../src/net/i2p/router/RouterVersion.java     |   6 +-
 .../router/message/GarlicMessageBuilder.java  |   2 +-
 .../transport/udp/EstablishmentManager.java   | 125 +++++++++++++++++-
 .../transport/udp/InboundEstablishState.java  |  19 +++
 .../pool/HandleTunnelCreateMessageJob.java    |   4 +-
 11 files changed, 176 insertions(+), 21 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 128e2f991a..aaf7d0a546 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -72,7 +72,7 @@ public class Connection {
     private long _lifetimeDupMessageSent;
     private long _lifetimeDupMessageReceived;
     
-    public static final long MAX_RESEND_DELAY = 10*1000;
+    public static final long MAX_RESEND_DELAY = 5*1000;
     public static final long MIN_RESEND_DELAY = 3*1000;
 
     /** wait up to 5 minutes after disconnection so we can ack/close packets */
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
index a239a62605..30175938df 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java
@@ -98,7 +98,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
         setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000));
         setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500));
         setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
-        setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
+        setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
         setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
         setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000));
         setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT));
@@ -130,7 +130,7 @@ public class ConnectionOptions extends I2PSocketOptionsImpl {
         if (opts.containsKey(PROP_INITIAL_WINDOW_SIZE))
             setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1));
         if (opts.containsKey(PROP_MAX_RESENDS))
-            setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5));
+            setMaxResends(getInt(opts, PROP_MAX_RESENDS, 10));
         if (opts.containsKey(PROP_WRITE_TIMEOUT))
             setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1));
         if (opts.containsKey(PROP_INACTIVITY_TIMEOUT))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 479b464836..cbe42e8b4f 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -62,7 +62,7 @@ public class ConnectionPacketHandler {
                 con.getOutputStream().setBufferSize(packet.getOptionalMaxSize());
             }
         }
-
+        
         con.packetReceived();
         
         boolean choke = false;
@@ -92,7 +92,20 @@ public class ConnectionPacketHandler {
 
         _context.statManager().addRateData("stream.con.receiveMessageSize", packet.getPayloadSize(), 0);
         
-        boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
+        boolean isNew = false;
+        boolean allowAck = true;
+        
+        if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) && 
+             ( (packet.getSendStreamId() == null) ||  
+               (packet.getReceiveStreamId() == null) || 
+               (DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) ||
+               (DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) )
+            allowAck = false;
+
+        if (allowAck)
+            isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
+        else
+            isNew = con.getInputStream().messageReceived(con.getInputStream().getHighestReadyBockId(), null);
         
         if ( (packet.getSequenceNum() == 0) && (packet.getPayloadSize() > 0) ) {
             if (_log.shouldLog(Log.DEBUG))
@@ -190,6 +203,8 @@ public class ConnectionPacketHandler {
              (!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
              (!DataHelper.eq(con.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) )
             acked = con.ackPackets(ackThrough, nacks);
+        else
+            return false;
         
         if ( (acked != null) && (acked.size() > 0) ) {
             if (_log.shouldLog(Log.DEBUG))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
index fac4ad2050..09f02c6b53 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
@@ -202,7 +202,7 @@ public class MessageInputStream extends InputStream {
     public boolean messageReceived(long messageId, ByteArray payload) {
         synchronized (_dataLock) {
             if (_log.shouldLog(Log.DEBUG))
-                _log.debug("received " + messageId + " with " + payload.getValid());
+                _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload"));
             if (messageId <= _highestReadyBlockId) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("ignoring dup message " + messageId);
diff --git a/core/java/src/net/i2p/CoreVersion.java b/core/java/src/net/i2p/CoreVersion.java
index 7d72508d35..eadcefde58 100644
--- a/core/java/src/net/i2p/CoreVersion.java
+++ b/core/java/src/net/i2p/CoreVersion.java
@@ -14,8 +14,8 @@ package net.i2p;
  *
  */
 public class CoreVersion {
-    public final static String ID = "$Revision: 1.36 $ $Date: 2005/07/27 14:04:49 $";
-    public final static String VERSION = "0.6.0.1";
+    public final static String ID = "$Revision: 1.37 $ $Date: 2005/08/03 13:58:12 $";
+    public final static String VERSION = "0.6.0.2";
 
     public static void main(String args[]) {
         System.out.println("I2P Core version: " + VERSION);
diff --git a/history.txt b/history.txt
index 411cdfc025..aa2878b2d1 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,12 @@
-$Id: history.txt,v 1.222 2005/08/03 13:58:13 jrandom Exp $
+$Id: history.txt,v 1.223 2005/08/07 14:31:58 jrandom Exp $
+
+* 2005-08-08  0.6.0.2 released
+
+2005-08-08  jrandom
+    * Add a configurable throttle to the number of concurrent outbound SSU
+      connection negotiations (via i2np.udp.maxConcurrentEstablish=4).  This
+      may help those with slow connections to get integrated at the start.
+    * Further fixlets to the streaming lib
 
 2005-08-07  Complication
     * Display the average clock skew for both SSU and TCP connections
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index e39e3e2e0b..10e487c5e7 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
  *
  */
 public class RouterVersion {
-    public final static String ID = "$Revision: 1.211 $ $Date: 2005/08/03 13:58:13 $";
-    public final static String VERSION = "0.6.0.1";
-    public final static long BUILD = 1;
+    public final static String ID = "$Revision: 1.212 $ $Date: 2005/08/07 14:31:58 $";
+    public final static String VERSION = "0.6.0.2";
+    public final static long BUILD = 0;
     public static void main(String args[]) {
         System.out.println("I2P Router version: " + VERSION);
         System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
index 7220b8c630..b9f8660eb7 100644
--- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
+++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java
@@ -110,7 +110,7 @@ public class GarlicMessageBuilder {
         msg.setMessageExpiration(config.getExpiration());
         
         long timeFromNow = config.getExpiration() - ctx.clock().now();
-        if (timeFromNow < 10*1000)
+        if (timeFromNow < 15*1000)
             log.error("Building a message expiring in " + timeFromNow + "ms: " + config, new Exception("created by"));
         
         if (log.shouldLog(Log.WARN))
diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
index 76cd9a488f..31f4326621 100644
--- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
+++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java
@@ -1,7 +1,9 @@
 package net.i2p.router.transport.udp;
 
 import java.net.InetAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -31,10 +33,15 @@ public class EstablishmentManager {
     private Map _inboundStates;
     /** map of RemoteHostId to OutboundEstablishState */
     private Map _outboundStates;
+    /** map of RemoteHostId to List of OutNetMessage for messages exceeding capacity */
+    private Map _queuedOutbound;
     private boolean _alive;
     private Object _activityLock;
     private int _activity;
     
+    private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 16;
+    public static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish";
+    
     public EstablishmentManager(RouterContext ctx, UDPTransport transport) {
         _context = ctx;
         _log = ctx.logManager().getLog(EstablishmentManager.class);
@@ -42,6 +49,7 @@ public class EstablishmentManager {
         _builder = new PacketBuilder(ctx);
         _inboundStates = new HashMap(32);
         _outboundStates = new HashMap(32);
+        _queuedOutbound = new HashMap(32);
         _activityLock = new Object();
         _context.statManager().createRateStat("udp.inboundEstablishTime", "How long it takes for a new inbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
         _context.statManager().createRateStat("udp.outboundEstablishTime", "How long it takes for a new outbound session to be established", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
@@ -80,6 +88,18 @@ public class EstablishmentManager {
             return state;
         }
     }
+    
+    private int getMaxConcurrentEstablish() {
+        String val = _context.getProperty(PROP_MAX_CONCURRENT_ESTABLISH);
+        if (val != null) {
+            try {
+                return Integer.parseInt(val);
+            } catch (NumberFormatException nfe) {
+                return DEFAULT_MAX_CONCURRENT_ESTABLISH;
+            }
+        }
+        return DEFAULT_MAX_CONCURRENT_ESTABLISH;
+    }
   
     /**
      * Send the message to its specified recipient by establishing a connection
@@ -104,12 +124,27 @@ public class EstablishmentManager {
         synchronized (_outboundStates) {
             OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(to);
             if (state == null) {
-                state = new OutboundEstablishState(_context, remAddr, port, 
-                                                   msg.getTarget().getIdentity(), 
-                                                   new SessionKey(addr.getIntroKey()));
-                _outboundStates.put(to, state);
+                if (_outboundStates.size() >= getMaxConcurrentEstablish()) {
+                    List queued = (List)_queuedOutbound.get(to);
+                    if (queued == null) {
+                        queued = new ArrayList(1);
+                        _queuedOutbound.put(to, queued);
+                    }
+                    queued.add(msg);
+                } else {
+                    state = new OutboundEstablishState(_context, remAddr, port, 
+                                                       msg.getTarget().getIdentity(), 
+                                                       new SessionKey(addr.getIntroKey()));
+                    _outboundStates.put(to, state);
+                }
+            }
+            if (state != null) {
+                state.addMessage(msg);
+                List queued = (List)_queuedOutbound.remove(to);
+                if (queued != null)
+                    for (int i = 0; i < queued.size(); i++)
+                        state.addMessage((OutNetMessage)queued.get(i));
             }
-            state.addMessage(msg);
         }
         
         notifyActivity();
@@ -177,9 +212,27 @@ public class EstablishmentManager {
      */
     PeerState receiveData(OutboundEstablishState state) {
         state.dataReceived();
+        int active = 0;
+        int admitted = 0;
+        int remaining = 0;
         synchronized (_outboundStates) {
+            active = _outboundStates.size();
             _outboundStates.remove(state.getRemoteHostId());
+            if (_queuedOutbound.size() > 0) {
+                // there shouldn't have been queued messages for this active state, but just in case...
+                List queued = (List)_queuedOutbound.remove(state.getRemoteHostId());
+                if (queued != null) {
+                    for (int i = 0; i < queued.size(); i++) 
+                        state.addMessage((OutNetMessage)queued.get(i));
+                }
+                
+                admitted = locked_admitQueued();
+            }
+            remaining = _queuedOutbound.size();
         }
+        //if (admitted > 0)
+        //    _log.log(Log.CRIT, "Admitted " + admitted + " with " + remaining + " remaining queued and " + active + " active");
+        
         if (_log.shouldLog(Log.INFO))
             _log.info("Outbound established completely!  yay");
         PeerState peer = handleCompletelyEstablished(state);
@@ -187,6 +240,40 @@ public class EstablishmentManager {
         return peer;
     }
 
+    private int locked_admitQueued() {
+        int admitted = 0;
+        while ( (_queuedOutbound.size() > 0) && (_outboundStates.size() < getMaxConcurrentEstablish()) ) {
+            // ok, active shrunk, lets let some queued in.  duplicate the synchronized 
+            // section from the add(
+
+            RemoteHostId to = (RemoteHostId)_queuedOutbound.keySet().iterator().next();
+            List queued = (List)_queuedOutbound.remove(to);
+
+            if (queued.size() <= 0)
+                continue;
+            
+            OutNetMessage msg = (OutNetMessage)queued.get(0);
+            RouterAddress ra = msg.getTarget().getTargetAddress(_transport.getStyle());
+            if (ra == null) {
+                for (int i = 0; i < queued.size(); i++) 
+                    _transport.failed((OutNetMessage)queued.get(i));
+                continue;
+            }
+            UDPAddress addr = new UDPAddress(ra);
+            InetAddress remAddr = addr.getHostAddress();
+            int port = addr.getPort();
+
+            OutboundEstablishState qstate = new OutboundEstablishState(_context, remAddr, port, 
+                                               msg.getTarget().getIdentity(), 
+                                               new SessionKey(addr.getIntroKey()));
+            _outboundStates.put(to, qstate);
+
+            for (int i = 0; i < queued.size(); i++)
+                qstate.addMessage((OutNetMessage)queued.get(i));
+            admitted++;
+        }
+        return admitted;
+    }
     
     private void notifyActivity() {
         synchronized (_activityLock) { 
@@ -429,7 +516,11 @@ public class EstablishmentManager {
         long now = _context.clock().now();
         long nextSendTime = -1;
         OutboundEstablishState outboundState = null;
+        int admitted = 0;
+        int remaining = 0;
+        int active = 0;
         synchronized (_outboundStates) {
+            active = _outboundStates.size();
             //if (_log.shouldLog(Log.DEBUG))
             //    _log.debug("# outbound states: " + _outboundStates.size());
             for (Iterator iter = _outboundStates.values().iterator(); iter.hasNext(); ) {
@@ -473,8 +564,14 @@ public class EstablishmentManager {
                     }
                 }
             }
+            
+            admitted = locked_admitQueued();    
+            remaining = _queuedOutbound.size();
         }
 
+        //if (admitted > 0)
+        //    _log.log(Log.CRIT, "Admitted " + admitted + " in push with " + remaining + " remaining queued and " + active + " active");
+        
         if (outboundState != null) {
             if (outboundState.getLifetime() > MAX_ESTABLISH_TIME) {
                 if (outboundState.getState() != OutboundEstablishState.STATE_CONFIRMED_COMPLETELY) {
@@ -484,7 +581,23 @@ public class EstablishmentManager {
                             break;
                         _transport.failed(msg);
                     }
-                    _context.shitlist().shitlistRouter(outboundState.getRemoteIdentity().calculateHash(), "Unable to establish with SSU");
+                    String err = null;
+                    switch (outboundState.getState()) {
+                        case OutboundEstablishState.STATE_CONFIRMED_PARTIALLY:
+                            err = "Took too long to establish remote connection (confirmed partially)";
+                            break;
+                        case OutboundEstablishState.STATE_CREATED_RECEIVED:
+                            err = "Took too long to establish remote connection (created received)";
+                            break;
+                        case OutboundEstablishState.STATE_REQUEST_SENT:
+                            err = "Took too long to establish remote connection (request sent)";
+                            break;
+                        case OutboundEstablishState.STATE_UNKNOWN: // fallthrough
+                        default:
+                            err = "Took too long to establish remote connection (unknown state)";
+                    }
+                    
+                    _context.shitlist().shitlistRouter(outboundState.getRemoteIdentity().calculateHash(), err);
                 } else {
                     while (true) {
                         OutNetMessage msg = outboundState.getNextQueuedMessage();
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java
index f39f67d473..4a9979ed44 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java
@@ -319,4 +319,23 @@ public class InboundEstablishState {
         _lastReceive = _context.clock().now();
         _nextSend = _lastReceive;
     }
+    
+    public String toString() {            
+        StringBuffer buf = new StringBuffer(128);
+        buf.append(super.toString());
+        if (_receivedX != null)
+            buf.append(" ReceivedX: ").append(Base64.encode(_receivedX, 0, 4));
+        if (_sentY != null)
+            buf.append(" SentY: ").append(Base64.encode(_sentY, 0, 4));
+        if (_aliceIP != null)
+            buf.append(" AliceIP: ").append(Base64.encode(_aliceIP));
+        buf.append(" AlicePort: ").append(_alicePort);
+        if (_bobIP != null)
+            buf.append(" BobIP: ").append(Base64.encode(_bobIP));
+        buf.append(" BobPort: ").append(_bobPort);
+        buf.append(" RelayTag: ").append(_sentRelayTag);
+        buf.append(" SignedOn: ").append(_sentSignedOnTime);
+        buf.append(" state: ").append(_currentState);
+        return buf.toString();
+    }
 }
diff --git a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java
index ed458314b3..0fb4d4bca4 100644
--- a/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java
+++ b/router/java/src/net/i2p/router/tunnel/pool/HandleTunnelCreateMessageJob.java
@@ -96,8 +96,8 @@ public class HandleTunnelCreateMessageJob extends JobImpl {
         public void runJob() {
             HandleTunnelCreateMessageJob.this.runJob();
         }
-        private static final String NAME_OK = "Deferred netDb accept";
-        private static final String NAME_REJECT = "Deferred netDb reject";
+        private static final String NAME_OK = "Deferred tunnel accept";
+        private static final String NAME_REJECT = "Deferred tunnel reject";
         public String getName() { return _shouldAccept ? NAME_OK : NAME_REJECT; }
     }
     
-- 
GitLab