I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
  • jrandom's avatar
    813679ba
    2004-10-23 jrandom · 813679ba
    jrandom authored and zzz's avatar zzz committed
        * Minor ministreaming lib refactoring to simplify integration of the full
          streaming lib.
        * Minor bugfixes to data structure serialization.
    813679ba
    History
    2004-10-23 jrandom
    jrandom authored and zzz's avatar zzz committed
        * Minor ministreaming lib refactoring to simplify integration of the full
          streaming lib.
        * Minor bugfixes to data structure serialization.
I2PSocketImpl.java 24.80 KiB
package net.i2p.client.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;

import net.i2p.I2PAppContext;
import net.i2p.I2PException;
import net.i2p.client.I2PSessionException;
import net.i2p.data.Destination;
import net.i2p.util.Clock;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;


/**
 * Initial stub implementation for the socket
 *
 */
class I2PSocketImpl implements I2PSocket {
    private final static Log _log = new Log(I2PSocketImpl.class);

    /** Largest chunk, in bytes (32 KiB), that the socket runner reads and sends as one message. */
    public static final int MAX_PACKET_SIZE = 1024 * 32;
    /** Milliseconds the socket runner sleeps when little data is pending and no more is available. */
    public static final int PACKET_DELAY = 100;

    /** Manager that owns this socket; supplies the session used to send messages. */
    private I2PSocketManagerImpl manager;
    /** Our own destination, taken from the manager's session at construction. */
    private Destination local;
    /** The peer's destination. */
    private Destination remote;
    /** Our half of the socket ID. */
    private String localID;
    /** The peer's half of the socket ID; null until setRemoteID() is called. */
    private String remoteID;
    /** Monitor guarding remoteID / _remoteIdSetTime and used to wake threads in getRemoteID(). */
    private Object remoteIDWaiter = new Object();
    /** Stream the application reads from; fed by queueData(byte[]). */
    private I2PInputStream in;
    /** Stream the application writes to; its bytes are drained by the socket runner. */
    private I2POutputStream out;
    /** Optional callback invoked by errorOccurred(); may be null. */
    private SocketErrorListener _socketErrorListener;
    /** True if we initiated the connection, false if we accepted it. */
    private boolean outgoing;
    /** Per-JVM sequence number of this socket, used in log prefixes and thread names. */
    private long _socketId;
    /** Source of _socketId values; shared by all sockets. */
    private static long __socketId = 0;
    /** Running total of bytes handed to queueData(byte[]). */
    private long _bytesRead = 0;
    /** Running total of bytes written through the output stream. */
    private long _bytesWritten = 0;
    /** Context-clock time at which this socket was constructed. */
    private long _createdOn;
    /** Context-clock time at which the socket was closed, or -1 while still open. */
    private long _closedOn;
    /** System time (ms) of the last setRemoteID() call, or -1 if it has not been called. */
    private long _remoteIdSetTime;
    /** Current socket options (timeouts, buffer limits). */
    private I2PSocketOptions _options;
    /** Monitor guarding the closed, closed2 and sendClose flags. */
    private Object flagLock = new Object();

    /**
     * Whether the I2P socket has already been closed.
     */
    private boolean closed = false;

    /**
     * Whether to send out a close packet when the socket is
     * closed. (If the socket is closed because of an incoming close
     * packet, we need not send one.)
     */
    private boolean sendClose = true;

    /**
     * Whether the I2P socket has been closed and the socket runner is done:
     * set by the runner once it has stopped pumping the data written by the
     * application, and by internalClose().  Once set, sendBlock() refuses to
     * send any further data.
     */
    private boolean closed2 = false;

