diff --git a/apps/i2psnark/icons/arrow_down.png b/apps/i2psnark/icons/arrow_down.png deleted file mode 100644 index 2c4e279377..0000000000 Binary files a/apps/i2psnark/icons/arrow_down.png and /dev/null differ diff --git a/apps/i2psnark/icons/basket_put.png b/apps/i2psnark/icons/basket_put.png new file mode 100644 index 0000000000..be62faaaab Binary files /dev/null and b/apps/i2psnark/icons/basket_put.png differ diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 2bf4f2d41d..229132a7fc 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -122,6 +122,9 @@ public class I2PSnarkUtil { } ******/ + /** @since 0.9.1 */ + public I2PAppContext getContext() { return _context; } + public boolean configured() { return _configured; } public void setI2CPConfig(String i2cpHost, int i2cpPort, Map opts) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java b/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java index da89f5577a..c33dc4dffe 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java @@ -5,10 +5,10 @@ import java.io.IOException; import java.io.InputStream; import java.util.HashMap; import java.util.Map; -import java.util.Random; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; +import net.i2p.util.RandomSource; import org.klomp.snark.bencode.BDecoder; import org.klomp.snark.bencode.BEValue; @@ -27,7 +27,6 @@ import org.klomp.snark.bencode.BEValue; */ class MagnetState { public static final int CHUNK_SIZE = 16*1024; - private static final Random random = I2PAppContext.getGlobalContext().random(); private final byte[] infohash; private boolean complete; @@ -129,7 +128,7 @@ class MagnetState { throw new IllegalArgumentException("not initialized"); if (complete) throw new IllegalArgumentException("complete"); - int rand = random.nextInt(totalChunks); + int rand = RandomSource.getInstance().nextInt(totalChunks); for (int i = 0; i < totalChunks; i++) { int chk = (i + rand) % totalChunks; if (!(have.get(chk) || requested.get(chk))) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java index 9efce9f072..ede3be57b7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Random; import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; /** * TimerTask that checks for good/bad up/downloader. Works together @@ -36,16 +38,18 @@ class PeerCheckerTask implements Runnable private final PeerCoordinator coordinator; private final I2PSnarkUtil _util; + private final Log _log; + private final Random random; private int _runCount; PeerCheckerTask(I2PSnarkUtil util, PeerCoordinator coordinator) { _util = util; + _log = util.getContext().logManager().getLog(PeerCheckerTask.class); + random = util.getContext().random(); this.coordinator = coordinator; } - private static final Random random = I2PAppContext.getGlobalContext().random(); - public void run() { _runCount++; @@ -82,6 +86,14 @@ class PeerCheckerTask implements Runnable continue; } + if (peer.getInactiveTime() > PeerCoordinator.MAX_INACTIVE) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Disconnecting peer idle " + + DataHelper.formatDuration(peer.getInactiveTime()) + ": " + peer); + peer.disconnect(); + continue; + } + if (!peer.isChoking()) uploaders++; @@ -92,14 +104,15 @@ class PeerCheckerTask implements Runnable peer.setRateHistory(upload, download); peer.resetCounters(); - _util.debug(peer + ":", Snark.DEBUG); - _util.debug(" ul: " + upload*1024/KILOPERSECOND + if (_log.shouldLog(Log.DEBUG)) { + _log.debug(peer + ":" + + " ul: " + upload*1024/KILOPERSECOND + " dl: " + download*1024/KILOPERSECOND + " i: " + peer.isInterested() + " I: " + peer.isInteresting() + " c: " + peer.isChoking() - + " C: " + peer.isChoked(), - Snark.DEBUG); + + " C: " + peer.isChoked()); + } // Choke a percentage of them rather than all so it isn't so drastic... // unless this torrent is over the limit all by itself. @@ -120,8 +133,8 @@ class PeerCheckerTask implements Runnable // Check if it still wants pieces from us. if (!peer.isInterested()) { - _util.debug("Choke uninterested peer: " + peer, - Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.debug("Choke uninterested peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -131,8 +144,8 @@ class PeerCheckerTask implements Runnable } else if (overBWLimitChoke) { - _util.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer, - Snark.INFO); + if (_log.shouldLog(Log.INFO)) + _log.debug("BW limit (" + upload + "/" + uploaded + "), choke peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -144,7 +157,8 @@ class PeerCheckerTask implements Runnable else if (peer.isInteresting() && peer.isChoked()) { // If they are choking us make someone else a downloader - _util.debug("Choke choking peer: " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke choking peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -156,7 +170,8 @@ class PeerCheckerTask implements Runnable else if (!peer.isInteresting() && !coordinator.completed()) { // If they aren't interesting make someone else a downloader - _util.debug("Choke uninteresting peer: " + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke uninteresting peer: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -170,8 +185,8 @@ class PeerCheckerTask implements Runnable && download == 0) { // We are downloading but didn't receive anything... - _util.debug("Choke downloader that doesn't deliver:" - + peer, Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke downloader that doesn't deliver: " + peer); peer.setChoking(true); uploaders--; coordinator.uploaders--; @@ -198,7 +213,10 @@ class PeerCheckerTask implements Runnable // send PEX if ((_runCount % 17) == 0 && !peer.isCompleted()) coordinator.sendPeers(peer); - peer.keepAlive(); + // cheap failsafe for seeds connected to seeds, stop pinging and hopefully + // the inactive checker (above) will eventually disconnect it + if (coordinator.getNeededLength() > 0 || !peer.isCompleted()) + peer.keepAlive(); // announce them to local tracker (TrackerClient does this too) if (_util.getDHT() != null && (_runCount % 5) == 0) { _util.getDHT().announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash()); @@ -215,8 +233,8 @@ class PeerCheckerTask implements Runnable || uploaders > uploadLimit) && worstDownloader != null) { - _util.debug("Choke worst downloader: " + worstDownloader, - Snark.DEBUG); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Choke worst downloader: " + worstDownloader); worstDownloader.setChoking(true); coordinator.uploaders--; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 45b6ef82ac..8ded86247b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -68,6 +68,7 @@ class PeerCoordinator implements PeerListener // package local for access by CheckDownLoadersTask final static long CHECK_PERIOD = 40*1000; // 40 seconds final static int MAX_UPLOADERS = 6; + public static final long MAX_INACTIVE = 8*60*1000; /** * Approximation of the number of current uploaders. @@ -125,12 +126,12 @@ class PeerCoordinator implements PeerListener /** partial pieces - lock by synching on wantedPieces - TODO store Requests, not PartialPieces */ private final List partialPieces; - private boolean halted = false; + private volatile boolean halted; private final MagnetState magnetState; private final CoordinatorListener listener; private final I2PSnarkUtil _util; - private static final Random _random = I2PAppContext.getGlobalContext().random(); + private final Random _random; /** * @param metainfo null if in magnet mode @@ -140,6 +141,7 @@ class PeerCoordinator implements PeerListener CoordinatorListener listener, Snark torrent) { _util = util; + _random = util.getContext().random(); this.id = id; this.infohash = infohash; this.metainfo = metainfo; @@ -377,8 +379,10 @@ class PeerCoordinator implements PeerListener } /** - * Reduce max if huge pieces to keep from ooming when leeching - * @return 512K: 16; 1M: 11; 2M: 6 + * Formerly used to + * reduce max if huge pieces to keep from ooming when leeching + * but now we don't + * @return usually 16 */ private int getMaxConnections() { if (metainfo == null) @@ -388,7 +392,7 @@ class PeerCoordinator implements PeerListener return 4; if (pieces <= 5) return 6; - int size = metainfo.getPieceLength(0); + //int size = metainfo.getPieceLength(0); int max = _util.getMaxConnections(); // Now that we use temp files, no memory concern //if (size <= 512*1024 || completed()) @@ -429,6 +433,14 @@ class PeerCoordinator implements PeerListener } } + /** + * @since 0.9.1 + */ + public void restart() { + halted = false; + timer.schedule((CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD)); + } + public void connected(Peer peer) { if (halted) @@ -441,7 +453,7 @@ class PeerCoordinator implements PeerListener synchronized(peers) { Peer old = peerIDInList(peer.getPeerID(), peers); - if ( (old != null) && (old.getInactiveTime() > 8*60*1000) ) { + if ( (old != null) && (old.getInactiveTime() > MAX_INACTIVE) ) { // idle for 8 minutes, kill the old con (32KB/8min = 68B/sec minimum for one block) if (_log.shouldLog(Log.WARN)) _log.warn("Remomving old peer: " + peer + ": " + old + ", inactive for " + old.getInactiveTime()); @@ -535,7 +547,7 @@ class PeerCoordinator implements PeerListener need_more = (!peer.isConnected()) && peersize < getMaxConnections(); // Check if we already have this peer before we build the connection Peer old = peerIDInList(peer.getPeerID(), peers); - need_more = need_more && ((old == null) || (old.getInactiveTime() > 8*60*1000)); + need_more = need_more && ((old == null) || (old.getInactiveTime() > MAX_INACTIVE)); } if (need_more) @@ -966,11 +978,8 @@ class PeerCoordinator implements PeerListener // Announce to the world we have it! // Disconnect from other seeders when we get the last piece - List toDisconnect = new ArrayList(); - Iterator it = peers.iterator(); - while (it.hasNext()) - { - Peer p = it.next(); + List toDisconnect = done ? new ArrayList() : null; + for (Peer p : peers) { if (p.isConnected()) { if (done && p.isCompleted()) @@ -978,15 +987,13 @@ class PeerCoordinator implements PeerListener else p.have(piece); } - } - it = toDisconnect.iterator(); - while (it.hasNext()) - { - Peer p = it.next(); - p.disconnect(true); - } - + } + if (done) { + for (Peer p : toDisconnect) { + p.disconnect(true); + } + // put msg on the console if partial, since Storage won't do it if (!completed()) snark.storageCompleted(storage); diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index acf5f15ed5..138f2dd2c8 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -553,21 +553,14 @@ public class Snark } stopped = false; - boolean coordinatorChanged = false; if (coordinator.halted()) { - // ok, we have already started and stopped, but the coordinator seems a bit annoying to - // restart safely, so lets build a new one to replace the old + coordinator.restart(); if (_peerCoordinatorSet != null) - _peerCoordinatorSet.remove(coordinator); - PeerCoordinator newCoord = new PeerCoordinator(_util, id, infoHash, meta, storage, this, this); - if (_peerCoordinatorSet != null) - _peerCoordinatorSet.add(newCoord); - coordinator = newCoord; - coordinatorChanged = true; + _peerCoordinatorSet.add(coordinator); } - if (!trackerclient.started() && !coordinatorChanged) { + if (!trackerclient.started()) { trackerclient.start(); - } else if (trackerclient.halted() || coordinatorChanged) { + } else if (trackerclient.halted()) { if (storage != null) { try { storage.reopen(rootDataDir); diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java b/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java index abf007a534..c5f31d8f52 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/FetchAndAdd.java @@ -243,11 +243,12 @@ public class FetchAndAdd extends Snark implements EepGet.StatusListener, Runnabl } /** - * @return torrent file bytes remaining or -1 + * @return -1 when done so the web will list us as "complete" instead of "seeding" */ @Override public long getRemainingLength() { - return _remaining; + long rv = _remaining; + return rv > 0 ? rv : -1; } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 06c9501d70..3b667da421 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -1073,7 +1073,7 @@ public class I2PSnarkServlet extends DefaultServlet { else if (isValid) icon = toIcon(meta.getName()); else if (snark instanceof FetchAndAdd) - icon = "arrow_down"; + icon = "basket_put"; else icon = "magnet"; if (isValid) { @@ -1104,7 +1104,7 @@ public class I2PSnarkServlet extends DefaultServlet { out.write(""); if(isRunning && remainingSeconds > 0) - out.write(DataHelper.formatDuration2(remainingSeconds*1000)); // (eta 6h) + out.write(DataHelper.formatDuration2(Math.max(remainingSeconds, 10) * 1000)); // (eta 6h) out.write("\n\t"); out.write(""); if (remaining > 0) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java index 1e69e0e182..d9fea97433 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelClientBase.java @@ -511,7 +511,7 @@ public abstract class I2PTunnelClientBase extends I2PTunnelTask implements Runna if (sm == null) return; Properties props = tunnel.getClientOptions(); - sm.setDefaultOptions(sockMgr.buildOptions(props)); + sm.setDefaultOptions(sm.buildOptions(props)); } /** diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java index ce301a48e8..5270cbb6b9 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcInboundFilter.java @@ -79,7 +79,8 @@ public class IrcInboundFilter implements Runnable { outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); // probably doesn't do much but can't hurt - output.flush(); + if (!in.ready()) + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("inbound BLOCKED: "+inmsg); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java index 5e8c57d997..5e142081af 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/irc/IrcOutboundFilter.java @@ -79,7 +79,9 @@ public class IrcOutboundFilter implements Runnable { outmsg=outmsg+"\r\n"; // rfc1459 sec. 2.3 output.write(outmsg.getBytes("ISO-8859-1")); // save 250 ms in streaming - output.flush(); + // Check ready() so we don't split the initial handshake up into multiple streaming messages + if (!in.ready()) + output.flush(); } else { if (_log.shouldLog(Log.WARN)) _log.warn("outbound BLOCKED: "+"\""+inmsg+"\""); diff --git a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java index 98ad9f6034..235bf3731b 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateChecker.java @@ -158,7 +158,7 @@ public class PluginUpdateChecker extends UpdateHandler { try { _get = new PartialEepGet(_context, proxyHost, proxyPort, _baos, _xpi2pURL, TrustedUpdate.HEADER_BYTES); _get.addStatusListener(PluginUpdateCheckerRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT); } catch (Throwable t) { _log.error("Error checking update for plugin", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java index ce60170361..25514992c2 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/PluginUpdateHandler.java @@ -149,7 +149,7 @@ public class PluginUpdateHandler extends UpdateHandler { else _get = new EepGet(_context, 1, _updateFile, _xpi2pURL, false); _get.addStatusListener(PluginUpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, shouldProxy ? INACTIVITY_TIMEOUT : NOPROXY_INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error downloading plugin", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java index f728783758..4725f62b16 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/UnsignedUpdateHandler.java @@ -75,7 +75,7 @@ public class UnsignedUpdateHandler extends UpdateHandler { // 40 retries!! _get = new EepGet(_context, proxyHost, proxyPort, 40, _updateFile, _zipURL, false); _get.addStatusListener(UnsignedUpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error updating", t); } diff --git a/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java index 6ebb2a469d..c7a1f695f6 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/UpdateHandler.java @@ -45,6 +45,10 @@ public class UpdateHandler { static final String PROP_UPDATE_IN_PROGRESS = "net.i2p.router.web.UpdateHandler.updateInProgress"; protected static final String PROP_LAST_UPDATE_TIME = "router.updateLastDownloaded"; + protected static final long CONNECT_TIMEOUT = 55*1000; + protected static final long INACTIVITY_TIMEOUT = 5*60*1000; + protected static final long NOPROXY_INACTIVITY_TIMEOUT = 60*1000; + public UpdateHandler() { this(ContextHelper.getContext(null)); } @@ -193,7 +197,7 @@ public class UpdateHandler { // no retries _get = new PartialEepGet(_context, proxyHost, proxyPort, _baos, updateURL, TrustedUpdate.HEADER_BYTES); _get.addStatusListener(UpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT); } catch (Throwable t) { _isNewer = false; } @@ -210,7 +214,7 @@ public class UpdateHandler { else _get = new EepGet(_context, 1, _updateFile, updateURL, false); _get.addStatusListener(UpdateRunner.this); - _get.fetch(); + _get.fetch(CONNECT_TIMEOUT, -1, shouldProxy ? INACTIVITY_TIMEOUT : NOPROXY_INACTIVITY_TIMEOUT); } catch (Throwable t) { _log.error("Error updating", t); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 8db219c607..298a1c21f8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -34,19 +34,19 @@ class Connection { private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; - private boolean _connected; + private volatile boolean _connected; private boolean _hardDisconnected; private final MessageInputStream _inputStream; private final MessageOutputStream _outputStream; private final SchedulerChooser _chooser; - private long _nextSendTime; + private volatile long _nextSendTime; private long _ackedPackets; private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; private long _congestionWindowEnd; - private long _highestAckedThrough; + private volatile long _highestAckedThrough; private boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ @@ -60,11 +60,11 @@ class Connection { private String _connectionError; private long _disconnectScheduledOn; private long _lastReceivedOn; - private ActivityTimer _activityTimer; + private final ActivityTimer _activityTimer; /** window size when we last saw congestion */ private int _lastCongestionSeenAt; private long _lastCongestionTime; - private long _lastCongestionHighestUnacked; + private volatile long _lastCongestionHighestUnacked; private boolean _ackSinceCongestion; /** Notify this on connection (or connection failure) */ private final Object _connectLock; @@ -96,7 +96,9 @@ class Connection { } ****/ - /** */ + /** + * @param opts may be null + */ public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, SimpleTimer2 timer, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { @@ -138,10 +140,7 @@ class Connection { _resetSentOn = -1; _connectionEvent = new ConEvent(); _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage - _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); + // all createRateStats in ConnectionManager if (_log.shouldLog(Log.INFO)) _log.info("New connection created with options: " + _options); } @@ -169,7 +168,6 @@ class Connection { * will return false after 5 minutes even if timeoutMs is <= 0. */ boolean packetSendChoke(long timeoutMs) { - // if (false) return true; // <--- what the fuck?? long start = _context.clock().now(); long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 boolean started = false; @@ -187,19 +185,26 @@ class Connection { if (!_connected) return false; started = true; - if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || - (_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) { + // Try to keep things moving even during NACKs and retransmissions... + // Limit unacked packets to the window + // Limit active resends to half the window + // Limit (highest-lowest) to twice the window (if far end doesn't like it, it can send a choke) + int unacked = _outboundPackets.size(); + int wsz = _options.getWindowSize(); + if (unacked >= wsz || + _activeResends >= (wsz + 1) / 2 || + _lastSendId.get() - _highestAckedThrough >= Math.max(MAX_WINDOW_SIZE, 2 * wsz)) { if (timeoutMs > 0) { if (timeLeft <= 0) { if (_log.shouldLog(Log.INFO)) - _log.info("Outbound window is full of " + _outboundPackets.size() - + " with " + _activeResends + " active resends" + _log.info("Outbound window is full " + unacked + + " unacked with " + _activeResends + " active resends" + " and we've waited too long (" + (0-(timeLeft - timeoutMs)) + "ms): " + toString()); return false; } if (_log.shouldLog(Log.DEBUG)) - _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "/" + _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" + _activeResends + "), waiting " + timeLeft); try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} } else { @@ -223,6 +228,12 @@ class Connection { void ackImmediately() { PacketLocal packet = null; +/*** why would we do this? + was it to force a congestion indication at the other end? + an expensive way to do that... + One big user was via SchedulerClosing to resend a CLOSE packet, + but why do that either... + synchronized (_outboundPackets) { if (!_outboundPackets.isEmpty()) { // ordered, so pick the lowest to retransmit @@ -239,6 +250,7 @@ class Connection { } ResendPacketEvent evt = (ResendPacketEvent)packet.getResendEvent(); if (evt != null) { + // fixme should we set a flag and reschedule instead? or synch? boolean sent = evt.retransmit(false); if (sent) { if (_log.shouldLog(Log.DEBUG)) @@ -251,7 +263,9 @@ class Connection { } } } +***/ // if we don't have anything to retransmit, send a small ack + // this calls sendPacket() below packet = _receiver.send(null, 0, 0); if (_log.shouldLog(Log.DEBUG)) _log.debug("sending new ack: " + packet); @@ -281,11 +295,15 @@ class Connection { reply.setReceiveStreamId(_receiveStreamId); reply.setOptionalFrom(_connectionManager.getSession().getMyDestination()); // this just sends the packet - no retries or whatnot - _outboundQueue.enqueue(reply); + if (_outboundQueue.enqueue(reply)) { + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + resetActivityTimer(); + } } /** - * Flush any data that we can + * Flush any data that we can. Non-blocking. */ void sendAvailable() { // this grabs the data, builds a packet, and queues it up via sendPacket @@ -301,7 +319,6 @@ class Connection { if (packet == null) return; setNextSendTime(-1); - _unackedPacketsReceived = 0; if (_options.getRequireFullySigned()) { packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED); @@ -328,8 +345,8 @@ class Connection { (packet.getSequenceNum() % 8 == 0)) { packet.setOptionalDelay(0); packet.setFlag(Packet.FLAG_DELAY_REQUESTED); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Requesting no ack delay for packet " + packet); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Requesting no ack delay for packet " + packet); } else { // This is somewhat of a waste of time, unless the RTT < 4000, // since the other end limits it to getSendAckDelay() @@ -358,10 +375,12 @@ class Connection { // warning, getStatLog() can be null //_context.statManager().getStatLog().addData(Packet.toId(_sendStreamId), "stream.rtt", _options.getRTT(), _options.getWindowSize()); - _lastSendTime = _context.clock().now(); - _outboundQueue.enqueue(packet); - resetActivityTimer(); - + if (_outboundQueue.enqueue(packet)) { + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + resetActivityTimer(); + } + /* if (ackOnly) { // ACK only, don't schedule this packet for retries @@ -397,6 +416,7 @@ class Connection { * @return List of packets acked or null */ List ackPackets(long ackThrough, long nacks[]) { + // FIXME synch this part too? if (ackThrough < _highestAckedThrough) { // dupack which won't tell us anything } else { @@ -415,16 +435,17 @@ class Connection { List acked = null; synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { - Long id = iter.next(); - if (id.longValue() <= ackThrough) { + for (Map.Entry e : _outboundPackets.entrySet()) { + long id = e.getKey().longValue(); + if (id <= ackThrough) { boolean nacked = false; if (nacks != null) { // linear search since its probably really tiny for (int i = 0; i < nacks.length; i++) { - if (nacks[i] == id.longValue()) { + if (nacks[i] == id) { nacked = true; - PacketLocal nackedPacket = _outboundPackets.get(id); + PacketLocal nackedPacket = e.getValue(); + // this will do a fast retransmit if appropriate nackedPacket.incrementNACKs(); break; // NACKed } @@ -433,11 +454,27 @@ class Connection { if (!nacked) { // aka ACKed if (acked == null) acked = new ArrayList(1); - PacketLocal ackedPacket = _outboundPackets.get(id); + PacketLocal ackedPacket = e.getValue(); ackedPacket.ackReceived(); acked.add(ackedPacket); } } else { + // TODO + // we do not currently do an "implicit nack" of the packets higher + // than ackThrough, so those will not be fast retransmitted + // we could incrementNACK them here... but we may need to set the fastRettransmit + // threshold back to 3 for that. + // this will do a fast retransmit if appropriate + // This doesn't work because every packet has an ACK in it, so we hit the + // FAST_TRANSMIT threshold in a heartbeat and retransmit everything, + // even with the threshold at 3. (we never set the NO_ACK field in the header) + // Also, we may need to track that we + // have the same ackThrough for 3 or 4 consecutive times. + // See https://secure.wikimedia.org/wikipedia/en/wiki/Fast_retransmit + //if (_log.shouldLog(Log.INFO)) + // _log.info("ACK thru " + ackThrough + " implicitly NACKs " + id); + //PacketLocal nackedPacket = e.getValue(); + //nackedPacket.incrementNACKs(); break; // _outboundPackets is ordered } } @@ -465,31 +502,33 @@ class Connection { return acked; } - private long _occurredTime; - private long _occurredEventCount; + //private long _occurredTime; + //private long _occurredEventCount; + void eventOccurred() { - long now = System.currentTimeMillis(); + //long now = System.currentTimeMillis(); TaskScheduler sched = _chooser.getScheduler(this); - now = now - now % 1000; - if (_occurredTime == now) { - _occurredEventCount++; - } else { - _occurredTime = now; - if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) { - _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on " - + toString() + ": scheduler = " + sched); - } - _occurredEventCount = 0; - } + //now = now - now % 1000; + //if (_occurredTime == now) { + // _occurredEventCount++; + //} else { + // _occurredTime = now; + // if ( (_occurredEventCount > 1000) && (_log.shouldLog(Log.WARN)) ) { + // _log.warn("More than 1000 events (" + _occurredEventCount + ") in a second on " + // + toString() + ": scheduler = " + sched); + // } + // _occurredEventCount = 0; + //} long before = System.currentTimeMillis(); sched.eventOccurred(this); long elapsed = System.currentTimeMillis() - before; - if ( (elapsed > 1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Took " + elapsed + "ms to pump through " + sched); + // 250 and warn for debugging + if ( (elapsed > 250) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString()); } void resetReceived() { @@ -498,12 +537,8 @@ class Connection { SimpleScheduler.getInstance().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT); } _resetReceived = true; - MessageOutputStream mos = _outputStream; - MessageInputStream mis = _inputStream; - if (mos != null) - mos.streamErrorOccurred(new IOException("Reset received")); - if (mis != null) - mis.streamErrorOccurred(new IOException("Reset received")); + _outputStream.streamErrorOccurred(new IOException("Reset received")); + _inputStream.streamErrorOccurred(new IOException("Reset received")); _connectionError = "Connection reset"; synchronized (_connectLock) { _connectLock.notifyAll(); } } @@ -556,15 +591,10 @@ class Connection { s.destroy2(); _socket = null; } - if (_outputStream != null) - _outputStream.destroy(); - if (_receiver != null) - _receiver.destroy(); - if (_activityTimer != null) - _activityTimer.cancel(); - //_activityTimer = null; - if (_inputStream != null) - _inputStream.streamErrorOccurred(new IOException("disconnected!")); + _outputStream.destroy(); + _receiver.destroy(); + _activityTimer.cancel(); + _inputStream.streamErrorOccurred(new IOException("disconnected!")); if (_disconnectScheduledOn < 0) { _disconnectScheduledOn = _context.clock().now(); @@ -656,11 +686,7 @@ class Connection { * @return Last time we sent data */ public long getLastSendTime() { return _lastSendTime; } - /** Set the time we sent data. - * @param when The time we sent data - */ - public void setLastSendTime(long when) { _lastSendTime = when; } - + /** What was the last packet Id sent to the peer? * @return The last sent packet ID */ @@ -795,10 +821,9 @@ class Connection { public long getCongestionWindowEnd() { return _congestionWindowEnd; } public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } + /** @return the highest outbound packet we have recieved an ack for */ public long getHighestAckedThrough() { return _highestAckedThrough; } - /** @deprecated unused */ - public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } public long getLastActivityOn() { return (_lastSendTime > _lastReceivedOn ? _lastSendTime : _lastReceivedOn); @@ -878,17 +903,12 @@ class Connection { } private void resetActivityTimer() { - if (_options.getInactivityTimeout() <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); - return; - } - if (_activityTimer == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); - return; - } long howLong = _options.getInactivityTimeout(); + if (howLong <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resetting the inactivity timer, but its gone!", new Exception("where did it go?")); + return; + } howLong += _randomWait; // randomize it a bit, so both sides don't do it at once //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Resetting the inactivity timer to " + howLong); @@ -983,12 +1003,12 @@ class Connection { } /** stream that the local peer receives data on - * @return the inbound message stream + * @return the inbound message stream, non-null */ public MessageInputStream getInputStream() { return _inputStream; } /** stream that the local peer sends data to the remote peer on - * @return the outbound message stream + * @return the outbound message stream, non-null */ public MessageOutputStream getOutputStream() { return _outputStream; } @@ -1032,12 +1052,10 @@ class Connection { */ buf.append("unacked in: ").append(getUnackedPacketsReceived()); int missing = 0; - if (_inputStream != null) { - long nacks[] = _inputStream.getNacks(); - if (nacks != null) { - missing = nacks.length; - buf.append(" [").append(missing).append(" missing]"); - } + long nacks[] = _inputStream.getNacks(); + if (nacks != null) { + missing = nacks.length; + buf.append(" [").append(missing).append(" missing]"); } if (getResetSent()) @@ -1053,8 +1071,7 @@ class Connection { if (getCloseReceivedOn() > 0) buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); buf.append(" sent: ").append(1 + _lastSendId.get()); - if (_inputStream != null) - buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); + buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); buf.append(" maxWin ").append(getOptions().getMaxWindowSize()); buf.append(" MTU ").append(getOptions().getMaxMessageSize()); @@ -1086,14 +1103,15 @@ class Connection { * there are other packets in flight. 3 takes forever, let's try 2. * */ - static final int FAST_RETRANSMIT_THRESHOLD = 2; + static final int FAST_RETRANSMIT_THRESHOLD = 3; /** * Coordinate the resends of a given packet */ class ResendPacketEvent extends SimpleTimer2.TimedEvent { - private PacketLocal _packet; + private final PacketLocal _packet; private long _nextSendTime; + public ResendPacketEvent(PacketLocal packet, long delay) { super(_timer); _packet = packet; @@ -1111,6 +1129,8 @@ class Connection { * we have to use forceReschedule() instead of schedule() below, * to prevent duplicates in the timer queue. * + * don't synchronize this, deadlock with ackPackets->ackReceived->SimpleTimer2.cancel + * * @param penalize true if this retransmission is caused by a timeout, false if we * are just sending this packet instead of an ACK * @return true if the packet was sent, false if it was not @@ -1131,7 +1151,12 @@ class Connection { boolean resend = false; boolean isLowest = false; synchronized (_outboundPackets) { - if (_packet.getSequenceNum() == _highestAckedThrough + 1) + // allow appx. half the window to be "lowest" and be active resends, minimum of 3 + // Note: we should really pick the N lowest, not the lowest one + N more who + // happen to get here next, as the timers get out-of-order esp. after fast retx + if (_packet.getSequenceNum() == _highestAckedThrough + 1 || + _packet.getNumSends() > 1 || + _activeResends < Math.max(3, (_options.getWindowSize() + 1) / 2)) isLowest = true; if (_outboundPackets.containsKey(Long.valueOf(_packet.getSequenceNum()))) resend = true; @@ -1145,24 +1170,28 @@ class Connection { // BUG? seq# = 0, activeResends = 0, loop forever - why? // also seen with seq# > 0. Is the _activeResends count reliable? if (_log.shouldLog(Log.INFO)) - _log.info("Delaying resend of " + _packet + " as there are " - + _activeResends + " active resends already in play"); - forceReschedule(1000); - _nextSendTime = 1000 + _context.clock().now(); + _log.info("Delaying resend of " + _packet + " with " + + _activeResends + " active resend, " + + _outboundPackets.size() + " unacked, window size = " + _options.getWindowSize()); + forceReschedule(1333); + _nextSendTime = 1333 + _context.clock().now(); return false; } + // It's the lowest, or it's fast retransmit time. Resend the packet. + if (fastRetransmit) _context.statManager().addRateData("stream.fastRetransmit", _packet.getLifetime(), _packet.getLifetime()); // revamp various fields, in case we need to ack more, etc - _inputStream.updateAcks(_packet); + // updateAcks done in enqueue() + //_inputStream.updateAcks(_packet); int choke = getOptions().getChoke(); _packet.setOptionalDelay(choke); if (choke > 0) _packet.setFlag(Packet.FLAG_DELAY_REQUESTED); // this seems unnecessary to send the MSS again: - _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); + //_packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); // bugfix release 0.7.8, we weren't dividing by 1000 _packet.setResendDelay(getOptions().getResendDelay() / 1000); if (_packet.getReceiveStreamId() <= 0) @@ -1186,7 +1215,7 @@ class Connection { getOptions().setWindowSize(newWindowSize); if (_log.shouldLog(Log.WARN)) - _log.warn("Congestion resending packet " + _packet.getSequenceNum() + ": new windowSize " + newWindowSize + _log.warn("Congestion, resending packet " + _packet.getSequenceNum() + " (new windowSize " + newWindowSize + "/" + getOptions().getWindowSize() + ") for " + Connection.this.toString()); windowAdjusted(); @@ -1195,10 +1224,6 @@ class Connection { int numSends = _packet.getNumSends() + 1; - if (numSends == 2) { - // first resend for this packet - _activeResends++; - } // in case things really suck, the other side may have lost thier // session tags (e.g. they restarted), so jump back to ElGamal. @@ -1225,27 +1250,34 @@ class Connection { // set this before enqueue() as it passes it on to the router _nextSendTime = timeout + _context.clock().now(); - if (_log.shouldLog(Log.INFO)) - _log.info("Resend packet " + _packet + " time " + numSends + + if (_outboundQueue.enqueue(_packet)) { + // first resend for this packet ? + if (numSends == 2) + _activeResends++; + if (_log.shouldLog(Log.INFO)) + _log.info("Resent packet " + + (fastRetransmit ? "(fast) " : "(timeout) ") + + _packet + + " next resend in " + timeout + "ms" + " activeResends: " + _activeResends + " (wsize " + newWindowSize + " lifetime " + (_context.clock().now() - _packet.getCreatedOn()) + "ms)"); - _outboundQueue.enqueue(_packet); - _lastSendTime = _context.clock().now(); + _unackedPacketsReceived = 0; + _lastSendTime = _context.clock().now(); + // timer reset added 0.9.1 + resetActivityTimer(); + } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling resend in " + timeout + "ms for " + _packet); forceReschedule(timeout); } - // acked during resending (... or somethin') + // acked during resending (... or somethin') ???????????? if ( (_packet.getAckTime() > 0) && (_packet.getNumSends() > 1) ) { _activeResends--; synchronized (_outboundPackets) { _outboundPackets.notifyAll(); } - return true; } return true; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 900fb96267..8f2a40f469 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -10,7 +10,12 @@ import net.i2p.util.Log; * do NOT block, but they also do not necessary imply immediate * delivery, or even the generation of a new packet. This class * is the only one that builds useful outbound Packet objects. - * + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession + *

+ * There's one of these per MessageOutputStream. + * It stores no state. It sends everything to the Connection unless + * the Connection is closed, */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { private final I2PAppContext _context; @@ -82,7 +87,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { if (_log.shouldLog(Log.INFO) && !doSend) _log.info("writeData called: size="+size + " doSend=" + doSend + " unackedReceived: " + con.getUnackedPacketsReceived() - + " con: " + con, new Exception("write called by")); + + " con: " + con /* , new Exception("write called by") */ ); if (doSend) { PacketLocal packet = send(buf, off, size); @@ -111,6 +116,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { public PacketLocal send(byte buf[], int off, int size) { return send(buf, off, size, false); } + /** * @param buf data to be sent - may be null * @param off offset into the buffer to start writing from @@ -120,22 +126,20 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return the packet sent */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { - Connection con = _connection; - //if (con == null) return null; - long before = System.currentTimeMillis(); - PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); - long built = System.currentTimeMillis(); - con.sendPacket(packet); - long sent = System.currentTimeMillis(); + //long before = System.currentTimeMillis(); + PacketLocal packet = buildPacket(buf, off, size, forceIncrement); + //long built = System.currentTimeMillis(); + _connection.sendPacket(packet); + //long sent = System.currentTimeMillis(); - if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); - if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) - _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); + //if ( (built-before > 5*1000) && (_log.shouldLog(Log.WARN)) ) + // _log.warn("wtf, took " + (built-before) + "ms to build a packet: " + packet); + //if ( (sent-built> 5*1000) && (_log.shouldLog(Log.WARN)) ) + // _log.warn("wtf, took " + (sent-built) + "ms to send a packet: " + packet); return packet; } - private boolean isAckOnly(Connection con, int size) { + private static boolean isAckOnly(Connection con, int size) { boolean ackOnly = ( (size <= 0) && // no data (con.getLastSendId() >= 0) && // not a SYN ( (!con.getOutputStream().getClosed()) || // not a CLOSE @@ -144,7 +148,16 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { return ackOnly; } - private PacketLocal buildPacket(Connection con, byte buf[], int off, int size, boolean forceIncrement) { + /** + * @param buf data to be sent - may be null + * @param off offset into the buffer to start writing from + * @param size how many bytes of the buffer to write (may be 0) + * @param forceIncrement even if the buffer is empty, increment the packetId + * so we get an ACK back + * @return the packet to be sent + */ + private PacketLocal buildPacket(byte buf[], int off, int size, boolean forceIncrement) { + Connection con = _connection; if (size > Packet.MAX_PAYLOAD_SIZE) throw new IllegalArgumentException("size is too large (" + size + ")"); boolean ackOnly = isAckOnly(con, size); boolean isFirst = (con.getAckedPackets() <= 0) && (con.getUnackedPacketsSent() <= 0); @@ -164,7 +177,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setSendStreamId(con.getSendStreamId()); packet.setReceiveStreamId(con.getReceiveStreamId()); - con.getInputStream().updateAcks(packet); + // not needed here, handled in PacketQueue.enqueue() + //con.getInputStream().updateAcks(packet); // note that the optional delay is usually rewritten in Connection.sendPacket() int choke = con.getOptions().getChoke(); packet.setOptionalDelay(choke); @@ -195,6 +209,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { // don't set the closed flag if this is a plain ACK and there are outstanding // packets sent, otherwise the other side could receive the CLOSE prematurely, // since this ACK could arrive before the unacked payload message. + // TODO if the only unacked packet is the CLOSE packet and it didn't have any data... if (con.getOutputStream().getClosed() && ( (size > 0) || (con.getUnackedPacketsSent() <= 0) || (packet.getSequenceNum() > 0) ) ) { packet.setFlag(Packet.FLAG_CLOSE); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index b2d9a201ac..739a420177 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -75,6 +75,7 @@ class ConnectionManager { /** Socket timeout for accept() */ _soTimeout = -1; + // Stats for this class _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -85,6 +86,14 @@ class ConnectionManager { _context.statManager().createRateStat("stream.con.lifetimeCongestionSeenAt", "When was the last congestion seen at when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeSendWindowSize", "What is the final send window size when a stream closes?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); + // Stats for Connection + _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.chokeSizeEnd", "How many messages were outstanding when we stopped being choked?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.fastRetransmit", "How long a packet has been around for if it has been resent per the fast retransmit timer?", "Stream", new long[] { 60*1000, 10*60*1000 }); + // Stats for PacketQueue + _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } Connection getConnectionByInboundId(long id) { @@ -420,13 +429,11 @@ class ConnectionManager { if (removed) { _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime()); MessageInputStream stream = con.getInputStream(); - if (stream != null) { long rcvd = 1 + stream.getHighestBlockId(); long nacks[] = stream.getNacks(); if (nacks != null) rcvd -= nacks.length; _context.statManager().addRateData("stream.con.lifetimeMessagesReceived", rcvd, con.getLifetime()); - } _context.statManager().addRateData("stream.con.lifetimeBytesSent", con.getLifetimeBytesSent(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeBytesReceived", con.getLifetimeBytesReceived(), con.getLifetime()); _context.statManager().addRateData("stream.con.lifetimeDupMessagesSent", con.getLifetimeDupMessagesSent(), con.getLifetime()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ff3c545dfb..f71a19d424 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -98,15 +98,17 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_INITIAL_ACK_DELAY = 2*1000; static final int MIN_WINDOW_SIZE = 1; private static final boolean DEFAULT_ANSWER_PINGS = true; + private static final int DEFAULT_INACTIVITY_TIMEOUT = 90*1000; + private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND; + + /** * If PROTO is enforced, we cannot communicate with destinations earlier than version 0.7.1. * @since 0.9.1 */ private static final boolean DEFAULT_ENFORCE_PROTO = false; - // Syncronization fix, but doing it this way causes NPE... - // FIXME private final int _trend[] = new int[TREND_COUNT]; FIXME - private int _trend[]; + private final int _trend[] = new int[TREND_COUNT]; /** * OK, here is the calculation on the message size to fit in a single @@ -220,6 +222,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions() { super(); + cinit(System.getProperties()); } /** @@ -229,6 +232,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(Properties opts) { super(opts); + cinit(opts); } /** @@ -237,6 +241,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); + cinit(System.getProperties()); } /** @@ -245,6 +250,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { */ public ConnectionOptions(ConnectionOptions opts) { super(opts); + cinit(System.getProperties()); if (opts != null) update(opts); } @@ -302,10 +308,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = opts.getMaxTotalConnsPerDay(); } - @Override - protected void init(Properties opts) { - super.init(opts); - _trend = new int[TREND_COUNT]; + /** + * Initialization + */ + private void cinit(Properties opts) { setMaxWindowSize(getInt(opts, PROP_MAX_WINDOW_SIZE, Connection.MAX_WINDOW_SIZE)); setConnectDelay(getInt(opts, PROP_CONNECT_DELAY, -1)); setProfile(getInt(opts, PROP_PROFILE, PROFILE_BULK)); @@ -318,8 +324,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); // handled in super() //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); - setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); @@ -367,9 +373,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { //if (opts.containsKey(PROP_WRITE_TIMEOUT)) // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) - setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); + setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) - setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); + setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR)) setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 53ebb17e1b..720c38fdf6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -12,7 +12,13 @@ import net.i2p.util.SimpleTimer; /** * Receive a packet for a particular connection - placing the data onto the * queue, marking packets as acked, updating various fields, etc. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream + *

+ * One of these is instantiated per-Destination + * (i.e. per-ConnectionManager, not per-Connection). + * It doesn't store any state. + */ class ConnectionPacketHandler { private final I2PAppContext _context; @@ -94,19 +100,24 @@ class ConnectionPacketHandler { } } - long ready = con.getInputStream().getHighestReadyBockId(); - int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); - int allowedBlocks = available/con.getOptions().getMaxMessageSize(); - if ( (packet.getPayloadSize() > 0) && (packet.getSequenceNum() > ready + allowedBlocks) ) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Inbound buffer exceeded on connection " + con + " (" - + ready + "/"+ (ready+allowedBlocks) + "/" + available - + ": dropping " + packet); - ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); - con.getOptions().setChoke(61*1000); - packet.releasePayload(); - con.ackImmediately(); - return; + if (packet.getPayloadSize() > 0) { + // Here, for the purposes of calculating whether the input stream is full, + // we assume all the not-ready blocks are the max message size. + // This prevents us from getting DoSed by accepting unlimited out-of-order small messages + long ready = con.getInputStream().getHighestReadyBockId(); + int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); + int allowedBlocks = available/con.getOptions().getMaxMessageSize(); + if (packet.getSequenceNum() > ready + allowedBlocks) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Inbound buffer exceeded on connection " + con + " (" + + ready + "/"+ (ready+allowedBlocks) + "/" + available + + ": dropping " + packet); + ack(con, packet.getAckThrough(), packet.getNacks(), null, false, choke); + con.getOptions().setChoke(61*1000); + packet.releasePayload(); + con.ackImmediately(); + return; + } } con.getOptions().setChoke(0); @@ -513,12 +524,14 @@ class ConnectionPacketHandler { } private class AckDup implements SimpleTimer.TimedEvent { - private long _created; - private Connection _con; + private final long _created; + private final Connection _con; + public AckDup(Connection con) { _created = _context.clock().now(); _con = con; } + public void timeReached() { if (_con.getLastSendTime() <= _created) { if (_con.getResetReceived() || _con.getResetSent()) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index d9ca691b4d..4aba07d314 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -13,7 +13,8 @@ import net.i2p.util.Log; /** * Receive raw information from the I2PSession and turn it into * Packets, if we can. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class MessageHandler implements I2PSessionMuxedListener { private final ConnectionManager _manager; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 3dc510fd52..eb168d1106 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -16,6 +16,11 @@ import net.i2p.util.Log; /** * Stream that can be given messages out of order * yet present them in order. + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream + *

+ * This buffers unlimited data via messageReceived() - + * limiting / blocking is done in ConnectionPacketHandler.receivePacket(). * */ class MessageInputStream extends InputStream { @@ -113,6 +118,9 @@ class MessageInputStream extends InputStream { } } + /** + * Adds the ack-through and nack fields to a packet we are building for transmission + */ public void updateAcks(PacketLocal packet) { synchronized (_dataLock) { packet.setAckThrough(_highestBlockId); @@ -126,6 +134,7 @@ class MessageInputStream extends InputStream { * * @return block IDs greater than the highest ready block ID, or null if there aren't any. */ +/*** public long[] getOutOfOrderBlocks() { long blocks[] = null; synchronized (_dataLock) { @@ -140,15 +149,18 @@ class MessageInputStream extends InputStream { Arrays.sort(blocks); return blocks; } +***/ /** how many blocks have we received that we still have holes before? * @return Count of blocks received that still have holes */ +/*** public int getOutOfOrderBlockCount() { synchronized (_dataLock) { return _notYetReadyBlocks.size(); } } +***/ /** * how long a read() call should block (if less than 0, block indefinitely, @@ -157,8 +169,8 @@ class MessageInputStream extends InputStream { */ public int getReadTimeout() { return _readTimeout; } public void setReadTimeout(int timeout) { - if (_log.shouldLog(Log.INFO)) - _log.info("Changing read timeout from " + _readTimeout + " to " + timeout); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Changing read timeout from " + _readTimeout + " to " + timeout); _readTimeout = timeout; } @@ -205,9 +217,9 @@ class MessageInputStream extends InputStream { * @return true if this is a new packet, false if it is a dup */ public boolean messageReceived(long messageId, ByteArray payload) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); synchronized (_dataLock) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("received " + messageId + " with " + (payload != null ? payload.getValid()+"" : "no payload")); if (messageId <= _highestReadyBlockId) { if (_log.shouldLog(Log.DEBUG)) _log.debug("ignoring dup message " + messageId); @@ -237,7 +249,6 @@ class MessageInputStream extends InputStream { cur++; _highestReadyBlockId++; } - _dataLock.notifyAll(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("message is out of order: " + messageId); @@ -245,8 +256,8 @@ class MessageInputStream extends InputStream { _notYetReadyBlocks.put(Long.valueOf(messageId), new ByteArray(null)); else _notYetReadyBlocks.put(Long.valueOf(messageId), payload); - _dataLock.notifyAll(); } + _dataLock.notifyAll(); } return true; } @@ -278,7 +289,7 @@ class MessageInputStream extends InputStream { while (_readyDataBlocks.isEmpty()) { if (_locallyClosed) - throw new IOException("Already closed, you wanker"); + throw new IOException("Already closed"); if ( (_notYetReadyBlocks.isEmpty()) && (_closeReceived) ) { if (_log.shouldLog(Log.INFO)) @@ -360,7 +371,7 @@ class MessageInputStream extends InputStream { @Override public int available() throws IOException { - if (_locallyClosed) throw new IOException("Already closed, you wanker"); + if (_locallyClosed) throw new IOException("Already closed"); throwAnyError(); int numBytes = 0; synchronized (_dataLock) { @@ -373,7 +384,7 @@ class MessageInputStream extends InputStream { } } if (_log.shouldLog(Log.DEBUG)) - _log.debug("available(): " + numBytes + " " + toString()); + _log.debug("available(): " + numBytes); return numBytes; } @@ -384,6 +395,7 @@ class MessageInputStream extends InputStream { * * @return Count of bytes waiting to be read */ +/*** public int getTotalQueuedSize() { synchronized (_dataLock) { if (_locallyClosed) return 0; @@ -401,7 +413,11 @@ class MessageInputStream extends InputStream { return numBytes; } } +***/ + /** + * Same as available() but doesn't throw IOE + */ public int getTotalReadySize() { synchronized (_dataLock) { if (_locallyClosed) return 0; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 96a3ebe557..1470bd5025 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -14,6 +14,8 @@ import net.i2p.util.SimpleTimer2; * A stream that we can shove data into that fires off those bytes * on flush or when the buffer is full. It also blocks according * to the data receiver's needs. + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class MessageOutputStream extends OutputStream { private final I2PAppContext _context; @@ -21,17 +23,17 @@ class MessageOutputStream extends OutputStream { private byte _buf[]; private int _valid; private final Object _dataLock; - private DataReceiver _dataReceiver; + private final DataReceiver _dataReceiver; private IOException _streamError; - private boolean _closed; + private volatile boolean _closed; private long _written; private int _writeTimeout; private ByteCache _dataCache; private final Flusher _flusher; private long _lastFlushed; - private long _lastBuffered; + private volatile long _lastBuffered; /** if we enqueue data but don't flush it in this period, flush it passively */ - private int _passiveFlushDelay; + private final int _passiveFlushDelay; /** * if we are changing the buffer size during operation, set this to the new * buffer size, and next time we are flushing, update the _buf array to the new @@ -39,9 +41,9 @@ class MessageOutputStream extends OutputStream { */ private volatile int _nextBufferSize; // rate calc helpers - private long _sendPeriodBeginTime; - private long _sendPeriodBytes; - private int _sendBps; + //private long _sendPeriodBeginTime; + //private long _sendPeriodBytes; + //private int _sendBps; /** * Since this is less than i2ptunnel's i2p.streaming.connectDelay default of 1000, @@ -73,16 +75,16 @@ class MessageOutputStream extends OutputStream { _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; - _sendPeriodBeginTime = ctx.clock().now(); - _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); + //_sendPeriodBeginTime = ctx.clock().now(); + //_context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(timer); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("MessageOutputStream created"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("MessageOutputStream created"); } public void setWriteTimeout(int ms) { - if (_log.shouldLog(Log.INFO)) - _log.info("Changing write timeout from " + _writeTimeout + " to " + ms); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Changing write timeout from " + _writeTimeout + " to " + ms); _writeTimeout = ms; } @@ -131,15 +133,9 @@ class MessageOutputStream extends OutputStream { remaining -= toWrite; cur += toWrite; _valid = _buf.length; - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; - if (rcvr == null) { - throwAnyError(); - return; - } if (_log.shouldLog(Log.INFO)) _log.info("write() direct valid = " + _valid); - ws = rcvr.writeData(_buf, 0, _valid); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; throwAnyError(); @@ -167,17 +163,18 @@ class MessageOutputStream extends OutputStream { _log.info("After waitForAccept of " + ws); } } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Queued " + len + " without sending to the receiver"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Queued " + len + " without sending to the receiver"); } } long elapsed = _context.clock().now() - begin; if ( (elapsed > 10*1000) && (_log.shouldLog(Log.INFO)) ) _log.info("wtf, took " + elapsed + "ms to write to the stream?", new Exception("foo")); throwAnyError(); - updateBps(len); + //updateBps(len); } +/**** private void updateBps(int len) { long now = _context.clock().now(); int periods = (int)Math.floor((now - _sendPeriodBeginTime) / 1000d); @@ -191,7 +188,9 @@ class MessageOutputStream extends OutputStream { _sendPeriodBytes += len; } } +****/ + /** */ public void write(int b) throws IOException { write(new byte[] { (byte)b }, 0, 1); throwAnyError(); @@ -240,14 +239,15 @@ class MessageOutputStream extends OutputStream { _enqueued = true; } public void timeReached() { + if (_closed) + return; _enqueued = false; - DataReceiver rec = _dataReceiver; long timeLeft = (_lastBuffered + _passiveFlushDelay - _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) _log.debug("flusher time reached: left = " + timeLeft); if (timeLeft > 0) enqueue(); - else if ( (rec != null) && (rec.writeInProcess()) ) + else if (_dataReceiver.writeInProcess()) enqueue(); // don't passive flush if there is a write being done (unacked outbound) else doFlush(); @@ -261,10 +261,8 @@ class MessageOutputStream extends OutputStream { if ( (_valid > 0) && (flushTime <= _context.clock().now()) ) { if (_log.shouldLog(Log.INFO)) _log.info("doFlush() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; - if ( (_buf != null) && (rcvr != null) ) { - ws = rcvr.writeData(_buf, 0, _valid); + if (_buf != null) { + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; _lastFlushed = _context.clock().now(); @@ -317,25 +315,18 @@ class MessageOutputStream extends OutputStream { if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("flush() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { if (_buf == null) { _dataLock.notifyAll(); throw new IOException("closed (buffer went away)"); } - if (rcvr == null) { - _dataLock.notifyAll(); - throwAnyError(); - return; - } // if valid == 0 return ??? - no, this could flush a CLOSE packet too. // Yes, flush here, inside the data lock, and do all the waitForCompletion() stuff below // (disabled) if (!wait_for_accept_only) { - ws = rcvr.writeData(_buf, 0, _valid); + ws = _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; locked_updateBufferSize(); @@ -347,7 +338,7 @@ class MessageOutputStream extends OutputStream { // Skip all the waitForCompletion() stuff below, which is insanity, as of 0.8.1 // must do this outside the data lock if (wait_for_accept_only) { - flushAvailable(rcvr, true); + flushAvailable(_dataReceiver, true); return; } @@ -387,6 +378,7 @@ class MessageOutputStream extends OutputStream { } // setting _closed before flush() will force flush() to send a CLOSE packet _closed = true; + _flusher.cancel(); // In 0.8.1 we rewrote flush() to only wait for accept into the window, // not "completion" (i.e. ack from the far end). @@ -415,10 +407,11 @@ class MessageOutputStream extends OutputStream { /** * nonblocking close - - * Use outside of this package is deprecated, should be made package local + * Only for use inside package */ public void closeInternal() { _closed = true; + _flusher.cancel(); if (_streamError == null) _streamError = new IOException("Closed internally"); clearData(true); @@ -429,12 +422,10 @@ class MessageOutputStream extends OutputStream { if (_log.shouldLog(Log.INFO) && _valid > 0) _log.info("clearData() valid = " + _valid); - // avoid NPE from race with destroy() - DataReceiver rcvr = _dataReceiver; synchronized (_dataLock) { // flush any data, but don't wait for it - if ( (rcvr != null) && (_valid > 0) && shouldFlush) - rcvr.writeData(_buf, 0, _valid); + if (_valid > 0 && shouldFlush) + _dataReceiver.writeData(_buf, 0, _valid); _written += _valid; _valid = 0; @@ -503,15 +494,15 @@ class MessageOutputStream extends OutputStream { throw new InterruptedIOException("Flush available timed out (" + _writeTimeout + "ms)"); } long afterAccept = System.currentTimeMillis(); - if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.DEBUG)) ) - _log.debug("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws); + if ( (afterAccept - afterBuild > 1000) && (_log.shouldLog(Log.INFO)) ) + _log.info("Took " + (afterAccept-afterBuild) + "ms to accept a packet? " + ws); return; } void destroy() { - _dataReceiver = null; + _closed = true; + _flusher.cancel(); synchronized (_dataLock) { - _closed = true; _dataLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index bb3e723b65..a79f67cb75 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -395,6 +395,7 @@ class Packet { DataHelper.toLong(buffer, cur, 4, _ackThrough > 0 ? _ackThrough : 0); cur += 4; if (_nacks != null) { + // if max win is ever > 255, limit to 255 DataHelper.toLong(buffer, cur, 1, _nacks.length); cur++; for (int i = 0; i < _nacks.length; i++) { @@ -461,7 +462,7 @@ class Packet { * @return How large the current packet would be * @throws IllegalStateException */ - public int writtenSize() throws IllegalStateException { + private int writtenSize() { int size = 0; size += 4; // _sendStreamId.length; size += 4; // _receiveStreamId.length; @@ -469,6 +470,7 @@ class Packet { size += 4; // ackThrough if (_nacks != null) { size++; // nacks length + // if max win is ever > 255, limit to 255 size += 4 * _nacks.length; } else { size++; // nacks length @@ -671,10 +673,11 @@ class Packet { buf.append(toId(_sendStreamId)); //buf.append("<-->"); buf.append(toId(_receiveStreamId)).append(": #").append(_sequenceNum); - if (_sequenceNum < 10) - buf.append(" \t"); // so the tab lines up right - else - buf.append('\t'); + //if (_sequenceNum < 10) + // buf.append(" \t"); // so the tab lines up right + //else + // buf.append('\t'); + buf.append(' '); buf.append(toFlagString()); buf.append(" ACK ").append(getAckThrough()); if (_nacks != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index ef145179c3..34a59475e6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -2,7 +2,6 @@ package net.i2p.client.streaming; import java.text.SimpleDateFormat; import java.util.Date; -import java.util.Iterator; import java.util.Set; import net.i2p.I2PAppContext; @@ -13,7 +12,8 @@ import net.i2p.util.Log; /** * receive a packet and dispatch it correctly to the connection specified, * the server socket, or queue a reply RST packet. - * + *

+ * I2PSession -> MessageHandler -> PacketHandler -> ConnectionPacketHandler -> MessageInputStream */ class PacketHandler { private final ConnectionManager _manager; @@ -86,6 +86,7 @@ class PacketHandler { } *****/ + /** */ void receivePacket(Packet packet) { //boolean ok = choke(packet); //if (ok) @@ -202,15 +203,13 @@ class PacketHandler { // someone is sending us a packet on the wrong stream // It isn't a SYN so it isn't likely to have a FROM to send a reset back to if (_log.shouldLog(Log.ERROR)) { - Set cons = _manager.listConnections(); StringBuilder buf = new StringBuilder(512); buf.append("Received a packet on the wrong stream: "); buf.append(packet); buf.append("\nthis connection:\n"); buf.append(con); buf.append("\nall connections:"); - for (Iterator iter = cons.iterator(); iter.hasNext();) { - Connection cur = (Connection)iter.next(); + for (Connection cur : _manager.listConnections()) { buf.append('\n').append(cur); } _log.error(buf.toString(), new Exception("Wrong stream")); @@ -299,9 +298,7 @@ class PacketHandler { } if (_log.shouldLog(Log.DEBUG)) { StringBuilder buf = new StringBuilder(128); - Set cons = _manager.listConnections(); - for (Iterator iter = cons.iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); + for (Connection con : _manager.listConnections()) { buf.append(con.toString()).append(" "); } _log.debug("connections: " + buf.toString() + " sendId: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index ca2e25d42d..748fe19cab 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -16,11 +16,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private final I2PAppContext _context; private final Log _log; private final Connection _connection; - private Destination _to; + private final Destination _to; private SessionKey _keyUsed; private Set _tagsSent; private final long _createdOn; - private int _numSends; + private volatile int _numSends; private long _lastSend; private long _acceptedOn; private long _ackOn; @@ -45,7 +45,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public Destination getTo() { return _to; } - public void setTo(Destination to) { _to = to; } /** * @deprecated should always return null @@ -72,6 +71,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { public void setTagsSent(Set tags) { if (tags != null && !tags.isEmpty()) _log.error("Who is sending tags thru the streaming lib? " + tags.size()); + /**** if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) && (!tags.isEmpty()) ) { //int old = _tagsSent.size(); //_tagsSent.addAll(tags); @@ -80,6 +80,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } else { _tagsSent = tags; } + ****/ } public boolean shouldSign() { @@ -142,10 +143,15 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { /** @return null if not bound */ public Connection getConnection() { return _connection; } + /** + * Will force a fast restransmit on the 3rd call (FAST_RETRANSMIT_THRESHOLD) + * but only if it's the lowest unacked (see Connection.ResendPacketEvent) + */ public void incrementNACKs() { int cnt = ++_nackCount; SimpleTimer2.TimedEvent evt = _resendEvent; - if ( (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD) && (evt != null) && (!_retransmitted)) { + if (cnt >= Connection.FAST_RETRANSMIT_THRESHOLD && evt != null && (!_retransmitted) && + (_numSends == 1 || _lastSend < _context.clock().now() + 4*1000)) { // Don't fast retx if we recently resent it _retransmitted = true; evt.reschedule(0); } @@ -162,8 +168,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { if (con != null) buf.append(" rtt ").append(con.getOptions().getRTT()); - if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) - buf.append(" with tags"); + //if ( (_tagsSent != null) && (!_tagsSent.isEmpty()) ) + // buf.append(" with tags"); + + if (_nackCount > 0) + buf.append(" nacked ").append(_nackCount).append(" times"); if (_ackOn > 0) buf.append(" ack after ").append(getAckTime()); @@ -200,8 +209,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { * @param maxWaitMs MessageOutputStream is the only caller, generally with -1 */ public void waitForAccept(int maxWaitMs) { - if (_connection == null) - throw new IllegalStateException("Cannot wait for accept with no connection"); long before = _context.clock().now(); int queued = _connection.getUnackedPacketsSent(); int window = _connection.getOptions().getWindowSize(); @@ -216,7 +223,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { int afterQueued = _connection.getUnackedPacketsSent(); if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) _log.debug("Took " + (after-before) + "ms to get " - + (accepted ? " accepted" : " rejected") + + (accepted ? "accepted" : "rejected") + (_cancelledOn > 0 ? " and CANCELLED" : "") + ", queued behind " + queued +" with a window size of " + window + ", finally accepted with " + afterQueued + " queued: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 724e44202f..5e7ae6acc3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -12,7 +12,8 @@ import net.i2p.util.Log; * Well, thats the theory at least... in practice we just * send them immediately with no blocking, since the * mode=bestEffort doesnt block in the SDK. - * + *

+ * MessageOutputStream -> ConnectionDataReceiver -> Connection -> PacketQueue -> I2PSession */ class PacketQueue { private final I2PAppContext _context; @@ -26,16 +27,17 @@ class PacketQueue { _session = session; _connectionManager = mgr; _log = context.logManager().getLog(PacketQueue.class); - _context.statManager().createRateStat("stream.con.sendMessageSize", "Size of a message sent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("stream.con.sendDuplicateSize", "Size of a message resent on a connection", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + // all createRateStats in ConnectionManager } /** * Add a new packet to be sent out ASAP * * keys and tags disabled since dropped in I2PSession + * @return true if sent */ - public void enqueue(PacketLocal packet) { + public boolean enqueue(PacketLocal packet) { + // this updates the ack/nack field packet.prepare(); //SessionKey keyUsed = packet.getKeyUsed(); @@ -52,7 +54,7 @@ class PacketQueue { if (packet.getAckTime() > 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Not resending " + packet); - return; + return false; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending... " + packet); @@ -76,7 +78,7 @@ class PacketQueue { _log.warn("took " + writeTime + "ms to write the packet: " + packet); // last chance to short circuit... - if (packet.getAckTime() > 0) return; + if (packet.getAckTime() > 0) return false; // this should not block! begin = _context.clock().now(); @@ -158,6 +160,7 @@ class PacketQueue { // reset packet.releasePayload(); } + return sent; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java index b0167809ab..17918a6e12 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -15,7 +15,7 @@ class SchedulerChooser { private final Log _log; private final TaskScheduler _nullScheduler; /** list of TaskScheduler objects */ - private final List _schedulers; + private final List _schedulers; public SchedulerChooser(I2PAppContext context) { _context = context; @@ -26,7 +26,7 @@ class SchedulerChooser { public TaskScheduler getScheduler(Connection con) { for (int i = 0; i < _schedulers.size(); i++) { - TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i); + TaskScheduler scheduler = _schedulers.get(i); if (scheduler.accept(con)) { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); @@ -50,11 +50,7 @@ class SchedulerChooser { } private class NullScheduler implements TaskScheduler { - private final Log _log; - public NullScheduler() { - _log = _context.logManager().getLog(NullScheduler.class); - } - + public void eventOccurred(Connection con) { if (_log.shouldLog(Log.WARN)) _log.warn("Yell at jrandom: Event occurred on " + con, new Exception("source")); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java index 29e69f6fa3..b8754f7845 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java @@ -45,18 +45,25 @@ class SchedulerClosing extends SchedulerImpl { } public void eventOccurred(Connection con) { - if (con.getNextSendTime() <= 0) - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); - long remaining = con.getNextSendTime() - _context.clock().now(); + long nextSend = con.getNextSendTime(); + long now = _context.clock().now(); + long remaining; + if (nextSend <= 0) { + remaining = con.getOptions().getSendAckDelay(); + nextSend = now + remaining; + con.setNextSendTime(nextSend); + } else { + remaining = nextSend - now; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Event occurred w/ remaining: " + remaining + " on " + con); if (remaining <= 0) { if (con.getCloseSentOn() <= 0) { con.sendAvailable(); - con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { - con.ackImmediately(); + //con.ackImmediately(); } + con.setNextSendTime(now + con.getOptions().getSendAckDelay()); } else { //if (remaining < 5*1000) // remaining = 5*1000; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java index ace5916297..fd36b7723c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -19,4 +19,9 @@ abstract class SchedulerImpl implements TaskScheduler { protected void reschedule(long msToWait, Connection con) { SimpleScheduler.getInstance().addEvent(con.getConnectionEvent(), msToWait); } + + @Override + public String toString() { + return getClass().getSimpleName(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index e4c4ec9173..15b8e937e7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -130,8 +130,8 @@ class TCBShare { super(timer); } public void timeReached() { - for (Iterator iter = _cache.keySet().iterator(); iter.hasNext(); ) { - if (_cache.get(iter.next()).isExpired()) + for (Iterator iter = _cache.values().iterator(); iter.hasNext(); ) { + if (iter.next().isExpired()) iter.remove(); } schedule(CLEAN_TIME); diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 60f67236a3..9dc50b12b4 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -584,7 +584,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } else { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Message received of type " + message.getType() - + " to be handled by " + handler); + + " to be handled by " + handler.getClass().getSimpleName()); handler.handleMessage(message, this); } } diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index 755b688a25..3aec9e288d 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -61,7 +61,8 @@ public class ElGamalAESEngine { } /** - * Decrypt the message using the given private key using tags from the default key manager. + * Decrypt the message using the given private key using tags from the default key manager, + * which is the router's key manager. Use extreme care if you aren't the router. * * @deprecated specify the key manager! */ @@ -75,6 +76,10 @@ public class ElGamalAESEngine { * This works according to the * ElGamal+AES algorithm in the data structure spec. * + * Warning - use the correct SessionKeyManager. Clients should instantiate their own. + * Clients using I2PAppContext.sessionKeyManager() may be correlated with the router, + * unless you are careful to use different keys. + * * @return decrypted data or null on failure */ public byte[] decrypt(byte data[], PrivateKey targetPrivateKey, SessionKeyManager keyManager) throws DataFormatException { @@ -100,7 +105,7 @@ public class ElGamalAESEngine { //if (_log.shouldLog(Log.DEBUG)) _log.debug("Key is known for tag " + st); long id = _context.random().nextLong(); if (_log.shouldLog(Log.DEBUG)) - _log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes: " + Base64.encode(data, 0, 64)); + _log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes " /* + Base64.encode(data, 0, 64) */ ); decrypted = decryptExistingSession(data, key, targetPrivateKey, foundTags, usedKey, foundKey); if (decrypted != null) { @@ -410,7 +415,7 @@ public class ElGamalAESEngine { _context.statManager().updateFrequency("crypto.elGamalAES.encryptExistingSession"); byte rv[] = encryptExistingSession(data, target, key, tagsForDelivery, currentTag, newKey, paddedSize); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() + ": " + Base64.encode(rv, 0, 64)); + _log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() /* + ": " + Base64.encode(rv, 0, 64) */); return rv; } @@ -599,7 +604,6 @@ public class ElGamalAESEngine { //_log.debug("Encrypting AES"); if (tagsForDelivery == null) tagsForDelivery = Collections.EMPTY_SET; int size = 2 // sizeof(tags) - + tagsForDelivery.size() + SessionTag.BYTE_LENGTH*tagsForDelivery.size() + 4 // payload length + Hash.HASH_LENGTH diff --git a/core/java/src/net/i2p/crypto/SessionKeyManager.java b/core/java/src/net/i2p/crypto/SessionKeyManager.java index 4a9456f7d5..c7af8c9703 100644 --- a/core/java/src/net/i2p/crypto/SessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/SessionKeyManager.java @@ -59,7 +59,8 @@ public class SessionKeyManager { * Associate a new session key with the specified target. Metrics to determine * when to expire that key begin with this call. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ public void createSession(PublicKey target, SessionKey key) { // nop } @@ -67,7 +68,8 @@ public class SessionKeyManager { /** * Generate a new session key and associate it with the specified target. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ public SessionKey createSession(PublicKey target) { SessionKey key = KeyGenerator.getInstance().generateSessionKey(); diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index e31adcada7..bd4758a151 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -243,7 +243,8 @@ public class TransientSessionKeyManager extends SessionKeyManager { * Associate a new session key with the specified target. Metrics to determine * when to expire that key begin with this call. * - * @deprecated racy + * Racy if called after getCurrentKey() to check for a current session; + * use getCurrentOrNewKey() in that case. */ @Override public void createSession(PublicKey target, SessionKey key) { diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index cbe4b42e7b..af60551296 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -458,19 +458,32 @@ public class EepGet { } public void stopFetching() { _keepFetching = false; } + /** - * Blocking fetch, returning true if the URL was retrieved, false if all retries failed + * Blocking fetch, returning true if the URL was retrieved, false if all retries failed. * + * Header timeout default 45 sec, total timeout default none, inactivity timeout default 60 sec. */ public boolean fetch() { return fetch(_fetchHeaderTimeout); } + /** * Blocking fetch, timing out individual attempts if the HTTP response headers * don't come back in the time given. If the timeout is zero or less, this will * wait indefinitely. + * + * Total timeout default none, inactivity timeout default 60 sec. */ public boolean fetch(long fetchHeaderTimeout) { return fetch(fetchHeaderTimeout, -1, -1); } + + /** + * Blocking fetch. + * + * @param fetchHeaderTimeout <= 0 for none (proxy will timeout if none, none isn't recommended if no proxy) + * @param totalTimeout <= 0 for default none + * @param inactivityTimeout <= 0 for default 60 sec + */ public boolean fetch(long fetchHeaderTimeout, long totalTimeout, long inactivityTimeout) { _fetchHeaderTimeout = fetchHeaderTimeout; _fetchEndTime = (totalTimeout > 0 ? System.currentTimeMillis() + totalTimeout : -1); diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index 1bff875578..463882a25c 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -9,12 +9,13 @@ import net.i2p.I2PAppContext; */ class Executor implements Runnable { private final I2PAppContext _context; - private Log _log; - private final List _readyEvents; + private final Log _log; + private final List _readyEvents; private final SimpleStore runn; - public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { + public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { _context = ctx; + _log = log; _readyEvents = events; runn = x; } @@ -26,7 +27,7 @@ class Executor implements Runnable { if (_readyEvents.isEmpty()) try { _readyEvents.wait(); } catch (InterruptedException ie) {} if (!_readyEvents.isEmpty()) - evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); + evt = _readyEvents.remove(0); } if (evt != null) { @@ -34,21 +35,12 @@ class Executor implements Runnable { try { evt.timeReached(); } catch (Throwable t) { - log("Executing task " + evt + " exited unexpectedly, please report", t); + _log.error("Executing task " + evt + " exited unexpectedly, please report", t); } long time = _context.clock().now() - before; - // FIXME _log won't be non-null unless we already had a CRIT - if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) + if ( (time > 1000) && (_log.shouldLog(Log.WARN)) ) _log.warn("wtf, event execution took " + time + ": " + evt); } } } - - private void log(String msg, Throwable t) { - synchronized (this) { - if (_log == null) - _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); - } - _log.log(Log.CRIT, msg, t); - } } diff --git a/history.txt b/history.txt index 6734f3335d..9468144ae2 100644 --- a/history.txt +++ b/history.txt @@ -1,3 +1,69 @@ +2012-07-01 zzz + * i2psnark: + - Don't send a keepalive to a peer we are going to disconnect + - Disconnect peer when idle a long time + - PeerCheckerTask cleanup + - Static ref cleanup + - Don't show a downloaded torrent file as "seeding" + - Better torrent file download icon + +2012-06-29 zzz + * HTTP Proxy: Change the error code for unknown host from 404 to 500 + * SimpleTimer: Fix logging + * Streaming: + - Allow at least 3 packets and up to half the window to be active resends + instead of just 1, to reduce stall time after a packet drop + - Increase fast retransmit threshold back to 3 to reduce retransmissions + - Don't fast retransmit if we recently retransmitted it already + - Allow double the window as long as gaps are less than the window + - Don't set the MSS in a resent packet (saves 2 bytes) + - Remove redundant calls to updateAcks() + - Update activity timer when resending a packet + - Reset unackedPacketsReceived counter at all places where acks are sent + so it isn't wrong + - Fix some places where the activeResends count could become wrong + - Prevent storm of CLOSE packets + - Never resend the whole packet in ackImmediately(), just send an ack + - Cancel flusher timer in MessageOutputStream when closed + - Move some createRateStats to ConnectionManager to reduce repeated calls + - Cleanups, javadocs, logging, volatile, finals + * Update: Increase eepget timeouts + +2012-06-24 zzz + * ElGamalAESEngine: Fix bad size estimate when tags are included, + resulting in trailing zeros after the padding + in the unencrypted data + * i2psnark: Don't create a new PeerCoordinator after restart, as the + TrackerClient holds on to the old one and that causes it + to not get peers. Possibly fixes ticket #563. + * I2PTunnel: Fix NPE on shared client creation, thx kytv + * Transport: Add Ethiopia to hidden mode list + +2012-06-21 zzz + * I2CP: Make separate message ID counters per-destination, use atomic, + increase max (could have caused "local loopback" problems) + * IRC Client: Don't flush output unless out of input, so the + streaming messages don't get split up unnecessarily + * OCMOSJ, ElG, Streaming: log tweaks + * TunnelInfo: Change msg counter from long to int + * TunnelPeerSelectors: Minor refactoring to store context + * TunnelPool: Fix bug where a tunnel was marked as reused when it wasn't + * TunnelPoolManager: Use one ClientPeerSelector for all pools + +2012-06-20 zzz + * I2PSession: + - Greatly simplify the VerifyUsage timers + - Constructor cleanup + +2012-06-19 zzz + * i2psnark: + - Hide buttons while stopping all + * Socks: Pass remote port through + * Streaming: + - Listen only on local port if set + - Listen only for streaming protocol if configured (new option) + - Javadocs re: ports + 2012-06-18 zzz * i2psnark: - Improve torrent shutdown handling to maximize chance of diff --git a/installer/resources/proxy/dnfh-header.ht b/installer/resources/proxy/dnfh-header.ht index b50a3395f2..126c93f252 100644 --- a/installer/resources/proxy/dnfh-header.ht +++ b/installer/resources/proxy/dnfh-header.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_de.ht b/installer/resources/proxy/dnfh-header_de.ht index c2c80f198b..9f9413424e 100644 --- a/installer/resources/proxy/dnfh-header_de.ht +++ b/installer/resources/proxy/dnfh-header_de.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_fr.ht b/installer/resources/proxy/dnfh-header_fr.ht index d5dce0364a..c24e2d6bf6 100644 --- a/installer/resources/proxy/dnfh-header_fr.ht +++ b/installer/resources/proxy/dnfh-header_fr.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domaine non trouvé +HTTP/1.1 500 Domaine non trouvé Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_nl.ht b/installer/resources/proxy/dnfh-header_nl.ht index 0dc34e3dbf..296a62dddc 100644 --- a/installer/resources/proxy/dnfh-header_nl.ht +++ b/installer/resources/proxy/dnfh-header_nl.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_ru.ht b/installer/resources/proxy/dnfh-header_ru.ht index cf24cbc277..0e34195599 100644 --- a/installer/resources/proxy/dnfh-header_ru.ht +++ b/installer/resources/proxy/dnfh-header_ru.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/installer/resources/proxy/dnfh-header_zh.ht b/installer/resources/proxy/dnfh-header_zh.ht index bf69555006..1e65a67c16 100644 --- a/installer/resources/proxy/dnfh-header_zh.ht +++ b/installer/resources/proxy/dnfh-header_zh.ht @@ -1,4 +1,4 @@ -HTTP/1.1 404 Domain Not Found +HTTP/1.1 500 Domain Not Found Content-Type: text/html; charset=UTF-8 Cache-control: no-cache Connection: close diff --git a/router/java/src/net/i2p/data/i2np/GarlicMessage.java b/router/java/src/net/i2p/data/i2np/GarlicMessage.java index c4cfd29c9e..07621c462c 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicMessage.java +++ b/router/java/src/net/i2p/data/i2np/GarlicMessage.java @@ -83,7 +83,7 @@ public class GarlicMessage extends FastI2NPMessageImpl { public String toString() { StringBuilder buf = new StringBuilder(); buf.append("[GarlicMessage: "); - buf.append("\n\tData length: ").append(getData().length).append(" bytes"); + buf.append("Data length: ").append(getData().length).append(" bytes"); buf.append("]"); return buf.toString(); } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index e1f87e077d..43dca92409 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 16; + public final static long BUILD = 21; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/TunnelInfo.java b/router/java/src/net/i2p/router/TunnelInfo.java index 7e06e3463d..be200069f7 100644 --- a/router/java/src/net/i2p/router/TunnelInfo.java +++ b/router/java/src/net/i2p/router/TunnelInfo.java @@ -68,10 +68,11 @@ public interface TunnelInfo { */ public void testSuccessful(int responseTime); - public long getProcessedMessagesCount(); + public int getProcessedMessagesCount(); /** we know for sure that this many bytes travelled through the tunnel in its lifetime */ public long getVerifiedBytesTransferred(); + /** we know for sure that the given number of bytes were sent down the tunnel fully */ public void incrementVerifiedBytesTransferred(int numBytes); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 8597c52b2b..83d5d51d5e 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import net.i2p.client.I2PClient; import net.i2p.crypto.SessionKeyManager; @@ -86,7 +87,14 @@ class ClientConnectionRunner { private boolean _dead; /** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */ private boolean _dontSendMSM; + private final AtomicInteger _messageId; // messageId counter + // Was 32767 since the beginning (04-2004). + // But it's 4 bytes in the I2CP spec and stored as a long in MessageID.... + // If this is too low and wraps around, I2CP VerifyUsage could delete the wrong message, + // e.g. on local access + private static final int MAX_MESSAGE_ID = 0x4000000; + /** * Create a new runner against the given socket * @@ -99,6 +107,7 @@ class ClientConnectionRunner { _messages = new ConcurrentHashMap(); _alreadyProcessed = new ArrayList(); _acceptedPending = new ConcurrentHashSet(); + _messageId = new AtomicInteger(_context.random().nextInt()); } private static volatile int __id = 0; @@ -520,18 +529,9 @@ class ClientConnectionRunner { } } - // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME - private final static int MAX_MESSAGE_ID = 32767; - private static volatile int _messageId = RandomSource.getInstance().nextInt(MAX_MESSAGE_ID); // messageId counter - private final static Object _messageIdLock = new Object(); - - static int getNextMessageId() { - synchronized (_messageIdLock) { - int messageId = (++_messageId)%MAX_MESSAGE_ID; - if (_messageId >= MAX_MESSAGE_ID) - _messageId = 0; - return messageId; - } + public int getNextMessageId() { + // Don't % so we don't get negative IDs + return _messageId.incrementAndGet() & (MAX_MESSAGE_ID - 1); } /** diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 88b1ca10b0..843ebfacea 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -36,7 +36,7 @@ class MessageReceivedJob extends JobImpl { public void runJob() { if (_runner.isDead()) return; MessageId id = new MessageId(); - id.setMessageId(ClientConnectionRunner.getNextMessageId()); + id.setMessageId(_runner.getNextMessageId()); _runner.setPayload(id, _payload); messageAvailable(id, _payload.getSize()); } diff --git a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java index 0f01d53c62..ad7c63627e 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageBuilder.java @@ -233,8 +233,8 @@ public class GarlicMessageBuilder { } if (log.shouldLog(Log.DEBUG)) - log.debug("CloveSet size for message " + msg.getUniqueId() + " is " + cloveSet.length - + " and encrypted message data is " + encData.length); + log.debug("CloveSet (" + config.getCloveCount() + " cloves) for message " + msg.getUniqueId() + " is " + cloveSet.length + + " bytes and encrypted message data is " + encData.length + " bytes"); return msg; } @@ -268,7 +268,7 @@ public class GarlicMessageBuilder { for (int i = 0; i < config.getCloveCount(); i++) { GarlicConfig c = config.getClove(i); if (c instanceof PayloadGarlicConfig) { - log.debug("Subclove IS a payload garlic clove"); + //log.debug("Subclove IS a payload garlic clove"); cloves[i] = buildClove(ctx, (PayloadGarlicConfig)c); } else { log.debug("Subclove IS NOT a payload garlic clove"); diff --git a/router/java/src/net/i2p/router/message/GarlicMessageParser.java b/router/java/src/net/i2p/router/message/GarlicMessageParser.java index 4885d21c11..083ffd8c37 100644 --- a/router/java/src/net/i2p/router/message/GarlicMessageParser.java +++ b/router/java/src/net/i2p/router/message/GarlicMessageParser.java @@ -71,13 +71,13 @@ class GarlicMessageParser { if (_log.shouldLog(Log.DEBUG)) _log.debug("# cloves to read: " + numCloves); for (int i = 0; i < numCloves; i++) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Reading clove " + i); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Reading clove " + i); GarlicClove clove = new GarlicClove(_context); offset += clove.readBytes(data, offset); set.addClove(clove); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("After reading clove " + i); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("After reading clove " + i); } //Certificate cert = new Certificate(); //offset += cert.readBytes(data, offset); diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java index cf8a0dfc9e..7941616f5f 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java @@ -88,7 +88,7 @@ class OutboundClientMessageJobHelper { PayloadGarlicConfig dataClove, Hash from, Destination dest, TunnelInfo replyTunnel, boolean requireAck, LeaseSet bundledReplyLeaseSet) { Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class); - if (log.shouldLog(Log.DEBUG)) + if (replyToken >= 0 && log.shouldLog(Log.DEBUG)) log.debug("Reply token: " + replyToken); GarlicConfig config = new GarlicConfig(); @@ -136,20 +136,17 @@ class OutboundClientMessageJobHelper { Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class); PayloadGarlicConfig ackClove = new PayloadGarlicConfig(); - Hash replyToTunnelRouter = null; // inbound tunnel gateway - TunnelId replyToTunnelId = null; // tunnel id on that gateway - if (replyToTunnel == null) { if (log.shouldLog(Log.WARN)) log.warn("Unable to send client message from " + from.toBase64() + ", as there are no inbound tunnels available"); return null; } - replyToTunnelId = replyToTunnel.getReceiveTunnelId(0); - replyToTunnelRouter = replyToTunnel.getPeer(0); + TunnelId replyToTunnelId = replyToTunnel.getReceiveTunnelId(0); // tunnel id on that gateway + Hash replyToTunnelRouter = replyToTunnel.getPeer(0); // inbound tunnel gateway if (log.shouldLog(Log.DEBUG)) log.debug("Ack for the data message will come back along tunnel " + replyToTunnelId - + ":\n" + replyToTunnel); + + ": " + replyToTunnel); DeliveryInstructions ackInstructions = new DeliveryInstructions(); ackInstructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_TUNNEL); @@ -163,8 +160,8 @@ class OutboundClientMessageJobHelper { DeliveryStatusMessage msg = new DeliveryStatusMessage(ctx); msg.setArrival(ctx.clock().now()); msg.setMessageId(replyToken); - if (log.shouldLog(Log.DEBUG)) - log.debug("Delivery status message key: " + replyToken + " arrival: " + msg.getArrival()); + //if (log.shouldLog(Log.DEBUG)) + // log.debug("Delivery status message key: " + replyToken + " arrival: " + msg.getArrival()); ackClove.setCertificate(Certificate.NULL_CERT); ackClove.setDeliveryInstructions(ackInstructions); @@ -175,11 +172,11 @@ class OutboundClientMessageJobHelper { // defaults //ackClove.setRequestAck(false); - if (log.shouldLog(Log.DEBUG)) - log.debug("Delivery status message is targetting us [" - + ackClove.getRecipient().getIdentity().getHash().toBase64() - + "] via tunnel " + replyToTunnelId.getTunnelId() + " on " - + replyToTunnelRouter.toBase64()); + //if (log.shouldLog(Log.DEBUG)) + // log.debug("Delivery status message is targetting us [" + // + ackClove.getRecipient().getIdentity().getHash().toBase64() + // + "] via tunnel " + replyToTunnelId.getTunnelId() + " on " + // + replyToTunnelRouter.toBase64()); return ackClove; } diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java index 4896e57d46..ea0664ec61 100644 --- a/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java +++ b/router/java/src/net/i2p/router/message/OutboundClientMessageOneShotJob.java @@ -150,8 +150,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } } overallExpiration = timeoutMs + _start; - if (_log.shouldLog(Log.INFO)) - _log.info(getJobId() + " Default Expiration (ms): " + timeoutMs); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getJobId() + " Default Expiration (ms): " + timeoutMs); } _overallExpiration = overallExpiration; } @@ -182,9 +182,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { dieFatal(); return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Send outbound client message job beginning" + - ": preparing to search for the leaseSet for " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Send outbound client message job beginning" + + // ": preparing to search for the leaseSet for " + _toString); long timeoutMs = _overallExpiration - now; Hash key = _to.calculateHash(); SendJob success = new SendJob(getContext()); @@ -474,8 +474,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { dieFatal(); return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Clove built to " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Clove built to " + _toString); long msgExpiration = _overallExpiration; // getContext().clock().now() + OVERALL_TIMEOUT_MS_DEFAULT; GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(getContext(), token, msgExpiration, key, @@ -494,8 +494,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { return; } - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": send() - token expected " + token + " to " + _toString); SendSuccessJob onReply = null; SendTimeoutJob onFail = null; @@ -515,14 +515,14 @@ public class OutboundClientMessageOneShotJob extends JobImpl { } if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Placing GarlicMessage into the new tunnel message bound for " + _log.debug(getJobId() + ": GarlicMessage in new tunnel msg for " + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway()); if (_outTunnel != null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Sending tunnel message out " + _outTunnel.getSendTunnelId(0) + " to " + _log.debug(getJobId() + ": Sending msg out " + _outTunnel.getSendTunnelId(0) + " to " + _toString + " at " + _lease.getTunnelId() + " on " + _lease.getGateway()); @@ -571,9 +571,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { long before = getContext().clock().now(); getContext().tunnelDispatcher().dispatchOutbound(_msg, _outTunnel.getSendTunnelId(0), _lease.getTunnelId(), _lease.getGateway()); long dispatchSendTime = getContext().clock().now() - before; - if (_log.shouldLog(Log.INFO)) - _log.info(OutboundClientMessageOneShotJob.this.getJobId() + - ": Dispatching message to " + _toString + " complete"); + //if (_log.shouldLog(Log.INFO)) + // _log.info(OutboundClientMessageOneShotJob.this.getJobId() + + // ": Dispatching message to " + _toString + " complete"); getContext().statManager().addRateData("client.dispatchTime", getContext().clock().now() - _start, 0); getContext().statManager().addRateData("client.dispatchSendTime", dispatchSendTime, 0); } @@ -728,8 +728,8 @@ public class OutboundClientMessageOneShotJob extends JobImpl { _clove = clove; _cloveId = _clove.getId(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug(getJobId() + ": Built payload clove with id " + clove.getId()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug(getJobId() + ": Built payload clove with id " + clove.getId()); return true; } @@ -858,9 +858,9 @@ public class OutboundClientMessageOneShotJob extends JobImpl { public String getName() { return "Outbound client message send timeout"; } public void runJob() { - if (_log.shouldLog(Log.INFO)) - _log.info(OutboundClientMessageOneShotJob.this.getJobId() - + ": Soft timeout through the lease " + _lease); + //if (_log.shouldLog(Log.INFO)) + // _log.info(OutboundClientMessageOneShotJob.this.getJobId() + // + ": Soft timeout through the lease " + _lease); // unused //_lease.setNumFailure(_lease.getNumFailure()+1); diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 5ded790ab8..8880a77854 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -21,6 +21,13 @@ import net.i2p.router.ReplyJob; import net.i2p.router.RouterContext; import net.i2p.util.Log; +/** + * Send a message directly to another router, i.e. not through a tunnel. + * This is safe to run inline via runJob(). + * If the RouterInfo for the Hash is not found locally, it will + * queue a lookup and register itself to be run again when the lookup + * succeeds or times out. + */ public class SendMessageDirectJob extends JobImpl { private final Log _log; private final I2NPMessage _message; @@ -39,9 +46,11 @@ public class SendMessageDirectJob extends JobImpl { public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, int timeoutMs, int priority) { this(ctx, message, toPeer, null, null, null, null, timeoutMs, priority); } + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { this(ctx, message, toPeer, null, onSuccess, onFail, selector, timeoutMs, priority); } + public SendMessageDirectJob(RouterContext ctx, I2NPMessage message, Hash toPeer, Job onSend, ReplyJob onSuccess, Job onFail, MessageSelector selector, int timeoutMs, int priority) { super(ctx); _log = getContext().logManager().getLog(SendMessageDirectJob.class); @@ -66,6 +75,7 @@ public class SendMessageDirectJob extends JobImpl { } public String getName() { return "Send Message Direct"; } + public void runJob() { long now = getContext().clock().now(); diff --git a/router/java/src/net/i2p/router/transport/BadCountries.java b/router/java/src/net/i2p/router/transport/BadCountries.java index 1431dcb60f..f5d36b64ac 100644 --- a/router/java/src/net/i2p/router/transport/BadCountries.java +++ b/router/java/src/net/i2p/router/transport/BadCountries.java @@ -16,7 +16,7 @@ abstract class BadCountries { // zzz.i2p/topics/969 // List created based on the Press Freedom Index. Those countries with a score of higher than 50 are included: // http://en.wikipedia.org/wiki/Press_Freedom_Index - // Except: + // Except (quote): // I don't really think that is usage of I2P is dangerous in countries from CIS // General situation is really bad (like in Russia) but people here doesn't have problems with Ecnryption usage. @@ -32,6 +32,7 @@ abstract class BadCountries { /* Democratic Republic of the Congo */ "CD", /* Equatorial Guinea */ "GQ", /* Eritrea */ "ER", + /* Ethiopia */ "ET", /* Fiji */ "FJ", /* Honduras */ "HN", /* Iran */ "IR", diff --git a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java index 68d733f82c..5306179a35 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelCreatorConfig.java @@ -29,7 +29,7 @@ public class TunnelCreatorConfig implements TunnelInfo { private List _order; private long _replyMessageId; private final boolean _isInbound; - private long _messagesProcessed; + private int _messagesProcessed; private volatile long _verifiedBytesTransferred; private boolean _failed; private int _failures; @@ -127,7 +127,7 @@ public class TunnelCreatorConfig implements TunnelInfo { /** take note of a message being pumped through this tunnel */ public void incrementProcessedMessages() { _messagesProcessed++; } - public long getProcessedMessagesCount() { return _messagesProcessed; } + public int getProcessedMessagesCount() { return _messagesProcessed; } public void incrementVerifiedBytesTransferred(int bytes) { _verifiedBytesTransferred += bytes; @@ -144,6 +144,7 @@ public class TunnelCreatorConfig implements TunnelInfo { _context.profileManager().tunnelDataPushed1m(_peers[i], (int)normalized); } } + public long getVerifiedBytesTransferred() { return _verifiedBytesTransferred; } private static final int THROUGHPUT_COUNT = 3; diff --git a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java index 24e6966a12..468cacaf26 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ClientPeerSelector.java @@ -15,8 +15,13 @@ import net.i2p.router.TunnelPoolSettings; * */ class ClientPeerSelector extends TunnelPeerSelector { - public List selectPeers(RouterContext ctx, TunnelPoolSettings settings) { - int length = getLength(ctx, settings); + + public ClientPeerSelector(RouterContext context) { + super(context); + } + + public List selectPeers(TunnelPoolSettings settings) { + int length = getLength(settings); if (length < 0) return null; if ( (length == 0) && (settings.getLength()+settings.getLengthVariance() > 0) ) @@ -26,9 +31,9 @@ class ClientPeerSelector extends TunnelPeerSelector { if (length > 0) { if (shouldSelectExplicit(settings)) - return selectExplicit(ctx, settings, length); + return selectExplicit(settings, length); - Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory()); + Set exclude = getExclude(settings.isInbound(), false); Set matches = new HashSet(length); if (length == 1) { ctx.profileOrganizer().selectFastPeers(length, exclude, matches, 0); @@ -41,6 +46,9 @@ class ClientPeerSelector extends TunnelPeerSelector { rv = new ArrayList(length + 1); // OBEP or IB last hop // group 0 or 1 if two hops, otherwise group 0 + if (!settings.isInbound()) { + // exclude existing OBEPs to get some diversity + } ctx.profileOrganizer().selectFastPeers(1, exclude, matches, settings.getRandomKey(), length == 2 ? 2 : 4); matches.remove(ctx.routerHash()); exclude.addAll(matches); @@ -64,6 +72,9 @@ class ClientPeerSelector extends TunnelPeerSelector { } // IBGW or OB first hop // group 2 or 3 if two hops, otherwise group 1 + if (settings.isInbound()) { + // exclude existing IBGWs to get some diversity + } ctx.profileOrganizer().selectFastPeers(1, exclude, matches, settings.getRandomKey(), length == 2 ? 3 : 5); matches.remove(ctx.routerHash()); rv.addAll(matches); diff --git a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java index 1f05727dc5..ed3837ec44 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/ExploratoryPeerSelector.java @@ -18,9 +18,14 @@ import net.i2p.util.Log; * */ class ExploratoryPeerSelector extends TunnelPeerSelector { - public List selectPeers(RouterContext ctx, TunnelPoolSettings settings) { + + public ExploratoryPeerSelector(RouterContext context) { + super(context); + } + + public List selectPeers(TunnelPoolSettings settings) { Log l = ctx.logManager().getLog(getClass()); - int length = getLength(ctx, settings); + int length = getLength(settings); if (length < 0) { if (l.shouldLog(Log.DEBUG)) l.debug("Length requested is zero: " + settings); @@ -28,13 +33,13 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { } if (false && shouldSelectExplicit(settings)) { - List rv = selectExplicit(ctx, settings, length); + List rv = selectExplicit(settings, length); if (l.shouldLog(Log.DEBUG)) l.debug("Explicit peers selected: " + rv); return rv; } - Set exclude = getExclude(ctx, settings.isInbound(), settings.isExploratory()); + Set exclude = getExclude(settings.isInbound(), true); exclude.add(ctx.routerHash()); // Don't use ff peers for exploratory tunnels to lessen exposure to netDb searches and stores // Hmm if they don't get explored they don't get a speed/capacity rating @@ -42,7 +47,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { // FloodfillNetworkDatabaseFacade fac = (FloodfillNetworkDatabaseFacade)ctx.netDb(); // exclude.addAll(fac.getFloodfillPeers()); HashSet matches = new HashSet(length); - boolean exploreHighCap = shouldPickHighCap(ctx); + boolean exploreHighCap = shouldPickHighCap(); // // We don't honor IP Restriction here, to be fixed @@ -84,7 +89,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { * build success rate is much worse, return true so that reliability * is maintained. */ - private static boolean shouldPickHighCap(RouterContext ctx) { + private boolean shouldPickHighCap() { if (ctx.getBooleanProperty("router.exploreHighCapacity")) return true; @@ -118,7 +123,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { if (ctx.router().getUptime() <= 11*60*1000) { failPct = 100 - MIN_NONFAILING_PCT; } else { - failPct = getExploratoryFailPercentage(ctx); + failPct = getExploratoryFailPercentage(); //Log l = ctx.logManager().getLog(getClass()); //if (l.shouldLog(Log.DEBUG)) // l.debug("Normalized Fail pct: " + failPct); @@ -140,9 +145,9 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { * Even this isn't the "true" rate for the NonFailingPeers pool, since we * are often building exploratory tunnels using the HighCapacity pool. */ - private static int getExploratoryFailPercentage(RouterContext ctx) { - int c = getFailPercentage(ctx, "Client"); - int e = getFailPercentage(ctx, "Exploratory"); + private int getExploratoryFailPercentage() { + int c = getFailPercentage("Client"); + int e = getFailPercentage("Exploratory"); //Log l = ctx.logManager().getLog(getClass()); //if (l.shouldLog(Log.DEBUG)) // l.debug("Client, Expl. Fail pct: " + c + ", " + e); @@ -154,11 +159,11 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { return (100 * (e-c)) / (100-c); } - private static int getFailPercentage(RouterContext ctx, String t) { + private int getFailPercentage(String t) { String pfx = "tunnel.build" + t; - int timeout = getEvents(ctx, pfx + "Expire", 10*60*1000); - int reject = getEvents(ctx, pfx + "Reject", 10*60*1000); - int accept = getEvents(ctx, pfx + "Success", 10*60*1000); + int timeout = getEvents(pfx + "Expire", 10*60*1000); + int reject = getEvents(pfx + "Reject", 10*60*1000); + int accept = getEvents(pfx + "Success", 10*60*1000); if (accept + reject + timeout <= 0) return 0; double pct = (double)(reject + timeout) / (accept + reject + timeout); @@ -166,7 +171,7 @@ class ExploratoryPeerSelector extends TunnelPeerSelector { } /** Use current + last to get more recent and smoother data */ - private static int getEvents(RouterContext ctx, String stat, long period) { + private int getEvents(String stat, long period) { RateStat rs = ctx.statManager().getRate(stat); if (rs == null) return 0; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java index 22508a2ce9..7679ed955c 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPeerSelector.java @@ -31,6 +31,12 @@ import net.i2p.util.VersionComparator; * Todo: there's nothing non-static in here */ public abstract class TunnelPeerSelector { + protected final RouterContext ctx; + + protected TunnelPeerSelector(RouterContext context) { + ctx = context; + } + /** * Which peers should go into the next tunnel for the given settings? * @@ -40,12 +46,12 @@ public abstract class TunnelPeerSelector { * to build through, and the settings reject 0 hop tunnels, this will * return null. */ - public abstract List selectPeers(RouterContext ctx, TunnelPoolSettings settings); + public abstract List selectPeers(TunnelPoolSettings settings); /** * @return randomized number of hops 0-7, not including ourselves */ - protected int getLength(RouterContext ctx, TunnelPoolSettings settings) { + protected int getLength(TunnelPoolSettings settings) { int length = settings.getLength(); int override = settings.getLengthOverride(); if (override >= 0) { @@ -109,7 +115,7 @@ public abstract class TunnelPeerSelector { * Needs analysis and testing * @return should always be false */ - protected List selectExplicit(RouterContext ctx, TunnelPoolSettings settings, int length) { + protected List selectExplicit(TunnelPoolSettings settings, int length) { String peers = null; Properties opts = settings.getUnknownOptions(); if (opts != null) @@ -173,7 +179,7 @@ public abstract class TunnelPeerSelector { /** * Pick peers that we want to avoid */ - public Set getExclude(RouterContext ctx, boolean isInbound, boolean isExploratory) { + public Set getExclude(boolean isInbound, boolean isExploratory) { // we may want to update this to skip 'hidden' or 'unreachable' peers, but that // isn't safe, since they may publish one set of routerInfo to us and another to // other peers. the defaults for filterUnreachable has always been to return false, @@ -196,7 +202,7 @@ public abstract class TunnelPeerSelector { peers.addAll(ctx.profileOrganizer().selectPeersRecentlyRejecting()); peers.addAll(ctx.tunnelManager().selectPeersInTooManyTunnels()); // if (false && filterUnreachable(ctx, isInbound, isExploratory)) { - if (filterUnreachable(ctx, isInbound, isExploratory)) { + if (filterUnreachable(isInbound, isExploratory)) { // NOTE: filterUnreachable returns true for inbound, false for outbound // This is the only use for getPeersByCapability? And the whole set of datastructures in PeerManager? Collection caps = ctx.peerManager().getPeersByCapability(Router.CAPABILITY_UNREACHABLE); @@ -439,7 +445,7 @@ public abstract class TunnelPeerSelector { * do we want to skip peers who haven't been up for long? * @return true for inbound, false for outbound, unless configured otherwise */ - protected boolean filterUnreachable(RouterContext ctx, boolean isInbound, boolean isExploratory) { + protected boolean filterUnreachable(boolean isInbound, boolean isExploratory) { boolean def = false; String val = null; diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java index 4b4b4afe04..1668ecb05b 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPool.java @@ -1086,13 +1086,14 @@ public class TunnelPool { for (int i = len - 1; i >= 0; i--) { peers.add(ti.getPeer(i)); } + break; } } } } if (peers == null) { setLengthOverride(); - peers = _peerSelector.selectPeers(_context, settings); + peers = _peerSelector.selectPeers(settings); } if ( (peers == null) || (peers.isEmpty()) ) { diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 671a3e16bc..a849647410 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -41,6 +41,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { private TunnelPool _outboundExploratory; private final BuildExecutor _executor; private final BuildHandler _handler; + private final TunnelPeerSelector _clientPeerSelector; private boolean _isShutdown; private final int _numHandlerThreads; private static final long[] RATES = { 60*1000, 10*60*1000l, 60*60*1000l }; @@ -60,6 +61,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new ConcurrentHashMap(4); _clientOutboundPools = new ConcurrentHashMap(4); + _clientPeerSelector = new ClientPeerSelector(ctx); _executor = new BuildExecutor(ctx, this); I2PThread execThread = new I2PThread(_executor, "BuildExecutor", true); @@ -407,8 +409,6 @@ public class TunnelPoolManager implements TunnelManagerFacade { settings.getOutboundSettings().setDestination(dest); TunnelPool inbound = null; TunnelPool outbound = null; - // should we share the clientPeerSelector across both inbound and outbound? - // or just one for all clients? why separate? boolean delayOutbound = false; // synch with removeTunnels() below @@ -416,7 +416,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { inbound = _clientInboundPools.get(dest); if (inbound == null) { inbound = new TunnelPool(_context, this, settings.getInboundSettings(), - new ClientPeerSelector()); + _clientPeerSelector); _clientInboundPools.put(dest, inbound); } else { inbound.setSettings(settings.getInboundSettings()); @@ -424,7 +424,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { outbound = _clientOutboundPools.get(dest); if (outbound == null) { outbound = new TunnelPool(_context, this, settings.getOutboundSettings(), - new ClientPeerSelector()); + _clientPeerSelector); _clientOutboundPools.put(dest, outbound); delayOutbound = true; } else { @@ -511,7 +511,7 @@ public class TunnelPoolManager implements TunnelManagerFacade { t.setDaemon(true); t.start(); } - ExploratoryPeerSelector selector = new ExploratoryPeerSelector(); + ExploratoryPeerSelector selector = new ExploratoryPeerSelector(_context); TunnelPoolSettings inboundSettings = new TunnelPoolSettings(); inboundSettings.setIsExploratory(true);