diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 6cef36f67..0433ffcbb 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -1,5 +1,6 @@ package org.klomp.snark; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -393,6 +394,46 @@ public class I2PSnarkUtil { } } + /** + * Fetch to memory + * @param retries if < 0, set timeout to a few seconds + * @param initialSize buffer size + * @param maxSize fails if greater + * @return null on error + * @since 0.9.4 + */ + public byte[] get(String url, boolean rewrite, int retries, int initialSize, int maxSize) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Fetching [" + url + "] to memory"); + String fetchURL = url; + if (rewrite) + fetchURL = rewriteAnnounce(url); + int timeout; + if (retries < 0) { + if (!connected()) + return null; + timeout = EEPGET_CONNECT_TIMEOUT_SHORT; + retries = 0; + } else { + timeout = EEPGET_CONNECT_TIMEOUT; + if (!connected()) { + if (!connect()) + return null; + } + } + ByteArrayOutputStream out = new ByteArrayOutputStream(initialSize); + EepGet get = new I2PSocketEepGet(_context, _manager, retries, -1, maxSize, null, out, fetchURL); + if (get.fetch(timeout)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Fetch successful [" + url + "]: size=" + out.size()); + return out.toByteArray(); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Fetch failed [" + url + "]"); + return null; + } + } + public I2PServerSocket getServerSocket() { I2PSocketManager mgr = _manager; if (mgr != null) @@ -523,6 +564,15 @@ public class I2PSnarkUtil { return Collections.EMPTY_LIST; return _openTrackers; } + + /** + * List of open trackers to use as backups even if disabled + * @return non-null + * @since 0.9.4 + */ + public List getBackupTrackers() { + return _openTrackers; + } public void setUseOpenTrackers(boolean yes) { _shouldUseOT = yes; diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index dccfddbcd..6a821f2ba 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -20,6 +20,7 @@ package org.klomp.snark; +import java.io.ByteArrayInputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -43,6 +44,7 @@ import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; +import org.klomp.snark.bencode.InvalidBEncodingException; import org.klomp.snark.dht.DHT; /** @@ -70,6 +72,8 @@ public class TrackerClient implements Runnable { private static final String COMPLETED_EVENT = "completed"; private static final String STOPPED_EVENT = "stopped"; private static final String NOT_REGISTERED = "torrent not registered"; //bytemonsoon + /** this is our equivalent to router.utorrent.com for bootstrap */ + private static final String DEFAULT_BACKUP_TRACKER = "http://tracker.welterde.i2p/a"; private final static int SLEEP = 5; // 5 minutes. private final static int DELAY_MIN = 2000; // 2 secs. @@ -78,7 +82,7 @@ public class TrackerClient implements Runnable { private final static int INITIAL_SLEEP = 90*1000; private final static int MAX_CONSEC_FAILS = 5; // slow down after this private final static int LONG_SLEEP = 30*60*1000; // sleep a while after lots of fails - private final static long MIN_TRACKER_ANNOUNCE_INTERVAL = 10*60*1000; + private final static long MIN_TRACKER_ANNOUNCE_INTERVAL = 15*60*1000; private final static long MIN_DHT_ANNOUNCE_INTERVAL = 10*60*1000; private final I2PSnarkUtil _util; @@ -106,6 +110,7 @@ public class TrackerClient implements Runnable { private volatile boolean _fastUnannounce; private long lastDHTAnnounce; private final List trackers; + private final List backupTrackers; /** * Call start() to start it. @@ -131,6 +136,7 @@ public class TrackerClient implements Runnable { this.infoHash = urlencode(snark.getInfoHash()); this.peerID = urlencode(snark.getID()); this.trackers = new ArrayList(2); + this.backupTrackers = new ArrayList(2); } public synchronized void start() { @@ -233,7 +239,7 @@ public class TrackerClient implements Runnable { if (!_initialized) { _initialized = true; // FIXME only when starting everybody at once, not for a single torrent - long delay = I2PAppContext.getGlobalContext().random().nextInt(30*1000); + long delay = _util.getContext().random().nextInt(30*1000); try { Thread.sleep(delay); } catch (InterruptedException ie) {} @@ -267,18 +273,20 @@ public class TrackerClient implements Runnable { if (primary != null) { if (isValidAnnounce(primary)) { trackers.add(new Tracker(primary, true)); - _log.debug("Announce: [" + primary + "] infoHash: " + infoHash); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Announce: [" + primary + "] infoHash: " + infoHash); } else { - _log.warn("Skipping invalid or non-i2p announce: " + primary); + if (_log.shouldLog(Log.WARN)) + _log.warn("Skipping invalid or non-i2p announce: " + primary); } } else { _log.warn("No primary announce"); primary = ""; } - List tlist = _util.getOpenTrackers(); - if (tlist != null && (meta == null || !meta.isPrivate())) { + if (meta == null || !meta.isPrivate()) { + List tlist = _util.getOpenTrackers(); for (int i = 0; i < tlist.size(); i++) { - String url = (String)tlist.get(i); + String url = tlist.get(i); if (!isValidAnnounce(url)) { _log.error("Bad announce URL: [" + url + "]"); continue; @@ -301,9 +309,37 @@ public class TrackerClient implements Runnable { continue; // opentrackers are primary if we don't have primary trackers.add(new Tracker(url, primary.equals(""))); - _log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Additional announce: [" + url + "] for infoHash: " + infoHash); } } + + // backup trackers if DHT needs bootstrapping + if (trackers.isEmpty() && (meta == null || !meta.isPrivate())) { + List tlist = _util.getBackupTrackers(); + for (int i = 0; i < tlist.size(); i++) { + String url = tlist.get(i); + if (!isValidAnnounce(url)) { + _log.error("Bad announce URL: [" + url + "]"); + continue; + } + int slash = url.indexOf('/', 7); + if (slash <= 7) { + _log.error("Bad announce URL: [" + url + "]"); + continue; + } + String dest = _util.lookup(url.substring(7, slash)); + if (dest == null) { + _log.error("Announce host unknown: [" + url.substring(7, slash) + "]"); + continue; + } + backupTrackers.add(new Tracker(url, false)); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Backup announce: [" + url + "] for infoHash: " + infoHash); + } + if (backupTrackers.isEmpty()) + backupTrackers.add(new Tracker(DEFAULT_BACKUP_TRACKER, false)); + } this.completed = coordinator.getLeft() == 0; } @@ -315,7 +351,7 @@ public class TrackerClient implements Runnable { private void loop() { try { - Random r = I2PAppContext.getGlobalContext().random(); + // normally this will only go once, then call queueLoop() and return while(!stop) { if (!verifyConnected()) { @@ -325,187 +361,25 @@ public class TrackerClient implements Runnable { // Local DHT tracker announce DHT dht = _util.getDHT(); - if (dht != null) + if (dht != null && (meta == null || !meta.isPrivate())) dht.announce(snark.getInfoHash()); - long uploaded = coordinator.getUploaded(); - long downloaded = coordinator.getDownloaded(); - long left = coordinator.getLeft(); // -1 in magnet mode - - // First time we got a complete download? - String event; - if (!completed && left == 0) - { - completed = true; - event = COMPLETED_EVENT; - } - else - event = NO_EVENT; - - // *** loop once for each tracker int maxSeenPeers = 0; - for (Tracker tr : trackers) { - if ((!stop) && (!tr.stop) && - (completed || coordinator.needOutboundPeers() || !tr.started) && - (event.equals(COMPLETED_EVENT) || System.currentTimeMillis() > tr.lastRequestTime + tr.interval)) - { - try - { - if (!tr.started) - event = STARTED_EVENT; - TrackerInfo info = doRequest(tr, infoHash, peerID, - uploaded, downloaded, left, - event); - - snark.setTrackerProblems(null); - tr.trackerProblems = null; - tr.registerFails = 0; - tr.consecutiveFails = 0; - if (tr.isPrimary) - consecutiveFails = 0; - runStarted = true; - tr.started = true; - - Set peers = info.getPeers(); - tr.seenPeers = info.getPeerCount(); - if (snark.getTrackerSeenPeers() < tr.seenPeers) // update rising number quickly - snark.setTrackerSeenPeers(tr.seenPeers); - - // pass everybody over to our tracker - dht = _util.getDHT(); - if (dht != null) { - for (Peer peer : peers) { - dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash()); - } - } - - if (coordinator.needOutboundPeers()) { - // we only want to talk to new people if we need things - // from them (duh) - List ordered = new ArrayList(peers); - Collections.shuffle(ordered, r); - Iterator it = ordered.iterator(); - while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { - Peer cur = it.next(); - // FIXME if id == us || dest == us continue; - // only delay if we actually make an attempt to add peer - if(coordinator.addPeer(cur) && it.hasNext()) { - int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; - try { Thread.sleep(delay); } catch (InterruptedException ie) {} - } - } - } - } - catch (IOException ioe) - { - // Probably not fatal (if it doesn't last to long...) - if (_log.shouldLog(Log.WARN)) - _log.warn - ("WARNING: Could not contact tracker at '" - + tr.announce + "': " + ioe); - tr.trackerProblems = ioe.getMessage(); - // don't show secondary tracker problems to the user - if (tr.isPrimary) - snark.setTrackerProblems(tr.trackerProblems); - if (tr.trackerProblems.toLowerCase(Locale.US).startsWith(NOT_REGISTERED)) { - // Give a guy some time to register it if using opentrackers too - if (trackers.size() == 1) { - stop = true; - snark.stopTorrent(); - } else { // hopefully each on the opentrackers list is really open - if (tr.registerFails++ > MAX_REGISTER_FAILS) - tr.stop = true; - } - } - if (++tr.consecutiveFails == MAX_CONSEC_FAILS) { - tr.seenPeers = 0; - if (tr.interval < LONG_SLEEP) - tr.interval = LONG_SLEEP; // slow down - } - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Not announcing to " + tr.announce + " last announce was " + - new Date(tr.lastRequestTime) + " interval is " + DataHelper.formatDuration(tr.interval)); - } - if ((!tr.stop) && maxSeenPeers < tr.seenPeers) - maxSeenPeers = tr.seenPeers; - } // *** end of trackers loop here - - // Get peers from PEX - if (coordinator.needOutboundPeers() && (meta == null || !meta.isPrivate()) && !stop) { - Set pids = coordinator.getPEXPeers(); - if (!pids.isEmpty()) { - if (_log.shouldLog(Log.INFO)) - _log.info("Got " + pids.size() + " from PEX"); - List peers = new ArrayList(pids.size()); - for (PeerID pID : pids) { - peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo())); - } - Collections.shuffle(peers, r); - Iterator it = peers.iterator(); - while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { - Peer cur = it.next(); - if (coordinator.addPeer(cur) && it.hasNext()) { - int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; - try { Thread.sleep(delay); } catch (InterruptedException ie) {} - } - } - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Not getting PEX peers"); + if (!trackers.isEmpty()) + maxSeenPeers = getPeersFromTrackers(trackers); + int p = getPeersFromPEX(); + if (p > maxSeenPeers) + maxSeenPeers = p; + p = getPeersFromDHT(); + if (p > maxSeenPeers) + maxSeenPeers = p; + // backup if DHT needs bootstrapping + if (trackers.isEmpty() && !backupTrackers.isEmpty() && dht != null && dht.size() < 16) { + p = getPeersFromTrackers(backupTrackers); + if (p > maxSeenPeers) + maxSeenPeers = p; } - // Get peers from DHT - // FIXME this needs to be in its own thread - dht = _util.getDHT(); - if (dht != null && (meta == null || !meta.isPrivate()) && (!stop) && - _util.getContext().clock().now() > lastDHTAnnounce + MIN_DHT_ANNOUNCE_INTERVAL) { - int numwant; - if (event.equals(STOPPED_EVENT) || !coordinator.needOutboundPeers()) - numwant = 1; - else - numwant = _util.getMaxConnections(); - Collection hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000); - if (!hashes.isEmpty()) { - runStarted = true; - lastDHTAnnounce = _util.getContext().clock().now(); - } - if (_log.shouldLog(Log.INFO)) - _log.info("Got " + hashes + " from DHT"); - // announce ourselves while the token is still good - // FIXME this needs to be in its own thread - if (!stop) { - // announce only to the 1 closest - int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000); - if (_log.shouldLog(Log.INFO)) - _log.info("Sent " + good + " good announces to DHT"); - } - - // now try these peers - if ((!stop) && !hashes.isEmpty()) { - List peers = new ArrayList(hashes.size()); - for (Hash h : hashes) { - PeerID pID = new PeerID(h.getData(), _util); - peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo())); - } - Collections.shuffle(peers, r); - Iterator it = peers.iterator(); - while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { - Peer cur = it.next(); - if (coordinator.addPeer(cur) && it.hasNext()) { - int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; - try { Thread.sleep(delay); } catch (InterruptedException ie) {} - } - } - } - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("Not getting DHT peers"); - } - - // we could try and total the unique peers but that's too hard for now snark.setTrackerSeenPeers(maxSeenPeers); @@ -516,6 +390,7 @@ public class TrackerClient implements Runnable { // Sleep some minutes... // Sleep the minimum interval for all the trackers, but 60s minimum int delay; + Random r = _util.getContext().random(); int random = r.nextInt(120*1000); if (completed && runStarted) delay = 3*SLEEP*60*1000 + random; @@ -547,6 +422,212 @@ public class TrackerClient implements Runnable { } } + /** + * @return max peers seen + */ + private int getPeersFromTrackers(List trckrs) { + long uploaded = coordinator.getUploaded(); + long downloaded = coordinator.getDownloaded(); + long left = coordinator.getLeft(); // -1 in magnet mode + + // First time we got a complete download? + String event; + if (!completed && left == 0) + { + completed = true; + event = COMPLETED_EVENT; + } + else + event = NO_EVENT; + + // *** loop once for each tracker + int maxSeenPeers = 0; + for (Tracker tr : trckrs) { + if ((!stop) && (!tr.stop) && + (completed || coordinator.needOutboundPeers() || !tr.started) && + (event.equals(COMPLETED_EVENT) || System.currentTimeMillis() > tr.lastRequestTime + tr.interval)) + { + try + { + if (!tr.started) + event = STARTED_EVENT; + TrackerInfo info = doRequest(tr, infoHash, peerID, + uploaded, downloaded, left, + event); + + snark.setTrackerProblems(null); + tr.trackerProblems = null; + tr.registerFails = 0; + tr.consecutiveFails = 0; + if (tr.isPrimary) + consecutiveFails = 0; + runStarted = true; + tr.started = true; + + Set peers = info.getPeers(); + tr.seenPeers = info.getPeerCount(); + if (snark.getTrackerSeenPeers() < tr.seenPeers) // update rising number quickly + snark.setTrackerSeenPeers(tr.seenPeers); + + // pass everybody over to our tracker + DHT dht = _util.getDHT(); + if (dht != null) { + for (Peer peer : peers) { + dht.announce(snark.getInfoHash(), peer.getPeerID().getDestHash()); + } + } + + if (coordinator.needOutboundPeers()) { + // we only want to talk to new people if we need things + // from them (duh) + List ordered = new ArrayList(peers); + Random r = _util.getContext().random(); + Collections.shuffle(ordered, r); + Iterator it = ordered.iterator(); + while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { + Peer cur = it.next(); + // FIXME if id == us || dest == us continue; + // only delay if we actually make an attempt to add peer + if(coordinator.addPeer(cur) && it.hasNext()) { + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; + try { Thread.sleep(delay); } catch (InterruptedException ie) {} + } + } + } + } + catch (IOException ioe) + { + // Probably not fatal (if it doesn't last to long...) + if (_log.shouldLog(Log.WARN)) + _log.warn + ("WARNING: Could not contact tracker at '" + + tr.announce + "': " + ioe); + tr.trackerProblems = ioe.getMessage(); + // don't show secondary tracker problems to the user + if (tr.isPrimary) + snark.setTrackerProblems(tr.trackerProblems); + if (tr.trackerProblems.toLowerCase(Locale.US).startsWith(NOT_REGISTERED)) { + // Give a guy some time to register it if using opentrackers too + //if (trckrs.size() == 1) { + // stop = true; + // snark.stopTorrent(); + //} else { // hopefully each on the opentrackers list is really open + if (tr.registerFails++ > MAX_REGISTER_FAILS) + tr.stop = true; + // + } + if (++tr.consecutiveFails == MAX_CONSEC_FAILS) { + tr.seenPeers = 0; + if (tr.interval < LONG_SLEEP) + tr.interval = LONG_SLEEP; // slow down + } + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Not announcing to " + tr.announce + " last announce was " + + new Date(tr.lastRequestTime) + " interval is " + DataHelper.formatDuration(tr.interval)); + } + if ((!tr.stop) && maxSeenPeers < tr.seenPeers) + maxSeenPeers = tr.seenPeers; + } // *** end of trackers loop here + + return maxSeenPeers; + } + + /** + * @return max peers seen + */ + private int getPeersFromPEX() { + // Get peers from PEX + int rv = 0; + if (coordinator.needOutboundPeers() && (meta == null || !meta.isPrivate()) && !stop) { + Set pids = coordinator.getPEXPeers(); + if (!pids.isEmpty()) { + if (_log.shouldLog(Log.INFO)) + _log.info("Got " + pids.size() + " from PEX"); + List peers = new ArrayList(pids.size()); + for (PeerID pID : pids) { + peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo())); + } + Random r = _util.getContext().random(); + Collections.shuffle(peers, r); + Iterator it = peers.iterator(); + while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { + Peer cur = it.next(); + if (coordinator.addPeer(cur) && it.hasNext()) { + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; + try { Thread.sleep(delay); } catch (InterruptedException ie) {} + } + } + rv = pids.size(); + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Not getting PEX peers"); + } + return rv; + } + + /** + * @return max peers seen + */ + private int getPeersFromDHT() { + // Get peers from DHT + // FIXME this needs to be in its own thread + int rv = 0; + DHT dht = _util.getDHT(); + if (dht != null && (meta == null || !meta.isPrivate()) && (!stop) && + _util.getContext().clock().now() > lastDHTAnnounce + MIN_DHT_ANNOUNCE_INTERVAL) { + int numwant; + if (!coordinator.needOutboundPeers()) + numwant = 1; + else + numwant = _util.getMaxConnections(); + Collection hashes = dht.getPeers(snark.getInfoHash(), numwant, 2*60*1000); + if (!hashes.isEmpty()) { + runStarted = true; + lastDHTAnnounce = _util.getContext().clock().now(); + rv = hashes.size(); + } + if (_log.shouldLog(Log.INFO)) + _log.info("Got " + hashes + " from DHT"); + // announce ourselves while the token is still good + // FIXME this needs to be in its own thread + if (!stop) { + // announce only to the 1 closest + int good = dht.announce(snark.getInfoHash(), 1, 5*60*1000); + if (_log.shouldLog(Log.INFO)) + _log.info("Sent " + good + " good announces to DHT"); + } + + // now try these peers + if ((!stop) && !hashes.isEmpty()) { + List peers = new ArrayList(hashes.size()); + for (Hash h : hashes) { + try { + PeerID pID = new PeerID(h.getData(), _util); + peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo())); + } catch (InvalidBEncodingException ibe) {} + } + Random r = _util.getContext().random(); + Collections.shuffle(peers, r); + Iterator it = peers.iterator(); + while ((!stop) && it.hasNext() && coordinator.needOutboundPeers()) { + Peer cur = it.next(); + if (coordinator.addPeer(cur) && it.hasNext()) { + int delay = r.nextInt(DELAY_RAND) + DELAY_MIN; + try { Thread.sleep(delay); } catch (InterruptedException ie) {} + } + } + } + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Not getting DHT peers"); + } + return rv; + } + + /** * Creates a thread for each tracker in parallel if tunnel is still open * @since 0.9.1 @@ -630,7 +711,8 @@ public class TrackerClient implements Runnable { if (! event.equals(NO_EVENT)) buf.append("&event=").append(event); buf.append("&numwant="); - if (left == 0 || event.equals(STOPPED_EVENT) || !coordinator.needOutboundPeers()) + boolean small = left == 0 || event.equals(STOPPED_EVENT) || !coordinator.needOutboundPeers(); + if (small) buf.append('0'); else buf.append(_util.getMaxConnections()); @@ -641,14 +723,12 @@ public class TrackerClient implements Runnable { tr.lastRequestTime = System.currentTimeMillis(); // Don't wait for a response to stopped when shutting down boolean fast = _fastUnannounce && event.equals(STOPPED_EVENT); - File fetched = _util.get(s, true, fast ? -1 : 0); + byte[] fetched = _util.get(s, true, fast ? -1 : 0, small ? 128 : 1024, small ? 1024 : 8*1024); if (fetched == null) { throw new IOException("Error fetching " + s); } - InputStream in = null; - try { - in = new FileInputStream(fetched); + InputStream in = new ByteArrayInputStream(fetched); TrackerInfo info = new TrackerInfo(in, snark.getID(), snark.getInfoHash(), snark.getMetaInfo(), _util); @@ -661,10 +741,6 @@ public class TrackerClient implements Runnable { tr.interval = Math.max(MIN_TRACKER_ANNOUNCE_INTERVAL, info.getInterval() * 1000l); return info; - } finally { - if (in != null) try { in.close(); } catch (IOException ioe) {} - fetched.delete(); - } } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java index 55521a787..b94c5f555 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java +++ b/apps/i2psnark/java/src/org/klomp/snark/dht/KRPC.java @@ -143,6 +143,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT { /** how long since generated do we delete - BEP 5 says 10 minutes */ private static final long MAX_TOKEN_AGE = 10*60*1000; private static final long MAX_INBOUND_TOKEN_AGE = MAX_TOKEN_AGE - 2*60*1000; + private static final int MAX_OUTBOUND_TOKENS = 5000; /** how long since sent do we wait for a reply */ private static final long MAX_MSGID_AGE = 2*60*1000; /** how long since sent do we wait for a reply */ @@ -1208,7 +1209,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { /** * Handle and respond to the query. - * We have no node info here, it came on response port, we have to get it from the token + * We have no node info here, it came on response port, we have to get it from the token. + * So we can't verify that it came from the same peer, as BEP 5 specifies. */ private void receiveAnnouncePeer(MsgID msgID, InfoHash ih, byte[] tok) throws InvalidBEncodingException { Token token = new Token(tok); @@ -1216,8 +1218,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT { if (nInfo == null) { if (_log.shouldLog(Log.WARN)) _log.warn("Unknown token in announce_peer: " + token); - if (_log.shouldLog(Log.INFO)) - _log.info("Current known tokens: " + _outgoingTokens.keySet()); + //if (_log.shouldLog(Log.INFO)) + // _log.info("Current known tokens: " + _outgoingTokens.keySet()); return; } if (_log.shouldLog(Log.INFO)) @@ -1282,8 +1284,9 @@ public class KRPC implements I2PSessionMuxedListener, DHT { * @throws NPE, IllegalArgumentException, and others too */ private List receiveNodes(NodeInfo nInfo, byte[] ids) throws InvalidBEncodingException { - List rv = new ArrayList(ids.length / NodeInfo.LENGTH); - for (int off = 0; off < ids.length; off += NodeInfo.LENGTH) { + int max = Math.min(K, ids.length / NodeInfo.LENGTH); + List rv = new ArrayList(max); + for (int off = 0; off < ids.length && rv.size() < max; off += NodeInfo.LENGTH) { NodeInfo nInf = new NodeInfo(ids, off); if (_blacklist.contains(nInf.getNID())) { if (_log.shouldLog(Log.INFO)) @@ -1305,12 +1308,15 @@ public class KRPC implements I2PSessionMuxedListener, DHT { private List receivePeers(NodeInfo nInfo, List peers) throws InvalidBEncodingException { if (_log.shouldLog(Log.INFO)) _log.info("Rcvd peers from: " + nInfo); - List rv = new ArrayList(peers.size()); + int max = Math.min(MAX_WANT, peers.size()); + List rv = new ArrayList(max); for (BEValue bev : peers) { byte[] b = bev.getBytes(); //Hash h = new Hash(b); Hash h = Hash.create(b); rv.add(h); + if (rv.size() >= max) + break; } if (_log.shouldLog(Log.INFO)) _log.info("Rcvd peers from: " + nInfo + ": " + DataHelper.toString(rv)); @@ -1535,20 +1541,28 @@ public class KRPC implements I2PSessionMuxedListener, DHT { _blacklist.size() + " in blacklist, " + _outgoingTokens.size() + " sent Tokens, " + _incomingTokens.size() + " rcvd Tokens"); + int cnt = 0; + long expire = now - MAX_TOKEN_AGE; for (Iterator iter = _outgoingTokens.keySet().iterator(); iter.hasNext(); ) { Token tok = iter.next(); - if (tok.lastSeen() < now - MAX_TOKEN_AGE) + // just delete at random if we have too many + // TODO reduce the expire time and iterate again? + if (tok.lastSeen() < expire || cnt >= MAX_OUTBOUND_TOKENS) iter.remove(); + else + cnt++; } + expire = now - MAX_INBOUND_TOKEN_AGE; for (Iterator iter = _incomingTokens.values().iterator(); iter.hasNext(); ) { Token tok = iter.next(); - if (tok.lastSeen() < now - MAX_INBOUND_TOKEN_AGE) + if (tok.lastSeen() < expire) iter.remove(); } + expire = now - BLACKLIST_CLEAN_TIME; for (Iterator iter = _blacklist.iterator(); iter.hasNext(); ) { NID nid = iter.next(); // lastSeen() is actually when-added - if (now > nid.lastSeen() + BLACKLIST_CLEAN_TIME) + if (nid.lastSeen() < expire) iter.remove(); } // TODO sent queries?