I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
TunnelGateway.java 9.92 KiB
package net.i2p.router.tunnel;

import java.util.ArrayList;
import java.util.List;

import net.i2p.data.Hash;
import net.i2p.data.TunnelId;
import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.TunnelGatewayMessage;
import net.i2p.router.Router;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;

/**
 * Serve as the gatekeeper for a tunnel, accepting messages, coallescing and/or
 * fragmenting them before wrapping them up for tunnel delivery. The flow here
 * is: <ol>
 * <li>add an I2NPMessage (and a target tunnel/router, if necessary)</li>
 * <li>that message is queued up into a TunnelGateway.Pending and offered to the
 *     assigned QueuePreprocessor.</li>
 * <li>that QueuePreprocessor may then take off any of the TunnelGateway.Pending
 *     messages or instruct the TunnelGateway to offer it the messages again in
 *     a short while (in an attempt to coallesce them).
 * <li>when the QueueProcessor accepts a TunnelGateway.Pending, it preprocesses
 *     it into fragments, forwarding each preprocessed fragment group through 
 *     the Sender.</li>
 * <li>the Sender then encrypts the preprocessed data and delivers it to the 
 *     Receiver.</li>
 * <li>the Receiver now has the encrypted message and may do with it as it 
 *     pleases (e.g. wrap it as necessary and enqueue it onto the OutNetMessagePool,
 *     or if debugging, verify that it can be decrypted properly)</li>
 * </ol>
 *
 * Unused directly - see PumpedTunnelGateway, ThrottledPumpedTunnelGateway, and TunnelGatewayZeroHop overrides.
 */
class TunnelGateway {
    protected final RouterContext _context;
    protected final Log _log;
    protected final List<PendingGatewayMessage> _queue;
    protected final QueuePreprocessor _preprocessor;
    protected final Sender _sender;
    protected final Receiver _receiver;
    protected long _lastFlush;
    //protected int _flushFrequency;
    protected final DelayedFlush _delayedFlush;// FIXME Exporting non-public type through public API FIXME
    protected int _messagesSent;
    
    /**
     * @param preprocessor this pulls Pending messages off a list, builds some
     *                     full preprocessed messages, and pumps those into the sender
     * @param sender this takes a preprocessed message, encrypts it, and sends it to 
     *               the receiver
     * @param receiver this receives the encrypted message and forwards it off 
     *                 to the first hop
     */
    protected TunnelGateway(RouterContext context, QueuePreprocessor preprocessor, Sender sender, Receiver receiver) {
        _context = context;
        _log = context.logManager().getLog(getClass());
        _queue = new ArrayList(4);
        _preprocessor = preprocessor;
        _sender = sender;
        _receiver = receiver;
        //_flushFrequency = 500;
        _delayedFlush = new DelayedFlush();
        _lastFlush = _context.clock().now();
        //_context.statManager().createRateStat("tunnel.lockedGatewayAdd", "How long do we block when adding a message to a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
        //_context.statManager().createRateStat("tunnel.lockedGatewayCheck", "How long do we block when flushing a tunnel gateway's queue", "Tunnels", new long[] { 60*1000, 10*60*1000 });
    }
    
    /**
     * Add a message to be sent down the tunnel, where we are the inbound gateway.
     *
     * @param msg message received to be sent through the tunnel
     */
    public void add(TunnelGatewayMessage msg) {
        add(msg.getMessage(), null, null);
    }
    
