From baa89c5bbf46e05f2b2cd15f40f5b556c4f62a1a Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 19:10:14 +0000 Subject: [PATCH 01/17] * OCMOSJ, ElG, Streaming: log tweaks --- .../client/streaming/MessageInputStream.java | 6 +-- .../client/streaming/MessageOutputStream.java | 4 +- .../src/net/i2p/crypto/ElGamalAESEngine.java | 4 +- .../src/net/i2p/data/i2np/GarlicMessage.java | 2 +- .../router/message/GarlicMessageBuilder.java | 6 +-- .../router/message/GarlicMessageParser.java | 8 ++-- .../OutboundClientMessageJobHelper.java | 25 ++++++------ .../OutboundClientMessageOneShotJob.java | 38 +++++++++---------- 8 files changed, 45 insertions(+), 48 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 3dc510fd52..956eb9b510 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -157,8 +157,8 @@ class MessageInputStream extends InputStream { */ public int getReadTimeout() { return _readTimeout; } public void setReadTimeout(int timeout) { - if (_log.shouldLog(Log.INFO)) - _log.info("Changing read timeout from " + _readTimeout + " to " + timeout); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); _readTimeout = timeout; } @@ -373,7 +373,7 @@ class MessageInputStream extends InputStream { } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("available(): " + numBytes + " " + toString()); + _log.debug("available(): " + numBytes); return numBytes; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 96a3ebe557..3a4cd42060 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -81,8 +81,8 @@ class MessageOutputStream extends OutputStream { } public void setWriteTimeout(int ms) { - if (_log.shouldLog(Log.INFO)) - _log.info("Changing write timeout from " + _writeTimeout + " to " + ms); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Changing write timeout from " + _writeTimeout + " to " + ms); _writeTimeout = ms; } diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index 755b688a25..c31a204823 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -100,7 +100,7 @@ public class ElGamalAESEngine { //if (_log.shouldLog(Log.DEBUG)) _log.debug("Key is known for tag " + st); long id = _context.random().nextLong(); if (_log.shouldLog(Log.DEBUG)) - _log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes: " + Base64.encode(data, 0, 64)); + _log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes " /* + Base64.encode(data, 0, 64) */ ); decrypted = decryptExistingSession(data, key, targetPrivateKey, foundTags, usedKey, foundKey); if (decrypted != null) { @@ -410,7 +410,7 @@ public class ElGamalAESEngine { _context.statManager().updateFrequency("crypto.elGamalAES.encryptExistingSession"); byte rv[] = encryptExistingSession(data, target, key, tagsForDelivery, currentTag, newKey, paddedSize); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() + ": " + Base64.encode(rv, 0, 64)); + _log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() /* + ": " + Base64.encode(rv, 0, 64) */); return rv; } diff --git a/router/java/src/net/i2p/data/i2np/GarlicMessage.java b/router/java/src/net/i2p/data/i2np/GarlicMessage.java index c4cfd29c9e..07621c462c 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicMessage.java +++ b/router/java/src/net/i2p/data/i2np/GarlicMessage.java @@ -83,7 +83,7 @@ public class GarlicMessage extends FastI2NPMessageImpl { public String toString() { StringBuilder buf = new StringBuilder(); buf.append("[GarlicMessage: "); - buf.append("\n\tData length: ").append(getData().length).append(" bytes"); + buf.append("Data length: ").append(getData().length).append(" bytes"); buf.append("]"); return buf.toString(); } diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index 0f01d53c62..ad7c63627e 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -233,8 +233,8 @@ public class GarlicMessageBuilder { } if (log.shouldLog(Log.DEBUG)) - log.debug("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length - + " and encrypted message data is " + encData.length); + log.debug("CloveSet (" + config.getCloveCount() + " cloves) for message " + msg.getUniqueId() + " is " + cloveSet.length + + " bytes and encrypted message data is " + encData.length + " bytes"); return msg; } @@ -268,7 +268,7 @@ public class GarlicMessageBuilder { for (int i = 0; i < config.getCloveCount(); i++) { GarlicConfig c = config.getClove(i); if (c instanceof PayloadGarlicConfig) { - log.debug("Subclove IS a payload garlic clove"); + //log.debug("Subclove IS a payload garlic clove"); cloves[i] = buildClove(ctx, (PayloadGarlicConfig)c); } else { log.debug("Subclove IS NOT a payload garlic clove"); diff --git a/router/java/src/net/i2p/router/message/GarlicMessageParser.java b/router/java/src/net/i2p/router/message/GarlicMessageParser.java index 4885d21c11..083ffd8c37 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageParser.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageParser.java @@ -71,13 +71,13 @@ class GarlicMessageParser { if (_log.shouldLog(Log.DEBUG)) _log.debug("# cloves to read: " + numCloves); for (int i = 0; i < numCloves; i++) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Reading clove " + i); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Reading clove " + i); GarlicClove clove = new GarlicClove(_context); offset += clove.readBytes(data, offset); set.addClove(clove); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("After reading clove " + i); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("After reading clove " + i); } //Certificate cert = new Certificate(); //offset += cert.readBytes(data, offset); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java index cf8a0dfc9e..7941616f5f 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java @@ -88,7 +88,7 @@ class OutboundClientMessageJobHelper { PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, boolean requireAck, LeaseSet bundledReplyLeaseSet) { Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class); - if (log.shouldLog(Log.DEBUG)) + if (replyToken >= 0 && log.shouldLog(Log.DEBUG)) log.debug("Reply token: " + replyToken); GarlicConfig config = new GarlicConfig(); @@ -136,20 +136,17 @@ class OutboundClientMessageJobHelper { Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class); PayloadGarlicConfig ackClove = new PayloadGarlicConfig(); - Hash replyToTunnelRouter = null; // inbound tunnel gateway - TunnelId replyToTunnelId = null; // tunnel id on that gateway - if (replyToTunnel == null) { if (log.shouldLog(Log.WARN)) log.warn("Unable to send client message from " + from.toBase64() + ", as there are no inbound tunnels available"); return null; } - replyToTunnelId = replyToTunnel.getReceiveTunnelId(0); - replyToTunnelRouter = replyToTunnel.getPeer(0); + TunnelId replyToTunnelId = replyToTunnel.getReceiveTunnelId(0); // tunnel id on that gateway + Hash replyToTunnelRouter = replyToTunnel.getPeer(0); // inbound tunnel gateway if (log.shouldLog(Log.DEBUG)) log.debug("Ack for the data message will come back along tunnel " + replyToTunnelId - + ":\n" + replyToTunnel); + + ": " + replyToTunnel); DeliveryInstructions ackInstructions = new DeliveryInstructions(); ackInstructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL); @@ -163,8 +160,8 @@ class OutboundClientMessageJobHelper { DeliveryStatusMessage msg = new DeliveryStatusMessage(ctx); msg.setArrival(ctx.clock().now()); msg.setMessageId(replyToken); - if (log.shouldLog(Log.DEBUG)) - log.debug("Delivery status message key: " + replyToken + " arrival: " + msg.getArrival()); + //if (log.shouldLog(Log.DEBUG)) + // log.debug("Delivery status message key: " + replyToken + " arrival: " + msg.getArrival()); ackClove.setCertificate(Certificate.NULL_CERT); ackClove.setDeliveryInstructions(ackInstructions); @@ -175,11 +172,11 @@ class OutboundClientMessageJobHelper { // defaults //ackClove.setRequestAck(false); - if (log.shouldLog(Log.DEBUG)) - log.debug("Delivery status message is targetting us [" - + ackClove.getRecipient().getIdentity().getHash().toBase64() - + "] via tunnel " + replyToTunnelId.getTunnelId() + " on " - + replyToTunnelRouter.toBase64()); + //if (log.shouldLog(Log.DEBUG)) + // log.debug("Delivery status message is targetting us [" + // + ackClove.getRecipient().getIdentity().getHash().toBase64() + // + "] via tunnel " + replyToTunnelId.getTunnelId() + " on " + // + replyToTunnelRouter.toBase64()); return ackClove; } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 4896e57d46..ea0664ec61 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -150,8 +150,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } overallExpiration = timeoutMs + _start; - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + " Default Expiration (ms): " + timeoutMs); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + " Default Expiration (ms): " + timeoutMs); } _overallExpiration = overallExpiration; } @@ -182,9 +182,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { dieFatal(); return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send outbound client message job beginning" + - ": preparing to search for the leaseSet for " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Send outbound client message job beginning" + + // ": preparing to search for the leaseSet for " + _toString); long timeoutMs = _overallExpiration - now; Hash key = _to.calculateHash(); SendJob success = new SendJob(getContext()); @@ -474,8 +474,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { dieFatal(); return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Clove built to " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Clove built to " + _toString); long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT; GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token, msgExpiration, key, @@ -494,8 +494,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString); SendSuccessJob onReply = null; SendTimeoutJob onFail = null; @@ -515,14 +515,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for " + _log.debug(getJobId() + ": GarlicMessage in new tunnel msg for " + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway()); if (_outTunnel != null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Sending tunnel message out " + _outTunnel.getSendTunnelId(0) + " to " + _log.debug(getJobId() + ": Sending msg out " + _outTunnel.getSendTunnelId(0) + " to " + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway()); @@ -571,9 +571,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { long before = getContext().clock().now(); getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway()); long dispatchSendTime = getContext().clock().now() - before; - if (_log.shouldLog(Log.INFO)) - _log.info(OutboundClientMessageOneShotJob.this.getJobId() + - ": Dispatching message to " + _toString + " complete"); + //if (_log.shouldLog(Log.INFO)) + // _log.info(OutboundClientMessageOneShotJob.this.getJobId() + + // ": Dispatching message to " + _toString + " complete"); getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start, 0); getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0); } @@ -728,8 +728,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _clove = clove; _cloveId = _clove.getId(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Built payload clove with id " + clove.getId()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Built payload clove with id " + clove.getId()); return true; } @@ -858,9 +858,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public String getName() { return "Outbound client message send timeout"; } public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info(OutboundClientMessageOneShotJob.this.getJobId() - + ": Soft timeout through the lease " + _lease); + //if (_log.shouldLog(Log.INFO)) + // _log.info(OutboundClientMessageOneShotJob.this.getJobId() + // + ": Soft timeout through the lease " + _lease); // unused //_lease.setNumFailure(_lease.getNumFailure()+1); From 6635448bda05e0968091ec34b63441b24c373ce7 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 19:52:00 +0000 Subject: [PATCH 02/17] * I2CP: Make separate message ID counters per-destination, use atomic, increase max (could have caused "local loopback" problems) --- .../router/client/ClientConnectionRunner.java | 24 +++++++++---------- .../i2p/router/client/MessageReceivedJob.java | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 8597c52b2b..83d5d51d5e 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.client.I2PClient; import net.i2p.crypto.SessionKeyManager; @@ -86,7 +87,14 @@ class ClientConnectionRunner { private boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ private boolean _dontSendMSM; + private final AtomicInteger _messageId; // messageId counter + // Was 32767 since the beginning (04-2004). + // But it's 4 bytes in the I2CP spec and stored as a long in MessageID.... + // If this is too low and wraps around, I2CP VerifyUsage could delete the wrong message, + // e.g. on local access + private static final int MAX_MESSAGE_ID = 0x4000000; + /** * Create a new runner against the given socket * @@ -99,6 +107,7 @@ class ClientConnectionRunner { _messages = new ConcurrentHashMap(); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); + _messageId = new AtomicInteger(_context.random().nextInt()); } private static volatile int __id = 0; @@ -520,18 +529,9 @@ class ClientConnectionRunner { } } - // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME - private final static int MAX_MESSAGE_ID = 32767; - private static volatile int _messageId = RandomSource.getInstance().nextInt(MAX_MESSAGE_ID); // messageId counter - private final static Object _messageIdLock = new Object(); - - static int getNextMessageId() { - synchronized (_messageIdLock) { - int messageId = (++_messageId)%MAX_MESSAGE_ID; - if (_messageId >= MAX_MESSAGE_ID) - _messageId = 0; - return messageId; - } + public int getNextMessageId() { + // Don't % so we don't get negative IDs + return _messageId.incrementAndGet() & (MAX_MESSAGE_ID - 1); } /** diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 88b1ca10b0..843ebfacea 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -36,7 +36,7 @@ class MessageReceivedJob extends JobImpl { public void runJob() { if (_runner.isDead()) return; MessageId id = new MessageId(); - id.setMessageId(ClientConnectionRunner.getNextMessageId()); + id.setMessageId(_runner.getNextMessageId()); _runner.setPayload(id, _payload); messageAvailable(id, _payload.getSize()); } From f7656b0401c49d256da6e4891d4c204bfed85944 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 19:59:29 +0000 Subject: [PATCH 03/17] * TunnelInfo: Change msg counter from long to int --- router/java/src/net/i2p/router/TunnelInfo.java | 3 ++- .../java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/router/java/src/net/i2p/router/TunnelInfo.java b/router/java/src/net/i2p/router/TunnelInfo.java index 7e06e3463d..be200069f7 100644 --- a/router/java/src/net/i2p/router/TunnelInfo.java +++ b/router/java/src/net/i2p/router/TunnelInfo.java @@ -68,10 +68,11 @@ public interface TunnelInfo { */ public void testSuccessful(int responseTime); - public long getProcessedMessagesCount(); + public int getProcessedMessagesCount(); /** we know for sure that this many bytes travelled through the tunnel in its lifetime */ public long getVerifiedBytesTransferred(); + /** we know for sure that the given number of bytes were sent down the tunnel fully */ public void incrementVerifiedBytesTransferred(int numBytes); diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index 68d733f82c..5306179a35 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -29,7 +29,7 @@ public class TunnelCreatorConfig implements TunnelInfo { private List _order; private long _replyMessageId; private final boolean _isInbound; - private long _messagesProcessed; + private int _messagesProcessed; private volatile long _verifiedBytesTransferred; private boolean _failed; private int _failures; @@ -127,7 +127,7 @@ public class TunnelCreatorConfig implements TunnelInfo { /** take note of a message being pumped through this tunnel */ public void incrementProcessedMessages() { _messagesProcessed++; } - public long getProcessedMessagesCount() { return _messagesProcessed; } + public int getProcessedMessagesCount() { return _messagesProcessed; } public void incrementVerifiedBytesTransferred(int bytes) { _verifiedBytesTransferred += bytes; @@ -144,6 +144,7 @@ public class TunnelCreatorConfig implements TunnelInfo { _context.profileManager().tunnelDataPushed1m(_peers[i], (int)normalized); } } + public long getVerifiedBytesTransferred() { return _verifiedBytesTransferred; } private static final int THROUGHPUT_COUNT = 3; From d148efd4589bb4ad3e289b88e5008377370bcbc9 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 20:08:02 +0000 Subject: [PATCH 04/17] * TunnelPoolManager: Use one ClientPeerSelector for all pools --- history.txt | 21 +++++++++++++++++++ .../src/net/i2p/router/RouterVersion.java | 2 +- .../router/tunnel/pool/TunnelPoolManager.java | 8 +++---- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/history.txt b/history.txt index 6734f3335d..1ed3d87e13 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,24 @@ +2012-06-21 zzz + * I2CP: Make separate message ID counters per-destination, use atomic, + increase max (could have caused "local loopback" problems) + * OCMOSJ, ElG, Streaming: log tweaks + * TunnelInfo: Change msg counter from long to int + * TunnelPoolManager: Use one ClientPeerSelector for all pools + +2012-06-20 zzz + * I2PSession: + - Greatly simplify the VerifyUsage timers + - Constructor cleanup + +2012-06-19 zzz + * i2psnark: + - Hide buttons while stopping all + * Socks: Pass remote port through + * Streaming: + - Listen only on local port if set + - Listen only for streaming protocol if configured (new option) + - Javadocs re: ports + 2012-06-18 zzz * i2psnark: - Improve torrent shutdown handling to maximize chance of diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e1f87e077d..8ab24107c9 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 16; + public final static long BUILD = 17; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 671a3e16bc..66080501ca 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -41,6 +41,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { private TunnelPool _outboundExploratory; private final BuildExecutor _executor; private final BuildHandler _handler; + private final TunnelPeerSelector _clientPeerSelector; private boolean _isShutdown; private final int _numHandlerThreads; private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l }; @@ -60,6 +61,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new ConcurrentHashMap(4); _clientOutboundPools = new ConcurrentHashMap(4); + _clientPeerSelector = new ClientPeerSelector(); _executor = new BuildExecutor(ctx, this); I2PThread execThread = new I2PThread(_executor, "BuildExecutor", true); @@ -407,8 +409,6 @@ public class TunnelPoolManager implements TunnelManagerFacade { settings.getOutboundSettings().setDestination(dest); TunnelPool inbound = null; TunnelPool outbound = null; - // should we share the clientPeerSelector across both inbound and outbound? - // or just one for all clients? why separate? boolean delayOutbound = false; // synch with removeTunnels() below @@ -416,7 +416,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { inbound = _clientInboundPools.get(dest); if (inbound == null) { inbound = new TunnelPool(_context, this, settings.getInboundSettings(), - new ClientPeerSelector()); + _clientPeerSelector); _clientInboundPools.put(dest, inbound); } else { inbound.setSettings(settings.getInboundSettings()); @@ -424,7 +424,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound = _clientOutboundPools.get(dest); if (outbound == null) { outbound = new TunnelPool(_context, this, settings.getOutboundSettings(), - new ClientPeerSelector()); + _clientPeerSelector); _clientOutboundPools.put(dest, outbound); delayOutbound = true; } else { From 4e4634496a6a63703de9b4fa27323bc38e909e0e Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 20:26:29 +0000 Subject: [PATCH 05/17] * TunnelPool: Fix bug where a tunnel was marked as reused when it wasn't --- history.txt | 1 + router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java | 1 + 2 files changed, 2 insertions(+) diff --git a/history.txt b/history.txt index 1ed3d87e13..0dad0b1349 100644 --- a/history.txt +++ b/history.txt @@ -3,6 +3,7 @@ increase max (could have caused "local loopback" problems) * OCMOSJ, ElG, Streaming: log tweaks * TunnelInfo: Change msg counter from long to int + * TunnelPool: Fix bug where a tunnel was marked as reused when it wasn't * TunnelPoolManager: Use one ClientPeerSelector for all pools 2012-06-20 zzz diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 4b4b4afe04..9bb01fd38a 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -1086,6 +1086,7 @@ public class TunnelPool { for (int i = len - 1; i >= 0; i--) { peers.add(ti.getPeer(i)); } + break; } } } From 829e3f47ffb36e90f102af71d93baaf231a2845e Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 20:52:39 +0000 Subject: [PATCH 06/17] Store context in the PeerSelector so we don't have to pass it around --- .../tunnel/pool/ClientPeerSelector.java | 19 +++++++--- .../tunnel/pool/ExploratoryPeerSelector.java | 35 +++++++++++-------- .../tunnel/pool/TunnelPeerSelector.java | 18 ++++++---- .../i2p/router/tunnel/pool/TunnelPool.java | 2 +- .../router/tunnel/pool/TunnelPoolManager.java | 4 +-- 5 files changed, 50 insertions(+), 28 deletions(-) diff --git a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java index 24e6966a12..468cacaf26 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java @@ -15,8 +15,13 @@ import net.i2p.router.TunnelPoolSettings; * */ class ClientPeerSelector extends TunnelPeerSelector { - public List selectPeers(RouterContext ctx, TunnelPoolSettings settings) { - int length = getLength(ctx, settings); + + public ClientPeerSelector(RouterContext context) { + super(context); + } + + public List selectPeers(TunnelPoolSettings settings) { + int length = getLength(settings); if (length < 0) return null; if ( (length == 0) && (settings.getLength()+settings.getLengthVariance() > 0) ) @@ -26,9 +31,9 @@ class ClientPeerSelector extends TunnelPeerSelector { if (length > 0) { if (shouldSelectExplicit(settings)) - return selectExplicit(ctx, settings, length); + return selectExplicit(settings, length); - Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory()); + Set exclude = getExclude(settings.isInbound(), false); Set matches = new HashSet(length); if (length == 1) { ctx.profileOrganizer().selectFastPeers(length, exclude, matches, 0); @@ -41,6 +46,9 @@ class ClientPeerSelector extends TunnelPeerSelector { rv = new ArrayList(length + 1); // OBEP or IB last hop // group 0 or 1 if two hops, otherwise group 0 + if (!settings.isInbound()) { + // exclude existing OBEPs to get some diversity + } ctx.profileOrganizer().selectFastPeers(1, exclude, matches, settings.getRandomKey(), length == 2 ? 2 : 4); matches.remove(ctx.routerHash()); exclude.addAll(matches); @@ -64,6 +72,9 @@ class ClientPeerSelector extends TunnelPeerSelector { } // IBGW or OB first hop // group 2 or 3 if two hops, otherwise group 1 + if (settings.isInbound()) { + // exclude existing IBGWs to get some diversity + } ctx.profileOrganizer().selectFastPeers(1, exclude, matches, settings.getRandomKey(), length == 2 ? 3 : 5); matches.remove(ctx.routerHash()); rv.addAll(matches); diff --git a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java index 1f05727dc5..ed3837ec44 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java @@ -18,9 +18,14 @@ import net.i2p.util.Log; * */ class ExploratoryPeerSelector extends TunnelPeerSelector { - public List selectPeers(RouterContext ctx, TunnelPoolSettings settings) { + + public ExploratoryPeerSelector(RouterContext context) { + super(context); + } + + public List selectPeers(TunnelPoolSettings settings) { Log l = ctx.logManager().getLog(getClass()); - int length = getLength(ctx, settings); + int length = getLength(settings); if (length < 0) { if (l.shouldLog(Log.DEBUG)) l.debug("Length requested is zero: " + settings); @@ -28,13 +33,13 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { } if (false && shouldSelectExplicit(settings)) { - List rv = selectExplicit(ctx, settings, length); + List rv = selectExplicit(settings, length); if (l.shouldLog(Log.DEBUG)) l.debug("Explicit peers selected: " + rv); return rv; } - Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory()); + Set exclude = getExclude(settings.isInbound(), true); exclude.add(ctx.routerHash()); // Don't use ff peers for exploratory tunnels to lessen exposure to netDb searches and stores // Hmm if they don't get explored they don't get a speed/capacity rating @@ -42,7 +47,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { // FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)ctx.netDb(); // exclude.addAll(fac.getFloodfillPeers()); HashSet matches = new HashSet(length); - boolean exploreHighCap = shouldPickHighCap(ctx); + boolean exploreHighCap = shouldPickHighCap(); // // We don't honor IP Restriction here, to be fixed @@ -84,7 +89,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { * build success rate is much worse, return true so that reliability * is maintained. */ - private static boolean shouldPickHighCap(RouterContext ctx) { + private boolean shouldPickHighCap() { if (ctx.getBooleanProperty("router.exploreHighCapacity")) return true; @@ -118,7 +123,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { if (ctx.router().getUptime() <= 11*60*1000) { failPct = 100 - MIN_NONFAILING_PCT; } else { - failPct = getExploratoryFailPercentage(ctx); + failPct = getExploratoryFailPercentage(); //Log l = ctx.logManager().getLog(getClass()); //if (l.shouldLog(Log.DEBUG)) // l.debug("Normalized Fail pct: " + failPct); @@ -140,9 +145,9 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { * Even this isn't the "true" rate for the NonFailingPeers pool, since we * are often building exploratory tunnels using the HighCapacity pool. */ - private static int getExploratoryFailPercentage(RouterContext ctx) { - int c = getFailPercentage(ctx, "Client"); - int e = getFailPercentage(ctx, "Exploratory"); + private int getExploratoryFailPercentage() { + int c = getFailPercentage("Client"); + int e = getFailPercentage("Exploratory"); //Log l = ctx.logManager().getLog(getClass()); //if (l.shouldLog(Log.DEBUG)) // l.debug("Client, Expl. Fail pct: " + c + ", " + e); @@ -154,11 +159,11 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { return (100 * (e-c)) / (100-c); } - private static int getFailPercentage(RouterContext ctx, String t) { + private int getFailPercentage(String t) { String pfx = "tunnel.build" + t; - int timeout = getEvents(ctx, pfx + "Expire", 10*60*1000); - int reject = getEvents(ctx, pfx + "Reject", 10*60*1000); - int accept = getEvents(ctx, pfx + "Success", 10*60*1000); + int timeout = getEvents(pfx + "Expire", 10*60*1000); + int reject = getEvents(pfx + "Reject", 10*60*1000); + int accept = getEvents(pfx + "Success", 10*60*1000); if (accept + reject + timeout <= 0) return 0; double pct = (double)(reject + timeout) / (accept + reject + timeout); @@ -166,7 +171,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { } /** Use current + last to get more recent and smoother data */ - private static int getEvents(RouterContext ctx, String stat, long period) { + private int getEvents(String stat, long period) { RateStat rs = ctx.statManager().getRate(stat); if (rs == null) return 0; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index 22508a2ce9..7679ed955c 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -31,6 +31,12 @@ import net.i2p.util.VersionComparator; * Todo: there's nothing non-static in here */ public abstract class TunnelPeerSelector { + protected final RouterContext ctx; + + protected TunnelPeerSelector(RouterContext context) { + ctx = context; + } + /** * Which peers should go into the next tunnel for the given settings? * @@ -40,12 +46,12 @@ public abstract class TunnelPeerSelector { * to build through, and the settings reject 0 hop tunnels, this will * return null. */ - public abstract List selectPeers(RouterContext ctx, TunnelPoolSettings settings); + public abstract List selectPeers(TunnelPoolSettings settings); /** * @return randomized number of hops 0-7, not including ourselves */ - protected int getLength(RouterContext ctx, TunnelPoolSettings settings) { + protected int getLength(TunnelPoolSettings settings) { int length = settings.getLength(); int override = settings.getLengthOverride(); if (override >= 0) { @@ -109,7 +115,7 @@ public abstract class TunnelPeerSelector { * Needs analysis and testing * @return should always be false */ - protected List selectExplicit(RouterContext ctx, TunnelPoolSettings settings, int length) { + protected List selectExplicit(TunnelPoolSettings settings, int length) { String peers = null; Properties opts = settings.getUnknownOptions(); if (opts != null) @@ -173,7 +179,7 @@ public abstract class TunnelPeerSelector { /** * Pick peers that we want to avoid */ - public Set getExclude(RouterContext ctx, boolean isInbound, boolean isExploratory) { + public Set getExclude(boolean isInbound, boolean isExploratory) { // we may want to update this to skip 'hidden' or 'unreachable' peers, but that // isn't safe, since they may publish one set of routerInfo to us and another to // other peers. the defaults for filterUnreachable has always been to return false, @@ -196,7 +202,7 @@ public abstract class TunnelPeerSelector { peers.addAll(ctx.profileOrganizer().selectPeersRecentlyRejecting()); peers.addAll(ctx.tunnelManager().selectPeersInTooManyTunnels()); // if (false && filterUnreachable(ctx, isInbound, isExploratory)) { - if (filterUnreachable(ctx, isInbound, isExploratory)) { + if (filterUnreachable(isInbound, isExploratory)) { // NOTE: filterUnreachable returns true for inbound, false for outbound // This is the only use for getPeersByCapability? And the whole set of datastructures in PeerManager? Collection caps = ctx.peerManager().getPeersByCapability(Router.CAPABILITY_UNREACHABLE); @@ -439,7 +445,7 @@ public abstract class TunnelPeerSelector { * do we want to skip peers who haven't been up for long? * @return true for inbound, false for outbound, unless configured otherwise */ - protected boolean filterUnreachable(RouterContext ctx, boolean isInbound, boolean isExploratory) { + protected boolean filterUnreachable(boolean isInbound, boolean isExploratory) { boolean def = false; String val = null; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 9bb01fd38a..1668ecb05b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -1093,7 +1093,7 @@ public class TunnelPool { } if (peers == null) { setLengthOverride(); - peers = _peerSelector.selectPeers(_context, settings); + peers = _peerSelector.selectPeers(settings); } if ( (peers == null) || (peers.isEmpty()) ) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 66080501ca..a849647410 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -61,7 +61,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new ConcurrentHashMap(4); _clientOutboundPools = new ConcurrentHashMap(4); - _clientPeerSelector = new ClientPeerSelector(); + _clientPeerSelector = new ClientPeerSelector(ctx); _executor = new BuildExecutor(ctx, this); I2PThread execThread = new I2PThread(_executor, "BuildExecutor", true); @@ -511,7 +511,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { t.setDaemon(true); t.start(); } - ExploratoryPeerSelector selector = new ExploratoryPeerSelector(); + ExploratoryPeerSelector selector = new ExploratoryPeerSelector(_context); TunnelPoolSettings inboundSettings = new TunnelPoolSettings(); inboundSettings.setIsExploratory(true); From 2a8adcb89ab30585db7f352768d313f9fa5aa5e6 Mon Sep 17 00:00:00 2001 From: zzz Date: Thu, 21 Jun 2012 21:16:54 +0000 Subject: [PATCH 07/17] * IRC Client: Don't flush output unless out of input, so the streaming messages don't get split up unnecessarily --- .../java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java | 3 ++- .../java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java | 4 +++- history.txt | 2 ++ 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java index ce301a48e8..5270cbb6b9 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java @@ -79,7 +79,8 @@ public class IrcInboundFilter implements Runnable { outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); // probably doesn't do much but can't hurt - output.flush(); + if (!in.ready()) + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("inbound BLOCKED: "+inmsg); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java index 5e8c57d997..5e142081af 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java @@ -79,7 +79,9 @@ public class IrcOutboundFilter implements Runnable { outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); // save 250 ms in streaming - output.flush(); + // Check ready() so we don't split the initial handshake up into multiple streaming messages + if (!in.ready()) + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); diff --git a/history.txt b/history.txt index 0dad0b1349..760700ad44 100644 --- a/history.txt +++ b/history.txt @@ -1,6 +1,8 @@ 2012-06-21 zzz * I2CP: Make separate message ID counters per-destination, use atomic, increase max (could have caused "local loopback" problems) + * IRC Client: Don't flush output unless out of input, so the + streaming messages don't get split up unnecessarily * OCMOSJ, ElG, Streaming: log tweaks * TunnelInfo: Change msg counter from long to int * TunnelPool: Fix bug where a tunnel was marked as reused when it wasn't From 97b05b1dbfed60ec3a1955bd58a99bc18d1fa07d Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 24 Jun 2012 11:38:37 +0000 Subject: [PATCH 08/17] * I2PTunnel: Fix NPE on shared client creation, thx kytv * Transport: Add Ethiopia to hidden mode list * Log and javadoc tweaks --- .../src/net/i2p/i2ptunnel/I2PTunnelClientBase.java | 2 +- core/java/src/net/i2p/client/I2PSessionImpl.java | 2 +- history.txt | 5 +++++ router/java/src/net/i2p/router/RouterVersion.java | 2 +- .../net/i2p/router/message/SendMessageDirectJob.java | 10 ++++++++++ .../src/net/i2p/router/transport/BadCountries.java | 3 ++- 6 files changed, 20 insertions(+), 4 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 1e69e0e182..d9fea97433 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -511,7 +511,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna if (sm == null) return; Properties props = tunnel.getClientOptions(); - sm.setDefaultOptions(sockMgr.buildOptions(props)); + sm.setDefaultOptions(sm.buildOptions(props)); } /** diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 60f67236a3..9dc50b12b4 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -584,7 +584,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } else { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Message received of type " + message.getType() - + " to be handled by " + handler); + + " to be handled by " + handler.getClass().getSimpleName()); handler.handleMessage(message, this); } } diff --git a/history.txt b/history.txt index 760700ad44..ccc1e43c96 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,7 @@ +2012-06-24 zzz + * I2PTunnel: Fix NPE on shared client creation, thx kytv + * Transport: Add Ethiopia to hidden mode list + 2012-06-21 zzz * I2CP: Make separate message ID counters per-destination, use atomic, increase max (could have caused "local loopback" problems) @@ -5,6 +9,7 @@ streaming messages don't get split up unnecessarily * OCMOSJ, ElG, Streaming: log tweaks * TunnelInfo: Change msg counter from long to int + * TunnelPeerSelectors: Minor refactoring to store context * TunnelPool: Fix bug where a tunnel was marked as reused when it wasn't * TunnelPoolManager: Use one ClientPeerSelector for all pools diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 8ab24107c9..80cfb9c849 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 17; + public final static long BUILD = 18; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 5ded790ab8..8880a77854 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -21,6 +21,13 @@ import net.i2p.router.ReplyJob; import net.i2p.router.RouterContext; import net.i2p.util.Log; +/** + * Send a message directly to another router, i.e. not through a tunnel. + * This is safe to run inline via runJob(). + * If the RouterInfo for the Hash is not found locally, it will + * queue a lookup and register itself to be run again when the lookup + * succeeds or times out. + */ public class SendMessageDirectJob extends JobImpl { private final Log _log; private final I2NPMessage _message; @@ -39,9 +46,11 @@ public class SendMessageDirectJob extends JobImpl { public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int timeoutMs, int priority) { this(ctx, message, toPeer, null, null, null, null, timeoutMs, priority); } + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority); } + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { super(ctx); _log = getContext().logManager().getLog(SendMessageDirectJob.class); @@ -66,6 +75,7 @@ public class SendMessageDirectJob extends JobImpl { } public String getName() { return "Send Message Direct"; } + public void runJob() { long now = getContext().clock().now(); diff --git a/router/java/src/net/i2p/router/transport/BadCountries.java b/router/java/src/net/i2p/router/transport/BadCountries.java index 1431dcb60f..f5d36b64ac 100644 --- a/router/java/src/net/i2p/router/transport/BadCountries.java +++ b/router/java/src/net/i2p/router/transport/BadCountries.java @@ -16,7 +16,7 @@ abstract class BadCountries { // zzz.i2p/topics/969 // List created based on the Press Freedom Index. Those countries with a score of higher than 50 are included: // http://en.wikipedia.org/wiki/Press_Freedom_Index - // Except: + // Except (quote): // I don't really think that is usage of I2P is dangerous in countries from CIS // General situation is really bad (like in Russia) but people here doesn't have problems with Ecnryption usage. @@ -32,6 +32,7 @@ abstract class BadCountries { /* Democratic Republic of the Congo */ "CD", /* Equatorial Guinea */ "GQ", /* Eritrea */ "ER", + /* Ethiopia */ "ET", /* Fiji */ "FJ", /* Honduras */ "HN", /* Iran */ "IR", From 1671e3b126e0a14c5346b4aa4cb6d2182d13eca7 Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 24 Jun 2012 19:23:05 +0000 Subject: [PATCH 09/17] Fix bad size estimate when tags are included in the AES block, resulting in trailing zeros after the random padding in the unencrypted AES data block. The number of zeros equaled the number of tags included (typ. 6 or 40). As the data size is rounded up to the next multiple of 16, this increased the size of the data by 0, 16, 32, or 48 bytes when tags were included. Bug introduced 2004-10-30. --- core/java/src/net/i2p/crypto/ElGamalAESEngine.java | 1 - 1 file changed, 1 deletion(-) diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index c31a204823..8b6cce5f1d 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -599,7 +599,6 @@ public class ElGamalAESEngine { //_log.debug("Encrypting AES"); if (tagsForDelivery == null) tagsForDelivery = Collections.EMPTY_SET; int size = 2 // sizeof(tags) - + tagsForDelivery.size() + SessionTag.BYTE_LENGTH*tagsForDelivery.size() + 4 // payload length + Hash.HASH_LENGTH From ff0bfb9f127623b0e042167040a61b934a0dd203 Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 24 Jun 2012 19:26:23 +0000 Subject: [PATCH 10/17] * i2psnark: Don't create a new PeerCoordinator after restart, as the TrackerClient holds on to the old one and that causes it to not get peers. Possibly fixes ticket #563. --- .../java/src/org/klomp/snark/PeerCoordinator.java | 10 +++++++++- apps/i2psnark/java/src/org/klomp/snark/Snark.java | 15 ++++----------- history.txt | 6 ++++++ router/java/src/net/i2p/router/RouterVersion.java | 2 +- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 45b6ef82ac..fe924f79c6 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -125,7 +125,7 @@ class PeerCoordinator implements PeerListener /** partial pieces - lock by synching on wantedPieces - TODO store Requests, not PartialPieces */ private final List partialPieces; - private boolean halted = false; + private volatile boolean halted; private final MagnetState magnetState; private final CoordinatorListener listener; @@ -429,6 +429,14 @@ class PeerCoordinator implements PeerListener } } + /** + * @since 0.9.1 + */ + public void restart() { + halted = false; + timer.schedule((CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD)); + } + public void connected(Peer peer) { if (halted) diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index acf5f15ed5..138f2dd2c8 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -553,21 +553,14 @@ public class Snark } stopped = false; - boolean coordinatorChanged = false; if (coordinator.halted()) { - // ok, we have already started and stopped, but the coordinator seems a bit annoying to - // restart safely, so lets build a new one to replace the old + coordinator.restart(); if (_peerCoordinatorSet != null) - _peerCoordinatorSet.remove(coordinator); - PeerCoordinator newCoord = new PeerCoordinator(_util, id, infoHash, meta, storage, this, this); - if (_peerCoordinatorSet != null) - _peerCoordinatorSet.add(newCoord); - coordinator = newCoord; - coordinatorChanged = true; + _peerCoordinatorSet.add(coordinator); } - if (!trackerclient.started() && !coordinatorChanged) { + if (!trackerclient.started()) { trackerclient.start(); - } else if (trackerclient.halted() || coordinatorChanged) { + } else if (trackerclient.halted()) { if (storage != null) { try { storage.reopen(rootDataDir); diff --git a/history.txt b/history.txt index ccc1e43c96..1955fa0309 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ 2012-06-24 zzz + * ElGamalAESEngine: Fix bad size estimate when tags are included, + resulting in trailing zeros after the padding + in the unencrypted data + * i2psnark: Don't create a new PeerCoordinator after restart, as the + TrackerClient holds on to the old one and that causes it + to not get peers. Possibly fixes ticket #563. * I2PTunnel: Fix NPE on shared client creation, thx kytv * Transport: Add Ethiopia to hidden mode list diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 80cfb9c849..5b1ba79e39 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 18; + public final static long BUILD = 19; /** for example "-test" */ public final static String EXTRA = ""; From 6c8c87b2dd372d7e098bfe0329b035ec1d8d3204 Mon Sep 17 00:00:00 2001 From: zzz Date: Mon, 25 Jun 2012 19:29:51 +0000 Subject: [PATCH 11/17] javadocs --- core/java/src/net/i2p/crypto/ElGamalAESEngine.java | 7 ++++++- core/java/src/net/i2p/crypto/SessionKeyManager.java | 6 ++++-- .../src/net/i2p/crypto/TransientSessionKeyManager.java | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index 8b6cce5f1d..3aec9e288d 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -61,7 +61,8 @@ public class ElGamalAESEngine { } /** - * Decrypt the message using the given private key using tags from the default key manager. + * Decrypt the message using the given private key using tags from the default key manager, + * which is the router's key manager. Use extreme care if you aren't the router. * * @deprecated specify the key manager! */ @@ -75,6 +76,10 @@ public class ElGamalAESEngine { * This works according to the * ElGamal+AES algorithm in the data structure spec. * + * Warning - use the correct SessionKeyManager. Clients should instantiate their own. + * Clients using I2PAppContext.sessionKeyManager() may be correlated with the router, + * unless you are careful to use different keys. + * * @return decrypted data or null on failure */ public byte[] decrypt(byte data[], PrivateKey targetPrivateKey, SessionKeyManager keyManager) throws DataFormatException { diff --git a/core/java/src/net/i2p/crypto/SessionKeyManager.java b/core/java/src/net/i2p/crypto/SessionKeyManager.java index 4a9456f7d5..c7af8c9703 100644 --- a/core/java/src/net/i2p/crypto/SessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/SessionKeyManager.java @@ -59,7 +59,8 @@ public class SessionKeyManager { * Associate a new session key with the specified target. Metrics to determine * when to expire that key begin with this call. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ public void createSession(PublicKey target, SessionKey key) { // nop } @@ -67,7 +68,8 @@ public class SessionKeyManager { /** * Generate a new session key and associate it with the specified target. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ public SessionKey createSession(PublicKey target) { SessionKey key = KeyGenerator.getInstance().generateSessionKey(); diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index e31adcada7..bd4758a151 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -243,7 +243,8 @@ public class TransientSessionKeyManager extends SessionKeyManager { * Associate a new session key with the specified target. Metrics to determine * when to expire that key begin with this call. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ @Override public void createSession(PublicKey target, SessionKey key) { From 5a1d52d82ca03d0611441ee5dd0f8a9e7565063e Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jun 2012 13:30:24 +0000 Subject: [PATCH 12/17] * HTTP Proxy: Change the error code for unknown host from 404 to 500. --- installer/resources/proxy/dnfh-header.ht | 2 +- installer/resources/proxy/dnfh-header_de.ht | 2 +- installer/resources/proxy/dnfh-header_fr.ht | 2 +- installer/resources/proxy/dnfh-header_nl.ht | 2 +- installer/resources/proxy/dnfh-header_ru.ht | 2 +- installer/resources/proxy/dnfh-header_zh.ht | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/installer/resources/proxy/dnfh-header.ht b/installer/resources/proxy/dnfh-header.ht index b50a3395f2..126c93f252 100644 --- a/installer/resources/proxy/dnfh-header.ht +++ b/installer/resources/proxy/dnfh-header.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_de.ht b/installer/resources/proxy/dnfh-header_de.ht index c2c80f198b..9f9413424e 100644 --- a/installer/resources/proxy/dnfh-header_de.ht +++ b/installer/resources/proxy/dnfh-header_de.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_fr.ht b/installer/resources/proxy/dnfh-header_fr.ht index d5dce0364a..c24e2d6bf6 100644 --- a/installer/resources/proxy/dnfh-header_fr.ht +++ b/installer/resources/proxy/dnfh-header_fr.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domaine non trouvé +HTTP/1.1 500 Domaine non trouvé Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_nl.ht b/installer/resources/proxy/dnfh-header_nl.ht index 0dc34e3dbf..296a62dddc 100644 --- a/installer/resources/proxy/dnfh-header_nl.ht +++ b/installer/resources/proxy/dnfh-header_nl.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_ru.ht b/installer/resources/proxy/dnfh-header_ru.ht index cf24cbc277..0e34195599 100644 --- a/installer/resources/proxy/dnfh-header_ru.ht +++ b/installer/resources/proxy/dnfh-header_ru.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_zh.ht b/installer/resources/proxy/dnfh-header_zh.ht index bf69555006..1e65a67c16 100644 --- a/installer/resources/proxy/dnfh-header_zh.ht +++ b/installer/resources/proxy/dnfh-header_zh.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close From ebb6609a2b69a25fe49120b05fee99c63ec1540b Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jun 2012 14:05:39 +0000 Subject: [PATCH 13/17] fix SimpleTimer logging --- core/java/src/net/i2p/util/Executor.java | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index 1bff875578..463882a25c 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -9,12 +9,13 @@ import net.i2p.I2PAppContext; */ class Executor implements Runnable { private final I2PAppContext _context; - private Log _log; - private final List _readyEvents; + private final Log _log; + private final List _readyEvents; private final SimpleStore runn; - public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { + public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { _context = ctx; + _log = log; _readyEvents = events; runn = x; } @@ -26,7 +27,7 @@ class Executor implements Runnable { if (_readyEvents.isEmpty()) try { _readyEvents.wait(); } catch (InterruptedException ie) {} if (!_readyEvents.isEmpty()) - evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); + evt = _readyEvents.remove(0); } if (evt != null) { @@ -34,21 +35,12 @@ class Executor implements Runnable { try { evt.timeReached(); } catch (Throwable t) { - log("Executing task " + evt + " exited unexpectedly, please report", t); + _log.error("Executing task " + evt + " exited unexpectedly, please report", t); } long time = _context.clock().now() - before; - // FIXME _log won't be non-null unless we already had a CRIT - if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) + if ( (time > 1000) && (_log.shouldLog(Log.WARN)) ) _log.warn("wtf, event execution took " + time + ": " + evt); } } } - - private void log(String msg, Throwable t) { - synchronized (this) { - if (_log == null) - _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); - } - _log.log(Log.CRIT, msg, t); - } } From 4092f618985cb895a3877966907cf3188a9d5fe5 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jun 2012 14:53:53 +0000 Subject: [PATCH 14/17] * Streaming: - Allow at least 3 packets and up to half the window to be active resends instead of just 1, to reduce stall time after a packet drop - Increase fast retransmit threshold back to 3 to reduce retransmissions - Don't fast retransmit if we recently retransmitted it already - Allow double the window as long as gaps are less than the window - Don't set the MSS in a resent packet (saves 2 bytes) - Remove redundant calls to updateAcks() - Update activity timer when resending a packet - Reset unackedPacketsReceived counter at all places where acks are sent so it isn't wrong - Fix some places where the activeResends count could become wrong - Prevent storm of CLOSE packets - Never resend the whole packet in ackImmediately(), just send an ack - Cancel flusher timer in MessageOutputStream when closed - Move some createRateStats to ConnectionManager to reduce repeated calls - Cleanups, javadocs, logging, volatile, finals --- .../net/i2p/client/streaming/Connection.java | 254 ++++++++++-------- .../streaming/ConnectionDataReceiver.java | 47 ++-- .../client/streaming/ConnectionManager.java | 11 +- .../client/streaming/ConnectionOptions.java | 13 +- .../streaming/ConnectionPacketHandler.java | 45 ++-- .../i2p/client/streaming/MessageHandler.java | 3 +- .../client/streaming/MessageInputStream.java | 28 +- .../client/streaming/MessageOutputStream.java | 81 +++--- .../src/net/i2p/client/streaming/Packet.java | 13 +- .../i2p/client/streaming/PacketHandler.java | 13 +- .../net/i2p/client/streaming/PacketLocal.java | 25 +- .../net/i2p/client/streaming/PacketQueue.java | 15 +- .../client/streaming/SchedulerChooser.java | 10 +- .../client/streaming/SchedulerClosing.java | 17 +- .../i2p/client/streaming/SchedulerImpl.java | 5 + .../net/i2p/client/streaming/TCBShare.java | 4 +- history.txt | 21 ++ .../src/net/i2p/router/RouterVersion.java | 2 +- 18 files changed, 363 insertions(+), 244 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 8db219c607..298a1c21f8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -34,19 +34,19 @@ class Connection { private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; - private boolean _connected; + private volatile boolean _connected; private boolean _hardDisconnected; private final MessageInputStream _inputStream; private final MessageOutputStream _outputStream; private final SchedulerChooser _chooser; - private long _nextSendTime; + private volatile long _nextSendTime; private long _ackedPackets; private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; private long _congestionWindowEnd; - private long _highestAckedThrough; + private volatile long _highestAckedThrough; private boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ @@ -60,11 +60,11 @@ class Connection { private String _connectionError; private long _disconnectScheduledOn; private long _lastReceivedOn; - private ActivityTimer _activityTimer; + private final ActivityTimer _activityTimer; /** window size when we last saw congestion */ private int _lastCongestionSeenAt; private long _lastCongestionTime; - private long _lastCongestionHighestUnacked; + private volatile long _lastCongestionHighestUnacked; private boolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private final Object _connectLock; @@ -96,7 +96,9 @@ class Connection { } ****/ - /** */ + /** + * @param opts may be null + */ public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, SimpleTimer2 timer, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { @@ -138,10 +140,7 @@ class Connection { _resetSentOn = -1; _connectionEvent = new ConEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage - _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); + // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) _log.info("New connection created with options: " + _options); } @@ -169,7 +168,6 @@ class Connection { * will return false after 5 minutes even if timeoutMs is <= 0. */ boolean packetSendChoke(long timeoutMs) { - // if (false) return true; // <--- what the fuck?? long start = _context.clock().now(); long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 boolean started = false; @@ -187,19 +185,26 @@ class Connection { if (!_connected) return false; started = true; - if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || - (_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) { + // Try to keep things moving even during NACKs and retransmissions... + // Limit unacked packets to the window + // Limit active resends to half the window + // Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke) + int unacked = _outboundPackets.size(); + int wsz = _options.getWindowSize(); + if (unacked >= wsz || + _activeResends >= (wsz + 1) / 2 || + _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) { if (timeoutMs > 0) { if (timeLeft <= 0) { if (_log.shouldLog(Log.INFO)) - _log.info("Outbound window is full of " + _outboundPackets.size() - + " with " + _activeResends + " active resends" + _log.info("Outbound window is full " + unacked + + " unacked with " + _activeResends + " active resends" + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " + toString()); return false; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" + _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" + _activeResends + "), waiting " + timeLeft); try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} } else { @@ -223,6 +228,12 @@ class Connection { void ackImmediately() { PacketLocal packet = null; +/*** why would we do this? + was it to force a congestion indication at the other end? + an expensive way to do that... + One big user was via SchedulerClosing to resend a CLOSE packet, + but why do that either... + synchronized (_outboundPackets) { if (!_outboundPackets.isEmpty()) { // ordered, so pick the lowest to retransmit @@ -239,6 +250,7 @@ class Connection { } ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent(); if (evt != null) { + // fixme should we set a flag and reschedule instead? or synch? boolean sent = evt.retransmit(false); if (sent) { if (_log.shouldLog(Log.DEBUG)) @@ -251,7 +263,9 @@ class Connection { } } } +***/ // if we don't have anything to retransmit, send a small ack + // this calls sendPacket() below packet = _receiver.send(null, 0, 0); if (_log.shouldLog(Log.DEBUG)) _log.debug("sending new ack: " + packet); @@ -281,11 +295,15 @@ class Connection { reply.setReceiveStreamId(_receiveStreamId); reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); // this just sends the packet - no retries or whatnot - _outboundQueue.enqueue(reply); + if (_outboundQueue.enqueue(reply)) { + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + resetActivityTimer(); + } } /** - * Flush any data that we can + * Flush any data that we can. Non-blocking. */ void sendAvailable() { // this grabs the data, builds a packet, and queues it up via sendPacket @@ -301,7 +319,6 @@ class Connection { if (packet == null) return; setNextSendTime(-1); - _unackedPacketsReceived = 0; if (_options.getRequireFullySigned()) { packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED); @@ -328,8 +345,8 @@ class Connection { (packet.getSequenceNum() % 8 == 0)) { packet.setOptionalDelay(0); packet.setFlag(Packet.FLAG_DELAY_REQUESTED); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Requesting no ack delay for packet " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Requesting no ack delay for packet " + packet); } else { // This is somewhat of a waste of time, unless the RTT < 4000, // since the other end limits it to getSendAckDelay() @@ -358,10 +375,12 @@ class Connection { // warning, getStatLog() can be null //_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); - _lastSendTime = _context.clock().now(); - _outboundQueue.enqueue(packet); - resetActivityTimer(); - + if (_outboundQueue.enqueue(packet)) { + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + resetActivityTimer(); + } + /* if (ackOnly) { // ACK only, don't schedule this packet for retries @@ -397,6 +416,7 @@ class Connection { * @return List of packets acked or null */ List ackPackets(long ackThrough, long nacks[]) { + // FIXME synch this part too? if (ackThrough < _highestAckedThrough) { // dupack which won't tell us anything } else { @@ -415,16 +435,17 @@ class Connection { List acked = null; synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { - Long id = iter.next(); - if (id.longValue() <= ackThrough) { + for (Map.Entry e : _outboundPackets.entrySet()) { + long id = e.getKey().longValue(); + if (id <= ackThrough) { boolean nacked = false; if (nacks != null) { // linear search since its probably really tiny for (int i = 0; i < nacks.length; i++) { - if (nacks[i] == id.longValue()) { + if (nacks[i] == id) { nacked = true; - PacketLocal nackedPacket = _outboundPackets.get(id); + PacketLocal nackedPacket = e.getValue(); + // this will do a fast retransmit if appropriate nackedPacket.incrementNACKs(); break; // NACKed } @@ -433,11 +454,27 @@ class Connection { if (!nacked) { // aka ACKed if (acked == null) acked = new ArrayList(1); - PacketLocal ackedPacket = _outboundPackets.get(id); + PacketLocal ackedPacket = e.getValue(); ackedPacket.ackReceived(); acked.add(ackedPacket); } } else { + // TODO + // we do not currently do an "implicit nack" of the packets higher + // than ackThrough, so those will not be fast retransmitted + // we could incrementNACK them here... but we may need to set the fastRettransmit + // threshold back to 3 for that. + // this will do a fast retransmit if appropriate + // This doesn't work because every packet has an ACK in it, so we hit the + // FAST_TRANSMIT threshold in a heartbeat and retransmit everything, + // even with the threshold at 3. (we never set the NO_ACK field in the header) + // Also, we may need to track that we + // have the same ackThrough for 3 or 4 consecutive times. + // See https://secure.wikimedia.org/wikipedia/en/wiki/Fast_retransmit + //if (_log.shouldLog(Log.INFO)) + // _log.info("ACK thru " + ackThrough + " implicitly NACKs " + id); + //PacketLocal nackedPacket = e.getValue(); + //nackedPacket.incrementNACKs(); break; // _outboundPackets is ordered } } @@ -465,31 +502,33 @@ class Connection { return acked; } - private long _occurredTime; - private long _occurredEventCount; + //private long _occurredTime; + //private long _occurredEventCount; + void eventOccurred() { - long now = System.currentTimeMillis(); + //long now = System.currentTimeMillis(); TaskScheduler sched = _chooser.getScheduler(this); - now = now - now % 1000; - if (_occurredTime == now) { - _occurredEventCount++; - } else { - _occurredTime = now; - if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) { - _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on " - + toString() + ": scheduler = " + sched); - } - _occurredEventCount = 0; - } + //now = now - now % 1000; + //if (_occurredTime == now) { + // _occurredEventCount++; + //} else { + // _occurredTime = now; + // if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) { + // _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on " + // + toString() + ": scheduler = " + sched); + // } + // _occurredEventCount = 0; + //} long before = System.currentTimeMillis(); sched.eventOccurred(this); long elapsed = System.currentTimeMillis() - before; - if ( (elapsed > 1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Took " + elapsed + "ms to pump through " + sched); + // 250 and warn for debugging + if ( (elapsed > 250) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString()); } void resetReceived() { @@ -498,12 +537,8 @@ class Connection { SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); } _resetReceived = true; - MessageOutputStream mos = _outputStream; - MessageInputStream mis = _inputStream; - if (mos != null) - mos.streamErrorOccurred(new IOException("Reset received")); - if (mis != null) - mis.streamErrorOccurred(new IOException("Reset received")); + _outputStream.streamErrorOccurred(new IOException("Reset received")); + _inputStream.streamErrorOccurred(new IOException("Reset received")); _connectionError = "Connection reset"; synchronized (_connectLock) { _connectLock.notifyAll(); } } @@ -556,15 +591,10 @@ class Connection { s.destroy2(); _socket = null; } - if (_outputStream != null) - _outputStream.destroy(); - if (_receiver != null) - _receiver.destroy(); - if (_activityTimer != null) - _activityTimer.cancel(); - //_activityTimer = null; - if (_inputStream != null) - _inputStream.streamErrorOccurred(new IOException("disconnected!")); + _outputStream.destroy(); + _receiver.destroy(); + _activityTimer.cancel(); + _inputStream.streamErrorOccurred(new IOException("disconnected!")); if (_disconnectScheduledOn < 0) { _disconnectScheduledOn = _context.clock().now(); @@ -656,11 +686,7 @@ class Connection { * @return Last time we sent data */ public long getLastSendTime() { return _lastSendTime; } - /** Set the time we sent data. - * @param when The time we sent data - */ - public void setLastSendTime(long when) { _lastSendTime = when; } - + /** What was the last packet Id sent to the peer? * @return The last sent packet ID */ @@ -795,10 +821,9 @@ class Connection { public long getCongestionWindowEnd() { return _congestionWindowEnd; } public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } + /** @return the highest outbound packet we have recieved an ack for */ public long getHighestAckedThrough() { return _highestAckedThrough; } - /** @deprecated unused */ - public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } public long getLastActivityOn() { return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); @@ -878,17 +903,12 @@ class Connection { } private void resetActivityTimer() { - if (_options.getInactivityTimeout() <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); - return; - } - if (_activityTimer == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); - return; - } long howLong = _options.getInactivityTimeout(); + if (howLong <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); + return; + } howLong += _randomWait; // randomize it a bit, so both sides don't do it at once //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resetting the inactivity timer to " + howLong); @@ -983,12 +1003,12 @@ class Connection { } /** stream that the local peer receives data on - * @return the inbound message stream + * @return the inbound message stream, non-null */ public MessageInputStream getInputStream() { return _inputStream; } /** stream that the local peer sends data to the remote peer on - * @return the outbound message stream + * @return the outbound message stream, non-null */ public MessageOutputStream getOutputStream() { return _outputStream; } @@ -1032,12 +1052,10 @@ class Connection { */ buf.append("unacked in: ").append(getUnackedPacketsReceived()); int missing = 0; - if (_inputStream != null) { - long nacks[] = _inputStream.getNacks(); - if (nacks != null) { - missing = nacks.length; - buf.append(" [").append(missing).append(" missing]"); - } + long nacks[] = _inputStream.getNacks(); + if (nacks != null) { + missing = nacks.length; + buf.append(" [").append(missing).append(" missing]"); } if (getResetSent()) @@ -1053,8 +1071,7 @@ class Connection { if (getCloseReceivedOn() > 0) buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); buf.append(" sent: ").append(1 + _lastSendId.get()); - if (_inputStream != null) - buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); + buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1086,14 +1103,15 @@ class Connection { * there are other packets in flight. 3 takes forever, let's try 2. * */ - static final int FAST_RETRANSMIT_THRESHOLD = 2; + static final int FAST_RETRANSMIT_THRESHOLD = 3; /** * Coordinate the resends of a given packet */ class ResendPacketEvent extends SimpleTimer2.TimedEvent { - private PacketLocal _packet; + private final PacketLocal _packet; private long _nextSendTime; + public ResendPacketEvent(PacketLocal packet, long delay) { super(_timer); _packet = packet; @@ -1111,6 +1129,8 @@ class Connection { * we have to use forceReschedule() instead of schedule() below, * to prevent duplicates in the timer queue. * + * don't synchronize this, deadlock with ackPackets->ackReceived->SimpleTimer2.cancel + * * @param penalize true if this retransmission is caused by a timeout, false if we * are just sending this packet instead of an ACK * @return true if the packet was sent, false if it was not @@ -1131,7 +1151,12 @@ class Connection { boolean resend = false; boolean isLowest = false; synchronized (_outboundPackets) { - if (_packet.getSequenceNum() == _highestAckedThrough + 1) + // allow appx. half the window to be "lowest" and be active resends, minimum of 3 + // Note: we should really pick the N lowest, not the lowest one + N more who + // happen to get here next, as the timers get out-of-order esp. after fast retx + if (_packet.getSequenceNum() == _highestAckedThrough + 1 || + _packet.getNumSends() > 1 || + _activeResends < Math.max(3, (_options.getWindowSize() + 1) / 2)) isLowest = true; if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum()))) resend = true; @@ -1145,24 +1170,28 @@ class Connection { // BUG? seq# = 0, activeResends = 0, loop forever - why? // also seen with seq# > 0. Is the _activeResends count reliable? if (_log.shouldLog(Log.INFO)) - _log.info("Delaying resend of " + _packet + " as there are " - + _activeResends + " active resends already in play"); - forceReschedule(1000); - _nextSendTime = 1000 + _context.clock().now(); + _log.info("Delaying resend of " + _packet + " with " + + _activeResends + " active resend, " + + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize()); + forceReschedule(1333); + _nextSendTime = 1333 + _context.clock().now(); return false; } + // It's the lowest, or it's fast retransmit time. Resend the packet. + if (fastRetransmit) _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); // revamp various fields, in case we need to ack more, etc - _inputStream.updateAcks(_packet); + // updateAcks done in enqueue() + //_inputStream.updateAcks(_packet); int choke = getOptions().getChoke(); _packet.setOptionalDelay(choke); if (choke > 0) _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); // this seems unnecessary to send the MSS again: - _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); + //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); // bugfix release 0.7.8, we weren't dividing by 1000 _packet.setResendDelay(getOptions().getResendDelay() / 1000); if (_packet.getReceiveStreamId() <= 0) @@ -1186,7 +1215,7 @@ class Connection { getOptions().setWindowSize(newWindowSize); if (_log.shouldLog(Log.WARN)) - _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize + _log.warn("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString()); windowAdjusted(); @@ -1195,10 +1224,6 @@ class Connection { int numSends = _packet.getNumSends() + 1; - if (numSends == 2) { - // first resend for this packet - _activeResends++; - } // in case things really suck, the other side may have lost thier // session tags (e.g. they restarted), so jump back to ElGamal. @@ -1225,27 +1250,34 @@ class Connection { // set this before enqueue() as it passes it on to the router _nextSendTime = timeout + _context.clock().now(); - if (_log.shouldLog(Log.INFO)) - _log.info("Resend packet " + _packet + " time " + numSends + + if (_outboundQueue.enqueue(_packet)) { + // first resend for this packet ? + if (numSends == 2) + _activeResends++; + if (_log.shouldLog(Log.INFO)) + _log.info("Resent packet " + + (fastRetransmit ? "(fast) " : "(timeout) ") + + _packet + + " next resend in " + timeout + "ms" + " activeResends: " + _activeResends + " (wsize " + newWindowSize + " lifetime " + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); - _outboundQueue.enqueue(_packet); - _lastSendTime = _context.clock().now(); + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + // timer reset added 0.9.1 + resetActivityTimer(); + } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling resend in " + timeout + "ms for " + _packet); forceReschedule(timeout); } - // acked during resending (... or somethin') + // acked during resending (... or somethin') ???????????? if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) { _activeResends--; synchronized (_outboundPackets) { _outboundPackets.notifyAll(); } - return true; } return true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 900fb96267..8f2a40f469 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -10,7 +10,12 @@ import net.i2p.util.Log; * do NOT block, but they also do not necessary imply immediate * delivery, or even the generation of a new packet. This class * is the only one that builds useful outbound Packet objects. - * + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession + *

+ * There's one of these per MessageOutputStream. + * It stores no state. It sends everything to the Connection unless + * the Connection is closed, */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private final I2PAppContext _context; @@ -82,7 +87,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (_log.shouldLog(Log.INFO) && !doSend) _log.info("writeData called: size="+size + " doSend=" + doSend + " unackedReceived: " + con.getUnackedPacketsReceived() - + " con: " + con, new Exception("write called by")); + + " con: " + con /* , new Exception("write called by") */ ); if (doSend) { PacketLocal packet = send(buf, off, size); @@ -111,6 +116,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { public PacketLocal send(byte buf[], int off, int size) { return send(buf, off, size, false); } + /** * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from @@ -120,22 +126,20 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return the packet sent */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { - Connection con = _connection; - //if (con == null) return null; - long before = System.currentTimeMillis(); - PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); - long built = System.currentTimeMillis(); - con.sendPacket(packet); - long sent = System.currentTimeMillis(); + //long before = System.currentTimeMillis(); + PacketLocal packet = buildPacket(buf, off, size, forceIncrement); + //long built = System.currentTimeMillis(); + _connection.sendPacket(packet); + //long sent = System.currentTimeMillis(); - if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); - if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); + //if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) + // _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); + //if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) + // _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); return packet; } - private boolean isAckOnly(Connection con, int size) { + private static boolean isAckOnly(Connection con, int size) { boolean ackOnly = ( (size <= 0) && // no data (con.getLastSendId() >= 0) && // not a SYN ( (!con.getOutputStream().getClosed()) || // not a CLOSE @@ -144,7 +148,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { return ackOnly; } - private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) { + /** + * @param buf data to be sent - may be null + * @param off offset into the buffer to start writing from + * @param size how many bytes of the buffer to write (may be 0) + * @param forceIncrement even if the buffer is empty, increment the packetId + * so we get an ACK back + * @return the packet to be sent + */ + private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { + Connection con = _connection; if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); boolean ackOnly = isAckOnly(con, size); boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0); @@ -164,7 +177,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setSendStreamId(con.getSendStreamId()); packet.setReceiveStreamId(con.getReceiveStreamId()); - con.getInputStream().updateAcks(packet); + // not needed here, handled in PacketQueue.enqueue() + //con.getInputStream().updateAcks(packet); // note that the optional delay is usually rewritten in Connection.sendPacket() int choke = con.getOptions().getChoke(); packet.setOptionalDelay(choke); @@ -195,6 +209,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { // don't set the closed flag if this is a plain ACK and there are outstanding // packets sent, otherwise the other side could receive the CLOSE prematurely, // since this ACK could arrive before the unacked payload message. + // TODO if the only unacked packet is the CLOSE packet and it didn't have any data... if (con.getOutputStream().getClosed() && ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index b2d9a201ac..739a420177 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -75,6 +75,7 @@ class ConnectionManager { /** Socket timeout for accept() */ _soTimeout = -1; + // Stats for this class _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -85,6 +86,14 @@ class ConnectionManager { _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + // Stats for Connection + _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); + // Stats for PacketQueue + _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } Connection getConnectionByInboundId(long id) { @@ -420,13 +429,11 @@ class ConnectionManager { if (removed) { _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime()); MessageInputStream stream = con.getInputStream(); - if (stream != null) { long rcvd = 1 + stream.getHighestBlockId(); long nacks[] = stream.getNacks(); if (nacks != null) rcvd -= nacks.length; _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime()); - } _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ff3c545dfb..42a2fc95a6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -98,6 +98,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000; static final int MIN_WINDOW_SIZE = 1; private static final boolean DEFAULT_ANSWER_PINGS = true; + private static final int DEFAULT_INACTIVITY_TIMEOUT = 90*1000; + private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND; + + /** * If PROTO is enforced, we cannot communicate with destinations earlier than version 0.7.1. * @since 0.9.1 @@ -302,6 +306,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); } + /** called by super's constructor */ @Override protected void init(Properties opts) { super.init(opts); @@ -318,8 +323,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); // handled in super() //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); - setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); @@ -367,9 +372,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { //if (opts.containsKey(PROP_WRITE_TIMEOUT)) // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) - setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR)) setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 53ebb17e1b..720c38fdf6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -12,7 +12,13 @@ import net.i2p.util.SimpleTimer; /** * Receive a packet for a particular connection - placing the data onto the * queue, marking packets as acked, updating various fields, etc. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream + *

+ * One of these is instantiated per-Destination + * (i.e. per-ConnectionManager, not per-Connection). + * It doesn't store any state. + */ class ConnectionPacketHandler { private final I2PAppContext _context; @@ -94,19 +100,24 @@ class ConnectionPacketHandler { } } - long ready = con.getInputStream().getHighestReadyBockId(); - int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); - int allowedBlocks = available/con.getOptions().getMaxMessageSize(); - if ( (packet.getPayloadSize() > 0) && (packet.getSequenceNum() > ready + allowedBlocks) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Inbound buffer exceeded on connection " + con + " (" - + ready + "/"+ (ready+allowedBlocks) + "/" + available - + ": dropping " + packet); - ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); - con.getOptions().setChoke(61*1000); - packet.releasePayload(); - con.ackImmediately(); - return; + if (packet.getPayloadSize() > 0) { + // Here, for the purposes of calculating whether the input stream is full, + // we assume all the not-ready blocks are the max message size. + // This prevents us from getting DoSed by accepting unlimited out-of-order small messages + long ready = con.getInputStream().getHighestReadyBockId(); + int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); + int allowedBlocks = available/con.getOptions().getMaxMessageSize(); + if (packet.getSequenceNum() > ready + allowedBlocks) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Inbound buffer exceeded on connection " + con + " (" + + ready + "/"+ (ready+allowedBlocks) + "/" + available + + ": dropping " + packet); + ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); + con.getOptions().setChoke(61*1000); + packet.releasePayload(); + con.ackImmediately(); + return; + } } con.getOptions().setChoke(0); @@ -513,12 +524,14 @@ class ConnectionPacketHandler { } private class AckDup implements SimpleTimer.TimedEvent { - private long _created; - private Connection _con; + private final long _created; + private final Connection _con; + public AckDup(Connection con) { _created = _context.clock().now(); _con = con; } + public void timeReached() { if (_con.getLastSendTime() <= _created) { if (_con.getResetReceived() || _con.getResetSent()) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index d9ca691b4d..4aba07d314 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -13,7 +13,8 @@ import net.i2p.util.Log; /** * Receive raw information from the I2PSession and turn it into * Packets, if we can. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class MessageHandler implements I2PSessionMuxedListener { private final ConnectionManager _manager; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 956eb9b510..eb168d1106 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -16,6 +16,11 @@ import net.i2p.util.Log; /** * Stream that can be given messages out of order * yet present them in order. + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream + *

+ * This buffers unlimited data via messageReceived() - + * limiting / blocking is done in ConnectionPacketHandler.receivePacket(). * */ class MessageInputStream extends InputStream { @@ -113,6 +118,9 @@ class MessageInputStream extends InputStream { } } + /** + * Adds the ack-through and nack fields to a packet we are building for transmission + */ public void updateAcks(PacketLocal packet) { synchronized (_dataLock) { packet.setAckThrough(_highestBlockId); @@ -126,6 +134,7 @@ class MessageInputStream extends InputStream { * * @return block IDs greater than the highest ready block ID, or null if there aren't any. */ +/*** public long[] getOutOfOrderBlocks() { long blocks[] = null; synchronized (_dataLock) { @@ -140,15 +149,18 @@ class MessageInputStream extends InputStream { Arrays.sort(blocks); return blocks; } +***/ /** how many blocks have we received that we still have holes before? * @return Count of blocks received that still have holes */ +/*** public int getOutOfOrderBlockCount() { synchronized (_dataLock) { return _notYetReadyBlocks.size(); } } +***/ /** * how long a read() call should block (if less than 0, block indefinitely, @@ -205,9 +217,9 @@ class MessageInputStream extends InputStream { * @return true if this is a new packet, false if it is a dup */ public boolean messageReceived(long messageId, ByteArray payload) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); synchronized (_dataLock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.DEBUG)) _log.debug("ignoring dup message " + messageId); @@ -237,7 +249,6 @@ class MessageInputStream extends InputStream { cur++; _highestReadyBlockId++; } - _dataLock.notifyAll(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("message is out of order: " + messageId); @@ -245,8 +256,8 @@ class MessageInputStream extends InputStream { _notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null)); else _notYetReadyBlocks.put(Long.valueOf(messageId), payload); - _dataLock.notifyAll(); } + _dataLock.notifyAll(); } return true; } @@ -278,7 +289,7 @@ class MessageInputStream extends InputStream { while (_readyDataBlocks.isEmpty()) { if (_locallyClosed) - throw new IOException("Already closed, you wanker"); + throw new IOException("Already closed"); if ( (_notYetReadyBlocks.isEmpty()) && (_closeReceived) ) { if (_log.shouldLog(Log.INFO)) @@ -360,7 +371,7 @@ class MessageInputStream extends InputStream { @Override public int available() throws IOException { - if (_locallyClosed) throw new IOException("Already closed, you wanker"); + if (_locallyClosed) throw new IOException("Already closed"); throwAnyError(); int numBytes = 0; synchronized (_dataLock) { @@ -384,6 +395,7 @@ class MessageInputStream extends InputStream { * * @return Count of bytes waiting to be read */ +/*** public int getTotalQueuedSize() { synchronized (_dataLock) { if (_locallyClosed) return 0; @@ -401,7 +413,11 @@ class MessageInputStream extends InputStream { return numBytes; } } +***/ + /** + * Same as available() but doesn't throw IOE + */ public int getTotalReadySize() { synchronized (_dataLock) { if (_locallyClosed) return 0; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 3a4cd42060..1470bd5025 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -14,6 +14,8 @@ import net.i2p.util.SimpleTimer2; * A stream that we can shove data into that fires off those bytes * on flush or when the buffer is full. It also blocks according * to the data receiver's needs. + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class MessageOutputStream extends OutputStream { private final I2PAppContext _context; @@ -21,17 +23,17 @@ class MessageOutputStream extends OutputStream { private byte _buf[]; private int _valid; private final Object _dataLock; - private DataReceiver _dataReceiver; + private final DataReceiver _dataReceiver; private IOException _streamError; - private boolean _closed; + private volatile boolean _closed; private long _written; private int _writeTimeout; private ByteCache _dataCache; private final Flusher _flusher; private long _lastFlushed; - private long _lastBuffered; + private volatile long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ - private int _passiveFlushDelay; + private final int _passiveFlushDelay; /** * if we are changing the buffer size during operation, set this to the new * buffer size, and next time we are flushing, update the _buf array to the new @@ -39,9 +41,9 @@ class MessageOutputStream extends OutputStream { */ private volatile int _nextBufferSize; // rate calc helpers - private long _sendPeriodBeginTime; - private long _sendPeriodBytes; - private int _sendBps; + //private long _sendPeriodBeginTime; + //private long _sendPeriodBytes; + //private int _sendBps; /** * Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000, @@ -73,11 +75,11 @@ class MessageOutputStream extends OutputStream { _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; - _sendPeriodBeginTime = ctx.clock().now(); - _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + //_sendPeriodBeginTime = ctx.clock().now(); + //_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(timer); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("MessageOutputStream created"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("MessageOutputStream created"); } public void setWriteTimeout(int ms) { @@ -131,15 +133,9 @@ class MessageOutputStream extends OutputStream { remaining -= toWrite; cur += toWrite; _valid = _buf.length; - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; - if (rcvr == null) { - throwAnyError(); - return; - } if (_log.shouldLog(Log.INFO)) _log.info("write() direct valid = " + _valid); - ws = rcvr.writeData(_buf, 0, _valid); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; throwAnyError(); @@ -167,17 +163,18 @@ class MessageOutputStream extends OutputStream { _log.info("After waitForAccept of " + ws); } } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Queued " + len + " without sending to the receiver"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Queued " + len + " without sending to the receiver"); } } long elapsed = _context.clock().now() - begin; if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) ) _log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); throwAnyError(); - updateBps(len); + //updateBps(len); } +/**** private void updateBps(int len) { long now = _context.clock().now(); int periods = (int)Math.floor((now - _sendPeriodBeginTime) / 1000d); @@ -191,7 +188,9 @@ class MessageOutputStream extends OutputStream { _sendPeriodBytes += len; } } +****/ + /** */ public void write(int b) throws IOException { write(new byte[] { (byte)b }, 0, 1); throwAnyError(); @@ -240,14 +239,15 @@ class MessageOutputStream extends OutputStream { _enqueued = true; } public void timeReached() { + if (_closed) + return; _enqueued = false; - DataReceiver rec = _dataReceiver; long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) _log.debug("flusher time reached: left = " + timeLeft); if (timeLeft > 0) enqueue(); - else if ( (rec != null) && (rec.writeInProcess()) ) + else if (_dataReceiver.writeInProcess()) enqueue(); // don't passive flush if there is a write being done (unacked outbound) else doFlush(); @@ -261,10 +261,8 @@ class MessageOutputStream extends OutputStream { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if (_log.shouldLog(Log.INFO)) _log.info("doFlush() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; - if ( (_buf != null) && (rcvr != null) ) { - ws = rcvr.writeData(_buf, 0, _valid); + if (_buf != null) { + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _lastFlushed = _context.clock().now(); @@ -317,25 +315,18 @@ class MessageOutputStream extends OutputStream { if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("flush() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { if (_buf == null) { _dataLock.notifyAll(); throw new IOException("closed (buffer went away)"); } - if (rcvr == null) { - _dataLock.notifyAll(); - throwAnyError(); - return; - } // if valid == 0 return ??? - no, this could flush a CLOSE packet too. // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below // (disabled) if (!wait_for_accept_only) { - ws = rcvr.writeData(_buf, 0, _valid); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; locked_updateBufferSize(); @@ -347,7 +338,7 @@ class MessageOutputStream extends OutputStream { // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 // must do this outside the data lock if (wait_for_accept_only) { - flushAvailable(rcvr, true); + flushAvailable(_dataReceiver, true); return; } @@ -387,6 +378,7 @@ class MessageOutputStream extends OutputStream { } // setting _closed before flush() will force flush() to send a CLOSE packet _closed = true; + _flusher.cancel(); // In 0.8.1 we rewrote flush() to only wait for accept into the window, // not "completion" (i.e. ack from the far end). @@ -415,10 +407,11 @@ class MessageOutputStream extends OutputStream { /** * nonblocking close - - * Use outside of this package is deprecated, should be made package local + * Only for use inside package */ public void closeInternal() { _closed = true; + _flusher.cancel(); if (_streamError == null) _streamError = new IOException("Closed internally"); clearData(true); @@ -429,12 +422,10 @@ class MessageOutputStream extends OutputStream { if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("clearData() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { // flush any data, but don't wait for it - if ( (rcvr != null) && (_valid > 0) && shouldFlush) - rcvr.writeData(_buf, 0, _valid); + if (_valid > 0 && shouldFlush) + _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; @@ -503,15 +494,15 @@ class MessageOutputStream extends OutputStream { throw new InterruptedIOException("Flush available timed out (" + _writeTimeout + "ms)"); } long afterAccept = System.currentTimeMillis(); - if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.DEBUG)) ) - _log.debug("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws); + if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.INFO)) ) + _log.info("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws); return; } void destroy() { - _dataReceiver = null; + _closed = true; + _flusher.cancel(); synchronized (_dataLock) { - _closed = true; _dataLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index bb3e723b65..a79f67cb75 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -395,6 +395,7 @@ class Packet { DataHelper.toLong(buffer, cur, 4, _ackThrough > 0 ? _ackThrough : 0); cur += 4; if (_nacks != null) { + // if max win is ever > 255, limit to 255 DataHelper.toLong(buffer, cur, 1, _nacks.length); cur++; for (int i = 0; i < _nacks.length; i++) { @@ -461,7 +462,7 @@ class Packet { * @return How large the current packet would be * @throws IllegalStateException */ - public int writtenSize() throws IllegalStateException { + private int writtenSize() { int size = 0; size += 4; // _sendStreamId.length; size += 4; // _receiveStreamId.length; @@ -469,6 +470,7 @@ class Packet { size += 4; // ackThrough if (_nacks != null) { size++; // nacks length + // if max win is ever > 255, limit to 255 size += 4 * _nacks.length; } else { size++; // nacks length @@ -671,10 +673,11 @@ class Packet { buf.append(toId(_sendStreamId)); //buf.append("<-->"); buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); - if (_sequenceNum < 10) - buf.append(" \t"); // so the tab lines up right - else - buf.append('\t'); + //if (_sequenceNum < 10) + // buf.append(" \t"); // so the tab lines up right + //else + // buf.append('\t'); + buf.append(' '); buf.append(toFlagString()); buf.append(" ACK ").append(getAckThrough()); if (_nacks != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index ef145179c3..34a59475e6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -2,7 +2,6 @@ package net.i2p.client.streaming; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.Iterator; import java.util.Set; import net.i2p.I2PAppContext; @@ -13,7 +12,8 @@ import net.i2p.util.Log; /** * receive a packet and dispatch it correctly to the connection specified, * the server socket, or queue a reply RST packet. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class PacketHandler { private final ConnectionManager _manager; @@ -86,6 +86,7 @@ class PacketHandler { } *****/ + /** */ void receivePacket(Packet packet) { //boolean ok = choke(packet); //if (ok) @@ -202,15 +203,13 @@ class PacketHandler { // someone is sending us a packet on the wrong stream // It isn't a SYN so it isn't likely to have a FROM to send a reset back to if (_log.shouldLog(Log.ERROR)) { - Set cons = _manager.listConnections(); StringBuilder buf = new StringBuilder(512); buf.append("Received a packet on the wrong stream: "); buf.append(packet); buf.append("\nthis connection:\n"); buf.append(con); buf.append("\nall connections:"); - for (Iterator iter = cons.iterator(); iter.hasNext();) { - Connection cur = (Connection)iter.next(); + for (Connection cur : _manager.listConnections()) { buf.append('\n').append(cur); } _log.error(buf.toString(), new Exception("Wrong stream")); @@ -299,9 +298,7 @@ class PacketHandler { } if (_log.shouldLog(Log.DEBUG)) { StringBuilder buf = new StringBuilder(128); - Set cons = _manager.listConnections(); - for (Iterator iter = cons.iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); + for (Connection con : _manager.listConnections()) { buf.append(con.toString()).append(" "); } _log.debug("connections: " + buf.toString() + " sendId: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index ca2e25d42d..748fe19cab 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -16,11 +16,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private final I2PAppContext _context; private final Log _log; private final Connection _connection; - private Destination _to; + private final Destination _to; private SessionKey _keyUsed; private Set _tagsSent; private final long _createdOn; - private int _numSends; + private volatile int _numSends; private long _lastSend; private long _acceptedOn; private long _ackOn; @@ -45,7 +45,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public Destination getTo() { return _to; } - public void setTo(Destination to) { _to = to; } /** * @deprecated should always return null @@ -72,6 +71,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { public void setTagsSent(Set tags) { if (tags != null && !tags.isEmpty()) _log.error("Who is sending tags thru the streaming lib? " + tags.size()); + /**** if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) && (!tags.isEmpty()) ) { //int old = _tagsSent.size(); //_tagsSent.addAll(tags); @@ -80,6 +80,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } else { _tagsSent = tags; } + ****/ } public boolean shouldSign() { @@ -142,10 +143,15 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { /** @return null if not bound */ public Connection getConnection() { return _connection; } + /** + * Will force a fast restransmit on the 3rd call (FAST_RETRANSMIT_THRESHOLD) + * but only if it's the lowest unacked (see Connection.ResendPacketEvent) + */ public void incrementNACKs() { int cnt = ++_nackCount; SimpleTimer2.TimedEvent evt = _resendEvent; - if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) { + if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) && + (_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it _retransmitted = true; evt.reschedule(0); } @@ -162,8 +168,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { if (con != null) buf.append(" rtt ").append(con.getOptions().getRTT()); - if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) - buf.append(" with tags"); + //if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) + // buf.append(" with tags"); + + if (_nackCount > 0) + buf.append(" nacked ").append(_nackCount).append(" times"); if (_ackOn > 0) buf.append(" ack after ").append(getAckTime()); @@ -200,8 +209,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { * @param maxWaitMs MessageOutputStream is the only caller, generally with -1 */ public void waitForAccept(int maxWaitMs) { - if (_connection == null) - throw new IllegalStateException("Cannot wait for accept with no connection"); long before = _context.clock().now(); int queued = _connection.getUnackedPacketsSent(); int window = _connection.getOptions().getWindowSize(); @@ -216,7 +223,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { int afterQueued = _connection.getUnackedPacketsSent(); if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) _log.debug("Took " + (after-before) + "ms to get " - + (accepted ? " accepted" : " rejected") + + (accepted ? "accepted" : "rejected") + (_cancelledOn > 0 ? " and CANCELLED" : "") + ", queued behind " + queued +" with a window size of " + window + ", finally accepted with " + afterQueued + " queued: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 724e44202f..5e7ae6acc3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -12,7 +12,8 @@ import net.i2p.util.Log; * Well, thats the theory at least... in practice we just * send them immediately with no blocking, since the * mode=bestEffort doesnt block in the SDK. - * + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class PacketQueue { private final I2PAppContext _context; @@ -26,16 +27,17 @@ class PacketQueue { _session = session; _connectionManager = mgr; _log = context.logManager().getLog(PacketQueue.class); - _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + // all createRateStats in ConnectionManager } /** * Add a new packet to be sent out ASAP * * keys and tags disabled since dropped in I2PSession + * @return true if sent */ - public void enqueue(PacketLocal packet) { + public boolean enqueue(PacketLocal packet) { + // this updates the ack/nack field packet.prepare(); //SessionKey keyUsed = packet.getKeyUsed(); @@ -52,7 +54,7 @@ class PacketQueue { if (packet.getAckTime() > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Not resending " + packet); - return; + return false; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending... " + packet); @@ -76,7 +78,7 @@ class PacketQueue { _log.warn("took " + writeTime + "ms to write the packet: " + packet); // last chance to short circuit... - if (packet.getAckTime() > 0) return; + if (packet.getAckTime() > 0) return false; // this should not block! begin = _context.clock().now(); @@ -158,6 +160,7 @@ class PacketQueue { // reset packet.releasePayload(); } + return sent; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java index b0167809ab..17918a6e12 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -15,7 +15,7 @@ class SchedulerChooser { private final Log _log; private final TaskScheduler _nullScheduler; /** list of TaskScheduler objects */ - private final List _schedulers; + private final List _schedulers; public SchedulerChooser(I2PAppContext context) { _context = context; @@ -26,7 +26,7 @@ class SchedulerChooser { public TaskScheduler getScheduler(Connection con) { for (int i = 0; i < _schedulers.size(); i++) { - TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i); + TaskScheduler scheduler = _schedulers.get(i); if (scheduler.accept(con)) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); @@ -50,11 +50,7 @@ class SchedulerChooser { } private class NullScheduler implements TaskScheduler { - private final Log _log; - public NullScheduler() { - _log = _context.logManager().getLog(NullScheduler.class); - } - + public void eventOccurred(Connection con) { if (_log.shouldLog(Log.WARN)) _log.warn("Yell at jrandom: Event occurred on " + con, new Exception("source")); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java index 29e69f6fa3..b8754f7845 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java @@ -45,18 +45,25 @@ class SchedulerClosing extends SchedulerImpl { } public void eventOccurred(Connection con) { - if (con.getNextSendTime() <= 0) - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); - long remaining = con.getNextSendTime() - _context.clock().now(); + long nextSend = con.getNextSendTime(); + long now = _context.clock().now(); + long remaining; + if (nextSend <= 0) { + remaining = con.getOptions().getSendAckDelay(); + nextSend = now + remaining; + con.setNextSendTime(nextSend); + } else { + remaining = nextSend - now; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Event occurred w/ remaining: " + remaining + " on " + con); if (remaining <= 0) { if (con.getCloseSentOn() <= 0) { con.sendAvailable(); - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { - con.ackImmediately(); + //con.ackImmediately(); } + con.setNextSendTime(now + con.getOptions().getSendAckDelay()); } else { //if (remaining < 5*1000) // remaining = 5*1000; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index ace5916297..fd36b7723c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -19,4 +19,9 @@ abstract class SchedulerImpl implements TaskScheduler { protected void reschedule(long msToWait, Connection con) { SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait); } + + @Override + public String toString() { + return getClass().getSimpleName(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index e4c4ec9173..15b8e937e7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -130,8 +130,8 @@ class TCBShare { super(timer); } public void timeReached() { - for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { - if (_cache.get(iter.next()).isExpired()) + for (Iterator iter = _cache.values().iterator(); iter.hasNext(); ) { + if (iter.next().isExpired()) iter.remove(); } schedule(CLEAN_TIME); diff --git a/history.txt b/history.txt index 1955fa0309..a06d638513 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,24 @@ +2012-06-29 zzz + * HTTP Proxy: Change the error code for unknown host from 404 to 500 + * SimpleTimer: Fix logging + * Streaming: + - Allow at least 3 packets and up to half the window to be active resends + instead of just 1, to reduce stall time after a packet drop + - Increase fast retransmit threshold back to 3 to reduce retransmissions + - Don't fast retransmit if we recently retransmitted it already + - Allow double the window as long as gaps are less than the window + - Don't set the MSS in a resent packet (saves 2 bytes) + - Remove redundant calls to updateAcks() + - Update activity timer when resending a packet + - Reset unackedPacketsReceived counter at all places where acks are sent + so it isn't wrong + - Fix some places where the activeResends count could become wrong + - Prevent storm of CLOSE packets + - Never resend the whole packet in ackImmediately(), just send an ack + - Cancel flusher timer in MessageOutputStream when closed + - Move some createRateStats to ConnectionManager to reduce repeated calls + - Cleanups, javadocs, logging, volatile, finals + 2012-06-24 zzz * ElGamalAESEngine: Fix bad size estimate when tags are included, resulting in trailing zeros after the padding diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 5b1ba79e39..f5c9264549 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 19; + public final static long BUILD = 20; /** for example "-test" */ public final static String EXTRA = ""; From ab1855071110c88d335edeaa2c47e77634a7723f Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jun 2012 16:25:22 +0000 Subject: [PATCH 15/17] * Update: Increase eepget timeouts to reduce retries --- .../net/i2p/router/web/PluginUpdateChecker.java | 2 +- .../net/i2p/router/web/PluginUpdateHandler.java | 2 +- .../net/i2p/router/web/UnsignedUpdateHandler.java | 2 +- .../src/net/i2p/router/web/UpdateHandler.java | 8 ++++++-- core/java/src/net/i2p/util/EepGet.java | 15 ++++++++++++++- history.txt | 1 + 6 files changed, 24 insertions(+), 6 deletions(-) diff --git a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java index 98ad9f6034..235bf3731b 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java @@ -158,7 +158,7 @@ public class PluginUpdateChecker extends UpdateHandler { try { _get = new PartialEepGet(_context, proxyHost, proxyPort, _baos, _xpi2pURL, TrustedUpdate.HEADER_BYTES); _get.addStatusListener(PluginUpdateCheckerRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT); } catch (Throwable t) { _log.error("Error checking update for plugin", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java index ce60170361..25514992c2 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java @@ -149,7 +149,7 @@ public class PluginUpdateHandler extends UpdateHandler { else _get = new EepGet(_context, 1, _updateFile, _xpi2pURL, false); _get.addStatusListener(PluginUpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, shouldProxy ? INACTIVITY_TIMEOUT : NOPROXY_INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error downloading plugin", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java index f728783758..4725f62b16 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java @@ -75,7 +75,7 @@ public class UnsignedUpdateHandler extends UpdateHandler { // 40 retries!! _get = new EepGet(_context, proxyHost, proxyPort, 40, _updateFile, _zipURL, false); _get.addStatusListener(UnsignedUpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error updating", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java index 6ebb2a469d..c7a1f695f6 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java @@ -45,6 +45,10 @@ public class UpdateHandler { static final String PROP_UPDATE_IN_PROGRESS = "net.i2p.router.web.UpdateHandler.updateInProgress"; protected static final String PROP_LAST_UPDATE_TIME = "router.updateLastDownloaded"; + protected static final long CONNECT_TIMEOUT = 55*1000; + protected static final long INACTIVITY_TIMEOUT = 5*60*1000; + protected static final long NOPROXY_INACTIVITY_TIMEOUT = 60*1000; + public UpdateHandler() { this(ContextHelper.getContext(null)); } @@ -193,7 +197,7 @@ public class UpdateHandler { // no retries _get = new PartialEepGet(_context, proxyHost, proxyPort, _baos, updateURL, TrustedUpdate.HEADER_BYTES); _get.addStatusListener(UpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT); } catch (Throwable t) { _isNewer = false; } @@ -210,7 +214,7 @@ public class UpdateHandler { else _get = new EepGet(_context, 1, _updateFile, updateURL, false); _get.addStatusListener(UpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, shouldProxy ? INACTIVITY_TIMEOUT : NOPROXY_INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error updating", t); } diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index cbe4b42e7b..af60551296 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -458,19 +458,32 @@ public class EepGet { } public void stopFetching() { _keepFetching = false; } + /** - * Blocking fetch, returning true if the URL was retrieved, false if all retries failed + * Blocking fetch, returning true if the URL was retrieved, false if all retries failed. * + * Header timeout default 45 sec, total timeout default none, inactivity timeout default 60 sec. */ public boolean fetch() { return fetch(_fetchHeaderTimeout); } + /** * Blocking fetch, timing out individual attempts if the HTTP response headers * don't come back in the time given. If the timeout is zero or less, this will * wait indefinitely. + * + * Total timeout default none, inactivity timeout default 60 sec. */ public boolean fetch(long fetchHeaderTimeout) { return fetch(fetchHeaderTimeout, -1, -1); } + + /** + * Blocking fetch. + * + * @param fetchHeaderTimeout <= 0 for none (proxy will timeout if none, none isn't recommended if no proxy) + * @param totalTimeout <= 0 for default none + * @param inactivityTimeout <= 0 for default 60 sec + */ public boolean fetch(long fetchHeaderTimeout, long totalTimeout, long inactivityTimeout) { _fetchHeaderTimeout = fetchHeaderTimeout; _fetchEndTime = (totalTimeout > 0 ? System.currentTimeMillis() + totalTimeout : -1); diff --git a/history.txt b/history.txt index a06d638513..1456fa838f 100644 --- a/history.txt +++ b/history.txt @@ -18,6 +18,7 @@ - Cancel flusher timer in MessageOutputStream when closed - Move some createRateStats to ConnectionManager to reduce repeated calls - Cleanups, javadocs, logging, volatile, finals + * Update: Increase eepget timeouts 2012-06-24 zzz * ElGamalAESEngine: Fix bad size estimate when tags are included, From 63f22a54e1a5dd2bc0c74f7b3eb67765292b11d8 Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 29 Jun 2012 17:21:57 +0000 Subject: [PATCH 16/17] fix unsafe initialization of super constructor calling override --- .../i2p/client/streaming/ConnectionOptions.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index 42a2fc95a6..f71a19d424 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -108,9 +108,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ private static final boolean DEFAULT_ENFORCE_PROTO = false; - // Syncronization fix, but doing it this way causes NPE... - // FIXME private final int _trend[] = new int[TREND_COUNT]; FIXME - private int _trend[]; + private final int _trend[] = new int[TREND_COUNT]; /** * OK, here is the calculation on the message size to fit in a single @@ -224,6 +222,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions() { super(); + cinit(System.getProperties()); } /** @@ -233,6 +232,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(Properties opts) { super(opts); + cinit(opts); } /** @@ -241,6 +241,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); + cinit(System.getProperties()); } /** @@ -249,6 +250,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(ConnectionOptions opts) { super(opts); + cinit(System.getProperties()); if (opts != null) update(opts); } @@ -306,11 +308,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); } - /** called by super's constructor */ - @Override - protected void init(Properties opts) { - super.init(opts); - _trend = new int[TREND_COUNT]; + /** + * Initialization + */ + private void cinit(Properties opts) { setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); From 0d8bcd5dadcf4f484a90ade09e7f059ee5283a5a Mon Sep 17 00:00:00 2001 From: zzz Date: Sun, 1 Jul 2012 16:16:08 +0000 Subject: [PATCH 17/17] * i2psnark: - Don't send a keepalive to a peer we are going to disconnect - Disconnect peer when idle a long time - PeerCheckerTask cleanup - Static ref cleanup - Don't show a downloaded torrent file as "seeding" - Better torrent file download icon (from Silk, same license as the others) --- apps/i2psnark/icons/arrow_down.png | Bin 379 -> 0 bytes apps/i2psnark/icons/basket_put.png | Bin 0 -> 733 bytes .../src/org/klomp/snark/I2PSnarkUtil.java | 3 + .../java/src/org/klomp/snark/MagnetState.java | 5 +- .../src/org/klomp/snark/PeerCheckerTask.java | 52 ++++++++++++------ .../src/org/klomp/snark/PeerCoordinator.java | 37 ++++++------- .../src/org/klomp/snark/web/FetchAndAdd.java | 5 +- .../org/klomp/snark/web/I2PSnarkServlet.java | 4 +- history.txt | 9 +++ .../src/net/i2p/router/RouterVersion.java | 2 +- 10 files changed, 73 insertions(+), 44 deletions(-) delete mode 100644 apps/i2psnark/icons/arrow_down.png create mode 100644 apps/i2psnark/icons/basket_put.png diff --git a/apps/i2psnark/icons/arrow_down.png b/apps/i2psnark/icons/arrow_down.png deleted file mode 100644 index 2c4e279377bf348f9cf53894e76bb673ccf067bd..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 379 zcmV->0fhdEP)RB*?~^j!LKVQ>(O&A{Xr%)RXLn#U zs4LtZ6rCMFY5|B2$)yG$6aaIFk4ls z?I!B3WVC@<#F`aNER-!jYPQ1Y^<$s!$F}~@D5-FE;K%=co^zga&hzoIN~J=Z*|kUe zD`H8cmBJe3ob|WC?;=|ty?Cj+a{UZ=3Z(t?V;sNiq;AO_1TYM zPddZX*@@zRx3KW}R`+afoaTiIvgn)1YHXwU@^eb98>x5uee1xLyIor*7_=9!tCxmo zGWUdLeuQX7ct)>N$@RwUDYCLW_1nuA{yOaG>FGO)qU-}C%d!TvdpsWYaBN%*+0JNH zf|D+{4AaYaqHv^tZ+#>Z$xD*-Nfbqbepg|kVzb#?Nan)CRgFf&qS5H9jvZ$0iPe#l zrwunLkw_eGv+C-@;qZuFuh$ie#UZED=|Ylr7K_C?7-JuExm*hp2*3@}a5nNvz=7{Y zXfyO1vw!#k)K`T>5Q$z;wq}JHQfXLgiiH zVjvLc+jxmspaum5)2}Pq0E;%@*NsD`QmOQ~`VMOS&Rdz^@7Hh*27}U1CMl+92*+qN P00000NkvXXu0mjfP PeerCoordinator.MAX_INACTIVE) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Disconnecting peer idle " + + DataHelper.formatDuration(peer.getInactiveTime()) + ": " + peer); + peer.disconnect(); + continue; + } + if (!peer.isChoking()) uploaders++; @@ -92,14 +104,15 @@ class PeerCheckerTask implements Runnable peer.setRateHistory(upload, download); peer.resetCounters(); - _util.debug(peer + ":", Snark.DEBUG); - _util.debug(" ul: " + upload*1024/KILOPERSECOND + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(peer + ":" + + " ul: " + upload*1024/KILOPERSECOND + " dl: " + download*1024/KILOPERSECOND + " i: " + peer.isInterested() + " I: " + peer.isInteresting() + " c: " + peer.isChoking() - + " C: " + peer.isChoked(), - Snark.DEBUG); + + " C: " + peer.isChoked()); + } // Choke a percentage of them rather than all so it isn't so drastic... // unless this torrent is over the limit all by itself. @@ -120,8 +133,8 @@ class PeerCheckerTask implements Runnable // Check if it still wants pieces from us. if (!peer.isInterested()) { - _util.debug("Choke uninterested peer: " + peer, - Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.debug("Choke uninterested peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -131,8 +144,8 @@ class PeerCheckerTask implements Runnable } else if (overBWLimitChoke) { - _util.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer, - Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -144,7 +157,8 @@ class PeerCheckerTask implements Runnable else if (peer.isInteresting() && peer.isChoked()) { // If they are choking us make someone else a downloader - _util.debug("Choke choking peer: " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke choking peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -156,7 +170,8 @@ class PeerCheckerTask implements Runnable else if (!peer.isInteresting() && !coordinator.completed()) { // If they aren't interesting make someone else a downloader - _util.debug("Choke uninteresting peer: " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke uninteresting peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -170,8 +185,8 @@ class PeerCheckerTask implements Runnable && download == 0) { // We are downloading but didn't receive anything... - _util.debug("Choke downloader that doesn't deliver:" - + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke downloader that doesn't deliver: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -198,7 +213,10 @@ class PeerCheckerTask implements Runnable // send PEX if ((_runCount % 17) == 0 && !peer.isCompleted()) coordinator.sendPeers(peer); - peer.keepAlive(); + // cheap failsafe for seeds connected to seeds, stop pinging and hopefully + // the inactive checker (above) will eventually disconnect it + if (coordinator.getNeededLength() > 0 || !peer.isCompleted()) + peer.keepAlive(); // announce them to local tracker (TrackerClient does this too) if (_util.getDHT() != null && (_runCount % 5) == 0) { _util.getDHT().announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash()); @@ -215,8 +233,8 @@ class PeerCheckerTask implements Runnable || uploaders > uploadLimit) && worstDownloader != null) { - _util.debug("Choke worst downloader: " + worstDownloader, - Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke worst downloader: " + worstDownloader); worstDownloader.setChoking(true); coordinator.uploaders--; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index fe924f79c6..8ded86247b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -68,6 +68,7 @@ class PeerCoordinator implements PeerListener // package local for access by CheckDownLoadersTask final static long CHECK_PERIOD = 40*1000; // 40 seconds final static int MAX_UPLOADERS = 6; + public static final long MAX_INACTIVE = 8*60*1000; /** * Approximation of the number of current uploaders. @@ -130,7 +131,7 @@ class PeerCoordinator implements PeerListener private final MagnetState magnetState; private final CoordinatorListener listener; private final I2PSnarkUtil _util; - private static final Random _random = I2PAppContext.getGlobalContext().random(); + private final Random _random; /** * @param metainfo null if in magnet mode @@ -140,6 +141,7 @@ class PeerCoordinator implements PeerListener CoordinatorListener listener, Snark torrent) { _util = util; + _random = util.getContext().random(); this.id = id; this.infohash = infohash; this.metainfo = metainfo; @@ -377,8 +379,10 @@ class PeerCoordinator implements PeerListener } /** - * Reduce max if huge pieces to keep from ooming when leeching - * @return 512K: 16; 1M: 11; 2M: 6 + * Formerly used to + * reduce max if huge pieces to keep from ooming when leeching + * but now we don't + * @return usually 16 */ private int getMaxConnections() { if (metainfo == null) @@ -388,7 +392,7 @@ class PeerCoordinator implements PeerListener return 4; if (pieces <= 5) return 6; - int size = metainfo.getPieceLength(0); + //int size = metainfo.getPieceLength(0); int max = _util.getMaxConnections(); // Now that we use temp files, no memory concern //if (size <= 512*1024 || completed()) @@ -449,7 +453,7 @@ class PeerCoordinator implements PeerListener synchronized(peers) { Peer old = peerIDInList(peer.getPeerID(), peers); - if ( (old != null) && (old.getInactiveTime() > 8*60*1000) ) { + if ( (old != null) && (old.getInactiveTime() > MAX_INACTIVE) ) { // idle for 8 minutes, kill the old con (32KB/8min = 68B/sec minimum for one block) if (_log.shouldLog(Log.WARN)) _log.warn("Remomving old peer: " + peer + ": " + old + ", inactive for " + old.getInactiveTime()); @@ -543,7 +547,7 @@ class PeerCoordinator implements PeerListener need_more = (!peer.isConnected()) && peersize < getMaxConnections(); // Check if we already have this peer before we build the connection Peer old = peerIDInList(peer.getPeerID(), peers); - need_more = need_more && ((old == null) || (old.getInactiveTime() > 8*60*1000)); + need_more = need_more && ((old == null) || (old.getInactiveTime() > MAX_INACTIVE)); } if (need_more) @@ -974,11 +978,8 @@ class PeerCoordinator implements PeerListener // Announce to the world we have it! // Disconnect from other seeders when we get the last piece - List toDisconnect = new ArrayList(); - Iterator it = peers.iterator(); - while (it.hasNext()) - { - Peer p = it.next(); + List toDisconnect = done ? new ArrayList() : null; + for (Peer p : peers) { if (p.isConnected()) { if (done && p.isCompleted()) @@ -986,15 +987,13 @@ class PeerCoordinator implements PeerListener else p.have(piece); } - } - it = toDisconnect.iterator(); - while (it.hasNext()) - { - Peer p = it.next(); - p.disconnect(true); - } - + } + if (done) { + for (Peer p : toDisconnect) { + p.disconnect(true); + } + // put msg on the console if partial, since Storage won't do it if (!completed()) snark.storageCompleted(storage); diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java b/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java index abf007a534..c5f31d8f52 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java @@ -243,11 +243,12 @@ public class FetchAndAdd extends Snark implements EepGet.StatusListener, Runnabl } /** - * @return torrent file bytes remaining or -1 + * @return -1 when done so the web will list us as "complete" instead of "seeding" */ @Override public long getRemainingLength() { - return _remaining; + long rv = _remaining; + return rv > 0 ? rv : -1; } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 06c9501d70..3b667da421 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -1073,7 +1073,7 @@ public class I2PSnarkServlet extends DefaultServlet { else if (isValid) icon = toIcon(meta.getName()); else if (snark instanceof FetchAndAdd) - icon = "arrow_down"; + icon = "basket_put"; else icon = "magnet"; if (isValid) { @@ -1104,7 +1104,7 @@ public class I2PSnarkServlet extends DefaultServlet { out.write(""); if(isRunning && remainingSeconds > 0) - out.write(DataHelper.formatDuration2(remainingSeconds*1000)); // (eta 6h) + out.write(DataHelper.formatDuration2(Math.max(remainingSeconds, 10) * 1000)); // (eta 6h) out.write("\n\t"); out.write(""); if (remaining > 0) diff --git a/history.txt b/history.txt index 1456fa838f..9468144ae2 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,12 @@ +2012-07-01 zzz + * i2psnark: + - Don't send a keepalive to a peer we are going to disconnect + - Disconnect peer when idle a long time + - PeerCheckerTask cleanup + - Static ref cleanup + - Don't show a downloaded torrent file as "seeding" + - Better torrent file download icon + 2012-06-29 zzz * HTTP Proxy: Change the error code for unknown host from 404 to 500 * SimpleTimer: Fix logging diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f5c9264549..43dca92409 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 20; + public final static long BUILD = 21; /** for example "-test" */ public final static String EXTRA = "";