From 7f1ace4dbed7eb30c4071052d080f7298fac372b Mon Sep 17 00:00:00 2001 From: zzz Date: Fri, 26 Nov 2010 00:44:00 +0000 Subject: [PATCH] * i2psnark: Clean up and enhance the PeerCoordinator's partial piece handling, in preparation for more improvements --- .../src/org/klomp/snark/PartialPiece.java | 102 ++++++++++ .../java/src/org/klomp/snark/Peer.java | 8 +- .../src/org/klomp/snark/PeerCoordinator.java | 191 +++++++++++------- .../src/org/klomp/snark/PeerListener.java | 15 +- .../java/src/org/klomp/snark/PeerState.java | 125 ++++++------ .../java/src/org/klomp/snark/Piece.java | 5 +- .../java/src/org/klomp/snark/Request.java | 1 + 7 files changed, 304 insertions(+), 143 deletions(-) create mode 100644 apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java diff --git a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java new file mode 100644 index 000000000..8ecae5778 --- /dev/null +++ b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java @@ -0,0 +1,102 @@ +package org.klomp.snark; + +/** + * This is the class passed from PeerCoordinator to PeerState so + * PeerState may start requests. + * + * It is also passed from PeerState to PeerCoordinator when + * a piece is not completely downloaded, for example + * when the Peer disconnects or chokes. + */ +class PartialPiece implements Comparable { + + private final int piece; + private final byte[] bs; + private final int off; + private final long createdTime; + + /** + * Used by PeerCoordinator. + * Creates a new PartialPiece, with no chunks yet downloaded. + * Allocates the data. + * + * @param piece Piece number requested. + * @param bs length must be equal to the piece length + */ + public PartialPiece (int piece, int len) throws OutOfMemoryError { + this.piece = piece; + this.bs = new byte[len]; + this.off = 0; + this.createdTime = 0; + } + + /** + * Used by PeerState. + * Creates a new PartialPiece, with chunks up to but not including + * firstOutstandingRequest already downloaded and stored in the Request byte array. + * + * Note that this cannot handle gaps; chunks after a missing chunk cannot be saved. + * That would be harder. + * + * @param firstOutstandingRequest the first request not fulfilled for the piece + */ + public PartialPiece (Request firstOutstandingRequest) { + this.piece = firstOutstandingRequest.piece; + this.bs = firstOutstandingRequest.bs; + this.off = firstOutstandingRequest.off; + this.createdTime = System.currentTimeMillis(); + } + + /** + * Convert this PartialPiece to a request for the next chunk. + * Used by PeerState only. + */ + + public Request getRequest() { + return new Request(this.piece, this.bs, this.off, Math.min(this.bs.length - this.off, PeerState.PARTSIZE)); + } + + /** piece number */ + public int getPiece() { + return this.piece; + } + + /** how many bytes are good */ + public int getDownloaded() { + return this.off; + } + + public long getCreated() { + return this.createdTime; + } + + /** + * Highest downloaded first + */ + public int compareTo(Object o) throws ClassCastException { + return ((PartialPiece)o).off - this.off; // reverse + } + + @Override + public int hashCode() { + return piece * 7777; + } + + /** + * Make this simple so PeerCoordinator can keep a List. + * Warning - compares piece number only! + */ + @Override + public boolean equals(Object o) { + if (o instanceof PartialPiece) { + PartialPiece pp = (PartialPiece)o; + return pp.piece == this.piece; + } + return false; + } + + @Override + public String toString() { + return "Partial(" + piece + ',' + off + ',' + bs.length + ')'; + } +} diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index e747332d8..257c5ff9a 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -27,6 +27,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Arrays; +import java.util.List; import net.i2p.client.streaming.I2PSocket; import net.i2p.util.Log; @@ -368,8 +369,11 @@ public class Peer implements Comparable if (this.deregister) { PeerListener p = s.listener; if (p != null) { - p.savePeerPartial(s); - p.markUnrequested(this); + List pcs = s.returnPartialPieces(); + if (!pcs.isEmpty()) + p.savePartialPieces(this, pcs); + // now covered by savePartialPieces + //p.markUnrequested(this); } } state = null; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index c8f8d7d7f..410ac43c5 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -74,6 +74,9 @@ public class PeerCoordinator implements PeerListener // Some random wanted pieces private List wantedPieces; + /** partial pieces */ + private final List partialPieces; + private boolean halted = false; private final CoordinatorListener listener; @@ -94,6 +97,7 @@ public class PeerCoordinator implements PeerListener this.snark = torrent; setWantedPieces(); + partialPieces = new ArrayList(getMaxConnections() + 1); // Install a timer to check the uploaders. // Randomize the first start time so multiple tasks are spread out, @@ -293,7 +297,9 @@ public class PeerCoordinator implements PeerListener removePeerFromPieces(peer); } // delete any saved orphan partial piece - savedRequest = null; + synchronized (partialPieces) { + partialPieces.clear(); + } } public void connected(Peer peer) @@ -773,6 +779,9 @@ public class PeerCoordinator implements PeerListener wantedPieces.remove(p); } + // just in case + removePartialPiece(piece); + // Announce to the world we have it! // Disconnect from other seeders when we get the last piece synchronized(peers) @@ -866,70 +875,123 @@ public class PeerCoordinator implements PeerListener } } - - /** Simple method to save a partial piece on peer disconnection + /** + * Save partial pieces on peer disconnection * and hopefully restart it later. - * Only one partial piece is saved at a time. - * Replace it if a new one is bigger or the old one is too old. + * Replace a partial piece in the List if the new one is bigger. * Storage method is private so we can expand to save multiple partials * if we wish. + * + * Also mark the piece unrequested if this peer was the only one. + * + * @param peer partials, must include the zero-offset (empty) ones too + * @since 0.8.2 */ - private Request savedRequest = null; - private long savedRequestTime = 0; - public void savePeerPartial(PeerState state) + public void savePartialPieces(Peer peer, List partials) { - if (halted) - return; - Request req = state.getPartialRequest(); - if (req == null) - return; - if (savedRequest == null || - req.off > savedRequest.off || - System.currentTimeMillis() > savedRequestTime + (15 * 60 * 1000)) { - if (savedRequest == null || (req.piece != savedRequest.piece && req.off != savedRequest.off)) { - if (_log.shouldLog(Log.DEBUG)) { - _log.debug(" Saving orphaned partial piece " + req); - if (savedRequest != null) - _log.debug(" (Discarding previously saved orphan) " + savedRequest); - } + if (halted) + return; + if (_log.shouldLog(Log.INFO)) + _log.info("Partials received from " + peer + ": " + partials); + synchronized(partialPieces) { + for (PartialPiece pp : partials) { + if (pp.getDownloaded() > 0) { + // PartialPiece.equals() only compares piece number, which is what we want + int idx = partialPieces.indexOf(pp); + if (idx < 0) { + partialPieces.add(pp); + if (_log.shouldLog(Log.INFO)) + _log.info("Saving orphaned partial piece (new) " + pp); + } else if (idx >= 0 && pp.getDownloaded() > partialPieces.get(idx).getDownloaded()) { + // replace what's there now + partialPieces.set(idx, pp); + if (_log.shouldLog(Log.INFO)) + _log.info("Saving orphaned partial piece (bigger) " + pp); + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Discarding partial piece (not bigger)" + pp); + } + int max = getMaxConnections(); + if (partialPieces.size() > max) { + // sorts by remaining bytes, least first + Collections.sort(partialPieces); + PartialPiece gone = partialPieces.remove(max); + if (_log.shouldLog(Log.INFO)) + _log.info("Discarding orphaned partial piece (list full)" + gone); + } + } // else drop the empty partial piece + // synchs on wantedPieces... + markUnrequestedIfOnlyOne(peer, pp.getPiece()); + } + if (_log.shouldLog(Log.INFO)) + _log.info("Partial list size now: " + partialPieces.size()); } - savedRequest = req; - savedRequestTime = System.currentTimeMillis(); - } else { - if (req.piece != savedRequest.piece) - if (_log.shouldLog(Log.DEBUG)) - _log.debug(" Discarding orphaned partial piece " + req); - } } - /** Return partial piece if it's still wanted and peer has it. + /** + * Return partial piece to the PeerState if it's still wanted and peer has it. + * @param havePieces pieces the peer has, the rv will be one of these + * + * @return PartialPiece or null + * @since 0.8.2 */ - public Request getPeerPartial(BitField havePieces) { - if (savedRequest == null) - return null; - if (! havePieces.get(savedRequest.piece)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Peer doesn't have orphaned piece " + savedRequest); - return null; - } - synchronized(wantedPieces) - { - for(Iterator iter = wantedPieces.iterator(); iter.hasNext(); ) { - Piece piece = iter.next(); - if (piece.getId() == savedRequest.piece) { - Request req = savedRequest; - piece.setRequested(true); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Restoring orphaned partial piece " + req); - savedRequest = null; - return req; + public PartialPiece getPartialPiece(Peer peer, BitField havePieces) { + // do it in this order to avoid deadlock (same order as in savePartialPieces()) + synchronized(partialPieces) { + synchronized(wantedPieces) { + // sorts by remaining bytes, least first + Collections.sort(partialPieces); + for (Iterator iter = partialPieces.iterator(); iter.hasNext(); ) { + PartialPiece pp = iter.next(); + int savedPiece = pp.getPiece(); + if (havePieces.get(savedPiece)) { + // this is just a double-check, it should be in there + for(Piece piece : wantedPieces) { + if (piece.getId() == savedPiece) { + piece.setRequested(true); + iter.remove(); + if (_log.shouldLog(Log.INFO)) { + _log.info("Restoring orphaned partial piece " + pp + + " Partial list size now: " + partialPieces.size()); + } + return pp; + } + } + } + } + } + } + // ...and this section turns this into the general move-requests-around code! + // Temporary? So PeerState never calls wantPiece() directly for now... + int piece = wantPiece(peer, havePieces); + if (piece >= 0) { + try { + return new PartialPiece(piece, metainfo.getPieceLength(piece)); + } catch (OutOfMemoryError oom) { + if (_log.shouldLog(Log.WARN)) + _log.warn("OOM creating new partial piece"); + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("We have no partial piece to return"); + return null; + } + + /** + * Remove saved state for this piece. + * Unless we are in the end game there shouldnt be anything in there. + * Do not call with wantedPieces lock held (deadlock) + */ + private void removePartialPiece(int piece) { + synchronized(partialPieces) { + for (Iterator iter = partialPieces.iterator(); iter.hasNext(); ) { + PartialPiece pp = iter.next(); + if (pp.getPiece() == piece) { + iter.remove(); + // there should be only one but keep going to be sure + } } - } } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("We no longer want orphaned piece " + savedRequest); - savedRequest = null; - return null; } /** Clear the requested flag for a piece if the peer @@ -947,13 +1009,12 @@ public class PeerCoordinator implements PeerListener continue; if (p.state == null) continue; - int[] arr = p.state.getRequestedPieces(); - for (int i = 0; arr[i] >= 0; i++) - if(arr[i] == piece) { + // FIXME don't go into the state + if (p.state.getRequestedPieces().contains(Integer.valueOf(piece))) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Another peer is requesting piece " + piece); return; - } + } } } @@ -973,20 +1034,6 @@ public class PeerCoordinator implements PeerListener } } - /** Mark a peer's requested pieces unrequested when it is disconnected - ** Once for each piece - ** This is enough trouble, maybe would be easier just to regenerate - ** the requested list from scratch instead. - */ - public void markUnrequested(Peer peer) - { - if (halted || peer.state == null) - return; - int[] arr = peer.state.getRequestedPieces(); - for (int i = 0; arr[i] >= 0; i++) - markUnrequestedIfOnlyOne(peer, arr[i]); - } - /** Return number of allowed uploaders for this torrent. ** Check with Snark to see if we are over the total upload limit. */ diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index 2cbd34bb5..30f6fe453 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -20,10 +20,12 @@ package org.klomp.snark; +import java.util.List; + /** * Listener for Peer events. */ -public interface PeerListener +interface PeerListener { /** * Called when the connection to the peer has started and the @@ -151,7 +153,7 @@ public interface PeerListener * * @param state the PeerState for the peer */ - void savePeerPartial(PeerState state); /* FIXME Exporting non-public type through public API FIXME */ + void savePartialPieces(Peer peer, List pcs); /** * Called when a peer has connected and there may be a partially @@ -161,12 +163,5 @@ public interface PeerListener * * @return request (contains the partial data and valid length) */ - Request getPeerPartial(BitField havePieces); /* FIXME Exporting non-public type through public API FIXME */ - - /** Mark a peer's requested pieces unrequested when it is disconnected - * This prevents premature end game - * - * @param peer the peer that is disconnecting - */ - void markUnrequested(Peer peer); + PartialPiece getPartialPiece(Peer peer, BitField havePieces); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index d649b8227..3a8487c6d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -23,9 +23,11 @@ package org.klomp.snark; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.ArrayList; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -36,9 +38,9 @@ import org.klomp.snark.bencode.BEValue; class PeerState implements DataLoader { private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerState.class); - final Peer peer; + private final Peer peer; final PeerListener listener; - final MetaInfo metainfo; + private final MetaInfo metainfo; // Interesting and choking describes whether we are interested in or // are choking the other side. @@ -54,6 +56,7 @@ class PeerState implements DataLoader long downloaded; long uploaded; + /** the pieces the peer has */ BitField bitfield; // Package local for use by Peer. @@ -102,6 +105,12 @@ class PeerState implements DataLoader if (interesting && !choked) request(resend); + + if (choked) { + // TODO + // savePartialPieces + // clear request list + } } void interestedMessage(boolean interest) @@ -308,8 +317,11 @@ class PeerState implements DataLoader } } + /** + * @return index in outstandingRequests or -1 + */ synchronized private int getFirstOutstandingRequest(int piece) - { + { for (int i = 0; i < outstandingRequests.size(); i++) if (outstandingRequests.get(i).piece == piece) return i; @@ -397,54 +409,56 @@ class PeerState implements DataLoader } - // get longest partial piece - synchronized Request getPartialRequest() - { - Request req = null; - for (int i = 0; i < outstandingRequests.size(); i++) { - Request r1 = outstandingRequests.get(i); - int j = getFirstOutstandingRequest(r1.piece); - if (j == -1) - continue; - Request r2 = outstandingRequests.get(j); - if (r2.off > 0 && ((req == null) || (r2.off > req.off))) - req = r2; - } - if (pendingRequest != null && req != null && pendingRequest.off < req.off) { - if (pendingRequest.off != 0) - req = pendingRequest; - else - req = null; - } - return req; + /** + * @return lowest offset of any request for the piece + * @since 0.8.2 + */ + synchronized private Request getLowestOutstandingRequest(int piece) { + Request rv = null; + int lowest = Integer.MAX_VALUE; + for (Request r : outstandingRequests) { + if (r.piece == piece && r.off < lowest) { + lowest = r.off; + rv = r; + } + } + if (pendingRequest != null && + pendingRequest.piece == piece && pendingRequest.off < lowest) + rv = pendingRequest; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " lowest for " + piece + " is " + rv + " out of " + pendingRequest + " and " + outstandingRequests); + return rv; } /** - * return array of pieces terminated by -1 - * remove most duplicates - * but still could be some duplicates, not guaranteed - * TODO rework this Java-style to return a Set or a List + * get partial pieces, give them back to PeerCoordinator + * @return List of PartialPieces, even those with an offset == 0, or empty list + * @since 0.8.2 */ - synchronized int[] getRequestedPieces() + synchronized List returnPartialPieces() { - int size = outstandingRequests.size(); - int[] arr = new int[size+2]; - int pc = -1; - int pos = 0; - if (pendingRequest != null) { - pc = pendingRequest.piece; - arr[pos++] = pc; - } - Request req = null; - for (int i = 0; i < size; i++) { - Request r1 = outstandingRequests.get(i); - if (pc != r1.piece) { - pc = r1.piece; - arr[pos++] = pc; + Set pcs = getRequestedPieces(); + List rv = new ArrayList(pcs.size()); + for (Integer p : pcs) { + Request req = getLowestOutstandingRequest(p.intValue()); + if (req != null) + rv.add(new PartialPiece(req)); } - } - arr[pos] = -1; - return(arr); + return rv; + } + + /** + * @return all pieces we are currently requesting, or empty Set + */ + synchronized Set getRequestedPieces() { + Set rv = new HashSet(outstandingRequests.size() + 1); + for (Request req : outstandingRequests) { + rv.add(Integer.valueOf(req.piece)); + if (pendingRequest != null) + rv.add(Integer.valueOf(pendingRequest.piece)); + } + return rv; } void cancelMessage(int piece, int begin, int length) @@ -555,6 +569,8 @@ class PeerState implements DataLoader { synchronized (this) { out.sendRequests(outstandingRequests); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resending requests to " + peer + outstandingRequests); } } @@ -620,24 +636,17 @@ class PeerState implements DataLoader if (bitfield != null) { // Check for adopting an orphaned partial piece - Request r = listener.getPeerPartial(bitfield); - if (r != null) { - // Check that r not already in outstandingRequests - int[] arr = getRequestedPieces(); - boolean found = false; - for (int i = 0; arr[i] >= 0; i++) { - if (arr[i] == r.piece) { - found = true; - break; - } - } - if (!found) { + PartialPiece pp = listener.getPartialPiece(peer, bitfield); + if (pp != null) { + // Double-check that r not already in outstandingRequests + if (!getRequestedPieces().contains(Integer.valueOf(pp.getPiece()))) { + Request r = pp.getRequest(); outstandingRequests.add(r); if (!choked) out.sendRequest(r); lastRequest = r; return true; - } + } } // Note that in addition to the bitfield, PeerCoordinator uses diff --git a/apps/i2psnark/java/src/org/klomp/snark/Piece.java b/apps/i2psnark/java/src/org/klomp/snark/Piece.java index 0ae9570e1..68b2ddfd4 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Piece.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Piece.java @@ -5,7 +5,10 @@ import java.util.Set; import net.i2p.util.ConcurrentHashSet; -public class Piece implements Comparable { +/** + * This class is used solely by PeerCoordinator. + */ +class Piece implements Comparable { private int id; private Set peers; diff --git a/apps/i2psnark/java/src/org/klomp/snark/Request.java b/apps/i2psnark/java/src/org/klomp/snark/Request.java index cc8600b13..6c086ebae 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Request.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Request.java @@ -22,6 +22,7 @@ package org.klomp.snark; /** * Holds all information needed for a partial piece request. + * This class should be used only by PeerState, PeerConnectionIn, and PeerConnectionOut. */ class Request {