package org.klomp.snark; import java.io.ByteArrayOutputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import java.util.Arrays; import java.util.ArrayList; import java.util.Iterator; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketEepGet; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.EepGet; import net.i2p.util.Log; /** * BEP 19. * Does not have an associated PeerState. * All request tracking is done here. * @since 0.9.49 */ class WebPeer extends Peer implements EepGet.StatusListener { private final PeerCoordinator _coordinator; private final URI _uri; // as received from coordinator private final List<Request> outstandingRequests = new ArrayList<Request>(); private final boolean isMultiFile; // needed? private Request lastRequest; private PeerListener listener; private BitField bitfield; private Thread thread; private boolean connected; private long lastRcvd; private int maxRequests; // to be recognized by the UI public static final byte[] IDBytes = DataHelper.getASCII("WebSeedBEP19"); private static final long HEADER_TIMEOUT = 60*1000; private static final long TOTAL_TIMEOUT = 10*60*1000; private static final long INACTIVITY_TIMEOUT = 2*60*1000; private static final long TARGET_FETCH_TIME = 2*60*1000; // 128 KB private static final int ABSOLUTE_MIN_REQUESTS = 8; // 2 MB private static final int ABSOLUTE_MAX_REQUESTS = 128; private final int MIN_REQUESTS; private final int MAX_REQUESTS; /** * Outgoing connection. * Creates a disconnected peer given a PeerID, your own id and the * relevant MetaInfo. * @param uri must be http with .i2p host * @param metainfo non-null */ public WebPeer(PeerCoordinator coord, URI uri, PeerID peerID, MetaInfo metainfo) { super(peerID, null, null, metainfo); // no use asking for more than the number of chunks in a piece MAX_REQUESTS = Math.max(1, Math.min(ABSOLUTE_MAX_REQUESTS, metainfo.getPieceLength(0) / PeerState.PARTSIZE)); MIN_REQUESTS = Math.min(ABSOLUTE_MIN_REQUESTS, MAX_REQUESTS); maxRequests = MIN_REQUESTS; isMultiFile = metainfo.getLengths() != null; _coordinator = coord; // We'll assume the base path is already encoded, because // it would have failed the checks in TrackerClient.getHostHash() _uri = uri; } @Override public String toString() { return "WebSeed " + _uri; } /** * @return socket debug string (for debug printing) */ @Override public synchronized String getSocket() { return toString() + ' ' + outstandingRequests.toString(); } /** * The hash code of a Peer is the hash code of the peerID. */ @Override public int hashCode() { return super.hashCode(); } /** * Two Peers are equal when they have the same PeerID. * All other properties are ignored. */ @Override public boolean equals(Object o) { if (o instanceof WebPeer) { WebPeer p = (WebPeer)o; // TODO return getPeerID().equals(p.getPeerID()); } return false; } /** * Runs the connection to the other peer. This method does not * return until the connection is terminated. * * @param ignore our bitfield, ignore * @param uploadOnly if we are complete with skipped files, i.e. a partial seed */ @Override public void runConnection(I2PSnarkUtil util, PeerListener listener, BandwidthListener bwl, BitField ignore, MagnetState mState, boolean uploadOnly) { if (uploadOnly) return; int fails = 0; int successes = 0; long dl = 0; boolean notify = true; ByteArrayOutputStream out = null; // current requests per-loop List<Request> requests = new ArrayList<Request>(8); try { if (!util.connected()) { boolean ok = util.connect(); if (!ok) return; } // This breaks out of the loop after any failure. TrackerClient will requeue eventually. loop: while (true) { I2PSocketManager mgr = util.getSocketManager(); if (mgr == null) return; if (notify) { synchronized(this) { this.listener = listener; bitfield = new BitField(metainfo.getPieces()); bitfield.setAll(); thread = Thread.currentThread(); connected = true; } listener.connected(this); boolean want = listener.gotBitField(this, bitfield); if (!want) return; listener.gotChoke(this, false); notify = false; } synchronized(this) { // clear out previous requests if (!requests.isEmpty()) { outstandingRequests.removeAll(requests); requests.clear(); } addRequest(); if (_log.shouldDebug()) _log.debug("Requests: " + outstandingRequests); while (outstandingRequests.isEmpty()) { if (_coordinator.getNeededLength() <= 0) { if (_log.shouldDebug()) _log.debug("Complete: " + this); break loop; } if (_log.shouldDebug()) _log.debug("No requests, sleeping: " + this); connected = false; out = null; try { this.wait(); } catch (InterruptedException ie) { if (_log.shouldWarn()) _log.warn("Interrupted: " + this, ie); break loop; } } connected = true; // Add current requests from outstandingRequests list and add to requests list. // Do not remove from outstandingRequests until success. lastRequest = outstandingRequests.get(0); requests.add(lastRequest); int piece = lastRequest.getPiece(); // Glue together additional requests if consecutive for a single piece. // This will never glue together requests from different pieces, // and the coordinator generally won't give us consecutive pieces anyway. // Servers generally won't support multiple byte ranges anymore. for (int i = 1; i < outstandingRequests.size(); i++) { if (i >= maxRequests) break; Request r = outstandingRequests.get(i); if (!shouldRequest(r.len)) break; if (r.getPiece() == piece && lastRequest.off + lastRequest.len == r.off) { requests.add(r); lastRequest = r; } else { // all requests for a piece should be together, but not in practice // as orphaned requests can get in-between //break; } } } // total values Request first = requests.get(0); Request last = requests.get(requests.size() - 1); int piece = first.getPiece(); int off = first.off; long toff = (((long) piece) * metainfo.getPieceLength(0)) + off; int tlen = (last.off - first.off) + last.len; long start = System.currentTimeMillis(); ///// TODO direct to file, not in-memory if (out == null) out = new ByteArrayOutputStream(tlen); else out.reset(); int filenum = -1; // Loop for each file if multifile and crosses file boundaries. // Once only for single file. while (out.size() < tlen) { // need these three things: // url to fetch String url; // offset in fetched file long foff; // length to fetch, will be adjusted if crossing a file boundary int flen = tlen - out.size(); if (isMultiFile) { // multifile List<Long> lengths = metainfo.getLengths(); long limit = 0; if (filenum < 0) { // find the first file number and limit // inclusive long fstart = 0; // exclusive long fend = 0; foff = 0; // keep compiler happy, will always be re-set for (int f = 0; f < lengths.size(); f++) { long filelen = lengths.get(f).longValue(); fend = fstart + filelen; if (toff < fend) { filenum = f; foff = toff - fstart; limit = fend - toff; break; } fstart += filelen; } if (filenum < 0) throw new IllegalStateException(lastRequest.toString()); } else { // next file filenum++; foff = 0; limit = lengths.get(filenum).longValue(); } if (limit > 0 && flen > limit) flen = (int) limit; if (metainfo.isPaddingFile(filenum)) { for (int i = 0; i < flen; i++) { out.write((byte) 0); } if (_log.shouldDebug()) _log.debug("Skipped padding file " + filenum); continue; } // build url String uri = _uri.toString(); StringBuilder buf = new StringBuilder(uri.length() + 128); buf.append(uri); if (!uri.endsWith("/")) buf.append('/'); // See BEP 19 rules URIUtil.encodePath(buf, metainfo.getName()); List<String> path = metainfo.getFiles().get(filenum); for (int i = 0; i < path.size(); i++) { buf.append('/'); URIUtil.encodePath(buf, path.get(i)); } url = buf.toString(); } else { // single file // See BEP 19 rules String uri = _uri.toString(); if (uri.endsWith("/")) url = uri + URIUtil.encodePath(metainfo.getName()); else url = uri; foff = toff; flen = tlen; } // do the fetch EepGet get = new I2PSocketEepGet(util.getContext(), mgr, 0, flen, flen, null, out, url); get.addHeader("User-Agent", I2PSnarkUtil.EEPGET_USER_AGENT); get.addHeader("Range", "bytes=" + foff + '-' + (foff + flen - 1)); get.addStatusListener(this); int osz = out.size(); if (_log.shouldDebug()) _log.debug("Fetching piece: " + piece + " offset: " + off + " file offset: " + foff + " len: " + flen + " from " + url); if (get.fetch(HEADER_TIMEOUT, TOTAL_TIMEOUT, INACTIVITY_TIMEOUT)) { int resp = get.getStatusCode(); if (resp != 200 && resp != 206) { fail(url, resp); return; } int sz = out.size() - osz; if (sz != flen) { if (_log.shouldWarn()) _log.warn("Fetch of " + url + " received: " + sz + " expected: " + flen); return; } } else { if (out.size() > 0) { // save any complete chunks received DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray())); for (Iterator<Request> iter = requests.iterator(); iter.hasNext(); ) { Request req = iter.next(); if (dis.available() < req.len) break; req.read(dis, this); iter.remove(); if (_log.shouldWarn()) _log.warn("Saved chunk " + req + " recvd before failure"); } } int resp = get.getStatusCode(); fail(url, resp); return; } successes++; dl += flen; if (!isMultiFile) break; } // for each file // all data received successfully, now process it if (_log.shouldDebug()) _log.debug("Fetch of piece: " + piece + " chunks: " + requests.size() + " offset: " + off + " torrent offset: " + toff + " len: " + tlen + " successful"); DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray())); for (Request req : requests) { req.read(dis, this); } PartialPiece pp = last.getPartialPiece(); synchronized(pp) { // Last chunk needed for this piece? if (pp.isComplete()) { if (listener.gotPiece(this, pp)) { if (_log.shouldDebug()) _log.debug("Got " + piece + ": " + this); } else { if (_log.shouldWarn()) _log.warn("Got BAD " + piece + " from " + this); return; } } else { // piece not complete } } long time = lastRcvd - start; if (time < TARGET_FETCH_TIME) maxRequests = Math.min(MAX_REQUESTS, 2 * maxRequests); else if (time > 2 * TARGET_FETCH_TIME) maxRequests = Math.max(MIN_REQUESTS, maxRequests / 2); } // request loop } catch(IOException eofe) { if (_log.shouldWarn()) _log.warn(toString(), eofe); } finally { List<Request> pcs = returnPartialPieces(); synchronized(this) { connected = false; outstandingRequests.clear(); } requests.clear(); if (!pcs.isEmpty()) listener.savePartialPieces(this, pcs); listener.disconnected(this); disconnect(); if (_log.shouldWarn()) _log.warn("Completed, successful fetches: " + successes + " downloaded: " + dl + " for " + this); } } private void fail(String url, int resp) { if (_log.shouldWarn()) _log.warn("Fetch of " + url + " failed, rc: " + resp); if (resp == 301 || resp == 308 || resp == 401 || resp == 403 || resp == 404 || resp == 410 || resp == 414 || resp == 416 || resp == 451) { // ban forever _coordinator.banWebPeer(_uri.getHost(), true); if (_log.shouldWarn()) _log.warn("Permanently banning the webseed " + url); } else if (resp == 429 || resp == 503) { // ban for a while _coordinator.banWebPeer(_uri.getHost(), false); if (_log.shouldWarn()) _log.warn("Temporarily banning the webseed " + url); } } @Override public int getMaxPipeline() { return maxRequests; } @Override public boolean isConnected() { synchronized(this) { return connected; } } @Override synchronized void disconnect() { if (thread != null) thread.interrupt(); } @Override public void have(int piece) {} @Override void cancel(int piece) {} @Override void request() { addRequest(); } @Override public boolean isInterested() { return false; } @Deprecated @Override public void setInteresting(boolean interest) {} @Override public boolean isInteresting() { return true; } @Override public void setChoking(boolean choke) {} @Override public boolean isChoking() { return false; } @Override public boolean isChoked() { return false; } @Override public long getInactiveTime() { if (lastRcvd <= 0) return -1; long now = System.currentTimeMillis(); return now - lastRcvd; } @Override public long getMaxInactiveTime() { return PeerCoordinator.MAX_INACTIVE; } @Override public void keepAlive() {} @Override public void retransmitRequests() {} @Override public int completed() { return metainfo.getPieces(); } @Override public boolean isCompleted() { return true; } /** * @return true * @since 0.9.49 */ @Override public boolean isWebPeer() { return false; } // begin BandwidthListener interface overrides // Because super doesn't have a PeerState /** * @since 0.9.62 */ @Override public void downloaded(int size) { super.downloaded(size); _coordinator.downloaded(size); } /** * Should we request this many bytes? * @since 0.9.62 */ @Override public boolean shouldRequest(int size) { return _coordinator.shouldRequest(this, size); } // end BandwidthListener interface overrides // private methods below here implementing parts of PeerState private synchronized void addRequest() { boolean more_pieces = true; while (more_pieces) { more_pieces = outstandingRequests.size() < getMaxPipeline(); // We want something and we don't have outstanding requests? if (more_pieces && lastRequest == null) { // we have nothing in the queue right now more_pieces = requestNextPiece(); } else if (more_pieces) { // We want something int pieceLength; boolean isLastChunk; pieceLength = metainfo.getPieceLength(lastRequest.getPiece()); isLastChunk = lastRequest.off + lastRequest.len == pieceLength; // Last part of a piece? if (isLastChunk) { more_pieces = requestNextPiece(); } else { PartialPiece nextPiece = lastRequest.getPartialPiece(); int nextBegin = lastRequest.off + PeerState.PARTSIZE; int maxLength = pieceLength - nextBegin; int nextLength = maxLength > PeerState.PARTSIZE ? PeerState.PARTSIZE : maxLength; Request req = new Request(nextPiece,nextBegin, nextLength); outstandingRequests.add(req); lastRequest = req; if (shouldRequest(maxLength)) this.notifyAll(); } } } } /** * Starts requesting first chunk of next piece. Returns true if * something has been added to the requests, false otherwise. */ private synchronized boolean requestNextPiece() { // Check for adopting an orphaned partial piece PartialPiece pp = listener.getPartialPiece(this, bitfield); if (pp != null) { // Double-check that r not already in outstandingRequests if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) { Request r = pp.getRequest(); outstandingRequests.add(r); lastRequest = r; if (shouldRequest(r.len)) this.notifyAll(); return true; } else { if (_log.shouldLog(Log.WARN)) _log.warn("Got dup from coord: " + pp); pp.release(); } } // failsafe // However this is bad as it thrashes the peer when we change our mind // Ticket 691 cause here? if (outstandingRequests.isEmpty()) lastRequest = null; /* // If we are not in the end game, we may run out of things to request // because we are asking other peers. Set not-interesting now rather than // wait for those other requests to be satisfied via havePiece() if (interesting && lastRequest == null) { interesting = false; out.sendInterest(false); if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " nothing more to request, now uninteresting"); } */ return false; } /** * @return all pieces we are currently requesting, or empty Set */ private synchronized Set<Integer> getRequestedPieces() { Set<Integer> rv = new HashSet<Integer>(outstandingRequests.size() + 1); for (Request req : outstandingRequests) { rv.add(Integer.valueOf(req.getPiece())); } return rv; } /** * @return index in outstandingRequests or -1 */ private synchronized int getFirstOutstandingRequest(int piece) { for (int i = 0; i < outstandingRequests.size(); i++) { if (outstandingRequests.get(i).getPiece() == piece) return i; } return -1; } private synchronized List<Request> returnPartialPieces() { Set<Integer> pcs = getRequestedPieces(); List<Request> rv = new ArrayList<Request>(pcs.size()); for (Integer p : pcs) { Request req = getLowestOutstandingRequest(p.intValue()); if (req != null) rv.add(req); } outstandingRequests.clear(); return rv; } private synchronized Request getLowestOutstandingRequest(int piece) { Request rv = null; int lowest = Integer.MAX_VALUE; for (Request r : outstandingRequests) { if (r.getPiece() == piece && r.off < lowest) { lowest = r.off; rv = r; } } return rv; } // EepGet status listeners to maintain the state for the web page public void bytesTransferred(long alreadyTransferred, int currentWrite, long bytesTransferred, long bytesRemaining, String url) { lastRcvd = System.currentTimeMillis(); } public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause) {} public void transferComplete(long alreadyTransferred, long bytesTransferred, long bytesRemaining, String url, String outputFile, boolean notModified) {} public void transferFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt) {} public void headerReceived(String url, int attemptNum, String key, String val) {} public void attempting(String url) {} // End of EepGet status listeners }