I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
I2PSocketImpl.java 16.72 KiB
package net.i2p.client.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;

import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;

/**
 * Initial stub implementation for the socket
 *
 */
class I2PSocketImpl implements I2PSocket {
    private final static Log _log = new Log(I2PSocketImpl.class);

    public static final int MAX_PACKET_SIZE = 1024 * 32;
    public static final int PACKET_DELAY = 100;

    private I2PSocketManager manager;
    private Destination local;
    private Destination remote;
    private String localID;
    private String remoteID;
    private Object remoteIDWaiter = new Object();
    private I2PInputStream in;
    private I2POutputStream out;
    private SocketErrorListener _socketErrorListener;
    private boolean outgoing;
    private long _socketId;
    private static long __socketId = 0;
    private long _bytesRead = 0;
    private long _bytesWritten = 0;
    private Object flagLock = new Object();

    /**
     * Whether the I2P socket has already been closed.
     */
    private boolean closed = false;

    /**
     * Whether to send out a close packet when the socket is
     * closed. (If the socket is closed because of an incoming close
     * packet, we need not send one.)
     */
    private boolean sendClose = true;

    /**
     * Whether the I2P socket has already been closed and all data
     * (from I2P to the app, dunno whether to call this incoming or
     * outgoing) has been processed.
     */
    private boolean closed2 = false;

    /**
     * @param peer who this socket is (or should be) connected to 
     * @param mgr how we talk to the network
     * @param outgoing did we initiate the connection (true) or did we receive it (false)?
     * @param localID what is our half of the socket ID?
     */
    public I2PSocketImpl(Destination peer, I2PSocketManager mgr, boolean outgoing, String localID) {
        this.outgoing = outgoing;
        manager = mgr;
        remote = peer;
        _socketId = ++__socketId;
        local = mgr.getSession().getMyDestination();
        in = new I2PInputStream();
        I2PInputStream pin = new I2PInputStream();
        out = new I2POutputStream(pin);
        new I2PSocketRunner(pin);
        this.localID = localID;
    }

    /**
     * Our half of the socket's unique ID
     *
     */
    public String getLocalID() {
        return localID;
    }

    /**
     * We've received the other side's half of the socket's unique ID 
     */
    public void setRemoteID(String id) {
        synchronized (remoteIDWaiter) {
            remoteID = id;
            remoteIDWaiter.notifyAll();
        }
    }

    /**
     * Retrieve the other side's half of the socket's unique ID, or null if it
     * isn't known yet
     *
     * @param wait if true, we should wait until we receive it from the peer, otherwise
     *             return what we know immediately (which may be null)
     */
    public String getRemoteID(boolean wait) {
        try {
            return getRemoteID(wait, -1);
        } catch (InterruptedIOException iie) {
            _log.error("wtf, we said we didn't want it to time out!  you smell", iie);
            return null;
        }
    }