    /**
     * Add a message to be sent down the tunnel, either sending it now (perhaps
     * coallesced with other pending messages) or after a brief pause (_flushFrequency).
     * If it is queued up past its expiration, it is silently dropped
     *
     * UNUSED - see overrides
     *
     * @param msg message to be sent through the tunnel
     * @param toRouter router to send to after the endpoint (or null for endpoint processing)
     * @param toTunnel tunnel to send to after the endpoint (or null for endpoint or router processing)
     */
    public void add(I2NPMessage msg, Hash toRouter, TunnelId toTunnel) {
        throw new UnsupportedOperationException("unused, right?");
/****
        _messagesSent++;
        long startAdd = System.currentTimeMillis();
        boolean delayedFlush = false;
        long delayAmount = -1;
        int remaining = 0;
        Pending cur = new PendingImpl(msg, toRouter, toTunnel);
        long beforeLock = System.currentTimeMillis();
        long afterAdded = -1;
        long afterPreprocess = 0;
        long afterExpire = 0;
        synchronized (_queue) {
            _queue.add(cur);
            afterAdded = System.currentTimeMillis();
            if (_log.shouldLog(Log.DEBUG))
                _log.debug("Added before direct flush preprocessing: " + _queue);
            delayedFlush = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
            afterPreprocess = System.currentTimeMillis();
            if (delayedFlush)
                delayAmount = _preprocessor.getDelayAmount();
            _lastFlush = _context.clock().now();
            
            // expire any as necessary, even if its framented
            for (int i = 0; i < _queue.size(); i++) {
                Pending m = _queue.get(i);
                if (m.getExpiration() + Router.CLOCK_FUDGE_FACTOR < _lastFlush) {
                    if (_log.shouldLog(Log.DEBUG))
                        _log.debug("Expire on the queue (size=" + _queue.size() + "): " + m);
                    _queue.remove(i);
                    i--;
                }
            }
            afterExpire = System.currentTimeMillis();
            remaining = _queue.size();
            if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
                _log.debug("Remaining after preprocessing: " + _queue);
        }
        
        if (delayedFlush) {
            _delayedFlush.reschedule(delayAmount);
        }
        _context.statManager().addRateData("tunnel.lockedGatewayAdd", afterAdded-beforeLock, remaining);
        if (_log.shouldLog(Log.DEBUG)) {
            long complete = System.currentTimeMillis();
            _log.debug("Time to add the message " + msg.getUniqueId() + ": " + (complete-startAdd)
                       + " delayed? " + delayedFlush + " remaining: " + remaining
                       + " prepare: " + (beforeLock-startAdd)
                       + " add: " + (afterAdded-beforeLock)
                       + " preprocess: " + (afterPreprocess-afterAdded)
                       + " expire: " + (afterExpire-afterPreprocess)
                       + " queue flush: " + (complete-afterExpire));
        }
****/
    }
    
    public int getMessagesSent() { return _messagesSent; }
    
    public interface Sender {
        /**
         * Take the preprocessed data containing zero or more fragments, encrypt
         * it, and pass it on to the receiver
         *
         * @param preprocessed IV + (rand padding) + 0x0 + Hash[0:3] + {instruction+fragment}*
         * @return message ID it was sent in, or -1 if it was deferred
         */
        public long sendPreprocessed(byte preprocessed[], Receiver receiver);
    }
        
    public interface QueuePreprocessor {
        /** 
         * Caller must synchronize on the list!
         *
         * @param pending list of Pending objects for messages either unsent
         *                or partly sent.  This list should be update with any
         *                values removed (the preprocessor owns the lock)
         *                Messages are not removed from the list until actually sent.
         *                The status of unsent and partially-sent messages is stored in
         *                the Pending structure.
         *
         * @return true if we should delay before preprocessing again 
         */
        public boolean preprocessQueue(List<PendingGatewayMessage> pending, Sender sender, Receiver receiver);
        
        /** how long do we want to wait before flushing */
        public long getDelayAmount();
    }
    
    public interface Receiver {
        /**
         * Take the encrypted data and send it off to the next hop
         * @return message ID it was sent in, or -1 if it had to be deferred
         */
        public long receiveEncrypted(byte encrypted[]);
    }

    protected class DelayedFlush extends SimpleTimer2.TimedEvent {
    	DelayedFlush() {
            super(_context.simpleTimer2());
    	}

        public void timeReached() {
            boolean wantRequeue = false;
            //int remaining = 0;
            //long beforeLock = _context.clock().now();
            //long afterChecked = -1;
            long delayAmount = -1;
            //if (_queue.size() > 10000) // stay out of the synchronized block
            //    System.out.println("foo!");
            synchronized (_queue) {
                //if (_queue.size() > 10000) // stay in the synchronized block
                //    System.out.println("foo!");
                //afterChecked = _context.clock().now();
                if (!_queue.isEmpty()) {
                    //if ( (remaining > 0) && (_log.shouldLog(Log.DEBUG)) )
                    //    _log.debug("Remaining before delayed flush preprocessing: " + _queue);
                    wantRequeue = _preprocessor.preprocessQueue(_queue, _sender, _receiver);
                    if (wantRequeue) {
                        delayAmount = _preprocessor.getDelayAmount();
                        if (_log.shouldLog(Log.DEBUG))
                            _log.debug("Remaining after delayed flush preprocessing: " + _queue);
                    }
                }
                //remaining = _queue.size();
            }
            
            if (wantRequeue)
                schedule(delayAmount);
            else
                _lastFlush = _context.clock().now();
            
            //_context.statManager().addRateData("tunnel.lockedGatewayCheck", afterChecked-beforeLock, remaining);
        }
    }
}