    /**
     * @param peer who this socket is (or should be) connected to 
     * @param mgr how we talk to the network
     * @param outgoing did we initiate the connection (true) or did we receive it (false)?
     * @param localID what is our half of the socket ID?
     */
    public I2PSocketImpl(Destination peer, I2PSocketManagerImpl mgr, boolean outgoing, String localID) {
        this.outgoing = outgoing;
        manager = mgr;
        remote = peer;
        _socketId = ++__socketId;
        local = mgr.getSession().getMyDestination();
        String us = mgr.getSession().getMyDestination().calculateHash().toBase64().substring(0,4);
        String name = us + (outgoing ? "->" : "<-") + peer.calculateHash().toBase64().substring(0,4);
        in = new I2PInputStream(name + " in");
        I2PInputStream pin = new I2PInputStream(name + " out");
        out = new I2POutputStream(pin);
        new I2PSocketRunner(pin);
        this.localID = localID;
        _createdOn = I2PAppContext.getGlobalContext().clock().now();
        _remoteIdSetTime = -1;
        _closedOn = -1;
        _options = mgr.getDefaultOptions();
    }

    /**
     * @return our own half of the socket's unique ID, as given to the constructor
     */
    public String getLocalID() {
        return this.localID;
    }

    /**
     * Record the peer's half of the socket ID, note when it arrived, and wake
     * every thread blocked in getRemoteID(..).
     *
     * @param id the peer's half of the socket's unique ID
     */
    public void setRemoteID(String id) {
        final Object monitor = remoteIDWaiter;
        synchronized (monitor) {
            this.remoteID = id;
            this._remoteIdSetTime = System.currentTimeMillis();
            monitor.notifyAll();
        }
    }

    /**
     * Fetch the peer's half of the socket ID without any time limit.
     *
     * @param wait if true, block until the peer's ID has been received; if false,
     *             return whatever is known right now (possibly null)
     * @return the remote ID, or null if it is not known (or the untimed wait
     *         unexpectedly reported a timeout)
     */
    public String getRemoteID(boolean wait) {
        String id = null;
        try {
            id = getRemoteID(wait, -1);
        } catch (InterruptedIOException iie) {
            _log.error("wtf, we said we didn't want it to time out!  you smell", iie);
        }
        return id;
    }

