diff --git a/history.txt b/history.txt index 185feeb26..f340136d1 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,8 @@ +2015-12-21 zzz + * SSU: Hand all messages pending after establishment to the + outbound queue at once, for efficiency. + Don't sort outbound messages by size, keep priority order instead. + 2015-12-20 zzz * BuildHandler: Additional fixes (ticket #1738) * CertUtil: Add methods to export private keys diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8ab24107c..80cfb9c84 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 17; + public final static long BUILD = 18; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 77fdd78ba..0fd9c83af 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -770,7 +770,7 @@ class EstablishmentManager { // so it needs to be caught in InNetMessagePool. dsm.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); dsm.setMessageId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); - _transport.send(dsm, peer); + // sent below // just do this inline //_context.simpleTimer2().addEvent(new PublishToNewInbound(peer), 0); @@ -780,8 +780,14 @@ class EstablishmentManager { // ok, we are fine with them, send them our latest info //if (_log.shouldLog(Log.INFO)) // _log.info("Publishing to the peer after confirm plus delay (without banlist): " + peer); - sendOurInfo(peer, true); + // bundle the two messages together for efficiency + DatabaseStoreMessage dbsm = getOurInfo(); + List msgs = new ArrayList(2); + msgs.add(dsm); + msgs.add(dbsm); + _transport.send(msgs, peer); } else { + _transport.send(dsm, peer); // nuh uh. if (_log.shouldLog(Log.WARN)) _log.warn("NOT publishing to the peer after confirm plus delay (WITH banlist): " + (hash != null ? hash.toString() : "unknown")); @@ -828,12 +834,14 @@ class EstablishmentManager { _transport.setIP(remote.calculateHash(), state.getSentIP()); _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); + DatabaseStoreMessage dbsm = null; if (!state.isFirstMessageOurDSM()) { - sendOurInfo(peer, false); + dbsm = getOurInfo(); } else if (_log.shouldLog(Log.INFO)) { _log.info("Skipping publish: " + state); } + List msgs = new ArrayList(8); OutNetMessage msg; while ((msg = state.getNextQueuedMessage()) != null) { if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { @@ -841,21 +849,33 @@ class EstablishmentManager { _transport.failed(msg, "Took too long to establish, but it was established"); } else { msg.timestamp("session fully established and sent"); - _transport.send(msg); + msgs.add(msg); } } + _transport.send(dbsm, msgs, peer); return peer; } +/**** private void sendOurInfo(PeerState peer, boolean isInbound) { if (_log.shouldLog(Log.INFO)) _log.info("Publishing to the peer after confirm: " + (isInbound ? " inbound con from " + peer : "outbound con to " + peer)); - + DatabaseStoreMessage m = getOurInfo(); + _transport.send(m, peer); + } +****/ + + /** + * A database store message with our router info + * @return non-null + * @since 0.9.24 split from sendOurInfo() + */ + private DatabaseStoreMessage getOurInfo() { DatabaseStoreMessage m = new DatabaseStoreMessage(_context); m.setEntry(_context.router().getRouterInfo()); m.setMessageExpiration(_context.clock().now() + DATA_MESSAGE_TIMEOUT); - _transport.send(m, peer); + return m; } /** the relay tag is a 4-byte field in the protocol */ diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index d9cdfe01a..b0350e2bd 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -172,11 +172,12 @@ class OutboundMessageFragments { } /** - * short circuit the OutNetMessage, letting us send the establish - * complete message reliably + * Short circuit the OutNetMessage, letting us send the establish + * complete message reliably. + * If you have multiple messages, use the list variant, + * so the messages may be bundled efficiently. */ - public void add(OutboundMessageState state) { - PeerState peer = state.getPeer(); + public void add(OutboundMessageState state, PeerState peer) { if (peer == null) throw new RuntimeException("null peer for " + state); peer.add(state); @@ -184,6 +185,22 @@ class OutboundMessageFragments { //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); } + /** + * Short circuit the OutNetMessage, letting us send multiple messages + * reliably and efficiently. + * @since 0.9.24 + */ + public void add(List states, PeerState peer) { + if (peer == null) + throw new RuntimeException("null peer"); + int sz = states.size(); + for (int i = 0; i < sz; i++) { + peer.add(states.get(i)); + } + add(peer); + //_context.statManager().addRateData("udp.outboundActiveCount", active, 0); + } + /** * Add the peer to the list of peers wanting to transmit something. * This wakes up the packet pusher if it is sleeping. @@ -400,8 +417,10 @@ class OutboundMessageFragments { int fragmentsToSend = toSend.size(); // sort by size, biggest first // don't bother unless more than one state (fragments are already sorted within a state) - if (fragmentsToSend > 1 && states.size() > 1) - Collections.sort(toSend, new FragmentComparator()); + // This puts the DeliveryStatusMessage after the DatabaseStoreMessage, don't do it for now. + // It also undoes the ordering of the priority queue in PeerState. + //if (fragmentsToSend > 1 && states.size() > 1) + // Collections.sort(toSend, new FragmentComparator()); List sendNext = new ArrayList(Math.min(toSend.size(), 4)); List rv = new ArrayList(toSend.size()); @@ -490,6 +509,7 @@ class OutboundMessageFragments { * Biggest first * @since 0.9.16 */ +/**** private static class FragmentComparator implements Comparator, Serializable { public int compare(Fragment l, Fragment r) { @@ -497,7 +517,9 @@ class OutboundMessageFragments { return r.state.fragmentSize(r.num) - l.state.fragmentSize(l.num); } } +****/ + /** throttle */ public interface ActiveThrottle { public void choke(Hash peer); public void unchoke(Hash peer); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index b700aa80d..de804ae9f 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -305,7 +305,7 @@ class OutboundMessageState implements CDPQEntry { buf.append(i).append(' '); } } - buf.append(" to: ").append(_peer.toString()); + //buf.append(" to: ").append(_peer.toString()); return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 9eb45b6b7..503fb0fef 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1828,14 +1828,74 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } /** - * "injected" message from the EstablishmentManager + * "injected" message from the EstablishmentManager. + * If you have multiple messages, use the list variant, + * so the messages may be bundled efficiently. + * + * @param peer all messages MUST be going to this peer */ void send(I2NPMessage msg, PeerState peer) { try { OutboundMessageState state = new OutboundMessageState(_context, msg, peer); if (_log.shouldLog(Log.DEBUG)) _log.debug("Injecting a data message to a new peer: " + peer); - _fragments.add(state); + _fragments.add(state, peer); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Shouldnt happen", new Exception("I did it")); + } + } + + /** + * "injected" message from the EstablishmentManager, + * plus pending messages to send, + * so the messages may be bundled efficiently. + * Called at end of outbound establishment. + * + * @param msg may be null if nothing to inject + * @param msgs non-null, may be empty + * @param peer all messages MUST be going to this peer + * @since 0.9.24 + */ + void send(I2NPMessage msg, List msgs, PeerState peer) { + try { + int sz = msgs.size(); + List states = new ArrayList(sz + 1); + if (msg != null) { + OutboundMessageState state = new OutboundMessageState(_context, msg, peer); + states.add(state); + } + for (int i = 0; i < sz; i++) { + OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer); + states.add(state); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Injecting " + states.size() + " data messages to a new peer: " + peer); + _fragments.add(states, peer); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Shouldnt happen", new Exception("I did it")); + } + } + + /** + * "injected" messages from the EstablishmentManager. + * Called at end of inbound establishment. + * + * @param peer all messages MUST be going to this peer + * @since 0.9.24 + */ + void send(List msgs, PeerState peer) { + try { + int sz = msgs.size(); + List states = new ArrayList(sz); + for (int i = 0; i < sz; i++) { + OutboundMessageState state = new OutboundMessageState(_context, msgs.get(i), peer); + states.add(state); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Injecting " + sz + " data messages to a new peer: " + peer); + _fragments.add(states, peer); } catch (IllegalArgumentException iae) { if (_log.shouldLog(Log.WARN)) _log.warn("Shouldnt happen", new Exception("I did it"));