I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
WebPeer.java 23.96 KiB
package org.klomp.snark;

import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketEepGet;
import net.i2p.client.streaming.I2PSocketManager;
import net.i2p.data.DataHelper;
import net.i2p.data.Destination;
import net.i2p.util.EepGet;
import net.i2p.util.Log;

/**
 *  BEP 19.
 *  Does not have an associated PeerState.
 *  All request tracking is done here.
 *  @since 0.9.49
 */
class WebPeer extends Peer implements EepGet.StatusListener {

  private final PeerCoordinator _coordinator;
  private final URI _uri;
  // as received from coordinator
  private final List<Request> outstandingRequests = new ArrayList<Request>();
  private final boolean isMultiFile;
  // needed?
  private Request lastRequest;
  private PeerListener listener;
  private BitField bitfield;
  private Thread thread;
  private boolean connected;
  private long lastRcvd;
  private int maxRequests;

  // to be recognized by the UI
  public static final byte[] IDBytes = DataHelper.getASCII("WebSeedBEP19");
  private static final long HEADER_TIMEOUT = 60*1000;
  private static final long TOTAL_TIMEOUT = 10*60*1000;
  private static final long INACTIVITY_TIMEOUT = 2*60*1000;
  private static final long TARGET_FETCH_TIME = 2*60*1000;
  // 128 KB
  private static final int ABSOLUTE_MIN_REQUESTS = 8;
  // 2 MB
  private static final int ABSOLUTE_MAX_REQUESTS = 128;
  private final int MIN_REQUESTS;
  private final int MAX_REQUESTS;

  /**
   * Outgoing connection.
   * Creates a disconnected peer given a PeerID, your own id and the
   * relevant MetaInfo.
   * @param uri must be http with .i2p host
   * @param metainfo non-null
   */
  public WebPeer(PeerCoordinator coord, URI uri, PeerID peerID, MetaInfo metainfo) {
      super(peerID, null, null, metainfo);
      // no use asking for more than the number of chunks in a piece
      MAX_REQUESTS = Math.max(1, Math.min(ABSOLUTE_MAX_REQUESTS, metainfo.getPieceLength(0) / PeerState.PARTSIZE));
      MIN_REQUESTS = Math.min(ABSOLUTE_MIN_REQUESTS, MAX_REQUESTS);
      maxRequests = MIN_REQUESTS;
      isMultiFile = metainfo.getLengths() != null;
      _coordinator = coord;
      // We'll assume the base path is already encoded, because
      // it would have failed the checks in TrackerClient.getHostHash()
      _uri = uri;
  }

  @Override
  public String toString() {
      return "WebSeed " + _uri;
  }

  /**
   * @return socket debug string (for debug printing)
   */
  @Override
  public synchronized String getSocket() {
      return toString() + ' ' + outstandingRequests.toString();
  }

  /**
   * The hash code of a Peer is the hash code of the peerID.
   */
  @Override
  public int hashCode() {
      return super.hashCode();
  }

  /**
   * Two Peers are equal when they have the same PeerID.
   * All other properties are ignored.
   */
  @Override
  public boolean equals(Object o) {
      if (o instanceof WebPeer) {
          WebPeer p = (WebPeer)o;
          // TODO
          return getPeerID().equals(p.getPeerID());
      }
      return false;
  }

