diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index e883a3b6f9ae1201c292cd7b9020b179a46f8521..0d4e95d2c93a6c048a53c68921c0a95a00fa453e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -75,8 +75,10 @@ class I2PSocketImpl implements I2PSocket { remote = peer; _socketId = ++__socketId; local = mgr.getSession().getMyDestination(); - in = new I2PInputStream(); - I2PInputStream pin = new I2PInputStream(); + String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4); + String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().subSequence(0,4); + in = new I2PInputStream(name + " in"); + I2PInputStream pin = new I2PInputStream(name + " out"); out = new I2POutputStream(pin); new I2PSocketRunner(pin); this.localID = localID; @@ -180,19 +182,9 @@ class I2PSocketImpl implements I2PSocket { public void queueData(byte[] data) { _bytesRead += data.length; try { - in.queueData(data); - } catch (InterruptedIOException iie) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Queue overflow, closing the stream", iie); - try { - close(); - } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error closing the stream due to overflow", ioe); - } + in.queueData(data, false); } catch (IOException ioe) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Connection closed while writing to the socket", ioe); + _log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe); } } @@ -311,16 +303,24 @@ class I2PSocketImpl implements I2PSocket { //-------------------------------------------------- private class I2PInputStream extends InputStream { - + private String streamName; private ByteCollector bc = new ByteCollector(); private boolean inStreamClosed = false; private long readTimeout = -1; + public I2PInputStream(String name) { + streamName = name; + } + public long getReadTimeout() { return readTimeout; } + private String getStreamPrefix() { + return getPrefix() + streamName + ": "; + } + public void setReadTimeout(long ms) { readTimeout = ms; } @@ -335,7 +335,8 @@ class I2PSocketImpl implements I2PSocket { public int read(byte[] b, int off, int len) throws IOException { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Read called for " + len + " bytes (avail=" + bc.getCurrentSize() + "): " + this.hashCode()); + _log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail=" + + bc.getCurrentSize() + "): " + this.hashCode()); if (len == 0) return 0; long dieAfter = System.currentTimeMillis() + readTimeout; byte[] read = null; @@ -349,7 +350,9 @@ class I2PSocketImpl implements I2PSocket { synchronized (flagLock) { if (closed) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode()); + _log.debug(getStreamPrefix() + "Closed is set after reading " + + _bytesRead + " and writing " + _bytesWritten + + ", so closing stream: " + hashCode()); return -1; } } @@ -365,7 +368,8 @@ class I2PSocketImpl implements I2PSocket { if ((readTimeout >= 0) && (System.currentTimeMillis() >= dieAfter)) { - throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)"); + throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket (" + + readTimeout + " msecs)"); } synchronized (bc) { @@ -377,7 +381,7 @@ class I2PSocketImpl implements I2PSocket { System.arraycopy(read, 0, b, off, read.length); if (_log.shouldLog(Log.DEBUG)) { - _log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned " + _log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned " + read.length + " bytes"); } //if (_log.shouldLog(Log.DEBUG)) { @@ -397,46 +401,54 @@ class I2PSocketImpl implements I2PSocket { /** * Add the data to the queue * + * @param allowBlock if true, we will block if the buffer and the socket options + * say so, otherwise we simply take the data regardless. * @throws InterruptedIOException if the queue's buffer is full, the socket has * a write timeout, and that timeout is exceeded * @throws IOException if the connection was closed while queueing up the data */ - public void queueData(byte[] data) throws InterruptedIOException, IOException { - queueData(data, 0, data.length); + void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException { + queueData(data, 0, data.length, allowBlock); } /** * Add the data to the queue * + * @param allowBlock if true, we will block if the buffer and the socket options + * say so, otherwise we simply take the data regardless. * @throws InterruptedIOException if the queue's buffer is full, the socket has * a write timeout, and that timeout is exceeded * @throws IOException if the connection was closed while queueing up the data */ - public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException { + public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); + _log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); Clock clock = I2PAppContext.getGlobalContext().clock(); long endAfter = clock.now() + _options.getWriteTimeout(); synchronized (bc) { - if (_options.getMaxBufferSize() > 0) { - while (bc.getCurrentSize() > _options.getMaxBufferSize()) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize()); - if (_options.getWriteTimeout() > 0) { - long timeLeft = endAfter - clock.now(); - if (timeLeft <= 0) { - long waited = _options.getWriteTimeout() - timeLeft; - throw new InterruptedIOException("Waited too long (" + waited + "ms) to write " - + len + " with a buffer at " + bc.getCurrentSize()); + if (allowBlock) { + if (_options.getMaxBufferSize() > 0) { + while (bc.getCurrentSize() > _options.getMaxBufferSize()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getStreamPrefix() + "Buffer size exceeded: pending " + + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize()); + if (_options.getWriteTimeout() > 0) { + long timeLeft = endAfter - clock.now(); + if (timeLeft <= 0) { + long waited = _options.getWriteTimeout() - timeLeft; + throw new InterruptedIOException(getStreamPrefix() + "Waited too long (" + + waited + "ms) to write " + + len + " with a buffer at " + bc.getCurrentSize()); + } } + if (inStreamClosed) + throw new IOException(getStreamPrefix() + "Stream closed while writing"); + if (_closedOn > 0) + throw new IOException(getStreamPrefix() + "I2PSocket closed while writing"); + try { + bc.wait(1000); + } catch (InterruptedException ie) {} } - if (inStreamClosed) - throw new IOException("Stream closed while writing"); - if (_closedOn > 0) - throw new IOException("I2PSocket closed while writing"); - try { - bc.wait(1000); - } catch (InterruptedException ie) {} } } bc.append(data, off, len); @@ -477,7 +489,7 @@ class I2PSocketImpl implements I2PSocket { public void write(byte[] b, int off, int len) throws IOException { _bytesWritten += len; - sendTo.queueData(b, off, len); + sendTo.queueData(b, off, len, true); } public void close() { @@ -536,6 +548,9 @@ class I2PSocketImpl implements I2PSocket { _log.warn(getPrefix() + "Error sending message to peer. Killing socket runner"); errorOccurred(); return false; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message sent to peer"); } } }