diff --git a/apps/i2ptunnel/jsp/editClient.jsp b/apps/i2ptunnel/jsp/editClient.jsp index 9f22cbcfa39c6bab3e5aba4788ff5a3dae7ac5fe..12a3d6aa89c8f1eec37fd3ac7dd33216e453fb1a 100644 --- a/apps/i2ptunnel/jsp/editClient.jsp +++ b/apps/i2ptunnel/jsp/editClient.jsp @@ -111,7 +111,7 @@ if (curTunnel >= 0) { </select> <b>others:</b> -<input type="text" name="reachablyByOther" size="20" value="<%=clientInterface%>" /> +<input type="text" name="reachableByOther" size="20" value="<%=clientInterface%>" /> <% } %> </td> diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java index cc73355edc5ed173c2d72ab1078e0a40e5371fd8..f8a3c73516281b058f9d8d1cdee5ae9e27b59f17 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/TestSwarm.java @@ -149,7 +149,8 @@ public class TestSwarm { public void run() { _started = _context.clock().now(); _context.statManager().addRateData("swarm." + _connectionId + ".started", 1, 0); - byte data[] = new byte[32*1024]; + byte data[] = new byte[4*1024]; + _context.random().nextBytes(data); long value = 0; long lastSend = _context.clock().now(); if (_socket == null) { @@ -167,15 +168,19 @@ public class TestSwarm { try { OutputStream out = _socket.getOutputStream(); while (!_closed) { - out.write(data); - // out.flush(); - _totalSent += data.length; - _context.statManager().addRateData("swarm." + _connectionId + ".totalSent", _totalSent, 0); - //try { Thread.sleep(100); } catch (InterruptedException ie) {} - long now = _context.clock().now(); - _log.debug("Sending " + _connectionId + " after " + (now-lastSend)); - lastSend = now; - try { Thread.sleep(20); } catch (InterruptedException ie) {} + if (shouldSend()) { + out.write(data); + // out.flush(); + _totalSent += data.length; + _context.statManager().addRateData("swarm." + _connectionId + ".totalSent", _totalSent, 0); + //try { Thread.sleep(100); } catch (InterruptedException ie) {} + long now = _context.clock().now(); + //_log.debug("Sending " + _connectionId + " after " + (now-lastSend)); + lastSend = now; + //try { Thread.sleep(20); } catch (InterruptedException ie) {} + } else { + try { Thread.sleep(5000); } catch (InterruptedException ie) {} + } } } catch (Exception e) { _log.error("Error sending", e); @@ -188,13 +193,13 @@ public class TestSwarm { long now = lastRead; try { InputStream in = _socket.getInputStream(); - byte buf[] = new byte[32*1024]; + byte buf[] = new byte[8*1024]; int read = 0; while ( (read = in.read(buf)) != -1) { now = System.currentTimeMillis(); _totalReceived += read; _context.statManager().addRateData("swarm." + getConnectionId() + ".totalReceived", _totalReceived, 0); - _log.debug("Receiving " + _connectionId + " with " + read + " after " + (now-lastRead)); + //_log.debug("Receiving " + _connectionId + " with " + read + " after " + (now-lastRead)); lastRead = now; } } catch (Exception e) { @@ -203,4 +208,8 @@ public class TestSwarm { } } } + + private boolean shouldSend() { + return Boolean.valueOf(_context.getProperty("shouldSend", "false")).booleanValue(); + } } \ No newline at end of file diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 988723730a3fcf4114e391ef1e8592eb3ce9a2cc..e4cc494d4084dd041287b8c511754833937e3098 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -22,6 +22,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { private int _inactivityAction; private int _inboundBufferSize; private int _maxWindowSize; + private int _congestionAvoidanceGrowthRateFactor; + private int _slowStartGrowthRateFactor; public static final int PROFILE_BULK = 1; public static final int PROFILE_INTERACTIVE = 2; @@ -45,6 +47,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public static final String PROP_INACTIVITY_TIMEOUT = "i2p.streaming.inactivityTimeout"; public static final String PROP_INACTIVITY_ACTION = "i2p.streaming.inactivityAction"; public static final String PROP_MAX_WINDOW_SIZE = "i2p.streaming.maxWindowSize"; + public static final String PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = "i2p.streaming.congestionAvoidanceGrowthRateFactor"; + public static final String PROP_SLOW_START_GROWTH_RATE_FACTOR = "i2p.streaming.slowStartGrowthRateFactor"; public ConnectionOptions() { super(); @@ -74,6 +78,8 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setInactivityAction(opts.getInactivityAction()); setInboundBufferSize(opts.getInboundBufferSize()); setMaxWindowSize(opts.getMaxWindowSize()); + setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); + setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); } } @@ -85,13 +91,15 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setRTT(getInt(opts, PROP_INITIAL_RTT, 10*1000)); setReceiveWindow(getInt(opts, PROP_INITIAL_RECEIVE_WINDOW, 1)); setResendDelay(getInt(opts, PROP_INITIAL_RESEND_DELAY, 1000)); - setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 1000)); + setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, 500)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, 1)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, 5)); setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); - setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); + setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); + setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); + setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); @@ -124,7 +132,11 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 5*60*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_DISCONNECT)); - setInboundBufferSize((getMaxMessageSize() + 2) * Connection.MAX_WINDOW_SIZE); + setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); + if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR)) + setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); + if (opts.contains(PROP_SLOW_START_GROWTH_RATE_FACTOR)) + setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); @@ -257,6 +269,24 @@ public class ConnectionOptions extends I2PSocketOptionsImpl { public int getInboundBufferSize() { return _inboundBufferSize; } public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; } + /** + * When we're in congestion avoidance, we grow the window size at the rate + * of 1/(windowSize*factor). In standard TCP, window sizes are in bytes, + * while in I2P, window sizes are in messages, so setting factor=maxMessageSize + * mimics TCP, but using a smaller factor helps grow a little more rapidly. + */ + public int getCongestionAvoidanceGrowthRateFactor() { return _congestionAvoidanceGrowthRateFactor; } + public void setCongestionAvoidanceGrowthRateFactor(int factor) { _congestionAvoidanceGrowthRateFactor = factor; } + + /** + * When we're in slow start, we grow the window size at the rate + * of 1/(factor). In standard TCP, window sizes are in bytes, + * while in I2P, window sizes are in messages, so setting factor=maxMessageSize + * mimics TCP, but using a smaller factor helps grow a little more rapidly. + */ + public int getSlowStartGrowthRateFactor() { return _slowStartGrowthRateFactor; } + public void setSlowStartGrowthRateFactor(int factor) { _slowStartGrowthRateFactor = factor; } + public String toString() { StringBuffer buf = new StringBuffer(128); buf.append("conDelay=").append(_connectDelay); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 64b2fb8aaffb43c46085756ea16c762b763e1b48..95ebb291bab4247db2a485bf32a3647ac687e81a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -64,6 +64,15 @@ public class ConnectionPacketHandler { con.packetReceived(); + boolean choke = false; + if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) { + if (packet.getOptionalDelay() > 60000) { + // requested choke + choke = true; + con.getOptions().setRTT(con.getOptions().getRTT() + 10*1000); + } + } + long ready = con.getInputStream().getHighestReadyBockId(); int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); int allowedBlocks = available/con.getOptions().getMaxMessageSize(); @@ -72,9 +81,10 @@ public class ConnectionPacketHandler { _log.warn("Inbound buffer exceeded on connection " + con + " (" + ready + "/"+ (ready+allowedBlocks) + "/" + available + ": dropping " + packet); - ack(con, packet.getAckThrough(), packet.getNacks(), null, false); - con.getOptions().setChoke(5*1000); + ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); + con.getOptions().setChoke(61*1000); packet.releasePayload(); + con.ackImmediately(); return; } con.getOptions().setChoke(0); @@ -107,7 +117,7 @@ public class ConnectionPacketHandler { } else { int delay = con.getOptions().getSendAckDelay(); if (packet.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) // delayed ACK requested - delay += packet.getOptionalDelay(); + delay = packet.getOptionalDelay(); con.setNextSendTime(delay + _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); @@ -142,7 +152,7 @@ public class ConnectionPacketHandler { // don't honor the ACK 0 in SYN packets received when the other side // has obviously not seen our messages } else { - fastAck = fastAck || ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew); + fastAck = ack(con, packet.getAckThrough(), packet.getNacks(), packet, isNew, choke); } con.eventOccurred(); if (fastAck) { @@ -159,7 +169,10 @@ public class ConnectionPacketHandler { } } - private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew) { + private boolean ack(Connection con, long ackThrough, long nacks[], Packet packet, boolean isNew, boolean choke) { + if ( (nacks != null) && (nacks.length > 0) ) + con.getOptions().setRTT(con.getOptions().getRTT() + nacks.length*1000); + int numResends = 0; List acked = con.ackPackets(ackThrough, nacks); if ( (acked != null) && (acked.size() > 0) ) { @@ -196,16 +209,16 @@ public class ConnectionPacketHandler { } if (packet != null) - return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0)); + return adjustWindow(con, isNew, packet.getSequenceNum(), numResends, (acked != null ? acked.size() : 0), choke); else - return adjustWindow(con, false, -1, numResends, (acked != null ? acked.size() : 0)); + return adjustWindow(con, false, -1, numResends, (acked != null ? acked.size() : 0), choke); } - private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked) { + private boolean adjustWindow(Connection con, boolean isNew, long sequenceNum, int numResends, int acked, boolean choke) { boolean congested = false; if ( (!isNew) && (sequenceNum > 0) ) { - // dup real packet + // dup real packet, or they told us to back off int oldSize = con.getOptions().getWindowSize(); con.congestionOccurred(); oldSize >>>= 1; @@ -235,12 +248,16 @@ public class ConnectionPacketHandler { // we can't use newWindowSize += 1/newWindowSize, since we're // integers, so lets use a random distribution instead - int shouldIncrement = _context.random().nextInt(newWindowSize); + int shouldIncrement = _context.random().nextInt(con.getOptions().getCongestionAvoidanceGrowthRateFactor()*newWindowSize); if (shouldIncrement <= 0) newWindowSize += 1; } else { - // slow start - newWindowSize += 1; + // slow start, but modified to take into account the fact + // that windows in the streaming lib are messages, not bytes, + // so we only grow 1 every N times (where N = the slow start factor) + int shouldIncrement = _context.random().nextInt(con.getOptions().getSlowStartGrowthRateFactor()); + if (shouldIncrement <= 0) + newWindowSize += 1; } } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 4bd107d31ae9be9cd9005c4e1381df71933ed4be..c4fc19f226360ec17714fcfabf55d0b66c9f94e8 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -135,6 +135,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } catch (IOException ioe) { throw new I2PSessionException("Error reading the destination key stream", ioe); } + if (options == null) + options = System.getProperties(); loadConfig(options); _sessionId = null; _leaseSet = null; diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 116dbab8848a99e17a5d5bc8da13b238a23768d7..b654260f5ea8f679edee67a2530c545c2883c805 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -50,6 +50,14 @@ class I2PSessionImpl2 extends I2PSessionImpl { super(ctx, destKeyStream, options); _log = ctx.logManager().getLog(I2PSessionImpl2.class); _sendingStates = new HashSet(32); + + ctx.statManager().createRateStat("i2cp.sendBestEffortTotalTime", "how long to do the full sendBestEffort call?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortStage0", "first part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortStage1", "second part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortStage2", "third part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortStage3", "fourth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); + //ctx.statManager().createRateStat("i2cp.sendBestEffortStage4", "fifth part of sendBestEffort?", "i2cp", new long[] { 10*60*1000 } ); + } protected long getTimeout() { @@ -158,7 +166,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { long nonce = _context.random().nextInt(Integer.MAX_VALUE); if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state"); - MessageState state = new MessageState(nonce, getPrefix()); + MessageState state = new MessageState(_context, nonce, getPrefix()); state.setKey(key); state.setTags(sentTags); state.setNewKey(newKey); @@ -196,7 +204,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { // saying that the router received it - in theory, that should come back // immediately, but in practice can take up to a second (though usually // much quicker). setting this to false will short-circuit that delay - boolean actuallyWait = true; + boolean actuallyWait = false; // true; long beforeWaitFor = _context.clock().now(); if (actuallyWait) @@ -226,6 +234,13 @@ class I2PSessionImpl2 extends I2PSessionImpl { + (afterRemovingSync-beforeWaitFor) + "ms waiting for reply"); } + _context.statManager().addRateData("i2cp.sendBestEffortTotalTime", afterRemovingSync - begin, 0); + //_context.statManager().addRateData("i2cp.sendBestEffortStage0", beforeSendingSync- begin, 0); + //_context.statManager().addRateData("i2cp.sendBestEffortStage1", afterSendingSync- beforeSendingSync, 0); + //_context.statManager().addRateData("i2cp.sendBestEffortStage2", beforeWaitFor- afterSendingSync, 0); + //_context.statManager().addRateData("i2cp.sendBestEffortStage3", afterWaitFor- beforeWaitFor, 0); + //_context.statManager().addRateData("i2cp.sendBestEffortStage4", afterRemovingSync- afterWaitFor, 0); + if (found) { if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Message sent after " + state.getElapsed() + "ms with " @@ -260,7 +275,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { newKey = _context.keyGenerator().generateSessionKey(); long nonce = _context.random().nextInt(Integer.MAX_VALUE); - MessageState state = new MessageState(nonce, getPrefix()); + MessageState state = new MessageState(_context, nonce, getPrefix()); state.setKey(key); state.setTags(sentTags); state.setNewKey(newKey); @@ -418,6 +433,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { _log.info(getPrefix() + "No matching state for messageId " + msgId + " / " + nonce + " w/ status = " + status); } + _context.statManager().addRateData("i2cp.receiveStatusTime", _context.clock().now() - beforeSync, 0); } /** diff --git a/core/java/src/net/i2p/client/MessageState.java b/core/java/src/net/i2p/client/MessageState.java index ee153bd739f3e672d55126d8b3c2fe4e4b49f384..4ae18c74a42753f0987bd6bab71c1a8107a52e5d 100644 --- a/core/java/src/net/i2p/client/MessageState.java +++ b/core/java/src/net/i2p/client/MessageState.java @@ -4,6 +4,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import net.i2p.I2PAppContext; import net.i2p.data.Destination; import net.i2p.data.SessionKey; import net.i2p.data.i2cp.MessageId; @@ -16,6 +17,7 @@ import net.i2p.util.Log; * */ class MessageState { + private I2PAppContext _context; private final static Log _log = new Log(MessageState.class); private long _nonce; private String _prefix; @@ -31,8 +33,9 @@ class MessageState { private static long __stateId = 0; private long _stateId; - public MessageState(long nonce, String prefix) { + public MessageState(I2PAppContext ctx, long nonce, String prefix) { _stateId = ++__stateId; + _context = ctx; _nonce = nonce; _prefix = prefix + "[" + _stateId + "]: "; _id = null; @@ -42,7 +45,8 @@ class MessageState { _newKey = null; _tags = null; _to = null; - _created = Clock.getInstance().now(); + _created = ctx.clock().now(); + //ctx.statManager().createRateStat("i2cp.checkStatusTime", "how long it takes to go through the states", "i2cp", new long[] { 60*1000 }); } public void receive(int status) { @@ -99,32 +103,41 @@ class MessageState { } public long getElapsed() { - return Clock.getInstance().now() - _created; + return _context.clock().now() - _created; } public void waitFor(int status, long expiration) { - while (true) { + long checkTime = -1; + boolean found = false; + while (!found) { if (_cancelled) return; - long timeToWait = expiration - Clock.getInstance().now(); + long timeToWait = expiration - _context.clock().now(); if (timeToWait <= 0) { if (_log.shouldLog(Log.WARN)) _log.warn(_prefix + "Expired waiting for the status [" + status + "]"); return; } + found = false; synchronized (_receivedStatus) { + long beforeCheck = _context.clock().now(); if (locked_isSuccess(status) || locked_isFailure(status)) { if (_log.shouldLog(Log.DEBUG)) _log.debug(_prefix + "Received a confirm (one way or the other)"); - return; + found = true; } - if (timeToWait > 5000) { - timeToWait = 5000; - } - try { - _receivedStatus.wait(timeToWait); - } catch (InterruptedException ie) { // nop + checkTime = _context.clock().now() - beforeCheck; + if (!found) { + if (timeToWait > 5000) { + timeToWait = 5000; + } + try { + _receivedStatus.wait(timeToWait); + } catch (InterruptedException ie) { // nop + } } } + //if (found) + // _context.statManager().addRateData("i2cp.checkStatusTime", checkTime, 0); } } diff --git a/core/java/src/net/i2p/util/DecayingBloomFilter.java b/core/java/src/net/i2p/util/DecayingBloomFilter.java index 4727de9262e0f27acb8ee18e5e1a15f90bef2252..c02b39f3a1d8b3d0c81021cbaa92e75ecf643ffa 100644 --- a/core/java/src/net/i2p/util/DecayingBloomFilter.java +++ b/core/java/src/net/i2p/util/DecayingBloomFilter.java @@ -61,6 +61,18 @@ public class DecayingBloomFilter { SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs); } + public long getCurrentDuplicateCount() { return _currentDuplicates; } + public int getInsertedCount() { + synchronized (this) { + return _current.size() + _previous.size(); + } + } + public double getFalsePositiveRate() { + synchronized (this) { + return _current.falsePositives(); + } + } + /** * return true if the entry added is a duplicate * diff --git a/history.txt b/history.txt index 34c75003e77e182b0adcc627b188f4065bb5a098..204b2f92a77348ccabf6913d443b8b800bd97481 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,21 @@ -$Id: history.txt,v 1.194 2005/04/17 18:23:21 jrandom Exp $ +$Id: history.txt,v 1.195 2005/04/17 21:07:58 jrandom Exp $ + +2005-04-20 jrandom + * In the SDK, we don't actually need to block when we're sending a message + as BestEffort (and these days, we're always sending BestEffort). + * Pass out client messages in fewer (larger) steps. + * Have the InNetMessagePool short circuit dispatch requests. + * Have the message validator take into account expiration to cut down on + false positives at high transfer rates. + * Allow configuration of the probabalistic window size growth rate in the + streaming lib's slow start and congestion avoidance phases, and default + them to a more conservative value (2), rather than the previous value + (1). + * Reduce the ack delay in the streaming lib to 500ms + * Honor choke requests in the streaming lib (only affects those getting + insanely high transfer rates) + * Let the user specify an interface besides 127.0.0.1 or 0.0.0.0 on the + I2PTunnel client page (thanks maestro^!) 2005-04-17 sirup * Added the possibility for i2ptunnel client and httpclient instances to diff --git a/router/java/src/net/i2p/router/InNetMessagePool.java b/router/java/src/net/i2p/router/InNetMessagePool.java index 40ee57d7e21662ed175c05ce3362c9749fcc8fb0..e1516860f259dfc7e12303f38ebbd0549c9eea40 100644 --- a/router/java/src/net/i2p/router/InNetMessagePool.java +++ b/router/java/src/net/i2p/router/InNetMessagePool.java @@ -13,6 +13,7 @@ import java.util.ArrayList; import java.util.List; import net.i2p.data.Hash; import net.i2p.data.RouterIdentity; +import net.i2p.data.i2np.DataMessage; import net.i2p.data.i2np.DeliveryStatusMessage; import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.DatabaseSearchReplyMessage; @@ -52,6 +53,13 @@ public class InNetMessagePool implements Service { */ public static final String PROP_DISPATCH_THREADED = "router.dispatchThreaded"; public static final boolean DEFAULT_DISPATCH_THREADED = false; + /** + * If we aren't doing threaded dispatch for tunnel messages, should we + * call the actual dispatch() method inline (on the same thread which + * called add())? If false, we queue it up in a shared short circuit + * job. + */ + private static final boolean DISPATCH_DIRECT = true; public InNetMessagePool(RouterContext context) { _context = context; @@ -101,6 +109,10 @@ public class InNetMessagePool implements Service { + " expiring on " + exp + " of type " + messageBody.getClass().getName()); + //if (messageBody instanceof DataMessage) { + // _context.statManager().getStatLog().addData(fromRouterHash.toBase64().substring(0,6), "udp.floodDataReceived", 1, 0); + // return 0; + //} if (messageBody instanceof TunnelDataMessage) { // do not validate the message with the validator - the IV validator is sufficient } else { @@ -228,7 +240,7 @@ public class InNetMessagePool implements Service { // others and/or on other threads (e.g. transport threads). lets try 'em both. private void shortCircuitTunnelGateway(I2NPMessage messageBody) { - if (false) { + if (DISPATCH_DIRECT) { doShortCircuitTunnelGateway(messageBody); } else { synchronized (_pendingGatewayMessages) { @@ -249,7 +261,7 @@ public class InNetMessagePool implements Service { } private void shortCircuitTunnelData(I2NPMessage messageBody, Hash from) { - if (false) { + if (DISPATCH_DIRECT) { doShortCircuitTunnelData(messageBody, from); } else { synchronized (_pendingDataMessages) { diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java index 8c35bff2622b85b965053adb76ab291284c1399b..140e83d44390ba46a5425e7c10d6c9b40a5aa194 100644 --- a/router/java/src/net/i2p/router/MessageValidator.java +++ b/router/java/src/net/i2p/router/MessageValidator.java @@ -60,6 +60,8 @@ public class MessageValidator { } } + private static final long TIME_MASK = 0xFFFFFC00; + /** * Note that we've received the message (which has the expiration given). * This functionality will need to be reworked for I2P 3.0 when we take into @@ -69,7 +71,16 @@ public class MessageValidator { * @return true if we HAVE already seen this message, false if not */ private boolean noteReception(long messageId, long messageExpiration) { - boolean dup = _filter.add(messageId); + long val = messageId; + // tweak the high order bits with the message expiration /seconds/ + val ^= (messageExpiration & TIME_MASK) << 16; + boolean dup = _filter.add(val); + if (dup && _log.shouldLog(Log.WARN)) { + _log.warn("Duplicate with " + _filter.getCurrentDuplicateCount() + + " other dups, " + _filter.getInsertedCount() + + " other entries, and a false positive rate of " + + _filter.getFalsePositiveRate()); + } return dup; } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 6fdf15706736f06a9d19c83668f2ae3f0c07b5f3..1d7021deb3da92c4d6105e37ccdd1faa52a4f43a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.186 $ $Date: 2005/04/17 18:23:20 $"; + public final static String ID = "$Revision: 1.187 $ $Date: 2005/04/17 21:07:58 $"; public final static String VERSION = "0.5.0.6"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 4b902f78141db58c95a1502a040c3f7b2ab63fa7..534c531d81c121102d4eb55eb084bf30d1e019e8 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -114,6 +114,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { ctx.statManager().createRateStat("client.leaseSetFoundLocally", "How often we tried to look for a leaseSet and found it locally?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.leaseSetFoundRemoteTime", "How long we tried to look fora remote leaseSet (when we succeeded)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); ctx.statManager().createRateStat("client.leaseSetFailedRemoteTime", "How long we tried to look for a remote leaseSet (when we failed)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchPrepareTime", "How long until we've queued up the dispatch job (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchTime", "How long until we've dispatched the message (since we started)?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + ctx.statManager().createRateStat("client.dispatchSendTime", "How long the actual dispatching takes?", "ClientMessages", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); long timeoutMs = OVERALL_TIMEOUT_MS_DEFAULT; _clientMessage = msg; _clientMessageId = msg.getMessageId(); @@ -355,8 +358,11 @@ public class OutboundClientMessageOneShotJob extends JobImpl { + _lease.getTunnelId() + " on " + _lease.getGateway().toBase64()); - // dispatch may take 100+ms, so toss it in its own job - getContext().jobQueue().addJob(new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now()))); + DispatchJob dispatchJob = new DispatchJob(getContext(), msg, selector, onReply, onFail, (int)(_overallExpiration-getContext().clock().now())); + if (false) // dispatch may take 100+ms, so toss it in its own job + getContext().jobQueue().addJob(dispatchJob); + else + dispatchJob.runJob(); } else { if (_log.shouldLog(Log.ERROR)) _log.error(getJobId() + ": Could not find any outbound tunnels to send the payload through... wtf?"); @@ -364,6 +370,7 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } _clientMessage = null; _clove = null; + getContext().statManager().addRateData("client.dispatchPrepareTime", getContext().clock().now() - _start, 0); } private class DispatchJob extends JobImpl { @@ -385,10 +392,13 @@ public class OutboundClientMessageOneShotJob extends JobImpl { getContext().messageRegistry().registerPending(_selector, _replyFound, _replyTimeout, _timeoutMs); if (_log.shouldLog(Log.INFO)) _log.info("Dispatching message to " + _toString + ": " + _msg); + long before = getContext().clock().now(); getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway()); + long dispatchSendTime = getContext().clock().now() - before; if (_log.shouldLog(Log.INFO)) _log.info("Dispatching message to " + _toString + " complete"); - + getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start, 0); + getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0); } } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 0c67a2e12dbc93325975457286432539c128dfaf..5e3e92a135156fddba00461a0e7f3aaf4d047507 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -66,7 +66,7 @@ public class OutboundMessageState { } } - private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { + private synchronized void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { _message = m; _peer = peer; if (_messageBuf != null) { @@ -91,8 +91,9 @@ public class OutboundMessageState { _log.debug("Raw byte array for " + _messageId + ": " + Base64.encode(_messageBuf.getData(), 0, len)); } - public void releaseResources() { - _cache.release(_messageBuf); + public synchronized void releaseResources() { + if (_messageBuf != null) + _cache.release(_messageBuf); _messageBuf = null; } @@ -136,7 +137,7 @@ public class OutboundMessageState { * fragmentSize bytes per fragment. * */ - public void fragment(int fragmentSize) { + public synchronized void fragment(int fragmentSize) { int totalSize = _messageBuf.getValid(); int numFragments = totalSize / fragmentSize; if (numFragments * fragmentSize != totalSize) @@ -161,7 +162,8 @@ public class OutboundMessageState { } /** should we continue sending this fragment? */ public boolean shouldSend(int fragmentNum) { return _fragmentSends[fragmentNum] >= (short)0; } - public int fragmentSize(int fragmentNum) { + public synchronized int fragmentSize(int fragmentNum) { + if (_messageBuf == null) return -1; if (fragmentNum + 1 == _fragmentSends.length) return _messageBuf.getValid() % _fragmentSize; else @@ -233,6 +235,7 @@ public class OutboundMessageState { public synchronized int writeFragment(byte out[], int outOffset, int fragmentNum) { int start = _fragmentSize * fragmentNum; int end = start + _fragmentSize; + if (_messageBuf == null) return -1; if (end > _messageBuf.getValid()) end = _messageBuf.getValid(); int toSend = end - start; @@ -243,7 +246,7 @@ public class OutboundMessageState { return toSend; } - public String toString() { + public synchronized String toString() { StringBuffer buf = new StringBuffer(64); buf.append("Message ").append(_messageId); if (_fragmentSends != null) diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index d91e1070305404d0a363f66f8f6f9b5891764c24..fec48f3663450bbbc58a02949d3f7615eb1d511f 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -65,10 +65,15 @@ public class PacketBuilder { data[off] |= 1 << 2; // isLast off++; - DataHelper.toLong(data, off, 2, state.fragmentSize(fragment)); + int size = state.fragmentSize(fragment); + if (size < 0) + return null; + DataHelper.toLong(data, off, 2, size); off += 2; - off += state.writeFragment(data, off, fragment); + size = state.writeFragment(data, off, fragment); + if (size < 0) return null; + off += size; // we can pad here if we want, maybe randomized? diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index ef1592c0a91081f0b8a46302281476b512a97b8f..bc7a231ea56e9238c355f9f704ac9cdb265f6a08 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -150,6 +150,7 @@ public class UDPSender { try { synchronized (_outboundQueue) { if (_outboundQueue.size() <= 0) { + _outboundQueue.notifyAll(); _outboundQueue.wait(); } else { packet = (UDPPacket)_outboundQueue.remove(0); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java index c770e87183cd0739e24c901b55828471b55b4f56..10b3aebcad06c097ce636b4f3f66cc1f75cafc18 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java @@ -31,9 +31,10 @@ public class OutboundMessageDistributor { public void distribute(I2NPMessage msg, Hash target, TunnelId tunnel) { RouterInfo info = _context.netDb().lookupRouterInfoLocally(target); if (info == null) { - _log.debug("outbound distributor to " + target.toBase64().substring(0,4) - + "." + (tunnel != null ? tunnel.getTunnelId() + "" : "") - + ": no info locally, searching..."); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("outbound distributor to " + target.toBase64().substring(0,4) + + "." + (tunnel != null ? tunnel.getTunnelId() + "" : "") + + ": no info locally, searching..."); _context.netDb().lookupRouterInfo(target, new DistributeJob(_context, msg, target, tunnel), null, MAX_DISTRIBUTE_TIME); return; } else {