  /**
   * Runs the connection to the other peer. This method does not
   * return until the connection is terminated.
   *
   * @param ignore our bitfield, ignore
   * @param uploadOnly if we are complete with skipped files, i.e. a partial seed
   */
  @Override
  public void runConnection(I2PSnarkUtil util, PeerListener listener, BandwidthListener bwl, BitField ignore,
                            MagnetState mState, boolean uploadOnly) {
      if (uploadOnly)
          return;
      int fails = 0;
      int successes = 0;
      long dl = 0;
      boolean notify = true;
      ByteArrayOutputStream out = null;
      // current requests per-loop
      List<Request> requests = new ArrayList<Request>(8);
      try {
          if (!util.connected()) {
              boolean ok = util.connect();
              if (!ok)
                  return;
          }

          // This breaks out of the loop after any failure. TrackerClient will requeue eventually.
          loop:
          while (true) {
              I2PSocketManager mgr = util.getSocketManager();
              if (mgr == null)
                  return;
              if (notify) {
                  synchronized(this) {
                      this.listener = listener;
                      bitfield = new BitField(metainfo.getPieces());
                      bitfield.setAll();
                      thread = Thread.currentThread();
                      connected = true;
                  }
                  listener.connected(this);
                  boolean want = listener.gotBitField(this, bitfield);
                  if (!want)
                      return;
                  listener.gotChoke(this, false);
                  notify = false;
              }

              synchronized(this) {
                  // clear out previous requests
                  if (!requests.isEmpty()) {
                      outstandingRequests.removeAll(requests);
                      requests.clear();
                  }
                  addRequest();
                  if (_log.shouldDebug())
                      _log.debug("Requests: " + outstandingRequests);
                  while (outstandingRequests.isEmpty()) {
                      if (_coordinator.getNeededLength() <= 0) {
                          if (_log.shouldDebug())
                              _log.debug("Complete: " + this);
                          break loop;
                      }
                      if (_log.shouldDebug())
                          _log.debug("No requests, sleeping: " + this);
                      connected = false;
                      out = null;
                      try {
                          this.wait();
                      } catch (InterruptedException ie) {
                          if (_log.shouldWarn())
                              _log.warn("Interrupted: " + this, ie);
                          break loop;
                      }
                  }
                  connected = true;
                  // Add current requests from outstandingRequests list and add to requests list.
                  // Do not remove from outstandingRequests until success.
                  lastRequest = outstandingRequests.get(0);
                  requests.add(lastRequest);
                  int piece = lastRequest.getPiece();

                  // Glue together additional requests if consecutive for a single piece.
                  // This will never glue together requests from different pieces,
                  // and the coordinator generally won't give us consecutive pieces anyway.
                  // Servers generally won't support multiple byte ranges anymore.
                  for (int i = 1; i < outstandingRequests.size(); i++) {
                      if (i >= maxRequests)
                          break;
                      Request r = outstandingRequests.get(i);
                      if (!shouldRequest(r.len))
                          break;
                      if (r.getPiece() == piece &&
                          lastRequest.off + lastRequest.len == r.off) {
                          requests.add(r);
                          lastRequest = r;
                      } else {
                          // all requests for a piece should be together, but not in practice
                          // as orphaned requests can get in-between
                          //break;
                      }
                  }
              }

              // total values
              Request first = requests.get(0);
              Request last = requests.get(requests.size() - 1);
              int piece = first.getPiece();
              int off = first.off;
              long toff = (((long) piece) * metainfo.getPieceLength(0)) + off;
              int tlen = (last.off - first.off) + last.len;
              long start = System.currentTimeMillis();
              ///// TODO direct to file, not in-memory
              if (out == null)
                  out = new ByteArrayOutputStream(tlen);
              else
                  out.reset();
              int filenum = -1;

              // Loop for each file if multifile and crosses file boundaries.
              // Once only for single file.
              while (out.size() < tlen) {

                  // need these three things:
                  // url to fetch
                  String url;
                  // offset in fetched file
                  long foff;
                  // length to fetch, will be adjusted if crossing a file boundary
                  int flen = tlen - out.size();

                  if (isMultiFile) {
                      // multifile
                      List<Long> lengths = metainfo.getLengths();
                      long limit = 0;
                      if (filenum < 0) {
                          // find the first file number and limit
                          // inclusive
                          long fstart = 0;
                          // exclusive
                          long fend = 0;
                          foff = 0; // keep compiler happy, will always be re-set
                          for (int f = 0; f < lengths.size(); f++) {
                              long filelen = lengths.get(f).longValue();
                              fend = fstart + filelen;
                              if (toff < fend) {
                                  filenum = f;
                                  foff = toff - fstart;
                                  limit = fend - toff;
                                  break;
                              }
                              fstart += filelen;
                          }
                          if (filenum < 0)
                              throw new IllegalStateException(lastRequest.toString());
                      } else {
                          // next file
                          filenum++;
                          foff = 0;
                          limit = lengths.get(filenum).longValue();
                      }

                      if (limit > 0 && flen > limit)
                          flen = (int) limit;

                      if (metainfo.isPaddingFile(filenum)) {
                          for (int i = 0; i < flen; i++) {
                              out.write((byte) 0);
                          }
                          if (_log.shouldDebug())
                              _log.debug("Skipped padding file " + filenum);
                          continue;
                      }

                      // build url
                      String uri = _uri.toString();
                      StringBuilder buf = new StringBuilder(uri.length() + 128);
                      buf.append(uri);
                      if (!uri.endsWith("/"))
                          buf.append('/');
                      // See BEP 19 rules
                      URIUtil.encodePath(buf, metainfo.getName());
                      List<String> path = metainfo.getFiles().get(filenum);
                      for (int i = 0; i < path.size(); i++) {
                           buf.append('/');
                           URIUtil.encodePath(buf, path.get(i));
                      }
                      url = buf.toString();
                  } else {
                      // single file
                      // See BEP 19 rules
                      String uri = _uri.toString();
                      if (uri.endsWith("/"))
                          url = uri + URIUtil.encodePath(metainfo.getName());
                      else
                          url = uri;
                      foff = toff;
                      flen = tlen;
                  }

                  // do the fetch
                  EepGet get = new I2PSocketEepGet(util.getContext(), mgr, 0, flen, flen, null, out, url);
                  get.addHeader("User-Agent", I2PSnarkUtil.EEPGET_USER_AGENT);
                  get.addHeader("Range", "bytes=" + foff + '-' + (foff + flen - 1));
                  get.addStatusListener(this);
                  int osz = out.size();
                  if (_log.shouldDebug())
                      _log.debug("Fetching piece: " + piece + " offset: " + off + " file offset: " + foff + " len: " + flen + " from " + url);
                  if (get.fetch(HEADER_TIMEOUT, TOTAL_TIMEOUT, INACTIVITY_TIMEOUT)) {
                      int resp = get.getStatusCode();
                      if (resp != 200 && resp != 206) {
                          fail(url, resp);
                          return;
                      }
                      int sz = out.size() - osz;
                      if (sz != flen) {
                          if (_log.shouldWarn())
                              _log.warn("Fetch of " + url + " received: " + sz + " expected: " + flen);
                          return;
                      }
                  } else {
                      if (out.size() > 0) {
                          // save any complete chunks received
                          DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
                          for (Iterator<Request> iter = requests.iterator(); iter.hasNext(); ) {
                              Request req = iter.next();
                              if (dis.available() < req.len)
                                  break;
                              req.read(dis, this);
                              iter.remove();
                              if (_log.shouldWarn())
                                  _log.warn("Saved chunk " + req + " recvd before failure");
                          }
                      }
                      int resp = get.getStatusCode();
                      fail(url, resp);
                      return;
                  }

                  successes++;
                  dl += flen;

                  if (!isMultiFile)
                      break;
              } // for each file

              // all data received successfully, now process it
              if (_log.shouldDebug())
                  _log.debug("Fetch of piece: " + piece + " chunks: " + requests.size() + " offset: " + off + " torrent offset: " + toff + " len: " + tlen + " successful");
              DataInputStream dis = new DataInputStream(new ByteArrayInputStream(out.toByteArray()));
              for (Request req : requests) {
                  req.read(dis, this);
              }

              PartialPiece pp = last.getPartialPiece();
              synchronized(pp) {
                  // Last chunk needed for this piece?
                  if (pp.isComplete()) {
                      if (listener.gotPiece(this, pp)) {
                          if (_log.shouldDebug())
                              _log.debug("Got " + piece + ": " + this);
                      } else {
                          if (_log.shouldWarn())
                              _log.warn("Got BAD " + piece + " from " + this);
                          return;
                      }
                  } else {
                      // piece not complete
                  }
              }

              long time = lastRcvd - start;
              if (time < TARGET_FETCH_TIME)
                  maxRequests = Math.min(MAX_REQUESTS, 2 * maxRequests);
              else if (time >  2 * TARGET_FETCH_TIME)
                  maxRequests = Math.max(MIN_REQUESTS, maxRequests / 2);
          } // request loop
      } catch(IOException eofe) {
          if (_log.shouldWarn())
              _log.warn(toString(), eofe);
      } finally {
          List<Request> pcs = returnPartialPieces();
          synchronized(this) {
              connected = false;
              outstandingRequests.clear();
          }
          requests.clear();
          if (!pcs.isEmpty())
              listener.savePartialPieces(this, pcs);
          listener.disconnected(this);
          disconnect();
          if (_log.shouldWarn())
              _log.warn("Completed, successful fetches: " + successes + " downloaded: " + dl + " for " + this);
      }
  }

