diff --git a/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java b/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java index 767979d061c8a20f9f5291c46d50f2a2f659ebe4..c5b7ec1077af45f870413425a224077d0a646a63 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/CoordinatorListener.java @@ -40,4 +40,5 @@ interface CoordinatorListener public boolean overUploadLimit(int uploaders); public boolean overUpBWLimit(); public boolean overUpBWLimit(long total); + public void addMessage(String message); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index d0ac03092a7d5e9f45b54e2eac14c547a2c7e78e..38ff07ced2a707ee5abd0206656932d11a171e64 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -87,7 +87,6 @@ public class I2PSnarkUtil { //setProxy("127.0.0.1", 4444); setI2CPConfig("127.0.0.1", 7654, null); _shitlist = new ConcurrentHashSet(); - _configured = false; _maxUploaders = Snark.MAX_TOTAL_UPLOADERS; _maxUpBW = DEFAULT_MAX_UP_BW; _maxConnections = MAX_CONNECTIONS; @@ -248,7 +247,6 @@ public class I2PSnarkUtil { _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); _connecting = false; } - // FIXME this only instantiates krpc once, left stuck with old manager if (_shouldUseDHT && _manager != null && _dht == null) _dht = new KRPC(_context, _manager.getSession()); return (_manager != null); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index e06657e0d3890a97842b525c2f6eca41c0cf05be..fd170586a398269e1420e73b0c5a52727975cfd4 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -890,8 +890,10 @@ class PeerCoordinator implements PeerListener snark.stopTorrent(); String msg = "Error reading the storage (piece " + piece + ") for " + metainfo.getName() + ": " + ioe; _log.error(msg, ioe); - SnarkManager.instance().addMessage(msg); - SnarkManager.instance().addMessage("Fatal storage error: Stopping torrent " + metainfo.getName()); + if (listener != null) { + listener.addMessage(msg); + listener.addMessage("Fatal storage error: Stopping torrent " + metainfo.getName()); + } throw new RuntimeException(msg, ioe); } } @@ -970,8 +972,10 @@ class PeerCoordinator implements PeerListener snark.stopTorrent(); String msg = "Error writing storage (piece " + piece + ") for " + metainfo.getName() + ": " + ioe; _log.error(msg, ioe); - SnarkManager.instance().addMessage(msg); - SnarkManager.instance().addMessage("Fatal storage error: Stopping torrent " + metainfo.getName()); + if (listener != null) { + listener.addMessage(msg); + listener.addMessage("Fatal storage error: Stopping torrent " + metainfo.getName()); + } throw new RuntimeException(msg, ioe); } wantedPieces.remove(p); diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index efb491d318a8be3cfd001cce63043ed09bddad30..c0781d7d8ab5f7adb63116cc0e0f93603dc988e7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -1179,6 +1179,15 @@ public class Snark //System.exit(0); } + /** + * StorageListener and CoordinatorListener callback + * @since 0.9.2 + */ + public void addMessage(String message) { + if (completeListener != null) + completeListener.addMessage(this, message); + } + public interface CompleteListener { public void torrentComplete(Snark snark); public void updateStatus(Snark snark); @@ -1198,6 +1207,11 @@ public class Snark */ public void fatal(Snark snark, String error); + /** + * @since 0.9.2 + */ + public void addMessage(Snark snark, String message); + // not really listeners but the easiest way to get back to an optional SnarkManager public long getSavedTorrentTime(Snark snark); public BitField getSavedTorrentBitField(Snark snark); diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 1782af7f42c45de21d78b04c475c9cc808505def..47af95bbb22f6b9364caaa9ee4d1422abf7d220a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -123,15 +123,11 @@ public class SnarkManager implements Snark.CompleteListener { /** comma delimited list of name=announceURL=baseURL for the trackers to be displayed */ public static final String PROP_TRACKERS = "i2psnark.trackers"; - private static final SnarkManager _instance = new SnarkManager(); - - public static SnarkManager instance() { return _instance; } - - private SnarkManager() { + public SnarkManager(I2PAppContext ctx) { _snarks = new ConcurrentHashMap(); _magnets = new ConcurrentHashSet(); _addSnarkLock = new Object(); - _context = I2PAppContext.getGlobalContext(); + _context = ctx; _log = _context.logManager().getLog(SnarkManager.class); _messages = new LinkedBlockingQueue(); _util = new I2PSnarkUtil(_context); @@ -824,13 +820,13 @@ public class SnarkManager implements Snark.CompleteListener { if (info.isPrivate()) { addMessage(_("ERROR - No I2P trackers in private torrent \"{0}\"", info.getName())); } else if (_util.shouldUseOpenTrackers() && _util.getOpenTrackers() != null) { - //addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName())); - addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName())); - //} else if (_util.getDHT() != null) { - // addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName())); + addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers and DHT only.", info.getName())); + //addMessage(_("Warning - No I2P trackers in \"{0}\", will announce to I2P open trackers only.", info.getName())); + } else if (_util.getDHT() != null) { + addMessage(_("Warning - No I2P trackers in \"{0}\", and open trackers are disabled, will announce to DHT only.", info.getName())); } else { - //addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName())); - addMessage(_("Warning - No I2P Trackers found in \"{0}\". Make sure Open Tracker is enabled before starting this torrent.", info.getName())); + addMessage(_("Warning - No I2P trackers in \"{0}\", and DHT and open trackers are disabled, you should enable open trackers or DHT before starting the torrent.", info.getName())); + //addMessage(_("Warning - No I2P Trackers found in \"{0}\". Make sure Open Tracker is enabled before starting this torrent.", info.getName())); dontAutoStart = true; } } @@ -1456,6 +1452,14 @@ public class SnarkManager implements Snark.CompleteListener { addMessage(_("Error on torrent {0}", snark.getName()) + ": " + error); } + /** + * A Snark.CompleteListener method. + * @since 0.9.2 + */ + public void addMessage(Snark snark, String message) { + addMessage(message); + } + // End Snark.CompleteListeners /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index a17e1bde6873052e7dc6f49627e16f31b46174aa..145a76a28ca51d81d8094ec50a04f31d332dca2a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -740,7 +740,8 @@ public class Storage } else { String msg = "File '" + names[i] + "' exists, but has wrong length (expected " + lengths[i] + " but found " + length + ") - repairing corruption"; - SnarkManager.instance().addMessage(msg); + if (listener != null) + listener.addMessage(msg); _log.error(msg); changed = true; resume = true; diff --git a/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java b/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java index 279d958e4d7afdae5dd8dff36670b9c7bcf8eed9..d484907fa6ad39e3c6ec2ae5863c2210688a63fb 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/StorageListener.java @@ -61,4 +61,6 @@ interface StorageListener * */ void setWantedPieces(Storage storage); + + void addMessage(String message); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index ccb02813405de6150da2802cf3aa56093ec46d1f..826b659650a7847970521c6d3ba900e22d40c5f0 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -297,12 +297,11 @@ public class TrackerClient implements Runnable { } } - if (trackers.isEmpty()) { + if (trackers.isEmpty() && _util.getDHT() == null) { stop = true; - // FIXME translate - SnarkManager.instance().addMessage("No valid trackers for " + this.snark.getBaseName() + " - enable opentrackers?"); + this.snark.addMessage(_util.getString("No valid trackers for {0} - enable opentrackers or DHT?", + this.snark.getBaseName())); _log.error("No valid trackers for " + this.snark.getBaseName()); - // FIXME keep going if DHT enabled this.snark.stopTorrent(); return; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java index d5642835bd29ada0c98e1e2230b5b590ad574a52..d5b8913ffd8d7dd9eecc6415b836535780adb827 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHT.java @@ -94,4 +94,9 @@ public interface DHT { * Known nodes, not estimated total network size. */ public int size(); + + /** + * Debug info, HTML formatted + */ + public String renderStatusHTML(); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java index 97fbcef73d997187aafc0133a7ea1666935351f9..dc56fdd721dcac7fc0d62f9c1a21a3ad146a8595 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/DHTTracker.java @@ -27,6 +27,10 @@ class DHTTracker { private long _expireTime; private final Log _log; private volatile boolean _isRunning; + /** not current, updated by cleaner */ + private int _peerCount; + /** not current, updated by cleaner */ + private int _torrentCount; /** stagger with other cleaners */ private static final long CLEAN_TIME = 199*1000; @@ -97,6 +101,15 @@ class DHTTracker { return rv; } + /** + * Debug info, HTML formatted + */ + public void renderStatusHTML(StringBuilder buf) { + buf.append("DHT tracker: ").append(_torrentCount).append(" torrents ") + .append(_peerCount).append(" peers ") + .append(DataHelper.formatDuration(_expireTime)).append(" expiration<br>"); + } + private class Cleaner extends SimpleTimer2.TimedEvent { public Cleaner() { @@ -137,6 +150,8 @@ class DHTTracker { torrentCount + " torrents, " + peerCount + " peers, " + DataHelper.formatDuration(_expireTime) + " expiration"); + _peerCount = peerCount; + _torrentCount = torrentCount; schedule(CLEAN_TIME); } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index f892233e4c1eed1245d886b59710fc05ca675fbe..8b2d63c107721036b37b91739f8fbaa2db15639d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -21,6 +21,7 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.client.I2PClient; @@ -111,6 +112,12 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private final File _dhtFile; private volatile boolean _isRunning; private volatile boolean _hasBootstrapped; + /** stats */ + private final AtomicLong _rxPkts = new AtomicLong(); + private final AtomicLong _txPkts = new AtomicLong(); + private final AtomicLong _rxBytes = new AtomicLong(); + private final AtomicLong _txBytes = new AtomicLong(); + private long _started; /** all-zero NID used for pings */ public static final NID FAKE_NID = new NID(new byte[NID.HASH_LENGTH]); @@ -519,6 +526,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT { // no need to keep ref, it will eventually stop new Cleaner(); new Explorer(5*1000); + _txPkts.set(0); + _rxPkts.set(0); + _txBytes.set(0); + _rxBytes.set(0); + _started = _context.clock().now(); } /** @@ -548,6 +560,26 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _knownNodes.clear(); } + /** + * Debug info, HTML formatted + */ + public String renderStatusHTML() { + long uptime = Math.max(1000, _context.clock().now() - _started); + StringBuilder buf = new StringBuilder(); + buf.append("<br><b>DHT DEBUG</b><br>TX: ").append(_txPkts.get()).append(" pkts / ") + .append(DataHelper.formatSize2(_txBytes.get())).append("B / ") + .append(DataHelper.formatSize2(_txBytes.get() * 1000 / uptime)).append("Bps<br>" + + "RX: ").append(_rxPkts.get()).append(" pkts / ") + .append(DataHelper.formatSize2(_rxBytes.get())).append("B / ") + .append(DataHelper.formatSize2(_rxBytes.get() * 1000 / uptime)).append("Bps<br>" + + "DHT Peers: ").append( _knownNodes.size()).append("<br>" + + "Sent tokens: ").append(_outgoingTokens.size()).append("<br>" + + "Rcvd tokens: ").append(_incomingTokens.size()).append("<br>" + + "Pending queries: ").append(_sentQueries.size()).append("<br>"); + _tracker.renderStatusHTML(buf); + return buf.toString(); + } + ////////// All private below here ///////////////////////////////////// ///// Sending..... @@ -862,7 +894,10 @@ public class KRPC implements I2PSessionMuxedListener, DHT { boolean success = _session.sendMessage(dest, payload, 0, payload.length, null, null, 60*1000, repliable ? I2PSession.PROTO_DATAGRAM : I2PSession.PROTO_DATAGRAM_RAW, fromPort, toPort); - if (!success) { + if (success) { + _txPkts.incrementAndGet(); + _txBytes.addAndGet(payload.length); + } else { if (_log.shouldLog(Log.WARN)) _log.warn("WTF sendMessage fail"); } @@ -880,7 +915,6 @@ public class KRPC implements I2PSessionMuxedListener, DHT { * @param from dest or null if it didn't come in on signed port */ private void receiveMessage(Destination from, int fromPort, byte[] payload) { - try { InputStream is = new ByteArrayInputStream(payload); BDecoder dec = new BDecoder(is); @@ -1352,14 +1386,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT { * @param toPort 1-65535 or 0 for unspecified */ public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { + // TODO throttle try { byte[] payload = session.receiveMessage(msgId); + _rxPkts.incrementAndGet(); + _rxBytes.addAndGet(payload.length); if (toPort == _qPort) { // repliable I2PDatagramDissector dgDiss = new I2PDatagramDissector(); dgDiss.loadI2PDatagram(payload); payload = dgDiss.getPayload(); Destination from = dgDiss.getSender(); + // TODO per-dest throttle receiveMessage(from, fromPort, payload); } else if (toPort == _rPort) { // raw diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index 3eafa3ecd83e5c0f8c8f29267411c782beacfe6d..8c689a60208fd6b3f9a4feeaa8dd5b75b9be1163 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -75,8 +75,7 @@ public class I2PSnarkServlet extends DefaultServlet { _context = I2PAppContext.getGlobalContext(); _log = _context.logManager().getLog(I2PSnarkServlet.class); _nonce = _context.random().nextLong(); - // FIXME instantiate new one every time - _manager = SnarkManager.instance(); + _manager = new SnarkManager(_context); String configFile = _context.getProperty(PROP_CONFIG_FILE); if ( (configFile == null) || (configFile.trim().length() <= 0) ) configFile = "i2psnark.config"; @@ -449,9 +448,9 @@ public class I2PSnarkServlet extends DefaultServlet { } out.write("</th></tr></thead>\n"); String uri = "/i2psnark/"; + boolean showDebug = "2".equals(peerParam); for (int i = 0; i < snarks.size(); i++) { Snark snark = (Snark)snarks.get(i); - boolean showDebug = "2".equals(peerParam); boolean showPeers = showDebug || "1".equals(peerParam) || Base64.encode(snark.getInfoHash()).equals(peerParam); displaySnark(out, snark, uri, i, stats, showPeers, isDegraded, noThinsp, showDebug); } @@ -478,6 +477,8 @@ public class I2PSnarkServlet extends DefaultServlet { out.write(", "); out.write(ngettext("1 DHT peer", "{0} DHT peers", dhts)); } + if (showDebug) + out.write(dht.renderStatusHTML()); } out.write("</th>\n"); if (_manager.util().connected()) {