diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java index 7b2979b8ab9f6c6cd6e675176bf112bbb4a50ec1..44490c37be367f1fd338aa3224b351383953d421 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java +++ b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java @@ -85,10 +85,18 @@ abstract class ExtensionHandler { // peer state calls peer listener calls sendPEX() } + MagnetState state = peer.getMagnetState(); + if (msgmap.get(TYPE_METADATA) == null) { if (_log.shouldLog(Log.WARN)) _log.debug("Peer does not support metadata extension: " + peer); - // drop if we need metainfo ? + // drop if we need metainfo and we haven't found anybody yet + synchronized(state) { + if (!state.isInitialized()) { + _log.debug("Dropping peer, we need metadata! " + peer); + peer.disconnect(); + } + } return; } @@ -96,14 +104,19 @@ abstract class ExtensionHandler { if (msize == null) { if (_log.shouldLog(Log.WARN)) _log.debug("Peer does not have the metainfo size yet: " + peer); - // drop if we need metainfo ? + // drop if we need metainfo and we haven't found anybody yet + synchronized(state) { + if (!state.isInitialized()) { + _log.debug("Dropping peer, we need metadata! " + peer); + peer.disconnect(); + } + } return; } int metaSize = msize.getInt(); if (_log.shouldLog(Log.WARN)) _log.debug("Got the metainfo size: " + metaSize); - MagnetState state = peer.getMagnetState(); int remaining; synchronized(state) { if (state.isComplete()) diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 8fd7745678dd7a775f7e7d02a21a382094220f4f..a3ecb1777ab1f820accd3ea47727b55b007b164b 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -35,7 +35,7 @@ import net.i2p.util.SimpleTimer; import net.i2p.util.Translate; import org.klomp.snark.dht.DHT; -import org.klomp.snark.dht.KRPC; +//import org.klomp.snark.dht.KRPC; /** * I2P specific helpers for I2PSnark @@ -213,8 +213,8 @@ public class I2PSnarkUtil { _manager = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, opts); } // FIXME this only instantiates krpc once, left stuck with old manager - if (ENABLE_DHT && _manager != null && _dht == null) - _dht = new KRPC(_context, _manager.getSession()); + //if (ENABLE_DHT && _manager != null && _dht == null) + // _dht = new KRPC(_context, _manager.getSession()); return (_manager != null); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index 5a003cac23e6a62325e12b1b01e7a27b6ac5a163..5110ae9a5f01e88173c2cdee86e475a2af41d498 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.data.DataHelper; import net.i2p.data.Destination; @@ -38,7 +39,7 @@ import org.klomp.snark.bencode.BEValue; public class Peer implements Comparable { - private Log _log = new Log(Peer.class); + private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(Peer.class); // Identifying property, the peer id of the other side. private final PeerID peerID; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java index b767f875a7c7687a83c92e0a5c8959813d1fbe55..5d7b6d66db947808fde63b67e859c85dbec3191d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java @@ -205,12 +205,14 @@ class PeerCheckerTask extends TimerTask } } peer.retransmitRequests(); + // send PEX + if ((_runCount % 17) == 0 && !peer.isCompleted()) + coordinator.sendPeers(peer); peer.keepAlive(); // announce them to local tracker (TrackerClient does this too) if (_util.getDHT() != null && (_runCount % 5) == 0) { _util.getDHT().announce(coordinator.getInfoHash(), peer.getPeerID().getDestHash()); } - // send PEX } // Resync actual uploaders value diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 5e75f176438fd676bca656261e1d891b5271c97b..cdf1e58b5e70cd3e4cd8adbb947ac76e66fa43ce 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -27,15 +27,22 @@ import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Random; +import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; import net.i2p.util.SimpleTimer2; +import org.klomp.snark.bencode.BEValue; +import org.klomp.snark.bencode.InvalidBEncodingException; import org.klomp.snark.dht.DHT; /** @@ -90,6 +97,11 @@ public class PeerCoordinator implements PeerListener */ final Queue<Peer> peers; + /** + * Peers we heard about via PEX + */ + private final Set<PeerID> pexPeers; + /** estimate of the peers, without requiring any synchronization */ private volatile int peerCount; @@ -134,6 +146,7 @@ public class PeerCoordinator implements PeerListener partialPieces = new ArrayList(getMaxConnections() + 1); peers = new LinkedBlockingQueue(); magnetState = new MagnetState(infohash, metainfo); + pexPeers = new ConcurrentHashSet(); // Install a timer to check the uploaders. // Randomize the first start time so multiple tasks are spread out, @@ -1143,16 +1156,31 @@ public class PeerCoordinator implements PeerListener } } } else if (id == ExtensionHandler.ID_HANDSHAKE) { - try { - if (peer.getHandshakeMap().get("m").getMap().get(ExtensionHandler.TYPE_PEX) != null) { - List<Peer> pList = peerList(); - pList.remove(peer); + sendPeers(peer); + } + } + + /** + * Send a PEX message to the peer, if he supports PEX. + * This just sends everybody we are connected to, we don't + * track new vs. old peers yet. + * @since 0.8.4 + */ + void sendPeers(Peer peer) { + Map<String, BEValue> handshake = peer.getHandshakeMap(); + if (handshake == null) + return; + BEValue bev = handshake.get("m"); + if (bev == null) + return; + try { + if (bev.getMap().get(ExtensionHandler.TYPE_PEX) != null) { + List<Peer> pList = peerList(); + pList.remove(peer); + if (!pList.isEmpty()) ExtensionHandler.sendPEX(peer, pList); - } - } catch (Exception e) { - // NPE, no map } - } + } catch (InvalidBEncodingException ibee) {} } /** @@ -1185,7 +1213,30 @@ public class PeerCoordinator implements PeerListener * @since 0.8.4 */ public void gotPeers(Peer peer, List<PeerID> peers) { - // spin off thread or timer task to do a new Peer() and an addPeer() for each one + if (completed() || !needPeers()) + return; + Destination myDest = _util.getMyDestination(); + if (myDest == null) + return; + byte[] myHash = myDest.calculateHash().getData(); + List<Peer> pList = peerList(); + for (PeerID id : peers) { + if (peerIDInList(id, pList) != null) + continue; + if (DataHelper.eq(myHash, id.getDestHash())) + continue; + pexPeers.add(id); + } + // TrackerClient will poll for pexPeers and do the add in its thread, + // rather than running another thread here. + } + + /** + * Called by TrackerClient + * @since 0.8.4 + */ + Set<PeerID> getPEXPeers() { + return pexPeers; } /** Return number of allowed uploaders for this torrent. diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index d03ca55db3d965b43fcce7995525e6482edfb1ba..c8f1cd6c96d28fdb99734176fc889fe2d1c8fb61 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -300,10 +300,8 @@ public class TrackerClient extends I2PAppThread Peer cur = it.next(); // FIXME if id == us || dest == us continue; // only delay if we actually make an attempt to add peer - if(coordinator.addPeer(cur)) { - int delay = DELAY_MUL; - delay *= r.nextInt(10); - delay += DELAY_MIN; + if(coordinator.addPeer(cur) && it.hasNext()) { + int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; sleptTime += delay; try { Thread.sleep(delay); } catch (InterruptedException ie) {} } @@ -341,6 +339,27 @@ public class TrackerClient extends I2PAppThread maxSeenPeers = tr.seenPeers; } // *** end of trackers loop here + // Get peers from PEX + if (left > 0 && coordinator.needPeers() && !stop) { + Set<PeerID> pids = coordinator.getPEXPeers(); + if (!pids.isEmpty()) { + _util.debug("Got " + pids.size() + " from PEX", Snark.INFO); + List<Peer> peers = new ArrayList(pids.size()); + for (PeerID pID : pids) { + peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo())); + } + Collections.shuffle(peers, r); + Iterator<Peer> it = peers.iterator(); + while ((!stop) && it.hasNext()) { + Peer cur = it.next(); + if (coordinator.addPeer(cur) && it.hasNext()) { + int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; + try { Thread.sleep(delay); } catch (InterruptedException ie) {} + } + } + } + } + // Get peers from DHT // FIXME this needs to be in its own thread if (_util.getDHT() != null && !stop) { @@ -369,10 +388,8 @@ public class TrackerClient extends I2PAppThread Iterator<Peer> it = peers.iterator(); while ((!stop) && it.hasNext()) { Peer cur = it.next(); - if (coordinator.addPeer(cur)) { - int delay = DELAY_MUL; - delay *= r.nextInt(10); - delay += DELAY_MIN; + if (coordinator.addPeer(cur) && it.hasNext()) { + int delay = (DELAY_MUL * r.nextInt(10)) + DELAY_MIN; try { Thread.sleep(delay); } catch (InterruptedException ie) {} } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index c859c436cf53aea54fa69ebc66b66a9bd08d62c0..9bbfec8fee5dfb299952bd9db6c677140e949d50 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -910,8 +910,8 @@ public class I2PSnarkServlet extends Default { out.write(formatSize(total-remaining) + thinsp(noThinsp) + formatSize(total)); else if (remaining == 0) out.write(formatSize(total)); // 3GB - else - out.write("??"); // no meta size yet + //else + // out.write("??"); // no meta size yet out.write("</td>\n\t"); out.write("<td align=\"right\" class=\"snarkTorrentUploaded " + rowClass + "\">"); if(isRunning && isValid) @@ -1058,6 +1058,12 @@ public class I2PSnarkServlet extends Default { out.write("\">"); out.write(formatSize(peer.getDownloadRate()) + "ps</a></span>"); } + } else if (!isValid) { + //if (peer supports metadata extension) { + out.write("<span class=\"unchoked\">"); + out.write(formatSize(peer.getDownloadRate()) + "ps</span>"); + //} else { + //} } out.write("</td>\n\t"); out.write("<td align=\"right\" class=\"snarkTorrentStatus " + rowClass + "\">"); @@ -1629,6 +1635,8 @@ public class I2PSnarkServlet extends Default { int slsh = announce.indexOf('/'); if (slsh > 0) announce = announce.substring(0, slsh); + if (announce.length() > 67) + announce = announce.substring(0, 40) + "…" + announce.substring(announce.length() - 8); buf.append(announce); } }