From 71c1cb4e124899980fe8dda50c1ef5d399d3840d Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Mon, 8 Nov 2004 05:42:57 +0000
Subject: [PATCH] * min resend delay = 20s * rework the messageInputStream to
 implement read(byte[], off, len), and fix some fencepost   bugs in the byte
 retrieval

---
 .../net/i2p/client/streaming/Connection.java  |   5 +-
 .../streaming/ConnectionDataReceiver.java     |   4 +-
 .../client/streaming/MessageInputStream.java  | 146 ++++++++++++------
 3 files changed, 103 insertions(+), 52 deletions(-)

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index ed2b56e7af..d2eee7c89c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -55,6 +55,7 @@ public class Connection {
     private String _connectionError;
     
     public static final long MAX_RESEND_DELAY = 60*1000;
+    public static final long MIN_RESEND_DELAY = 20*1000;
     
     public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) {
         this(ctx, manager, chooser, queue, handler, null);
@@ -178,7 +179,7 @@ public class Connection {
             }
             packet.setFlag(Packet.FLAG_DELAY_REQUESTED);
             
-            long timeout = (_options.getRTT() < 10000 ? 10000 : _options.getRTT());
+            long timeout = (_options.getRTT() < MIN_RESEND_DELAY ? MIN_RESEND_DELAY : _options.getRTT());
             if (timeout > MAX_RESEND_DELAY)
                 timeout = MAX_RESEND_DELAY;
             if (_log.shouldLog(Log.DEBUG))
@@ -491,6 +492,8 @@ public class Connection {
                 } else {
                     //long timeout = _options.getResendDelay() << numSends;
                     long timeout = _options.getRTT() << (numSends-1);
+                    if (timeout < MIN_RESEND_DELAY)
+                        timeout = MIN_RESEND_DELAY;
                     if ( (timeout > MAX_RESEND_DELAY) || (timeout <= 0) )
                         timeout = MAX_RESEND_DELAY;
                     if (_log.shouldLog(Log.DEBUG))
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
index 1dc8747e0c..ee014ccb18 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java
@@ -40,8 +40,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver {
         if (_connection.getUnackedPacketsReceived() > 0)
             doSend = true;
         
-        if (_log.shouldLog(Log.ERROR) && !doSend)
-            _log.error("writeData called: size="+size + " doSend=" + doSend 
+        if (_log.shouldLog(Log.INFO) && !doSend)
+            _log.info("writeData called: size="+size + " doSend=" + doSend 
                        + " unackedReceived: " + _connection.getUnackedPacketsReceived()
                        + " con: " + _connection, new Exception("write called by"));
 
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
index 4331034870..b87c5c0d2d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
@@ -53,6 +53,8 @@ public class MessageInputStream extends InputStream {
     private IOException _streamError;
     private long _readTotal;
     
+    private byte[] _oneByte = new byte[1];
+    
     private Object _dataLock;
     
     public MessageInputStream(I2PAppContext ctx) {
@@ -205,6 +207,7 @@ public class MessageInputStream extends InputStream {
             if (messageId <= _highestReadyBlockId) {
                 if (_log.shouldLog(Log.DEBUG))
                     _log.debug("ignoring dup message " + messageId);
+                _dataLock.notifyAll();
                 return false; // already received
             }
             if (messageId > _highestBlockId)
@@ -238,76 +241,118 @@ public class MessageInputStream extends InputStream {
                     _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
                 else
                     _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
+                _dataLock.notifyAll();
             }
         }
         return true;
     }
     
     public int read() throws IOException {
+        int read = read(_oneByte, 0, 1);
+        if (read < 0)
+            return -1;
+        else
+            return _oneByte[0];
+    }
+    
+    public int read(byte target[]) throws IOException {
+        return read(target, 0, target.length);
+    }
+    
+    public int read(byte target[], int offset, int length) throws IOException {
         if (_locallyClosed) throw new IOException("Already locally closed");
         throwAnyError();
         long expiration = -1;
         if (_readTimeout > 0)
             expiration = _readTimeout + System.currentTimeMillis();
         synchronized (_dataLock) {
-            while (_readyDataBlocks.size() <= 0) {
-                //if (_log.shouldLog(Log.DEBUG))
-                //    _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString());
-                
-                if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
-                    if (_log.shouldLog(Log.DEBUG))
-                        _log.debug("read() got EOF after " + _readTotal + " " + toString());
-                    return -1;
+            for (int i = 0; i < length; i++) {
+                if ( (_readyDataBlocks.size() <= 0) && (i == 0) ) {
+                    // ok, we havent found anything, so lets block until we get 
+                    // at least one byte
+                    
+                    while (_readyDataBlocks.size() <= 0) {
+                        if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
+                            if (_log.shouldLog(Log.INFO))
+                                _log.info("read(...," + offset + ", " + length + ")[" + i 
+                                           + "] got EOF after " + _readTotal + " " + toString());
+                            return -1;
+                        } else {
+                            if (_readTimeout < 0) {
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("read(...," + offset+", " + length+ ")[" + i 
+                                               + ") with no timeout: " + toString());
+                                try { _dataLock.wait(); } catch (InterruptedException ie) { }
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("read(...," + offset+", " + length+ ")[" + i 
+                                               + ") with no timeout complete: " + toString());
+                                throwAnyError();
+                            } else if (_readTimeout > 0) {
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("read(...," + offset+", " + length+ ")[" + i 
+                                               + ") with timeout: " + _readTimeout + ": " + toString());
+                                try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
+                                if (_log.shouldLog(Log.DEBUG))
+                                    _log.debug("read(...," + offset+", " + length+ ")[" + i 
+                                               + ") with timeout complete: " + _readTimeout + ": " + toString());
+                                throwAnyError();
+                            } else { // readTimeout == 0
+                                // noop, don't block
+                                if (_log.shouldLog(Log.INFO))
+                                    _log.info("read(...," + offset+", " + length+ ")[" + i 
+                                               + ") with nonblocking setup: " + toString());
+                                return i;
+                            }
+                            if (_readyDataBlocks.size() <= 0) {
+                                if ( (_readTimeout > 0) && (expiration < System.currentTimeMillis()) ) {
+                                    if (_log.shouldLog(Log.INFO))
+                                        _log.info("read(...," + offset+", " + length+ ")[" + i 
+                                                   + ") expired: " + toString());
+                                    return i;
+                                }
+                            }
+                        }
+                    }
+                    // we looped a few times then got data, so this pass doesnt count
+                    i--;
+                } else if (_readyDataBlocks.size() <= 0) {
+                    if (_log.shouldLog(Log.INFO))
+                        _log.info("read(...," + offset+", " + length+ ")[" + i 
+                                   + "] no more ready blocks, returning");
+                    return i;
                 } else {
-                    if (_readTimeout < 0) {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("read() with no timeout: " + toString());
-                        try { _dataLock.wait(); } catch (InterruptedException ie) { }
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("read() with no timeout complete: " + toString());
-                        throwAnyError();
-                    } else if (_readTimeout > 0) {
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("read() with timeout: " + _readTimeout + ": " + toString());
-                        try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("read() with timeout complete: " + _readTimeout + ": " + toString());
-                        throwAnyError();
-                    } else { // readTimeout == 0
-                        // noop, don't block
-                        if (_log.shouldLog(Log.DEBUG))
-                            _log.debug("read() with nonblocking setup: " + toString());
+                    // either was already ready, or we wait()ed and it arrived
+                    ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
+                    byte rv = cur.getData()[_readyDataBlockIndex];
+                    _readyDataBlockIndex++;
+                    if (cur.getData().length <= _readyDataBlockIndex) {
+                        _readyDataBlockIndex = 0;
+                        _readyDataBlocks.remove(0);
                     }
-                    if (_readyDataBlocks.size() <= 0) {
-                        if ( (_readTimeout > 0) && (expiration > System.currentTimeMillis()) )
-                        throw new InterruptedIOException("Timeout reading (timeout=" + _readTimeout + ")");
+                    _readTotal++;
+                    target[offset + i] = rv; // rv < 0 ? rv + 256 : rv
+                    if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getData().length - 5) ) {
+                        if (_log.shouldLog(Log.DEBUG))
+                            _log.debug("read(...," + offset+", " + length+ ")[" + i 
+                                       + "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex 
+                                       + " readyBlocks=" + _readyDataBlocks.size()
+                                       + " readTotal=" + _readTotal);
                     }
                 }
-            }
-            
-            //if (_log.shouldLog(Log.DEBUG))
-            //    _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString());
-            
-            // either was already ready, or we wait()ed and it arrived
-            ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
-            byte rv = cur.getData()[_readyDataBlockIndex];
-            _readyDataBlockIndex++;
-            if (cur.getData().length <= _readyDataBlockIndex) {
-                _readyDataBlockIndex = 0;
-                _readyDataBlocks.remove(0);
-            }
-            _readTotal++;
-            return (rv < 0 ? rv + 256 : rv);
-        }
+            } // for (int i = 0; i < length; i++) {
+        }  // synchronized (_dataLock)
+        
+        if (_log.shouldLog(Log.DEBUG))
+            _log.info("read(...," + offset+", " + length+ ") read fully total read: " +_readTotal);
+
+        return length;
     }
     
     public int available() throws IOException {
         if (_locallyClosed) throw new IOException("Already closed, you wanker");
         throwAnyError();
+        int numBytes = 0;
         synchronized (_dataLock) {
-            if (_readyDataBlocks.size() <= 0) 
-                return 0;
-            int numBytes = 0;
             for (int i = 0; i < _readyDataBlocks.size(); i++) {
                 ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
                 if (i == 0)
@@ -315,8 +360,11 @@ public class MessageInputStream extends InputStream {
                 else
                     numBytes += cur.getData().length;
             }
-            return numBytes;
         }
+        if (_log.shouldLog(Log.DEBUG))
+            _log.info("available(): " + numBytes + " " + toString());
+        
+        return numBytes;
     }
     
     /**
-- 
GitLab