Files
i2p.i2p/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java
sponge ca5c15d4de Added more complete javadocs to ministreaming and cleaned up overrides so
the code is JDK5 compliant. There remains some unchecked warnings, but these
aren't important at this juncture.
2008-10-11 10:28:31 +00:00

677 lines
25 KiB
Java

package net.i2p.client.streaming;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.Clock;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
/**
* Initial stub implementation for the socket
*
*/
class I2PSocketImpl implements I2PSocket {
private final static Log _log = new Log(I2PSocketImpl.class);
public static final int MAX_PACKET_SIZE = 1024 * 32;
public static final int PACKET_DELAY = 100;
private I2PSocketManagerImpl manager;
private Destination local;
private Destination remote;
private String localID;
private String remoteID;
private Object remoteIDWaiter = new Object();
private I2PInputStream in;
private I2POutputStream out;
private I2PSocket.SocketErrorListener _socketErrorListener;
private boolean outgoing;
private long _socketId;
private static long __socketId = 0;
private long _bytesRead = 0;
private long _bytesWritten = 0;
private long _createdOn;
private long _closedOn;
private long _remoteIdSetTime;
private I2PSocketOptions _options;
private Object flagLock = new Object();
/**
* Whether the I2P socket has already been closed.
*/
private boolean closed = false;
/**
* Whether to send out a close packet when the socket is
* closed. (If the socket is closed because of an incoming close
* packet, we need not send one.)
*/
private boolean sendClose = true;
/**
* Whether the I2P socket has already been closed and all data
* (from I2P to the app, dunno whether to call this incoming or
* outgoing) has been processed.
*/
private boolean closed2 = false;
/**
* @param peer who this socket is (or should be) connected to
* @param mgr how we talk to the network
* @param outgoing did we initiate the connection (true) or did we receive it (false)?
* @param localID what is our half of the socket ID?
*/
public I2PSocketImpl(Destination peer, I2PSocketManagerImpl mgr, boolean outgoing, String localID) {
this.outgoing = outgoing;
manager = mgr;
remote = peer;
_socketId = ++__socketId;
local = mgr.getSession().getMyDestination();
String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4);
String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().substring(0,4);
in = new I2PInputStream(name + " in");
I2PInputStream pin = new I2PInputStream(name + " out");
out = new I2POutputStream(pin);
new I2PSocketRunner(pin);
this.localID = localID;
_createdOn = I2PAppContext.getGlobalContext().clock().now();
_remoteIdSetTime = -1;
_closedOn = -1;
_options = mgr.getDefaultOptions();
}
/**
* Our half of the socket's unique ID
*
*/
public String getLocalID() {
return localID;
}
/**
* We've received the other side's half of the socket's unique ID
*/
public void setRemoteID(String id) {
synchronized (remoteIDWaiter) {
remoteID = id;
_remoteIdSetTime = System.currentTimeMillis();
remoteIDWaiter.notifyAll();
}
}
/**
* Retrieve the other side's half of the socket's unique ID, or null if it
* isn't known yet
*
* @param wait if true, we should wait until we receive it from the peer, otherwise
* return what we know immediately (which may be null)
*/
public String getRemoteID(boolean wait) {
try {
return getRemoteID(wait, -1);
} catch (InterruptedIOException iie) {
_log.error("wtf, we said we didn't want it to time out! you smell", iie);
return null;
}
}
/**
* Retrieve the other side's half of the socket's unique ID, or null if it isn't
* known yet and we were instructed not to wait
*
* @param wait should we wait for the peer to send us their half of the ID, or
* just return immediately?
* @param maxWait if we're going to wait, after how long should we timeout and fail?
* (if this value is < 0, we wait indefinitely)
* @throws InterruptedIOException when the max waiting period has been exceeded
*/
public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
long dieAfter = System.currentTimeMillis() + maxWait;
synchronized (remoteIDWaiter) {
if (wait) {
if (remoteID == null) {
try {
if (maxWait >= 0)
remoteIDWaiter.wait(maxWait);
else
remoteIDWaiter.wait();
} catch (InterruptedException ex) {
}
}
long now = System.currentTimeMillis();
if ((maxWait >= 0) && (now >= dieAfter)) {
long waitedExcess = now - dieAfter;
throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess
+ "ms too long [" + maxWait + "ms, remId " + remoteID
+ ", remId set " + (now-_remoteIdSetTime) + "ms ago])");
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("TIMING: RemoteID set to "
+ I2PSocketManagerImpl.getReadableForm(remoteID) + " for "
+ this.hashCode());
}
return remoteID;
}
}
/**
* Retrieve the other side's half of the socket's unique ID, or null if it
* isn't known yet. This does not wait
*
*/
public String getRemoteID() {
return getRemoteID(false);
}
/**
* The other side has given us some data, so inject it into our socket's
* inputStream
*
* @param data the data to inject into our local inputStream
*/
public void queueData(byte[] data) {
_bytesRead += data.length;
try {
in.queueData(data, false);
} catch (IOException ioe) {
_log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe);
}
}
/**
* Return the Destination of this side of the socket.
*/
public Destination getThisDestination() {
return local;
}
/**
* Return the destination of the peer.
*/
public Destination getPeerDestination() {
return remote;
}
/**
* Return an InputStream to read from the socket.
*/
public InputStream getInputStream() throws IOException {
if ((in == null)) throw new IOException("Not connected");
return in;
}
/**
* Return an OutputStream to write into the socket.
*/
public OutputStream getOutputStream() throws IOException {
if ((out == null)) throw new IOException("Not connected");
return out;
}
/**
* Closes the socket if not closed yet (from the Application
* side).
*/
public void close() throws IOException {
synchronized (flagLock) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Closing connection");
closed = true;
_closedOn = I2PAppContext.getGlobalContext().clock().now();
}
out.close();
in.notifyClosed();
}
public boolean isClosed() { return _closedOn > 0; }
/**
* Close the socket from the I2P side (by a close packet)
*/
protected void internalClose() {
synchronized (flagLock) {
closed = true;
closed2 = true;
sendClose = false;
_closedOn = I2PAppContext.getGlobalContext().clock().now();
}
out.close();
in.notifyClosed();
}
private byte getMask(int add) {
if (outgoing)
return (byte)(I2PSocketManagerImpl.DATA_IN + (byte)add);
else
return (byte)(I2PSocketManagerImpl.DATA_OUT + (byte)add);
}
public void setOptions(I2PSocketOptions options) {
_options = options;
in.setReadTimeout(options.getReadTimeout());
}
public I2PSocketOptions getOptions() {
return _options;
}
/**
* How long we will wait blocked on a read() operation. This is simply a
* helper to query the I2PSocketOptions
*
* @return milliseconds to wait, or -1 if we will wait indefinitely
*/
public long getReadTimeout() {
return _options.getReadTimeout();
}
/**
* Define how long we will wait blocked on a read() operation (-1 will make
* the socket wait forever). This is simply a helper to adjust the
* I2PSocketOptions
*
*/
public void setReadTimeout(long ms) {
_options.setReadTimeout(ms);
in.setReadTimeout(ms);
}
public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) {
_socketErrorListener = lsnr;
}
void errorOccurred() {
if (_socketErrorListener != null)
_socketErrorListener.errorOccurred();
}
public long getBytesSent() { return _bytesWritten; }
public long getBytesReceived() { return _bytesRead; }
public long getCreatedOn() { return _createdOn; }
public long getClosedOn() { return _closedOn; }
private String getPrefix() { return "[" + _socketId + "]: "; }
//--------------------------------------------------
private class I2PInputStream extends InputStream {
private String streamName;
private ByteCollector bc = new ByteCollector();
private boolean inStreamClosed = false;
private long readTimeout = -1;
public I2PInputStream(String name) {
streamName = name;
}
public long getReadTimeout() {
return readTimeout;
}
private String getStreamPrefix() {
return getPrefix() + streamName + ": ";
}
public void setReadTimeout(long ms) {
readTimeout = ms;
}
public int read() throws IOException {
byte[] b = new byte[1];
int res = read(b);
if (res == 1) return b[0] & 0xff;
if (res == -1) return -1;
throw new RuntimeException("Incorrect read() result");
}
// I have to ask if this method is really needed, since the JDK has this already,
// including the timeouts. Perhaps the need is for debugging more than anything
// else?
@Override
public int read(byte[] b, int off, int len) throws IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail="
+ bc.getCurrentSize() + "): " + this.hashCode());
if (len == 0) return 0;
long dieAfter = System.currentTimeMillis() + readTimeout;
byte[] read = null;
synchronized (bc) {
read = bc.startToByteArray(len);
bc.notifyAll();
}
boolean timedOut = false;
while ( (read.length == 0) && (!inStreamClosed) ) {
synchronized (flagLock) {
if (closed) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "Closed is set after reading "
+ _bytesRead + " and writing " + _bytesWritten
+ ", so closing stream: " + hashCode());
return -1;
}
}
try {
synchronized (I2PSocketImpl.I2PInputStream.this) {
if (readTimeout >= 0) {
wait(readTimeout);
} else {
wait();
}
}
} catch (InterruptedException ex) {}
if ((readTimeout >= 0)
&& (System.currentTimeMillis() >= dieAfter)) {
throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket ("
+ readTimeout + " msecs)");
}
synchronized (bc) {
read = bc.startToByteArray(len);
bc.notifyAll();
}
}
if (read.length > len) throw new RuntimeException("BUG");
if ( (inStreamClosed) && ( (read == null) || (read.length <= 0) ) )
return -1;
System.arraycopy(read, 0, b, off, read.length);
if (_log.shouldLog(Log.DEBUG)) {
_log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned "
+ read.length + " bytes");
}
//if (_log.shouldLog(Log.DEBUG)) {
// _log.debug("Read from I2PInputStream " + this.hashCode()
// + " returned "+read.length+" bytes:\n"
// + HexDump.dump(read));
//}
return read.length;
}
/**
* @return 0 if empty, > 0 if there is data.
*/
@Override
public int available() {
synchronized (bc) {
return bc.getCurrentSize();
}
}
/**
* Add the data to the queue
*
* @param allowBlock if true, we will block if the buffer and the socket options
* say so, otherwise we simply take the data regardless.
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException {
queueData(data, 0, data.length, allowBlock);
}
/**
* Add the data to the queue
*
* @param allowBlock if true, we will block if the buffer and the socket options
* say so, otherwise we simply take the data regardless.
* @throws InterruptedIOException if the queue's buffer is full, the socket has
* a write timeout, and that timeout is exceeded
* @throws IOException if the connection was closed while queueing up the data
*/
public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
Clock clock = I2PAppContext.getGlobalContext().clock();
long endAfter = clock.now() + _options.getWriteTimeout();
synchronized (bc) {
if (allowBlock) {
if (_options.getMaxBufferSize() > 0) {
while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "Buffer size exceeded: pending "
+ bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
if (_options.getWriteTimeout() > 0) {
long timeLeft = endAfter - clock.now();
if (timeLeft <= 0) {
long waited = _options.getWriteTimeout() - timeLeft;
throw new InterruptedIOException(getStreamPrefix() + "Waited too long ("
+ waited + "ms) to write "
+ len + " with a buffer at " + bc.getCurrentSize());
}
}
if (inStreamClosed)
throw new IOException(getStreamPrefix() + "Stream closed while writing");
if (_closedOn > 0)
throw new IOException(getStreamPrefix() + "I2PSocket closed while writing");
try {
bc.wait(1000);
} catch (InterruptedException ie) {}
}
}
}
bc.append(data, off, len);
}
synchronized (I2PInputStream.this) {
I2PInputStream.this.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
}
public void notifyClosed() {
synchronized (I2PInputStream.this) {
I2PInputStream.this.notifyAll();
}
}
@Override
public void close() throws IOException {
super.close();
notifyClosed();
synchronized (bc) {
inStreamClosed = true;
bc.notifyAll();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getStreamPrefix() + "After close");
}
}
private class I2POutputStream extends OutputStream {
public I2PInputStream sendTo;
public I2POutputStream(I2PInputStream sendTo) {
this.sendTo = sendTo;
}
public void write(int b) throws IOException {
write(new byte[] { (byte) b});
}
// This override is faster than the built in JDK,
// but there are other variations not handled
@Override
public void write(byte[] b, int off, int len) throws IOException {
_bytesWritten += len;
sendTo.queueData(b, off, len, true);
}
@Override
public void close() {
sendTo.notifyClosed();
}
}
private static volatile long __runnerId = 0;
private class I2PSocketRunner extends I2PThread {
public InputStream in;
public I2PSocketRunner(InputStream in) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode());
this.in = in;
String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4));
start();
}
/**
* Pump some more data
*
* @return true if we should keep on handling, false otherwise
*/
private boolean handleNextPacket(ByteCollector bc, byte buffer[])
throws IOException, I2PSessionException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
int len = in.read(buffer);
int bcsize = 0;
synchronized (bc) {
bcsize = bc.getCurrentSize();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);
if (len != -1) {
synchronized (bc) {
bc.append(buffer, len);
}
} else if (bcsize == 0) {
// nothing left in the buffer, and read(..) got EOF (-1).
// the bart the
return false;
}
if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());
try {
Thread.sleep(PACKET_DELAY);
} catch (InterruptedException e) {
_log.warn("wtf", e);
}
}
if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
byte data[] = null;
synchronized (bc) {
data = bc.startToByteArray(MAX_PACKET_SIZE);
}
if (data.length > 0) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
boolean sent = sendBlock(data);
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer. Killing socket runner");
errorOccurred();
return false;
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
}
}
}
return true;
}
@Override
public void run() {
byte[] buffer = new byte[MAX_PACKET_SIZE];
ByteCollector bc = new ByteCollector();
boolean keepHandling = true;
int packetsHandled = 0;
try {
// try {
while (keepHandling) {
keepHandling = handleNextPacket(bc, buffer);
packetsHandled++;
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + ":" + Thread.currentThread().getName()
+ "Packets handled: " + packetsHandled);
}
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
+ "After handling packets, we're done. Packets handled: " + packetsHandled);
if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: "
+ in.hashCode() + "; "
+ "queue size: " + bc.getCurrentSize() + ")");
}
synchronized (flagLock) {
closed2 = true;
}
boolean sc;
synchronized (flagLock) {
sc = sendClose;
} // FIXME: Race here?
if (sc) {
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + ":" + Thread.currentThread().getName()
+ "Sending close packet: (we started? " + outgoing
+ ") after reading " + _bytesRead + " and writing " + _bytesWritten);
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x02), remoteID, new byte[0]);
boolean sent = manager.getSession().sendMessage(remote, packet);
if (!sent) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + ":" + Thread.currentThread().getName()
+ "Error sending close packet to peer");
errorOccurred();
}
}
manager.removeSocket(I2PSocketImpl.this);
internalClose();
} catch (InterruptedIOException ex) {
_log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
} catch (IOException ex) {
// WHOEVER removes this event on inconsistent
// state before fixing the inconsistent state (a
// reference on the socket in the socket manager
// etc.) will get hanged by me personally -- mihi
_log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
} catch (I2PException ex) {
_log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
}
}
private boolean sendBlock(byte data[]) throws I2PSessionException {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
if (remoteID == null) {
_log.error(getPrefix() + "NULL REMOTEID");
return false;
}
byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x00), remoteID, data);
boolean sent;
synchronized (flagLock) {
if (closed2) return false;
}
sent = manager.getSession().sendMessage(remote, packet);
return sent;
}
}
@Override
public String toString() { return "" + hashCode(); }
}