From 14cd469c6dc74769c10a81c576da3a0f826c3667 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Sat, 5 Nov 2005 21:12:57 +0000 Subject: [PATCH] 2005-11-05 jrandom * Include the most recent ACKs with packets, rather than only sending an ack exactly once. SSU differs from TCP in this regard, as TCP has ever increasing sequence numbers, while each message ID in SSU is random, so we don't get the benefit of later ACKs implicitly ACKing earlier messages. * Reduced the max retransmission timeout for SSU * Don't try to send messages queued up for a long time waiting for establishment. --- history.txt | 12 ++++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../net/i2p/router/transport/GetBidsJob.java | 1 + .../i2p/router/transport/TransportImpl.java | 14 ++++-- .../i2p/router/transport/udp/ACKSender.java | 2 +- .../transport/udp/EstablishmentManager.java | 27 +++++++++-- .../router/transport/udp/PacketBuilder.java | 47 ++++++++++++++++--- .../i2p/router/transport/udp/PeerState.java | 42 ++++++++++++++--- .../TimedWeightedPriorityMessageQueue.java | 10 +++- .../router/transport/udp/UDPPacketReader.java | 5 +- .../router/transport/udp/UDPTransport.java | 12 ++++- 11 files changed, 147 insertions(+), 29 deletions(-) diff --git a/history.txt b/history.txt index aa116831db..01b8c6025c 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,14 @@ -$Id: history.txt,v 1.313 2005/11/03 20:20:18 jrandom Exp $ +$Id: history.txt,v 1.314 2005/11/05 06:01:57 dust Exp $ + +2005-11-05 jrandom + * Include the most recent ACKs with packets, rather than only sending an + ack exactly once. SSU differs from TCP in this regard, as TCP has ever + increasing sequence numbers, while each message ID in SSU is random, so + we don't get the benefit of later ACKs implicitly ACKing earlier + messages. + * Reduced the max retransmission timeout for SSU + * Don't try to send messages queued up for a long time waiting for + establishment. 2005-11-05 dust * Fix sucker to delete its temporary files. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index bac7a84f59..8337157e5a 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.282 $ $Date: 2005/11/03 20:20:17 $"; + public final static String ID = "$Revision: 1.283 $ $Date: 2005/11/05 06:01:58 $"; public final static String VERSION = "0.6.1.4"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java index 4e207f7a00..2190575989 100644 --- a/router/java/src/net/i2p/router/transport/GetBidsJob.java +++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java @@ -42,6 +42,7 @@ public class GetBidsJob extends JobImpl { static void getBids(RouterContext context, CommSystemFacadeImpl facade, OutNetMessage msg) { Log log = context.logManager().getLog(GetBidsJob.class); Hash to = msg.getTarget().getIdentity().getHash(); + msg.timestamp("bid"); if (context.shitlist().isShitlisted(to)) { if (log.shouldLog(Log.WARN)) diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index f85ff8e4ac..a867b5a814 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -132,15 +132,18 @@ public abstract class TransportImpl implements Transport { if (msToSend > 1000) { if (_log.shouldLog(Log.WARN)) - _log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " took " + msToSend); } long lifetime = msg.getLifetime(); - if (lifetime > 5000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + if (lifetime > 3000) { + int level = Log.WARN; + //if (!sendSuccessful) + // level = Log.INFO; + if (_log.shouldLog(level)) + _log.log(level, "afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString()); } else { @@ -219,7 +222,7 @@ public abstract class TransportImpl implements Transport { _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful + "): " + allTime + "ms " + " after failing on: " + msg.getFailedTransports() + " and succeeding on " + getStyle()); - if (allTime > 60*1000) { + if ( (allTime > 60*1000) && (sendSuccessful) ) { // WTF!!@# if (_log.shouldLog(Log.WARN)) _log.warn("WTF, more than a minute slow? " + msg.getMessageType() @@ -271,6 +274,7 @@ public abstract class TransportImpl implements Transport { if (_log.shouldLog(Log.DEBUG)) _log.debug("Message added to send pool"); + msg.timestamp("send on " + getStyle()); outboundMessageReady(); if (_log.shouldLog(Log.INFO)) _log.debug("OutboundMessageReady called"); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 383d002de2..8a68f6b00a 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -92,7 +92,7 @@ public class ACKSender implements Runnable { if (peer != null) { long lastSend = peer.getLastACKSend(); long wanted = peer.getWantedACKSendSince(); - List ackBitfields = peer.retrieveACKBitfields(); + List ackBitfields = peer.retrieveACKBitfields(false); if (wanted < 0) _log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index cf842f1caf..3baf7317a0 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -17,6 +17,7 @@ import net.i2p.data.Signature; import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.router.CommSystemFacade; import net.i2p.router.OutNetMessage; +import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -137,8 +138,10 @@ public class EstablishmentManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Add outobund establish state to: " + to); + OutboundEstablishState state = null; + int deferred = 0; synchronized (_outboundStates) { - OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(to); + state = (OutboundEstablishState)_outboundStates.get(to); if (state == null) { if (_outboundStates.size() >= getMaxConcurrentEstablish()) { List queued = (List)_queuedOutbound.get(to); @@ -147,6 +150,7 @@ public class EstablishmentManager { _queuedOutbound.put(to, queued); } queued.add(msg); + deferred = _queuedOutbound.size(); } else { state = new OutboundEstablishState(_context, remAddr, port, msg.getTarget().getIdentity(), @@ -163,6 +167,10 @@ public class EstablishmentManager { } } + if (deferred > 0) + msg.timestamp("too many deferred establishers: " + deferred); + else if (state != null) + msg.timestamp("establish state already waiting " + state.getLifetime()); notifyActivity(); } @@ -312,8 +320,11 @@ public class EstablishmentManager { new SessionKey(addr.getIntroKey()), addr); _outboundStates.put(to, qstate); - for (int i = 0; i < queued.size(); i++) - qstate.addMessage((OutNetMessage)queued.get(i)); + for (int i = 0; i < queued.size(); i++) { + OutNetMessage m = (OutNetMessage)queued.get(i); + m.timestamp("no longer deferred... establishing"); + qstate.addMessage(m); + } admitted++; } return admitted; @@ -388,11 +399,19 @@ public class EstablishmentManager { _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); sendOurInfo(peer); + int i = 0; while (true) { OutNetMessage msg = state.getNextQueuedMessage(); if (msg == null) break; - _transport.send(msg); + if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { + msg.timestamp("took too long but established..."); + _transport.failed(msg); + } else { + msg.timestamp("session fully established and sent " + i); + _transport.send(msg); + } + i++; } return peer; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 0d0c1b15b4..19608a2ecb 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -43,13 +43,24 @@ public class PacketBuilder { /** * @param ackIdsRemaining list of messageIds (Long) that should be acked by this packet. * The list itself is passed by reference, and if a messageId is - * included, it should be removed from the list. + * transmitted and the sender does not want the ID to be included + * in subsequent acks, it should be removed from the list. NOTE: + * right now this does NOT remove the IDs, which means it assumes + * that the IDs will be transmitted potentially multiple times, + * and should otherwise be removed from the list. * @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet. * The list itself is passed by reference, and if a messageId is * included, it should be removed from the list. */ public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { UDPPacket packet = UDPPacket.acquire(_context); + + StringBuffer msg = null; + boolean acksIncluded = false; + if (_log.shouldLog(Log.WARN)) { + msg = new StringBuffer(128); + msg.append("building data packet with acks to ").append(peer.getRemotePeer().toBase64().substring(0,6)); + } byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); @@ -81,10 +92,14 @@ public class PacketBuilder { if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) { DataHelper.toLong(data, off, 1, ackIdsRemaining.size()); off++; - while (ackIdsRemaining.size() > 0) { - Long ackId = (Long)ackIdsRemaining.remove(0); + for (int i = 0; i < ackIdsRemaining.size(); i++) { + //while (ackIdsRemaining.size() > 0) { + Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0); DataHelper.toLong(data, off, 4, ackId.longValue()); - off += 4; + off += 4; + if (msg != null) // logging it + msg.append(" full ack: ").append(ackId.longValue()); + acksIncluded = true; } } @@ -111,12 +126,18 @@ public class PacketBuilder { off++; } partialACKsRemaining.remove(i); + if (msg != null) // logging it + msg.append(" partial ack: ").append(bitfield); + acksIncluded = true; i--; } // now jump back and fill in the number of bitfields *actually* included DataHelper.toLong(data, numPartialOffset, 1, origNumRemaining - partialACKsRemaining.size()); } + if ( (msg != null) && (acksIncluded) ) + _log.warn(msg.toString()); + DataHelper.toLong(data, off, 1, 1); // only one fragment in this message off++; @@ -171,6 +192,12 @@ public class PacketBuilder { public UDPPacket buildACK(PeerState peer, List ackBitfields) { UDPPacket packet = UDPPacket.acquire(_context); + StringBuffer msg = null; + if (_log.shouldLog(Log.WARN)) { + msg = new StringBuffer(128); + msg.append("building ACK packet to ").append(peer.getRemotePeer().toBase64().substring(0,6)); + } + byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -207,6 +234,8 @@ public class PacketBuilder { if (bf.receivedComplete()) { DataHelper.toLong(data, off, 4, bf.getMessageId()); off += 4; + if (msg != null) // logging it + msg.append(" full ack: ").append(bf.getMessageId()); } } } @@ -231,12 +260,18 @@ public class PacketBuilder { } off++; } + + if (msg != null) // logging it + msg.append(" partial ack: ").append(bitfield); } } DataHelper.toLong(data, off, 1, 0); // no fragments in this message off++; + if (msg != null) + _log.warn(msg.toString()); + // we can pad here if we want, maybe randomized? // pad up so we're on the encryption boundary @@ -777,8 +812,8 @@ public class PacketBuilder { System.arraycopy(ourIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES); off += SessionKey.KEYSIZE_BYTES; - if (_log.shouldLog(Log.WARN)) - _log.warn("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES) + if (_log.shouldLog(Log.DEBUG)) + _log.debug("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES) + " with nonce " + introNonce + " size=" + (off+4 + (16 - (off+4)%16)) + " and data: " + Base64.encode(data, 0, off)); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 5bb01c20f1..942e71c385 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -75,6 +75,12 @@ public class PeerState { private long _lastFailedSendPeriod; /** list of messageIds (Long) that we have received but not yet sent */ private List _currentACKs; + /** + * list of the most recent messageIds (Long) that we have received and sent + * an ACK for. We keep a few of these around to retransmit with _currentACKs, + * hopefully saving some spurious retransmissions + */ + private List _currentACKsResend; /** when did we last send ACKs to the peer? */ private volatile long _lastACKSend; /** when did we decide we need to ACK to this peer? */ @@ -173,7 +179,7 @@ public class PeerState { */ private static final int DEFAULT_MTU = 608;//600; //1500; private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; - private static final int MAX_RTO = 5000; // 5000; + private static final int MAX_RTO = 1200; // 5000; public PeerState(I2PAppContext ctx) { _context = ctx; @@ -191,6 +197,7 @@ public class PeerState { _lastSendTime = -1; _lastReceiveTime = -1; _currentACKs = new ArrayList(8); + _currentACKsResend = new ArrayList(8); _currentSecondECNReceived = false; _remoteWantsPreviousACKs = false; _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; @@ -536,16 +543,25 @@ public class PeerState { */ public List getCurrentFullACKs() { synchronized (_currentACKs) { - return new ArrayList(_currentACKs); + ArrayList rv = new ArrayList(_currentACKs); + // include some for retransmission + rv.addAll(_currentACKsResend); + return rv; } } public void removeACKMessage(Long messageId) { synchronized (_currentACKs) { _currentACKs.remove(messageId); + _currentACKsResend.add(messageId); + // trim down the resends + while (_currentACKsResend.size() > MAX_RESEND_ACKS) + _currentACKsResend.remove(0); } _lastACKSend = _context.clock().now(); } + private static final int MAX_RESEND_ACKS = 8; + /** * grab a list of ACKBitfield instances, some of which may fully * ACK a message while others may only partially ACK a message. @@ -555,20 +571,34 @@ public class PeerState { * will be unchanged if there are ACKs remaining. * */ - public List retrieveACKBitfields() { + public List retrieveACKBitfields() { return retrieveACKBitfields(true); } + public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { List rv = null; int bytesRemaining = countMaxACKData(); synchronized (_currentACKs) { rv = new ArrayList(_currentACKs.size()); + int oldIndex = _currentACKsResend.size(); while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { - long id = ((Long)_currentACKs.remove(0)).longValue(); + Long val = (Long)_currentACKs.remove(0); + long id = val.longValue(); rv.add(new FullACKBitfield(id)); + _currentACKsResend.add(val); bytesRemaining -= 4; } if (_currentACKs.size() <= 0) _wantACKSendSince = -1; + if (alwaysIncludeRetransmissions || rv.size() > 0) { + // now repeat by putting in some old ACKs + for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) { + rv.add(new FullACKBitfield(((Long)_currentACKsResend.get(i)).longValue())); + bytesRemaining -= 4; + } + } + // trim down the resends + while (_currentACKsResend.size() > MAX_RESEND_ACKS) + _currentACKsResend.remove(0); } - + int partialIncluded = 0; if (bytesRemaining > 4) { // ok, there's room to *try* to fit in some partial ACKs, so @@ -674,7 +704,7 @@ public class PeerState { _messagesSent++; if (numSends < 2) recalculateTimeouts(lifetime); - else + else if (_log.shouldLog(Log.WARN)) _log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); diff --git a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java index 97cf9d9fc4..ed18a452af 100644 --- a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java +++ b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java @@ -104,6 +104,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound _addedSincePassBegan = true; _nextLock.notifyAll(); } + message.timestamp("added to queue " + queue); } /** @@ -138,10 +139,14 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound _messagesFlushed[currentQueue] = 0; _nextQueue = (currentQueue + 1) % _queue.length; } - _context.statManager().addRateData("udp.messageQueueSize", _queue[currentQueue].size(), currentQueue); + int sz = _queue[currentQueue].size(); + _context.statManager().addRateData("udp.messageQueueSize", sz, currentQueue); if (_log.shouldLog(Log.DEBUG)) _log.debug("Pulling a message off queue " + currentQueue + " with " - + _queue[currentQueue].size() + " remaining"); + + sz + " remaining"); + + + msg.timestamp("made active with remaining queue size " + sz); return msg; } @@ -247,6 +252,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound for (int i = 0; i < removed.size(); i++) { OutNetMessage m = (OutNetMessage)removed.get(i); + m.timestamp("expirer killed it"); _listener.failed(m); } removed.clear(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index b481381680..b80383e764 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -506,9 +506,12 @@ public class UDPPacketReader { buf.append(getMessageId()); buf.append(" with ACKs for: "); int numFrags = fragmentCount(); - for (int i = 0; i < numFrags; i++) + for (int i = 0; i < numFrags; i++) { if (received(i)) buf.append(i).append(" "); + else + buf.append('!').append(i).append(" "); + } return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index c674de1517..07be8e5c9b 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -669,7 +669,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (msg == null) return; if (msg.getTarget() == null) return; if (msg.getTarget().getIdentity() == null) return; - + + msg.timestamp("sending on UDP transport"); Hash to = msg.getTarget().getIdentity().calculateHash(); PeerState peer = getPeerState(to); if (peer != null) { @@ -682,13 +683,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority (peer.getConsecutiveFailedSends() > 0) ) { // peer is waaaay idle, drop the con and queue it up as a new con dropPeer(peer, false); + msg.timestamp("peer is really idle, dropping con and reestablishing"); _establisher.establish(msg); _context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime()); return; } } + msg.timestamp("enqueueing for an already established peer"); _outboundMessages.add(msg); } else { + msg.timestamp("establishing a new connection"); _establisher.establish(msg); } } @@ -846,6 +850,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || (msg.isExpired())) ) { + OutNetMessage m = msg.getMessage(); + if (m != null) + m.timestamp("message failure - volleys = " + msg.getMaxSends() + + " lastReceived: " + (_context.clock().now() - msg.getPeer().getLastReceiveTime()) + + " lastSentFully: " + (_context.clock().now() - msg.getPeer().getLastSendFullyTime()) + + " expired? " + msg.isExpired()); consecutive = msg.getPeer().incrementConsecutiveFailedSends(); if (_log.shouldLog(Log.WARN)) _log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer()); -- GitLab