From f85ce180ed416e364109be047a4bceae36726977 Mon Sep 17 00:00:00 2001 From: jrandom Date: Sun, 1 Aug 2004 18:34:02 +0000 Subject: [PATCH] * added a way to control how large we let the buffers grow before we block, or even whether to have the blocking action timeout and close the socket after a certain delay * refactored the I2PSocketOptions to be more actively used * added a pair of ministreaming lib demo apps: - StreamSinkServer listens to a destination and dumps any data it receives on a socket to a per-socket file - StreamSinkClient sends a destination a specified number of random bytes, then disconnects --- .../net/i2p/client/streaming/I2PSocket.java | 15 +- .../i2p/client/streaming/I2PSocketImpl.java | 86 +++++++++-- .../client/streaming/I2PSocketManager.java | 7 +- .../streaming/I2PSocketManagerFactory.java | 1 + .../client/streaming/I2PSocketOptions.java | 69 +++++++++ .../client/streaming/StreamSinkClient.java | 135 ++++++++++++++++++ .../client/streaming/StreamSinkServer.java | 135 ++++++++++++++++++ 7 files changed, 436 insertions(+), 12 deletions(-) create mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java create mode 100644 apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index 844052782c..cb021ed485 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -32,8 +32,18 @@ public interface I2PSocket { */ public OutputStream getOutputStream() throws IOException; + /** + * Retrieve this socket's configuration + */ + public I2PSocketOptions getOptions(); + /** + * Configure the socket + */ + public void setOptions(I2PSocketOptions options); + /** - * How long we will wait blocked on a read() operation. + * How long we will wait blocked on a read() operation. This is simply a + * helper to query the I2PSocketOptions * * @return milliseconds to wait, or -1 if we will wait indefinitely */ @@ -41,7 +51,8 @@ public interface I2PSocket { /** * Define how long we will wait blocked on a read() operation (-1 will make - * the socket wait forever). + * the socket wait forever). This is simply a helper to adjust the + * I2PSocketOptions * */ public void setReadTimeout(long ms); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index c2fd95c9de..5e6bcaf47f 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -40,6 +40,7 @@ class I2PSocketImpl implements I2PSocket { private long _createdOn; private long _closedOn; private long _remoteIdSetTime; + private I2PSocketOptions _options; private Object flagLock = new Object(); /** @@ -81,6 +82,7 @@ class I2PSocketImpl implements I2PSocket { _createdOn = I2PAppContext.getGlobalContext().clock().now(); _remoteIdSetTime = -1; _closedOn = -1; + _options = mgr.getDefaultOptions(); } /** @@ -176,7 +178,21 @@ class I2PSocketImpl implements I2PSocket { */ public void queueData(byte[] data) { _bytesRead += data.length; - in.queueData(data); + try { + in.queueData(data); + } catch (InterruptedIOException iie) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Queue overflow, closing the stream", iie); + try { + close(); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error closing the stream due to overflow", ioe); + } + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Connection closed while writing to the socket", ioe); + } } /** @@ -245,19 +261,36 @@ class I2PSocketImpl implements I2PSocket { return (byte)(I2PSocketManager.DATA_OUT + (byte)add); } + public void setOptions(I2PSocketOptions options) { + _options = options; + in.setReadTimeout(options.getReadTimeout()); + } + + public I2PSocketOptions getOptions() { + return _options; + } + /** - * What is the longest we'll block on the input stream while waiting - * for more data? If this value is exceeded, the read() throws - * InterruptedIOException + * How long we will wait blocked on a read() operation. This is simply a + * helper to query the I2PSocketOptions + * + * @return milliseconds to wait, or -1 if we will wait indefinitely */ public long getReadTimeout() { - return in.getReadTimeout(); + return _options.getReadTimeout(); } + /** + * Define how long we will wait blocked on a read() operation (-1 will make + * the socket wait forever). This is simply a helper to adjust the + * I2PSocketOptions + * + */ public void setReadTimeout(long ms) { + _options.setReadTimeout(ms); in.setReadTimeout(ms); } - + public void setSocketErrorListener(SocketErrorListener lsnr) { _socketErrorListener = lsnr; } @@ -279,6 +312,7 @@ class I2PSocketImpl implements I2PSocket { private class I2PInputStream extends InputStream { private ByteCollector bc = new ByteCollector(); + private boolean inStreamClosed = false; private long readTimeout = -1; @@ -306,6 +340,7 @@ class I2PSocketImpl implements I2PSocket { byte[] read = null; synchronized (bc) { read = bc.startToByteArray(len); + bc.notifyAll(); } boolean timedOut = false; @@ -334,6 +369,7 @@ class I2PSocketImpl implements I2PSocket { synchronized (bc) { read = bc.startToByteArray(len); + bc.notifyAll(); } } if (read.length > len) throw new RuntimeException("BUG"); @@ -357,14 +393,44 @@ class I2PSocketImpl implements I2PSocket { } } - public void queueData(byte[] data) { + /** + * Add the data to the queue + * + * @throws InterruptedIOException if the queue's buffer is full, the socket has + * a write timeout, and that timeout is exceeded + * @throws IOException if the connection was closed while queueing up the data + */ + public void queueData(byte[] data) throws InterruptedIOException, IOException { queueData(data, 0, data.length); } - public void queueData(byte[] data, int off, int len) { + /** + * Add the data to the queue + * + * @throws InterruptedIOException if the queue's buffer is full, the socket has + * a write timeout, and that timeout is exceeded + * @throws IOException if the connection was closed while queueing up the data + */ + public void queueData(byte[] data, int off, int len) throws InterruptedIOException, IOException { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); synchronized (bc) { + if (_options.getMaxBufferSize() > 0) { + int waited = 0; + while (bc.getCurrentSize() + len > _options.getMaxBufferSize()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Buffer size exceeded: pending " + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize()); + if ( (_options.getWriteTimeout() > 0) && (waited > _options.getWriteTimeout()) ) { + throw new InterruptedIOException("Waited " + waited + "ms to write " + len + " with a buffer at " + bc.getCurrentSize()); + } + if (inStreamClosed) + throw new IOException("Stream closed while writing"); + try { + bc.wait(1000); + waited += 1000; + } catch (InterruptedException ie) {} + } + } bc.append(data, off, len); } synchronized (I2PInputStream.this) { @@ -381,6 +447,10 @@ class I2PSocketImpl implements I2PSocket { public void close() throws IOException { super.close(); notifyClosed(); + synchronized (bc) { + inStreamClosed = true; + bc.notifyAll(); + } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index d6d592bc60..b20d2bf4b7 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -377,7 +377,7 @@ public class I2PSocketManager implements I2PSessionListener { * * @throws IllegalStateException if the socket isn't open or isn't known */ - private void sendIncoming(String id, byte payload[]) { + private void sendIncoming(String id, byte payload[]) throws IllegalStateException { I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _inSockets.get(id); @@ -469,7 +469,10 @@ public class I2PSocketManager implements I2PSessionListener { _context.statManager().addRateData("streaming.synNoAck", 1, 1); throw new I2PException("Error sending through I2P network"); } - remoteID = s.getRemoteID(true, options.getConnectTimeout()); + if (options != null) + remoteID = s.getRemoteID(true, options.getConnectTimeout()); + else + remoteID = s.getRemoteID(true, getDefaultOptions().getConnectTimeout()); if (remoteID == null) { _context.statManager().addRateData("streaming.nackReceived", 1, 1); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 1f92a7952e..2f4f3a7873 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -81,6 +81,7 @@ public class I2PSocketManagerFactory { private static I2PSocketManager createManager(I2PSession session) { I2PSocketManager mgr = new I2PSocketManager(); mgr.setSession(session); + mgr.setDefaultOptions(new I2PSocketOptions()); return mgr; } } \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 58e19b872b..3bc46aeb8c 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -6,9 +6,17 @@ package net.i2p.client.streaming; */ public class I2PSocketOptions { private long _connectTimeout; + private long _readTimeout; + private long _writeTimeout; + private int _maxBufferSize; + public static final int DEFAULT_BUFFER_SIZE = 1024*128; + public I2PSocketOptions() { _connectTimeout = -1; + _readTimeout = -1; + _writeTimeout = -1; + _maxBufferSize = DEFAULT_BUFFER_SIZE; } /** @@ -27,4 +35,65 @@ public class I2PSocketOptions { public void setConnectTimeout(long ms) { _connectTimeout = ms; } + + /** + * What is the longest we'll block on the input stream while waiting + * for more data? If this value is exceeded, the read() throws + * InterruptedIOException + */ + public long getReadTimeout() { + return _readTimeout; + } + + /** + * What is the longest we'll block on the input stream while waiting + * for more data? If this value is exceeded, the read() throws + * InterruptedIOException + */ + public void setReadTimeout(long ms) { + _readTimeout = ms; + } + + /** + * How much data will we accept that hasn't been written out yet. After + * this amount has been exceeded, subsequent .write calls will block until + * either some data is removed or the connection is closed. If this is + * less than or equal to zero, there is no limit (warning: can eat ram) + * + * @return buffer size limit, in bytes + */ + public int getMaxBufferSize() { + return _maxBufferSize; + } + + /** + * How much data will we accept that hasn't been written out yet. After + * this amount has been exceeded, subsequent .write calls will block until + * either some data is removed or the connection is closed. If this is + * less than or equal to zero, there is no limit (warning: can eat ram) + * + */ + public void setMaxBufferSize(int numBytes) { + _maxBufferSize = numBytes; + } + + /** + * What is the longest we'll block on the output stream while waiting + * for the data to flush? If this value is exceeded, the write() throws + * InterruptedIOException. If this is less than or equal to zero, there + * is no timeout. + */ + public long getWriteTimeout() { + return _writeTimeout; + } + + /** + * What is the longest we'll block on the output stream while waiting + * for the data to flush? If this value is exceeded, the write() throws + * InterruptedIOException. If this is less than or equal to zero, there + * is no timeout. + */ + public void setWriteTimeout(long ms) { + _writeTimeout = ms; + } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java new file mode 100644 index 0000000000..5b4cdad638 --- /dev/null +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java @@ -0,0 +1,135 @@ +package net.i2p.client.streaming; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.OutputStream; + +import java.net.ConnectException; +import java.net.NoRouteToHostException; + +import java.util.Random; + +import net.i2p.I2PAppContext; +import net.i2p.I2PException; +import net.i2p.data.Destination; +import net.i2p.data.DataFormatException; +import net.i2p.util.Log; + +/** + * Simple streaming lib test app that connects to a given destination and sends + * it a particular amount of random data, then disconnects. See the {@link main} + * + */ +public class StreamSinkClient { + private Log _log; + private int _sendSize; + private int _writeDelay; + private String _peerDestFile; + + + /** + * Build the client but don't fire it up. + * @param sendSize how many KB to send + * @param writeDelayMs how long to wait between each .write (0 for no delay) + * @param serverDestFile file containing the StreamSinkServer's binary Destination + */ + public StreamSinkClient(int sendSize, int writeDelayMs, String serverDestFile) { + _sendSize = sendSize; + _writeDelay = writeDelayMs; + _peerDestFile = serverDestFile; + _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkClient.class); + } + + /** + * Actually connect and run the client - this call blocks until completion. + * + */ + public void runClient() { + I2PSocketManager mgr = I2PSocketManagerFactory.createManager(); + Destination peer = null; + FileInputStream fis = null; + try { + fis = new FileInputStream(_peerDestFile); + peer = new Destination(); + peer.readBytes(fis); + } catch (IOException ioe) { + _log.error("Error finding the peer destination to contact in " + _peerDestFile, ioe); + return; + } catch (DataFormatException dfe) { + _log.error("Peer destination is not valid in " + _peerDestFile, dfe); + return; + } finally { + if (fis == null) try { fis.close(); } catch (IOException ioe) {} + } + + + System.out.println("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64()); + + try { + I2PSocket sock = mgr.connect(peer); + byte buf[] = new byte[32*1024]; + Random rand = new Random(); + OutputStream out = sock.getOutputStream(); + long beforeSending = System.currentTimeMillis(); + for (int i = 0; i < _sendSize; i+= 32) { + rand.nextBytes(buf); + out.write(buf); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Wrote " + (i+32) + "/" + _sendSize + "KB"); + if (_writeDelay > 0) { + try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {} + } + } + long afterSending = System.currentTimeMillis(); + System.out.println("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms"); + sock.close(); + } catch (InterruptedIOException iie) { + _log.error("Timeout connecting to the peer", iie); + return; + } catch (NoRouteToHostException nrthe) { + _log.error("Unable to connect to the peer", nrthe); + return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } catch (I2PException ie) { + _log.error("Error connecting to the peer", ie); + return; + } catch (IOException ioe) { + _log.error("IO error sending", ioe); + return; + } + } + + /** + * Fire up the client. Usage: StreamSinkClient sendSizeKB writeDelayMs serverDestFile
+ * + */ + public static void main(String args[]) { + if (args.length != 3) { + System.out.println("Usage: StreamSinkClient sendSizeKB writeDelayMs serverDestFile"); + } else { + int sendSizeKB = -1; + int writeDelayMs = -1; + try { + sendSizeKB = Integer.parseInt(args[0]); + } catch (NumberFormatException nfe) { + System.err.println("Send size invalid [" + args[0] + "]"); + return; + } + try { + writeDelayMs = Integer.parseInt(args[1]); + } catch (NumberFormatException nfe) { + System.err.println("Write delay ms invalid [" + args[1] + "]"); + return; + } + StreamSinkClient client = new StreamSinkClient(sendSizeKB, writeDelayMs, args[2]); + client.runClient(); + } + } +} diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java new file mode 100644 index 0000000000..832ee4ee99 --- /dev/null +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -0,0 +1,135 @@ +package net.i2p.client.streaming; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import java.net.ConnectException; + +import net.i2p.I2PAppContext; +import net.i2p.I2PException; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; +import net.i2p.util.I2PThread; +import net.i2p.util.Log; + +/** + * Listen to a destination, receiving any sockets and writing anything they + * send to a new file. + * + */ +public class StreamSinkServer { + private Log _log; + private String _sinkDir; + private String _destFile; + + /** + * Create but do not start the streaming server. + * + * @param sinkDir Directory to store received files in + * @param ourDestFile filename to write our binary destination to + */ + public StreamSinkServer(String sinkDir, String ourDestFile) { + _sinkDir = sinkDir; + _destFile = ourDestFile; + _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); + } + + /** + * Actually fire up the server - this call blocks forever (or until the server + * socket closes) + * + */ + public void runServer() { + I2PSocketManager mgr = I2PSocketManagerFactory.createManager(); + Destination dest = mgr.getSession().getMyDestination(); + System.out.println("Listening for connections on: " + dest.calculateHash().toBase64()); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(_destFile); + dest.writeBytes(fos); + } catch (IOException ioe) { + _log.error("Error writing out our destination to " + _destFile, ioe); + return; + } catch (DataFormatException dfe) { + _log.error("Error formatting the destination", dfe); + return; + } finally { + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + } + + I2PServerSocket sock = mgr.getServerSocket(); + while (true) { + try { + I2PSocket curSock = sock.accept(); + handle(curSock); + } catch (I2PException ie) { + _log.error("Error accepting connection", ie); + return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } + } + } + + private void handle(I2PSocket socket) { + I2PThread t = new I2PThread(new ClientRunner(socket)); + t.setName("Handle " + socket.getPeerDestination().calculateHash().toBase64().substring(0,4)); + t.start(); + } + + /** + * Actually deal with a client - pull anything they send us and write it to a file. + * + */ + private class ClientRunner implements Runnable { + private I2PSocket _sock; + private FileOutputStream _fos; + public ClientRunner(I2PSocket socket) { + _sock = socket; + try { + File sink = new File(_sinkDir); + if (!sink.exists()) + sink.mkdirs(); + File cur = File.createTempFile("clientSink", ".dat", sink); + _fos = new FileOutputStream(cur); + } catch (IOException ioe) { + _log.error("Error creating sink", ioe); + _fos = null; + } + } + public void run() { + if (_fos == null) return; + try { + InputStream in = _sock.getInputStream(); + byte buf[] = new byte[4096]; + int read = 0; + while ( (read = in.read(buf)) != -1) { + _fos.write(buf, 0, read); + } + } catch (IOException ioe) { + _log.error("Error writing the sink", ioe); + } finally { + if (_fos != null) try { _fos.close(); } catch (IOException ioe) {} + } + } + } + + /** + * Fire up the streaming server. Usage: StreamSinkServer sinkDir ourDestFile
+ * + */ + public static void main(String args[]) { + if (args.length != 2) { + System.out.println("Usage: StreamSinkServer sinkDir ourDestFile"); + } else { + StreamSinkServer server = new StreamSinkServer(args[0], args[1]); + server.runServer(); + } + } +}