diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index 1dad61f8177f8fea0b90369dfa29dd32d45afa55..3a12a508e889fbf5987e6ffbfdb22cd12f922d39 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -32,8 +32,22 @@ public interface I2PSocket { */ public OutputStream getOutputStream() throws IOException; + /** + * How long we will wait blocked on a read() operation. + * + * @return milliseconds to wait, or -1 if we will wait indefinitely + */ + public long getReadTimeout(); + + /** + * Define how long we will wait blocked on a read() operation (-1 will make + * the socket wait forever). + * + */ + public void setReadTimeout(long ms); + /** * Closes the socket if not closed yet */ public void close() throws IOException; -} \ No newline at end of file +} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 39105b1c4da9cf01877b9da8e84c78d18883f93e..eb7e6414688db9f587e36764f6564fb384826cfa 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -65,14 +65,14 @@ class I2PSocketImpl implements I2PSocket { synchronized (remoteIDWaiter) { if (wait) { try { - if (maxWait > 0) + if (maxWait >= 0) remoteIDWaiter.wait(maxWait); else remoteIDWaiter.wait(); } catch (InterruptedException ex) { } - if ((maxWait > 0) && (System.currentTimeMillis() > dieAfter)) + if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter)) throw new InterruptedIOException("Timed out waiting for remote ID"); _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for " @@ -146,11 +146,29 @@ class I2PSocketImpl implements I2PSocket { return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add); } + public long getReadTimeout() { + return in.getReadTimeout(); + } + + public void setReadTimeout(long ms) { + in.setReadTimeout(ms); + } + //-------------------------------------------------- public class I2PInputStream extends InputStream { private ByteCollector bc = new ByteCollector(); + private long readTimeout = -1; + + public long getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(long ms) { + readTimeout = ms; + } + public int read() throws IOException { byte[] b = new byte[1]; int res = read(b); @@ -162,7 +180,10 @@ class I2PSocketImpl implements I2PSocket { public synchronized int read(byte[] b, int off, int len) throws IOException { _log.debug("Read called: " + this.hashCode()); if (len == 0) return 0; + long dieAfter = System.currentTimeMillis() + readTimeout; byte[] read = bc.startToByteArray(len); + boolean timedOut = false; + while (read.length == 0) { synchronized (flagLock) { if (closed) { @@ -171,9 +192,18 @@ class I2PSocketImpl implements I2PSocket { } } try { - wait(); - } catch (InterruptedException ex) { + if (readTimeout >= 0) { + wait(readTimeout); + } else { + wait(); + } + } catch (InterruptedException ex) {} + + if ((readTimeout >= 0) + && (System.currentTimeMillis() >= dieAfter)) { + throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)"); } + read = bc.startToByteArray(len); } if (read.length > len) throw new RuntimeException("BUG"); @@ -304,6 +334,8 @@ class I2PSocketImpl implements I2PSocket { } } manager.removeSocket(I2PSocketImpl.this); + } catch (InterruptedIOException ex) { + _log.error("BUG! read() operations should not timeout!", ex); } catch (IOException ex) { // WHOEVER removes this event on inconsistent // state before fixing the inconsistent state (a diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index ee71cf72a612dc875830307cab6507528f88cd8f..d29360fb1d025924e2fe790f540307975fb00396 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -21,7 +21,11 @@ public class I2PSocketOptions { return _connectTimeout; } + /** + * Define how long we will wait for the ACK from a SYN, in milliseconds. + * + */ public void setConnectTimeout(long ms) { _connectTimeout = ms; } -} \ No newline at end of file +}