diff --git a/history.txt b/history.txt index aa116831db6f2b9d2367d74e6d5dd54c4c1ff29a..01b8c6025cfff769fca139248f25ff16c2ba6859 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,14 @@ -$Id: history.txt,v 1.313 2005/11/03 20:20:18 jrandom Exp $ +$Id: history.txt,v 1.314 2005/11/05 06:01:57 dust Exp $ + +2005-11-05 jrandom + * Include the most recent ACKs with packets, rather than only sending an + ack exactly once. SSU differs from TCP in this regard, as TCP has ever + increasing sequence numbers, while each message ID in SSU is random, so + we don't get the benefit of later ACKs implicitly ACKing earlier + messages. + * Reduced the max retransmission timeout for SSU + * Don't try to send messages queued up for a long time waiting for + establishment. 2005-11-05 dust * Fix sucker to delete its temporary files. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index bac7a84f5920f67203d4ce7ef4a3d54b3d9749bc..8337157e5aaba5fc10216b3d34eee5cf8dbfd4d1 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.282 $ $Date: 2005/11/03 20:20:17 $"; + public final static String ID = "$Revision: 1.283 $ $Date: 2005/11/05 06:01:58 $"; public final static String VERSION = "0.6.1.4"; - public final static long BUILD = 3; + public final static long BUILD = 4; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/GetBidsJob.java b/router/java/src/net/i2p/router/transport/GetBidsJob.java index 4e207f7a00ed463e5fa7bd08e703cc28472d4cc6..219057598986e34074af371b334556cab45afb7b 100644 --- a/router/java/src/net/i2p/router/transport/GetBidsJob.java +++ b/router/java/src/net/i2p/router/transport/GetBidsJob.java @@ -42,6 +42,7 @@ public class GetBidsJob extends JobImpl { static void getBids(RouterContext context, CommSystemFacadeImpl facade, OutNetMessage msg) { Log log = context.logManager().getLog(GetBidsJob.class); Hash to = msg.getTarget().getIdentity().getHash(); + msg.timestamp("bid"); if (context.shitlist().isShitlisted(to)) { if (log.shouldLog(Log.WARN)) diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index f85ff8e4acb6afe53f450a0686a0aab8c4ddb2a8..a867b5a814ce07d49f5975c59b94d00530255b7a 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -132,15 +132,18 @@ public abstract class TransportImpl implements Transport { if (msToSend > 1000) { if (_log.shouldLog(Log.WARN)) - _log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + _log.warn("afterSend slow: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " took " + msToSend); } long lifetime = msg.getLifetime(); - if (lifetime > 5000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + if (lifetime > 3000) { + int level = Log.WARN; + //if (!sendSuccessful) + // level = Log.INFO; + if (_log.shouldLog(level)) + _log.log(level, "afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString()); } else { @@ -219,7 +222,7 @@ public abstract class TransportImpl implements Transport { _log.info("Took too long from preperation to afterSend(ok? " + sendSuccessful + "): " + allTime + "ms " + " after failing on: " + msg.getFailedTransports() + " and succeeding on " + getStyle()); - if (allTime > 60*1000) { + if ( (allTime > 60*1000) && (sendSuccessful) ) { // WTF!!@# if (_log.shouldLog(Log.WARN)) _log.warn("WTF, more than a minute slow? " + msg.getMessageType() @@ -271,6 +274,7 @@ public abstract class TransportImpl implements Transport { if (_log.shouldLog(Log.DEBUG)) _log.debug("Message added to send pool"); + msg.timestamp("send on " + getStyle()); outboundMessageReady(); if (_log.shouldLog(Log.INFO)) _log.debug("OutboundMessageReady called"); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 383d002de29e3577a09e70c3c10be965b7bacc54..8a68f6b00a45dff70659678928b73ceb641927c6 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -92,7 +92,7 @@ public class ACKSender implements Runnable { if (peer != null) { long lastSend = peer.getLastACKSend(); long wanted = peer.getWantedACKSendSince(); - List ackBitfields = peer.retrieveACKBitfields(); + List ackBitfields = peer.retrieveACKBitfields(false); if (wanted < 0) _log.error("wtf, why are we acking something they dont want? remaining=" + remaining + ", peer=" + peer + ", bitfields=" + ackBitfields); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index cf842f1caf1ca511b208eb4059da972c290947c5..3baf7317a0ff1f72bc961ded9a7ff85b794fbf4d 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -17,6 +17,7 @@ import net.i2p.data.Signature; import net.i2p.data.i2np.DatabaseStoreMessage; import net.i2p.router.CommSystemFacade; import net.i2p.router.OutNetMessage; +import net.i2p.router.Router; import net.i2p.router.RouterContext; import net.i2p.util.I2PThread; import net.i2p.util.Log; @@ -137,8 +138,10 @@ public class EstablishmentManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Add outobund establish state to: " + to); + OutboundEstablishState state = null; + int deferred = 0; synchronized (_outboundStates) { - OutboundEstablishState state = (OutboundEstablishState)_outboundStates.get(to); + state = (OutboundEstablishState)_outboundStates.get(to); if (state == null) { if (_outboundStates.size() >= getMaxConcurrentEstablish()) { List queued = (List)_queuedOutbound.get(to); @@ -147,6 +150,7 @@ public class EstablishmentManager { _queuedOutbound.put(to, queued); } queued.add(msg); + deferred = _queuedOutbound.size(); } else { state = new OutboundEstablishState(_context, remAddr, port, msg.getTarget().getIdentity(), @@ -163,6 +167,10 @@ public class EstablishmentManager { } } + if (deferred > 0) + msg.timestamp("too many deferred establishers: " + deferred); + else if (state != null) + msg.timestamp("establish state already waiting " + state.getLifetime()); notifyActivity(); } @@ -312,8 +320,11 @@ public class EstablishmentManager { new SessionKey(addr.getIntroKey()), addr); _outboundStates.put(to, qstate); - for (int i = 0; i < queued.size(); i++) - qstate.addMessage((OutNetMessage)queued.get(i)); + for (int i = 0; i < queued.size(); i++) { + OutNetMessage m = (OutNetMessage)queued.get(i); + m.timestamp("no longer deferred... establishing"); + qstate.addMessage(m); + } admitted++; } return admitted; @@ -388,11 +399,19 @@ public class EstablishmentManager { _context.statManager().addRateData("udp.outboundEstablishTime", state.getLifetime(), 0); sendOurInfo(peer); + int i = 0; while (true) { OutNetMessage msg = state.getNextQueuedMessage(); if (msg == null) break; - _transport.send(msg); + if (now - Router.CLOCK_FUDGE_FACTOR > msg.getExpiration()) { + msg.timestamp("took too long but established..."); + _transport.failed(msg); + } else { + msg.timestamp("session fully established and sent " + i); + _transport.send(msg); + } + i++; } return peer; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index 0d0c1b15b42d8b9498854eb0ad6c3201516861d7..19608a2ecb5b8f5b47bbf30c995aa7f3b465b790 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -43,13 +43,24 @@ public class PacketBuilder { /** * @param ackIdsRemaining list of messageIds (Long) that should be acked by this packet. * The list itself is passed by reference, and if a messageId is - * included, it should be removed from the list. + * transmitted and the sender does not want the ID to be included + * in subsequent acks, it should be removed from the list. NOTE: + * right now this does NOT remove the IDs, which means it assumes + * that the IDs will be transmitted potentially multiple times, + * and should otherwise be removed from the list. * @param partialACKsRemaining list of messageIds (ACKBitfield) that should be acked by this packet. * The list itself is passed by reference, and if a messageId is * included, it should be removed from the list. */ public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { UDPPacket packet = UDPPacket.acquire(_context); + + StringBuffer msg = null; + boolean acksIncluded = false; + if (_log.shouldLog(Log.WARN)) { + msg = new StringBuffer(128); + msg.append("building data packet with acks to ").append(peer.getRemotePeer().toBase64().substring(0,6)); + } byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); @@ -81,10 +92,14 @@ public class PacketBuilder { if ( (ackIdsRemaining != null) && (ackIdsRemaining.size() > 0) ) { DataHelper.toLong(data, off, 1, ackIdsRemaining.size()); off++; - while (ackIdsRemaining.size() > 0) { - Long ackId = (Long)ackIdsRemaining.remove(0); + for (int i = 0; i < ackIdsRemaining.size(); i++) { + //while (ackIdsRemaining.size() > 0) { + Long ackId = (Long)ackIdsRemaining.get(i);//(Long)ackIdsRemaining.remove(0); DataHelper.toLong(data, off, 4, ackId.longValue()); - off += 4; + off += 4; + if (msg != null) // logging it + msg.append(" full ack: ").append(ackId.longValue()); + acksIncluded = true; } } @@ -111,12 +126,18 @@ public class PacketBuilder { off++; } partialACKsRemaining.remove(i); + if (msg != null) // logging it + msg.append(" partial ack: ").append(bitfield); + acksIncluded = true; i--; } // now jump back and fill in the number of bitfields *actually* included DataHelper.toLong(data, numPartialOffset, 1, origNumRemaining - partialACKsRemaining.size()); } + if ( (msg != null) && (acksIncluded) ) + _log.warn(msg.toString()); + DataHelper.toLong(data, off, 1, 1); // only one fragment in this message off++; @@ -171,6 +192,12 @@ public class PacketBuilder { public UDPPacket buildACK(PeerState peer, List ackBitfields) { UDPPacket packet = UDPPacket.acquire(_context); + StringBuffer msg = null; + if (_log.shouldLog(Log.WARN)) { + msg = new StringBuffer(128); + msg.append("building ACK packet to ").append(peer.getRemotePeer().toBase64().substring(0,6)); + } + byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -207,6 +234,8 @@ public class PacketBuilder { if (bf.receivedComplete()) { DataHelper.toLong(data, off, 4, bf.getMessageId()); off += 4; + if (msg != null) // logging it + msg.append(" full ack: ").append(bf.getMessageId()); } } } @@ -231,12 +260,18 @@ public class PacketBuilder { } off++; } + + if (msg != null) // logging it + msg.append(" partial ack: ").append(bitfield); } } DataHelper.toLong(data, off, 1, 0); // no fragments in this message off++; + if (msg != null) + _log.warn(msg.toString()); + // we can pad here if we want, maybe randomized? // pad up so we're on the encryption boundary @@ -777,8 +812,8 @@ public class PacketBuilder { System.arraycopy(ourIntroKey.getData(), 0, data, off, SessionKey.KEYSIZE_BYTES); off += SessionKey.KEYSIZE_BYTES; - if (_log.shouldLog(Log.WARN)) - _log.warn("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES) + if (_log.shouldLog(Log.DEBUG)) + _log.debug("wrote alice intro key: " + Base64.encode(data, off-SessionKey.KEYSIZE_BYTES, SessionKey.KEYSIZE_BYTES) + " with nonce " + introNonce + " size=" + (off+4 + (16 - (off+4)%16)) + " and data: " + Base64.encode(data, 0, off)); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 5bb01c20f17cd7c4a430116822edc228786b50ee..942e71c385d9466d4a809e0f9dd7aa52eaebfd59 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -75,6 +75,12 @@ public class PeerState { private long _lastFailedSendPeriod; /** list of messageIds (Long) that we have received but not yet sent */ private List _currentACKs; + /** + * list of the most recent messageIds (Long) that we have received and sent + * an ACK for. We keep a few of these around to retransmit with _currentACKs, + * hopefully saving some spurious retransmissions + */ + private List _currentACKsResend; /** when did we last send ACKs to the peer? */ private volatile long _lastACKSend; /** when did we decide we need to ACK to this peer? */ @@ -173,7 +179,7 @@ public class PeerState { */ private static final int DEFAULT_MTU = 608;//600; //1500; private static final int MIN_RTO = 500 + ACKSender.ACK_FREQUENCY; - private static final int MAX_RTO = 5000; // 5000; + private static final int MAX_RTO = 1200; // 5000; public PeerState(I2PAppContext ctx) { _context = ctx; @@ -191,6 +197,7 @@ public class PeerState { _lastSendTime = -1; _lastReceiveTime = -1; _currentACKs = new ArrayList(8); + _currentACKsResend = new ArrayList(8); _currentSecondECNReceived = false; _remoteWantsPreviousACKs = false; _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; @@ -536,16 +543,25 @@ public class PeerState { */ public List getCurrentFullACKs() { synchronized (_currentACKs) { - return new ArrayList(_currentACKs); + ArrayList rv = new ArrayList(_currentACKs); + // include some for retransmission + rv.addAll(_currentACKsResend); + return rv; } } public void removeACKMessage(Long messageId) { synchronized (_currentACKs) { _currentACKs.remove(messageId); + _currentACKsResend.add(messageId); + // trim down the resends + while (_currentACKsResend.size() > MAX_RESEND_ACKS) + _currentACKsResend.remove(0); } _lastACKSend = _context.clock().now(); } + private static final int MAX_RESEND_ACKS = 8; + /** * grab a list of ACKBitfield instances, some of which may fully * ACK a message while others may only partially ACK a message. @@ -555,20 +571,34 @@ public class PeerState { * will be unchanged if there are ACKs remaining. * */ - public List retrieveACKBitfields() { + public List retrieveACKBitfields() { return retrieveACKBitfields(true); } + public List retrieveACKBitfields(boolean alwaysIncludeRetransmissions) { List rv = null; int bytesRemaining = countMaxACKData(); synchronized (_currentACKs) { rv = new ArrayList(_currentACKs.size()); + int oldIndex = _currentACKsResend.size(); while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { - long id = ((Long)_currentACKs.remove(0)).longValue(); + Long val = (Long)_currentACKs.remove(0); + long id = val.longValue(); rv.add(new FullACKBitfield(id)); + _currentACKsResend.add(val); bytesRemaining -= 4; } if (_currentACKs.size() <= 0) _wantACKSendSince = -1; + if (alwaysIncludeRetransmissions || rv.size() > 0) { + // now repeat by putting in some old ACKs + for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) { + rv.add(new FullACKBitfield(((Long)_currentACKsResend.get(i)).longValue())); + bytesRemaining -= 4; + } + } + // trim down the resends + while (_currentACKsResend.size() > MAX_RESEND_ACKS) + _currentACKsResend.remove(0); } - + int partialIncluded = 0; if (bytesRemaining > 4) { // ok, there's room to *try* to fit in some partial ACKs, so @@ -674,7 +704,7 @@ public class PeerState { _messagesSent++; if (numSends < 2) recalculateTimeouts(lifetime); - else + else if (_log.shouldLog(Log.WARN)) _log.warn("acked after numSends=" + numSends + " w/ lifetime=" + lifetime + " and size=" + bytesACKed); _context.statManager().addRateData("udp.sendBps", _sendBps, lifetime); diff --git a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java index 97cf9d9fc426d7a473cf7789bbe230f64d66ab75..ed18a452af442e2e090840cf9421c820a54d41d4 100644 --- a/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java +++ b/router/java/src/net/i2p/router/transport/udp/TimedWeightedPriorityMessageQueue.java @@ -104,6 +104,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound _addedSincePassBegan = true; _nextLock.notifyAll(); } + message.timestamp("added to queue " + queue); } /** @@ -138,10 +139,14 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound _messagesFlushed[currentQueue] = 0; _nextQueue = (currentQueue + 1) % _queue.length; } - _context.statManager().addRateData("udp.messageQueueSize", _queue[currentQueue].size(), currentQueue); + int sz = _queue[currentQueue].size(); + _context.statManager().addRateData("udp.messageQueueSize", sz, currentQueue); if (_log.shouldLog(Log.DEBUG)) _log.debug("Pulling a message off queue " + currentQueue + " with " - + _queue[currentQueue].size() + " remaining"); + + sz + " remaining"); + + + msg.timestamp("made active with remaining queue size " + sz); return msg; } @@ -247,6 +252,7 @@ public class TimedWeightedPriorityMessageQueue implements MessageQueue, Outbound for (int i = 0; i < removed.size(); i++) { OutNetMessage m = (OutNetMessage)removed.get(i); + m.timestamp("expirer killed it"); _listener.failed(m); } removed.clear(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index b481381680544b23ca91dfcad315edd664127126..b80383e764ec1108885826664213ab11e7b4f11e 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -506,9 +506,12 @@ public class UDPPacketReader { buf.append(getMessageId()); buf.append(" with ACKs for: "); int numFrags = fragmentCount(); - for (int i = 0; i < numFrags; i++) + for (int i = 0; i < numFrags; i++) { if (received(i)) buf.append(i).append(" "); + else + buf.append('!').append(i).append(" "); + } return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index c674de151785a1cdac5c04682b3e88376733fece..07be8e5c9b8e930b9d9f3164e3b4f9c328cfa04b 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -669,7 +669,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (msg == null) return; if (msg.getTarget() == null) return; if (msg.getTarget().getIdentity() == null) return; - + + msg.timestamp("sending on UDP transport"); Hash to = msg.getTarget().getIdentity().calculateHash(); PeerState peer = getPeerState(to); if (peer != null) { @@ -682,13 +683,16 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority (peer.getConsecutiveFailedSends() > 0) ) { // peer is waaaay idle, drop the con and queue it up as a new con dropPeer(peer, false); + msg.timestamp("peer is really idle, dropping con and reestablishing"); _establisher.establish(msg); _context.statManager().addRateData("udp.proactiveReestablish", now-lastSend, now-peer.getKeyEstablishedTime()); return; } } + msg.timestamp("enqueueing for an already established peer"); _outboundMessages.add(msg); } else { + msg.timestamp("establishing a new connection"); _establisher.establish(msg); } } @@ -846,6 +850,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || (msg.isExpired())) ) { + OutNetMessage m = msg.getMessage(); + if (m != null) + m.timestamp("message failure - volleys = " + msg.getMaxSends() + + " lastReceived: " + (_context.clock().now() - msg.getPeer().getLastReceiveTime()) + + " lastSentFully: " + (_context.clock().now() - msg.getPeer().getLastSendFullyTime()) + + " expired? " + msg.isExpired()); consecutive = msg.getPeer().incrementConsecutiveFailedSends(); if (_log.shouldLog(Log.WARN)) _log.warn("Consecutive failure #" + consecutive + " sending to " + msg.getPeer());