I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit de4d47de authored by zzz's avatar zzz
Browse files

i2psnark: Add support for fast extensions (BEP 6)

untested
parent 2dc3d684
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
package org.klomp.snark; package org.klomp.snark;
import java.util.Arrays;
/** /**
* Container of a byte array representing set and unset bits. * Container of a byte array representing set and unset bits.
...@@ -66,7 +68,7 @@ public class BitField ...@@ -66,7 +68,7 @@ public class BitField
/** /**
* This returns the actual byte array used. Changes to this array * This returns the actual byte array used. Changes to this array
* effect this BitField. Note that some bits at the end of the byte * affect this BitField. Note that some bits at the end of the byte
* array are supposed to be always unset if they represent bits * array are supposed to be always unset if they represent bits
* bigger then the size of the bitfield. * bigger then the size of the bitfield.
*/ */
...@@ -105,6 +107,16 @@ public class BitField ...@@ -105,6 +107,16 @@ public class BitField
} }
} }
/**
* Sets all bits to true.
*
* @since 0.9.21
*/
public void setAll() {
Arrays.fill(bitfield, (byte) 0xff);
count = size;
}
/** /**
* Return true if the bit is set or false if it is not. * Return true if the bit is set or false if it is not.
* *
......
...@@ -55,11 +55,13 @@ class Message ...@@ -55,11 +55,13 @@ class Message
byte type; byte type;
// Used for HAVE, REQUEST, PIECE and CANCEL messages. // Used for HAVE, REQUEST, PIECE and CANCEL messages.
// Also SUGGEST, REJECT, ALLOWED_FAST
// low byte used for EXTENSION message // low byte used for EXTENSION message
// low two bytes used for PORT message // low two bytes used for PORT message
int piece; int piece;
// Used for REQUEST, PIECE and CANCEL messages. // Used for REQUEST, PIECE and CANCEL messages.
// Also REJECT
int begin; int begin;
int length; int length;
...@@ -104,15 +106,18 @@ class Message ...@@ -104,15 +106,18 @@ class Message
int datalen = 1; int datalen = 1;
// piece is 4 bytes. // piece is 4 bytes.
if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL) if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL ||
type == SUGGEST || type == REJECT || type == ALLOWED_FAST)
datalen += 4; datalen += 4;
// begin/offset is 4 bytes // begin/offset is 4 bytes
if (type == REQUEST || type == PIECE || type == CANCEL) if (type == REQUEST || type == PIECE || type == CANCEL ||
type == REJECT)
datalen += 4; datalen += 4;
// length is 4 bytes // length is 4 bytes
if (type == REQUEST || type == CANCEL) if (type == REQUEST || type == CANCEL ||
type == REJECT)
datalen += 4; datalen += 4;
// msg type is 1 byte // msg type is 1 byte
...@@ -131,15 +136,18 @@ class Message ...@@ -131,15 +136,18 @@ class Message
dos.writeByte(type & 0xFF); dos.writeByte(type & 0xFF);
// Send additional info (piece number) // Send additional info (piece number)
if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL) if (type == HAVE || type == REQUEST || type == PIECE || type == CANCEL ||
type == SUGGEST || type == REJECT || type == ALLOWED_FAST)
dos.writeInt(piece); dos.writeInt(piece);
// Send additional info (begin/offset) // Send additional info (begin/offset)
if (type == REQUEST || type == PIECE || type == CANCEL) if (type == REQUEST || type == PIECE || type == CANCEL ||
type == REJECT)
dos.writeInt(begin); dos.writeInt(begin);
// Send additional info (length); for PIECE this is implicit. // Send additional info (length); for PIECE this is implicit.
if (type == REQUEST || type == CANCEL) if (type == REQUEST || type == CANCEL ||
type == REJECT)
dos.writeInt(length); dos.writeInt(length);
if (type == EXTENSION) if (type == EXTENSION)
...@@ -173,21 +181,32 @@ class Message ...@@ -173,21 +181,32 @@ class Message
case UNINTERESTED: case UNINTERESTED:
return "UNINTERESTED"; return "UNINTERESTED";
case HAVE: case HAVE:
return "HAVE(" + piece + ")"; return "HAVE(" + piece + ')';
case BITFIELD: case BITFIELD:
return "BITFIELD"; return "BITFIELD";
case REQUEST: case REQUEST:
return "REQUEST(" + piece + "," + begin + "," + length + ")"; return "REQUEST(" + piece + ',' + begin + ',' + length + ')';
case PIECE: case PIECE:
return "PIECE(" + piece + "," + begin + "," + length + ")"; return "PIECE(" + piece + ',' + begin + ',' + length + ')';
case CANCEL: case CANCEL:
return "CANCEL(" + piece + "," + begin + "," + length + ")"; return "CANCEL(" + piece + ',' + begin + ',' + length + ')';
case PORT: case PORT:
return "PORT(" + piece + ")"; return "PORT(" + piece + ')';
case EXTENSION: case EXTENSION:
return "EXTENSION(" + piece + ',' + data.length + ')'; return "EXTENSION(" + piece + ',' + data.length + ')';
// fast extensions below here
case SUGGEST:
return "SUGGEST(" + piece + ')';
case HAVE_ALL:
return "HAVE_ALL";
case HAVE_NONE:
return "HAVE_NONE";
case REJECT:
return "REJECT(" + piece + ',' + begin + ',' + length + ')';
case ALLOWED_FAST:
return "ALLOWED_FAST(" + piece + ')';
default: default:
return "<UNKNOWN>"; return "UNKNOWN (" + type + ')';
} }
} }
} }
...@@ -79,15 +79,15 @@ public class Peer implements Comparable<Peer> ...@@ -79,15 +79,15 @@ public class Peer implements Comparable<Peer>
private long uploaded_old[] = {-1,-1,-1}; private long uploaded_old[] = {-1,-1,-1};
private long downloaded_old[] = {-1,-1,-1}; private long downloaded_old[] = {-1,-1,-1};
// bytes per bt spec: 0011223344556677 // bytes per bt spec: 0011223344556677
static final long OPTION_EXTENSION = 0x0000000000100000l; private static final long OPTION_EXTENSION = 0x0000000000100000l;
static final long OPTION_FAST = 0x0000000000000004l; private static final long OPTION_FAST = 0x0000000000000004l;
static final long OPTION_DHT = 0x0000000000000001l; private static final long OPTION_DHT = 0x0000000000000001l;
/** we use a different bit since the compact format is different */ /** we use a different bit since the compact format is different */
/* no, let's use an extension message /* no, let's use an extension message
static final long OPTION_I2P_DHT = 0x0000000040000000l; static final long OPTION_I2P_DHT = 0x0000000040000000l;
*/ */
static final long OPTION_AZMP = 0x1000000000000000l; private static final long OPTION_AZMP = 0x1000000000000000l;
private long options; private long options;
/** /**
...@@ -338,6 +338,9 @@ public class Peer implements Comparable<Peer> ...@@ -338,6 +338,9 @@ public class Peer implements Comparable<Peer>
dout.write("BitTorrent protocol".getBytes("UTF-8")); dout.write("BitTorrent protocol".getBytes("UTF-8"));
// Handshake write - options // Handshake write - options
long myOptions = OPTION_EXTENSION; long myOptions = OPTION_EXTENSION;
// we can't handle HAVE_ALL or HAVE_NONE if we don't know the number of pieces
if (metainfo != null)
myOptions |= OPTION_FAST;
// FIXME get util here somehow // FIXME get util here somehow
//if (util.getDHT() != null) //if (util.getDHT() != null)
// myOptions |= OPTION_I2P_DHT; // myOptions |= OPTION_I2P_DHT;
...@@ -391,9 +394,9 @@ public class Peer implements Comparable<Peer> ...@@ -391,9 +394,9 @@ public class Peer implements Comparable<Peer>
return bs; return bs;
} }
/** @since 0.8.4 */ /** @since 0.9.21 */
public long getOptions() { public boolean supportsFast() {
return options; return (options & OPTION_FAST) != 0;
} }
/** @since 0.8.4 */ /** @since 0.8.4 */
......
...@@ -102,40 +102,46 @@ class PeerConnectionIn implements Runnable ...@@ -102,40 +102,46 @@ class PeerConnectionIn implements Runnable
m.type = b; m.type = b;
switch (b) switch (b)
{ {
case 0: case Message.CHOKE:
ps.chokeMessage(true); ps.chokeMessage(true);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received choke from " + peer); _log.debug("Received choke from " + peer);
break; break;
case 1:
case Message.UNCHOKE:
ps.chokeMessage(false); ps.chokeMessage(false);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received unchoke from " + peer); _log.debug("Received unchoke from " + peer);
break; break;
case 2:
case Message.INTERESTED:
ps.interestedMessage(true); ps.interestedMessage(true);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received interested from " + peer); _log.debug("Received interested from " + peer);
break; break;
case 3:
case Message.UNINTERESTED:
ps.interestedMessage(false); ps.interestedMessage(false);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received not interested from " + peer); _log.debug("Received not interested from " + peer);
break; break;
case 4:
case Message.HAVE:
piece = din.readInt(); piece = din.readInt();
ps.haveMessage(piece); ps.haveMessage(piece);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received havePiece(" + piece + ") from " + peer); _log.debug("Received havePiece(" + piece + ") from " + peer);
break; break;
case 5:
case Message.BITFIELD:
byte[] bitmap = new byte[i-1]; byte[] bitmap = new byte[i-1];
din.readFully(bitmap); din.readFully(bitmap);
ps.bitfieldMessage(bitmap); ps.bitfieldMessage(bitmap);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received bitmap from " + peer + ": size=" + (i-1) /* + ": " + ps.bitfield */ ); _log.debug("Received bitmap from " + peer + ": size=" + (i-1) /* + ": " + ps.bitfield */ );
break; break;
case 6:
case Message.REQUEST:
piece = din.readInt(); piece = din.readInt();
begin = din.readInt(); begin = din.readInt();
len = din.readInt(); len = din.readInt();
...@@ -143,7 +149,8 @@ class PeerConnectionIn implements Runnable ...@@ -143,7 +149,8 @@ class PeerConnectionIn implements Runnable
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received request(" + piece + "," + begin + ") from " + peer); _log.debug("Received request(" + piece + "," + begin + ") from " + peer);
break; break;
case 7:
case Message.PIECE:
piece = din.readInt(); piece = din.readInt();
begin = din.readInt(); begin = din.readInt();
len = i-9; len = i-9;
...@@ -165,7 +172,8 @@ class PeerConnectionIn implements Runnable ...@@ -165,7 +172,8 @@ class PeerConnectionIn implements Runnable
_log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer); _log.debug("Received UNWANTED data(" + piece + "," + begin + ") from " + peer);
} }
break; break;
case 8:
case Message.CANCEL:
piece = din.readInt(); piece = din.readInt();
begin = din.readInt(); begin = din.readInt();
len = din.readInt(); len = din.readInt();
...@@ -173,13 +181,15 @@ class PeerConnectionIn implements Runnable ...@@ -173,13 +181,15 @@ class PeerConnectionIn implements Runnable
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received cancel(" + piece + "," + begin + ") from " + peer); _log.debug("Received cancel(" + piece + "," + begin + ") from " + peer);
break; break;
case 9: // PORT message
case Message.PORT:
int port = din.readUnsignedShort(); int port = din.readUnsignedShort();
ps.portMessage(port); ps.portMessage(port);
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("Received port message from " + peer); _log.debug("Received port message from " + peer);
break; break;
case 20: // Extension message
case Message.EXTENSION:
int id = din.readUnsignedByte(); int id = din.readUnsignedByte();
byte[] payload = new byte[i-2]; byte[] payload = new byte[i-2];
din.readFully(payload); din.readFully(payload);
...@@ -187,6 +197,43 @@ class PeerConnectionIn implements Runnable ...@@ -187,6 +197,43 @@ class PeerConnectionIn implements Runnable
_log.debug("Received extension message from " + peer); _log.debug("Received extension message from " + peer);
ps.extensionMessage(id, payload); ps.extensionMessage(id, payload);
break; break;
// fast extensions below here
case Message.SUGGEST:
piece = din.readInt();
ps.suggestMessage(piece);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received suggest(" + piece + ") from " + peer);
break;
case Message.HAVE_ALL:
ps.haveMessage(true);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received have_all from " + peer);
break;
case Message.HAVE_NONE:
ps.haveMessage(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received have_none from " + peer);
break;
case Message.REJECT:
piece = din.readInt();
begin = din.readInt();
len = din.readInt();
ps.rejectMessage(piece, begin, len);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received reject(" + piece + ',' + begin + ',' + len + ") from " + peer);
break;
case Message.ALLOWED_FAST:
piece = din.readInt();
ps.allowedFastMessage(piece);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Received allowed_fast(" + piece + ") from " + peer);
break;
default: default:
byte[] bs = new byte[i-1]; byte[] bs = new byte[i-1];
din.readFully(bs); din.readFully(bs);
......
...@@ -277,11 +277,20 @@ class PeerConnectionOut implements Runnable ...@@ -277,11 +277,20 @@ class PeerConnectionOut implements Runnable
while (it.hasNext()) while (it.hasNext())
{ {
Message m = it.next(); Message m = it.next();
if (m.type == type) if (m.type == type) {
{
it.remove(); it.remove();
removed = true; removed = true;
} if (type == Message.PIECE && peer.supportsFast()) {
Message r = new Message();
r.type = Message.REJECT;
r.piece = m.piece;
r.begin = m.begin;
r.length = m.length;
try {
r.sendMessage(dout);
} catch (IOException ioe) {}
}
}
} }
sendQueue.notifyAll(); sendQueue.notifyAll();
} }
...@@ -349,12 +358,19 @@ class PeerConnectionOut implements Runnable ...@@ -349,12 +358,19 @@ class PeerConnectionOut implements Runnable
void sendBitfield(BitField bitfield) void sendBitfield(BitField bitfield)
{ {
Message m = new Message(); boolean fast = peer.supportsFast();
m.type = Message.BITFIELD; if (fast && bitfield.complete()) {
m.data = bitfield.getFieldBytes(); sendHaveAll();
m.off = 0; } else if (fast && bitfield.count() <= 0) {
m.len = m.data.length; sendHaveNone();
addMessage(m); } else {
Message m = new Message();
m.type = Message.BITFIELD;
m.data = bitfield.getFieldBytes();
m.off = 0;
m.len = m.data.length;
addMessage(m);
}
} }
/** reransmit requests not received in 7m */ /** reransmit requests not received in 7m */
...@@ -559,4 +575,50 @@ class PeerConnectionOut implements Runnable ...@@ -559,4 +575,50 @@ class PeerConnectionOut implements Runnable
m.piece = port; m.piece = port;
addMessage(m); addMessage(m);
} }
/**
* Unused
* @since 0.9.21
*/
void sendSuggest(int piece) {
Message m = new Message();
m.type = Message.SUGGEST;
m.piece = piece;
addMessage(m);
}
/** @since 0.9.21 */
private void sendHaveAll() {
Message m = new Message();
m.type = Message.HAVE_ALL;
addMessage(m);
}
/** @since 0.9.21 */
private void sendHaveNone() {
Message m = new Message();
m.type = Message.HAVE_NONE;
addMessage(m);
}
/** @since 0.9.21 */
void sendReject(int piece, int begin, int length) {
Message m = new Message();
m.type = Message.REJECT;
m.piece = piece;
m.begin = begin;
m.length = length;
addMessage(m);
}
/**
* Unused
* @since 0.9.21
*/
void sendAllowedFast(int piece) {
Message m = new Message();
m.type = Message.ALLOWED_FAST;
m.piece = piece;
addMessage(m);
}
} }
...@@ -155,10 +155,17 @@ class PeerState implements DataLoader ...@@ -155,10 +155,17 @@ class PeerState implements DataLoader
setInteresting(true); setInteresting(true);
} }
void bitfieldMessage(byte[] bitmap) void bitfieldMessage(byte[] bitmap) {
{ bitfieldMessage(bitmap, false);
synchronized(this) }
{
/**
* @param bitmap null to use the isAll param
* @param isAll only if bitmap == null: true for have_all, false for have_none
* @since 0.9.21
*/
private void bitfieldMessage(byte[] bitmap, boolean isAll) {
synchronized(this) {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " rcv bitfield"); _log.debug(peer + " rcv bitfield");
if (bitfield != null) if (bitfield != null)
...@@ -172,10 +179,24 @@ class PeerState implements DataLoader ...@@ -172,10 +179,24 @@ class PeerState implements DataLoader
// XXX - Check for weird bitfield and disconnect? // XXX - Check for weird bitfield and disconnect?
// FIXME will have to regenerate the bitfield after we know exactly // FIXME will have to regenerate the bitfield after we know exactly
// how many pieces there are, as we don't know how many spare bits there are. // how many pieces there are, as we don't know how many spare bits there are.
if (metainfo == null) if (metainfo == null) {
bitfield = new BitField(bitmap, bitmap.length * 8); if (bitmap != null) {
else bitfield = new BitField(bitmap, bitmap.length * 8);
bitfield = new BitField(bitmap, metainfo.getPieces()); } else {
// we can't handle this situation
if (_log.shouldLog(Log.WARN))
_log.warn("have_x w/o metainfo: " + isAll);
return;
}
} else {
if (bitmap != null) {
bitfield = new BitField(bitmap, metainfo.getPieces());
} else {
bitfield = new BitField(metainfo.getPieces());
if (isAll)
bitfield.setAll();
}
}
} }
if (metainfo == null) if (metainfo == null)
return; return;
...@@ -198,12 +219,17 @@ class PeerState implements DataLoader ...@@ -198,12 +219,17 @@ class PeerState implements DataLoader
+ piece + ", " + begin + ", " + length + ") "); + piece + ", " + begin + ", " + length + ") ");
if (metainfo == null) if (metainfo == null)
return; return;
if (choking) if (choking) {
{ if (peer.supportsFast()) {
if (_log.shouldLog(Log.INFO)) if (_log.shouldInfo())
_log.info("Request received, but choking " + peer); _log.info("Request received, sending reject to choked " + peer);
out.sendReject(piece, begin, length);
} else {
if (_log.shouldInfo())
_log.info("Request received, but choking " + peer);
}
return; return;
} }
// Sanity check // Sanity check
if (piece < 0 if (piece < 0
...@@ -227,8 +253,14 @@ class PeerState implements DataLoader ...@@ -227,8 +253,14 @@ class PeerState implements DataLoader
// Todo: limit number of requests also? (robert 64 x 4KB) // Todo: limit number of requests also? (robert 64 x 4KB)
if (out.queuedBytes() + length > MAX_PIPELINE_BYTES) if (out.queuedBytes() + length > MAX_PIPELINE_BYTES)
{ {
if (_log.shouldLog(Log.WARN)) if (peer.supportsFast()) {
_log.warn("Discarding request over pipeline limit from " + peer); if (_log.shouldWarn())
_log.warn("Rejecting request over pipeline limit from " + peer);
out.sendReject(piece, begin, length);
} else {
if (_log.shouldWarn())
_log.warn("Discarding request over pipeline limit from " + peer);
}
return; return;
} }
...@@ -536,6 +568,58 @@ class PeerState implements DataLoader ...@@ -536,6 +568,58 @@ class PeerState implements DataLoader
listener.gotPort(peer, port, port + 1); listener.gotPort(peer, port, port + 1);
} }
/////////// fast message handlers /////////
/**
* BEP 6
* Treated as "have" for now
* @since 0.9.21
*/
void suggestMessage(int piece) {
if (_log.shouldInfo())
_log.info("Handling suggest as have(" + piece + ") from " + peer);
haveMessage(piece);
}
/**
* BEP 6
* @param isAll true for have_all, false for have_none
* @since 0.9.21
*/
void haveMessage(boolean isAll) {
bitfieldMessage(null, isAll);
}
/**
* BEP 6
* @since 0.9.21
*/
void rejectMessage(int piece, int begin, int length) {
if (_log.shouldInfo())
_log.info("Got reject(" + piece + ',' + begin + ',' + length + ") from " + peer);
out.cancelRequest(piece, begin, length);
synchronized(this) {
for (Iterator<Request> iter = outstandingRequests.iterator(); iter.hasNext(); ) {
Request req = iter.next();
if (req.getPiece() == piece && req.off == begin && req.len == length)
iter.remove();
}
if (lastRequest != null && lastRequest.getPiece() == piece &&
lastRequest.off == begin && lastRequest.len == length)
lastRequest = null;
}
}
/**
* BEP 6
* Ignored for now
* @since 0.9.21
*/
void allowedFastMessage(int piece) {
if (_log.shouldInfo())
_log.info("Ignoring allowed_fast(" + piece + ") from " + peer);
}
void unknownMessage(int type, byte[] bs) void unknownMessage(int type, byte[] bs)
{ {
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
...@@ -543,6 +627,8 @@ class PeerState implements DataLoader ...@@ -543,6 +627,8 @@ class PeerState implements DataLoader
+ " length: " + bs.length); + " length: " + bs.length);
} }
/////////// end message handlers /////////
/** /**
* We now have this piece. * We now have this piece.
* Tell the peer and cancel any requests for the piece. * Tell the peer and cancel any requests for the piece.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment