From 3c45b038c6900df128a93fa0b5692fbf8cb78ddc Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 27 Nov 2010 14:34:08 +0000 Subject: [PATCH] * i2psnark: - Drop queued outbound requests when choked - Redo some data structures and locking to hopefully prevent deadlock - Memory reduction part 3: Return partial pieces to PeerCoordinator when choked --- .../src/org/klomp/snark/PartialPiece.java | 2 +- .../src/org/klomp/snark/PeerCheckerTask.java | 35 ++-- .../org/klomp/snark/PeerConnectionOut.java | 15 +- .../src/org/klomp/snark/PeerCoordinator.java | 195 ++++++++++-------- .../src/org/klomp/snark/PeerListener.java | 17 +- .../java/src/org/klomp/snark/PeerState.java | 87 ++++++-- .../java/src/org/klomp/snark/Snark.java | 2 +- .../src/org/klomp/snark/SnarkManager.java | 2 +- 8 files changed, 223 insertions(+), 132 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java index 8ecae5778d..e810edc29f 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PartialPiece.java @@ -21,7 +21,7 @@ class PartialPiece implements Comparable { * Allocates the data. * * @param piece Piece number requested. - * @param bs length must be equal to the piece length + * @param len must be equal to the piece length */ public PartialPiece (int piece, int len) throws OutOfMemoryError { this.piece = piece; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java index 9d9fd2e2a7..289822df82 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCheckerTask.java @@ -49,10 +49,8 @@ class PeerCheckerTask extends TimerTask public void run() { - synchronized(coordinator.peers) - { - Iterator it = coordinator.peers.iterator(); - if ((!it.hasNext()) || coordinator.halted()) { + List<Peer> peerList = coordinator.peerList(); + if (peerList.isEmpty() || coordinator.halted()) { coordinator.peerCount = 0; coordinator.interestedAndChoking = 0; coordinator.setRateHistory(0, 0); @@ -76,19 +74,18 @@ class PeerCheckerTask extends TimerTask // Keep track of peers we remove now, // we will add them back to the end of the list. - List removed = new ArrayList(); + List<Peer> removed = new ArrayList(); int uploadLimit = coordinator.allowedUploaders(); boolean overBWLimit = coordinator.overUpBWLimit(); - while (it.hasNext()) - { - Peer peer = (Peer)it.next(); + for (Peer peer : peerList) { // Remove dying peers if (!peer.isConnected()) { - it.remove(); - coordinator.removePeerFromPieces(peer); - coordinator.peerCount = coordinator.peers.size(); + // This was just a failsafe, right? + //it.remove(); + //coordinator.removePeerFromPieces(peer); + //coordinator.peerCount = coordinator.peers.size(); continue; } @@ -140,7 +137,6 @@ class PeerCheckerTask extends TimerTask coordinator.uploaders--; // Put it at the back of the list - it.remove(); removed.add(peer); } else if (overBWLimitChoke) @@ -153,7 +149,6 @@ class PeerCheckerTask extends TimerTask removedCount++; // Put it at the back of the list for fairness, even though we won't be unchoking this time - it.remove(); removed.add(peer); } else if (peer.isInteresting() && peer.isChoked()) @@ -166,7 +161,6 @@ class PeerCheckerTask extends TimerTask removedCount++; // Put it at the back of the list - it.remove(); removed.add(peer); } else if (!peer.isInteresting() && !coordinator.completed()) @@ -179,7 +173,6 @@ class PeerCheckerTask extends TimerTask removedCount++; // Put it at the back of the list - it.remove(); removed.add(peer); } else if (peer.isInteresting() @@ -195,7 +188,6 @@ class PeerCheckerTask extends TimerTask removedCount++; // Put it at the back of the list - it.remove(); removed.add(peer); } else if (peer.isInteresting() && !peer.isChoked() && @@ -234,8 +226,6 @@ class PeerCheckerTask extends TimerTask removedCount++; // Put it at the back of the list - coordinator.peers.remove(worstDownloader); - coordinator.peerCount = coordinator.peers.size(); removed.add(worstDownloader); } @@ -244,8 +234,12 @@ class PeerCheckerTask extends TimerTask coordinator.unchokePeer(); // Put peers back at the end of the list that we removed earlier. - coordinator.peers.addAll(removed); - coordinator.peerCount = coordinator.peers.size(); + synchronized (coordinator.peers) { + for(Peer peer : removed) { + if (coordinator.peers.remove(peer)) + coordinator.peers.add(peer); + } + } coordinator.interestedAndChoking += removedCount; // store the rates @@ -255,6 +249,5 @@ class PeerCheckerTask extends TimerTask if (random.nextInt(4) == 0) coordinator.getStorage().cleanRAFs(); - } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 8ab0d55594..d4bc3b3460 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -42,7 +42,7 @@ class PeerConnectionOut implements Runnable private boolean quit; // Contains Messages. - private final List sendQueue = new ArrayList(); + private final List<Message> sendQueue = new ArrayList(); private static long __id = 0; private long _id; @@ -496,6 +496,19 @@ class PeerConnectionOut implements Runnable addMessage(m); } + /** + * Remove all Request messages from the queue + * @since 0.8.2 + */ + void cancelRequestMessages() { + synchronized(sendQueue) { + for (Iterator<Message> it = sendQueue.iterator(); it.hasNext(); ) { + if (it.next().type == Message.REQUEST) + it.remove(); + } + } + } + // Called by the PeerState when the other side doesn't want this // request to be handled anymore. Removes any pending Piece Message // from out send queue. diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java index 410ac43c5b..12e291ea89 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerCoordinator.java @@ -22,12 +22,15 @@ package org.klomp.snark; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.Random; import java.util.Timer; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; import net.i2p.util.I2PAppThread; @@ -62,7 +65,8 @@ public class PeerCoordinator implements PeerListener private long downloaded_old[] = {-1,-1,-1}; // synchronize on this when changing peers or downloaders - final List<Peer> peers = new ArrayList(); + // This is a Queue, not a Set, because PeerCheckerTask keeps things in order for choking/unchoking + final Queue<Peer> peers; /** estimate of the peers, without requiring any synchronization */ volatile int peerCount; @@ -72,9 +76,9 @@ public class PeerCoordinator implements PeerListener private final byte[] id; // Some random wanted pieces - private List<Piece> wantedPieces; + private final List<Piece> wantedPieces; - /** partial pieces */ + /** partial pieces - lock by synching on wantedPieces */ private final List<PartialPiece> partialPieces; private boolean halted = false; @@ -96,8 +100,10 @@ public class PeerCoordinator implements PeerListener this.listener = listener; this.snark = torrent; + wantedPieces = new ArrayList(); setWantedPieces(); partialPieces = new ArrayList(getMaxConnections() + 1); + peers = new LinkedBlockingQueue(); // Install a timer to check the uploaders. // Randomize the first start time so multiple tasks are spread out, @@ -109,22 +115,22 @@ public class PeerCoordinator implements PeerListener public void setWantedPieces() { // Make a list of pieces - // FIXME synchronize, clear and re-add instead? - // Don't replace something we are synchronizing on. - wantedPieces = new ArrayList(); - BitField bitfield = storage.getBitField(); - int[] pri = storage.getPiecePriorities(); - for(int i = 0; i < metainfo.getPieces(); i++) { - // only add if we don't have and the priority is >= 0 - if ((!bitfield.get(i)) && - (pri == null || pri[i] >= 0)) { - Piece p = new Piece(i); - if (pri != null) - p.setPriority(pri[i]); - wantedPieces.add(p); + synchronized(wantedPieces) { + wantedPieces.clear(); + BitField bitfield = storage.getBitField(); + int[] pri = storage.getPiecePriorities(); + for (int i = 0; i < metainfo.getPieces(); i++) { + // only add if we don't have and the priority is >= 0 + if ((!bitfield.get(i)) && + (pri == null || pri[i] >= 0)) { + Piece p = new Piece(i); + if (pri != null) + p.setPriority(pri[i]); + wantedPieces.add(p); + } + } + Collections.shuffle(wantedPieces, _random); } - } - Collections.shuffle(wantedPieces, _random); } public Storage getStorage() { return storage; } @@ -133,10 +139,7 @@ public class PeerCoordinator implements PeerListener // for web page detailed stats public List<Peer> peerList() { - synchronized(peers) - { return new ArrayList(peers); - } } public byte[] getID() @@ -155,12 +158,9 @@ public class PeerCoordinator implements PeerListener /** should be right */ public int getPeers() { - synchronized(peers) - { int rv = peers.size(); peerCount = rv; return rv; - } } /** @@ -254,10 +254,7 @@ public class PeerCoordinator implements PeerListener public boolean needPeers() { - synchronized(peers) - { return !halted && peers.size() < getMaxConnections(); - } } /** @@ -344,7 +341,10 @@ public class PeerCoordinator implements PeerListener // Add it to the beginning of the list. // And try to optimistically make it a uploader. - peers.add(0, peer); + // Can't add to beginning since we converted from a List to a Queue + // We can do this in Java 6 with a Deque + //peers.add(0, peer); + peers.add(peer); peerCount = peers.size(); unchokePeer(); @@ -358,8 +358,10 @@ public class PeerCoordinator implements PeerListener } } - // caller must synchronize on peers - private static Peer peerIDInList(PeerID pid, List peers) + /** + * @return peer if peer id is in the collection, else null + */ + private static Peer peerIDInList(PeerID pid, Collection<Peer> peers) { Iterator<Peer> it = peers.iterator(); while (it.hasNext()) { @@ -429,7 +431,6 @@ public class PeerCoordinator implements PeerListener // At the start are the peers that have us unchoked at the end the // other peer that are interested, but are choking us. List<Peer> interested = new LinkedList(); - synchronized (peers) { int count = 0; int unchokedCount = 0; int maxUploaders = allowedUploaders(); @@ -464,7 +465,6 @@ public class PeerCoordinator implements PeerListener peerCount = peers.size(); } interestedAndChoking = count; - } } public byte[] getBitMap() @@ -528,8 +528,19 @@ public class PeerCoordinator implements PeerListener * Returns one of pieces in the given BitField that is still wanted or * -1 if none of the given pieces are wanted. */ - public int wantPiece(Peer peer, BitField havePieces) - { + public int wantPiece(Peer peer, BitField havePieces) { + return wantPiece(peer, havePieces, true); + } + + /** + * Returns one of pieces in the given BitField that is still wanted or + * -1 if none of the given pieces are wanted. + * + * @param record if true, actually record in our data structures that we gave the + * request to this peer. If false, do not update the data structures. + * @since 0.8.2 + */ + private int wantPiece(Peer peer, BitField havePieces, boolean record) { if (halted) { if (_log.shouldLog(Log.WARN)) _log.warn("We don't want anything from the peer, as we are halted! peer=" + peer); @@ -539,7 +550,8 @@ public class PeerCoordinator implements PeerListener synchronized(wantedPieces) { Piece piece = null; - Collections.sort(wantedPieces); // Sort in order of rarest first. + if (record) + Collections.sort(wantedPieces); // Sort in order of rarest first. List<Piece> requested = new ArrayList(); Iterator<Piece> it = wantedPieces.iterator(); while (piece == null && it.hasNext()) @@ -567,7 +579,8 @@ public class PeerCoordinator implements PeerListener return -1; // nothing to request and not in end game // let's not all get on the same piece // Even better would be to sort by number of requests - Collections.shuffle(requested, _random); + if (record) + Collections.shuffle(requested, _random); Iterator<Piece> it2 = requested.iterator(); while (piece == null && it2.hasNext()) { @@ -575,7 +588,6 @@ public class PeerCoordinator implements PeerListener if (havePieces.get(p.getId())) { // limit number of parallel requests int requestedCount = 0; - synchronized(peers) { for (Peer pr : peers) { if (pr.isRequesting(p.getId())) { if (pr.equals(peer)) { @@ -587,7 +599,6 @@ public class PeerCoordinator implements PeerListener break; } } - } if (requestedCount >= MAX_PARALLEL_REQUESTS) continue; piece = p; @@ -608,9 +619,11 @@ public class PeerCoordinator implements PeerListener _log.debug("parallel request (end game?) for " + peer + ": piece = " + piece); } } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority()); - piece.setRequested(true); + if (record) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Now requesting: piece " + piece + " priority " + piece.getPriority()); + piece.setRequested(true); + } return piece.getId(); } } @@ -641,7 +654,6 @@ public class PeerCoordinator implements PeerListener wantedPieces.add(piece); // As connections are already up, new Pieces will // not have their PeerID list populated, so do that. - synchronized(peers) { for (Peer p : peers) { PeerState s = p.state; if (s != null) { @@ -650,7 +662,6 @@ public class PeerCoordinator implements PeerListener piece.addPeer(p); } } - } } } } @@ -663,11 +674,9 @@ public class PeerCoordinator implements PeerListener } else { iter.remove(); // cancel all peers - synchronized(peers) { for (Peer peer : peers) { peer.cancel(p.getId()); } - } } } if (_log.shouldLog(Log.DEBUG)) @@ -677,10 +686,8 @@ public class PeerCoordinator implements PeerListener // update request queues, in case we added wanted pieces // and we were previously uninterested - synchronized(peers) { for (Peer peer : peers) { peer.request(); - } } } } @@ -784,8 +791,6 @@ public class PeerCoordinator implements PeerListener // Announce to the world we have it! // Disconnect from other seeders when we get the last piece - synchronized(peers) - { List<Peer> toDisconnect = new ArrayList(); Iterator<Peer> it = peers.iterator(); while (it.hasNext()) @@ -805,11 +810,11 @@ public class PeerCoordinator implements PeerListener Peer p = it.next(); p.disconnect(true); } - } return true; } + /** this does nothing but logging */ public void gotChoke(Peer peer, boolean choke) { if (_log.shouldLog(Log.INFO)) @@ -823,8 +828,6 @@ public class PeerCoordinator implements PeerListener { if (interest) { - synchronized(peers) - { if (uploaders < allowedUploaders()) { if(peer.isChoking()) @@ -835,7 +838,6 @@ public class PeerCoordinator implements PeerListener _log.info("Unchoke: " + peer); } } - } } if (listener != null) @@ -893,7 +895,7 @@ public class PeerCoordinator implements PeerListener return; if (_log.shouldLog(Log.INFO)) _log.info("Partials received from " + peer + ": " + partials); - synchronized(partialPieces) { + synchronized(wantedPieces) { for (PartialPiece pp : partials) { if (pp.getDownloaded() > 0) { // PartialPiece.equals() only compares piece number, which is what we want @@ -936,26 +938,23 @@ public class PeerCoordinator implements PeerListener * @since 0.8.2 */ public PartialPiece getPartialPiece(Peer peer, BitField havePieces) { - // do it in this order to avoid deadlock (same order as in savePartialPieces()) - synchronized(partialPieces) { - synchronized(wantedPieces) { - // sorts by remaining bytes, least first - Collections.sort(partialPieces); - for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) { - PartialPiece pp = iter.next(); - int savedPiece = pp.getPiece(); - if (havePieces.get(savedPiece)) { - // this is just a double-check, it should be in there - for(Piece piece : wantedPieces) { - if (piece.getId() == savedPiece) { - piece.setRequested(true); - iter.remove(); - if (_log.shouldLog(Log.INFO)) { - _log.info("Restoring orphaned partial piece " + pp + - " Partial list size now: " + partialPieces.size()); - } - return pp; - } + synchronized(wantedPieces) { + // sorts by remaining bytes, least first + Collections.sort(partialPieces); + for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) { + PartialPiece pp = iter.next(); + int savedPiece = pp.getPiece(); + if (havePieces.get(savedPiece)) { + // this is just a double-check, it should be in there + for(Piece piece : wantedPieces) { + if (piece.getId() == savedPiece) { + piece.setRequested(true); + iter.remove(); + if (_log.shouldLog(Log.INFO)) { + _log.info("Restoring orphaned partial piece " + pp + + " Partial list size now: " + partialPieces.size()); + } + return pp; } } } @@ -977,13 +976,44 @@ public class PeerCoordinator implements PeerListener return null; } + /** + * Called when we are downloading from the peer and may need to ask for + * a new piece. Returns true if wantPiece() or getPartialPiece() would return a piece. + * + * @param peer the Peer that will be asked to provide the piece. + * @param havePieces a BitField containing the pieces that the other + * side has. + * + * @return if we want any of what the peer has + * @since 0.8.2 + */ + public boolean needPiece(Peer peer, BitField havePieces) { + synchronized(wantedPieces) { + for (PartialPiece pp : partialPieces) { + int savedPiece = pp.getPiece(); + if (havePieces.get(savedPiece)) { + // this is just a double-check, it should be in there + for(Piece piece : wantedPieces) { + if (piece.getId() == savedPiece) { + if (_log.shouldLog(Log.INFO)) { + _log.info("We could restore orphaned partial piece " + pp); + } + return true; + } + } + } + } + } + return wantPiece(peer, havePieces, false) > 0; + } + /** * Remove saved state for this piece. * Unless we are in the end game there shouldnt be anything in there. * Do not call with wantedPieces lock held (deadlock) */ private void removePartialPiece(int piece) { - synchronized(partialPieces) { + synchronized(wantedPieces) { for (Iterator<PartialPiece> iter = partialPieces.iterator(); iter.hasNext(); ) { PartialPiece pp = iter.next(); if (pp.getPiece() == piece) { @@ -1000,11 +1030,7 @@ public class PeerCoordinator implements PeerListener private void markUnrequestedIfOnlyOne(Peer peer, int piece) { // see if anybody else is requesting - synchronized (peers) - { - Iterator<Peer> it = peers.iterator(); - while (it.hasNext()) { - Peer p = it.next(); + for (Peer p : peers) { if (p.equals(peer)) continue; if (p.state == null) @@ -1016,16 +1042,13 @@ public class PeerCoordinator implements PeerListener return; } } - } // nobody is, so mark unrequested synchronized(wantedPieces) { - Iterator<Piece> it = wantedPieces.iterator(); - while (it.hasNext()) { - Piece p = it.next(); - if (p.getId() == piece) { - p.setRequested(false); + for (Piece pc : wantedPieces) { + if (pc.getId() == piece) { + pc.setRequested(false); if (_log.shouldLog(Log.DEBUG)) _log.debug("Removing from request list piece " + piece); return; diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java index 30f6fe453f..975c12c106 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerListener.java @@ -147,11 +147,25 @@ interface PeerListener */ int wantPiece(Peer peer, BitField bitfield); + /** + * Called when we are downloading from the peer and may need to ask for + * a new piece. Returns true if wantPiece() or getPartialPiece() would return a piece. + * + * @param peer the Peer that will be asked to provide the piece. + * @param bitfield a BitField containing the pieces that the other + * side has. + * + * @return if we want any of what the peer has + * @since 0.8.2 + */ + boolean needPiece(Peer peer, BitField bitfield); + /** * Called when the peer has disconnected and the peer task may have a partially * downloaded piece that the PeerCoordinator can save * - * @param state the PeerState for the peer + * @param peer the peer + * @since 0.8.2 */ void savePartialPieces(Peer peer, List<PartialPiece> pcs); @@ -162,6 +176,7 @@ interface PeerListener * @param havePieces the have-pieces bitmask for the peer * * @return request (contains the partial data and valid length) + * @since 0.8.2 */ PartialPiece getPartialPiece(Peer peer, BitField havePieces); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 3a8487c6d6..d32dc8ae52 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -68,6 +68,7 @@ class PeerState implements DataLoader /** the tail (NOT the head) of the request queue */ private Request lastRequest = null; + // FIXME if piece size < PARTSIZE, pipeline could be bigger private final static int MAX_PIPELINE = 5; // this is for outbound requests private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests public final static int PARTSIZE = 16*1024; // outbound request @@ -107,9 +108,14 @@ class PeerState implements DataLoader request(resend); if (choked) { - // TODO - // savePartialPieces - // clear request list + out.cancelRequestMessages(); + // old Roberts thrash us here, choke+unchoke right together + List<PartialPiece> pcs = returnPartialPieces(); + if (!pcs.isEmpty()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " got choked, returning partial pieces to the PeerCoordinator: " + pcs); + listener.savePartialPieces(this.peer, pcs); + } } } @@ -432,7 +438,8 @@ class PeerState implements DataLoader } /** - * get partial pieces, give them back to PeerCoordinator + * Get partial pieces, give them back to PeerCoordinator. + * Clears the request queue. * @return List of PartialPieces, even those with an offset == 0, or empty list * @since 0.8.2 */ @@ -445,6 +452,9 @@ class PeerState implements DataLoader if (req != null) rv.add(new PartialPiece(req)); } + outstandingRequests.clear(); + pendingRequest = null; + lastRequest = null; return rv; } @@ -513,12 +523,14 @@ class PeerState implements DataLoader // Request something else if necessary. addRequest(); + /**** taken care of in addRequest() synchronized(this) { // Is the peer still interesting? if (lastRequest == null) setInteresting(false); } + ****/ } /** @@ -568,9 +580,11 @@ class PeerState implements DataLoader if (resend) { synchronized (this) { - out.sendRequests(outstandingRequests); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Resending requests to " + peer + outstandingRequests); + if (!outstandingRequests.isEmpty()) { + out.sendRequests(outstandingRequests); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resending requests to " + peer + outstandingRequests); + } } } @@ -583,6 +597,15 @@ class PeerState implements DataLoader * Then send interested if we weren't. * Then send new requests if not choked. * If nothing to request, send not interested if we were. + * + * This is called from several places: + *<pre> + * By getOustandingRequest() when the first part of a chunk comes in + * By havePiece() when somebody got a new piece completed + * By chokeMessage() when we receive an unchoke + * By setInteresting() when we are now interested + * By PeerCoordinator.updatePiecePriorities() + *</pre> */ synchronized void addRequest() { @@ -591,9 +614,30 @@ class PeerState implements DataLoader { more_pieces = outstandingRequests.size() < MAX_PIPELINE; // We want something and we don't have outstanding requests? - if (more_pieces && lastRequest == null) + if (more_pieces && lastRequest == null) { + // we have nothing in the queue right now + if (!interesting) { + // If we need something, set interesting but delay pulling + // a request from the PeerCoordinator until unchoked. + if (listener.needPiece(this.peer, bitfield)) { + setInteresting(true); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " addRequest() we need something, setting interesting, delaying requestNextPiece()"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " addRequest() needs nothing"); + } + return; + } + if (choked) { + // If choked, delay pulling + // a request from the PeerCoordinator until unchoked. + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " addRequest() we are choked, delaying requestNextPiece()"); + return; + } more_pieces = requestNextPiece(); - else if (more_pieces) // We want something + } else if (more_pieces) // We want something { int pieceLength; boolean isLastChunk; @@ -621,6 +665,10 @@ class PeerState implements DataLoader } } + // failsafe + if (interesting && lastRequest == null && outstandingRequests.isEmpty()) + setInteresting(false); + if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " requests " + outstandingRequests); } @@ -633,8 +681,7 @@ class PeerState implements DataLoader private boolean requestNextPiece() { // Check that we already know what the other side has. - if (bitfield != null) - { + if (bitfield != null) { // Check for adopting an orphaned partial piece PartialPiece pp = listener.getPartialPiece(peer, bitfield); if (pp != null) { @@ -649,6 +696,7 @@ class PeerState implements DataLoader } } + /******* getPartialPiece() does it all now // Note that in addition to the bitfield, PeerCoordinator uses // its request tracking and isRequesting() to determine // what piece to give us next. @@ -683,11 +731,12 @@ class PeerState implements DataLoader out.sendRequest(req); lastRequest = req; return true; - } else { + } else { if (_log.shouldLog(Log.DEBUG)) _log.debug(peer + " no more pieces to request"); - } - } + } + *******/ + } // failsafe if (outstandingRequests.isEmpty()) @@ -707,11 +756,10 @@ class PeerState implements DataLoader synchronized void setInteresting(boolean interest) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(peer + " setInteresting(" + interest + ")"); - if (interest != interesting) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " setInteresting(" + interest + ")"); interesting = interest; out.sendInterest(interest); @@ -722,11 +770,10 @@ class PeerState implements DataLoader synchronized void setChoking(boolean choke) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug(peer + " setChoking(" + choke + ")"); - if (choking != choke) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(peer + " setChoking(" + choke + ")"); choking = choke; out.sendChoke(choke); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Snark.java b/apps/i2psnark/java/src/org/klomp/snark/Snark.java index 9d6cd94600..31105a2693 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Snark.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Snark.java @@ -745,7 +745,7 @@ public class Snark _util.debug(s, level, null); } - /** coordinatorListener */ + /** CoordinatorListener - this does nothing */ public void peerChange(PeerCoordinator coordinator, Peer peer) { // System.out.println(peer.toString()); diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index c7bec69062..3665a4aa2d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -851,7 +851,7 @@ public class SnarkManager implements Snark.CompleteListener { buf.append('/'); buf.append("\">").append(snark.storage.getBaseName()).append("</a>"); long len = snark.meta.getTotalLength(); - addMessage(_("Download finished: {0}", buf.toString()) + " (" + _("size: {0}B", DataHelper.formatSize2(len)) + ')'); + addMessage(_("Download finished: {0}", buf.toString())); // + " (" + _("size: {0}B", DataHelper.formatSize2(len)) + ')'); updateStatus(snark); } -- GitLab