I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit a51d260a authored by zzz's avatar zzz
Browse files

Streaming: More efficient copying in MessageInputStream

Log tweaks
parent b5ed39f1
No related branches found
No related tags found
No related merge requests found
......@@ -265,7 +265,7 @@ class MessageInputStream extends InputStream {
*/
public void setReadTimeout(int timeout) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Changing read timeout from " + _readTimeout + " to " + timeout);
_log.debug("Changing read timeout from " + _readTimeout + " to " + timeout + ": " + hashCode());
_readTimeout = timeout;
}
......@@ -286,7 +286,7 @@ class MessageInputStream extends InputStream {
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
buf.append(" not ready blocks: [");
long notAvailable = 0;
for (Long id : _notYetReadyBlocks.keySet()) {
ByteArray ba = _notYetReadyBlocks.get(id);
......@@ -296,8 +296,9 @@ class MessageInputStream extends InputStream {
notAvailable += ba.getValid();
}
buf.append("not ready bytes: ").append(notAvailable);
buf.append("] not ready bytes: ").append(notAvailable);
buf.append(" highest ready block: ").append(_highestReadyBlockId);
buf.append(" ID: ").append(hashCode());
_log.debug(buf.toString(), new Exception("Input stream closed"));
}
......@@ -408,7 +409,8 @@ class MessageInputStream extends InputStream {
synchronized (_dataLock) {
if (_locallyClosed) throw new IOException("Input stream closed");
throwAnyError();
for (int i = 0; i < length; i++) {
int i = 0;
while (i < length) {
if ( (_readyDataBlocks.isEmpty()) && (i == 0) ) {
// ok, we havent found anything, so lets block until we get
// at least one byte
......@@ -475,35 +477,33 @@ class MessageInputStream extends InputStream {
}
}
}
// we looped a few times then got data, so this pass doesnt count
i--;
} else if (_readyDataBlocks.isEmpty()) {
if (shouldDebug)
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] no more ready blocks, returning");
+ "] no more ready blocks, returning: " + hashCode());
return i;
} else {
// either was already ready, or we wait()ed and it arrived
ByteArray cur = _readyDataBlocks.get(0);
byte rv = cur.getData()[cur.getOffset()+_readyDataBlockIndex];
_readyDataBlockIndex++;
int toRead = Math.min(cur.getValid() - _readyDataBlockIndex, length - i);
System.arraycopy(cur.getData(), cur.getOffset() + _readyDataBlockIndex, target, offset + i, toRead);
_readyDataBlockIndex += toRead;
if (cur.getValid() <= _readyDataBlockIndex) {
_readyDataBlockIndex = 0;
_readyDataBlocks.remove(0);
}
_readTotal++;
target[offset + i] = rv; // rv < 0 ? rv + 256 : rv
if ( (_readyDataBlockIndex <= 3) || (_readyDataBlockIndex >= cur.getValid() - 5) ) {
if (shouldDebug)
_readTotal += toRead;
if (shouldDebug) {
_log.debug("read(...," + offset+", " + length+ ")[" + i
+ "] after ready data: readyDataBlockIndex=" + _readyDataBlockIndex
+ "] copied " + toRead + " after ready data: readyDataBlockIndex=" + _readyDataBlockIndex
+ " readyBlocks=" + _readyDataBlocks.size()
+ " readTotal=" + _readTotal);
+ " readTotal=" + _readTotal + ": " + hashCode());
}
i += toRead;
//if (removed)
// _cache.release(cur);
}
} // for (int i = 0; i < length; i++) {
} // while (i < length) {
} // synchronized (_dataLock)
if (shouldDebug)
......@@ -527,7 +527,7 @@ class MessageInputStream extends InputStream {
}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("available(): " + numBytes);
_log.debug("available(): " + numBytes + ": " + hashCode());
return numBytes;
}
......@@ -587,7 +587,7 @@ class MessageInputStream extends InputStream {
available -= _readyDataBlockIndex;
buf.append(available);
buf.append(" blocks: ").append(_readyDataBlocks.size());
buf.append(" not ready blocks: ");
buf.append(" not ready blocks: [");
long notAvailable = 0;
for (Long id : _notYetReadyBlocks.keySet()) {
ByteArray ba = _notYetReadyBlocks.get(id);
......@@ -595,8 +595,9 @@ class MessageInputStream extends InputStream {
if (ba != null)
notAvailable += ba.getValid();
}
buf.append("not ready bytes: ").append(notAvailable);
buf.append("] not ready bytes: ").append(notAvailable);
buf.append(" highest ready block: ").append(_highestReadyBlockId);
buf.append(" ID: ").append(hashCode());
_log.debug(buf.toString());
}
//while (_readyDataBlocks.size() > 0)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment