diff --git a/apps/i2psnark/java/src/org/klomp/snark/BWLimits.java b/apps/i2psnark/java/src/org/klomp/snark/BWLimits.java index 368215119..b5f7f189b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/BWLimits.java +++ b/apps/i2psnark/java/src/org/klomp/snark/BWLimits.java @@ -37,7 +37,9 @@ class BWLimits { return rv; } +/**** public static void main(String args[]) { System.out.println(Arrays.toString(getBWLimits("127.0.0.1", 7654))); } +****/ } diff --git a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java index e1ebf5fa9..8ac44888f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ConnectionAcceptor.java @@ -33,11 +33,10 @@ import net.i2p.data.Hash; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.ObjectCounter; -import net.i2p.util.SimpleScheduler; -import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; /** - * Accepts connections on a TCP port and routes them to sub-acceptors. + * Accepts connections on a I2PServerSocket and routes them to PeerAcceptors. */ class ConnectionAcceptor implements Runnable { @@ -47,14 +46,22 @@ class ConnectionAcceptor implements Runnable private Thread thread; private final I2PSnarkUtil _util; private final ObjectCounter _badCounter = new ObjectCounter(); + private final SimpleTimer2.TimedEvent _cleaner; - private boolean stop; + private volatile boolean stop; private boolean socketChanged; - private static final int MAX_BAD = 2; + // protocol errors before blacklisting. + private static final int MAX_BAD = 1; private static final long BAD_CLEAN_INTERVAL = 30*60*1000; - public ConnectionAcceptor(I2PSnarkUtil util) { _util = util; } + /** + * Multitorrent + */ + public ConnectionAcceptor(I2PSnarkUtil util) { + _util = util; + _cleaner = new Cleaner(); + } public synchronized void startAccepting(PeerCoordinatorSet set, I2PServerSocket socket) { if (serverSocket != socket) { @@ -67,11 +74,14 @@ class ConnectionAcceptor implements Runnable thread = new I2PAppThread(this, "I2PSnark acceptor"); thread.setDaemon(true); thread.start(); - _util.getContext().simpleScheduler().addPeriodicEvent(new Cleaner(), BAD_CLEAN_INTERVAL); + _cleaner.schedule(BAD_CLEAN_INTERVAL); } } } + /** + * Unused (single torrent) + */ public ConnectionAcceptor(I2PSnarkUtil util, I2PServerSocket serverSocket, PeerAcceptor peeracceptor) { @@ -82,7 +92,7 @@ class ConnectionAcceptor implements Runnable thread = new I2PAppThread(this, "I2PSnark acceptor"); thread.setDaemon(true); thread.start(); - _util.getContext().simpleScheduler().addPeriodicEvent(new Cleaner(), BAD_CLEAN_INTERVAL); + _cleaner = new Cleaner(); } public void halt() @@ -101,14 +111,20 @@ class ConnectionAcceptor implements Runnable Thread t = thread; if (t != null) t.interrupt(); + _cleaner.cancel(); } + /** + * Effectively unused, would only be called if we changed + * I2CP host/port, which is hidden in the gui if in router context + */ public void restart() { serverSocket = _util.getServerSocket(); socketChanged = true; Thread t = thread; if (t != null) t.interrupt(); + _cleaner.schedule(BAD_CLEAN_INTERVAL); } public int getPort() @@ -150,9 +166,11 @@ class ConnectionAcceptor implements Runnable try { socket.close(); } catch (IOException ioe) {} continue; } - if (_badCounter.count(socket.getPeerDestination().calculateHash()) >= MAX_BAD) { + int bad = _badCounter.count(socket.getPeerDestination().calculateHash()); + if (count >= MAX_BAD) { if (_log.shouldLog(Log.WARN)) - _log.warn("Rejecting connection from " + socket.getPeerDestination().calculateHash() + " after " + MAX_BAD + " failures"); + _log.warn("Rejecting connection from " + socket.getPeerDestination().calculateHash() + + " after " + count + " failures, max is " + MAX_BAD); try { socket.close(); } catch (IOException ioe) {} continue; } @@ -214,7 +232,17 @@ class ConnectionAcceptor implements Runnable } /** @since 0.9.1 */ - private class Cleaner implements SimpleTimer.TimedEvent { - public void timeReached() { _badCounter.clear(); } + private class Cleaner extends SimpleTimer2.TimedEvent { + + public Cleaner() { + super(_util.getContext().simpleTimer2()); + } + + public void timeReached() { + if (stop) + return; + _badCounter.clear(); + schedule(BAD_CLEAN_INTERVAL); + } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java b/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java index c5b7ec107..88395f679 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java @@ -37,8 +37,20 @@ interface CoordinatorListener */ void gotMetaInfo(PeerCoordinator coordinator, MetaInfo metainfo); + /** + * Is this number of uploaders over the per-torrent limit? + */ public boolean overUploadLimit(int uploaders); + + /** + * Are we currently over the upstream bandwidth limit? + */ public boolean overUpBWLimit(); + + /** + * Is the total (in Bps) over the upstream bandwidth limit? + */ public boolean overUpBWLimit(long total); + public void addMessage(String message); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 837176920..4d34cbd47 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -158,7 +158,7 @@ public class I2PSnarkUtil { } /** - * This updates the session options and tells the router + * This updates ALL the session options (not just the bw) and tells the router * @param limit KBps */ public void setMaxUpBW(int limit) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/IdleChecker.java b/apps/i2psnark/java/src/org/klomp/snark/IdleChecker.java new file mode 100644 index 000000000..d7b47a343 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/IdleChecker.java @@ -0,0 +1,120 @@ +/* + * Released into the public domain + * with no warranty of any kind, either expressed or implied. + */ +package org.klomp.snark; + +import java.util.Map; +import java.util.Properties; + +import net.i2p.client.I2PSession; +import net.i2p.client.streaming.I2PSocketManager; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; + +/** + * Periodically check for idle condition based on connected peers, + * and reduce/restore tunnel count as necessary. + * We can't use the I2CP idle detector because it's based on traffic, + * so DHT and announces would keep it non-idle. + * + * @since 0.9.7 + */ +class IdleChecker extends SimpleTimer2.TimedEvent { + + private final I2PSnarkUtil _util; + private final PeerCoordinatorSet _pcs; + private final Log _log; + private int _consec; + private boolean _isIdle; + + private static final long CHECK_TIME = 63*1000; + private static final int MAX_CONSEC_IDLE = 4; + + /** + * Caller must schedule + */ + public IdleChecker(I2PSnarkUtil util, PeerCoordinatorSet pcs) { + super(util.getContext().simpleTimer2()); + _log = util.getContext().logManager().getLog(IdleChecker.class); + _util = util; + _pcs = pcs; + } + + public void timeReached() { + if (_util.connected()) { + boolean hasPeers = false; + for (PeerCoordinator pc : _pcs) { + if (pc.getPeers() > 0) { + hasPeers = true; + break; + } + } + if (hasPeers) { + if (_isIdle) + restoreTunnels(); + } else { + if (!_isIdle) { + if (_consec++ >= MAX_CONSEC_IDLE) + reduceTunnels(); + } + } + } else { + _isIdle = false; + _consec = 0; + } + schedule(CHECK_TIME); + } + + /** + * Reduce to 1 in / 1 out tunnel + */ + private void reduceTunnels() { + _isIdle = true; + if (_log.shouldLog(Log.INFO)) + _log.info("Reducing tunnels on idle"); + setTunnels("1", "1", "0", "0"); + } + + /** + * Restore tunnel count + */ + private void restoreTunnels() { + _isIdle = false; + if (_log.shouldLog(Log.INFO)) + _log.info("Restoring tunnels on activity"); + Map opts = _util.getI2CPOptions(); + String i = opts.get("inbound.quantity"); + if (i == null) + i = "3"; + String o = opts.get("outbound.quantity"); + if (o == null) + o = "3"; + String ib = opts.get("inbound.backupQuantity"); + if (ib == null) + ib = "0"; + String ob= opts.get("outbound.backupQuantity"); + if (ob == null) + ob = "0"; + setTunnels(i, o, ib, ob); + } + + /** + * Set in / out / in backup / out backup tunnel counts + */ + private void setTunnels(String i, String o, String ib, String ob) { + _consec = 0; + I2PSocketManager mgr = _util.getSocketManager(); + if (mgr != null) { + I2PSession sess = mgr.getSession(); + if (sess != null) { + Properties newProps = new Properties(); + newProps.setProperty("inbound.quantity", i); + newProps.setProperty("outbound.quantity", o); + newProps.setProperty("inbound.backupQuantity", ib); + newProps.setProperty("outbound.backupQuantity", ob); + sess.updateOptions(newProps); + } + } + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java index 6df1da42e..7c4e58d72 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinatorSet.java @@ -12,7 +12,7 @@ import net.i2p.crypto.SHA1Hash; * Each PeerCoordinator is added to the set from within the Snark (and removed * from it there too) */ -class PeerCoordinatorSet { +class PeerCoordinatorSet implements Iterable { private final Map _coordinators; public PeerCoordinatorSet() { diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java index db835f730..00fd61764 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java @@ -27,6 +27,8 @@ import net.i2p.data.DataHelper; /** * TimerTask that monitors the peers and total up/download speeds. * Works together with the main Snark class to report periodical statistics. + * + * @deprecated unused, for command line client only, commented out in Snark.java */ class PeerMonitorTask implements Runnable { @@ -45,6 +47,7 @@ class PeerMonitorTask implements Runnable public void run() { +/***** // Get some statistics int peers = 0; int uploaders = 0; @@ -117,5 +120,6 @@ class PeerMonitorTask implements Runnable lastDownloaded = downloaded; lastUploaded = uploaded; +****/ } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index 8a65bd51e..c92b42210 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -1226,8 +1226,7 @@ public class Snark if (_peerCoordinatorSet == null || uploaders <= 0) return false; int totalUploaders = 0; - for (Iterator iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) { - PeerCoordinator c = iter.next(); + for (PeerCoordinator c : _peerCoordinatorSet) { if (!c.halted()) totalUploaders += c.uploaders; } @@ -1240,8 +1239,7 @@ public class Snark if (_peerCoordinatorSet == null) return false; long total = 0; - for (Iterator iter = _peerCoordinatorSet.iterator(); iter.hasNext(); ) { - PeerCoordinator c = iter.next(); + for (PeerCoordinator c : _peerCoordinatorSet) { if (!c.halted()) total += c.getCurrentUploadRate(); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 92e48d9cd..a59ef4465 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -37,6 +37,7 @@ import net.i2p.util.SecureDirectory; import net.i2p.util.SecureFileOutputStream; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; +import net.i2p.util.SimpleTimer2; import org.klomp.snark.dht.DHT; @@ -70,6 +71,7 @@ public class SnarkManager implements CompleteListener { private final Map _trackerMap; private UpdateManager _umgr; private UpdateHandler _uhandler; + private SimpleTimer2.TimedEvent _idleChecker; public static final String PROP_I2CP_HOST = "i2psnark.i2cpHost"; public static final String PROP_I2CP_PORT = "i2psnark.i2cpPort"; @@ -178,6 +180,8 @@ public class SnarkManager implements CompleteListener { _context.simpleScheduler().addEvent(new Register(), 4*60*1000); // Not required, Jetty has a shutdown hook //_context.addShutdownTask(new SnarkManagerShutdown()); + _idleChecker = new IdleChecker(_util, _peerCoordinatorSet); + _idleChecker.schedule(5*60*1000); } /** @since 0.9.4 */ @@ -210,6 +214,7 @@ public class SnarkManager implements CompleteListener { _running = false; _monitor.interrupt(); _connectionAcceptor.halt(); + _idleChecker.cancel(); stopAllTorrents(true); } @@ -635,6 +640,7 @@ public class SnarkManager implements CompleteListener { addMessage(_("I2CP options changed to {0}", i2cpOpts)); _util.setI2CPConfig(oldI2CPHost, oldI2CPPort, opts); } else { + // Won't happen, I2CP host/port, are hidden in the GUI if in router context if (_util.connected()) { _util.disconnect(); addMessage(_("Disconnecting old I2CP destination")); @@ -658,6 +664,8 @@ public class SnarkManager implements CompleteListener { for (Snark snark : _snarks.values()) { if (snark.restartAcceptor()) { addMessage(_("I2CP listener restarted for \"{0}\"", snark.getBaseName())); + // this is the common ConnectionAcceptor, so we only need to do it once + break; } } }