diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java
index 844052782..cb021ed48 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java
@@ -32,8 +32,18 @@ public interface I2PSocket {
*/
public OutputStream getOutputStream() throws IOException;
+ /**
+ * Retrieve this socket's configuration
+ */
+ public I2PSocketOptions getOptions();
+ /**
+ * Configure the socket
+ */
+ public void setOptions(I2PSocketOptions options);
+
/**
- * How long we will wait blocked on a read() operation.
+ * How long we will wait blocked on a read() operation. This is simply a
+ * helper to query the I2PSocketOptions
*
* @return milliseconds to wait, or -1 if we will wait indefinitely
*/
@@ -41,7 +51,8 @@ public interface I2PSocket {
/**
* Define how long we will wait blocked on a read() operation (-1 will make
- * the socket wait forever).
+ * the socket wait forever). This is simply a helper to adjust the
+ * I2PSocketOptions
*
*/
public void setReadTimeout(long ms);
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
index c2fd95c9d..5e6bcaf47 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
@@ -40,6 +40,7 @@ class I2PSocketImpl implements I2PSocket {
private long _createdOn;
private long _closedOn;
private long _remoteIdSetTime;
+ private I2PSocketOptions _options;
private Object flagLock = new Object();
/**
@@ -81,6 +82,7 @@ class I2PSocketImpl implements I2PSocket {
_createdOn = I2PAppContext.getGlobalContext().clock().now();
_remoteIdSetTime = -1;
_closedOn = -1;
+ _options = mgr.getDefaultOptions();
}
/**
@@ -176,7 +178,21 @@ class I2PSocketImpl implements I2PSocket {
*/
public void queueData(byte[] data) {
_bytesRead += data.length;
- in.queueData(data);
+ try {
+ in.queueData(data);
+ } catch (InterruptedIOException iie) {
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Queue overflow, closing the stream", iie);
+ try {
+ close();
+ } catch (IOException ioe) {
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Error closing the stream due to overflow", ioe);
+ }
+ } catch (IOException ioe) {
+ if (_log.shouldLog(Log.ERROR))
+ _log.error("Connection closed while writing to the socket", ioe);
+ }
}
/**
@@ -245,19 +261,36 @@ class I2PSocketImpl implements I2PSocket {
return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
}
+ public void setOptions(I2PSocketOptions options) {
+ _options = options;
+ in.setReadTimeout(options.getReadTimeout());
+ }
+
+ public I2PSocketOptions getOptions() {
+ return _options;
+ }
+
/**
- * What is the longest we'll block on the input stream while waiting
- * for more data? If this value is exceeded, the read() throws
- * InterruptedIOException
+ * How long we will wait blocked on a read() operation. This is simply a
+ * helper to query the I2PSocketOptions
+ *
+ * @return milliseconds to wait, or -1 if we will wait indefinitely
*/
public long getReadTimeout() {
- return in.getReadTimeout();
+ return _options.getReadTimeout();
}
+ /**
+ * Define how long we will wait blocked on a read() operation (-1 will make
+ * the socket wait forever). This is simply a helper to adjust the
+ * I2PSocketOptions
+ *
+ */
public void setReadTimeout(long ms) {
+ _options.setReadTimeout(ms);
in.setReadTimeout(ms);
}
-
+
public void setSocketErrorListener(SocketErrorListener lsnr) {
_socketErrorListener = lsnr;
}
@@ -279,6 +312,7 @@ class I2PSocketImpl implements I2PSocket {
private class I2PInputStream extends InputStream {
private ByteCollector bc = new ByteCollector();
+ private boolean inStreamClosed = false;
private long readTimeout = -1;
@@ -306,6 +340,7 @@ class I2PSocketImpl implements I2PSocket {
byte[] read = null;
synchronized (bc) {
read = bc.startToByteArray(len);
+ bc.notifyAll();
}
boolean timedOut = false;
@@ -334,6 +369,7 @@ class I2PSocketImpl implements I2PSocket {
synchronized (bc) {
read = bc.startToByteArray(len);
+ bc.notifyAll();
}
}
if (read.length > len) throw new RuntimeException("BUG");
@@ -357,14 +393,44 @@ class I2PSocketImpl implements I2PSocket {
}
}
- public void queueData(byte[] data) {
+ /**
+ * Add the data to the queue
+ *
+ * @throws InterruptedIOException if the queue's buffer is full, the socket has
+ * a write timeout, and that timeout is exceeded
+ * @throws IOException if the connection was closed while queueing up the data
+ */
+ public void queueData(byte[] data) throws InterruptedIOException, IOException {
queueData(data, 0, data.length);
}
- public void queueData(byte[] data, int off, int len) {
+ /**
+ * Add the data to the queue
+ *
+ * @throws InterruptedIOException if the queue's buffer is full, the socket has
+ * a write timeout, and that timeout is exceeded
+ * @throws IOException if the connection was closed while queueing up the data
+ */
+ public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
synchronized (bc) {
+ if (_options.getMaxBufferSize() > 0) {
+ int waited = 0;
+ while (bc.getCurrentSize() + len > _options.getMaxBufferSize()) {
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
+ if ( (_options.getWriteTimeout() > 0) && (waited > _options.getWriteTimeout()) ) {
+ throw new InterruptedIOException("Waited " + waited + "ms to write " + len + " with a buffer at " + bc.getCurrentSize());
+ }
+ if (inStreamClosed)
+ throw new IOException("Stream closed while writing");
+ try {
+ bc.wait(1000);
+ waited += 1000;
+ } catch (InterruptedException ie) {}
+ }
+ }
bc.append(data, off, len);
}
synchronized (I2PInputStream.this) {
@@ -381,6 +447,10 @@ class I2PSocketImpl implements I2PSocket {
public void close() throws IOException {
super.close();
notifyClosed();
+ synchronized (bc) {
+ inStreamClosed = true;
+ bc.notifyAll();
+ }
}
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java
index d6d592bc6..b20d2bf4b 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java
@@ -377,7 +377,7 @@ public class I2PSocketManager implements I2PSessionListener {
*
* @throws IllegalStateException if the socket isn't open or isn't known
*/
- private void sendIncoming(String id, byte payload[]) {
+ private void sendIncoming(String id, byte payload[]) throws IllegalStateException {
I2PSocketImpl s = null;
synchronized (lock) {
s = (I2PSocketImpl) _inSockets.get(id);
@@ -469,7 +469,10 @@ public class I2PSocketManager implements I2PSessionListener {
_context.statManager().addRateData("streaming.synNoAck", 1, 1);
throw new I2PException("Error sending through I2P network");
}
- remoteID = s.getRemoteID(true, options.getConnectTimeout());
+ if (options != null)
+ remoteID = s.getRemoteID(true, options.getConnectTimeout());
+ else
+ remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout());
if (remoteID == null) {
_context.statManager().addRateData("streaming.nackReceived", 1, 1);
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
index 1f92a7952..2f4f3a787 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java
@@ -81,6 +81,7 @@ public class I2PSocketManagerFactory {
private static I2PSocketManager createManager(I2PSession session) {
I2PSocketManager mgr = new I2PSocketManager();
mgr.setSession(session);
+ mgr.setDefaultOptions(new I2PSocketOptions());
return mgr;
}
}
\ No newline at end of file
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java
index 58e19b872..3bc46aeb8 100644
--- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java
@@ -6,9 +6,17 @@ package net.i2p.client.streaming;
*/
public class I2PSocketOptions {
private long _connectTimeout;
+ private long _readTimeout;
+ private long _writeTimeout;
+ private int _maxBufferSize;
+ public static final int DEFAULT_BUFFER_SIZE = 1024*128;
+
public I2PSocketOptions() {
_connectTimeout = -1;
+ _readTimeout = -1;
+ _writeTimeout = -1;
+ _maxBufferSize = DEFAULT_BUFFER_SIZE;
}
/**
@@ -27,4 +35,65 @@ public class I2PSocketOptions {
public void setConnectTimeout(long ms) {
_connectTimeout = ms;
}
+
+ /**
+ * What is the longest we'll block on the input stream while waiting
+ * for more data? If this value is exceeded, the read() throws
+ * InterruptedIOException
+ */
+ public long getReadTimeout() {
+ return _readTimeout;
+ }
+
+ /**
+ * What is the longest we'll block on the input stream while waiting
+ * for more data? If this value is exceeded, the read() throws
+ * InterruptedIOException
+ */
+ public void setReadTimeout(long ms) {
+ _readTimeout = ms;
+ }
+
+ /**
+ * How much data will we accept that hasn't been written out yet. After
+ * this amount has been exceeded, subsequent .write calls will block until
+ * either some data is removed or the connection is closed. If this is
+ * less than or equal to zero, there is no limit (warning: can eat ram)
+ *
+ * @return buffer size limit, in bytes
+ */
+ public int getMaxBufferSize() {
+ return _maxBufferSize;
+ }
+
+ /**
+ * How much data will we accept that hasn't been written out yet. After
+ * this amount has been exceeded, subsequent .write calls will block until
+ * either some data is removed or the connection is closed. If this is
+ * less than or equal to zero, there is no limit (warning: can eat ram)
+ *
+ */
+ public void setMaxBufferSize(int numBytes) {
+ _maxBufferSize = numBytes;
+ }
+
+ /**
+ * What is the longest we'll block on the output stream while waiting
+ * for the data to flush? If this value is exceeded, the write() throws
+ * InterruptedIOException. If this is less than or equal to zero, there
+ * is no timeout.
+ */
+ public long getWriteTimeout() {
+ return _writeTimeout;
+ }
+
+ /**
+ * What is the longest we'll block on the output stream while waiting
+ * for the data to flush? If this value is exceeded, the write() throws
+ * InterruptedIOException. If this is less than or equal to zero, there
+ * is no timeout.
+ */
+ public void setWriteTimeout(long ms) {
+ _writeTimeout = ms;
+ }
}
diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java
new file mode 100644
index 000000000..5b4cdad63
--- /dev/null
+++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java
@@ -0,0 +1,135 @@
+package net.i2p.client.streaming;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+
+import java.util.Random;
+
+import net.i2p.I2PAppContext;
+import net.i2p.I2PException;
+import net.i2p.data.Destination;
+import net.i2p.data.DataFormatException;
+import net.i2p.util.Log;
+
+/**
+ * Simple streaming lib test app that connects to a given destination and sends
+ * it a particular amount of random data, then disconnects. See the {@link main}
+ *
+ */
+public class StreamSinkClient {
+ private Log _log;
+ private int _sendSize;
+ private int _writeDelay;
+ private String _peerDestFile;
+
+
+ /**
+ * Build the client but don't fire it up.
+ * @param sendSize how many KB to send
+ * @param writeDelayMs how long to wait between each .write (0 for no delay)
+ * @param serverDestFile file containing the StreamSinkServer's binary Destination
+ */
+ public StreamSinkClient(int sendSize, int writeDelayMs, String serverDestFile) {
+ _sendSize = sendSize;
+ _writeDelay = writeDelayMs;
+ _peerDestFile = serverDestFile;
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkClient.class);
+ }
+
+ /**
+ * Actually connect and run the client - this call blocks until completion.
+ *
+ */
+ public void runClient() {
+ I2PSocketManager mgr = I2PSocketManagerFactory.createManager();
+ Destination peer = null;
+ FileInputStream fis = null;
+ try {
+ fis = new FileInputStream(_peerDestFile);
+ peer = new Destination();
+ peer.readBytes(fis);
+ } catch (IOException ioe) {
+ _log.error("Error finding the peer destination to contact in " + _peerDestFile, ioe);
+ return;
+ } catch (DataFormatException dfe) {
+ _log.error("Peer destination is not valid in " + _peerDestFile, dfe);
+ return;
+ } finally {
+ if (fis == null) try { fis.close(); } catch (IOException ioe) {}
+ }
+
+
+ System.out.println("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64());
+
+ try {
+ I2PSocket sock = mgr.connect(peer);
+ byte buf[] = new byte[32*1024];
+ Random rand = new Random();
+ OutputStream out = sock.getOutputStream();
+ long beforeSending = System.currentTimeMillis();
+ for (int i = 0; i < _sendSize; i+= 32) {
+ rand.nextBytes(buf);
+ out.write(buf);
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Wrote " + (i+32) + "/" + _sendSize + "KB");
+ if (_writeDelay > 0) {
+ try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {}
+ }
+ }
+ long afterSending = System.currentTimeMillis();
+ System.out.println("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms");
+ sock.close();
+ } catch (InterruptedIOException iie) {
+ _log.error("Timeout connecting to the peer", iie);
+ return;
+ } catch (NoRouteToHostException nrthe) {
+ _log.error("Unable to connect to the peer", nrthe);
+ return;
+ } catch (ConnectException ce) {
+ _log.error("Connection already dropped", ce);
+ return;
+ } catch (I2PException ie) {
+ _log.error("Error connecting to the peer", ie);
+ return;
+ } catch (IOException ioe) {
+ _log.error("IO error sending", ioe);
+ return;
+ }
+ }
+
+ /**
+ * Fire up the client. Usage: StreamSinkClient sendSizeKB writeDelayMs serverDestFile
+ *
Usage: StreamSinkServer sinkDir ourDestFile