diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 3ab620d5275f1466bafd6474f2c66d3ebcdfa370..a249b2c2974d003b2b7cd6e670af50c5017e48f3 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -5,12 +5,14 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; +import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.I2PThread; import net.i2p.util.Log; + /** * Initial stub implementation for the socket * @@ -35,6 +37,9 @@ class I2PSocketImpl implements I2PSocket { private static long __socketId = 0; private long _bytesRead = 0; private long _bytesWritten = 0; + private long _createdOn; + private long _closedOn; + private long _remoteIdSetTime; private Object flagLock = new Object(); /** @@ -73,6 +78,9 @@ class I2PSocketImpl implements I2PSocket { out = new I2POutputStream(pin); new I2PSocketRunner(pin); this.localID = localID; + _createdOn = I2PAppContext.getGlobalContext().clock().now(); + _remoteIdSetTime = -1; + _closedOn = -1; } /** @@ -89,6 +97,7 @@ class I2PSocketImpl implements I2PSocket { public void setRemoteID(String id) { synchronized (remoteIDWaiter) { remoteID = id; + _remoteIdSetTime = System.currentTimeMillis(); remoteIDWaiter.notifyAll(); } } @@ -123,18 +132,22 @@ class I2PSocketImpl implements I2PSocket { long dieAfter = System.currentTimeMillis() + maxWait; synchronized (remoteIDWaiter) { if (wait) { - try { - if (maxWait >= 0) - remoteIDWaiter.wait(maxWait); - else - remoteIDWaiter.wait(); - } catch (InterruptedException ex) { + if (remoteID == null) { + try { + if (maxWait >= 0) + remoteIDWaiter.wait(maxWait); + else + remoteIDWaiter.wait(); + } catch (InterruptedException ex) { + } } long now = System.currentTimeMillis(); if ((maxWait >= 0) && (now >= dieAfter)) { long waitedExcess = now - dieAfter; - throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + "ms too long [" + maxWait + "ms])"); + throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + + "ms too long [" + maxWait + "ms, remId " + remoteID + + ", remId set " + (now-_remoteIdSetTime) + "ms ago])"); } if (_log.shouldLog(Log.DEBUG)) @@ -202,8 +215,10 @@ class I2PSocketImpl implements I2PSocket { */ public void close() throws IOException { synchronized (flagLock) { - _log.debug("Closing connection"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing connection"); closed = true; + _closedOn = I2PAppContext.getGlobalContext().clock().now(); } out.close(); in.notifyClosed(); @@ -217,6 +232,7 @@ class I2PSocketImpl implements I2PSocket { closed = true; closed2 = true; sendClose = false; + _closedOn = I2PAppContext.getGlobalContext().clock().now(); } out.close(); in.notifyClosed(); @@ -251,6 +267,12 @@ class I2PSocketImpl implements I2PSocket { _socketErrorListener.errorOccurred(); } + public long getBytesSent() { return _bytesWritten; } + public long getBytesReceived() { return _bytesRead; } + public long getCreatedOn() { return _createdOn; } + public long getClosedOn() { return _closedOn; } + + private String getPrefix() { return "[" + _socketId + "]: "; } //-------------------------------------------------- @@ -276,12 +298,15 @@ class I2PSocketImpl implements I2PSocket { throw new RuntimeException("Incorrect read() result"); } - public synchronized int read(byte[] b, int off, int len) throws IOException { + public int read(byte[] b, int off, int len) throws IOException { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Read called: " + this.hashCode()); + _log.debug(getPrefix() + "Read called for " + len + " bytes (avail=" + bc.getCurrentSize() + "): " + this.hashCode()); if (len == 0) return 0; long dieAfter = System.currentTimeMillis() + readTimeout; - byte[] read = bc.startToByteArray(len); + byte[] read = null; + synchronized (bc) { + read = bc.startToByteArray(len); + } boolean timedOut = false; while (read.length == 0) { @@ -293,10 +318,12 @@ class I2PSocketImpl implements I2PSocket { } } try { - if (readTimeout >= 0) { - wait(readTimeout); - } else { - wait(); + synchronized (I2PSocketImpl.I2PInputStream.this) { + if (readTimeout >= 0) { + wait(readTimeout); + } else { + wait(); + } } } catch (InterruptedException ex) {} @@ -305,7 +332,9 @@ class I2PSocketImpl implements I2PSocket { throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)"); } - read = bc.startToByteArray(len); + synchronized (bc) { + read = bc.startToByteArray(len); + } } if (read.length > len) throw new RuntimeException("BUG"); System.arraycopy(read, 0, b, off, read.length); @@ -330,15 +359,21 @@ class I2PSocketImpl implements I2PSocket { queueData(data, 0, data.length); } - public synchronized void queueData(byte[] data, int off, int len) { + public void queueData(byte[] data, int off, int len) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); - bc.append(data, off, len); - notifyAll(); + synchronized (bc) { + bc.append(data, off, len); + } + synchronized (I2PInputStream.this) { + notifyAll(); + } } public synchronized void notifyClosed() { - I2PInputStream.this.notifyAll(); + synchronized (I2PInputStream.this) { + notifyAll(); + } } public void close() throws IOException {