    /**
     * Retrieve the other side's half of the socket's unique ID, or null if it isn't
     * known yet and we were instructed not to wait
     *
     * @param wait should we wait for the peer to send us their half of the ID, or 
     *             just return immediately?
     * @param maxWait if we're going to wait, after how long should we timeout and fail?
     *                (if this value is < 0, we wait indefinitely)
     * @throws InterruptedIOException when the max waiting period has been exceeded
     */
    public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
        long dieAfter = System.currentTimeMillis() + maxWait;
        synchronized (remoteIDWaiter) {
            if (wait) {
                try {
                    if (maxWait >= 0)
                        remoteIDWaiter.wait(maxWait);
                    else
                        remoteIDWaiter.wait();
                } catch (InterruptedException ex) {
                }

                long now = System.currentTimeMillis();
                if ((maxWait >= 0) && (now >= dieAfter)) {
                    long waitedExcess = now - dieAfter;
                    throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + "ms too long [" + maxWait + "ms])");
                }
             
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("TIMING: RemoteID set to " 
                               + I2PSocketManager.getReadableForm(remoteID) + " for "
                               + this.hashCode());
            }
            return remoteID;
        }
    }

    /**
     * Retrieve the other side's half of the socket's unique ID, or null if it
     * isn't known yet.  This does not wait
     *
     */
    public String getRemoteID() {
        return getRemoteID(false);
    }

    /**
     * The other side has given us some data, so inject it into our socket's 
     * inputStream
     *
     * @param data the data to inject into our local inputStream
     */
    public void queueData(byte[] data) {
        _bytesRead += data.length;
        in.queueData(data);
    }

    /**
     * Return the Destination of this side of the socket.
     */
    public Destination getThisDestination() {
        return local;
    }

    /**
     * Return the destination of the peer.
     */
    public Destination getPeerDestination() {
        return remote;
    }

    /**
     * Return an InputStream to read from the socket.
     */
    public InputStream getInputStream() throws IOException {
        if ((in == null)) throw new IOException("Not connected");
        return in;
    }

    /**
     * Return an OutputStream to write into the socket.
     */
    public OutputStream getOutputStream() throws IOException {
        if ((out == null)) throw new IOException("Not connected");
        return out;
    }

    /**
     * Closes the socket if not closed yet (from the Application
     * side).
     */
    public void close() throws IOException {
        synchronized (flagLock) {
            _log.debug("Closing connection");
            closed = true;
        }
        out.close();
        in.notifyClosed();
    }

    /**
     * Close the socket from the I2P side, e. g. by a close packet.
     */
    protected void internalClose() {
        synchronized (flagLock) {
            closed = true;
            closed2 = true;
            sendClose = false;
        }
        out.close();
        in.notifyClosed();
    }

    private byte getMask(int add) {
        if (outgoing)
            return (byte)(I2PSocketManager.DATA_IN + (byte)add);
        else
            return (byte)(I2PSocketManager.DATA_OUT + (byte)add);
    }

    /**
     * What is the longest we'll block on the input stream while waiting
     * for more data?  If this value is exceeded, the read() throws 
     * InterruptedIOException
     */
    public long getReadTimeout() {
        return in.getReadTimeout();
    }

    public void setReadTimeout(long ms) {
        in.setReadTimeout(ms);
    }

    public void setSocketErrorListener(SocketErrorListener lsnr) {
        _socketErrorListener = lsnr;
    }
    
    void errorOccurred() {
        if (_socketErrorListener != null)
            _socketErrorListener.errorOccurred();
    }
    
    private String getPrefix() { return "[" + _socketId + "]: "; }
    
    //--------------------------------------------------
    private class I2PInputStream extends InputStream {

        private ByteCollector bc = new ByteCollector();

        private long readTimeout = -1;

        public long getReadTimeout() {
            return readTimeout;
        }
        
        public void setReadTimeout(long ms) {
            readTimeout = ms;
        }
        
        public int read() throws IOException {
            byte[] b = new byte[1];
            int res = read(b);
            if (res == 1) return b[0] & 0xff;
            if (res == -1) return -1;
            throw new RuntimeException("Incorrect read() result");
        }

        public synchronized int read(byte[] b, int off, int len) throws IOException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "Read called: " + this.hashCode());
            if (len == 0) return 0;
            long dieAfter = System.currentTimeMillis() + readTimeout;
            byte[] read = bc.startToByteArray(len);
            boolean timedOut = false;

            while (read.length == 0) {
                synchronized (flagLock) {
                    if (closed) {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode());
                        return -1;
                    }
                }
                try {
                    if (readTimeout >= 0) {
                        wait(readTimeout);
                    } else {
                        wait();
                    }
                } catch (InterruptedException ex) {}

                if ((readTimeout >= 0)
                    && (System.currentTimeMillis() >= dieAfter)) {
                    throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)");
                }

                read = bc.startToByteArray(len);
            }
            if (read.length > len) throw new RuntimeException("BUG");
            System.arraycopy(read, 0, b, off, read.length);

            if (_log.shouldLog(Log.DEBUG)) {
                _log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned " 
                           + read.length + " bytes");
            }
            //if (_log.shouldLog(Log.DEBUG)) {
            //  _log.debug("Read from I2PInputStream " + this.hashCode()
            //             + " returned "+read.length+" bytes:\n"
            //             + HexDump.dump(read));
            //}
            return read.length;
        }

        public int available() {
            return bc.getCurrentSize();
        }

        public void queueData(byte[] data) {
            queueData(data, 0, data.length);
        }

        public synchronized void queueData(byte[] data, int off, int len) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
            bc.append(data, off, len);
            notifyAll();
        }

        public synchronized void notifyClosed() {
            I2PInputStream.this.notifyAll();
        }
        
        public void close() throws IOException {
            super.close();
            notifyClosed();
        }

    }
    private class I2POutputStream extends OutputStream {

        public I2PInputStream sendTo;

        public I2POutputStream(I2PInputStream sendTo) {
            this.sendTo = sendTo;
        }

        public void write(int b) throws IOException {
            write(new byte[] { (byte) b});
        }

        public void write(byte[] b, int off, int len) throws IOException {
            _bytesWritten += len;
            sendTo.queueData(b, off, len);
        }

        public void close() {
            sendTo.notifyClosed();
        }
    }

    private static volatile long __runnerId = 0;
    private class I2PSocketRunner extends I2PThread {

        public InputStream in;

        public I2PSocketRunner(InputStream in) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode());
            this.in = in;
            String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
            setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4));
            start();
        }
        
        /**
         * Pump some more data
         *
         * @return true if we should keep on handling, false otherwise
         */
        private boolean handleNextPacket(ByteCollector bc, byte buffer[]) 
                                         throws IOException, I2PSessionException {
            int len = in.read(buffer);
            int bcsize = bc.getCurrentSize();
            if (len != -1) {
                bc.append(buffer, len);
            } else if (bcsize == 0) {
                // nothing left in the buffer, and read(..) got EOF (-1).
                // the bart the
                return false;
            }
            if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug(getPrefix() + "Runner Point d: " + hashCode());

                try {
                    Thread.sleep(PACKET_DELAY);
                } catch (InterruptedException e) {
                    _log.warn("wtf", e);
                }
            }
            if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
                byte[] data = bc.startToByteArray(MAX_PACKET_SIZE);
                if (data.length > 0) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(getPrefix() + "Message size is: " + data.length);
                    boolean sent = sendBlock(data);
                    if (!sent) {
                        _log.error(getPrefix() + "Error sending message to peer.  Killing socket runner");
                        errorOccurred();
                        return false;
                    }
                }
            }
            return true;
        }

        public void run() {
            byte[] buffer = new byte[MAX_PACKET_SIZE];
            ByteCollector bc = new ByteCollector();
            boolean keepHandling = true;
            int packetsHandled = 0;
            try {
                //              try {
                while (keepHandling) {
                    keepHandling = handleNextPacket(bc, buffer);
                    packetsHandled++;
                }
                if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
                    _log.error(getPrefix() + "A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " 
                               + in.hashCode() + "; "
                               + "queue size: " + bc.getCurrentSize() + ")");
                }
                synchronized (flagLock) {
                    closed2 = true;
                }
                boolean sc;
                synchronized (flagLock) {
                    sc = sendClose;
                } // FIXME: Race here?
                if (sc) {
                    if (_log.shouldLog(Log.INFO))
                        _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
                    byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]);
                    boolean sent = manager.getSession().sendMessage(remote, packet);
                    if (!sent) {
                        _log.error(getPrefix() + "Error sending close packet to peer");
                        errorOccurred();
                    }
                }
                manager.removeSocket(I2PSocketImpl.this);
            } catch (InterruptedIOException ex) {
                _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
            } catch (IOException ex) {
                // WHOEVER removes this event on inconsistent
                // state before fixing the inconsistent state (a
                // reference on the socket in the socket manager
                // etc.) will get hanged by me personally -- mihi
                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
            } catch (I2PException ex) {
                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
            }
        }

        private boolean sendBlock(byte data[]) throws I2PSessionException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
            if (remoteID == null) {
                _log.error(getPrefix() + "NULL REMOTEID");
                return false;
            }
            byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data);
            boolean sent;
            synchronized (flagLock) {
                if (closed2) return false;
            }
            sent = manager.getSession().sendMessage(remote, packet);
            return sent;
        }
    }
    
    public String toString() { return "" + hashCode(); }
}