diff --git a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java index ce257dd60061ec058a24c4b838f478f2363019ba..de7752cd6bbdf9be08edc7d3626c9f3b8eb6edfe 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/impl/MessageInputStream.java @@ -265,7 +265,7 @@ class MessageInputStream extends InputStream { */ public void setReadTimeout(int timeout) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); + _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout + ": " + hashCode()); _readTimeout = timeout; } @@ -286,7 +286,7 @@ class MessageInputStream extends InputStream { buf.append(available); buf.append(" blocks: ").append(_readyDataBlocks.size()); - buf.append(" not ready blocks: "); + buf.append(" not ready blocks: ["); long notAvailable = 0; for (Long id : _notYetReadyBlocks.keySet()) { ByteArray ba = _notYetReadyBlocks.get(id); @@ -296,8 +296,9 @@ class MessageInputStream extends InputStream { notAvailable += ba.getValid(); } - buf.append("not ready bytes: ").append(notAvailable); + buf.append("] not ready bytes: ").append(notAvailable); buf.append(" highest ready block: ").append(_highestReadyBlockId); + buf.append(" ID: ").append(hashCode()); _log.debug(buf.toString(), new Exception("Input stream closed")); } @@ -408,7 +409,8 @@ class MessageInputStream extends InputStream { synchronized (_dataLock) { if (_locallyClosed) throw new IOException("Input stream closed"); throwAnyError(); - for (int i = 0; i < length; i++) { + int i = 0; + while (i < length) { if ( (_readyDataBlocks.isEmpty()) && (i == 0) ) { // ok, we havent found anything, so lets block until we get // at least one byte @@ -475,35 +477,33 @@ class MessageInputStream extends InputStream { } } } - // we looped a few times then got data, so this pass doesnt count - i--; } else if (_readyDataBlocks.isEmpty()) { if (shouldDebug) _log.debug("read(...," + offset+", " + length+ ")[" + i - + "] no more ready blocks, returning"); + + "] no more ready blocks, returning: " + hashCode()); return i; } else { // either was already ready, or we wait()ed and it arrived ByteArray cur = _readyDataBlocks.get(0); - byte rv = cur.getData()[cur.getOffset()+_readyDataBlockIndex]; - _readyDataBlockIndex++; + int toRead = Math.min(cur.getValid() - _readyDataBlockIndex, length - i); + System.arraycopy(cur.getData(), cur.getOffset() + _readyDataBlockIndex, target, offset + i, toRead); + _readyDataBlockIndex += toRead; if (cur.getValid() <= _readyDataBlockIndex) { _readyDataBlockIndex = 0; _readyDataBlocks.remove(0); } - _readTotal++; - target[offset + i] = rv; // rv < 0 ? rv + 256 : rv - if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getValid() - 5) ) { - if (shouldDebug) + _readTotal += toRead; + if (shouldDebug) { _log.debug("read(...," + offset+", " + length+ ")[" + i - + "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex + + "] copied " + toRead + " after ready data: readyDataBlockIndex=" + _readyDataBlockIndex + " readyBlocks=" + _readyDataBlocks.size() - + " readTotal=" + _readTotal); + + " readTotal=" + _readTotal + ": " + hashCode()); } + i += toRead; //if (removed) // _cache.release(cur); } - } // for (int i = 0; i < length; i++) { + } // while (i < length) { } // synchronized (_dataLock) if (shouldDebug) @@ -527,7 +527,7 @@ class MessageInputStream extends InputStream { } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("available(): " + numBytes); + _log.debug("available(): " + numBytes + ": " + hashCode()); return numBytes; } @@ -587,7 +587,7 @@ class MessageInputStream extends InputStream { available -= _readyDataBlockIndex; buf.append(available); buf.append(" blocks: ").append(_readyDataBlocks.size()); - buf.append(" not ready blocks: "); + buf.append(" not ready blocks: ["); long notAvailable = 0; for (Long id : _notYetReadyBlocks.keySet()) { ByteArray ba = _notYetReadyBlocks.get(id); @@ -595,8 +595,9 @@ class MessageInputStream extends InputStream { if (ba != null) notAvailable += ba.getValid(); } - buf.append("not ready bytes: ").append(notAvailable); + buf.append("] not ready bytes: ").append(notAvailable); buf.append(" highest ready block: ").append(_highestReadyBlockId); + buf.append(" ID: ").append(hashCode()); _log.debug(buf.toString()); } //while (_readyDataBlocks.size() > 0)