  private void fail(String url, int resp) {
      if (_log.shouldWarn())
          _log.warn("Fetch of " + url + " failed, rc: " + resp);
      if (resp == 301 || resp == 308 ||
          resp == 401 || resp == 403 || resp == 404 || resp == 410 || resp == 414 || resp == 416 || resp == 451) {
          // ban forever
          _coordinator.banWebPeer(_uri.getHost(), true);
          if (_log.shouldWarn())
              _log.warn("Permanently banning the webseed " + url);
      } else if (resp == 429 || resp == 503) {
          // ban for a while
          _coordinator.banWebPeer(_uri.getHost(), false);
          if (_log.shouldWarn())
              _log.warn("Temporarily banning the webseed " + url);
      }
  }

  @Override
  public int getMaxPipeline() {
      return maxRequests;
  }

  @Override
  public boolean isConnected() {
      synchronized(this) {
          return connected;
      }
  }

  @Override
  synchronized void disconnect() {
      if (thread != null)
          thread.interrupt();
  }

  @Override
  public void have(int piece) {}

  @Override
  void cancel(int piece) {}

  @Override
  void request() {
      addRequest();
  }

  @Override
  public boolean isInterested() {
      return false;
  }

  @Deprecated
  @Override
  public void setInteresting(boolean interest) {}

