diff --git a/apps/i2psnark/java/src/org/klomp/snark/BitField.java b/apps/i2psnark/java/src/org/klomp/snark/BitField.java index d18ef13435d8b129949742589511921db1a24156..cb147fb2683f2599fee23071f091f3d21da53f10 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/BitField.java +++ b/apps/i2psnark/java/src/org/klomp/snark/BitField.java @@ -20,6 +20,8 @@ package org.klomp.snark; +import java.util.Arrays; + /** * Container of a byte array representing set and unset bits. @@ -66,7 +68,7 @@ public class BitField /** * This returns the actual byte array used. Changes to this array - * effect this BitField. Note that some bits at the end of the byte + * affect this BitField. Note that some bits at the end of the byte * array are supposed to be always unset if they represent bits * bigger then the size of the bitfield. */ @@ -105,6 +107,16 @@ public class BitField } } + /** + * Sets all bits to true. + * + * @since 0.9.21 + */ + public void setAll() { + Arrays.fill(bitfield, (byte) 0xff); + count = size; + } + /** * Return true if the bit is set or false if it is not. * diff --git a/apps/i2psnark/java/src/org/klomp/snark/Message.java b/apps/i2psnark/java/src/org/klomp/snark/Message.java index 9f2197d2b50af29116cb17f61ea37883472340e1..ee07ae9ef33e5f7af2e3c7a8622fcc3a4cb5523f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Message.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Message.java @@ -55,11 +55,13 @@ class Message byte type; // Used for HAVE, REQUEST, PIECE and CANCEL messages. + // Also SUGGEST, REJECT, ALLOWED_FAST // low byte used for EXTENSION message // low two bytes used for PORT message int piece; // Used for REQUEST, PIECE and CANCEL messages. + // Also REJECT int begin; int length; @@ -104,15 +106,18 @@ class Message int datalen = 1; // piece is 4 bytes. - if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL) + if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL || + type == SUGGEST || type == REJECT || type == ALLOWED_FAST) datalen += 4; // begin/offset is 4 bytes - if (type == REQUEST || type == PIECE || type == CANCEL) + if (type == REQUEST || type == PIECE || type == CANCEL || + type == REJECT) datalen += 4; // length is 4 bytes - if (type == REQUEST || type == CANCEL) + if (type == REQUEST || type == CANCEL || + type == REJECT) datalen += 4; // msg type is 1 byte @@ -131,15 +136,18 @@ class Message dos.writeByte(type & 0xFF); // Send additional info (piece number) - if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL) + if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL || + type == SUGGEST || type == REJECT || type == ALLOWED_FAST) dos.writeInt(piece); // Send additional info (begin/offset) - if (type == REQUEST || type == PIECE || type == CANCEL) + if (type == REQUEST || type == PIECE || type == CANCEL || + type == REJECT) dos.writeInt(begin); // Send additional info (length); for PIECE this is implicit. - if (type == REQUEST || type == CANCEL) + if (type == REQUEST || type == CANCEL || + type == REJECT) dos.writeInt(length); if (type == EXTENSION) @@ -173,21 +181,32 @@ class Message case UNINTERESTED: return "UNINTERESTED"; case HAVE: - return "HAVE(" + piece + ")"; + return "HAVE(" + piece + ')'; case BITFIELD: return "BITFIELD"; case REQUEST: - return "REQUEST(" + piece + "," + begin + "," + length + ")"; + return "REQUEST(" + piece + ',' + begin + ',' + length + ')'; case PIECE: - return "PIECE(" + piece + "," + begin + "," + length + ")"; + return "PIECE(" + piece + ',' + begin + ',' + length + ')'; case CANCEL: - return "CANCEL(" + piece + "," + begin + "," + length + ")"; + return "CANCEL(" + piece + ',' + begin + ',' + length + ')'; case PORT: - return "PORT(" + piece + ")"; + return "PORT(" + piece + ')'; case EXTENSION: return "EXTENSION(" + piece + ',' + data.length + ')'; + // fast extensions below here + case SUGGEST: + return "SUGGEST(" + piece + ')'; + case HAVE_ALL: + return "HAVE_ALL"; + case HAVE_NONE: + return "HAVE_NONE"; + case REJECT: + return "REJECT(" + piece + ',' + begin + ',' + length + ')'; + case ALLOWED_FAST: + return "ALLOWED_FAST(" + piece + ')'; default: - return "<UNKNOWN>"; + return "UNKNOWN (" + type + ')'; } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index a7c4f4127a714e92942bd77d3fa8d6a99e7b2107..9a4e3524db0676e89a6a5a5511eb33060fefaaf9 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -79,15 +79,15 @@ public class Peer implements Comparable<Peer> private long uploaded_old[] = {-1,-1,-1}; private long downloaded_old[] = {-1,-1,-1}; - // bytes per bt spec: 0011223344556677 - static final long OPTION_EXTENSION = 0x0000000000100000l; - static final long OPTION_FAST = 0x0000000000000004l; - static final long OPTION_DHT = 0x0000000000000001l; + // bytes per bt spec: 0011223344556677 + private static final long OPTION_EXTENSION = 0x0000000000100000l; + private static final long OPTION_FAST = 0x0000000000000004l; + private static final long OPTION_DHT = 0x0000000000000001l; /** we use a different bit since the compact format is different */ /* no, let's use an extension message static final long OPTION_I2P_DHT = 0x0000000040000000l; */ - static final long OPTION_AZMP = 0x1000000000000000l; + private static final long OPTION_AZMP = 0x1000000000000000l; private long options; /** @@ -338,6 +338,9 @@ public class Peer implements Comparable<Peer> dout.write("BitTorrent protocol".getBytes("UTF-8")); // Handshake write - options long myOptions = OPTION_EXTENSION; + // we can't handle HAVE_ALL or HAVE_NONE if we don't know the number of pieces + if (metainfo != null) + myOptions |= OPTION_FAST; // FIXME get util here somehow //if (util.getDHT() != null) // myOptions |= OPTION_I2P_DHT; @@ -391,9 +394,9 @@ public class Peer implements Comparable<Peer> return bs; } - /** @since 0.8.4 */ - public long getOptions() { - return options; + /** @since 0.9.21 */ + public boolean supportsFast() { + return (options & OPTION_FAST) != 0; } /** @since 0.8.4 */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index 32ed0473cee2dfd3fd81d4b6a9657107f6ba03b8..962034ab842de7a3470a065030c4b88f199f8d1a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -102,40 +102,46 @@ class PeerConnectionIn implements Runnable m.type = b; switch (b) { - case 0: + case Message.CHOKE: ps.chokeMessage(true); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received choke from " + peer); break; - case 1: + + case Message.UNCHOKE: ps.chokeMessage(false); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received unchoke from " + peer); break; - case 2: + + case Message.INTERESTED: ps.interestedMessage(true); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received interested from " + peer); break; - case 3: + + case Message.UNINTERESTED: ps.interestedMessage(false); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received not interested from " + peer); break; - case 4: + + case Message.HAVE: piece = din.readInt(); ps.haveMessage(piece); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received havePiece(" + piece + ") from " + peer); break; - case 5: + + case Message.BITFIELD: byte[] bitmap = new byte[i-1]; din.readFully(bitmap); ps.bitfieldMessage(bitmap); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received bitmap from " + peer + ": size=" + (i-1) /* + ": " + ps.bitfield */ ); break; - case 6: + + case Message.REQUEST: piece = din.readInt(); begin = din.readInt(); len = din.readInt(); @@ -143,7 +149,8 @@ class PeerConnectionIn implements Runnable if (_log.shouldLog(Log.DEBUG)) _log.debug("Received request(" + piece + "," + begin + ") from " + peer); break; - case 7: + + case Message.PIECE: piece = din.readInt(); begin = din.readInt(); len = i-9; @@ -165,7 +172,8 @@ class PeerConnectionIn implements Runnable _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer); } break; - case 8: + + case Message.CANCEL: piece = din.readInt(); begin = din.readInt(); len = din.readInt(); @@ -173,13 +181,15 @@ class PeerConnectionIn implements Runnable if (_log.shouldLog(Log.DEBUG)) _log.debug("Received cancel(" + piece + "," + begin + ") from " + peer); break; - case 9: // PORT message + + case Message.PORT: int port = din.readUnsignedShort(); ps.portMessage(port); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received port message from " + peer); break; - case 20: // Extension message + + case Message.EXTENSION: int id = din.readUnsignedByte(); byte[] payload = new byte[i-2]; din.readFully(payload); @@ -187,6 +197,43 @@ class PeerConnectionIn implements Runnable _log.debug("Received extension message from " + peer); ps.extensionMessage(id, payload); break; + + // fast extensions below here + case Message.SUGGEST: + piece = din.readInt(); + ps.suggestMessage(piece); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received suggest(" + piece + ") from " + peer); + break; + + case Message.HAVE_ALL: + ps.haveMessage(true); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received have_all from " + peer); + break; + + case Message.HAVE_NONE: + ps.haveMessage(false); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received have_none from " + peer); + break; + + case Message.REJECT: + piece = din.readInt(); + begin = din.readInt(); + len = din.readInt(); + ps.rejectMessage(piece, begin, len); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received reject(" + piece + ',' + begin + ',' + len + ") from " + peer); + break; + + case Message.ALLOWED_FAST: + piece = din.readInt(); + ps.allowedFastMessage(piece); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received allowed_fast(" + piece + ") from " + peer); + break; + default: byte[] bs = new byte[i-1]; din.readFully(bs); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index b878c3b791d3caaec94b37059c50c28e964c5793..6b11200b6a1ac747ffd6bb9afe40ff9d01405a5e 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -277,11 +277,20 @@ class PeerConnectionOut implements Runnable while (it.hasNext()) { Message m = it.next(); - if (m.type == type) - { + if (m.type == type) { it.remove(); removed = true; - } + if (type == Message.PIECE && peer.supportsFast()) { + Message r = new Message(); + r.type = Message.REJECT; + r.piece = m.piece; + r.begin = m.begin; + r.length = m.length; + try { + r.sendMessage(dout); + } catch (IOException ioe) {} + } + } } sendQueue.notifyAll(); } @@ -349,12 +358,19 @@ class PeerConnectionOut implements Runnable void sendBitfield(BitField bitfield) { - Message m = new Message(); - m.type = Message.BITFIELD; - m.data = bitfield.getFieldBytes(); - m.off = 0; - m.len = m.data.length; - addMessage(m); + boolean fast = peer.supportsFast(); + if (fast && bitfield.complete()) { + sendHaveAll(); + } else if (fast && bitfield.count() <= 0) { + sendHaveNone(); + } else { + Message m = new Message(); + m.type = Message.BITFIELD; + m.data = bitfield.getFieldBytes(); + m.off = 0; + m.len = m.data.length; + addMessage(m); + } } /** reransmit requests not received in 7m */ @@ -559,4 +575,50 @@ class PeerConnectionOut implements Runnable m.piece = port; addMessage(m); } + + /** + * Unused + * @since 0.9.21 + */ + void sendSuggest(int piece) { + Message m = new Message(); + m.type = Message.SUGGEST; + m.piece = piece; + addMessage(m); + } + + /** @since 0.9.21 */ + private void sendHaveAll() { + Message m = new Message(); + m.type = Message.HAVE_ALL; + addMessage(m); + } + + /** @since 0.9.21 */ + private void sendHaveNone() { + Message m = new Message(); + m.type = Message.HAVE_NONE; + addMessage(m); + } + + /** @since 0.9.21 */ + void sendReject(int piece, int begin, int length) { + Message m = new Message(); + m.type = Message.REJECT; + m.piece = piece; + m.begin = begin; + m.length = length; + addMessage(m); + } + + /** + * Unused + * @since 0.9.21 + */ + void sendAllowedFast(int piece) { + Message m = new Message(); + m.type = Message.ALLOWED_FAST; + m.piece = piece; + addMessage(m); + } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index cf295a530df1930dd77aee6ab5c0b4501ae51e49..675b42bf116bdb9178d6c574eb32d468520a8f12 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -155,10 +155,17 @@ class PeerState implements DataLoader setInteresting(true); } - void bitfieldMessage(byte[] bitmap) - { - synchronized(this) - { + void bitfieldMessage(byte[] bitmap) { + bitfieldMessage(bitmap, false); + } + + /** + * @param bitmap null to use the isAll param + * @param isAll only if bitmap == null: true for have_all, false for have_none + * @since 0.9.21 + */ + private void bitfieldMessage(byte[] bitmap, boolean isAll) { + synchronized(this) { if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " rcv bitfield"); if (bitfield != null) @@ -172,10 +179,24 @@ class PeerState implements DataLoader // XXX - Check for weird bitfield and disconnect? // FIXME will have to regenerate the bitfield after we know exactly // how many pieces there are, as we don't know how many spare bits there are. - if (metainfo == null) - bitfield = new BitField(bitmap, bitmap.length * 8); - else - bitfield = new BitField(bitmap, metainfo.getPieces()); + if (metainfo == null) { + if (bitmap != null) { + bitfield = new BitField(bitmap, bitmap.length * 8); + } else { + // we can't handle this situation + if (_log.shouldLog(Log.WARN)) + _log.warn("have_x w/o metainfo: " + isAll); + return; + } + } else { + if (bitmap != null) { + bitfield = new BitField(bitmap, metainfo.getPieces()); + } else { + bitfield = new BitField(metainfo.getPieces()); + if (isAll) + bitfield.setAll(); + } + } } if (metainfo == null) return; @@ -198,12 +219,17 @@ class PeerState implements DataLoader + piece + ", " + begin + ", " + length + ") "); if (metainfo == null) return; - if (choking) - { - if (_log.shouldLog(Log.INFO)) - _log.info("Request received, but choking " + peer); + if (choking) { + if (peer.supportsFast()) { + if (_log.shouldInfo()) + _log.info("Request received, sending reject to choked " + peer); + out.sendReject(piece, begin, length); + } else { + if (_log.shouldInfo()) + _log.info("Request received, but choking " + peer); + } return; - } + } // Sanity check if (piece < 0 @@ -227,8 +253,14 @@ class PeerState implements DataLoader // Todo: limit number of requests also? (robert 64 x 4KB) if (out.queuedBytes() + length > MAX_PIPELINE_BYTES) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Discarding request over pipeline limit from " + peer); + if (peer.supportsFast()) { + if (_log.shouldWarn()) + _log.warn("Rejecting request over pipeline limit from " + peer); + out.sendReject(piece, begin, length); + } else { + if (_log.shouldWarn()) + _log.warn("Discarding request over pipeline limit from " + peer); + } return; } @@ -536,6 +568,58 @@ class PeerState implements DataLoader listener.gotPort(peer, port, port + 1); } + /////////// fast message handlers ///////// + + /** + * BEP 6 + * Treated as "have" for now + * @since 0.9.21 + */ + void suggestMessage(int piece) { + if (_log.shouldInfo()) + _log.info("Handling suggest as have(" + piece + ") from " + peer); + haveMessage(piece); + } + + /** + * BEP 6 + * @param isAll true for have_all, false for have_none + * @since 0.9.21 + */ + void haveMessage(boolean isAll) { + bitfieldMessage(null, isAll); + } + + /** + * BEP 6 + * @since 0.9.21 + */ + void rejectMessage(int piece, int begin, int length) { + if (_log.shouldInfo()) + _log.info("Got reject(" + piece + ',' + begin + ',' + length + ") from " + peer); + out.cancelRequest(piece, begin, length); + synchronized(this) { + for (Iterator<Request> iter = outstandingRequests.iterator(); iter.hasNext(); ) { + Request req = iter.next(); + if (req.getPiece() == piece && req.off == begin && req.len == length) + iter.remove(); + } + if (lastRequest != null && lastRequest.getPiece() == piece && + lastRequest.off == begin && lastRequest.len == length) + lastRequest = null; + } + } + + /** + * BEP 6 + * Ignored for now + * @since 0.9.21 + */ + void allowedFastMessage(int piece) { + if (_log.shouldInfo()) + _log.info("Ignoring allowed_fast(" + piece + ") from " + peer); + } + void unknownMessage(int type, byte[] bs) { if (_log.shouldLog(Log.WARN)) @@ -543,6 +627,8 @@ class PeerState implements DataLoader + " length: " + bs.length); } + /////////// end message handlers ///////// + /** * We now have this piece. * Tell the peer and cancel any requests for the piece.