diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
index 0d4e95d2c93a6c048a53c68921c0a95a00fa453e..14d1dac3fcf0523892fd588e33b173662b165c0e 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
@@ -346,7 +346,7 @@ class I2PSocketImpl implements I2PSocket {
             }
             boolean timedOut = false;
 
-            while (read.length == 0) {
+            while ( (read.length == 0) && (!inStreamClosed) ) {
                 synchronized (flagLock) {
                     if (closed) {
                         if (_log.shouldLog(Log.DEBUG))
@@ -378,6 +378,9 @@ class I2PSocketImpl implements I2PSocket {
                 }
             }
             if (read.length > len) throw new RuntimeException("BUG");
+            if ( (inStreamClosed) && ( (read == null) || (read.length <= 0) ) )
+                return -1;
+            
             System.arraycopy(read, 0, b, off, read.length);
 
             if (_log.shouldLog(Log.DEBUG)) {
@@ -456,6 +459,8 @@ class I2PSocketImpl implements I2PSocket {
             synchronized (I2PInputStream.this) {
                 I2PInputStream.this.notifyAll();
             }
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
         }
 
         public void notifyClosed() {
@@ -471,6 +476,8 @@ class I2PSocketImpl implements I2PSocket {
                 inStreamClosed = true;
                 bc.notifyAll();
             }
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getStreamPrefix() + "After close");
         }
 
     }
@@ -518,10 +525,21 @@ class I2PSocketImpl implements I2PSocket {
          */
         private boolean handleNextPacket(ByteCollector bc, byte buffer[]) 
                                          throws IOException, I2PSessionException {
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
             int len = in.read(buffer);
-            int bcsize = bc.getCurrentSize();
+            int bcsize = 0;
+            synchronized (bc) {
+                bcsize = bc.getCurrentSize();
+            }
+
+            if (_log.shouldLog(Log.DEBUG))
+                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);
+
             if (len != -1) {
-                bc.append(buffer, len);
+                synchronized (bc) {
+                    bc.append(buffer, len);
+                }
             } else if (bcsize == 0) {
                 // nothing left in the buffer, and read(..) got EOF (-1).
                 // the bart the
@@ -529,7 +547,7 @@ class I2PSocketImpl implements I2PSocket {
             }
             if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
                 if (_log.shouldLog(Log.DEBUG))
-                    _log.debug(getPrefix() + "Runner Point d: " + hashCode());
+                    _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());
 
                 try {
                     Thread.sleep(PACKET_DELAY);
@@ -538,19 +556,22 @@ class I2PSocketImpl implements I2PSocket {
                 }
             }
             if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
-                byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
+                byte data[] = null;
+                synchronized (bc) {
+                    data = bc.startToByteArray(MAX_PACKET_SIZE);
+                }
                 if (data.length > 0) {
                     if (_log.shouldLog(Log.DEBUG))
-                        _log.debug(getPrefix() + "Message size is: " + data.length);
+                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
                     boolean sent = sendBlock(data);
                     if (!sent) {
                         if (_log.shouldLog(Log.WARN))
-                            _log.warn(getPrefix() + "Error sending message to peer.  Killing socket runner");
+                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer.  Killing socket runner");
                         errorOccurred();
                         return false;
                     } else {
                         if (_log.shouldLog(Log.DEBUG))
-                            _log.debug(getPrefix() + "Message sent to peer");
+                            _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
                     }
                 }
             }
@@ -567,7 +588,14 @@ class I2PSocketImpl implements I2PSocket {
                 while (keepHandling) {
                     keepHandling = handleNextPacket(bc, buffer);
                     packetsHandled++;
+                    if (_log.shouldLog(Log.DEBUG))
+                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName() 
+                                   + "Packets handled: " + packetsHandled);
                 }
+                if (_log.shouldLog(Log.INFO))
+                    _log.info(getPrefix() + ":" + Thread.currentThread().getName() 
+                               + "After handling packets, we're done.  Packets handled: " + packetsHandled);
+                
                 if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
                     if (_log.shouldLog(Log.WARN))
                         _log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: " 
@@ -583,16 +611,20 @@ class I2PSocketImpl implements I2PSocket {
                 } // FIXME: Race here?
                 if (sc) {
                     if (_log.shouldLog(Log.INFO))
-                        _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
+                        _log.info(getPrefix() + ":" + Thread.currentThread().getName() 
+                                  + "Sending close packet: (we started? " + outgoing 
+                                  + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
                     byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
                     boolean sent = manager.getSession().sendMessage(remote, packet);
                     if (!sent) {
                         if (_log.shouldLog(Log.WARN))
-                            _log.warn(getPrefix() + "Error sending close packet to peer");
+                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName() 
+                                      + "Error sending close packet to peer");
                         errorOccurred();
                     }
                 }
                 manager.removeSocket(I2PSocketImpl.this);
+                internalClose();
             } catch (InterruptedIOException ex) {
                 _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
             } catch (IOException ex) {