// WebPeer.java 23.96 KiB
package org.klomp.snark;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketEepGet;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.EepGet;
import net.i2p.util.Log;
/**
* A web seed peer as described by BEP 19 (HTTP seeding, GetRight style).
* Does not have an associated PeerState;
* all request tracking is done in this class.
* @since 0.9.49
*/
class WebPeer extends Peer implements EepGet.StatusListener {
// Coordinator for this torrent; consulted for the needed length, bandwidth checks and web seed bans
private final PeerCoordinator _coordinator;
// Base URL of the web seed, as passed to the constructor
private final URI _uri;
// Requests as received from the coordinator and not yet fetched.
// Every access in this class happens while holding the lock on this object.
private final List<Request> outstandingRequests = new ArrayList<Request>();
// True when metainfo.getLengths() is non-null (computed once in the constructor)
private final boolean isMultiFile;
// Most recently queued request; addRequest() uses it to work out the next chunk,
// and runConnection() uses it while gluing consecutive chunks together
private Request lastRequest;
// The following four are set under synchronized(this) on the first pass of runConnection()
private PeerListener listener;
private BitField bitfield;
// The thread running runConnection(); interrupted by disconnect()
private Thread thread;
private boolean connected;
// Time of the last bytesTransferred() callback from EepGet; 0 until data is received
private long lastRcvd;
// Current cap on chunks per fetch, adjusted between MIN_REQUESTS and MAX_REQUESTS in runConnection()
private int maxRequests;
// to be recognized by the UI
public static final byte[] IDBytes = DataHelper.getASCII("WebSeedBEP19");
// Passed to EepGet.fetch() as the header, total and inactivity timeouts (presumably milliseconds)
private static final long HEADER_TIMEOUT = 60*1000;
private static final long TOTAL_TIMEOUT = 10*60*1000;
private static final long INACTIVITY_TIMEOUT = 2*60*1000;
// Fetches faster than this grow maxRequests; fetches slower than twice this shrink it
private static final long TARGET_FETCH_TIME = 2*60*1000;
// 128 KB (presumably 8 chunks of PeerState.PARTSIZE = 16 KB each; confirm PARTSIZE)
private static final int ABSOLUTE_MIN_REQUESTS = 8;
// 2 MB (presumably 128 chunks of PeerState.PARTSIZE = 16 KB each; confirm PARTSIZE)
private static final int ABSOLUTE_MAX_REQUESTS = 128;
// Per-torrent bounds for maxRequests, derived from the piece length in the constructor
private final int MIN_REQUESTS;
private final int MAX_REQUESTS;
/**
* Outgoing connection.
* Creates a disconnected peer given a PeerID, your own id and the
* relevant MetaInfo.
* @param uri must be http with .i2p host
* @param metainfo non-null
*/
public WebPeer(PeerCoordinator coord, URI uri, PeerID peerID, MetaInfo metainfo) {
super(peerID, null, null, metainfo);
// no use asking for more than the number of chunks in a piece
MAX_REQUESTS = Math.max(1, Math.min(ABSOLUTE_MAX_REQUESTS, metainfo.getPieceLength(0) / PeerState.PARTSIZE));
MIN_REQUESTS = Math.min(ABSOLUTE_MIN_REQUESTS, MAX_REQUESTS);
maxRequests = MIN_REQUESTS;
isMultiFile = metainfo.getLengths() != null;
_coordinator = coord;
// We'll assume the base path is already encoded, because
// it would have failed the checks in TrackerClient.getHostHash()
_uri = uri;
}
/**
 * @return the literal "WebSeed " followed by the web seed URI
 */
@Override
public String toString() {
    final StringBuilder sb = new StringBuilder("WebSeed ");
    sb.append(_uri);
    return sb.toString();
}
/**
 * There is no socket for a web seed; this returns {@link #toString()}
 * followed by the list of outstanding requests instead.
 *
 * @return socket debug string (for debug printing)
 */
@Override
public synchronized String getSocket() {
    final String self = toString();
    final String pending = outstandingRequests.toString();
    return self + ' ' + pending;
}
/**
 * Delegates to the superclass, so the hash code of a WebPeer
 * is the hash code of the peerID.
 *
 * @return the superclass hash code
 */
@Override
public int hashCode() {
    final int peerHash = super.hashCode();
    return peerHash;
}
/**
 * Two WebPeers are equal when they have the same PeerID.
 * All other properties are ignored. Anything that is not a WebPeer
 * (including null) is never equal.
 *
 * @param o the object to compare with, may be null
 * @return true if o is a WebPeer with an equal PeerID
 */
@Override
public boolean equals(Object o) {
    if (!(o instanceof WebPeer))
        return false;
    final WebPeer other = (WebPeer) o;
    // TODO
    return getPeerID().equals(other.getPeerID());
}
/**
* Runs the web seed fetch loop on the calling thread. This method does not
* return until the loop terminates: when the coordinator reports nothing more
* is needed, the listener does not want our bitfield, a fetch fails or returns
* the wrong size, a completed piece is rejected, the thread is interrupted
* while waiting for requests, or an IOException is thrown.
*
* Once inside the try block, every exit path runs the finally block, which
* returns outstanding requests to the listener as partial pieces and reports
* the disconnect.
*
* @param util used to connect and to obtain the socket manager and context
* @param listener receives the connected, bitfield, choke, piece and disconnected callbacks
* @param bwl not referenced in this method
* @param ignore our bitfield, ignore
* @param mState not referenced in this method
* @param uploadOnly if we are complete with skipped files, i.e. a partial seed;
*                   when true this method returns immediately without doing anything
*/
@Override
public void runConnection(I2PSnarkUtil util, PeerListener listener, BandwidthListener bwl, BitField ignore,
MagnetState mState, boolean uploadOnly) {
// Returns before the try block, so the finally block below does not run
if (uploadOnly)
return;
int fails = 0;
// successes and dl are only reported in the final log message
int successes = 0;
long dl = 0;
// true until the listener has been told about this peer (first loop pass only)
boolean notify = true;
// buffer for the data of one fetch batch; reused across loop passes
ByteArrayOutputStream out = null;
// current requests per-loop
List<Request> requests = new ArrayList<Request>(8);
try {
if (!util.connected()) {
boolean ok = util.connect();
if (!ok)
return;
}
// This breaks out of the loop after any failure. TrackerClient will requeue eventually.
loop:
while (true) {
I2PSocketManager mgr = util.getSocketManager();
if (mgr == null)
return;
if (notify) {
// First pass: publish our state, then announce ourselves to the listener
// as a peer that has every piece and is not choking us
synchronized(this) {
this.listener = listener;
bitfield = new BitField(metainfo.getPieces());
bitfield.setAll();
thread = Thread.currentThread();
connected = true;
}
listener.connected(this);
boolean want = listener.gotBitField(this, bitfield);
if (!want)
return;
listener.gotChoke(this, false);
notify = false;
}
synchronized(this) {
// clear out previous requests
// (these were fetched successfully on the previous pass)
if (!requests.isEmpty()) {
outstandingRequests.removeAll(requests);
requests.clear();
}
addRequest();
if (_log.shouldDebug())
_log.debug("Requests: " + outstandingRequests);
// Nothing to fetch: either we are done, or we wait until
// addRequest()/requestNextPiece() calls notifyAll()
while (outstandingRequests.isEmpty()) {
if (_coordinator.getNeededLength() <= 0) {
if (_log.shouldDebug())
_log.debug("Complete: " + this);
break loop;
}
if (_log.shouldDebug())
_log.debug("No requests, sleeping: " + this);
connected = false;
// drop the buffer while idle; a new one is allocated on the next fetch
out = null;
try {
this.wait();
} catch (InterruptedException ie) {
// disconnect() interrupts this thread, so treat an interrupt as a request to stop
if (_log.shouldWarn())
_log.warn("Interrupted: " + this, ie);
break loop;
}
}
connected = true;
// Add current requests from outstandingRequests list and add to requests list.
// Do not remove from outstandingRequests until success.
lastRequest = outstandingRequests.get(0);
requests.add(lastRequest);
int piece = lastRequest.getPiece();
// Glue together additional requests if consecutive for a single piece.
// This will never glue together requests from different pieces,
// and the coordinator generally won't give us consecutive pieces anyway.
// Servers generally won't support multiple byte ranges anymore.
for (int i = 1; i < outstandingRequests.size(); i++) {
// never look past the first maxRequests outstanding requests
if (i >= maxRequests)
break;
Request r = outstandingRequests.get(i);
// stop gluing as soon as the coordinator says not to request this many more bytes
if (!shouldRequest(r.len))
break;
if (r.getPiece() == piece &&
lastRequest.off + lastRequest.len == r.off) {
requests.add(r);
lastRequest = r;
} else {
// all requests for a piece should be together, but not in practice
// as orphaned requests can get in-between
//break;
}
}
}
// total values
// requests now holds one contiguous byte range within a single piece
Request first = requests.get(0);
Request last = requests.get(requests.size() - 1);
int piece = first.getPiece();
int off = first.off;
// offset of the range within the whole torrent; uses the length of piece 0 as the piece size
long toff = (((long) piece) * metainfo.getPieceLength(0)) + off;
// total length of the range
int tlen = (last.off - first.off) + last.len;
long start = System.currentTimeMillis();
///// TODO direct to file, not in-memory
if (out == null)
out = new ByteArrayOutputStream(tlen);
else
out.reset();
// index of the file currently being fetched; -1 until the first file has been located
int filenum = -1;
// Loop for each file if multifile and crosses file boundaries.
// Once only for single file.
while (out.size() < tlen) {
// need these three things:
// url to fetch
String url;
// offset in fetched file
long foff;
// length to fetch, will be adjusted if crossing a file boundary
int flen = tlen - out.size();
if (isMultiFile) {
// multifile
List<Long> lengths = metainfo.getLengths();
// bytes available in the current file starting at foff
long limit = 0;
if (filenum < 0) {
// find the first file number and limit
// inclusive
long fstart = 0;
// exclusive
long fend = 0;
foff = 0; // keep compiler happy, will always be re-set
for (int f = 0; f < lengths.size(); f++) {
long filelen = lengths.get(f).longValue();
fend = fstart + filelen;
if (toff < fend) {
filenum = f;
foff = toff - fstart;
limit = fend - toff;
break;
}
fstart += filelen;
}
// toff is beyond the end of the last file
if (filenum < 0)
throw new IllegalStateException(lastRequest.toString());
} else {
// next file
filenum++;
foff = 0;
limit = lengths.get(filenum).longValue();
}
// do not read past the end of this file; the outer loop continues with the next one
if (limit > 0 && flen > limit)
flen = (int) limit;
// padding files are not fetched; their bytes are filled in with zeros
if (metainfo.isPaddingFile(filenum)) {
for (int i = 0; i < flen; i++) {
out.write((byte) 0);
}
if (_log.shouldDebug())
_log.debug("Skipped padding file " + filenum);
continue;
}
// build url: base URI, then the encoded torrent name, then each encoded path element
String uri = _uri.toString();
StringBuilder buf = new StringBuilder(uri.length() + 128);
buf.append(uri);
if (!uri.endsWith("/"))
buf.append('/');
// See BEP 19 rules
URIUtil.encodePath(buf, metainfo.getName());
List<String> path = metainfo.getFiles().get(filenum);
for (int i = 0; i < path.size(); i++) {
buf.append('/');
URIUtil.encodePath(buf, path.get(i));
}
url = buf.toString();
} else {
// single file
// See BEP 19 rules
// the torrent name is appended only when the base URI ends with a slash
String uri = _uri.toString();
if (uri.endsWith("/"))
url = uri + URIUtil.encodePath(metainfo.getName());
else
url = uri;
foff = toff;
flen = tlen;
}
// do the fetch
// NOTE(review): presumably 0 retries with min and max size both flen, writing to out;
// confirm against the I2PSocketEepGet constructor
EepGet get = new I2PSocketEepGet(util.getContext(), mgr, 0, flen, flen, null, out, url);
get.addHeader("User-Agent", I2PSnarkUtil.EEPGET_USER_AGENT);
// inclusive byte range, hence the -1
get.addHeader("Range", "bytes=" + foff + '-' + (foff + flen - 1));
// bytesTransferred() callbacks keep lastRcvd up to date
get.addStatusListener(this);
// buffer size before this fetch, to measure how much this fetch added
int osz = out.size();
if (_log.shouldDebug())
_log.debug("Fetching piece: " + piece + " offset: " + off + " file offset: " + foff + " len: " + flen + " from " + url);
if (get.fetch(HEADER_TIMEOUT, TOTAL_TIMEOUT, INACTIVITY_TIMEOUT)) {
int resp = get.getStatusCode();
if (resp != 200 && resp != 206) {
fail(url, resp);
return;
}
int sz = out.size() - osz;
// wrong amount of data: give up on this web seed for now, without calling fail()
if (sz != flen) {
if (_log.shouldWarn())
_log.warn("Fetch of " + url + " received: " + sz + " expected: " + flen);
return;
}
} else {
if (out.size() > 0) {
// save any complete chunks received
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
for (Iterator<Request> iter = requests.iterator(); iter.hasNext(); ) {
Request req = iter.next();
// stop at the first chunk that was not fully received
if (dis.available() < req.len)
break;
req.read(dis, this);
iter.remove();
if (_log.shouldWarn())
_log.warn("Saved chunk " + req + " recvd before failure");
}
}
int resp = get.getStatusCode();
fail(url, resp);
return;
}
successes++;
dl += flen;
if (!isMultiFile)
break;
} // for each file
// all data received successfully, now process it
if (_log.shouldDebug())
_log.debug("Fetch of piece: " + piece + " chunks: " + requests.size() + " offset: " + off + " torrent offset: " + toff + " len: " + tlen + " successful");
// hand each request its slice of the buffer, in order
DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
for (Request req : requests) {
req.read(dis, this);
}
PartialPiece pp = last.getPartialPiece();
synchronized(pp) {
// Last chunk needed for this piece?
if (pp.isComplete()) {
if (listener.gotPiece(this, pp)) {
if (_log.shouldDebug())
_log.debug("Got " + piece + ": " + this);
} else {
// the listener rejected the piece; stop using this web seed for now
if (_log.shouldWarn())
_log.warn("Got BAD " + piece + " from " + this);
return;
}
} else {
// piece not complete
}
}
// Time from the start of this batch to the last data received (lastRcvd is set in bytesTransferred()).
// Double the batch size after a fast fetch, halve it after a slow one, within [MIN_REQUESTS, MAX_REQUESTS].
long time = lastRcvd - start;
if (time < TARGET_FETCH_TIME)
maxRequests = Math.min(MAX_REQUESTS, 2 * maxRequests);
else if (time > 2 * TARGET_FETCH_TIME)
maxRequests = Math.max(MIN_REQUESTS, maxRequests / 2);
} // request loop
} catch(IOException eofe) {
if (_log.shouldWarn())
_log.warn(toString(), eofe);
} finally {
// Runs on every exit from the try block, including the early returns above.
// returnPartialPieces() collects the lowest outstanding request of each piece and clears the list.
List<Request> pcs = returnPartialPieces();
synchronized(this) {
connected = false;
outstandingRequests.clear();
}
requests.clear();
// "listener" here is the method parameter, which is non-null even if the field was never set
if (!pcs.isEmpty())
listener.savePartialPieces(this, pcs);
listener.disconnected(this);
disconnect();
if (_log.shouldWarn())
_log.warn("Completed, successful fetches: " + successes + " downloaded: " + dl + " for " + this);
}
}
/**
 * Logs a failed fetch and, depending on the HTTP response code,
 * asks the coordinator to ban the web seed host permanently or temporarily.
 * Any other response code is only logged.
 *
 * @param url the URL that was fetched, used for logging only
 * @param resp the HTTP status code reported by the fetch
 */
private void fail(String url, int resp) {
    if (_log.shouldWarn())
        _log.warn("Fetch of " + url + " failed, rc: " + resp);
    switch (resp) {
        case 301:
        case 308:
        case 401:
        case 403:
        case 404:
        case 410:
        case 414:
        case 416:
        case 451:
            // ban forever
            _coordinator.banWebPeer(_uri.getHost(), true);
            if (_log.shouldWarn())
                _log.warn("Permanently banning the webseed " + url);
            break;

        case 429:
        case 503:
            // ban for a while
            _coordinator.banWebPeer(_uri.getHost(), false);
            if (_log.shouldWarn())
                _log.warn("Temporarily banning the webseed " + url);
            break;

        default:
            // no ban for anything else
            break;
    }
}
/**
 * @return the current cap on chunks per fetch, as adapted by runConnection()
 */
@Override
public int getMaxPipeline() {
    final int currentCap = maxRequests;
    return currentCap;
}
/**
 * @return the connected flag, read while holding the lock on this peer
 */
@Override
public boolean isConnected() {
    final boolean rv;
    synchronized (this) {
        rv = connected;
    }
    return rv;
}
/**
 * Interrupts the thread that is running runConnection(), if one has been recorded.
 * Does nothing otherwise.
 */
@Override
synchronized void disconnect() {
    final Thread runner = thread;
    if (runner == null)
        return;
    runner.interrupt();
}
/**
 * Does nothing for a web seed.
 *
 * @param piece ignored
 */
@Override
public void have(int piece) {
    // intentionally empty
}
/**
 * Does nothing for a web seed.
 *
 * @param piece ignored
 */
@Override
void cancel(int piece) {
    // intentionally empty
}
/**
 * Tops up the list of outstanding requests; see addRequest().
 */
@Override
void request() {
    this.addRequest();
}
/**
 * @return false always
 */
@Override
public boolean isInterested() {
    final boolean interested = false;
    return interested;
}
/**
 * Does nothing for a web seed; {@link #isInteresting()} is always true.
 *
 * @param interest ignored
 */
@Deprecated
@Override
public void setInteresting(boolean interest) {
    // intentionally empty
}
/**
 * @return true always
 */
@Override
public boolean isInteresting() {
    final boolean interesting = true;
    return interesting;
}
/**
 * Does nothing for a web seed; {@link #isChoking()} is always false.
 *
 * @param choke ignored
 */
@Override
public void setChoking(boolean choke) {
    // intentionally empty
}
/**
 * @return false always
 */
@Override
public boolean isChoking() {
    final boolean choking = false;
    return choking;
}
/**
 * @return false always
 */
@Override
public boolean isChoked() {
    final boolean choked = false;
    return choked;
}
/**
 * @return the time elapsed since the last bytesTransferred() callback,
 *         or -1 if no data has been received yet
 */
@Override
public long getInactiveTime() {
    return (lastRcvd <= 0)
           ? -1
           : System.currentTimeMillis() - lastRcvd;
}
/**
 * @return {@code PeerCoordinator.MAX_INACTIVE}
 */
@Override
public long getMaxInactiveTime() {
    final long limit = PeerCoordinator.MAX_INACTIVE;
    return limit;
}
/**
 * Does nothing for a web seed.
 */
@Override
public void keepAlive() {
    // intentionally empty
}
/**
 * Does nothing for a web seed.
 */
@Override
public void retransmitRequests() {
    // intentionally empty
}
/**
 * @return the total number of pieces in the torrent, as reported by the metainfo
 */
@Override
public int completed() {
    final int totalPieces = metainfo.getPieces();
    return totalPieces;
}
/**
 * @return true always
 */
@Override
public boolean isCompleted() {
    final boolean complete = true;
    return complete;
}
/**
 * Identifies this peer as a web seed.
 * The previous body returned false, contradicting both this class and
 * the documented return value.
 *
 * @return true
 * @since 0.9.49
 */
@Override
public boolean isWebPeer() {
    return true;
}
// begin BandwidthListener interface overrides
// Because super doesn't have a PeerState
/**
 * Records downloaded bytes with the superclass first, then with the coordinator.
 *
 * @param size number of bytes downloaded
 * @since 0.9.62
 */
@Override
public void downloaded(int size) {
    super.downloaded(size);
    this._coordinator.downloaded(size);
}
/**
 * Should we request this many bytes?
 * The decision is delegated to the coordinator.
 *
 * @param size number of bytes we would like to request
 * @return the coordinator's answer for this peer and size
 * @since 0.9.62
 */
@Override
public boolean shouldRequest(int size) {
    final boolean allowed = _coordinator.shouldRequest(this, size);
    return allowed;
}
// end BandwidthListener interface overrides
// private methods below here implementing parts of PeerState
/**
 * Fills outstandingRequests up to getMaxPipeline() entries.
 * Each pass either continues the piece of lastRequest with its next chunk,
 * or, when there is no lastRequest or it was the final chunk of its piece,
 * asks requestNextPiece() for a new piece. Stops as soon as the list is full
 * or no new piece is available.
 */
private synchronized void addRequest() {
    while (outstandingRequests.size() < getMaxPipeline()) {
        if (lastRequest == null) {
            // we have nothing in the queue right now
            if (!requestNextPiece())
                break;
            continue;
        }

        // We want something
        final int pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
        final boolean isLastChunk = lastRequest.off + lastRequest.len == pieceLength;
        if (isLastChunk) {
            // Last part of a piece, move on to another piece
            if (!requestNextPiece())
                break;
            continue;
        }

        // queue the chunk that follows lastRequest in the same piece
        final PartialPiece samePiece = lastRequest.getPartialPiece();
        final int nextBegin = lastRequest.off + PeerState.PARTSIZE;
        final int maxLength = pieceLength - nextBegin;
        final int nextLength = Math.min(maxLength, PeerState.PARTSIZE);
        final Request next = new Request(samePiece, nextBegin, nextLength);
        outstandingRequests.add(next);
        lastRequest = next;
        // NOTE(review): this asks about maxLength (all bytes remaining in the piece),
        // not nextLength (the chunk just queued); confirm that is intended
        if (shouldRequest(maxLength))
            this.notifyAll();
    }
}
/**
 * Starts requesting first chunk of next piece. Returns true if
 * something has been added to the requests, false otherwise.
 * The piece comes from the listener; it is released again if we are
 * already requesting that piece.
 */
private synchronized boolean requestNextPiece() {
    // Check for adopting an orphaned partial piece
    final PartialPiece candidate = listener.getPartialPiece(this, bitfield);
    if (candidate != null) {
        // Double-check that the piece is not already in outstandingRequests
        final Set<Integer> inFlight = getRequestedPieces();
        final boolean alreadyRequested = inFlight.contains(Integer.valueOf(candidate.getPiece()));
        if (alreadyRequested) {
            if (_log.shouldLog(Log.WARN))
                _log.warn("Got dup from coord: " + candidate);
            candidate.release();
        } else {
            final Request firstChunk = candidate.getRequest();
            outstandingRequests.add(firstChunk);
            lastRequest = firstChunk;
            // wake the fetch loop only if the coordinator allows this many bytes
            if (shouldRequest(firstChunk.len))
                this.notifyAll();
            return true;
        }
    }

    // failsafe
    // However this is bad as it thrashes the peer when we change our mind
    // Ticket 691 cause here?
    if (outstandingRequests.isEmpty())
        lastRequest = null;

    // The PeerState version of this method would signal "not interested" here;
    // that does not apply to a web seed.
    return false;
}
/**
 * Collects the distinct piece numbers found in outstandingRequests.
 *
 * @return all pieces we are currently requesting, or empty Set
 */
private synchronized Set<Integer> getRequestedPieces() {
    final int count = outstandingRequests.size();
    final Set<Integer> pieces = new HashSet<Integer>(count + 1);
    for (int i = 0; i < count; i++) {
        final Request pending = outstandingRequests.get(i);
        pieces.add(Integer.valueOf(pending.getPiece()));
    }
    return pieces;
}
/**
 * Finds the first outstanding request for the given piece.
 *
 * @param piece the piece number to look for
 * @return index in outstandingRequests or -1
 */
private synchronized int getFirstOutstandingRequest(int piece) {
    int index = 0;
    for (Request pending : outstandingRequests) {
        if (pending.getPiece() == piece)
            return index;
        index++;
    }
    return -1;
}
/**
 * Picks, for each piece we are currently requesting, its outstanding request
 * with the lowest offset, then empties outstandingRequests.
 *
 * @return one request per requested piece, possibly empty, never null
 */
private synchronized List<Request> returnPartialPieces() {
    final Set<Integer> pieces = getRequestedPieces();
    final List<Request> lowestPerPiece = new ArrayList<Request>(pieces.size());
    for (Iterator<Integer> it = pieces.iterator(); it.hasNext(); ) {
        final Request lowest = getLowestOutstandingRequest(it.next().intValue());
        if (lowest == null)
            continue;
        lowestPerPiece.add(lowest);
    }
    outstandingRequests.clear();
    return lowestPerPiece;
}
/**
 * Finds the outstanding request for the given piece that has the smallest offset.
 * When several share that offset the earliest one in the list wins.
 *
 * @param piece the piece number to look for
 * @return the matching request, or null if there is none for this piece
 */
private synchronized Request getLowestOutstandingRequest(int piece) {
    Request best = null;
    int bestOff = Integer.MAX_VALUE;
    for (int i = 0, n = outstandingRequests.size(); i < n; i++) {
        final Request cand = outstandingRequests.get(i);
        if (cand.getPiece() != piece)
            continue;
        if (cand.off >= bestOff)
            continue;
        bestOff = cand.off;
        best = cand;
    }
    return best;
}
// EepGet status listeners to maintain the state for the web page
/**
 * EepGet callback for received data. Only the time of the call is recorded
 * (in lastRcvd); all arguments are ignored.
 */
public void bytesTransferred(long alreadyTransferred, int currentWrite, long bytesTransferred, long bytesRemaining, String url) {
    final long now = System.currentTimeMillis();
    this.lastRcvd = now;
}
/** EepGet callback, ignored. */
public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause) {
    // intentionally empty
}

/** EepGet callback, ignored. */
public void transferComplete(long alreadyTransferred, long bytesTransferred, long bytesRemaining, String url, String outputFile, boolean notModified) {
    // intentionally empty
}

/** EepGet callback, ignored. */
public void transferFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt) {
    // intentionally empty
}

/** EepGet callback, ignored. */
public void headerReceived(String url, int attemptNum, String key, String val) {
    // intentionally empty
}

/** EepGet callback, ignored. */
public void attempting(String url) {
    // intentionally empty
}
// End of EepGet status listeners
}