diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..41cfcdf8d275c0a653dd4b3c4865ab605809dbbc --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandler.java @@ -0,0 +1,224 @@ +package org.klomp.snark; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +import org.klomp.snark.bencode.BDecoder; +import org.klomp.snark.bencode.BEncoder; +import org.klomp.snark.bencode.BEValue; +import org.klomp.snark.bencode.InvalidBEncodingException; + +/** + * REF: BEP 10 Extension Protocol + * @since 0.8.2 + * @author zzz + */ +abstract class ExtensionHandler { + + private static final byte[] _handshake = buildHandshake(); + private static final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(ExtensionHandler.class); + + public static final int ID_METADATA = 3; + private static final String TYPE_METADATA = "ut_metadata"; + private static final int MAX_METADATA_SIZE = Storage.MAX_PIECES * 32 * 5 / 4; + private static final int PARALLEL_REQUESTS = 3; + + + /** + * @return bencoded outgoing handshake message + */ + public static byte[] getHandshake() { + return _handshake; + } + + /** outgoing handshake message */ + private static byte[] buildHandshake() { + Map<String, Object> handshake = new HashMap(); + Map<String, Integer> m = new HashMap(); + m.put(TYPE_METADATA, Integer.valueOf(ID_METADATA)); + handshake.put("m", m); + handshake.put("p", Integer.valueOf(6881)); + handshake.put("v", "I2PSnark"); + handshake.put("reqq", Integer.valueOf(5)); + return BEncoder.bencode(handshake); + } + + public static void handleMessage(Peer peer, int id, byte[] bs) { + if (id == 0) + handleHandshake(peer, bs); + else if (id == ID_METADATA) + handleMetadata(peer, bs); + else if (_log.shouldLog(Log.INFO)) + _log.info("Unknown extension msg " + id + " from " + peer); + } + + private static void handleHandshake(Peer peer, byte[] bs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got handshake msg from " + peer); + try { + // this throws NPE on missing keys + InputStream is = new ByteArrayInputStream(bs); + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map<String, BEValue> map = bev.getMap(); + Map<String, BEValue> msgmap = map.get("m").getMap(); + peer.setHandshakeMap(map); + + // not used, just to throw out of here + int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_METADATA).getInt(); + + int metaSize = map.get("metadata_size").getInt(); + MagnetState state = peer.getMagnetState(); + int remaining; + synchronized(state) { + if (state.isComplete()) + return; + + if (state.isInitialized()) { + if (state.getSize() != metaSize) { + peer.disconnect(); + return; + } + } else { + // initialize it + if (metaSize > MAX_METADATA_SIZE) { + peer.disconnect(false); + return; + } + if (_log.shouldLog(Log.INFO)) + _log.info("Initialized state, metadata size = " + metaSize + " from " + peer); + state.initialize(metaSize); + } + remaining = state.chunksRemaining(); + } + + // send requests for chunks + int count = Math.min(remaining, PARALLEL_REQUESTS); + for (int i = 0; i < count; i++) { + int chk; + synchronized(state) { + chk = state.getNextRequest(); + } + if (_log.shouldLog(Log.INFO)) + _log.info("Request chunk " + chk + " from " + peer); + sendRequest(peer, chk); + } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.info("Handshake exception from " + peer, e); + //peer.disconnect(false); + } + } + + private static final int TYPE_REQUEST = 0; + private static final int TYPE_DATA = 1; + private static final int TYPE_REJECT = 2; + + private static final int CHUNK_SIZE = 16*1024; + /** 25% extra for file names, benconding overhead, etc */ + + /** + * REF: BEP 9 + * @since 0.8.4 + */ + private static void handleMetadata(Peer peer, byte[] bs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got metadata msg from " + peer); + try { + InputStream is = new ByteArrayInputStream(bs); + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map<String, BEValue> map = bev.getMap(); + int type = map.get("msg_type").getInt(); + int piece = map.get("piece").getInt(); + + MagnetState state = peer.getMagnetState(); + if (type == TYPE_REQUEST) { + byte[] pc; + synchronized(state) { + pc = state.getChunk(piece); + } + sendPiece(peer, piece, pc); + } else if (type == TYPE_DATA) { + int size = map.get("total_size").getInt(); + boolean done; + int chk = -1; + synchronized(state) { + if (state.isComplete()) + return; + int len = is.available(); + done = state.saveChunk(piece, bs, bs.length - len, len); + if (_log.shouldLog(Log.INFO)) + _log.info("Got chunk " + piece + " from " + peer); + if (!done) + chk = state.getNextRequest(); + } + // out of the lock + if (done) { + // Done! + // PeerState will call the listener (peer coord), who will + // check to see if the MagnetState has it + if (_log.shouldLog(Log.WARN)) + _log.warn("Got last chunk from " + peer); + } else { + // get the next chunk + if (_log.shouldLog(Log.INFO)) + _log.info("Request chunk " + chk + " from " + peer); + sendRequest(peer, chk); + } + } else if (type == TYPE_REJECT) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Got reject msg from " + peer); + peer.disconnect(false); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Got unknown metadata msg from " + peer); + peer.disconnect(false); + } + } catch (Exception e) { + if (_log.shouldLog(Log.WARN)) + _log.info("Metadata ext. msg. exception from " + peer, e); + peer.disconnect(false); + } + } + + private static void sendRequest(Peer peer, int piece) { + Map<String, Object> map = new HashMap(); + map.put("msg_type", TYPE_REQUEST); + map.put("piece", Integer.valueOf(piece)); + byte[] payload = BEncoder.bencode(map); + try { + int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get(TYPE_METADATA).getInt(); + peer.sendExtension(hisMsgCode, payload); + } catch (Exception e) { + // NPE, no metadata capability + if (_log.shouldLog(Log.WARN)) + _log.info("Metadata send req msg exception to " + peer, e); + } + } + + private static void sendPiece(Peer peer, int piece, byte[] data) { + Map<String, Object> map = new HashMap(); + map.put("msg_type", TYPE_REQUEST); + map.put("piece", Integer.valueOf(piece)); + map.put("total_size", Integer.valueOf(data.length)); + byte[] dict = BEncoder.bencode(map); + byte[] payload = new byte[dict.length + data.length]; + System.arraycopy(dict, 0, payload, 0, dict.length); + System.arraycopy(data, 0, payload, dict.length, payload.length); + try { + int hisMsgCode = peer.getHandshakeMap().get("m").getMap().get("METADATA").getInt(); + peer.sendExtension(hisMsgCode, payload); + } catch (Exception e) { + // NPE, no metadata caps + if (_log.shouldLog(Log.WARN)) + _log.info("Metadata send piece msg exception to " + peer, e); + } + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandshake.java b/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandshake.java deleted file mode 100644 index fb69b044d284a7414f74b3e58a0b5773fd1c8419..0000000000000000000000000000000000000000 --- a/apps/i2psnark/java/src/org/klomp/snark/ExtensionHandshake.java +++ /dev/null @@ -1,36 +0,0 @@ -package org.klomp.snark; - -import java.util.HashMap; -import java.util.Map; - -import org.klomp.snark.bencode.BEncoder; -import org.klomp.snark.bencode.BEValue; - -/** - * REF: BEP 10 Extension Protocol - * @since 0.8.2 - */ -class ExtensionHandshake { - - private static final byte[] _payload = buildPayload(); - - /** - * @return bencoded data - */ - static byte[] getPayload() { - return _payload; - } - - /** just a test for now */ - private static byte[] buildPayload() { - Map<String, Object> handshake = new HashMap(); - Map<String, Integer> m = new HashMap(); - m.put("foo", Integer.valueOf(99)); - m.put("bar", Integer.valueOf(101)); - handshake.put("m", m); - handshake.put("p", Integer.valueOf(6881)); - handshake.put("v", "I2PSnark"); - handshake.put("reqq", Integer.valueOf(5)); - return BEncoder.bencode(handshake); - } -} diff --git a/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java b/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java new file mode 100644 index 0000000000000000000000000000000000000000..fb7e5a542da7cd8a4f861841083877bc6252d372 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/MagnetState.java @@ -0,0 +1,204 @@ +package org.klomp.snark; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; + +import org.klomp.snark.bencode.BDecoder; +import org.klomp.snark.bencode.BEValue; + +/** + * Simple state for the download of the metainfo, shared between + * Peer and ExtensionHandler. + * + * Nothing is synchronized here! + * Caller must synchronize on this for everything! + * + * Reference: BEP 9 + * + * @since 0.8.4 + * author zzz + */ +class MagnetState { + public static final int CHUNK_SIZE = 16*1024; + private static final Random random = I2PAppContext.getGlobalContext().random(); + + private final byte[] infohash; + private boolean complete; + /** if false, nothing below is valid */ + private boolean isInitialized; + + private int metaSize; + private int totalChunks; + /** bitfield for the metainfo chunks - will remain null if we start out complete */ + private BitField requested; + private BitField have; + /** bitfield for the metainfo */ + private byte[] metainfoBytes; + /** only valid when finished */ + private MetaInfo metainfo; + + /** + * @param meta null for new magnet + */ + public MagnetState(byte[] iHash, MetaInfo meta) { + infohash = iHash; + if (meta != null) { + metainfo = meta; + initialize(meta.getInfoBytes().length); + complete = true; + } else { + metainfoBytes = new byte[metaSize]; + } + } + + /** + * @param call this for a new magnet when you have the size + * @throws IllegalArgumentException + */ + public void initialize(int size) { + if (isInitialized) + throw new IllegalArgumentException("already set"); + isInitialized = true; + metaSize = size; + totalChunks = (size + (CHUNK_SIZE - 1)) / CHUNK_SIZE; + if (metainfo == null) { + // we don't need these if complete + have = new BitField(totalChunks); + requested = new BitField(totalChunks); + } + } + + /** + * @param Call this for a new magnet when the download is complete. + * @throws IllegalArgumentException + */ + public void setMetaInfo(MetaInfo meta) { + metainfo = meta; + } + + /** + * @throws IllegalArgumentException + */ + public MetaInfo getMetaInfo() { + if (!complete) + throw new IllegalArgumentException("not complete"); + return metainfo; + } + + /** + * @throws IllegalArgumentException + */ + public int getSize() { + if (!isInitialized) + throw new IllegalArgumentException("not initialized"); + return metaSize; + } + + public boolean isInitialized() { + return isInitialized; + } + + public boolean isComplete() { + return complete; + } + + public int chunkSize(int chunk) { + return Math.min(CHUNK_SIZE, metaSize - (chunk * CHUNK_SIZE)); + } + + /** @return chunk count */ + public int chunksRemaining() { + if (!isInitialized) + throw new IllegalArgumentException("not initialized"); + if (complete) + return 0; + return totalChunks - have.count(); + } + + /** @return chunk number */ + public int getNextRequest() { + if (!isInitialized) + throw new IllegalArgumentException("not initialized"); + if (complete) + throw new IllegalArgumentException("complete"); + int rand = random.nextInt(totalChunks); + for (int i = 0; i < totalChunks; i++) { + int chk = (i + rand) % totalChunks; + if (!(have.get(chk) || requested.get(chk))) { + requested.set(chk); + return chk; + } + } + // all requested - end game + for (int i = 0; i < totalChunks; i++) { + int chk = (i + rand) % totalChunks; + if (!have.get(chk)) + return chk; + } + throw new IllegalArgumentException("complete"); + } + + /** + * @throws IllegalArgumentException + */ + public byte[] getChunk(int chunk) { + if (!complete) + throw new IllegalArgumentException("not complete"); + if (chunk < 0 || chunk >= totalChunks) + throw new IllegalArgumentException("bad chunk number"); + int size = chunkSize(chunk); + byte[] rv = new byte[size]; + System.arraycopy(metainfoBytes, chunk * CHUNK_SIZE, rv, 0, size); + // use meta.getInfoBytes() so we don't save it in memory + return rv; + } + + /** + * @return true if this was the last piece + * @throws NPE, IllegalArgumentException, IOException, ... + */ + public boolean saveChunk(int chunk, byte[] data, int off, int length) throws Exception { + if (!isInitialized) + throw new IllegalArgumentException("not initialized"); + if (chunk < 0 || chunk >= totalChunks) + throw new IllegalArgumentException("bad chunk number"); + if (have.get(chunk)) + return false; // shouldn't happen if synced + int size = chunkSize(chunk); + if (size != length) + throw new IllegalArgumentException("bad chunk length"); + System.arraycopy(data, off, metainfoBytes, chunk * CHUNK_SIZE, size); + have.set(chunk); + boolean done = have.complete(); + if (done) { + metainfo = buildMetaInfo(); + complete = true; + } + return done; + } + + /** + * @return true if this was the last piece + * @throws NPE, IllegalArgumentException, IOException, ... + */ + public MetaInfo buildMetaInfo() throws Exception { + // top map has nothing in it but the info map (no announce) + Map<String, Object> map = new HashMap(); + InputStream is = new ByteArrayInputStream(metainfoBytes); + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map<String, BEValue> info = bev.getMap(); + map.put("info", info); + MetaInfo newmeta = new MetaInfo(map); + if (!DataHelper.eq(newmeta.getInfoHash(), infohash)) + throw new IOException("info hash mismatch"); + return newmeta; + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java index 140d3cb468fc024d0f961b928ba3b07b44a3529d..a2522cfc4ce44218aa6b9ee6f74a74224b3c51d8 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java +++ b/apps/i2psnark/java/src/org/klomp/snark/MetaInfo.java @@ -25,6 +25,7 @@ import java.io.InputStream; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -59,10 +60,11 @@ public class MetaInfo private final int piece_length; private final byte[] piece_hashes; private final long length; - private final Map infoMap; - - private byte[] torrentdata; + private Map infoMap; + /** + * Called by Storage when creating a new torrent from local data + */ MetaInfo(String announce, String name, String name_utf8, List files, List lengths, int piece_length, byte[] piece_hashes, long length) { @@ -77,7 +79,7 @@ public class MetaInfo this.length = length; this.info_hash = calculateInfoHash(); - infoMap = null; + //infoMap = null; } /** @@ -104,7 +106,7 @@ public class MetaInfo * Creates a new MetaInfo from a Map of BEValues and the SHA1 over * the original bencoded info dictonary (this is a hack, we could * reconstruct the bencoded stream and recalculate the hash). Will - * throw a InvalidBEncodingException if the given map does not + * NOT throw a InvalidBEncodingException if the given map does not * contain a valid announce string or info dictonary. */ public MetaInfo(Map m) throws InvalidBEncodingException @@ -112,9 +114,13 @@ public class MetaInfo if (_log.shouldLog(Log.DEBUG)) _log.debug("Creating a metaInfo: " + m, new Exception("source")); BEValue val = (BEValue)m.get("announce"); - if (val == null) - throw new InvalidBEncodingException("Missing announce string"); - this.announce = val.getString(); + // Disabled check, we can get info from a magnet now + if (val == null) { + //throw new InvalidBEncodingException("Missing announce string"); + this.announce = null; + } else { + this.announce = val.getString(); + } val = (BEValue)m.get("info"); if (val == null) @@ -215,6 +221,7 @@ public class MetaInfo /** * Returns the string representing the URL of the tracker for this torrent. + * @return may be null! */ public String getAnnounce() { @@ -388,26 +395,34 @@ public class MetaInfo piece_hashes, length); } - public byte[] getTorrentData() + /** + * Called by servlet to save a new torrent file generated from local data + */ + public synchronized byte[] getTorrentData() { - if (torrentdata == null) - { Map m = new HashMap(); m.put("announce", announce); Map info = createInfoMap(); m.put("info", info); - torrentdata = BEncoder.bencode(m); - } - return torrentdata; + // don't save this locally, we should only do this once + return BEncoder.bencode(m); + } + + /** @since 0.8.4 */ + public synchronized byte[] getInfoBytes() { + if (infoMap == null) + createInfoMap(); + return BEncoder.bencode(infoMap); } - private Map createInfoMap() + /** @return an unmodifiable view of the Map */ + private Map<String, BEValue> createInfoMap() { + // if we loaded this metainfo from a file, we have the map + if (infoMap != null) + return Collections.unmodifiableMap(infoMap); + // otherwise we must create it Map info = new HashMap(); - if (infoMap != null) { - info.putAll(infoMap); - return info; - } info.put("name", name); if (name_utf8 != null) info.put("name.utf-8", name_utf8); @@ -429,7 +444,8 @@ public class MetaInfo } info.put("files", l); } - return info; + infoMap = info; + return Collections.unmodifiableMap(infoMap); } private byte[] calculateInfoHash() diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index f5d200fdadf269985553cec124a6803666fdb9d7..681a1a5806546ded305f9a0ab61f2fe64db73a72 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -28,11 +28,14 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import java.util.Map; import net.i2p.client.streaming.I2PSocket; import net.i2p.data.DataHelper; import net.i2p.util.Log; +import org.klomp.snark.bencode.BEValue; + public class Peer implements Comparable { private Log _log = new Log(Peer.class); @@ -41,7 +44,9 @@ public class Peer implements Comparable private final byte[] my_id; private final byte[] infohash; - final MetaInfo metainfo; + /** will start out null in magnet mode */ + private MetaInfo metainfo; + private Map<String, BEValue> handshakeMap; // The data in/output streams set during the handshake and used by // the actual connections. @@ -52,6 +57,9 @@ public class Peer implements Comparable // was successful, the connection setup and runs PeerState state; + /** shared across all peers on this torrent */ + MagnetState magnetState; + private I2PSocket sock; private boolean deregister = true; @@ -197,7 +205,7 @@ public class Peer implements Comparable * If the given BitField is non-null it is send to the peer as first * message. */ - public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField bitfield) + public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField bitfield, MagnetState mState) { if (state != null) throw new IllegalStateException("Peer already started"); @@ -255,7 +263,7 @@ public class Peer implements Comparable if ((options & OPTION_EXTENSION) != 0) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Peer supports extensions, sending test message"); - out.sendExtension(0, ExtensionHandshake.getPayload()); + out.sendExtension(0, ExtensionHandler.getHandshake()); } if ((options & OPTION_DHT) != 0 && util.getDHT() != null) { @@ -271,6 +279,7 @@ public class Peer implements Comparable // We are up and running! state = s; + magnetState = mState; listener.connected(this); if (_log.shouldLog(Log.DEBUG)) @@ -371,6 +380,42 @@ public class Peer implements Comparable return options; } + /** + * Shared state across all peers, callers must sync on returned object + * @return non-null + * @since 0.8.4 + */ + public MagnetState getMagnetState() { + return magnetState; + } + + /** @return could be null @since 0.8.4 */ + public Map<String, BEValue> getHandshakeMap() { + return handshakeMap; + } + + /** @since 0.8.4 */ + public void setHandshakeMap(Map<String, BEValue> map) { + handshakeMap = map; + } + + /** @since 0.8.4 */ + public void sendExtension(int type, byte[] payload) { + PeerState s = state; + if (s != null) + s.out.sendExtension(type, payload); + } + + /** + * Switch from magnet mode to normal mode + * @since 0.8.4 + */ + public void gotMetaInfo(MetaInfo meta) { + PeerState s = state; + if (s != null) + s.gotMetaInfo(meta); + } + public boolean isConnected() { return state != null; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index 90755a9371a64695e1f71c4521efb5f166848859..a27fc1c9dd17a91cfcc6698349b7e0186c9af3be 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -90,7 +90,7 @@ class PeerConnectionIn implements Runnable { ps.keepAliveMessage(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received keepalive from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received keepalive from " + peer); continue; } @@ -102,35 +102,35 @@ class PeerConnectionIn implements Runnable case 0: ps.chokeMessage(true); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received choke from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received choke from " + peer); break; case 1: ps.chokeMessage(false); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received unchoke from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received unchoke from " + peer); break; case 2: ps.interestedMessage(true); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received interested from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received interested from " + peer); break; case 3: ps.interestedMessage(false); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received not interested from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received not interested from " + peer); break; case 4: piece = din.readInt(); ps.haveMessage(piece); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received havePiece(" + piece + ") from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received havePiece(" + piece + ") from " + peer); break; case 5: byte[] bitmap = new byte[i-1]; din.readFully(bitmap); ps.bitfieldMessage(bitmap); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received bitmap from " + peer + " on " + peer.metainfo.getName() + ": size=" + (i-1) /* + ": " + ps.bitfield */ ); + _log.debug("Received bitmap from " + peer + ": size=" + (i-1) /* + ": " + ps.bitfield */ ); break; case 6: piece = din.readInt(); @@ -138,7 +138,7 @@ class PeerConnectionIn implements Runnable len = din.readInt(); ps.requestMessage(piece, begin, len); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received request(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received request(" + piece + "," + begin + ") from " + peer); break; case 7: piece = din.readInt(); @@ -152,7 +152,7 @@ class PeerConnectionIn implements Runnable din.readFully(piece_bytes, begin, len); ps.pieceMessage(req); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received data(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received data(" + piece + "," + begin + ") from " + peer); } else { @@ -160,7 +160,7 @@ class PeerConnectionIn implements Runnable piece_bytes = new byte[len]; din.readFully(piece_bytes); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer); } break; case 8: @@ -169,27 +169,27 @@ class PeerConnectionIn implements Runnable len = din.readInt(); ps.cancelMessage(piece, begin, len); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received cancel(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received cancel(" + piece + "," + begin + ") from " + peer); break; case 9: // PORT message int port = din.readUnsignedShort(); ps.portMessage(port); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received port message from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received port message from " + peer); case 20: // Extension message int id = din.readUnsignedByte(); byte[] payload = new byte[i-2]; din.readFully(payload); ps.extensionMessage(id, payload); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received extension message from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received extension message from " + peer); break; default: byte[] bs = new byte[i-1]; din.readFully(bs); ps.unknownMessage(b, bs); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received unknown message from " + peer + " on " + peer.metainfo.getName()); + _log.debug("Received unknown message from " + peer); } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 19614bc52e20a850aacc2590749a0bc8ec580dc7..181ca496955fec63e6a35a4b4ae5112563a8d930 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -151,7 +151,7 @@ class PeerConnectionOut implements Runnable if (m != null) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Send " + peer + ": " + m + " on " + peer.metainfo.getName()); + _log.debug("Send " + peer + ": " + m); // This can block for quite a while. // To help get slow peers going, and track the bandwidth better, diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index f7a3a42fc83a84b58c1c2179e4e454ae0bedcf99..6eb730fc9dd2b4fcbb6360661aafb5d3b4a98549 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -105,6 +105,7 @@ public class PeerCoordinator implements PeerListener private boolean halted = false; + private final MagnetState magnetState; private final CoordinatorListener listener; private final I2PSnarkUtil _util; private static final Random _random = I2PAppContext.getGlobalContext().random(); @@ -128,6 +129,7 @@ public class PeerCoordinator implements PeerListener setWantedPieces(); partialPieces = new ArrayList(getMaxConnections() + 1); peers = new LinkedBlockingQueue(); + magnetState = new MagnetState(infohash, metainfo); // Install a timer to check the uploaders. // Randomize the first start time so multiple tasks are spread out, @@ -484,7 +486,7 @@ public class PeerCoordinator implements PeerListener { public void run() { - peer.runConnection(_util, listener, bitfield); + peer.runConnection(_util, listener, bitfield, magnetState); } }; String threadName = "Snark peer " + peer.toString(); @@ -1149,10 +1151,30 @@ public class PeerCoordinator implements PeerListener } /** @since 0.8.4 */ - public void gotExtension(Peer peer, int id, byte[] bs) {} + public void gotExtension(Peer peer, int id, byte[] bs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got extension message " + id + " from " + peer); + // basic handling done in PeerState... here we just check if we are done + if (metainfo == null && id == ExtensionHandler.ID_METADATA) { + synchronized (magnetState) { + if (magnetState.isComplete()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Got completed metainfo via extension"); + MetaInfo newinfo = magnetState.getMetaInfo(); + // more validation + // set global + // instantiate storage + // tell Snark listener + // tell all peers + } + } + } + } /** @since 0.8.4 */ - public void gotPort(Peer peer, int port) {} + public void gotPort(Peer peer, int port) { + // send to DHT + } /** Return number of allowed uploaders for this torrent. ** Check with Snark to see if we are over the total upload limit. diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 3ec3d219f60c51d3617af29144fea1798abeafed..a85207476b9a58def420f982de2cf693d9337dca 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -497,9 +497,21 @@ class PeerState implements DataLoader /** @since 0.8.2 */ void extensionMessage(int id, byte[] bs) { + ExtensionHandler.handleMessage(peer, id, bs); + // Peer coord will get metadata from MagnetState, + // verify, and then call gotMetaInfo() listener.gotExtension(peer, id, bs); } + /** + * Switch from magnet mode to normal mode + * @since 0.8.4 + */ + public void gotMetaInfo(MetaInfo meta) { + // set metainfo + // fix bitfield + } + /** @since 0.8.4 */ void portMessage(int port) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 24e8ebc7f6ffc6360c1a047136b4242353194136..dca9c8324293a964b8ac289a767bde4f5e72b157 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -594,9 +594,15 @@ public class SnarkManager implements Snark.CompleteListener { _peerCoordinatorSet, _connectionAcceptor, false, getDataDir().getPath()); - // Tell the dir monitor not to delete us - _magnets.add(name); synchronized (_snarks) { + for (Snark snark : _snarks.values()) { + if (DataHelper.eq(ih, snark.getInfoHash())) { + addMessage(_("Torrent already running: {0}", snark.getBaseName())); + return; + } + } + // Tell the dir monitor not to delete us + _magnets.add(name); _snarks.put(name, torrent); } if (shouldAutoStart()) {