  @Override
  public boolean isInteresting() {
      return true;
  }

  @Override
  public void setChoking(boolean choke) {}

  @Override
  public boolean isChoking() {
      return false;
  }

  @Override
  public boolean isChoked() {
      return false;
  }
  
  @Override
  public long getInactiveTime() {
      if (lastRcvd <= 0)
          return -1;
      long now = System.currentTimeMillis();
      return now - lastRcvd;
  }

  @Override
  public long getMaxInactiveTime() {
      return PeerCoordinator.MAX_INACTIVE;
  }

  @Override
  public void keepAlive() {}

  @Override
  public void retransmitRequests() {}

  @Override
  public int completed() {
      return metainfo.getPieces();
  }

  @Override
  public boolean isCompleted() {
      return true;
  }

  /**
   * @return true
   * @since 0.9.49
   */
  @Override
  public boolean isWebPeer() {
      return false;
  }

  // begin BandwidthListener interface overrides
  // Because super doesn't have a PeerState

  /**
   * @since 0.9.62
   */
  @Override
  public void downloaded(int size) {
      super.downloaded(size);
      _coordinator.downloaded(size);
  }

  /**
   * Should we request this many bytes?
   * @since 0.9.62
   */
  @Override
  public boolean shouldRequest(int size) {
      return _coordinator.shouldRequest(this, size);
  }

  // end BandwidthListener interface overrides

  // private methods below here implementing parts of PeerState

