From d7c3a53f2dbb5e0936f49ed6a07719e50dd79bbf Mon Sep 17 00:00:00 2001 From: human <human> Date: Wed, 21 Apr 2004 17:56:16 +0000 Subject: [PATCH] Initial implementation of read() timeout on I2PSocket. Let's see whether it could solve duck's problems with dangling threads... (human) --- .../net/i2p/client/streaming/I2PSocket.java | 16 +++++++- .../i2p/client/streaming/I2PSocketImpl.java | 40 +++++++++++++++++-- .../client/streaming/I2PSocketOptions.java | 6 ++- 3 files changed, 56 insertions(+), 6 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index 1dad61f817..3a12a508e8 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -32,8 +32,22 @@ public interface I2PSocket { */ public OutputStream getOutputStream() throws IOException; + /** + * How long we will wait blocked on a read() operation. + * + * @return milliseconds to wait, or -1 if we will wait indefinitely + */ + public long getReadTimeout(); + + /** + * Define how long we will wait blocked on a read() operation (-1 will make + * the socket wait forever). + * + */ + public void setReadTimeout(long ms); + /** * Closes the socket if not closed yet */ public void close() throws IOException; -} \ No newline at end of file +} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 39105b1c4d..eb7e641468 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -65,14 +65,14 @@ class I2PSocketImpl implements I2PSocket { synchronized (remoteIDWaiter) { if (wait) { try { - if (maxWait > 0) + if (maxWait >= 0) remoteIDWaiter.wait(maxWait); else remoteIDWaiter.wait(); } catch (InterruptedException ex) { } - if ((maxWait > 0) && (System.currentTimeMillis() > dieAfter)) + if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter)) throw new InterruptedIOException("Timed out waiting for remote ID"); _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for " @@ -146,11 +146,29 @@ class I2PSocketImpl implements I2PSocket { return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add); } + public long getReadTimeout() { + return in.getReadTimeout(); + } + + public void setReadTimeout(long ms) { + in.setReadTimeout(ms); + } + //-------------------------------------------------- public class I2PInputStream extends InputStream { private ByteCollector bc = new ByteCollector(); + private long readTimeout = -1; + + public long getReadTimeout() { + return readTimeout; + } + + public void setReadTimeout(long ms) { + readTimeout = ms; + } + public int read() throws IOException { byte[] b = new byte[1]; int res = read(b); @@ -162,7 +180,10 @@ class I2PSocketImpl implements I2PSocket { public synchronized int read(byte[] b, int off, int len) throws IOException { _log.debug("Read called: " + this.hashCode()); if (len == 0) return 0; + long dieAfter = System.currentTimeMillis() + readTimeout; byte[] read = bc.startToByteArray(len); + boolean timedOut = false; + while (read.length == 0) { synchronized (flagLock) { if (closed) { @@ -171,9 +192,18 @@ class I2PSocketImpl implements I2PSocket { } } try { - wait(); - } catch (InterruptedException ex) { + if (readTimeout >= 0) { + wait(readTimeout); + } else { + wait(); + } + } catch (InterruptedException ex) {} + + if ((readTimeout >= 0) + && (System.currentTimeMillis() >= dieAfter)) { + throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)"); } + read = bc.startToByteArray(len); } if (read.length > len) throw new RuntimeException("BUG"); @@ -304,6 +334,8 @@ class I2PSocketImpl implements I2PSocket { } } manager.removeSocket(I2PSocketImpl.this); + } catch (InterruptedIOException ex) { + _log.error("BUG! read() operations should not timeout!", ex); } catch (IOException ex) { // WHOEVER removes this event on inconsistent // state before fixing the inconsistent state (a diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index ee71cf72a6..d29360fb1d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -21,7 +21,11 @@ public class I2PSocketOptions { return _connectTimeout; } + /** + * Define how long we will wait for the ACK from a SYN, in milliseconds. + * + */ public void setConnectTimeout(long ms) { _connectTimeout = ms; } -} \ No newline at end of file +} -- GitLab