i2psnark: Add WebSeed support - WIP - not hooked in yet

Author: zzz
Date: 2020-12-06 12:43:55 +00:00
parent 1097220d31
commit 48b8886224
3 changed files with 735 additions and 7 deletions

Peer.java

@@ -42,14 +42,14 @@ import org.klomp.snark.bencode.InvalidBEncodingException;
public class Peer implements Comparable<Peer>
{
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(Peer.class);
protected final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
// Identifying property, the peer id of the other side.
private final PeerID peerID;
private final byte[] my_id;
private final byte[] infohash;
/** will start out null in magnet mode */
private MetaInfo metainfo;
protected MetaInfo metainfo;
private Map<String, BEValue> handshakeMap;
// The data in/output streams set during the handshake and used by
@@ -61,8 +61,11 @@ public class Peer implements Comparable<Peer>
private final AtomicLong downloaded = new AtomicLong();
private final AtomicLong uploaded = new AtomicLong();
// Keeps state for in/out connections. Non-null when the handshake
// was successful, the connection setup and runs
/**
* Keeps state for in/out connections. Non-null when the handshake
* was successful and the connection is set up and running.
* Do not access directly. All actions should be through Peer methods.
*/
volatile PeerState state;
/** shared across all peers on this torrent */
@@ -77,8 +80,8 @@ public class Peer implements Comparable<Peer>
final static long CHECK_PERIOD = PeerCoordinator.CHECK_PERIOD; // 40 seconds
final static int RATE_DEPTH = PeerCoordinator.RATE_DEPTH; // make following arrays RATE_DEPTH long
private long uploaded_old[] = {-1,-1,-1};
private long downloaded_old[] = {-1,-1,-1};
private final long uploaded_old[] = {-1,-1,-1};
private final long downloaded_old[] = {-1,-1,-1};
private static final byte[] HANDSHAKE = DataHelper.getASCII("BitTorrent protocol");
// See BEP 4 for definitions
@@ -341,7 +344,6 @@ public class Peer implements Comparable<Peer>
private byte[] handshake(InputStream in, OutputStream out)
throws IOException
{
din = new DataInputStream(in);
dout = new DataOutputStream(out);
// Handshake write - header
@@ -366,6 +368,7 @@ public class Peer implements Comparable<Peer>
_log.debug("Wrote my shared hash and ID to " + toString());
// Handshake read - header
din = new DataInputStream(in);
byte b = din.readByte();
if (b != HANDSHAKE.length)
throw new IOException("Handshake failure, expected 19, got "
@@ -789,4 +792,12 @@ public class Peer implements Comparable<Peer>
void setTotalCommentsSent(int count) {
_totalCommentsSent = count;
}
/**
* @return false
* @since 0.9.49
*/
public boolean isWebPeer() {
return false;
}
}
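The Peer changes above mostly prepare the class for subclassing: the logger is now keyed on getClass() and made protected, metainfo becomes protected, and the new isWebPeer() hook lets callers treat web seeds specially without instanceof checks. A minimal sketch of a subclass using those hooks (illustrative only; the real subclass is the WebPeer class added in this commit, and the four-argument super() call mirrors the one WebPeer uses):

    class ExampleWebPeer extends Peer {
        ExampleWebPeer(PeerID id, MetaInfo metainfo) {
            // same constructor shape as WebPeer uses: no own id, no infohash
            super(id, null, null, metainfo);
        }

        @Override
        public boolean isWebPeer() {
            // _log and metainfo are accessible because the commit made them protected;
            // the getClass()-keyed logger tags these lines with the subclass name
            if (_log.shouldDebug())
                _log.debug("web seed peer for " + metainfo.getName());
            return true;
        }
    }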

PeerCoordinator.java

@@ -26,6 +26,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -152,6 +153,10 @@ class PeerCoordinator implements PeerListener
private static final long COMMENT_REQ_INTERVAL = 12*60*60*1000L;
private static final long COMMENT_REQ_DELAY = 60*60*1000L;
private static final int MAX_COMMENT_NOT_REQ = 10;
/** hostname to expire time, sync on this */
private Map<String, Long> _webPeerBans;
private static final long WEBPEER_BAN_TIME = 30*60*1000L;
/**
* @param metainfo null if in magnet mode
@@ -916,6 +921,7 @@ class PeerCoordinator implements PeerListener
// As connections are already up, new Pieces will
// not have their PeerID list populated, so do that.
for (Peer p : peers) {
// TODO don't access state directly
PeerState s = p.state;
if (s != null) {
BitField bf = s.bitfield;
@@ -1299,6 +1305,7 @@ class PeerCoordinator implements PeerListener
if (++seeds >= 4)
break;
} else {
// TODO don't access state directly
PeerState state = pr.state;
if (state == null) continue;
BitField bf = state.bitfield;
@@ -1336,6 +1343,7 @@ class PeerCoordinator implements PeerListener
// Temporary? So PeerState never calls wantPiece() directly for now...
Piece piece = wantPiece(peer, havePieces, true);
if (piece != null) {
// TODO padding
return new PartialPiece(piece, metainfo.getPieceLength(piece.getId()), _util.getTempDir());
}
if (_log.shouldLog(Log.DEBUG))
@@ -1452,6 +1460,10 @@ class PeerCoordinator implements PeerListener
if (bev.getMap().get(ExtensionHandler.TYPE_PEX) != null) {
List<Peer> pList = peerList();
pList.remove(peer);
for (Iterator<Peer> iter = pList.iterator(); iter.hasNext(); ) {
if (iter.next().isWebPeer())
iter.remove();
}
if (!pList.isEmpty())
ExtensionHandler.sendPEX(peer, pList);
}
@@ -1749,5 +1761,43 @@ class PeerCoordinator implements PeerListener
public I2PSnarkUtil getUtil() {
return _util;
}
/**
* Ban a web peer for this torrent, for a while or permanently.
* @param host the host name
* @since 0.9.49
*/
public synchronized void banWebPeer(String host, boolean isPermanent) {
if (_webPeerBans == null)
_webPeerBans = new HashMap<String, Long>(4);
Long time;
if (isPermanent) {
time = Long.valueOf(Long.MAX_VALUE);
} else {
long now = _util.getContext().clock().now();
time = Long.valueOf(now + WEBPEER_BAN_TIME);
}
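// never shorten an existing (possibly permanent) ban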
Long old = _webPeerBans.put(host, time);
if (old != null && old.longValue() > time)
_webPeerBans.put(host, old);
}
/**
* Is a web peer banned?
* @param host the host name
* @since 0.9.49
*/
public synchronized boolean isWebPeerBanned(String host) {
if (_webPeerBans == null)
return false;
Long time = _webPeerBans.get(host);
if (time == null)
return false;
long now = _util.getContext().clock().now();
boolean rv = time.longValue() > now;
if (!rv)
_webPeerBans.remove(host);
return rv;
}
}
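The ban map is only half of the WebSeed plumbing; per the commit message, nothing creates WebPeer connections yet. A hedged sketch of how a future hook inside PeerCoordinator might consult the ban before starting a web seed (maybeStartWebSeed() and the way the URI and PeerID are obtained are hypothetical; only banWebPeer(), isWebPeerBanned() and the WebPeer constructor exist in this commit):

    // Hypothetical hook-up, not part of this commit.
    private void maybeStartWebSeed(java.net.URI uri) {
        String host = uri.getHost();
        if (isWebPeerBanned(host)) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("Skipping banned web seed " + host);
            return;
        }
        // WebPeer.fail() calls banWebPeer(host, ...) on fatal or rate-limiting
        // HTTP responses, so a misbehaving seed is skipped on the next pass.
        // ... construct a WebPeer(this, uri, peerID, metainfo) and run it ...
    }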

WebPeer.java (new file)

@@ -0,0 +1,667 @@
package org.klomp.snark;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketEepGet;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.EepGet;
import net.i2p.util.Log;
/**
* BEP 19 (WebSeed - HTTP/FTP Seeding, GetRight style).
* Does not have an associated PeerState.
* All request tracking is done here.
* @since 0.9.49
*/
class WebPeer extends Peer implements EepGet.StatusListener {
private final PeerCoordinator _coordinator;
private final URI _uri;
// as received from coordinator
private final List<Request> outstandingRequests = new ArrayList<Request>();
private final boolean isMultiFile;
// needed?
private Request lastRequest;
private PeerListener listener;
private BitField bitfield;
private Thread thread;
private boolean connected;
private long lastRcvd;
private int maxRequests;
// to be recognized by the UI
public static final byte[] IDBytes = DataHelper.getASCII("WebSeedBEP19");
private static final long HEADER_TIMEOUT = 60*1000;
private static final long TOTAL_TIMEOUT = 10*60*1000;
private static final long INACTIVITY_TIMEOUT = 2*60*1000;
private static final long TARGET_FETCH_TIME = 2*60*1000;
// 128 KB
private static final int ABSOLUTE_MIN_REQUESTS = 8;
// 2 MB
private static final int ABSOLUTE_MAX_REQUESTS = 128;
private final int MIN_REQUESTS;
private final int MAX_REQUESTS;
/**
* Outgoing connection.
* Creates a disconnected peer given a PeerID, your own id and the
* relevant MetaInfo.
* @param uri must be http with .i2p host
* @param metainfo non-null
*/
public WebPeer(PeerCoordinator coord, URI uri, PeerID peerID, MetaInfo metainfo) {
super(peerID, null, null, metainfo);
// no use asking for more than the number of chunks in a piece
MAX_REQUESTS = Math.max(1, Math.min(ABSOLUTE_MAX_REQUESTS, metainfo.getPieceLength(0) / PeerState.PARTSIZE));
MIN_REQUESTS = Math.min(ABSOLUTE_MIN_REQUESTS, MAX_REQUESTS);
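// e.g. with 256 KB pieces and the 16 KB chunk size implied by the comments above
// (PeerState.PARTSIZE), MAX_REQUESTS = 256/16 = 16 and MIN_REQUESTS = min(8, 16) = 8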
maxRequests = MIN_REQUESTS;
isMultiFile = metainfo.getLengths() != null;
_coordinator = coord;
// We assume the base path is already encoded; otherwise it
// would have failed the checks in TrackerClient.getHostHash()
_uri = uri;
}
@Override
public String toString() {
return "WebSeed " + _uri;
}
/**
* @return socket debug string (for debug printing)
*/
@Override
public synchronized String getSocket() {
return toString() + ' ' + outstandingRequests.toString();
}
/**
* The hash code of a Peer is the hash code of the peerID.
*/
@Override
public int hashCode() {
return super.hashCode();
}
/**
* Two Peers are equal when they have the same PeerID.
* All other properties are ignored.
*/
@Override
public boolean equals(Object o) {
if (o instanceof WebPeer) {
WebPeer p = (WebPeer)o;
// TODO
return getPeerID().equals(p.getPeerID());
}
return false;
}
/**
* Runs the connection to the other peer. This method does not
* return until the connection is terminated.
*
* @param ignore our bitfield, ignore
* @param uploadOnly if we are complete with skipped files, i.e. a partial seed
*/
@Override
public void runConnection(I2PSnarkUtil util, PeerListener listener, BitField ignore,
MagnetState mState, boolean uploadOnly) {
if (uploadOnly)
return;
int fails = 0;
int successes = 0;
long dl = 0;
boolean notify = true;
ByteArrayOutputStream out = null;
// current requests per-loop
List<Request> requests = new ArrayList<Request>(8);
try {
if (!util.connected()) {
boolean ok = util.connect();
if (!ok)
return;
}
// This breaks out of the loop after any failure. TrackerClient will requeue eventually.
loop:
while (true) {
I2PSocketManager mgr = util.getSocketManager();
if (mgr == null)
return;
if (notify) {
synchronized(this) {
this.listener = listener;
bitfield = new BitField(metainfo.getPieces());
bitfield.setAll();
thread = Thread.currentThread();
connected = true;
}
listener.connected(this);
boolean want = listener.gotBitField(this, bitfield);
if (!want)
return;
listener.gotChoke(this, false);
notify = false;
}
synchronized(this) {
// clear out previous requests
if (!requests.isEmpty()) {
outstandingRequests.removeAll(requests);
requests.clear();
}
addRequest();
if (_log.shouldDebug())
_log.debug("Requests: " + outstandingRequests);
while (outstandingRequests.isEmpty()) {
if (_coordinator.getNeededLength() <= 0) {
if (_log.shouldDebug())
_log.debug("Complete: " + this);
break loop;
}
if (_log.shouldDebug())
_log.debug("No requests, sleeping: " + this);
connected = false;
out = null;
try {
this.wait();
} catch (InterruptedException ie) {
if (_log.shouldWarn())
_log.warn("Interrupted: " + this, ie);
break loop;
}
}
connected = true;
// Take requests from the outstandingRequests list and add them to the requests list.
// Do not remove from outstandingRequests until success.
lastRequest = outstandingRequests.get(0);
requests.add(lastRequest);
int piece = lastRequest.getPiece();
// Glue together additional requests if consecutive for a single piece.
// This will never glue together requests from different pieces,
// and the coordinator generally won't give us consecutive pieces anyway.
// Servers generally won't support multiple byte ranges anymore.
for (int i = 1; i < outstandingRequests.size(); i++) {
if (i >= maxRequests)
break;
Request r = outstandingRequests.get(i);
if (r.getPiece() == piece &&
lastRequest.off + lastRequest.len == r.off) {
requests.add(r);
lastRequest = r;
} else {
// all requests for a piece should be together, but not in practice
// as orphaned requests can get in-between
//break;
}
}
}
// total values
Request first = requests.get(0);
Request last = requests.get(requests.size() - 1);
int piece = first.getPiece();
int off = first.off;
long toff = (((long) piece) * metainfo.getPieceLength(0)) + off;
int tlen = (last.off - first.off) + last.len;
long start = System.currentTimeMillis();
///// TODO direct to file, not in-memory
if (out == null)
out = new ByteArrayOutputStream(tlen);
else
out.reset();
int filenum = -1;
// Loop once per file when multifile and the range crosses file boundaries.
// Runs only once for a single-file torrent.
while (out.size() < tlen) {
// need these three things:
// url to fetch
String url;
// offset in fetched file
long foff;
// length to fetch, will be adjusted if crossing a file boundary
int flen = tlen - out.size();
if (isMultiFile) {
// multifile
List<Long> lengths = metainfo.getLengths();
long limit = 0;
if (filenum < 0) {
// find the first file number and limit
// inclusive
long fstart = 0;
// exclusive
long fend = 0;
foff = 0; // keep compiler happy, will always be re-set
for (int f = 0; f < lengths.size(); f++) {
long filelen = lengths.get(f).longValue();
fend = fstart + filelen;
if (toff < fend) {
filenum = f;
foff = toff - fstart;
limit = fend - toff;
break;
}
fstart += filelen;
}
if (filenum < 0)
throw new IllegalStateException(lastRequest.toString());
} else {
// next file
filenum++;
foff = 0;
limit = lengths.get(filenum).longValue();
}
if (limit > 0 && flen > limit)
flen = (int) limit;
if (metainfo.isPaddingFile(filenum)) {
for (int i = 0; i < flen; i++) {
out.write((byte) 0);
}
if (_log.shouldDebug())
_log.debug("Skipped padding file " + filenum);
continue;
}
// build url
String uri = _uri.toString();
StringBuilder buf = new StringBuilder(uri.length() + 128);
buf.append(uri);
if (!uri.endsWith("/"))
buf.append('/');
// See BEP 19 rules
URIUtil.encodePath(buf, metainfo.getName());
List<String> path = metainfo.getFiles().get(filenum);
for (int i = 0; i < path.size(); i++) {
buf.append('/');
URIUtil.encodePath(buf, path.get(i));
}
url = buf.toString();
} else {
// single file
// See BEP 19 rules
String uri = _uri.toString();
if (uri.endsWith("/"))
url = uri + URIUtil.encodePath(metainfo.getName());
else
url = uri;
foff = toff;
flen = tlen;
}
// do the fetch
EepGet get = new I2PSocketEepGet(util.getContext(), mgr, 0, flen, flen, null, out, url);
get.addHeader("User-Agent", I2PSnarkUtil.EEPGET_USER_AGENT);
get.addHeader("Range", "bytes=" + foff + '-' + (foff + flen - 1));
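// the HTTP Range header is inclusive on both ends, hence the -1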
get.addStatusListener(this);
int osz = out.size();
if (_log.shouldDebug())
_log.debug("Fetching piece: " + piece + " offset: " + off + " file offset: " + foff + " len: " + flen + " from " + url);
if (get.fetch(HEADER_TIMEOUT, TOTAL_TIMEOUT, INACTIVITY_TIMEOUT)) {
int resp = get.getStatusCode();
if (resp != 200 && resp != 206) {
fail(url, resp);
return;
}
int sz = out.size() - osz;
if (sz != flen) {
if (_log.shouldWarn())
_log.warn("Fetch of " + url + " received: " + sz + " expected: " + flen);
return;
}
} else {
if (out.size() > 0) {
// save any complete chunks received
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
for (Iterator<Request> iter = requests.iterator(); iter.hasNext(); ) {
Request req = iter.next();
if (dis.available() < req.len)
break;
req.read(dis);
iter.remove();
if (_log.shouldWarn())
_log.warn("Saved chunk " + req + " recvd before failure");
}
}
int resp = get.getStatusCode();
fail(url, resp);
return;
}
successes++;
dl += flen;
if (!isMultiFile)
break;
} // for each file
// all data received successfully, now process it
if (_log.shouldDebug())
_log.debug("Fetch of piece: " + piece + " chunks: " + requests.size() + " offset: " + off + " torrent offset: " + toff + " len: " + tlen + " successful");
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
for (Request req : requests) {
req.read(dis);
}
PartialPiece pp = last.getPartialPiece();
synchronized(pp) {
// Last chunk needed for this piece?
if (pp.getLength() == pp.getDownloaded()) {
if (listener.gotPiece(this, pp)) {
if (_log.shouldDebug())
_log.debug("Got " + piece + ": " + this);
} else {
if (_log.shouldWarn())
_log.warn("Got BAD " + piece + " from " + this);
return;
}
} else {
// piece not complete
}
}
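// Adapt the request window for the next pass: double it (up to MAX_REQUESTS) when
// this fetch beat the target time, halve it (down to MIN_REQUESTS) when it took
// more than twice the target.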
long time = lastRcvd - start;
if (time < TARGET_FETCH_TIME)
maxRequests = Math.min(MAX_REQUESTS, 2 * maxRequests);
else if (time > 2 * TARGET_FETCH_TIME)
maxRequests = Math.max(MIN_REQUESTS, maxRequests / 2);
} // request loop
} catch(IOException eofe) {
if (_log.shouldWarn())
_log.warn(toString(), eofe);
} finally {
List<Request> pcs = returnPartialPieces();
synchronized(this) {
connected = false;
outstandingRequests.clear();
}
requests.clear();
if (!pcs.isEmpty())
listener.savePartialPieces(this, pcs);
listener.disconnected(this);
disconnect();
if (_log.shouldWarn())
_log.warn("Completed, successful fetches: " + successes + " downloaded: " + dl + " for " + this);
}
}
private void fail(String url, int resp) {
if (_log.shouldWarn())
_log.warn("Fetch of " + url + " failed, rc: " + resp);
if (resp == 301 || resp == 308 ||
resp == 401 || resp == 403 || resp == 404 || resp == 410 || resp == 414 || resp == 416 || resp == 451) {
// ban forever
_coordinator.banWebPeer(_uri.getHost(), true);
if (_log.shouldWarn())
_log.warn("Permanently banning the webseed " + url);
} else if (resp == 429 || resp == 503) {
// ban for a while
_coordinator.banWebPeer(_uri.getHost(), false);
if (_log.shouldWarn())
_log.warn("Temporarily banning the webseed " + url);
}
}
@Override
public int getMaxPipeline() {
return maxRequests;
}
@Override
public boolean isConnected() {
synchronized(this) {
return connected;
}
}
@Override
synchronized void disconnect() {
if (thread != null)
thread.interrupt();
}
@Override
public void have(int piece) {}
@Override
void cancel(int piece) {}
@Override
void request() {
addRequest();
}
@Override
public boolean isInterested() {
return false;
}
@Deprecated
@Override
public void setInteresting(boolean interest) {}
@Override
public boolean isInteresting() {
return true;
}
@Override
public void setChoking(boolean choke) {}
@Override
public boolean isChoking() {
return false;
}
@Override
public boolean isChoked() {
return false;
}
@Override
public long getInactiveTime() {
if (lastRcvd <= 0)
return -1;
long now = System.currentTimeMillis();
return now - lastRcvd;
}
@Override
public long getMaxInactiveTime() {
return PeerCoordinator.MAX_INACTIVE;
}
@Override
public void keepAlive() {}
@Override
public void retransmitRequests() {}
@Override
public int completed() {
return metainfo.getPieces();
}
@Override
public boolean isCompleted() {
return true;
}
/**
* @return true
* @since 0.9.49
*/
@Override
public boolean isWebPeer() {
return true;
}
// private methods below here implementing parts of PeerState
private synchronized void addRequest() {
boolean more_pieces = true;
while (more_pieces) {
more_pieces = outstandingRequests.size() < getMaxPipeline();
// We want something and we don't have outstanding requests?
if (more_pieces && lastRequest == null) {
// we have nothing in the queue right now
more_pieces = requestNextPiece();
} else if (more_pieces) {
// We want something
int pieceLength;
boolean isLastChunk;
pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
isLastChunk = lastRequest.off + lastRequest.len == pieceLength;
// Last part of a piece?
if (isLastChunk) {
more_pieces = requestNextPiece();
} else {
PartialPiece nextPiece = lastRequest.getPartialPiece();
int nextBegin = lastRequest.off + PeerState.PARTSIZE;
int maxLength = pieceLength - nextBegin;
int nextLength = maxLength > PeerState.PARTSIZE ? PeerState.PARTSIZE
: maxLength;
Request req = new Request(nextPiece, nextBegin, nextLength);
outstandingRequests.add(req);
lastRequest = req;
this.notifyAll();
}
}
}
}
/**
* Starts requesting first chunk of next piece. Returns true if
* something has been added to the requests, false otherwise.
*/
private synchronized boolean requestNextPiece() {
// Check for adopting an orphaned partial piece
PartialPiece pp = listener.getPartialPiece(this, bitfield);
if (pp != null) {
// Double-check that r is not already in outstandingRequests
if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) {
Request r = pp.getRequest();
outstandingRequests.add(r);
lastRequest = r;
this.notifyAll();
return true;
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Got dup from coord: " + pp);
pp.release();
}
}
// failsafe
// However this is bad as it thrashes the peer when we change our mind
// Ticket 691 cause here?
if (outstandingRequests.isEmpty())
lastRequest = null;
/*
// If we are not in the end game, we may run out of things to request
// because we are asking other peers. Set not-interesting now rather than
// wait for those other requests to be satisfied via havePiece()
if (interesting && lastRequest == null) {
interesting = false;
out.sendInterest(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug(peer + " nothing more to request, now uninteresting");
}
*/
return false;
}
/**
* @return all pieces we are currently requesting, or empty Set
*/
private synchronized Set<Integer> getRequestedPieces() {
Set<Integer> rv = new HashSet<Integer>(outstandingRequests.size() + 1);
for (Request req : outstandingRequests) {
rv.add(Integer.valueOf(req.getPiece()));
}
return rv;
}
/**
* @return index in outstandingRequests or -1
*/
private synchronized int getFirstOutstandingRequest(int piece) {
for (int i = 0; i < outstandingRequests.size(); i++) {
if (outstandingRequests.get(i).getPiece() == piece)
return i;
}
return -1;
}
private synchronized List<Request> returnPartialPieces() {
Set<Integer> pcs = getRequestedPieces();
List<Request> rv = new ArrayList<Request>(pcs.size());
for (Integer p : pcs) {
Request req = getLowestOutstandingRequest(p.intValue());
if (req != null) {
PartialPiece pp = req.getPartialPiece();
synchronized(pp) {
int dl = pp.getDownloaded();
if (req.off != dl)
req = new Request(pp, dl);
}
rv.add(req);
}
}
outstandingRequests.clear();
return rv;
}
private synchronized Request getLowestOutstandingRequest(int piece) {
Request rv = null;
int lowest = Integer.MAX_VALUE;
for (Request r : outstandingRequests) {
if (r.getPiece() == piece && r.off < lowest) {
lowest = r.off;
rv = r;
}
}
return rv;
}
// EepGet status listeners to maintain the state for the web page
public void bytesTransferred(long alreadyTransferred, int currentWrite, long bytesTransferred, long bytesRemaining, String url) {
lastRcvd = System.currentTimeMillis();
downloaded(currentWrite);
listener.downloaded(this, currentWrite);
}
public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause) {}
public void transferComplete(long alreadyTransferred, long bytesTransferred, long bytesRemaining, String url, String outputFile, boolean notModified) {}
public void transferFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt) {}
public void headerReceived(String url, int attemptNum, String key, String val) {}
public void attempting(String url) {}
// End of EepGet status listeners
}
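The least obvious part of runConnection() is mapping a torrent-wide byte offset onto a (file index, offset within that file) pair for multi-file torrents: BEP 19 serves plain files, so a request that straddles a file boundary has to be split into one ranged GET per file. A reduced, self-contained sketch of that mapping, under the same assumptions as the inline code (file lengths come from metainfo.getLengths(); the method name and the long[] return shape are illustrative):

    // Sketch only: returns { file index, offset within that file, bytes left in that file },
    // mirroring filenum / foff / limit in the per-file loop of runConnection().
    private static long[] mapTorrentOffset(java.util.List<Long> lengths, long torrentOffset) {
        long fileStart = 0;                                        // inclusive
        for (int f = 0; f < lengths.size(); f++) {
            long fileEnd = fileStart + lengths.get(f).longValue(); // exclusive
            if (torrentOffset < fileEnd)
                return new long[] { f, torrentOffset - fileStart, fileEnd - torrentOffset };
            fileStart = fileEnd;
        }
        throw new IllegalStateException("offset beyond end of torrent: " + torrentOffset);
    }

Each ranged GET then covers min(remaining request length, bytes left in the current file), which is exactly the flen clamp at the top of the per-file loop; padding files are satisfied locally with zeros instead of being fetched.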