diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 2add8641239a3189a9ee626827865ab8b04910f0..64a4708e2313f580e85f36fdf1f7819418480401 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -160,45 +160,59 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } public void run() { - try { I2PServerSocket i2pss = sockMgr.getServerSocket(); + for (int i = 0; i < 5; i++) { + I2PThread handler = new I2PThread(new Handler(i2pss), "Handle Server " + i); + handler.start(); + } + /* while (true) { I2PSocket i2ps = i2pss.accept(); if (i2ps == null) throw new I2PException("I2PServerSocket closed"); I2PThread t = new I2PThread(new Handler(i2ps)); t.start(); } - } catch (I2PException ex) { - _log.error("Error while waiting for I2PConnections", ex); - } catch (IOException ex) { - _log.error("Error while waiting for I2PConnections", ex); - } + */ } + /** - * Async handler to keep .accept() from blocking too long. - * todo: replace with a thread pool so we dont get overrun by threads if/when - * receiving a lot of connection requests concurrently. + * minor thread pool to pull off the accept() concurrently. there are still lots + * (and lots) of wasted threads within the I2PTunnelRunner, but its a start * */ private class Handler implements Runnable { - private I2PSocket _handleSocket; - public Handler(I2PSocket socket) { - _handleSocket = socket; + private I2PServerSocket _serverSocket; + public Handler(I2PServerSocket serverSocket) { + _serverSocket = serverSocket; } public void run() { + while (open) { + try { + handle(_serverSocket.accept()); + } catch (I2PException ex) { + _log.error("Error while waiting for I2PConnections", ex); + return; + } catch (IOException ex) { + _log.error("Error while waiting for I2PConnections", ex); + return; + } + } + } + + private void handle(I2PSocket socket) { long afterAccept = I2PAppContext.getGlobalContext().clock().now(); long afterSocket = -1; //local is fast, so synchronously. Does not need that many //threads. try { - _handleSocket.setReadTimeout(readTimeout); + socket.setReadTimeout(readTimeout); Socket s = new Socket(remoteHost, remotePort); afterSocket = I2PAppContext.getGlobalContext().clock().now(); - new I2PTunnelRunner(s, _handleSocket, slock, null, null); + new I2PTunnelRunner(s, socket, slock, null, null); } catch (SocketException ex) { try { - _handleSocket.close(); + socket.close(); } catch (IOException ioe) { _log.error("Error while closing the received i2p con", ex); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java index 91404a2d31781f07e6d90b3068a7eee8e1054b2b..74365b2ff1d3c1b702f9275ae0e3416e5e0b6def 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkClient.java @@ -15,6 +15,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.data.Destination; import net.i2p.data.DataFormatException; +import net.i2p.util.I2PThread; import net.i2p.util.Log; /** @@ -74,61 +75,67 @@ public class StreamSinkClient { } finally { if (fis == null) try { fis.close(); } catch (IOException ioe) {} } - - - System.out.println("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64()); - - try { - I2PSocket sock = mgr.connect(peer); - byte buf[] = new byte[32*1024]; - Random rand = new Random(); - OutputStream out = sock.getOutputStream(); - long beforeSending = System.currentTimeMillis(); - for (int i = 0; (_sendSize < 0) || (i < _sendSize); i+= 32) { - rand.nextBytes(buf); - out.write(buf); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send " + _sendSize + "KB to " + peer.calculateHash().toBase64()); + + while (true) { + try { + I2PSocket sock = mgr.connect(peer); + byte buf[] = new byte[Math.min(32*1024, _sendSize*1024)]; + Random rand = new Random(); + OutputStream out = sock.getOutputStream(); + long beforeSending = System.currentTimeMillis(); + for (int i = 0; (_sendSize < 0) || (i < _sendSize); i+= buf.length/1024) { + rand.nextBytes(buf); + out.write(buf); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Wrote " + ((1+i*buf.length)/1024) + "/" + _sendSize + "KB"); + if (_writeDelay > 0) { + try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {} + } + } + sock.close(); + long afterSending = System.currentTimeMillis(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Wrote " + (i+32) + "/" + _sendSize + "KB"); - if (_writeDelay > 0) { - try { Thread.sleep(_writeDelay); } catch (InterruptedException ie) {} - } - } - sock.close(); - long afterSending = System.currentTimeMillis(); - System.out.println("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms"); - } catch (InterruptedIOException iie) { - _log.error("Timeout connecting to the peer", iie); - return; - } catch (NoRouteToHostException nrthe) { - _log.error("Unable to connect to the peer", nrthe); - return; - } catch (ConnectException ce) { - _log.error("Connection already dropped", ce); - return; - } catch (I2PException ie) { - _log.error("Error connecting to the peer", ie); - return; - } catch (IOException ioe) { - _log.error("IO error sending", ioe); - return; + _log.debug("Sent " + _sendSize + "KB in " + (afterSending-beforeSending) + "ms"); + } catch (InterruptedIOException iie) { + _log.error("Timeout connecting to the peer", iie); + //return; + } catch (NoRouteToHostException nrthe) { + _log.error("Unable to connect to the peer", nrthe); + //return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + //return; + } catch (I2PException ie) { + _log.error("Error connecting to the peer", ie); + return; + } catch (IOException ioe) { + _log.error("IO error sending", ioe); + return; + } } } /** - * Fire up the client. <code>Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile</code> <br /> + * Fire up the client. <code>Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile [concurrentSends]</code> <br /> * <ul> * <li><b>sendSizeKB</b>: how many KB to send, or -1 for unlimited</li> * <li><b>writeDelayMs</b>: how long to wait between each .write (0 for no delay)</li> * <li><b>serverDestFile</b>: file containing the StreamSinkServer's binary Destination</li> + * <li><b>concurrentSends</b>: how many concurrent threads should send to the server at once</li> * </ul> */ public static void main(String args[]) { StreamSinkClient client = null; int sendSizeKB = -1; int writeDelayMs = -1; + int concurrent = 1; switch (args.length) { - case 3: + case 3: // fall through + case 4: try { sendSizeKB = Integer.parseInt(args[0]); } catch (NumberFormatException nfe) { @@ -141,9 +148,13 @@ public class StreamSinkClient { System.err.println("Write delay ms invalid [" + args[1] + "]"); return; } + if (args.length == 4) { + try { concurrent = Integer.parseInt(args[3]); } catch (NumberFormatException nfe) {} + } client = new StreamSinkClient(sendSizeKB, writeDelayMs, args[2]); break; - case 5: + case 5: // fall through + case 6: try { int port = Integer.parseInt(args[1]); sendSizeKB = Integer.parseInt(args[2]); @@ -152,11 +163,26 @@ public class StreamSinkClient { } catch (NumberFormatException nfe) { System.err.println("arg error"); } + if (args.length == 6) { + try { concurrent = Integer.parseInt(args[5]); } catch (NumberFormatException nfe) {} + } break; default: - System.out.println("Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile"); + System.out.println("Usage: StreamSinkClient [i2cpHost i2cpPort] sendSizeKB writeDelayMs serverDestFile [concurrentSends]"); + } + if (client != null) { + for (int i = 0; i < concurrent; i++) + new I2PThread(new Runner(client), "Client " + i).start(); + } + } + + private static class Runner implements Runnable { + private StreamSinkClient _client; + public Runner(StreamSinkClient client) { + _client = client; + } + public void run() { + _client.runClient(); } - if (client != null) - client.runClient(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index 900d18b1de91a085f2249e1ce403a2bbe18a1565..7651cb5efe555b772e8434bcdcf9f20984795158 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -6,6 +6,8 @@ import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; import java.util.Properties; import net.i2p.I2PAppContext; @@ -26,6 +28,7 @@ public class StreamSinkServer { private String _destFile; private String _i2cpHost; private int _i2cpPort; + private int _handlers; /** * Create but do not start the streaming server. @@ -34,13 +37,14 @@ public class StreamSinkServer { * @param ourDestFile filename to write our binary destination to */ public StreamSinkServer(String sinkDir, String ourDestFile) { - this(sinkDir, ourDestFile, null, -1); + this(sinkDir, ourDestFile, null, -1, 3); } - public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort) { + public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { _sinkDir = sinkDir; _destFile = ourDestFile; _i2cpHost = i2cpHost; _i2cpPort = i2cpPort; + _handlers = handlers; _log = I2PAppContext.getGlobalContext().logManager().getLog(StreamSinkServer.class); } @@ -56,7 +60,8 @@ public class StreamSinkServer { else mgr = I2PSocketManagerFactory.createManager(); Destination dest = mgr.getSession().getMyDestination(); - System.out.println("Listening for connections on: " + dest.calculateHash().toBase64()); + if (_log.shouldLog(Log.INFO)) + _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); FileOutputStream fos = null; try { fos = new FileOutputStream(_destFile); @@ -72,24 +77,16 @@ public class StreamSinkServer { } I2PServerSocket sock = mgr.getServerSocket(); - while (true) { - try { - I2PSocket curSock = sock.accept(); - handle(curSock); - } catch (I2PException ie) { - _log.error("Error accepting connection", ie); - return; - } catch (ConnectException ce) { - _log.error("Connection already dropped", ce); - return; - } - } + startup(sock); } - private void handle(I2PSocket socket) { - I2PThread t = new I2PThread(new ClientRunner(socket)); - t.setName("Handle " + socket.getPeerDestination().calculateHash().toBase64().substring(0,4)); - t.start(); + public void startup(I2PServerSocket sock) { + for (int i = 0; i < _handlers; i++) { + I2PThread t = new I2PThread(new ClientRunner(sock)); + t.setName("Handler " + i); + t.setDaemon(false); + t.start(); + } } /** @@ -97,27 +94,44 @@ public class StreamSinkServer { * */ private class ClientRunner implements Runnable { - private I2PSocket _sock; - private FileOutputStream _fos; - public ClientRunner(I2PSocket socket) { - _sock = socket; + private I2PServerSocket _socket; + public ClientRunner(I2PServerSocket socket) { + _socket = socket; + } + public void run() { + while (true) { + try { + I2PSocket socket = _socket.accept(); + if (socket != null) + handle(socket); + } catch (I2PException ie) { + _log.error("Error accepting connection", ie); + return; + } catch (ConnectException ce) { + _log.error("Connection already dropped", ce); + return; + } + } + } + + private void handle(I2PSocket sock) { + FileOutputStream fos = null; try { File sink = new File(_sinkDir); if (!sink.exists()) sink.mkdirs(); File cur = File.createTempFile("clientSink", ".dat", sink); - _fos = new FileOutputStream(cur); - System.out.println("Writing to " + cur.getAbsolutePath()); + fos = new FileOutputStream(cur); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Writing to " + cur.getAbsolutePath()); } catch (IOException ioe) { _log.error("Error creating sink", ioe); - _fos = null; + return; } - } - public void run() { - if (_fos == null) return; + long start = System.currentTimeMillis(); try { - InputStream in = _sock.getInputStream(); + InputStream in = sock.getInputStream(); byte buf[] = new byte[4096]; long written = 0; int read = 0; @@ -125,47 +139,55 @@ public class StreamSinkServer { //_fos.write(buf, 0, read); written += read; if (_log.shouldLog(Log.DEBUG)) - _log.debug("read and wrote " + read); + _log.debug("read and wrote " + read + " (" + written + ")"); } - _fos.write(("written: [" + written + "]\n").getBytes()); + fos.write(("written: [" + written + "]\n").getBytes()); long lifetime = System.currentTimeMillis() - start; - _log.error("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); + _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); } catch (IOException ioe) { _log.error("Error writing the sink", ioe); } finally { - if (_fos != null) try { _fos.close(); } catch (IOException ioe) {} - if (_sock != null) try { _sock.close(); } catch (IOException ioe) {} - _log.error("Client socket closed"); + if (fos != null) try { fos.close(); } catch (IOException ioe) {} + if (sock != null) try { sock.close(); } catch (IOException ioe) {} + _log.debug("Client socket closed"); } } } /** - * Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile</code><br /> + * Fire up the streaming server. <code>Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [numHandlers]</code><br /> * <ul> * <li><b>sinkDir</b>: Directory to store received files in</li> * <li><b>ourDestFile</b>: filename to write our binary destination to</li> + * <li><b>numHandlers</b>: how many concurrent connections to handle</li> * </ul> */ public static void main(String args[]) { StreamSinkServer server = null; switch (args.length) { case 0: - server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654); + server = new StreamSinkServer("dataDir", "server.key", "localhost", 7654, 3); break; case 2: server = new StreamSinkServer(args[0], args[1]); break; case 4: + case 5: + int handlers = 3; + if (args.length == 5) { + try { + handlers = Integer.parseInt(args[4]); + } catch (NumberFormatException nfe) {} + } try { int port = Integer.parseInt(args[1]); - server = new StreamSinkServer(args[2], args[3], args[0], port); + server = new StreamSinkServer(args[2], args[3], args[0], port, handlers); } catch (NumberFormatException nfe) { - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile"); + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); } break; default: - System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile"); + System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); } if (server != null) server.runServer(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index cdfa6b2de91adabaa9a147db42470b249aa8365e..9ece426b803f9d70baf4176425f3fb5f20280f4c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -68,6 +68,7 @@ public class ConnectionManager { _context.statManager().createRateStat("stream.con.lifetimeRTT", "What is the final RTT when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } Connection getConnectionByInboundId(byte[] id) { @@ -109,7 +110,14 @@ public class ConnectionManager { byte receiveId[] = new byte[4]; _context.random().nextBytes(receiveId); boolean reject = false; + int active = 0; + int total = 0; synchronized (_connectionLock) { + total = _connectionByInboundId.size(); + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + if ( ((Connection)iter.next()).getIsConnected() ) + active++; + } if (locked_tooManyStreams()) { reject = true; } else { @@ -127,6 +135,8 @@ public class ConnectionManager { } } + _context.statManager().addRateData("stream.receiveActive", active, total); + if (reject) { if (_log.shouldLog(Log.WARN)) _log.warn("Refusing connection since we have exceeded our max of " @@ -227,6 +237,8 @@ public class ConnectionManager { } private boolean locked_tooManyStreams() { + if (_maxConcurrentStreams <= 0) return false; + if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; int active = 0; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); @@ -238,8 +250,6 @@ public class ConnectionManager { _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); - if (_maxConcurrentStreams <= 0) return false; - if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; return (active >= _maxConcurrentStreams); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index e4cc494d4084dd041287b8c511754833937e3098..07a5cdb798c5b7c25ac1e36bbef31ebebca25798 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -98,8 +98,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); - setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); - setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); + setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); + setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index fd5beab81408e78d88d1ecde9cff66cf1637eb7c..9117cc3dea1f11e339f6d1af832e572db52c7a84 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -256,7 +256,7 @@ public class ConnectionPacketHandler { newWindowSize += 1; } else { // slow start, but modified to take into account the fact - // that windows in the streaming lib are messages, not bytes, + // that windows in the streaming lib are messages, not bytes, // so we only grow 1 every N times (where N = the slow start factor) int shouldIncrement = _context.random().nextInt(con.getOptions().getSlowStartGrowthRateFactor()); if (shouldIncrement <= 0) diff --git a/build.xml b/build.xml index 8c51c770735807d5451ba09e0fb66b83a7ae5e96..c59305d2f2d01311e3eaf5f5333a6da458caf6f4 100644 --- a/build.xml +++ b/build.xml @@ -76,9 +76,9 @@ windowtitle="I2P"> <sourcepath> <pathelement location="core/java/src" /> - <pathelement location="core/java/test" /> + <!--<pathelement location="core/java/test" />--> <pathelement location="router/java/src" /> - <pathelement location="router/java/test" /> + <!--<pathelement location="router/java/test" />--> <pathelement location="apps/ministreaming/java/src" /> <pathelement location="apps/streaming/java/src" /> <pathelement location="apps/i2ptunnel/java/src" /> @@ -246,7 +246,15 @@ <tarfileset dir="pkg-temp" includes="**/*" prefix="i2p" /> </tar> </target> - <target name="updater" depends="distclean, build"> + <target name="updater" depends="prepupdate"> + <zip destfile="i2pupdate.zip" basedir="pkg-temp" /> + </target> + <target name="updateTest" depends="prepupdate"> + <ant dir="core/java/" target="jarTest" /> + <copy file="core/java/build/i2ptest.jar" todir="pkg-temp/lib" /> + <zip destfile="i2pupdate.zip" basedir="pkg-temp" /> + </target> + <target name="prepupdate" depends="distclean, build"> <delete dir="pkg-temp" /> <copy file="build/i2p.jar" todir="pkg-temp/lib/" /> <copy file="build/i2ptunnel.jar" todir="pkg-temp/lib/" /> @@ -286,7 +294,6 @@ <mkdir dir="pkg-temp/eepsite" /> <mkdir dir="pkg-temp/eepsite/webapps" /> <mkdir dir="pkg-temp/eepsite/cgi-bin" /> - <zip destfile="i2pupdate.zip" basedir="pkg-temp" /> </target> <taskdef name="izpack" classpath="${basedir}/installer/lib/izpack/standalone-compiler.jar" classname="com.izforge.izpack.ant.IzPackTask" /> <target name="installer" depends="preppkg"> diff --git a/core/java/build.xml b/core/java/build.xml index 85b82b8d4579361e41c30a10f93387be2cf0c2f1..84a433dbf7072261a99e6909d53bc861302467a7 100644 --- a/core/java/build.xml +++ b/core/java/build.xml @@ -18,6 +18,9 @@ <target name="jar" depends="compile"> <jar destfile="./build/i2p.jar" basedir="./build/obj" includes="**/*.class" /> </target> + <target name="jarTest" depends="compileTest"> + <jar destfile="./build/i2ptest.jar" basedir="./build/obj" includes="**/*.class" /> + </target> <target name="javadoc"> <mkdir dir="./build" /> <mkdir dir="./build/javadoc" /> diff --git a/core/java/src/net/i2p/crypto/DummyPooledRandomSource.java b/core/java/src/net/i2p/crypto/DummyPooledRandomSource.java index 6438ff58fdb3f79b3000c2305a6505fdbf778c94..6b47520c42618a06a11fbcf178d41cbd3abd1789 100644 --- a/core/java/src/net/i2p/crypto/DummyPooledRandomSource.java +++ b/core/java/src/net/i2p/crypto/DummyPooledRandomSource.java @@ -4,6 +4,7 @@ import java.util.Random; import net.i2p.I2PAppContext; import net.i2p.util.PooledRandomSource; import net.i2p.util.RandomSource; +import net.i2p.util.Log; /** * @@ -11,6 +12,9 @@ import net.i2p.util.RandomSource; public class DummyPooledRandomSource extends PooledRandomSource { public DummyPooledRandomSource(I2PAppContext context) { super(context); + } + + protected void initializePool(I2PAppContext context) { _pool = new RandomSource[POOL_SIZE]; for (int i = 0; i < POOL_SIZE; i++) { _pool[i] = new DummyRandomSource(context); diff --git a/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java b/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java index b385e87af927679aaaf6e5a78e0c76032114e5ad..811e6e4121377624bfca3dc75b6143233d2fc6cb 100644 --- a/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/PersistentSessionKeyManager.java @@ -58,6 +58,8 @@ public class PersistentSessionKeyManager extends TransientSessionKeyManager { * */ public void saveState(OutputStream out) throws IOException, DataFormatException { + if (true) return; + Set tagSets = getInboundTagSets(); Set sessions = getOutboundSessions(); if (_log.shouldLog(Log.INFO)) diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 6aeedef93c10ce8a5a0e531178ea1a8f6891f4e1..9219bf519db9f6439d8809a71810df923047ef28 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -10,6 +10,7 @@ package net.i2p.crypto; */ import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; @@ -262,11 +263,23 @@ class TransientSessionKeyManager extends SessionKeyManager { if (old != null) { TagSet oldTS = (TagSet)old; if (!oldTS.getAssociatedKey().equals(tagSet.getAssociatedKey())) { - if (_log.shouldLog(Log.ERROR)) { - _log.error("Multiple tags matching! tag: " + tag.toString() + " matches for new tagSet: " + tagSet + " and old tagSet: " + old); - _log.error("Earlier tag set creation: " + old + ": key=" + oldTS.getAssociatedKey().toBase64(), oldTS.getCreatedBy()); - _log.error("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy()); + if (_log.shouldLog(Log.WARN)) { + _log.warn("Multiple tags matching! tag: " + tag.toString() + " matches for new tagSet: " + tagSet + " and old tagSet: " + old); + _log.warn("Earlier tag set creation: " + old + ": key=" + oldTS.getAssociatedKey().toBase64(), oldTS.getCreatedBy()); + _log.warn("Current tag set creation: " + tagSet + ": key=" + tagSet.getAssociatedKey().toBase64(), tagSet.getCreatedBy()); } + // drop both, rather than sit around confused + _inboundTagSets.remove(tag); + + for (Iterator tsIter = oldTS.dropTags().iterator(); iter.hasNext(); ) { + SessionTag curTag = (SessionTag)tsIter.next(); + _inboundTagSets.remove(curTag); + } + for (Iterator tsIter = tagSet.dropTags().iterator(); iter.hasNext(); ) { + SessionTag curTag = (SessionTag)tsIter.next(); + _inboundTagSets.remove(curTag); + } + } else { if (_log.shouldLog(Log.DEBUG)) { //tagSet.getTags().addAll(oldTS.getTags()); @@ -307,7 +320,7 @@ class TransientSessionKeyManager extends SessionKeyManager { synchronized (_inboundTagSets) { for (Iterator iter = _inboundTagSets.values().iterator(); iter.hasNext(); ) { TagSet set = (TagSet)iter.next(); - int size = set.getTags().size(); + int size = set.getTagCount(); if (size > 1000) absurd++; if (size > 100) @@ -322,7 +335,7 @@ class TransientSessionKeyManager extends SessionKeyManager { } for (int i = 0; i < removed.size(); i++) { TagSet cur = (TagSet)removed.get(i); - for (Iterator iter = cur.getTags().iterator(); iter.hasNext(); ) { + for (Iterator iter = cur.dropTags().iterator(); iter.hasNext(); ) { SessionTag tag = (SessionTag)iter.next(); _inboundTagSets.remove(tag); tags++; @@ -465,7 +478,7 @@ class TransientSessionKeyManager extends SessionKeyManager { for (Iterator siter = sets.iterator(); siter.hasNext();) { TagSet ts = (TagSet) siter.next(); buf.append("<li><b>Received on:</b> ").append(new Date(ts.getDate())).append(" with ") - .append(ts.getTags().size()).append(" tags remaining</li>"); + .append(ts.getTagCount()).append(" tags remaining</li>"); } buf.append("</ul></td></tr>"); } @@ -485,9 +498,7 @@ class TransientSessionKeyManager extends SessionKeyManager { buf.append("<tr><td><ul>"); for (Iterator siter = sess.getTagSets().iterator(); siter.hasNext();) { TagSet ts = (TagSet) siter.next(); - buf.append("<li><b>Sent on:</b> ").append(new Date(ts.getDate())).append(" with ").append( - ts.getTags() - .size()) + buf.append("<li><b>Sent on:</b> ").append(new Date(ts.getDate())).append(" with ").append(ts.getTagCount()) .append(" tags remaining</li>"); } buf.append("</ul></td></tr>"); @@ -540,7 +551,7 @@ class TransientSessionKeyManager extends SessionKeyManager { _tagSets = new ArrayList(); for (int i = 0; i < sets.size(); i++) { TagSet set = (TagSet) sets.get(i); - dropped += set.getTags().size(); + dropped += set.getTagCount(); } if (_log.shouldLog(Log.INFO)) _log.info("Rekeyed from " + _currentKey + " to " + key @@ -604,7 +615,7 @@ class TransientSessionKeyManager extends SessionKeyManager { for (int i = 0; i < _tagSets.size(); i++) { TagSet set = (TagSet) _tagSets.get(i); if (set.getDate() + SESSION_TAG_DURATION_MS > now) - tags += set.getTags().size(); + tags += set.getTagCount(); } } return tags; @@ -620,7 +631,7 @@ class TransientSessionKeyManager extends SessionKeyManager { synchronized (_tagSets) { for (Iterator iter = _tagSets.iterator(); iter.hasNext();) { TagSet set = (TagSet) iter.next(); - if ( (set.getDate() > last) && (set.getTags().size() > 0) ) + if ( (set.getDate() > last) && (set.getTagCount() > 0) ) last = set.getDate(); } } @@ -665,10 +676,31 @@ class TransientSessionKeyManager extends SessionKeyManager { _date = when; } - /** tags still available */ - public Set getTags() { + /** + * raw tags still available - you MUST synchronize against the TagSet instance + * if you need to use this set + */ + Set getTags() { return _sessionTags; } + + public int getTagCount() { + synchronized (TagSet.this) { + if (_sessionTags == null) + return 0; + else + return _sessionTags.size(); + } + } + + public Set dropTags() { + Set rv = null; + synchronized (TagSet.this) { + rv = _sessionTags; + _sessionTags = Collections.EMPTY_SET; + } + return rv; + } public SessionKey getAssociatedKey() { return _key; diff --git a/core/java/src/net/i2p/util/BufferedRandomSource.java b/core/java/src/net/i2p/util/BufferedRandomSource.java index f78584057885f9d197351914a6d0fb5c50321301..8103a56e067595ea3cded86a8be16d5dfcf68d0d 100644 --- a/core/java/src/net/i2p/util/BufferedRandomSource.java +++ b/core/java/src/net/i2p/util/BufferedRandomSource.java @@ -28,12 +28,15 @@ public class BufferedRandomSource extends RandomSource { private int _nextBit; private static volatile long _reseeds; - private static final int BUFFER_SIZE = 256*1024; + private static final int DEFAULT_BUFFER_SIZE = 256*1024; public BufferedRandomSource(I2PAppContext context) { + this(context, DEFAULT_BUFFER_SIZE); + } + public BufferedRandomSource(I2PAppContext context, int bufferSize) { super(context); context.statManager().createRateStat("prng.reseedCount", "How many times the prng has been reseeded", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } ); - _buffer = new byte[BUFFER_SIZE]; + _buffer = new byte[bufferSize]; refillBuffer(); // stagger reseeding _nextByte = ((int)_reseeds-1) * 16 * 1024; @@ -73,7 +76,7 @@ public class BufferedRandomSource extends RandomSource { _nextBit = 0; _nextByte++; } - if (_nextByte >= BUFFER_SIZE) + if (_nextByte >= _buffer.length) refillBuffer(); rv += (_buffer[_nextByte] << curBit); _nextBit++; @@ -98,7 +101,7 @@ public class BufferedRandomSource extends RandomSource { _nextBit = 0; _nextByte++; } - if (_nextByte >= BUFFER_SIZE) + if (_nextByte >= _buffer.length) refillBuffer(); int gobbleBits = 8 - _nextBit; int want = numBits - curBit; @@ -117,10 +120,10 @@ public class BufferedRandomSource extends RandomSource { public synchronized final void nextBytes(byte buf[]) { int outOffset = 0; while (outOffset < buf.length) { - int availableBytes = BUFFER_SIZE - _nextByte - (_nextBit != 0 ? 1 : 0); + int availableBytes = _buffer.length - _nextByte - (_nextBit != 0 ? 1 : 0); if (availableBytes <= 0) refillBuffer(); - int start = BUFFER_SIZE - availableBytes; + int start = _buffer.length - availableBytes; int writeSize = Math.min(buf.length - outOffset, availableBytes); System.arraycopy(_buffer, start, buf, outOffset, writeSize); outOffset += writeSize; @@ -195,6 +198,10 @@ public class BufferedRandomSource extends RandomSource { } public static void main(String args[]) { + for (int i = 0; i < 16; i++) + test(); + } + private static void test() { I2PAppContext ctx = I2PAppContext.getGlobalContext(); byte data[] = new byte[16*1024]; for (int i = 0; i < data.length; i += 4) { @@ -203,8 +210,7 @@ public class BufferedRandomSource extends RandomSource { DataHelper.toLong(data, i, 4, l); } byte compressed[] = DataHelper.compress(data); - System.out.println("Compressed: " + compressed.length); - System.out.println("Orig: " + data.length + ": " + toString(data)); + System.out.println("Data: " + data.length + "/" + compressed.length + ": " + toString(data)); } private static final String toString(byte data[]) { StringBuffer buf = new StringBuffer(data.length * 9); diff --git a/core/java/src/net/i2p/util/PooledRandomSource.java b/core/java/src/net/i2p/util/PooledRandomSource.java index d7dc0176d4820bc6d5f86f95cd9134cccae76710..eaf8ea95ea12a3c6608c0ce3dc2e768aea7924d7 100644 --- a/core/java/src/net/i2p/util/PooledRandomSource.java +++ b/core/java/src/net/i2p/util/PooledRandomSource.java @@ -21,14 +21,40 @@ public class PooledRandomSource extends RandomSource { protected volatile int _nextPool; public static final int POOL_SIZE = 16; + /** + * How much random data will we precalculate and feed from (as opposed to on demand + * reseeding, etc). If this is not set, a default will be used (4MB), or if it is + * set to 0, no buffer will be used, otherwise the amount specified will be allocated + * across the pooled PRNGs. + * + */ + public static final String PROP_BUFFER_SIZE = "i2p.prng.totalBufferSizeKB"; public PooledRandomSource(I2PAppContext context) { super(context); _log = context.logManager().getLog(PooledRandomSource.class); + initializePool(context); + } + + protected void initializePool(I2PAppContext context) { _pool = new RandomSource[POOL_SIZE]; + + String totalSizeProp = context.getProperty(PROP_BUFFER_SIZE); + int totalSize = -1; + if (totalSizeProp != null) { + try { + totalSize = Integer.parseInt(totalSizeProp); + } catch (NumberFormatException nfe) { + totalSize = -1; + } + } for (int i = 0; i < POOL_SIZE; i++) { - //_pool[i] = new RandomSource(context); - _pool[i] = new BufferedRandomSource(context); + if (totalSize < 0) + _pool[i] = new BufferedRandomSource(context); + else if (totalSize > 0) + _pool[i] = new BufferedRandomSource(context, (totalSize*1024) / POOL_SIZE); + else + _pool[i] = new RandomSource(context); _pool[i].nextBoolean(); } _nextPool = 0; diff --git a/history.txt b/history.txt index d6bf86e0c70662e2d54fef37aa15efa3686f032f..daa055c895f75075ec939c0ac26dae09d30a87dc 100644 --- a/history.txt +++ b/history.txt @@ -1,6 +1,20 @@ -$Id: history.txt,v 1.207 2005/07/04 15:44:17 jrandom Exp $ - -2005-07-05 +$Id: history.txt,v 1.208 2005/07/05 17:08:56 jrandom Exp $ + +2005-07-11 jrandom + * Reduced the growth factor on the slow start and congestion avoidance for + the streaming lib. + * Adjusted some of the I2PTunnelServer threading to use a small pool of + handlers, rather than launching off new threads which then immediately + launch off an I2PTunnelRunner instance (which launches 3 more threads..) + * Don't persist session keys / session tags (not worth it, for now) + * Added some detection and handling code for duplicate session tags being + delivered (root cause still not addressed) + * Make the PRNG's buffer size configurable (via the config property + "i2p.prng.totalBufferSizeKB=4096") + * Disable SSU flooding by default (duh) + * Updates to the StreamSink apps for better throttling tests. + +2005-07-05 jrandom * Use a buffered PRNG, pulling the PRNG data off a larger precalculated buffer, rather than the underlying PRNG's (likely small) one, which in turn reduces the frequency of recalcing. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 21dea88cb61050efb9b34623d256795c0c009d4a..dea8d7a2b726804ea86aa325bcde86dad1a0006c 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.198 $ $Date: 2005/07/04 15:44:21 $"; + public final static String ID = "$Revision: 1.199 $ $Date: 2005/07/05 17:08:59 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 10; + public final static long BUILD = 11; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/SessionKeyPersistenceHelper.java b/router/java/src/net/i2p/router/SessionKeyPersistenceHelper.java index 93a35aba5f9288a458c42734f85fd6f44a9837c0..57db42bb425fcbebb2656b9ea90132a9a9f9a155 100644 --- a/router/java/src/net/i2p/router/SessionKeyPersistenceHelper.java +++ b/router/java/src/net/i2p/router/SessionKeyPersistenceHelper.java @@ -70,6 +70,8 @@ public class SessionKeyPersistenceHelper implements Service { } private void writeState() { + if (true) return; + Object o = _context.sessionKeyManager(); if (!(o instanceof PersistentSessionKeyManager)) { _log.error("Unable to persist the session key state - manager is " + o.getClass().getName()); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 40c96f2accb000fc12cd14350f388392012486d0..cb1a17f7aa39b77bfac8cd993ea4577c1265c080 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -135,7 +135,7 @@ public class ClientConnectionRunner { _alreadyProcessed.clear(); } _config = null; - _manager = null; + //_manager = null; } /** current client's config */ @@ -271,11 +271,13 @@ public class ClientConnectionRunner { + "]"); long beforeDistribute = _context.clock().now(); // the following blocks as described above - _manager.distributeMessage(_config.getDestination(), message.getDestination(), message.getPayload(), id); + SessionConfig cfg = _config; + if (cfg != null) + _manager.distributeMessage(cfg.getDestination(), dest, payload, id); long timeToDistribute = _context.clock().now() - beforeDistribute; if (_log.shouldLog(Log.DEBUG)) _log.warn("Time to distribute in the manager to " - + message.getDestination().calculateHash().toBase64() + ": " + + dest.calculateHash().toBase64() + ": " + timeToDistribute); return id; } diff --git a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java index 27cf5dafea23763e9c67ab535b489e7b9246f63f..0e3182b9f9fd9cbd568b969844449abdd3170218 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageReceiver.java @@ -103,11 +103,11 @@ public class GarlicMessageReceiver { clove.getExpiration().getTime()); if (invalidReason != null) { String howLongAgo = DataHelper.formatDuration(_context.clock().now()-clove.getExpiration().getTime()); - if (_log.shouldLog(Log.ERROR)) - _log.error("Clove is NOT valid: id=" + clove.getCloveId() - + " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove); if (_log.shouldLog(Log.WARN)) _log.warn("Clove is NOT valid: id=" + clove.getCloveId() + + " expiration " + howLongAgo + " ago: " + invalidReason + ": " + clove); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Clove is NOT valid: id=" + clove.getCloveId() + " expiration " + howLongAgo + " ago", new Exception("Invalid within...")); _context.messageHistory().messageProcessingError(clove.getCloveId(), clove.getData().getClass().getName(), diff --git a/router/java/src/net/i2p/router/transport/TransportManager.java b/router/java/src/net/i2p/router/transport/TransportManager.java index d8b3ea3ce6a7924b23ce3f2221631883070a902e..127625b0cb79dd9497fdcaeca283a0718146dedf 100644 --- a/router/java/src/net/i2p/router/transport/TransportManager.java +++ b/router/java/src/net/i2p/router/transport/TransportManager.java @@ -140,7 +140,8 @@ public class TransportManager implements TransportEventListener { for (int i = 0; i < _transports.size(); i++) { Transport t = (Transport)_transports.get(i); if (failedTransports.contains(t.getStyle())) { - _log.debug("Skipping transport " + t.getStyle() + " as it already failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Skipping transport " + t.getStyle() + " as it already failed"); continue; } // we always want to try all transports, in case there is a faster bidirectional one diff --git a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java index 240d372fee4572df0e57a903dd1e8fbf11acbc0f..1f1ffe2342529fa74f055fd6ff15a0beca69b827 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java @@ -114,9 +114,9 @@ class UDPFlooder implements Runnable { private long calcFloodDelay() { try { - return Long.parseLong(_context.getProperty("udp.floodDelay", "30000")); + return Long.parseLong(_context.getProperty("udp.floodDelay", "300000")); } catch (Exception e) { - return 30*1000; + return 5*60*1000; } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index cb5c0c8ed96ef8275b3b23141a1040617a45d0b0..a273bd051a714887ea8e9fabcad928083f29fa71 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -100,7 +100,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; /** should we flood all UDP peers with the configured rate? */ - private static final boolean SHOULD_FLOOD_PEERS = true; + private static final boolean SHOULD_FLOOD_PEERS = false; private static final int MAX_CONSECUTIVE_FAILED = 5;