diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index fee9fe667844baeadb7a05d96fec1207806515e7..595385cc4773980f6f87384691b35874db1d0925 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -70,6 +70,21 @@ public interface I2PSocket { public boolean isClosed(); public void setSocketErrorListener(SocketErrorListener lsnr); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + /** * Allow notification of underlying errors communicating across I2P without * waiting for any sort of cleanup process. For example, if some data could diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 98c0a2cbfc5a5f263b60e4f56a26e2837a9153db..05ff0631e51c4c83bfbc3e171b2d3d0b8f7d3fdb 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -7,6 +7,7 @@ import java.io.OutputStream; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Clock; @@ -301,6 +302,24 @@ class I2PSocketImpl implements I2PSocket { public long getCreatedOn() { return _createdOn; } public long getClosedOn() { return _closedOn; } + /** + * The remote port. + * @return 0 always + * @since 0.8.9 + */ + public int getPort() { + return I2PSession.PORT_UNSPECIFIED; + } + + /** + * The local port. + * @return 0 always + * @since 0.8.9 + */ + public int getLocalPort() { + return I2PSession.PORT_UNSPECIFIED; + } + private String getPrefix() { return "[" + _socketId + "]: "; } @@ -671,7 +690,7 @@ class I2PSocketImpl implements I2PSocket { return sent; } } - + @Override public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 34dc8ac593d0d1c33c9202d71a1eb0f7a354b5e8..57f5f7bb4de7f10dda001d6fdf56d84088cd1a25 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -90,7 +90,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { @@ -101,7 +101,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param opts I2CP options * @return the newly created socket manager, or null if there were errors */ @@ -114,7 +114,7 @@ public class I2PSocketManagerFactory { * stream and connected to the I2CP router on the specified machine on the given * port * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param i2cpHost I2CP host * @param i2cpPort I2CP port * @param opts I2CP options diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 94532e51b137e88884bfd063a81bf6b033e2d5f3..d926eb8313c41bc674f00a442bf15e5e8e59dd38 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -2,7 +2,7 @@ package net.i2p.client.streaming; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ public interface I2PSocketOptions { /** How much data will we accept that hasn't been written out yet. */ @@ -81,4 +81,32 @@ public interface I2PSocketOptions { * @param ms wait time to block on the output stream while waiting for the data to flush. */ public void setWriteTimeout(long ms); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index b1fedcea7955e70acaf5127eff67b0eb77273eef..cb66b1486ee8a6d9aedb819f83674e48b00fbb74 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -4,22 +4,32 @@ import java.util.Properties; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ class I2PSocketOptionsImpl implements I2PSocketOptions { private long _connectTimeout; private long _readTimeout; private long _writeTimeout; private int _maxBufferSize; + private int _localPort; + private int _remotePort; public static final int DEFAULT_BUFFER_SIZE = 1024*64; public static final int DEFAULT_WRITE_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public I2PSocketOptionsImpl() { this(System.getProperties()); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public I2PSocketOptionsImpl(I2PSocketOptions opts) { this(System.getProperties()); if (opts != null) { @@ -27,13 +37,25 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _readTimeout = opts.getReadTimeout(); _writeTimeout = opts.getWriteTimeout(); _maxBufferSize = opts.getMaxBufferSize(); + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); } } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public I2PSocketOptionsImpl(Properties opts) { init(opts); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public void setProperties(Properties opts) { if (opts == null) return; if (opts.containsKey(PROP_BUFFER_SIZE)) @@ -46,6 +68,10 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + */ protected void init(Properties opts) { _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); @@ -144,4 +170,40 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { public void setWriteTimeout(long ms) { _writeTimeout = ms; } + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port) { + _remotePort = port; + } + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 0eb8fe66ff2b9b68583194d4642ad6a5131fb16d..20105da7726307fcf24a4a725b3a7996eebd947e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -23,25 +23,25 @@ import net.i2p.util.SimpleTimer2; * */ class Connection { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _connectionManager; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _connectionManager; private Destination _remotePeer; private long _sendStreamId; private long _receiveStreamId; private long _lastSendTime; - private AtomicLong _lastSendId; + private final AtomicLong _lastSendId; private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; private boolean _connected; private boolean _hardDisconnected; - private MessageInputStream _inputStream; - private MessageOutputStream _outputStream; - private SchedulerChooser _chooser; + private final MessageInputStream _inputStream; + private final MessageOutputStream _outputStream; + private final SchedulerChooser _chooser; private long _nextSendTime; private long _ackedPackets; - private long _createdOn; + private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; @@ -51,10 +51,10 @@ class Connection { private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private final Map<Long, PacketLocal> _outboundPackets; - private PacketQueue _outboundQueue; - private ConnectionPacketHandler _handler; + private final PacketQueue _outboundQueue; + private final ConnectionPacketHandler _handler; private ConnectionOptions _options; - private ConnectionDataReceiver _receiver; + private final ConnectionDataReceiver _receiver; private I2PSocketFull _socket; /** set to an error cause if the connection could not be established */ private String _connectionError; @@ -70,8 +70,10 @@ class Connection { private final Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; - private ConEvent _connectionEvent; - private int _randomWait; + private final ConEvent _connectionEvent; + private final int _randomWait; + private int _localPort; + private int _remotePort; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -86,10 +88,13 @@ class Connection { public static final int MAX_WINDOW_SIZE = 128; - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); } - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; _connectionManager = manager; _chooser = chooser; @@ -101,34 +106,31 @@ class Connection { // FIXME pass through a passive flush delay setting as the 4th arg _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outboundPackets = new TreeMap(); + if (opts != null) { + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); + } _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = new AtomicLong(-1); _nextSendTime = -1; - _ackedPackets = 0; _createdOn = _context.clock().now(); _closeSentOn = -1; _closeReceivedOn = -1; - _unackedPacketsReceived = 0; _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _resetReceived = false; _connected = true; _disconnectScheduledOn = -1; _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); - _activeResends = 0; _resetSentOn = -1; - _isInbound = false; - _updatedShareOpts = false; _connectionEvent = new ConEvent(); - _hardDisconnected = false; _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -678,6 +680,23 @@ class Connection { public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } @@ -781,7 +800,7 @@ class Connection { } public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } - + void congestionOccurred() { // if we hit congestion and e.g. 5 packets are resent, // dont set the size to (winSize >> 4). only set the @@ -962,12 +981,13 @@ class Connection { * @return the inbound message stream */ public MessageInputStream getInputStream() { return _inputStream; } + /** stream that the local peer sends data to the remote peer on * @return the outbound message stream */ public MessageOutputStream getOutputStream() { return _outputStream; } - - @Override + + @Override public String toString() { StringBuilder buf = new StringBuilder(128); buf.append("[Connection "); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 07247e670e9abb2441c2a9ae30c14a3a5ca5a2d8..900fb96267c0a4d191436803eb06a5aec0dc1d84 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -13,11 +13,14 @@ import net.i2p.util.Log; * */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); + /** + * @param con non-null + */ public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); @@ -41,10 +44,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return !flush */ public boolean writeInProcess() { - Connection con = _connection; - if (con != null) - return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize(); - return false; + return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize(); } /** @@ -60,7 +60,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { Connection con = _connection; - if (con == null) return _dummyStatus; + //if (con == null) return _dummyStatus; boolean doSend = true; if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if (con.getOutputStream().getClosed()) { @@ -121,7 +121,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { Connection con = _connection; - if (con == null) return null; + //if (con == null) return null; long before = System.currentTimeMillis(); PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); long built = System.currentTimeMillis(); @@ -185,6 +185,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setLocalPort(con.getLocalPort()); + packet.setRemotePort(con.getPort()); } if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { packet.setFlag(Packet.FLAG_NO_ACK); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 6ba876dd92c4a46e3c2a4d9cb0c1226303830f03..e2c9d3f556aae3e6b7ce58059c759c69c8ece3df 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -18,10 +18,10 @@ import net.i2p.util.SimpleTimer; * @author zzz modded to use concurrent and bound queue size */ class ConnectionHandler { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _manager; - private LinkedBlockingQueue<Packet> _synQueue; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _manager; + private final LinkedBlockingQueue<Packet> _synQueue; private boolean _active; private int _acceptTimeout; @@ -41,7 +41,6 @@ class ConnectionHandler { _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); - _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 10ff38856435ed20d8da256bcf6b3b67028b5c12..95a192367706a45a38444d49f89a367711730881 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,24 +21,24 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - private TCBShare _tcbShare; + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final MessageHandler _messageHandler; + private final PacketHandler _packetHandler; + private final ConnectionHandler _connectionHandler; + private final PacketQueue _outboundQueue; + private final SchedulerChooser _schedulerChooser; + private final ConnectionPacketHandler _conPacketHandler; + private final TCBShare _tcbShare; /** Inbound stream ID (Long) to Connection map */ - private ConcurrentHashMap<Long, Connection> _connectionByInboundId; + private final ConcurrentHashMap<Long, Connection> _connectionByInboundId; /** Ping ID (Long) to PingRequest */ private final Map<Long, PingRequest> _pendingPings; private boolean _allowIncoming; private boolean _throttlersInitialized; private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; + private final ConnectionOptions _defaultOptions; private volatile int _numWaiting; private long _soTimeout; private ConnThrottler _minuteThrottler; @@ -59,10 +59,12 @@ class ConnectionManager { _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); _tcbShare = new TCBShare(_context); - _session.setSessionListener(_messageHandler); + // PROTO_ANY is for backward compatibility (pre-0.7.1) + // TODO change proto to PROTO_STREAMING someday. + // Right now we get everything, and rely on Datagram to specify PROTO_UDP. + // PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1) + _session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); _outboundQueue = new PacketQueue(_context, _session, this); - _allowIncoming = false; - _numWaiting = 0; /** Socket timeout for accept() */ _soTimeout = -1; @@ -141,7 +143,10 @@ class ConnectionManager { * it, or null if the syn's streamId was already taken */ public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + ConnectionOptions opts = new ConnectionOptions(_defaultOptions); + opts.setPort(synPacket.getRemotePort()); + opts.setLocalPort(synPacket.getLocalPort()); + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); _tcbShare.updateOptsFromShare(con); con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ae14daa15c13baa3d76c14a39c89ed42b226ee3e..e6ec6fa423b81e0118f1bfba48de6d42237de930 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -106,6 +106,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * This is based on documentation, the code, and logging, however there are still * some parts that could use more research. * + *<pre> * 1024 Tunnel Message * - 21 Header (see router/tunnel/BatchedPreprocessor.java) * ----- @@ -169,7 +170,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * Similarly: * 3 msgs: 2722 * 4 msgs: 3714 - * + *</pre> * * Before release 0.6.1.14 this was 4096. * From release 0.6.1.14 through release 0.6.4, this was 960. @@ -205,18 +206,35 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730; public static final int MIN_MESSAGE_SIZE = 512; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public ConnectionOptions() { super(); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public ConnectionOptions(Properties opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(ConnectionOptions opts) { super(opts); if (opts != null) { @@ -235,8 +253,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setInboundBufferSize(opts.getInboundBufferSize()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); - setWriteTimeout(opts.getWriteTimeout()); - setReadTimeout(opts.getReadTimeout()); + // handled in super() + // not clear why added by jr 12/22/2005 + //setWriteTimeout(opts.getWriteTimeout()); + //setReadTimeout(opts.getReadTimeout()); setAnswerPings(opts.getAnswerPings()); initLists(opts); _maxConnsPerMinute = opts.getMaxConnsPerMinute(); @@ -248,7 +268,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } } - @Override + @Override protected void init(Properties opts) { super.init(opts); _trend = new int[TREND_COUNT]; @@ -262,12 +282,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); initLists(opts); @@ -279,7 +301,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); } - @Override + @Override public void setProperties(Properties opts) { super.setProperties(opts); if (opts == null) return; @@ -303,8 +325,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - if (opts.containsKey(PROP_WRITE_TIMEOUT)) - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //if (opts.containsKey(PROP_WRITE_TIMEOUT)) + // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) @@ -316,6 +339,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) // wow 5 minutes!!! FIXME!! + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); if (opts.containsKey(PROP_ANSWER_PINGS)) setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 526fae3722e18e2001f56be3fc3e15a32a721f28..53ebb17e1b9a92c631a8b8a3b0ca326063c0e1d4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionPacketHandler { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; public ConnectionPacketHandler(I2PAppContext context) { _context = context; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 71e1dd3ac7e172e2ad5238e206006105939459ed..f40dbd0c7929045f06f2c1402d79017dd1a9a049 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -8,7 +8,7 @@ import net.i2p.I2PException; * */ class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; + private final I2PSocketManagerFull _socketManager; public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index f8dbe74ea691d79241461b2dac53569a94bffebd..5cd76a864f376d414e40705f4ba4c0bcfe8e4795 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import net.i2p.client.I2PSession; import net.i2p.data.Destination; /** @@ -127,7 +128,28 @@ class I2PSocketFull implements I2PSocket { if (c != null) c.disconnectComplete(); } - @Override + + /** + * The remote port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort(); + } + + /** + * The local port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getLocalPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort(); + } + + @Override public String toString() { Connection c = _connection; if (c == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 632b904b74a598993a73ae31c6b94e8d31e9b961..d9ca691b4de64068cf5b8445cf2809f65374eea4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -7,7 +7,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; -import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionMuxedListener; import net.i2p.util.Log; /** @@ -15,10 +15,10 @@ import net.i2p.util.Log; * Packets, if we can. * */ -class MessageHandler implements I2PSessionListener { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; +class MessageHandler implements I2PSessionMuxedListener { + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; private final Set<I2PSocketManager.DisconnectListener> _listeners; public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { @@ -31,11 +31,23 @@ class MessageHandler implements I2PSessionListener { /** Instruct the client that the given session has received a message with * size # of bytes. + * This shouldn't be called anymore since we are registering as a muxed listener. * @param session session to notify * @param msgId message number available * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** Instruct the client that the given session has received a message with + * size # of bytes. + * @param session session to notify + * @param msgId message number available + * @param size size of the message + */ + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { byte data[] = null; try { data = session.receiveMessage(msgId); @@ -49,6 +61,8 @@ class MessageHandler implements I2PSessionListener { Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); + packet.setRemotePort(fromPort); + packet.setLocalPort(toPort); _manager.getPacketHandler().receivePacket(packet); } catch (IllegalArgumentException iae) { _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 71c9ebce9f99a398915f3bb4743d7564e6d5dce5..737e0f7b2de97a5048c955127290751a197440c6 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -63,14 +63,10 @@ class MessageOutputStream extends OutputStream { _buf = _dataCache.acquire().getData(); // new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); - _written = 0; - _closed = false; _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); - _sendPeriodBytes = 0; - _sendBps = 0; _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index faff2ff722c4034682499caeb0a814ef9cea062a..5f25c3cd961177be94717205ce19c74428727752 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -13,6 +13,13 @@ import net.i2p.data.SigningPrivateKey; import net.i2p.util.Log; /** + * This contains solely the data that goes out on the wire, + * including the local and remote port which is embedded in + * the I2CP overhead, not in the packet itself. + * For local state saved for outbound packets, see PacketLocal. + * + * <p> + * * Contain a single packet transferred as part of a streaming connection. * The data format is as follows:<ul> * <li>{@link #getSendStreamId sendStreamId} [4 byte value]</li> @@ -67,6 +74,8 @@ class Packet { private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; + private int _localPort; + private int _remotePort; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -148,6 +157,10 @@ class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; protected static final int MAX_DELAY_REQUEST = 65535; + /** + * Does no initialization. + * See readPacket() for inbound packets, and the setters for outbound packets. + */ public Packet() { } private boolean _sendStreamIdSet = false; @@ -316,6 +329,40 @@ class Packet { _optionMaxSize = numBytes; } + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getRemotePort() { + return _remotePort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setRemotePort(int port) { + _remotePort = port; + } + /** * Write the packet to the buffer (starting at the offset) and return * the number of bytes written. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 4f977712c387ccd2d42cbd4f286f2f3401c999e4..ef145179c3cf368a854ef187e5d02f26fdd3370c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -16,9 +16,9 @@ import net.i2p.util.Log; * */ class PacketHandler { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; //private int _lastDelay; //private int _dropped; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index dd5fe1ceb4c5b63baab4a1ac99a42fb53ac68300..ca2e25d42da626c1fb92aaf319cb677310da91a5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -13,13 +13,13 @@ import net.i2p.util.SimpleTimer2; * retries, etc. */ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private Destination _to; private SessionKey _keyUsed; private Set _tagsSent; - private long _createdOn; + private final long _createdOn; private int _numSends; private long _lastSend; private long _acceptedOn; @@ -29,9 +29,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; + /** not bound to a connection */ public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); } + public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { _context = ctx; _createdOn = ctx.clock().now(); @@ -40,8 +42,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _connection = con; _lastSend = -1; _cancelledOn = -1; - _nackCount = 0; - _retransmitted = false; } public Destination getTo() { return _to; } @@ -138,6 +138,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + + /** @return null if not bound */ public Connection getConnection() { return _connection; } public void incrementNACKs() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index fa0aa87ce71dfc2f8f59b8ace23487f78c81d334..4de4c6e1633bf4631acc408ded91549a744d9477 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -19,11 +19,11 @@ import net.i2p.util.Log; * */ class PacketQueue { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private ConnectionManager _connectionManager; - private ByteCache _cache = ByteCache.getInstance(64, 36*1024); + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final ConnectionManager _connectionManager; + private final ByteCache _cache = ByteCache.getInstance(64, 36*1024); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -98,7 +98,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); else // I2PSessionImpl2 //sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0); @@ -107,7 +107,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); end = _context.clock().now(); if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java index 6ba78bfd49eccb3341cd5ebf96da7c2d1fe32717..b0ffbf250443545e5b679cde7d35b219ae78a706 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java @@ -107,11 +107,11 @@ class StandardSocket extends Socket { } /** - * @return -1 always + * @return the port or 0 if unknown */ @Override public int getLocalPort() { - return -1; + return _socket.getLocalPort(); } /** @@ -139,11 +139,11 @@ class StandardSocket extends Socket { } /** - * @return 0 always + * @return the port or 0 if unknown */ @Override public int getPort() { - return 0; + return _socket.getPort(); } @Override diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index d2d02021a07aac982d7ba04b5445c3f0714131d4..d33e7a741e6749f91a6a90c5ecb2eb4331cbea30 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -21,10 +21,10 @@ import net.i2p.util.SimpleTimer2; * */ class TCBShare { - private I2PAppContext _context; - private Log _log; - private Map<Destination, Entry> _cache; - private CleanEvent _cleaner; + private final I2PAppContext _context; + private final Log _log; + private final Map<Destination, Entry> _cache; + private final CleanEvent _cleaner; private static final long EXPIRE_TIME = 30*60*1000; private static final long CLEAN_TIME = 10*60*1000;