    /**
     * Retrieve the other side's half of the socket's unique ID, or null if it isn't
     * known yet and we were instructed not to wait.
     *
     * When waiting, a single wait is performed: any call to setRemoteID (even
     * one that passes null) ends it, and whatever value is current at that
     * point is returned.  A timeout is only reported when the ID is still
     * unknown once the deadline has passed; an ID that arrives right at the
     * deadline is returned rather than discarded.
     *
     * @param wait should we wait for the peer to send us their half of the ID, or 
     *             just return immediately?
     * @param maxWait if we're going to wait, after how long should we timeout and fail?
     *                (if this value is < 0, we wait indefinitely; 0 means do not
     *                block at all)
     * @return the remote ID, possibly null
     * @throws InterruptedIOException when the max waiting period has been exceeded
     *                                and the remote ID is still unknown
     */
    public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException {
        long dieAfter = System.currentTimeMillis() + maxWait;
        synchronized (remoteIDWaiter) {
            if (wait) {
                // Object.wait(0) means "wait forever", so a zero budget must
                // skip the wait instead of being passed through.
                if ((remoteID == null) && (maxWait != 0)) {
                    try {
                        if (maxWait > 0)
                            remoteIDWaiter.wait(maxWait);
                        else
                            remoteIDWaiter.wait();
                    } catch (InterruptedException ex) {
                        // stop waiting, but leave the interrupt visible to the caller
                        Thread.currentThread().interrupt();
                    }
                }

                long now = System.currentTimeMillis();
                if ((remoteID == null) && (maxWait >= 0) && (now >= dieAfter)) {
                    long waitedExcess = now - dieAfter;
                    throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess 
                                                     + "ms too long [" + maxWait + "ms, remId " + remoteID
                                                     + ", remId set " + (now-_remoteIdSetTime) + "ms ago])");
                }
             
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug("TIMING: RemoteID set to " 
                               + I2PSocketManagerImpl.getReadableForm(remoteID) + " for "
                               + this.hashCode());
            }
            return remoteID;
        }
    }

    /**
     * Non-blocking lookup of the peer's half of the socket ID.
     *
     * @return the remote ID, or null if the peer has not sent it yet
     */
    public String getRemoteID() {
        final boolean block = false;
        return getRemoteID(block);
    }

    /**
     * Hand a chunk of data received from the peer to the socket's input
     * stream and add its size to the received-bytes counter.  The data is
     * queued without blocking.
     *
     * @param data the bytes to make available to readers of our input stream
     */
    public void queueData(byte[] data) {
        _bytesRead = _bytesRead + data.length;
        final boolean allowBlock = false;
        try {
            in.queueData(data, allowBlock);
        } catch (IOException ioe) {
            _log.log(Log.CRIT, "wtf, we said DONT block, how can we timeout?", ioe);
        }
    }

    /**
     * @return the Destination of the local end of this socket
     */
    public Destination getThisDestination() {
        return this.local;
    }

    /**
     * @return the Destination of the peer at the other end of this socket
     */
    public Destination getPeerDestination() {
        return this.remote;
    }

    /**
     * @return the stream from which data sent by the peer can be read
     * @throws IOException if the socket has no input stream
     */
    public InputStream getInputStream() throws IOException {
        final InputStream stream = in;
        if (stream == null) {
            throw new IOException("Not connected");
        }
        return stream;
    }

    /**
     * @return the stream through which data can be written to the peer
     * @throws IOException if the socket has no output stream
     */
    public OutputStream getOutputStream() throws IOException {
        final OutputStream stream = out;
        if (stream == null) {
            throw new IOException("Not connected");
        }
        return stream;
    }

    /**
     * Close the socket on behalf of the application: flag it as closed,
     * record the close time, then notify both internal streams so that
     * blocked readers notice.
     */
    public void close() throws IOException {
        synchronized (flagLock) {
            if (_log.shouldLog(Log.DEBUG)) {
                _log.debug("Closing connection");
            }
            closed = true;
            final long now = I2PAppContext.getGlobalContext().clock().now();
            _closedOn = now;
        }
        out.close();
        in.notifyClosed();
    }

    /**
     * Close the socket from the I2P side (e.g. after a close packet): mark
     * it fully closed, suppress our own close packet, record the close time
     * and notify both internal streams.
     */
    protected void internalClose() {
        synchronized (flagLock) {
            closed = true;
            closed2 = true;
            sendClose = false;
            final long now = I2PAppContext.getGlobalContext().clock().now();
            _closedOn = now;
        }
        out.close();
        in.notifyClosed();
    }

    /**
     * Compute the packet type byte for a message sent on this socket: the
     * base is DATA_IN when we initiated the connection and DATA_OUT
     * otherwise, offset by the given value.
     *
     * @param add offset added to the direction-dependent base value
     * @return the resulting type byte
     */
    private byte getMask(int add) {
        final byte offset = (byte)add;
        return outgoing
               ? (byte)(I2PSocketManagerImpl.DATA_IN + offset)
               : (byte)(I2PSocketManagerImpl.DATA_OUT + offset);
    }

    /**
     * Replace the socket's options and apply the new read timeout to the
     * input stream.
     *
     * @param options the options to use from now on
     */
    public void setOptions(I2PSocketOptions options) {
        this._options = options;
        final long timeout = options.getReadTimeout();
        in.setReadTimeout(timeout);
    }
    
    /**
     * @return the options currently in effect for this socket
     */
    public I2PSocketOptions getOptions() {
        return this._options;
    }
    
    /**
     * Convenience accessor for the read timeout stored in the socket's
     * I2PSocketOptions.
     *
     * @return milliseconds a read() may block, or -1 if it will wait indefinitely
     */
    public long getReadTimeout() {
        final long timeout = _options.getReadTimeout();
        return timeout;
    }

    /**
     * Convenience setter: store the read timeout in the socket's
     * I2PSocketOptions and apply it to the input stream.
     *
     * @param ms milliseconds a read() may block; -1 makes the socket wait forever
     */
    public void setReadTimeout(long ms) {
        this._options.setReadTimeout(ms);
        this.in.setReadTimeout(ms);
    }
    
    /**
     * Register the callback to invoke when this socket reports an error.
     *
     * @param lsnr the listener to notify; replaces any previous one
     */
    public void setSocketErrorListener(SocketErrorListener lsnr) {
        this._socketErrorListener = lsnr;
    }
    
    /**
     * Tell the registered error listener, if there is one, that this socket
     * hit an error.
     */
    void errorOccurred() {
        // read the field once: it can be replaced from another thread between
        // the null check and the call
        SocketErrorListener lsnr = _socketErrorListener;
        if (lsnr != null)
            lsnr.errorOccurred();
    }
    
    /** @return running total of bytes written through this socket's output stream */
    public long getBytesSent() {
        return this._bytesWritten;
    }

    /** @return running total of bytes queued into this socket's input stream */
    public long getBytesReceived() {
        return this._bytesRead;
    }

    /** @return the time at which this socket was constructed */
    public long getCreatedOn() {
        return this._createdOn;
    }

    /** @return the time at which this socket was closed, or -1 if it is still open */
    public long getClosedOn() {
        return this._closedOn;
    }
    
    
    /** @return the log prefix for this socket: its numeric id in brackets followed by ": " */
    private String getPrefix() {
        final String id = String.valueOf(_socketId);
        return "[" + id + "]: ";
    }
    
    //--------------------------------------------------
    /**
     * In-memory byte queue exposed as an InputStream.  Producers add bytes
     * with queueData(..); consumers take them with read(..).
     *
     * Locking: the buffer and inStreamClosed are guarded by the bc monitor.
     * Readers sleep on the stream's own monitor and are woken by queueData,
     * notifyClosed and close.  A reader re-checks its wake-up conditions
     * while holding the stream monitor, so a notification sent between the
     * check and the wait cannot be missed.  Monitors are only ever nested in
     * the order stream monitor, then bc or flagLock.
     */
    private class I2PInputStream extends InputStream {
        private String streamName;
        private ByteCollector bc = new ByteCollector();
        /** set by close(); accessed only while holding the bc monitor */
        private boolean inStreamClosed = false;

        /** how long a read may block, in ms; negative means no limit */
        private long readTimeout = -1;

        /**
         * @param name label used in this stream's log messages
         */
        public I2PInputStream(String name) {
            streamName = name;
        }
        
        public long getReadTimeout() {
            return readTimeout;
        }
        
        private String getStreamPrefix() { 
            return getPrefix() + streamName + ": ";
        }
        
        public void setReadTimeout(long ms) {
            readTimeout = ms;
        }
        
        /**
         * Read a single byte.
         *
         * @return the byte as a value in 0..255, or -1 at end of stream
         */
        public int read() throws IOException {
            byte[] b = new byte[1];
            int res = read(b);
            if (res == 1) return b[0] & 0xff;
            if (res == -1) return -1;
            throw new RuntimeException("Incorrect read() result");
        }

        /**
         * Read up to len bytes, blocking until at least one byte is queued,
         * the stream or the socket is closed, or the read timeout runs out.
         *
         * @param b destination buffer
         * @param off offset in b at which to store the first byte
         * @param len maximum number of bytes to read
         * @return number of bytes stored, 0 if len is 0, or -1 if the stream
         *         or socket was closed with nothing left to read
         * @throws InterruptedIOException if the read timeout elapsed with no data
         * @throws IndexOutOfBoundsException if off/len do not fit inside b
         */
        public int read(byte[] b, int off, int len) throws IOException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getStreamPrefix() + "Read called for " + len + " bytes (avail=" 
                           + bc.getCurrentSize() + "): " + this.hashCode());
            if (len == 0) return 0;
            // reject a bad destination before bytes are removed from the
            // buffer, otherwise they would be lost when the copy fails
            if (b == null)
                throw new NullPointerException();
            if ((off < 0) || (len < 0) || (len > b.length - off))
                throw new IndexOutOfBoundsException();

            final long timeout = readTimeout;
            final long dieAfter = System.currentTimeMillis() + timeout;
            byte[] read = null;
            boolean streamClosed = false;

            synchronized (this) {
                while (true) {
                    synchronized (bc) {
                        read = bc.startToByteArray(len);
                        bc.notifyAll(); // room was (possibly) freed up for blocked writers
                        streamClosed = inStreamClosed;
                    }
                    if ( (read.length > 0) || (streamClosed) )
                        break;

                    synchronized (flagLock) {
                        if (closed) {
                            if (_log.shouldLog(Log.DEBUG))
                                _log.debug(getStreamPrefix() + "Closed is set after reading " 
                                           + _bytesRead + " and writing " + _bytesWritten 
                                           + ", so closing stream: " + hashCode());
                            return -1;
                        }
                    }

                    try {
                        if (timeout >= 0) {
                            // only wait for what is left of the budget, and only
                            // give up after the buffer was found empty above
                            long remaining = dieAfter - System.currentTimeMillis();
                            if (remaining <= 0) {
                                throw new InterruptedIOException(getStreamPrefix() + "Timeout reading from I2PSocket (" 
                                                                 + timeout + " msecs)");
                            }
                            wait(remaining);
                        } else {
                            wait();
                        }
                    } catch (InterruptedException ex) {
                        // as before, an interrupt only causes the conditions to be re-checked
                    }
                }
            }

            if (read.length > len) throw new RuntimeException("BUG");
            // the loop only ends without data when the stream was closed
            if (read.length <= 0)
                return -1;
            
            System.arraycopy(read, 0, b, off, read.length);

            if (_log.shouldLog(Log.DEBUG)) {
                _log.debug(getStreamPrefix() + "Read from I2PInputStream " + hashCode() + " returned " 
                           + read.length + " bytes");
            }
            return read.length;
        }

        /**
         * @return number of bytes currently queued
         */
        public int available() {
            synchronized (bc) {
                return bc.getCurrentSize();
            }
        }

        /**
         * Add the data to the queue
         *
         * @param allowBlock if true, we will block if the buffer and the socket options
         *                   say so, otherwise we simply take the data regardless.
         * @throws InterruptedIOException if the queue's buffer is full, the socket has
         *                                a write timeout, and that timeout is exceeded
         * @throws IOException if the connection was closed while queueing up the data
         */
        void queueData(byte[] data, boolean allowBlock) throws InterruptedIOException, IOException {
            queueData(data, 0, data.length, allowBlock);
        }

        /**
         * Add the data to the queue
         *
         * @param data source of the bytes
         * @param off offset of the first byte to queue
         * @param len number of bytes to queue
         * @param allowBlock if true, we will block if the buffer and the socket options
         *                   say so, otherwise we simply take the data regardless.
         * @throws InterruptedIOException if the queue's buffer is full, the socket has
         *                                a write timeout, and that timeout is exceeded
         * @throws IOException if the connection was closed while queueing up the data
         */
        public void queueData(byte[] data, int off, int len, boolean allowBlock) throws InterruptedIOException, IOException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getStreamPrefix() + "Insert " + len + " bytes into queue: " + hashCode());
            Clock clock = I2PAppContext.getGlobalContext().clock();
            long endAfter = clock.now() + _options.getWriteTimeout();
            synchronized (bc) {
                if (allowBlock) {
                    if (_options.getMaxBufferSize() > 0) {
                        // hold the writer back while more than the configured maximum is pending
                        while (bc.getCurrentSize() > _options.getMaxBufferSize()) {
                            if (_log.shouldLog(Log.DEBUG))
                                _log.debug(getStreamPrefix() + "Buffer size exceeded: pending " 
                                           + bc.getCurrentSize() + " limit " + _options.getMaxBufferSize());
                            if (_options.getWriteTimeout() > 0) {
                                long timeLeft = endAfter - clock.now();
                                if (timeLeft <= 0) {
                                    long waited = _options.getWriteTimeout() - timeLeft;
                                    throw new InterruptedIOException(getStreamPrefix() + "Waited too long (" 
                                                                     + waited + "ms) to write " 
                                                                     + len + " with a buffer at " + bc.getCurrentSize());
                                }
                            }
                            if (inStreamClosed)
                                throw new IOException(getStreamPrefix() + "Stream closed while writing");
                            if (_closedOn > 0)
                                throw new IOException(getStreamPrefix() + "I2PSocket closed while writing");
                            try {
                                // readers notify bc after draining; the 1s cap re-checks the exit conditions
                                bc.wait(1000);
                            } catch (InterruptedException ie) {}
                        }
                    }
                }
                bc.append(data, off, len);
            }
            synchronized (I2PInputStream.this) {
                I2PInputStream.this.notifyAll();
            }
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode());
        }

        /**
         * Wake any blocked readers so they re-check the closed flags.
         */
        public void notifyClosed() {
            synchronized (I2PInputStream.this) {
                I2PInputStream.this.notifyAll();
            }
        }
        
        /**
         * Mark the stream closed and wake blocked readers and writers.
         */
        public void close() throws IOException {
            super.close();
            // set the flag before waking readers: a reader woken first would
            // still see the stream as open and go straight back to sleep
            synchronized (bc) {
                inStreamClosed = true;
                bc.notifyAll();
            }
            notifyClosed();
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getStreamPrefix() + "After close");
        }

    }

    /**
     * OutputStream handed to the application.  Everything written is queued
     * into the given I2PInputStream, blocking if that stream's buffer and the
     * socket options call for it.
     */
    private class I2POutputStream extends OutputStream {

        /** the queue that receives everything written to this stream */
        public I2PInputStream sendTo;

        /**
         * @param sendTo the stream into which written bytes are queued
         */
        public I2POutputStream(I2PInputStream sendTo) {
            this.sendTo = sendTo;
        }

        /**
         * Write a single byte (the low eight bits of b).
         */
        public void write(int b) throws IOException {
            write(new byte[] { (byte) b});
        }

        /**
         * Queue len bytes of b, starting at off, for sending.
         *
         * @throws IOException if the data could not be queued (timeout or closed)
         */
        public void write(byte[] b, int off, int len) throws IOException {
            sendTo.queueData(b, off, len, true);
            // count only what was actually queued: if queueData throws, nothing was written
            _bytesWritten += len;
        }

        /**
         * Wake any reader blocked on the target stream; the stream itself is
         * not marked closed here.
         */
        public void close() {
            sendTo.notifyClosed();
        }
    }

    /** Sequence number used in runner thread names; shared by all sockets. */
    private static volatile long __runnerId = 0;
    /**
     * Thread that drains the data the application wrote to the socket,
     * batches it, and sends it to the peer through the manager's session.
     * When the stream ends it sends a close packet (unless that has been
     * suppressed), removes the socket from the manager and closes it.
     */
    private class I2PSocketRunner extends I2PThread {

        /** source of the bytes to send to the peer */
        public InputStream in;

        /**
         * Store the stream, name the thread after the runner id, socket id
         * and the first four characters of the peer's hash, and start it.
         *
         * @param in stream to drain
         */
        public I2PSocketRunner(InputStream in) {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode());
            this.in = in;
            String peer = I2PSocketImpl.this.remote.calculateHash().toBase64();
            setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4));
            start();
        }
        
        /**
         * Pump some more data: read one chunk from the input stream into the
         * collector and, when enough is pending or nothing more is waiting,
         * send up to MAX_PACKET_SIZE bytes to the peer.
         *
         * @param bc collector holding bytes read but not yet sent
         * @param buffer scratch buffer for the read
         * @return true if we should keep on handling, false otherwise
         */
        private boolean handleNextPacket(ByteCollector bc, byte buffer[]) 
                                         throws IOException, I2PSessionException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket");
            int len = in.read(buffer);
            // NOTE: bcsize is sampled BEFORE the new bytes are appended, so the
            // size checks below do not include the chunk just read
            int bcsize = 0;
            synchronized (bc) {
                bcsize = bc.getCurrentSize();
            }

            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize);

            if (len != -1) {
                synchronized (bc) {
                    bc.append(buffer, len);
                }
            } else if (bcsize == 0) {
                // nothing left in the buffer, and read(..) got EOF (-1),
                // so there is nothing more to pump
                return false;
            }
            // little pending and nothing more waiting: pause briefly so that
            // further writes can accumulate before sending
            if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) {
                if (_log.shouldLog(Log.DEBUG))
                    _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode());

                try {
                    Thread.sleep(PACKET_DELAY);
                } catch (InterruptedException e) {
                    _log.warn("wtf", e);
                }
            }
            // send when a full packet was already pending or the input is (still) drained
            if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) {
                byte data[] = null;
                synchronized (bc) {
                    data = bc.startToByteArray(MAX_PACKET_SIZE);
                }
                if (data.length > 0) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length);
                    boolean sent = sendBlock(data);
                    if (!sent) {
                        if (_log.shouldLog(Log.WARN))
                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer.  Killing socket runner");
                        errorOccurred();
                        return false;
                    } else {
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer");
                    }
                }
            }
            return true;
        }

        /**
         * Pump until handleNextPacket says to stop, then shut the socket
         * down: flag the runner as finished, send a close packet if one is
         * still wanted, remove the socket from the manager and close it.
         * If an exception escapes the loop it is only logged; the removal
         * and close steps are then skipped.
         */
        public void run() {
            byte[] buffer = new byte[MAX_PACKET_SIZE];
            ByteCollector bc = new ByteCollector();
            boolean keepHandling = true;
            int packetsHandled = 0;
            try {
                //              try {
                while (keepHandling) {
                    keepHandling = handleNextPacket(bc, buffer);
                    packetsHandled++;
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug(getPrefix() + ":" + Thread.currentThread().getName() 
                                   + "Packets handled: " + packetsHandled);
                }
                if (_log.shouldLog(Log.INFO))
                    _log.info(getPrefix() + ":" + Thread.currentThread().getName() 
                               + "After handling packets, we're done.  Packets handled: " + packetsHandled);
                
                // anything still in the collector at this point was never sent
                if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) {
                    if (_log.shouldLog(Log.WARN))
                        _log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: " 
                                  + in.hashCode() + "; "
                                  + "queue size: " + bc.getCurrentSize() + ")");
                }
                synchronized (flagLock) {
                    closed2 = true;
                }
                boolean sc;
                synchronized (flagLock) {
                    sc = sendClose;
                } // FIXME: Race here?
                if (sc) {
                    if (_log.shouldLog(Log.INFO))
                        _log.info(getPrefix() + ":" + Thread.currentThread().getName() 
                                  + "Sending close packet: (we started? " + outgoing 
                                  + ") after reading " + _bytesRead + " and writing " + _bytesWritten);
                    // NOTE(review): unlike sendBlock(), remoteID is not checked for null here - confirm makePacket copes
                    byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x02), remoteID, new byte[0]);
                    boolean sent = manager.getSession().sendMessage(remote, packet);
                    if (!sent) {
                        if (_log.shouldLog(Log.WARN))
                            _log.warn(getPrefix() + ":" + Thread.currentThread().getName() 
                                      + "Error sending close packet to peer");
                        errorOccurred();
                    }
                }
                manager.removeSocket(I2PSocketImpl.this);
                internalClose();
            } catch (InterruptedIOException ex) {
                _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex);
            } catch (IOException ex) {
                // WHOEVER removes this event on inconsistent
                // state before fixing the inconsistent state (a
                // reference on the socket in the socket manager
                // etc.) will get hanged by me personally -- mihi
                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
            } catch (I2PException ex) {
                _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex);
            }
        }

        /**
         * Wrap the data in a packet addressed to the remote ID and send it.
         *
         * @param data payload to send
         * @return true if the session reported the message as sent; false if
         *         the remote ID is unknown, the runner has already finished
         *         (closed2), or the send failed
         */
        private boolean sendBlock(byte data[]) throws I2PSessionException {
            if (_log.shouldLog(Log.DEBUG))
                _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode());
            if (remoteID == null) {
                _log.error(getPrefix() + "NULL REMOTEID");
                return false;
            }
            byte[] packet = I2PSocketManagerImpl.makePacket(getMask(0x00), remoteID, data);
            boolean sent;
            synchronized (flagLock) {
                if (closed2) return false;
            }
            sent = manager.getSession().sendMessage(remote, packet);
            return sent;
        }
    }
    
    /** @return this socket's hash code rendered as a decimal string */
    public String toString() {
        return String.valueOf(hashCode());
    }
}