  private synchronized void addRequest() {
      boolean more_pieces = true;
      while (more_pieces) {
          more_pieces = outstandingRequests.size() < getMaxPipeline();
          // We want something and we don't have outstanding requests?
          if (more_pieces && lastRequest == null) {
              // we have nothing in the queue right now
              more_pieces = requestNextPiece();
          } else if (more_pieces) {
              // We want something
              int pieceLength;
              boolean isLastChunk;
              pieceLength = metainfo.getPieceLength(lastRequest.getPiece());
              isLastChunk = lastRequest.off + lastRequest.len == pieceLength;

              // Last part of a piece?
              if (isLastChunk) {
                  more_pieces = requestNextPiece();
              } else {
                  PartialPiece nextPiece = lastRequest.getPartialPiece();
                  int nextBegin = lastRequest.off + PeerState.PARTSIZE;
                  int maxLength = pieceLength - nextBegin;
                  int nextLength = maxLength > PeerState.PARTSIZE ? PeerState.PARTSIZE
                                                        : maxLength;
                  Request req = new Request(nextPiece,nextBegin, nextLength);
                  outstandingRequests.add(req);
                  lastRequest = req;
                  if (shouldRequest(maxLength))
                      this.notifyAll();
              }
          }
      }
  }

  /**
   * Starts requesting first chunk of next piece. Returns true if
   * something has been added to the requests, false otherwise.
   */
  private synchronized boolean requestNextPiece() {
      // Check for adopting an orphaned partial piece
      PartialPiece pp = listener.getPartialPiece(this, bitfield);
      if (pp != null) {
          // Double-check that r not already in outstandingRequests
          if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) {
              Request r = pp.getRequest();
              outstandingRequests.add(r);
              lastRequest = r;
              if (shouldRequest(r.len))
                  this.notifyAll();
              return true;
          } else {
              if (_log.shouldLog(Log.WARN))
                  _log.warn("Got dup from coord: " + pp);
              pp.release();
          }
      }

      // failsafe
      // However this is bad as it thrashes the peer when we change our mind
      // Ticket 691 cause here?
      if (outstandingRequests.isEmpty())
          lastRequest = null;

/*
      // If we are not in the end game, we may run out of things to request
      // because we are asking other peers. Set not-interesting now rather than
      // wait for those other requests to be satisfied via havePiece()
      if (interesting && lastRequest == null) {
          interesting = false;
          out.sendInterest(false);
          if (_log.shouldLog(Log.DEBUG))
              _log.debug(peer + " nothing more to request, now uninteresting");
      }
*/
      return false;
  }

  /**
   * @return all pieces we are currently requesting, or empty Set
   */
  private synchronized Set<Integer> getRequestedPieces() {
      Set<Integer> rv = new HashSet<Integer>(outstandingRequests.size() + 1);
      for (Request req : outstandingRequests) {
          rv.add(Integer.valueOf(req.getPiece()));
      }
      return rv;
  }

  /**
   *  @return index in outstandingRequests or -1
   */
  private synchronized int getFirstOutstandingRequest(int piece) {
      for (int i = 0; i < outstandingRequests.size(); i++) {
          if (outstandingRequests.get(i).getPiece() == piece)
              return i;
      }
      return -1;
  }

  private synchronized List<Request> returnPartialPieces() {
      Set<Integer> pcs = getRequestedPieces();
      List<Request> rv = new ArrayList<Request>(pcs.size());
      for (Integer p : pcs) {
          Request req = getLowestOutstandingRequest(p.intValue());
          if (req != null)
              rv.add(req);
      }
      outstandingRequests.clear();
      return rv;
  }

  private synchronized Request getLowestOutstandingRequest(int piece) {
      Request rv = null;
      int lowest = Integer.MAX_VALUE;
      for (Request r :  outstandingRequests) {
          if (r.getPiece() == piece && r.off < lowest) {
              lowest = r.off;
              rv = r;
          }
      }
      return rv;
  }

    // EepGet status listeners to maintain the state for the web page

    public void bytesTransferred(long alreadyTransferred, int currentWrite, long bytesTransferred, long bytesRemaining, String url) {
        lastRcvd = System.currentTimeMillis();
    }

    public void attemptFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt, int numRetries, Exception cause) {}
    public void transferComplete(long alreadyTransferred, long bytesTransferred, long bytesRemaining, String url, String outputFile, boolean notModified) {}
    public void transferFailed(String url, long bytesTransferred, long bytesRemaining, int currentAttempt) {}
    public void headerReceived(String url, int attemptNum, String key, String val) {}
    public void attempting(String url) {}

    // End of EepGet status listeners
}