PeerConnectionOut.java 17.42 KiB
/* PeerConnectionOut - Keeps a queue of outgoing messages and delivers them.
Copyright (C) 2003 Mark J. Wielaard
This file is part of Snark.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software Foundation,
Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
package org.klomp.snark;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.util.I2PAppThread;
import net.i2p.util.Log;
//import net.i2p.util.SimpleTimer;
class PeerConnectionOut implements Runnable
{
private final Log _log = I2PAppContext.getGlobalContext().logManager().getLog(PeerConnectionOut.class);
private final Peer peer;
private final DataOutputStream dout;
private Thread thread;
private boolean quit;
// Contains Messages.
private final List<Message> sendQueue = new ArrayList<Message>();
private static final AtomicLong __id = new AtomicLong();
private final long _id;
long lastSent;
public PeerConnectionOut(Peer peer, DataOutputStream dout)
{
this.peer = peer;
this.dout = dout;
_id = __id.incrementAndGet();
lastSent = System.currentTimeMillis();
}
public void startup() {
thread = new I2PAppThread(this, "Snark sender " + _id + ": " + peer);
thread.start();
}
/**
* Continuesly monitors for more outgoing messages that have to be send.
* Stops if quit is true or an IOException occurs.
*/
public void run()
{
try
{
while (!quit && peer.isConnected())
{
Message m = null;
PeerState state = null;
boolean shouldFlush;
synchronized(sendQueue)
{
shouldFlush = !quit && peer.isConnected() && sendQueue.isEmpty();
}
if (shouldFlush)
// Make sure everything will reach the other side.
// flush while not holding lock, could take a long time
dout.flush();
synchronized(sendQueue)
{
while (!quit && peer.isConnected() && sendQueue.isEmpty())
{
try
{
// Make sure everything will reach the other side.
// don't flush while holding lock, could take a long time
// dout.flush();
// Wait till more data arrives.
sendQueue.wait(60*1000);
}
catch (InterruptedException ie)
{
/* ignored */
}
}
state = peer.state;
if (!quit && state != null && peer.isConnected())
{
// Piece messages are big. So if there are other
// (control) messages make sure they are send first.
// Also remove request messages from the queue if
// we are currently being choked to prevent them from
// being send even if we get unchoked a little later.
// (Since we will resent them anyway in that case.)
// And remove piece messages if we are choking.
// this should get fixed for starvation
Iterator<Message> it = sendQueue.iterator();
while (m == null && it.hasNext())
{
Message nm = it.next();
if (nm.type == Message.PIECE)
{
if (state.choking) {
it.remove();
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
}
nm = null;
}
else if (nm.type == Message.REQUEST && state.choked)
{
it.remove();
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
nm = null;
}
if (nm != null)
{
m = nm;
//SimpleTimer.getInstance().removeEvent(nm.expireEvent);
it.remove();
}
}
if (m == null && !sendQueue.isEmpty()) {
m = sendQueue.remove(0);
//SimpleTimer.getInstance().removeEvent(m.expireEvent);
}
}
}
if (m != null)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug("Send " + peer + ": " + m);
// This can block for quite a while.
// To help get slow peers going, and track the bandwidth better,
// move this _after_ state.uploaded() and see how it works.
//m.sendMessage(dout);
lastSent = System.currentTimeMillis();
// Remove all piece messages after sending a choke message.
if (m.type == Message.CHOKE)
removeMessage(Message.PIECE);
// XXX - Should also register overhead...
// Don't let other clients requesting big chunks get an advantage
// when we are seeding;
// only count the rest of the upload after sendMessage().
int remainder = 0;
if (m.type == Message.PIECE) {
if (m.len <= PeerState.PARTSIZE) {
state.uploaded(m.len);
} else {
state.uploaded(PeerState.PARTSIZE);
remainder = m.len - PeerState.PARTSIZE;
}
}
m.sendMessage(dout);
if (remainder > 0)
state.uploaded(remainder);
m = null;
}
}
}
catch (IOException ioe)
{
// Ignore, probably other side closed connection.
if (_log.shouldLog(Log.INFO))
_log.info("IOError sending to " + peer, ioe);
}
catch (Throwable t)
{
_log.error("Error sending to " + peer, t);
if (t instanceof OutOfMemoryError)
throw (OutOfMemoryError)t;
}
finally
{
quit = true;
peer.disconnect();
}
}
public void disconnect()
{
synchronized(sendQueue)
{
//if (quit == true)
// return;
quit = true;
if (thread != null)
thread.interrupt();
sendQueue.clear();
sendQueue.notifyAll();
}
if (dout != null) {
try {
dout.close();
} catch (IOException ioe) {
//_log.warn("Error closing the stream to " + peer, ioe);
}
}
}
/**
* Adds a message to the sendQueue and notifies the method waiting
* on the sendQueue to change.
*/
private void addMessage(Message m)
{
synchronized(sendQueue)
{
sendQueue.add(m);
sendQueue.notifyAll();
}
}
/** remove messages not sent in 3m */
private static final int SEND_TIMEOUT = 3*60*1000;
/*****
private class RemoveTooSlow implements SimpleTimer.TimedEvent {
private Message _m;
public RemoveTooSlow(Message m) {
_m = m;
m.expireEvent = RemoveTooSlow.this;
}
public void timeReached() {
boolean removed = false;
synchronized (sendQueue) {
removed = sendQueue.remove(_m);
sendQueue.notifyAll();
}
if (removed)
_log.info("Took too long to send " + _m + " to " + peer);
}
}
*****/
/**
* Removes a particular message type from the queue.
*
* @param type the Message type to remove.
* @returns true when a message of the given type was removed, false
* otherwise.
*/
private boolean removeMessage(int type)
{
boolean removed = false;
synchronized(sendQueue)
{
Iterator<Message> it = sendQueue.iterator();
while (it.hasNext())
{
Message m = it.next();
if (m.type == type) {
it.remove();
removed = true;
if (type == Message.PIECE && peer.supportsFast()) {
Message r = new Message();
r.type = Message.REJECT;
r.piece = m.piece;
r.begin = m.begin;
r.length = m.length;
try {
r.sendMessage(dout);
} catch (IOException ioe) {}
}
}
}
sendQueue.notifyAll();
}
return removed;
}
void sendAlive()
{
Message m = new Message();
m.type = Message.KEEP_ALIVE;
// addMessage(m);
synchronized(sendQueue)
{
if(sendQueue.isEmpty())
sendQueue.add(m);
sendQueue.notifyAll();
}
}
void sendChoke(boolean choke)
{
// We cancel the (un)choke but keep PIECE messages.
// PIECE messages are purged if a choke is actually send.
synchronized(sendQueue)
{
int inverseType = choke ? Message.UNCHOKE
: Message.CHOKE;
if (!removeMessage(inverseType))
{
Message m = new Message();
if (choke)
m.type = Message.CHOKE;
else
m.type = Message.UNCHOKE;
addMessage(m);
}
}
}
void sendInterest(boolean interest)
{
synchronized(sendQueue)
{
int inverseType = interest ? Message.UNINTERESTED
: Message.INTERESTED;
if (!removeMessage(inverseType))
{
Message m = new Message();
if (interest)
m.type = Message.INTERESTED;
else
m.type = Message.UNINTERESTED;
addMessage(m);
}
}
}
void sendHave(int piece)
{
Message m = new Message();
m.type = Message.HAVE;
m.piece = piece;
addMessage(m);
}
void sendBitfield(BitField bitfield)
{
boolean fast = peer.supportsFast();
if (fast && bitfield.complete()) {
sendHaveAll();
} else if (fast && bitfield.count() <= 0) {
sendHaveNone();
} else {
Message m = new Message();
m.type = Message.BITFIELD;
m.data = bitfield.getFieldBytes();
m.off = 0;
m.len = m.data.length;
addMessage(m);
}
}
/** reransmit requests not received in 7m */
private static final int REQ_TIMEOUT = (2 * SEND_TIMEOUT) + (60 * 1000);
void retransmitRequests(List<Request> requests)
{
long now = System.currentTimeMillis();
Iterator<Request> it = requests.iterator();
while (it.hasNext())
{
Request req = it.next();
if(now > req.sendTime + REQ_TIMEOUT) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Retransmit request " + req + " to peer " + peer);
sendRequest(req);
}
}
}
void sendRequests(List<Request> requests)
{
Iterator<Request> it = requests.iterator();
while (it.hasNext())
{
Request req = it.next();
sendRequest(req);
}
}
void sendRequest(Request req)
{
// Check for duplicate requests to deal with fibrillating i2p-bt
// (multiple choke/unchokes received cause duplicate requests in the queue)
synchronized(sendQueue)
{
Iterator<Message> it = sendQueue.iterator();
while (it.hasNext())
{
Message m = it.next();
if (m.type == Message.REQUEST && m.piece == req.getPiece() &&
m.begin == req.off && m.length == req.len)
{
if (_log.shouldLog(Log.DEBUG))
_log.debug("Discarding duplicate request " + req + " to peer " + peer);
return;
}
}
}
Message m = new Message();
m.type = Message.REQUEST;
m.piece = req.getPiece();
m.begin = req.off;
m.length = req.len;
addMessage(m);
req.sendTime = System.currentTimeMillis();
}
// Used by PeerState to limit pipelined requests
int queuedBytes()
{
int total = 0;
synchronized(sendQueue)
{
Iterator<Message> it = sendQueue.iterator();
while (it.hasNext())
{
Message m = it.next();
if (m.type == Message.PIECE)
total += m.length;
}
}
return total;
}
/**
* Queue a piece message with a callback to load the data
* from disk when required.
* @since 0.8.2
*/
void sendPiece(int piece, int begin, int length, DataLoader loader)
{
/****
boolean sendNow = false;
// are there any cases where we should?
if (sendNow) {
// queue the real thing
byte[] bytes = loader.loadData(piece, begin, length);
if (bytes != null)
sendPiece(piece, begin, length, bytes);
return;
}
****/
// queue a fake message... set everything up,
// except save the PeerState instead of the bytes.
Message m = new Message();
m.type = Message.PIECE;
m.piece = piece;
m.begin = begin;
m.length = length;
m.dataLoader = loader;
m.off = 0;
m.len = length;
addMessage(m);
}
/**
* Queue a piece message with the data already loaded from disk
* Also add a timeout.
* We don't use this anymore.
*/
void sendPiece(int piece, int begin, int length, byte[] bytes)
{
Message m = new Message();
m.type = Message.PIECE;
m.piece = piece;
m.begin = begin;
m.length = length;
m.data = bytes;
m.off = 0;
m.len = length;
// since we have the data already loaded, queue a timeout to remove it
// no longer prefetched
addMessage(m);
}
void sendCancel(Request req)
{
// See if it is still in our send queue
synchronized(sendQueue)
{
Iterator<Message> it = sendQueue.iterator();
while (it.hasNext())
{
Message m = it.next();
if (m.type == Message.REQUEST
&& m.piece == req.getPiece()
&& m.begin == req.off
&& m.length == req.len)
it.remove();
}
}
// Always send, just to be sure it it is really canceled.
Message m = new Message();
m.type = Message.CANCEL;
m.piece = req.getPiece();
m.begin = req.off;
m.length = req.len;
addMessage(m);
}
/**
* Remove all Request messages from the queue
* @since 0.8.2
*/
void cancelRequestMessages() {
synchronized(sendQueue) {
for (Iterator<Message> it = sendQueue.iterator(); it.hasNext(); ) {
if (it.next().type == Message.REQUEST)
it.remove();
}
}
}
// Called by the PeerState when the other side doesn't want this
// request to be handled anymore. Removes any pending Piece Message
// from out send queue.
void cancelRequest(int piece, int begin, int length)
{
synchronized (sendQueue)
{
Iterator<Message> it = sendQueue.iterator();
while (it.hasNext())
{
Message m = it.next();
if (m.type == Message.PIECE
&& m.piece == piece
&& m.begin == begin
&& m.length == length)
it.remove();
}
}
}
/** @since 0.8.2 */
void sendExtension(int id, byte[] bytes) {
Message m = new Message();
m.type = Message.EXTENSION;
m.piece = id;
m.data = bytes;
m.off = 0;
m.len = bytes.length;
addMessage(m);
}
/** @since 0.8.4 */
void sendPort(int port) {
Message m = new Message();
m.type = Message.PORT;
m.piece = port;
addMessage(m);
}
/**
* Unused
* @since 0.9.21
*/
void sendSuggest(int piece) {
Message m = new Message();
m.type = Message.SUGGEST;
m.piece = piece;
addMessage(m);
}
/** @since 0.9.21 */
private void sendHaveAll() {
Message m = new Message();
m.type = Message.HAVE_ALL;
addMessage(m);
}
/** @since 0.9.21 */
private void sendHaveNone() {
Message m = new Message();
m.type = Message.HAVE_NONE;
addMessage(m);
}
/** @since 0.9.21 */
void sendReject(int piece, int begin, int length) {
Message m = new Message();
m.type = Message.REJECT;
m.piece = piece;
m.begin = begin;
m.length = length;
addMessage(m);
}
/**
* Unused
* @since 0.9.21
*/
void sendAllowedFast(int piece) {
Message m = new Message();
m.type = Message.ALLOWED_FAST;
m.piece = piece;
addMessage(m);
}
}