I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit bc46ad43 authored by jrandom's avatar jrandom Committed by zzz
Browse files

only block adding more *outbound* data, not handling data received from I2P. ...

only block adding more *outbound* data, not handling data received from I2P.  The data has already been received by the router and delivered to the streaming lib (and is sitting in RAM anyway...)
logging
parent be08e8f2
No related branches found
No related tags found
No related merge requests found
......@@ -75,8 +75,10 @@ class I2PSocketImpl implements I2PSocket {
remote = peer;
_socketId = ++__socketId;
local = mgr.getSession().getMyDestination();
in = new I2PInputStream();
I2PInputStream pin = new I2PInputStream();
String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4);
String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().subSequence(0,4);
in = new I2PInputStream(name + " in");
I2PInputStream pin = new I2PInputStream(name + " out");
out = new I2POutputStream(pin);
new I2PSocketRunner(pin);
this.localID = localID;
......@@ -180,19 +182,9 @@ class I2PSocketImpl implements I2PSocket {
public void queueData(byte[] data) {
_bytesRead += data.length;
try {
in.queueData(data);
} catch (InterruptedIOException iie) {
if (_log.shouldLog(Log.ERROR))
_log.error("Queue overflow, closing the stream", iie);
try {
close();
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Error closing the stream due to overflow", ioe);
}
in.queueData(data, false);
} catch (IOException ioe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Connection closed while writing to the socket", ioe);
_log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe);
}
}
......@@ -311,16 +303,24 @@ class I2PSocketImpl implements I2PSocket {
//--------------------------------------------------
private class I2PInputStream extends InputStream {
private String streamName;
private ByteCollector bc = new ByteCollector();
private boolean inStreamClosed = false;
private long readTimeout = -1;
public I2PInputStream(String name) {
streamName = name;
}
public long getReadTimeout() {
return readTimeout;
}
private String getStreamPrefix() {
return getPrefix() + streamName + ": ";
}
public void setReadTimeout(long ms) {
readTimeout = ms;
}
......@@ -335,7 +335,8 @@ class I2PSocketImpl implements I2PSocket {
public int read(byte[] b, int off, int len) throws IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Read called for " + len + " bytes (avail=" + bc.getCurrentSize() + "): " + this.hashCode());
_log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail="
+ bc.getCurrentSize() + "): " + this.hashCode());
if (len == 0) return 0;
long dieAfter = System.currentTimeMillis() + readTimeout;
byte[] read = null;
......@@ -349,7 +350,9 @@ class I2PSocketImpl implements I2PSocket {
synchronized (flagLock) {
if (closed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode());
_log.debug(getStreamPrefix() + "Closed is set after reading "
+ _bytesRead + " and writing " + _bytesWritten
+ ", so closing stream: " + hashCode());
return -1;
}
}
......@@ -365,7 +368,8 @@ class I2PSocketImpl implements I2PSocket {
if ((readTimeout >= 0)
&& (System.currentTimeMillis() >= dieAfter)) {
throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)");
throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket ("
+ readTimeout + " msecs)");
}
synchronized (bc) {
......@@ -377,7 +381,7 @@ class I2PSocketImpl implements I2PSocket {
System.arraycopy(read, 0, b, off, read.length);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned "
_log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned "
+ read.length + " bytes");
}
//if (_log.shouldLog(Log.DEBUG)) {
......@@ -397,46 +401,54 @@ class I2PSocketImpl implements I2PSocket {
/**
* Add the data to the queue
*
* @param allowBlock if true, we will block if the buffer and the socket options
* say so, otherwise we simply take the data regardless.
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
public void queueData(byte[] data) throws InterruptedIOException, IOException {
queueData(data, 0, data.length);
void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException {
queueData(data, 0, data.length, allowBlock);
}
/**
* Add the data to the queue
*
* @param allowBlock if true, we will block if the buffer and the socket options
* say so, otherwise we simply take the data regardless.
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException {
public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
_log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
Clock clock = I2PAppContext.getGlobalContext().clock();
long endAfter = clock.now() + _options.getWriteTimeout();
synchronized (bc) {
if (_options.getMaxBufferSize() > 0) {
while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
if (_options.getWriteTimeout() > 0) {
long timeLeft = endAfter - clock.now();
if (timeLeft <= 0) {
long waited = _options.getWriteTimeout() - timeLeft;
throw new InterruptedIOException("Waited too long (" + waited + "ms) to write "
+ len + " with a buffer at " + bc.getCurrentSize());
if (allowBlock) {
if (_options.getMaxBufferSize() > 0) {
while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "Buffer size exceeded: pending "
+ bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
if (_options.getWriteTimeout() > 0) {
long timeLeft = endAfter - clock.now();
if (timeLeft <= 0) {
long waited = _options.getWriteTimeout() - timeLeft;
throw new InterruptedIOException(getStreamPrefix() + "Waited too long ("
+ waited + "ms) to write "
+ len + " with a buffer at " + bc.getCurrentSize());
}
}
if (inStreamClosed)
throw new IOException(getStreamPrefix() + "Stream closed while writing");
if (_closedOn > 0)
throw new IOException(getStreamPrefix() + "I2PSocket closed while writing");
try {
bc.wait(1000);
} catch (InterruptedException ie) {}
}
if (inStreamClosed)
throw new IOException("Stream closed while writing");
if (_closedOn > 0)
throw new IOException("I2PSocket closed while writing");
try {
bc.wait(1000);
} catch (InterruptedException ie) {}
}
}
bc.append(data, off, len);
......@@ -477,7 +489,7 @@ class I2PSocketImpl implements I2PSocket {
public void write(byte[] b, int off, int len) throws IOException {
_bytesWritten += len;
sendTo.queueData(b, off, len);
sendTo.queueData(b, off, len, true);
}
public void close() {
......@@ -536,6 +548,9 @@ class I2PSocketImpl implements I2PSocket {
_log.warn(getPrefix() + "Error sending message to peer. Killing socket runner");
errorOccurred();
return false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message sent to peer");
}
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment