From 937ae8ad60d3d2ae0118e518700dbbbbcfbcd7d8 Mon Sep 17 00:00:00 2001
From: zzz <zzz@mail.i2p>
Date: Fri, 9 Dec 2011 16:43:54 +0000
Subject: [PATCH]   * UDP:     - Round expiration times when converting to
 seconds     - Zero-copy of single-fragment messages in MessageReceiver     -
 Optimizations, log tweaks, comments

---
 .../i2p/router/transport/udp/ACKSender.java   |  4 +-
 .../transport/udp/InboundMessageState.java    | 23 +++++-
 .../router/transport/udp/MessageReceiver.java | 75 +++++++++++--------
 .../router/transport/udp/PacketBuilder.java   |  4 +
 .../i2p/router/transport/udp/PeerState.java   | 50 +++++--------
 5 files changed, 90 insertions(+), 66 deletions(-)

diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
index da5e1034e5..012af9e972 100644
--- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java
+++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java
@@ -138,8 +138,8 @@ class ACKSender implements Runnable {
                             // bulk operations may throw an exception
                             _peersToACK.addAll(notYet);
                         } catch (Exception e) {}
-                        if (_log.shouldLog(Log.INFO))
-                            _log.info("sleeping, pending size = " + notYet.size());
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("sleeping, pending size = " + notYet.size());
                         notYet.clear();
                         try {
                             // sleep a little longer than the divided frequency,
diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
index 16a194d13b..2118170688 100644
--- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
+++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java
@@ -69,6 +69,17 @@ class InboundMessageState {
             try {
                 data.readMessageFragment(dataFragment, message.getData(), 0);
                 int size = data.readMessageFragmentSize(dataFragment);
+                if (size <= 0) {
+                    // Bug in routers prior to 0.8.12
+                    // If the msg size was an exact multiple of the fragment size,
+                    // it would send a zero-length last fragment.
+                    // This message is almost certainly doomed.
+                    // We might as well ack it, keep going, and pass it along to I2NP where it
+                    // will get dropped as corrupted.
+                    // If we don't ack the fragment he will just send a zero-length fragment again.
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Zero-length fragment " + fragmentNum + " for message " + _messageId + " from " + _from);
+                }
                 message.setValid(size);
                 _fragments[fragmentNum] = message;
                 boolean isLast = data.readMessageIsLast(dataFragment);
@@ -91,7 +102,7 @@ class InboundMessageState {
                     _log.debug("New fragment " + fragmentNum + " for message " + _messageId 
                                + ", size=" + size
                                + ", isLast=" + isLast
-                               + ", data=" + Base64.encode(message.getData(), 0, size));
+                          /*   + ", data=" + Base64.encode(message.getData(), 0, size)   */  );
             } catch (ArrayIndexOutOfBoundsException aioobe) {
                 _log.warn("Corrupt SSU fragment " + fragmentNum, aioobe);
                 return false;
@@ -106,11 +117,15 @@ class InboundMessageState {
     }
     
     /**
-     *  May not be valid after released
+     *  May not be valid after released.
+     *  Probably doesn't need to be synced by caller, given the order of
+     *  events in receiveFragment() above, but you might want to anyway
+     *  to be safe.
      */
     public boolean isComplete() {
-        if (_lastFragment < 0) return false;
-        for (int i = 0; i <= _lastFragment; i++)
+        int last = _lastFragment;
+        if (last < 0) return false;
+        for (int i = 0; i <= last; i++)
             if (_fragments[i] == null)
                 return false;
         return true;
diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
index a55fa92261..276d2d1c8e 100644
--- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
+++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java
@@ -11,6 +11,7 @@ import net.i2p.data.i2np.I2NPMessageHandler;
 import net.i2p.data.i2np.I2NPMessageImpl;
 import net.i2p.router.RouterContext;
 //import net.i2p.util.ByteCache;
+import net.i2p.util.HexDump;
 import net.i2p.util.I2PThread;
 import net.i2p.util.Log;
 
@@ -53,8 +54,8 @@ class MessageReceiver {
         _context.statManager().createRateStat("udp.inboundExpired", "How many messages were expired before reception?", "udp", UDPTransport.RATES);
         //_context.statManager().createRateStat("udp.inboundRemaining", "How many messages were remaining when a message is pulled off the complete queue?", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
-        _context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
+        //_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
+        //_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
         _context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
         
         _alive = true;
@@ -136,16 +137,16 @@ class MessageReceiver {
                 _context.statManager().addRateData("udp.inboundExpired", expired, expiredLifetime);
             
             if (message != null) {
-                long before = System.currentTimeMillis();
+                //long before = System.currentTimeMillis();
                 //if (remaining > 0)
                 //    _context.statManager().addRateData("udp.inboundRemaining", remaining, 0);
                 int size = message.getCompleteSize();
-                if (_log.shouldLog(Log.INFO))
-                    _log.info("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
-                long afterRead = -1;
+                //if (_log.shouldLog(Log.DEBUG))
+                //    _log.debug("Full message received (" + message.getMessageId() + ") after " + message.getLifetime());
+                //long afterRead = -1;
                 try {
                     I2NPMessage msg = readMessage(buf, message, handler);
-                    afterRead = System.currentTimeMillis();
+                    //afterRead = System.currentTimeMillis();
                     if (msg != null)
                         _transport.messageReceived(msg, null, message.getFrom(), message.getLifetime(), size);
                 } catch (RuntimeException re) {
@@ -153,11 +154,11 @@ class MessageReceiver {
                     continue;
                 }
                 message = null;
-                long after = System.currentTimeMillis();
-                if (afterRead - before > 100)
-                    _context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
-                if (after - afterRead > 100)
-                    _context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
+                //long after = System.currentTimeMillis();
+                //if (afterRead - before > 100)
+                //    _context.statManager().addRateData("udp.inboundReadTime", afterRead - before, remaining);
+                //if (after - afterRead > 100)
+                //    _context.statManager().addRateData("udp.inboundReceiveProcessTime", after - afterRead, remaining);
             }
         }
         
@@ -168,36 +169,50 @@ class MessageReceiver {
     /**
      *  Assemble all the fragments into an I2NP message.
      *  This calls state.releaseResources(), do not access state after calling this.
+     *
+     *  @param buf temp buffer for convenience
      *  @return null on error
      */
     private I2NPMessage readMessage(ByteArray buf, InboundMessageState state, I2NPMessageHandler handler) {
         try {
             //byte buf[] = new byte[state.getCompleteSize()];
-            ByteArray fragments[] = state.getFragments();
+            I2NPMessage m;
             int numFragments = state.getFragmentCount();
-            int off = 0;
-            for (int i = 0; i < numFragments; i++) {
-                System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
-                if (_log.shouldLog(Log.DEBUG))
-                    _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " 
-                               + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
-                               + " (valid: " + fragments[i].getValid() 
-                               + " raw: " + Base64.encode(fragments[i].getData()) + ")");
-                off += fragments[i].getValid();
+            if (numFragments > 1) {
+                ByteArray fragments[] = state.getFragments();
+                int off = 0;
+                for (int i = 0; i < numFragments; i++) {
+                    System.arraycopy(fragments[i].getData(), 0, buf.getData(), off, fragments[i].getValid());
+                    //if (_log.shouldLog(Log.DEBUG))
+                    //    _log.debug("Raw fragment[" + i + "] for " + state.getMessageId() + ": " 
+                    //               + Base64.encode(fragments[i].getData(), 0, fragments[i].getValid())
+                    //               + " (valid: " + fragments[i].getValid() 
+                    //               + " raw: " + Base64.encode(fragments[i].getData()) + ")");
+                    off += fragments[i].getValid();
+                }
+                if (off != state.getCompleteSize()) {
+                    if (_log.shouldLog(Log.WARN))
+                        _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
+                    return null;
+                }
+                //if (_log.shouldLog(Log.DEBUG))
+                //    _log.debug("Raw byte array for " + state.getMessageId() + ": " + HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
+                m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
+            } else {
+                // zero copy for single fragment
+                m = I2NPMessageImpl.fromRawByteArray(_context, state.getFragments()[0].getData(), 0, state.getCompleteSize(), handler);
             }
-            if (off != state.getCompleteSize()) {
-                if (_log.shouldLog(Log.WARN))
-                    _log.warn("Hmm, offset of the fragments = " + off + " while the state says " + state.getCompleteSize());
-                return null;
+            if (state.getCompleteSize() == 534 && _log.shouldLog(Log.INFO)) {
+                _log.info(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
             }
-            if (_log.shouldLog(Log.DEBUG))
-                _log.debug("Raw byte array for " + state.getMessageId() + ": " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
-            I2NPMessage m = I2NPMessageImpl.fromRawByteArray(_context, buf.getData(), 0, state.getCompleteSize(), handler);
             m.setUniqueId(state.getMessageId());
             return m;
         } catch (I2NPMessageException ime) {
-            if (_log.shouldLog(Log.WARN))
+            if (_log.shouldLog(Log.WARN)) {
                 _log.warn("Message invalid: " + state, ime);
+                _log.warn(HexDump.dump(buf.getData(), 0, state.getCompleteSize()));
+                _log.warn("RAW: " + Base64.encode(buf.getData(), 0, state.getCompleteSize()));
+            }
             _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "error: " + ime.toString() + ": " + state.toString());
             return null;
         } catch (Exception e) {
diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
index 42b93c5ae7..815ce9f4fe 100644
--- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
+++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java
@@ -238,6 +238,10 @@ class PacketBuilder {
             packet.release();
             return null;
         }
+        if (dataSize == 0) {
+            // OK according to the protocol but if we send it, it's a bug
+            _log.error("Sending zero-size fragment " + fragment + " of " + state + " for " + peer);
+        }
         int currentMTU = peer.getMTU();
         int availableForAcks = currentMTU - MIN_DATA_PACKET_OVERHEAD - dataSize;
         int availableForExplicitAcks = availableForAcks;
diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java
index 0a92476878..8975dbc73b 100644
--- a/router/java/src/net/i2p/router/transport/udp/PeerState.java
+++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java
@@ -1169,11 +1169,10 @@ class PeerState {
         state.setPeer(this);
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId());
-        List<OutboundMessageState> msgs = _outboundMessages;
         int rv = 0;
         boolean fail = false;
-        synchronized (msgs) {
-            rv = msgs.size() + 1;
+        synchronized (_outboundMessages) {
+            rv = _outboundMessages.size() + 1;
             if (rv > 32) { 
                 // 32 queued messages?  to *one* peer?  nuh uh.
                 fail = true;
@@ -1218,7 +1217,7 @@ class PeerState {
              *******/
 
             } else {
-                msgs.add(state);
+                _outboundMessages.add(state);
             }
         }
         if (fail)
@@ -1230,17 +1229,16 @@ class PeerState {
     public void dropOutbound() {
         //if (_dead) return;
         _dead = true;
-        List<OutboundMessageState> msgs = _outboundMessages;
         //_outboundMessages = null;
         _retransmitter = null;
 
             int sz = 0;
             List<OutboundMessageState> tempList = null;
-            synchronized (msgs) {
-                sz = msgs.size();
+            synchronized (_outboundMessages) {
+                sz = _outboundMessages.size();
                 if (sz > 0) {
-                    tempList = new ArrayList(msgs);
-                    msgs.clear();
+                    tempList = new ArrayList(_outboundMessages);
+                    _outboundMessages.clear();
 		}
             }
             for (int i = 0; i < sz; i++)
@@ -1263,9 +1261,8 @@ class PeerState {
      * @return number of active outbound messages remaining
      */
     public int finishMessages() {
-        List<OutboundMessageState> msgs = _outboundMessages;
         // short circuit, unsynchronized
-        if (msgs.isEmpty())
+        if (_outboundMessages.isEmpty())
             return 0;
 
         if (_dead) {
@@ -1276,8 +1273,8 @@ class PeerState {
         int rv = 0;
         List<OutboundMessageState> succeeded = null;
         List<OutboundMessageState> failed = null;
-        synchronized (msgs) {
-            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+        synchronized (_outboundMessages) {
+            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 OutboundMessageState state = iter.next();
                 if (state.isComplete()) {
                     iter.remove();
@@ -1301,7 +1298,7 @@ class PeerState {
                     failed.add(state);
                 } // end (pushCount > maxVolleys)
             } // end iterating over outbound messages
-            rv = msgs.size();
+            rv = _outboundMessages.size();
         }
         
         for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
@@ -1337,11 +1334,9 @@ class PeerState {
      *
      */
     public OutboundMessageState allocateSend() {
-        int total = 0;
-        List<OutboundMessageState> msgs = _outboundMessages;
         if (_dead) return null;
-        synchronized (msgs) {
-            for (OutboundMessageState state : msgs) {
+        synchronized (_outboundMessages) {
+            for (OutboundMessageState state : _outboundMessages) {
                 if (locked_shouldSend(state)) {
                     if (_log.shouldLog(Log.DEBUG))
                         _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId());
@@ -1360,10 +1355,9 @@ class PeerState {
                         msg.timestamp("passed over for allocation with " + msgs.size() + " peers");
                 } */
             }
-            total = msgs.size();
         }
         if (_log.shouldLog(Log.DEBUG))
-            _log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + total + " remaining");
+            _log.debug("Nothing to send to " + _remotePeer.toBase64() + ", with " + _outboundMessages.size() + " remaining");
         return null;
     }
     
@@ -1375,13 +1369,12 @@ class PeerState {
         int rv = Integer.MAX_VALUE;
         if (_dead) return rv;
         long now = _context.clock().now();
-        List<OutboundMessageState> msgs = _outboundMessages;
-        synchronized (msgs) {
+        synchronized (_outboundMessages) {
             if (_retransmitter != null) {
                 rv = (int)(_retransmitter.getNextSendTime() - now);
                 return rv;
             }
-            for (OutboundMessageState state : msgs) {
+            for (OutboundMessageState state : _outboundMessages) {
                 int delay = (int)(state.getNextSendTime() - now);
                 if (delay < rv)
                     rv = delay;
@@ -1512,9 +1505,8 @@ class PeerState {
     public boolean acked(long messageId) {
         if (_dead) return false;
         OutboundMessageState state = null;
-        List<OutboundMessageState> msgs = _outboundMessages;
-        synchronized (msgs) {
-            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+        synchronized (_outboundMessages) {
+            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
                 if (state.getMessageId() == messageId) {
                     iter.remove();
@@ -1574,12 +1566,10 @@ class PeerState {
             return acked(bitfield.getMessageId());
         }
     
-        List<OutboundMessageState> msgs = _outboundMessages;
-        
         OutboundMessageState state = null;
         boolean isComplete = false;
-        synchronized (msgs) {
-            for (Iterator<OutboundMessageState> iter = msgs.iterator(); iter.hasNext(); ) {
+        synchronized (_outboundMessages) {
+            for (Iterator<OutboundMessageState> iter = _outboundMessages.iterator(); iter.hasNext(); ) {
                 state = iter.next();
                 if (state.getMessageId() == bitfield.getMessageId()) {
                     boolean complete = state.acked(bitfield);
-- 
GitLab