diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 6f17a810aba46d7520ff2636a2bb6beaf53b5cc2..195659c50fbcdf42cb01271d4ce23381aa39c9cb 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -163,6 +163,10 @@ public class I2PSnarkUtil { opts.setProperty("inbound.nickname", "I2PSnark"); if (opts.getProperty("outbound.nickname") == null) opts.setProperty("outbound.nickname", "I2PSnark"); + // Dont do this for now, it is set in I2PSocketEepGet for announces, + // we don't need fast handshake for peer connections. + //if (opts.getProperty("i2p.streaming.connectDelay") == null) + // opts.setProperty("i2p.streaming.connectDelay", "500"); if (opts.getProperty("i2p.streaming.inactivityTimeout") == null) opts.setProperty("i2p.streaming.inactivityTimeout", "240000"); if (opts.getProperty("i2p.streaming.inactivityAction") == null) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 8f24c864ac9a6c63e0bd8728cc5c587bd47933e0..07c83244082576f6eccda78c86b282892b5334a4 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -78,6 +78,7 @@ public class PeerCoordinator implements PeerListener private final CoordinatorListener listener; public I2PSnarkUtil _util; + private static final Random _random = I2PAppContext.getGlobalContext().random(); public String trackerProblems = null; public int trackerSeenPeers = 0; @@ -97,8 +98,7 @@ public class PeerCoordinator implements PeerListener // Install a timer to check the uploaders. // Randomize the first start time so multiple tasks are spread out, // this will help the behavior with global limits - Random r = I2PAppContext.getGlobalContext().random(); - timer.schedule(new PeerCheckerTask(_util, this), (CHECK_PERIOD / 2) + r.nextInt((int) CHECK_PERIOD), CHECK_PERIOD); + timer.schedule(new PeerCheckerTask(_util, this), (CHECK_PERIOD / 2) + _random.nextInt((int) CHECK_PERIOD), CHECK_PERIOD); } // only called externally from Storage after the double-check fails @@ -107,10 +107,16 @@ public class PeerCoordinator implements PeerListener // Make a list of pieces wantedPieces = new ArrayList(); BitField bitfield = storage.getBitField(); - for(int i = 0; i < metainfo.getPieces(); i++) - if (!bitfield.get(i)) - wantedPieces.add(new Piece(i)); - Collections.shuffle(wantedPieces); + int[] pri = storage.getPiecePriorities(); + for(int i = 0; i < metainfo.getPieces(); i++) { + if (!bitfield.get(i)) { + Piece p = new Piece(i); + if (pri != null) + p.setPriority(pri[i]); + wantedPieces.add(p); + } + } + Collections.shuffle(wantedPieces, _random); } public Storage getStorage() { return storage; } @@ -520,6 +526,9 @@ public class PeerCoordinator implements PeerListener while (piece == null && it.hasNext()) { Piece p = it.next(); + // sorted by priority, so when we hit a disabled piece we are done + if (p.isDisabled()) + break; if (havePieces.get(p.getId()) && !p.isRequested()) { piece = p; @@ -538,7 +547,7 @@ public class PeerCoordinator implements PeerListener if (wantedPieces.size() > END_GAME_THRESHOLD) return -1; // nothing to request and not in end game // let's not all get on the same piece - Collections.shuffle(requested); + Collections.shuffle(requested, _random); Iterator<Piece> it2 = requested.iterator(); while (piece == null && it2.hasNext()) { @@ -563,11 +572,64 @@ public class PeerCoordinator implements PeerListener _log.debug("parallel request (end game?) for " + peer + ": piece = " + piece); } } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority()); piece.setRequested(true); return piece.getId(); } } + /** + * Maps file priorities to piece priorities. + * Call after updating file priorities Storage.setPriority() + * @since 0.8.1 + */ + public void updatePiecePriorities() { + int[] pri = storage.getPiecePriorities(); + if (pri == null) + return; + synchronized(wantedPieces) { + // Add incomplete and previously unwanted pieces to the list + // Temp to avoid O(n**2) + BitField want = new BitField(pri.length); + for (Piece p : wantedPieces) { + want.set(p.getId()); + } + BitField bitfield = storage.getBitField(); + for (int i = 0; i < pri.length; i++) { + if (pri[i] >= 0 && !bitfield.get(i)) { + if (!want.get(i)) { + Piece piece = new Piece(i); + wantedPieces.add(piece); + // As connections are already up, new Pieces will + // not have their PeerID list populated, so do that. + synchronized(peers) { + for (Peer p : peers) { + PeerState s = p.state; + if (s != null) { + BitField bf = s.bitfield; + if (bf != null && bf.get(i)) + piece.addPeer(p); + } + } + } + } + } + } + // now set the new priorities and remove newly unwanted pieces + for (Iterator<Piece> iter = wantedPieces.iterator(); iter.hasNext(); ) { + Piece p = iter.next(); + int id = pri[p.getId()]; + if (id >= 0) + p.setPriority(pri[p.getId()]); + else + iter.remove(); + } + // if we added pieces, they will be in-order unless we shuffle + Collections.shuffle(wantedPieces, _random); + } + } + /** * Returns a byte array containing the requested piece or null of * the piece is unknown. @@ -632,14 +694,18 @@ public class PeerCoordinator implements PeerListener // No need to announce have piece to peers. // Assume we got a good piece, we don't really care anymore. - return true; + // Well, this could be caused by a change in priorities, so + // only return true if we already have it, otherwise might as well keep it. + if (storage.getBitField().get(piece)) + return true; } try { if (storage.putPiece(piece, bs)) { - _log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); + if (_log.shouldLog(Log.INFO)) + _log.info("Got valid piece " + piece + "/" + metainfo.getPieces() +" from " + peer + " for " + metainfo.getName()); } else { diff --git a/apps/i2psnark/java/src/org/klomp/snark/Piece.java b/apps/i2psnark/java/src/org/klomp/snark/Piece.java index 3fd0771f985446d5fd16d93cfd0ec296ba0fadd0..0ae9570e13a57f4a944278138816b043747168d2 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Piece.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Piece.java @@ -1,22 +1,31 @@ package org.klomp.snark; import java.util.Collections; -import java.util.HashSet; import java.util.Set; +import net.i2p.util.ConcurrentHashSet; + public class Piece implements Comparable { private int id; - private Set peers; + private Set<PeerID> peers; private boolean requested; + /** @since 0.8.1 */ + private int priority; public Piece(int id) { this.id = id; - this.peers = Collections.synchronizedSet(new HashSet()); - this.requested = false; + this.peers = new ConcurrentHashSet(); } + /** + * Highest priority first, + * then rarest first + */ public int compareTo(Object o) throws ClassCastException { + int pdiff = ((Piece)o).priority - this.priority; // reverse + if (pdiff != 0) + return pdiff; return this.peers.size() - ((Piece)o).peers.size(); } @@ -37,12 +46,25 @@ public class Piece implements Comparable { } public int getId() { return this.id; } - public Set getPeers() { return this.peers; } + /** @deprecated unused */ + public Set<PeerID> getPeers() { return this.peers; } public boolean addPeer(Peer peer) { return this.peers.add(peer.getPeerID()); } public boolean removePeer(Peer peer) { return this.peers.remove(peer.getPeerID()); } public boolean isRequested() { return this.requested; } public void setRequested(boolean requested) { this.requested = requested; } + /** @return default 0 @since 0.8.1 */ + public int getPriority() { return this.priority; } + + /** @since 0.8.1 */ + public void setPriority(int p) { this.priority = p; } + + /** @since 0.8.1 */ + public boolean isDisabled() { return this.priority < 0; } + + /** @since 0.8.1 */ + public void setDisabled() { this.priority = -1; } + @Override public String toString() { return String.valueOf(id); diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index f90301848d8ab9b2f4297313b031f9b5976ed5c1..1e8fb5ec2e5c8cd92b1c64e810d3cd6bc7e11bb3 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -42,6 +42,8 @@ public class Storage private Object[] RAFlock; // lock on RAF access private long[] RAFtime; // when was RAF last accessed, or 0 if closed private File[] RAFfile; // File to make it easier to reopen + /** priorities by file; default 0; may be null. @since 0.8.1 */ + private int[] priorities; private final StorageListener listener; private I2PSnarkUtil _util; @@ -228,6 +230,8 @@ public class Storage RAFlock = new Object[size]; RAFtime = new long[size]; RAFfile = new File[size]; + priorities = new int[size]; + int i = 0; Iterator it = files.iterator(); @@ -330,6 +334,83 @@ public class Storage return -1; } + /** + * @param file canonical path (non-directory) + * @since 0.8.1 + */ + public int getPriority(String file) { + if (complete() || metainfo.getFiles() == null || priorities == null) + return 0; + for (int i = 0; i < rafs.length; i++) { + File f = RAFfile[i]; + // use canonical in case snark dir or sub dirs are symlinked + if (f != null) { + try { + String canonical = f.getCanonicalPath(); + if (canonical.equals(file)) + return priorities[i]; + } catch (IOException ioe) {} + } + } + return 0; + } + + /** + * Must call setPiecePriorities() after calling this + * @param file canonical path (non-directory) + * @param priority default 0; <0 to disable + * @since 0.8.1 + */ + public void setPriority(String file, int pri) { + if (complete() || metainfo.getFiles() == null || priorities == null) + return; + for (int i = 0; i < rafs.length; i++) { + File f = RAFfile[i]; + // use canonical in case snark dir or sub dirs are symlinked + if (f != null) { + try { + String canonical = f.getCanonicalPath(); + if (canonical.equals(file)) { + priorities[i] = pri; + return; + } + } catch (IOException ioe) {} + } + } + } + + /** + * Call setPriority() for all changed files first, + * then call this. + * Set the piece priority to the highest priority + * of all files spanning the piece. + * Caller must pass array to the PeerCoordinator. + * @return null on error, if complete, or if only one file + * @since 0.8.1 + */ + public int[] getPiecePriorities() { + if (complete() || metainfo.getFiles() == null) + return null; + int[] rv = new int[metainfo.getPieces()]; + int file = 0; + long pcEnd = -1; + long fileEnd = lengths[0] - 1; + int psz = metainfo.getPieceLength(0); + for (int i = 0; i < rv.length; i++) { + pcEnd += psz; + int pri = priorities[file]; + while (fileEnd <= pcEnd && file < lengths.length - 1) { + file++; + long oldFileEnd = fileEnd; + fileEnd += lengths[file]; + if (priorities[file] > pri && pcEnd < oldFileEnd) + pri = priorities[file]; + } + rv[i] = pri; + } + return rv; + } + /** * The BitField that tells which pieces this storage contains. * Do not change this since this is the current state of the storage. @@ -436,10 +517,14 @@ public class Storage changed = true; checkCreateFiles(); } - if (complete()) + if (complete()) { _util.debug("Torrent is complete", Snark.NOTICE); - else + } else { + // fixme saved priorities + if (files != null) + priorities = new int[files.size()]; _util.debug("Still need " + needed + " out of " + metainfo.getPieces() + " pieces", Snark.NOTICE); + } } /** @@ -624,7 +709,12 @@ public class Storage // the whole file? listener.storageCreateFile(this, names[nr], lengths[nr]); final int ZEROBLOCKSIZE = metainfo.getPieceLength(0); - byte[] zeros = new byte[ZEROBLOCKSIZE]; + byte[] zeros; + try { + zeros = new byte[ZEROBLOCKSIZE]; + } catch (OutOfMemoryError oom) { + throw new IOException(oom.toString()); + } int i; for (i = 0; i < lengths[nr]/ZEROBLOCKSIZE; i++) { diff --git a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java index 05df351c69b7c28e93deff8e783e1aa0a537cf21..d904f98ca1b63b2959fcb24979680b51aba65738 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java +++ b/apps/i2psnark/java/src/org/klomp/snark/TrackerClient.java @@ -266,7 +266,7 @@ public class TrackerClient extends I2PAppThread // we only want to talk to new people if we need things // from them (duh) List ordered = new ArrayList(peers); - Collections.shuffle(ordered); + Collections.shuffle(ordered, r); Iterator it = ordered.iterator(); while (it.hasNext()) { Peer cur = (Peer)it.next(); diff --git a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java index d3a04df1fbbcea023580802d8f296ad6f006b04a..1c97678005fc2d7b3259ef677a75ec26f9df6fd8 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java +++ b/apps/i2psnark/java/src/org/klomp/snark/web/I2PSnarkServlet.java @@ -133,6 +133,7 @@ public class I2PSnarkServlet extends Default { // bypass the horrid Resource.getListHTML() String pathInfo = req.getPathInfo(); String pathInContext = URI.addPaths(path, pathInfo); + req.setCharacterEncoding("UTF-8"); resp.setCharacterEncoding("UTF-8"); resp.setContentType("text/html; charset=UTF-8"); Resource resource = getResource(pathInContext); @@ -140,7 +141,7 @@ public class I2PSnarkServlet extends Default { resp.sendError(HttpResponse.__404_Not_Found); } else { String base = URI.addPaths(req.getRequestURI(), "/"); - String listing = getListHTML(resource, base, true); + String listing = getListHTML(resource, base, true, method.equals("POST") ? req.getParameterMap() : null); if (listing != null) resp.getWriter().write(listing); else // shouldn't happen @@ -1252,10 +1253,11 @@ public class I2PSnarkServlet extends Default { * @param r The Resource * @param base The base URL * @param parent True if the parent directory should be included + * @param postParams map of POST parameters or null if not a POST * @return String of HTML * @since 0.7.14 */ - private String getListHTML(Resource r, String base, boolean parent) + private String getListHTML(Resource r, String base, boolean parent, Map postParams) throws IOException { if (!r.isDirectory()) @@ -1280,6 +1282,10 @@ public class I2PSnarkServlet extends Default { else torrentName = title; Snark snark = _manager.getTorrentByBaseName(torrentName); + + if (snark != null && postParams != null) + savePriorities(snark, postParams); + if (title.endsWith("/")) title = title.substring(0, title.length() - 1); title = _("Torrent") + ": " + title; @@ -1297,12 +1303,19 @@ public class I2PSnarkServlet extends Default { .append(_("Up to higher level directory")).append("</A>\n"); } - buf.append("</div><div class=\"page\"><div class=\"mainsection\">" + - "<TABLE BORDER=0 class=\"snarkTorrents\" cellpadding=\"5px 10px\">" + + buf.append("</div><div class=\"page\"><div class=\"mainsection\">"); + boolean showPriority = snark != null && !snark.storage.complete(); + if (showPriority) + buf.append("<form action=\"").append(base).append("\" method=\"POST\">\n"); + buf.append("<TABLE BORDER=0 class=\"snarkTorrents\" cellpadding=\"5px 10px\">" + "<thead><tr><th>").append(_("File")).append("</th><th>").append(_("Size")) - .append("</th><th>").append(_("Status")).append("</th></tr></thead>"); + .append("</th><th>").append(_("Status")).append("</th>"); + if (showPriority) + buf.append("<th>").append(_("Priority")).append("</th>"); + buf.append("</tr></thead>\n"); //DateFormat dfmt=DateFormat.getDateTimeInstance(DateFormat.MEDIUM, // DateFormat.MEDIUM); + boolean showSaveButton = false; for (int i=0 ; i< ls.length ; i++) { String encoded=URI.encodePath(ls[i]); @@ -1340,7 +1353,8 @@ public class I2PSnarkServlet extends Default { complete = true; status = toImg("tick") + _("Complete"); } else { - status = toImg("clock") + + status = + (snark.storage.getPriority(f.getCanonicalPath()) < 0 ? toImg("cancel") : toImg("clock")) + (100 * (length - remaining) / length) + "% " + _("complete") + " (" + DataHelper.formatSize2(remaining) + _("bytes remaining") + ")"; } @@ -1384,9 +1398,40 @@ public class I2PSnarkServlet extends Default { buf.append("</TD><TD class=\"").append(rowClass).append(" snarkFileStatus\">"); //buf.append(dfmt.format(new Date(item.lastModified()))); buf.append(status); - buf.append("</TD></TR>\n"); + buf.append("</TD>"); + if (showPriority) { + buf.append("<td>"); + File f = item.getFile(); + if ((!complete) && (!item.isDirectory()) && f != null) { + int pri = snark.storage.getPriority(f.getCanonicalPath()); + buf.append("<input type=\"radio\" value=\"5\" name=\"pri.").append(f.getCanonicalPath()).append("\" "); + if (pri > 0) + buf.append("checked=\"true\""); + buf.append('>').append(_("High")); + + buf.append("<input type=\"radio\" value=\"0\" name=\"pri.").append(f.getCanonicalPath()).append("\" "); + if (pri == 0) + buf.append("checked=\"true\""); + buf.append('>').append(_("Normal")); + + buf.append("<input type=\"radio\" value=\"-9\" name=\"pri.").append(f.getCanonicalPath()).append("\" "); + if (pri < 0) + buf.append("checked=\"true\""); + buf.append('>').append(_("Do not download")); + showSaveButton = true; + } + buf.append("</td>"); + } + buf.append("</TR>\n"); + } + if (showSaveButton) { + buf.append("<thead><tr><th colspan=\"3\"> </th><th align=\"center\"><input type=\"submit\" value=\""); + buf.append(_("Save priorities")); + buf.append("\" name=\"foo\" ></th></tr></thead>\n"); } buf.append("</TABLE>\n"); + if (showPriority) + buf.append("</form>"); buf.append("</div></div></BODY></HTML>\n"); return buf.toString(); @@ -1452,6 +1497,25 @@ public class I2PSnarkServlet extends Default { return "<img alt=\"\" height=\"16\" width=\"16\" src=\"/i2psnark/_icons/" + icon + ".png\"> "; } + /** @since 0.8.1 */ + private static void savePriorities(Snark snark, Map postParams) { + Set<Map.Entry> entries = postParams.entrySet(); + for (Map.Entry entry : entries) { + String key = (String)entry.getKey(); + if (key.startsWith("pri.")) { + try { + String file = key.substring(4); + String val = ((String[])entry.getValue())[0]; // jetty arrays + int pri = Integer.parseInt(val); + snark.storage.setPriority(file, pri); + //System.err.println("Priority now " + pri + " for " + file); + } catch (Throwable t) { t.printStackTrace(); } + } + } + if (snark.coordinator != null) + snark.coordinator.updatePiecePriorities(); + } + /** inner class, don't bother reindenting */ private static class FetchAndAdd implements Runnable {