From 6c19e7e399a40b05b6b9d6a70f83fdd27dc62022 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 21 Nov 2010 21:19:12 +0000 Subject: [PATCH] * i2psnark: - Defer piece loading until required - Stub out Extension message support --- .../java/src/org/klomp/snark/DataLoader.java | 14 ++++++ .../java/src/org/klomp/snark/Message.java | 27 +++++++++-- .../java/src/org/klomp/snark/Peer.java | 23 +++++++-- .../src/org/klomp/snark/PeerConnectionIn.java | 7 +++ .../org/klomp/snark/PeerConnectionOut.java | 39 +++++++++++++++ .../java/src/org/klomp/snark/PeerState.java | 47 +++++++++++++++++-- 6 files changed, 145 insertions(+), 12 deletions(-) create mode 100644 apps/i2psnark/java/src/org/klomp/snark/DataLoader.java diff --git a/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java b/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java new file mode 100644 index 0000000000..0bd974f538 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/DataLoader.java @@ -0,0 +1,14 @@ +package org.klomp.snark; + +/** + * Callback used to fetch data + * @since 0.8.2 + */ +interface DataLoader +{ + /** + * This is the callback that PeerConnectionOut calls to get the data from disk + * @return bytes or null for errors + */ + public byte[] loadData(int piece, int begin, int length); +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/Message.java b/apps/i2psnark/java/src/org/klomp/snark/Message.java index a91e5ca234..cdde79a181 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Message.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Message.java @@ -39,23 +39,28 @@ class Message final static byte REQUEST = 6; final static byte PIECE = 7; final static byte CANCEL = 8; + final static byte EXTENSION = 20; // Not all fields are used for every message. // KEEP_ALIVE doesn't have a real wire representation byte type; // Used for HAVE, REQUEST, PIECE and CANCEL messages. + // low byte used for EXTENSION message int piece; // Used for REQUEST, PIECE and CANCEL messages. int begin; int length; - // Used for PIECE and BITFIELD messages + // Used for PIECE and BITFIELD and EXTENSION messages byte[] data; int off; int len; + // Used to do deferred fetch of data + DataLoader dataLoader; + SimpleTimer.TimedEvent expireEvent; /** Utility method for sending a message through a DataStream. */ @@ -68,6 +73,13 @@ class Message return; } + // Get deferred data + if (data == null && dataLoader != null) { + data = dataLoader.loadData(piece, begin, length); + if (data == null) + return; // hmm will get retried, but shouldn't happen + } + // Calculate the total length in bytes // Type is one byte. @@ -85,8 +97,12 @@ class Message if (type == REQUEST || type == CANCEL) datalen += 4; + // length is 1 byte + if (type == EXTENSION) + datalen += 1; + // add length of data for piece or bitfield array. - if (type == BITFIELD || type == PIECE) + if (type == BITFIELD || type == PIECE || type == EXTENSION) datalen += len; // Send length @@ -105,8 +121,11 @@ class Message if (type == REQUEST || type == CANCEL) dos.writeInt(length); + if (type == EXTENSION) + dos.writeByte((byte) piece & 0xff); + // Send actual data - if (type == BITFIELD || type == PIECE) + if (type == BITFIELD || type == PIECE || type == EXTENSION) dos.write(data, off, len); } @@ -135,6 +154,8 @@ class Message return "PIECE(" + piece + "," + begin + "," + length + ")"; case CANCEL: return "CANCEL(" + piece + "," + begin + "," + length + ")"; + case EXTENSION: + return "EXTENSION(" + piece + ',' + data.length + ')'; default: return "<UNKNOWN>"; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index 55b6dcb2ca..e747332d83 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -59,6 +59,11 @@ public class Peer implements Comparable private long uploaded_old[] = {-1,-1,-1}; private long downloaded_old[] = {-1,-1,-1}; + // bytes per bt spec: 0011223344556677 + static final long OPTION_EXTENSION = 0x0000000000100000l; + static final long OPTION_FAST = 0x0000000000000004l; + private long options; + /** * Creates a disconnected peer given a PeerID, your own id and the * relevant MetaInfo. @@ -285,9 +290,8 @@ public class Peer implements Comparable // Handshake write - header dout.write(19); dout.write("BitTorrent protocol".getBytes("UTF-8")); - // Handshake write - zeros - byte[] zeros = new byte[8]; - dout.write(zeros); + // Handshake write - options + dout.writeLong(OPTION_EXTENSION); // Handshake write - metainfo hash byte[] shared_hash = metainfo.getInfoHash(); dout.write(shared_hash); @@ -312,8 +316,8 @@ public class Peer implements Comparable + "'Bittorrent protocol', got '" + bittorrentProtocol + "'"); - // Handshake read - zeros - din.readFully(zeros); + // Handshake read - options + options = din.readLong(); // Handshake read - metainfo hash bs = new byte[20]; @@ -325,6 +329,15 @@ public class Peer implements Comparable din.readFully(bs); if (_log.shouldLog(Log.DEBUG)) _log.debug("Read the remote side's hash and peerID fully from " + toString()); + +// if ((options & OPTION_EXTENSION) != 0) { + if (options != 0) { + // send them something + if (_log.shouldLog(Log.DEBUG)) + //_log.debug("Peer supports extension message, what should we say? " + toString()); + _log.debug("Peer supports options 0x" + Long.toString(options, 16) + ", what should we say? " + toString()); + } + return bs; } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index 9acfb60e3c..d0abe06efd 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -171,6 +171,13 @@ class PeerConnectionIn implements Runnable if (_log.shouldLog(Log.DEBUG)) _log.debug("Received cancel(" + piece + "," + begin + ") from " + peer + " on " + peer.metainfo.getName()); break; + case 20: // Extension message + int id = din.readUnsignedByte(); + byte[] payload = new byte[i-2]; + din.readFully(payload); + ps.extensionMessage(id, payload); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received extension message from " + peer + " on " + peer.metainfo.getName()); default: byte[] bs = new byte[i-1]; din.readFully(bs); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 81525a1ea8..8ab0d55594 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -430,6 +430,33 @@ class PeerConnectionOut implements Runnable return total; } + /** @since 0.8.2 */ + void sendPiece(int piece, int begin, int length, DataLoader loader) + { + boolean sendNow = false; + // are there any cases where we should? + + if (sendNow) { + // queue the real thing + byte[] bytes = loader.loadData(piece, begin, length); + if (bytes != null) + sendPiece(piece, begin, length, bytes); + return; + } + + // queue a fake message... set everything up, + // except save the PeerState instead of the bytes. + Message m = new Message(); + m.type = Message.PIECE; + m.piece = piece; + m.begin = begin; + m.length = length; + m.dataLoader = loader; + m.off = 0; + m.len = length; + addMessage(m); + } + void sendPiece(int piece, int begin, int length, byte[] bytes) { Message m = new Message(); @@ -488,4 +515,16 @@ class PeerConnectionOut implements Runnable } } } + + /** @since 0.8.2 */ + void sendExtension(int id, byte[] bytes) { + Message m = new Message(); + m.type = Message.EXTENSION; + m.piece = id; + m.data = bytes; + m.begin = 0; + m.length = bytes.length; + addMessage(m); + + } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 4d34736a89..8ba5c89696 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -20,14 +20,20 @@ package org.klomp.snark; +import java.io.ByteArrayInputStream; +import java.io.InputStream; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import net.i2p.I2PAppContext; import net.i2p.util.Log; -class PeerState +import org.klomp.snark.bencode.BDecoder; +import org.klomp.snark.bencode.BEValue; + +class PeerState implements DataLoader { private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class); final Peer peer; @@ -201,13 +207,28 @@ class PeerState return; } + if (_log.shouldLog(Log.INFO)) + _log.info("Queueing (" + piece + ", " + begin + ", " + + length + ")" + " to " + peer); + + // don't load the data into mem now, let PeerConnectionOut do it + out.sendPiece(piece, begin, length, this); + } + + /** + * This is the callback that PeerConnectionOut calls + * + * @return bytes or null for errors + * @since 0.8.2 + */ + public byte[] loadData(int piece, int begin, int length) { byte[] pieceBytes = listener.gotRequest(peer, piece, begin, length); if (pieceBytes == null) { // XXX - Protocol error-> diconnect? if (_log.shouldLog(Log.WARN)) _log.warn("Got request for unknown piece: " + piece); - return; + return null; } // More sanity checks @@ -219,13 +240,13 @@ class PeerState + ", " + begin + ", " + length + "' message from " + peer); - return; + return null; } if (_log.shouldLog(Log.INFO)) _log.info("Sending (" + piece + ", " + begin + ", " + length + ")" + " to " + peer); - out.sendPiece(piece, begin, length, pieceBytes); + return pieceBytes; } /** @@ -413,6 +434,24 @@ class PeerState out.cancelRequest(piece, begin, length); } + /** @since 0.8.2 */ + void extensionMessage(int id, byte[] bs) + { + if (id == 0) { + InputStream is = new ByteArrayInputStream(bs); + try { + BDecoder dec = new BDecoder(is); + BEValue bev = dec.bdecodeMap(); + Map map = bev.getMap(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got extension handshake message " + bev.toString()); + } catch (Exception e) {} + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Got extended message type: " + id + " length: " + bs.length); + } + } + void unknownMessage(int type, byte[] bs) { if (_log.shouldLog(Log.WARN)) -- GitLab