diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index 194ae22cdeeea3810bbc9debfebf2ec0febfde64..51272e8a672dbaebfd282e419221d0bf5a5ed941 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -22,9 +22,10 @@ package org.klomp.snark; import java.io.DataOutputStream; import java.io.IOException; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; @@ -42,7 +43,7 @@ class PeerConnectionOut implements Runnable private boolean quit; // Contains Messages. - private final List<Message> sendQueue = new ArrayList<Message>(); + private final BlockingQueue<Message> sendQueue = new LinkedBlockingQueue<Message>(); private static final AtomicLong __id = new AtomicLong(); private final long _id; @@ -124,6 +125,16 @@ class PeerConnectionOut implements Runnable if (state.choking) { it.remove(); //SimpleTimer.getInstance().removeEvent(nm.expireEvent); + if (peer.supportsFast()) { + Message r = new Message(); + r.type = Message.REJECT; + r.piece = nm.piece; + r.begin = nm.begin; + r.length = nm.length; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send " + peer + ": " + r); + r.sendMessage(dout); + } } nm = null; } @@ -141,8 +152,8 @@ class PeerConnectionOut implements Runnable it.remove(); } } - if (m == null && !sendQueue.isEmpty()) { - m = sendQueue.remove(0); + if (m == null) { + m = sendQueue.poll(); //SimpleTimer.getInstance().removeEvent(m.expireEvent); } } @@ -233,7 +244,7 @@ class PeerConnectionOut implements Runnable { synchronized(sendQueue) { - sendQueue.add(m); + sendQueue.offer(m); sendQueue.notifyAll(); } } @@ -307,7 +318,7 @@ class PeerConnectionOut implements Runnable synchronized(sendQueue) { if(sendQueue.isEmpty()) - sendQueue.add(m); + sendQueue.offer(m); sendQueue.notifyAll(); } }