From 9f433b2e6bafefab2cc277d9cea8c446ef9717a4 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Thu, 14 Jul 2011 18:53:10 +0000 Subject: [PATCH] * Streaming: - Hook I2CP ports through to I2PSocket - Javadocs, init cleanups, final --- .../net/i2p/client/streaming/I2PSocket.java | 15 ++++ .../i2p/client/streaming/I2PSocketImpl.java | 21 +++++- .../streaming/I2PSocketManagerFactory.java | 6 +- .../client/streaming/I2PSocketOptions.java | 30 +++++++- .../streaming/I2PSocketOptionsImpl.java | 64 ++++++++++++++++- .../net/i2p/client/streaming/Connection.java | 70 ++++++++++++------- .../streaming/ConnectionDataReceiver.java | 20 +++--- .../client/streaming/ConnectionHandler.java | 9 ++- .../client/streaming/ConnectionManager.java | 37 +++++----- .../client/streaming/ConnectionOptions.java | 40 ++++++++--- .../streaming/ConnectionPacketHandler.java | 4 +- .../client/streaming/I2PServerSocketFull.java | 2 +- .../i2p/client/streaming/I2PSocketFull.java | 24 ++++++- .../i2p/client/streaming/MessageHandler.java | 24 +++++-- .../client/streaming/MessageOutputStream.java | 4 -- .../src/net/i2p/client/streaming/Packet.java | 47 +++++++++++++ .../i2p/client/streaming/PacketHandler.java | 6 +- .../net/i2p/client/streaming/PacketLocal.java | 14 ++-- .../net/i2p/client/streaming/PacketQueue.java | 14 ++-- .../i2p/client/streaming/StandardSocket.java | 8 +-- .../net/i2p/client/streaming/TCBShare.java | 8 +-- 21 files changed, 361 insertions(+), 106 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index fee9fe6678..595385cc47 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -70,6 +70,21 @@ public interface I2PSocket { public boolean isClosed(); public void setSocketErrorListener(SocketErrorListener lsnr); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + /** * Allow notification of underlying errors communicating across I2P without * waiting for any sort of cleanup process. For example, if some data could diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 98c0a2cbfc..05ff0631e5 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -7,6 +7,7 @@ import java.io.OutputStream; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Clock; @@ -301,6 +302,24 @@ class I2PSocketImpl implements I2PSocket { public long getCreatedOn() { return _createdOn; } public long getClosedOn() { return _closedOn; } + /** + * The remote port. + * @return 0 always + * @since 0.8.9 + */ + public int getPort() { + return I2PSession.PORT_UNSPECIFIED; + } + + /** + * The local port. + * @return 0 always + * @since 0.8.9 + */ + public int getLocalPort() { + return I2PSession.PORT_UNSPECIFIED; + } + private String getPrefix() { return "[" + _socketId + "]: "; } @@ -671,7 +690,7 @@ class I2PSocketImpl implements I2PSocket { return sent; } } - + @Override public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 34dc8ac593..57f5f7bb4d 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -90,7 +90,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(InputStream myPrivateKeyStream) { @@ -101,7 +101,7 @@ public class I2PSocketManagerFactory { * Create a socket manager using the destination loaded from the given private key * stream and connected to the default I2CP host and port. * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param opts I2CP options * @return the newly created socket manager, or null if there were errors */ @@ -114,7 +114,7 @@ public class I2PSocketManagerFactory { * stream and connected to the I2CP router on the specified machine on the given * port * - * @param myPrivateKeyStream private key stream + * @param myPrivateKeyStream private key stream, format is specified in {@link net.i2p.data.PrivateKeyFile PrivateKeyFile} * @param i2cpHost I2CP host * @param i2cpPort I2CP port * @param opts I2CP options diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 94532e51b1..d926eb8313 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -2,7 +2,7 @@ package net.i2p.client.streaming; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ public interface I2PSocketOptions { /** How much data will we accept that hasn't been written out yet. */ @@ -81,4 +81,32 @@ public interface I2PSocketOptions { * @param ms wait time to block on the output stream while waiting for the data to flush. */ public void setWriteTimeout(long ms); + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort(); + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port); + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort(); + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port); } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java index b1fedcea79..cb66b1486e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptionsImpl.java @@ -4,22 +4,32 @@ import java.util.Properties; /** * Define the configuration for streaming and verifying data on the socket. - * + * Use I2PSocketManager.buildOptions() to get one of these. */ class I2PSocketOptionsImpl implements I2PSocketOptions { private long _connectTimeout; private long _readTimeout; private long _writeTimeout; private int _maxBufferSize; + private int _localPort; + private int _remotePort; public static final int DEFAULT_BUFFER_SIZE = 1024*64; public static final int DEFAULT_WRITE_TIMEOUT = -1; public static final int DEFAULT_CONNECT_TIMEOUT = 60*1000; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public I2PSocketOptionsImpl() { this(System.getProperties()); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public I2PSocketOptionsImpl(I2PSocketOptions opts) { this(System.getProperties()); if (opts != null) { @@ -27,13 +37,25 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _readTimeout = opts.getReadTimeout(); _writeTimeout = opts.getWriteTimeout(); _maxBufferSize = opts.getMaxBufferSize(); + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); } } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public I2PSocketOptionsImpl(Properties opts) { init(opts); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public void setProperties(Properties opts) { if (opts == null) return; if (opts.containsKey(PROP_BUFFER_SIZE)) @@ -46,6 +68,10 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { _writeTimeout = getInt(opts, PROP_WRITE_TIMEOUT, DEFAULT_WRITE_TIMEOUT); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + */ protected void init(Properties opts) { _maxBufferSize = getInt(opts, PROP_BUFFER_SIZE, DEFAULT_BUFFER_SIZE); _connectTimeout = getInt(opts, PROP_CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); @@ -144,4 +170,40 @@ class I2PSocketOptionsImpl implements I2PSocketOptions { public void setWriteTimeout(long ms) { _writeTimeout = ms; } + + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * The remote port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setPort(int port) { + _remotePort = port; + } + + /** + * The local port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * The local port. + * @param port 0 - 65535 + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 0eb8fe66ff..20105da772 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -23,25 +23,25 @@ import net.i2p.util.SimpleTimer2; * */ class Connection { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _connectionManager; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _connectionManager; private Destination _remotePeer; private long _sendStreamId; private long _receiveStreamId; private long _lastSendTime; - private AtomicLong _lastSendId; + private final AtomicLong _lastSendId; private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; private boolean _connected; private boolean _hardDisconnected; - private MessageInputStream _inputStream; - private MessageOutputStream _outputStream; - private SchedulerChooser _chooser; + private final MessageInputStream _inputStream; + private final MessageOutputStream _outputStream; + private final SchedulerChooser _chooser; private long _nextSendTime; private long _ackedPackets; - private long _createdOn; + private final long _createdOn; private long _closeSentOn; private long _closeReceivedOn; private int _unackedPacketsReceived; @@ -51,10 +51,10 @@ class Connection { private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ private final Map<Long, PacketLocal> _outboundPackets; - private PacketQueue _outboundQueue; - private ConnectionPacketHandler _handler; + private final PacketQueue _outboundQueue; + private final ConnectionPacketHandler _handler; private ConnectionOptions _options; - private ConnectionDataReceiver _receiver; + private final ConnectionDataReceiver _receiver; private I2PSocketFull _socket; /** set to an error cause if the connection could not be established */ private String _connectionError; @@ -70,8 +70,10 @@ class Connection { private final Object _connectLock; /** how many messages have been resent and not yet ACKed? */ private int _activeResends; - private ConEvent _connectionEvent; - private int _randomWait; + private final ConEvent _connectionEvent; + private final int _randomWait; + private int _localPort; + private int _remotePort; private long _lifetimeBytesSent; private long _lifetimeBytesReceived; @@ -86,10 +88,13 @@ class Connection { public static final int MAX_WINDOW_SIZE = 128; - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler) { this(ctx, manager, chooser, queue, handler, null); } - public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, + PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { _context = ctx; _connectionManager = manager; _chooser = chooser; @@ -101,34 +106,31 @@ class Connection { // FIXME pass through a passive flush delay setting as the 4th arg _outputStream = new MessageOutputStream(_context, _receiver, (opts == null ? Packet.MAX_PAYLOAD_SIZE : opts.getMaxMessageSize())); _outboundPackets = new TreeMap(); + if (opts != null) { + _localPort = opts.getLocalPort(); + _remotePort = opts.getPort(); + } _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); _lastSendId = new AtomicLong(-1); _nextSendTime = -1; - _ackedPackets = 0; _createdOn = _context.clock().now(); _closeSentOn = -1; _closeReceivedOn = -1; - _unackedPacketsReceived = 0; _congestionWindowEnd = _options.getWindowSize()-1; _highestAckedThrough = -1; _lastCongestionSeenAt = MAX_WINDOW_SIZE*2; // lets allow it to grow _lastCongestionTime = -1; _lastCongestionHighestUnacked = -1; - _resetReceived = false; _connected = true; _disconnectScheduledOn = -1; _lastReceivedOn = -1; _activityTimer = new ActivityTimer(); _ackSinceCongestion = true; _connectLock = new Object(); - _activeResends = 0; _resetSentOn = -1; - _isInbound = false; - _updatedShareOpts = false; _connectionEvent = new ConEvent(); - _hardDisconnected = false; _randomWait = _context.random().nextInt(10*1000); // just do this once to reduce usage _context.statManager().createRateStat("stream.con.windowSizeAtCongestion", "How large was our send window when we send a dup?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("stream.chokeSizeBegin", "How many messages were outstanding when we started to choke?", "Stream", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); @@ -678,6 +680,23 @@ class Connection { public I2PSocketFull getSocket() { return _socket; } public void setSocket(I2PSocketFull socket) { _socket = socket; } + /** + * The remote port. + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getPort() { + return _remotePort; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + public String getConnectionError() { return _connectionError; } public void setConnectionError(String err) { _connectionError = err; } @@ -781,7 +800,7 @@ class Connection { } public int getLastCongestionSeenAt() { return _lastCongestionSeenAt; } - + void congestionOccurred() { // if we hit congestion and e.g. 5 packets are resent, // dont set the size to (winSize >> 4). only set the @@ -962,12 +981,13 @@ class Connection { * @return the inbound message stream */ public MessageInputStream getInputStream() { return _inputStream; } + /** stream that the local peer sends data to the remote peer on * @return the outbound message stream */ public MessageOutputStream getOutputStream() { return _outputStream; } - - @Override + + @Override public String toString() { StringBuilder buf = new StringBuilder(128); buf.append("[Connection "); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java index 07247e670e..900fb96267 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -13,11 +13,14 @@ import net.i2p.util.Log; * */ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private static final MessageOutputStream.WriteStatus _dummyStatus = new DummyStatus(); + /** + * @param con non-null + */ public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { _context = ctx; _log = ctx.logManager().getLog(ConnectionDataReceiver.class); @@ -41,10 +44,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { * @return !flush */ public boolean writeInProcess() { - Connection con = _connection; - if (con != null) - return con.getUnackedPacketsSent() >= con.getOptions().getWindowSize(); - return false; + return _connection.getUnackedPacketsSent() >= _connection.getOptions().getWindowSize(); } /** @@ -60,7 +60,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public MessageOutputStream.WriteStatus writeData(byte[] buf, int off, int size) { Connection con = _connection; - if (con == null) return _dummyStatus; + //if (con == null) return _dummyStatus; boolean doSend = true; if ( (size <= 0) && (con.getLastSendId() >= 0) ) { if (con.getOutputStream().getClosed()) { @@ -121,7 +121,7 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { */ public PacketLocal send(byte buf[], int off, int size, boolean forceIncrement) { Connection con = _connection; - if (con == null) return null; + //if (con == null) return null; long before = System.currentTimeMillis(); PacketLocal packet = buildPacket(con, buf, off, size, forceIncrement); long built = System.currentTimeMillis(); @@ -185,6 +185,8 @@ class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { packet.setFlag(Packet.FLAG_SYNCHRONIZE); packet.setOptionalFrom(con.getSession().getMyDestination()); packet.setOptionalMaxSize(con.getOptions().getMaxMessageSize()); + packet.setLocalPort(con.getLocalPort()); + packet.setRemotePort(con.getPort()); } if (con.getSendStreamId() == Packet.STREAM_ID_UNKNOWN) { packet.setFlag(Packet.FLAG_NO_ACK); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 6ba876dd92..e2c9d3f556 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -18,10 +18,10 @@ import net.i2p.util.SimpleTimer; * @author zzz modded to use concurrent and bound queue size */ class ConnectionHandler { - private I2PAppContext _context; - private Log _log; - private ConnectionManager _manager; - private LinkedBlockingQueue<Packet> _synQueue; + private final I2PAppContext _context; + private final Log _log; + private final ConnectionManager _manager; + private final LinkedBlockingQueue<Packet> _synQueue; private boolean _active; private int _acceptTimeout; @@ -41,7 +41,6 @@ class ConnectionHandler { _log = context.logManager().getLog(ConnectionHandler.class); _manager = mgr; _synQueue = new LinkedBlockingQueue<Packet>(MAX_QUEUE_SIZE); - _active = false; _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 10ff388564..95a1923677 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,24 +21,24 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionManager { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private MessageHandler _messageHandler; - private PacketHandler _packetHandler; - private ConnectionHandler _connectionHandler; - private PacketQueue _outboundQueue; - private SchedulerChooser _schedulerChooser; - private ConnectionPacketHandler _conPacketHandler; - private TCBShare _tcbShare; + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final MessageHandler _messageHandler; + private final PacketHandler _packetHandler; + private final ConnectionHandler _connectionHandler; + private final PacketQueue _outboundQueue; + private final SchedulerChooser _schedulerChooser; + private final ConnectionPacketHandler _conPacketHandler; + private final TCBShare _tcbShare; /** Inbound stream ID (Long) to Connection map */ - private ConcurrentHashMap<Long, Connection> _connectionByInboundId; + private final ConcurrentHashMap<Long, Connection> _connectionByInboundId; /** Ping ID (Long) to PingRequest */ private final Map<Long, PingRequest> _pendingPings; private boolean _allowIncoming; private boolean _throttlersInitialized; private int _maxConcurrentStreams; - private ConnectionOptions _defaultOptions; + private final ConnectionOptions _defaultOptions; private volatile int _numWaiting; private long _soTimeout; private ConnThrottler _minuteThrottler; @@ -59,10 +59,12 @@ class ConnectionManager { _schedulerChooser = new SchedulerChooser(_context); _conPacketHandler = new ConnectionPacketHandler(_context); _tcbShare = new TCBShare(_context); - _session.setSessionListener(_messageHandler); + // PROTO_ANY is for backward compatibility (pre-0.7.1) + // TODO change proto to PROTO_STREAMING someday. + // Right now we get everything, and rely on Datagram to specify PROTO_UDP. + // PacketQueue has sent PROTO_STREAMING since the beginning of mux support (0.7.1) + _session.addMuxedSessionListener(_messageHandler, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); _outboundQueue = new PacketQueue(_context, _session, this); - _allowIncoming = false; - _numWaiting = 0; /** Socket timeout for accept() */ _soTimeout = -1; @@ -141,7 +143,10 @@ class ConnectionManager { * it, or null if the syn's streamId was already taken */ public Connection receiveConnection(Packet synPacket) { - Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); + ConnectionOptions opts = new ConnectionOptions(_defaultOptions); + opts.setPort(synPacket.getRemotePort()); + opts.setLocalPort(synPacket.getLocalPort()); + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); _tcbShare.updateOptsFromShare(con); con.setInbound(); long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index ae14daa15c..e6ec6fa423 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -106,6 +106,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * This is based on documentation, the code, and logging, however there are still * some parts that could use more research. * + *<pre> * 1024 Tunnel Message * - 21 Header (see router/tunnel/BatchedPreprocessor.java) * ----- @@ -169,7 +170,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { * Similarly: * 3 msgs: 2722 * 4 msgs: 3714 - * + *</pre> * * Before release 0.6.1.14 this was 4096. * From release 0.6.1.14 through release 0.6.4, this was 960. @@ -205,18 +206,35 @@ class ConnectionOptions extends I2PSocketOptionsImpl { public static final int DEFAULT_MAX_MESSAGE_SIZE = 1730; public static final int MIN_MESSAGE_SIZE = 512; + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from System properties. Does not set local port or remote port. + */ public ConnectionOptions() { super(); } + /** + * Sets max buffer size, connect timeout, read timeout, and write timeout + * from properties. Does not set local port or remote port. + * @param opts may be null + */ public ConnectionOptions(Properties opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(I2PSocketOptions opts) { super(opts); } + /** + * Initializes from System properties then copies over all options. + * @param opts may be null + */ public ConnectionOptions(ConnectionOptions opts) { super(opts); if (opts != null) { @@ -235,8 +253,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setInboundBufferSize(opts.getInboundBufferSize()); setCongestionAvoidanceGrowthRateFactor(opts.getCongestionAvoidanceGrowthRateFactor()); setSlowStartGrowthRateFactor(opts.getSlowStartGrowthRateFactor()); - setWriteTimeout(opts.getWriteTimeout()); - setReadTimeout(opts.getReadTimeout()); + // handled in super() + // not clear why added by jr 12/22/2005 + //setWriteTimeout(opts.getWriteTimeout()); + //setReadTimeout(opts.getReadTimeout()); setAnswerPings(opts.getAnswerPings()); initLists(opts); _maxConnsPerMinute = opts.getMaxConnsPerMinute(); @@ -248,7 +268,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { } } - @Override + @Override protected void init(Properties opts) { super.init(opts); _trend = new int[TREND_COUNT]; @@ -262,12 +282,14 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSendAckDelay(getInt(opts, PROP_INITIAL_ACK_DELAY, DEFAULT_INITIAL_ACK_DELAY)); setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, INACTIVITY_ACTION_SEND)); setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2)); setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1)); setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1)); + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); initLists(opts); @@ -279,7 +301,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { _maxTotalConnsPerDay = getInt(opts, PROP_MAX_TOTAL_CONNS_DAY, 0); } - @Override + @Override public void setProperties(Properties opts) { super.setProperties(opts); if (opts == null) return; @@ -303,8 +325,9 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setWindowSize(getInt(opts, PROP_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE)); if (opts.containsKey(PROP_MAX_RESENDS)) setMaxResends(getInt(opts, PROP_MAX_RESENDS, DEFAULT_MAX_SENDS)); - if (opts.containsKey(PROP_WRITE_TIMEOUT)) - setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); + // handled in super() + //if (opts.containsKey(PROP_WRITE_TIMEOUT)) + // setWriteTimeout(getInt(opts, PROP_WRITE_TIMEOUT, -1)); if (opts.containsKey(PROP_INACTIVITY_TIMEOUT)) setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, 90*1000)); if (opts.containsKey(PROP_INACTIVITY_ACTION)) @@ -316,6 +339,7 @@ class ConnectionOptions extends I2PSocketOptionsImpl { setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2)); if (opts.containsKey(PROP_CONNECT_TIMEOUT)) // wow 5 minutes!!! FIXME!! + // overrides default in super() setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT)); if (opts.containsKey(PROP_ANSWER_PINGS)) setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS)); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 526fae3722..53ebb17e1b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -15,8 +15,8 @@ import net.i2p.util.SimpleTimer; * */ class ConnectionPacketHandler { - private I2PAppContext _context; - private Log _log; + private final I2PAppContext _context; + private final Log _log; public ConnectionPacketHandler(I2PAppContext context) { _context = context; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index 71e1dd3ac7..f40dbd0c79 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -8,7 +8,7 @@ import net.i2p.I2PException; * */ class I2PServerSocketFull implements I2PServerSocket { - private I2PSocketManagerFull _socketManager; + private final I2PSocketManagerFull _socketManager; public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index f8dbe74ea6..5cd76a864f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -4,6 +4,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import net.i2p.client.I2PSession; import net.i2p.data.Destination; /** @@ -127,7 +128,28 @@ class I2PSocketFull implements I2PSocket { if (c != null) c.disconnectComplete(); } - @Override + + /** + * The remote port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getPort(); + } + + /** + * The local port. + * @return the port or 0 if unknown + * @since 0.8.9 + */ + public int getLocalPort() { + Connection c = _connection; + return c == null ? I2PSession.PORT_UNSPECIFIED : c.getLocalPort(); + } + + @Override public String toString() { Connection c = _connection; if (c == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 632b904b74..d9ca691b4d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -7,7 +7,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; -import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionMuxedListener; import net.i2p.util.Log; /** @@ -15,10 +15,10 @@ import net.i2p.util.Log; * Packets, if we can. * */ -class MessageHandler implements I2PSessionListener { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; +class MessageHandler implements I2PSessionMuxedListener { + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; private final Set<I2PSocketManager.DisconnectListener> _listeners; public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { @@ -31,11 +31,23 @@ class MessageHandler implements I2PSessionListener { /** Instruct the client that the given session has received a message with * size # of bytes. + * This shouldn't be called anymore since we are registering as a muxed listener. * @param session session to notify * @param msgId message number available * @param size size of the message */ public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** Instruct the client that the given session has received a message with + * size # of bytes. + * @param session session to notify + * @param msgId message number available + * @param size size of the message + */ + public void messageAvailable(I2PSession session, int msgId, long size, int proto, int fromPort, int toPort) { byte data[] = null; try { data = session.receiveMessage(msgId); @@ -49,6 +61,8 @@ class MessageHandler implements I2PSessionListener { Packet packet = new Packet(); try { packet.readPacket(data, 0, data.length); + packet.setRemotePort(fromPort); + packet.setLocalPort(toPort); _manager.getPacketHandler().receivePacket(packet); } catch (IllegalArgumentException iae) { _context.statManager().addRateData("stream.packetReceiveFailure", 1, 0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 71c9ebce9f..737e0f7b2d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -63,14 +63,10 @@ class MessageOutputStream extends OutputStream { _buf = _dataCache.acquire().getData(); // new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); - _written = 0; - _closed = false; _writeTimeout = -1; _passiveFlushDelay = passiveFlushDelay; _nextBufferSize = -1; _sendPeriodBeginTime = ctx.clock().now(); - _sendPeriodBytes = 0; - _sendBps = 0; _context.statManager().createRateStat("stream.sendBps", "How fast we pump data through the stream", "Stream", new long[] { 60*1000, 5*60*1000, 60*60*1000 }); _flusher = new Flusher(); if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index faff2ff722..5f25c3cd96 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -13,6 +13,13 @@ import net.i2p.data.SigningPrivateKey; import net.i2p.util.Log; /** + * This contains solely the data that goes out on the wire, + * including the local and remote port which is embedded in + * the I2CP overhead, not in the packet itself. + * For local state saved for outbound packets, see PacketLocal. + * + * <p> + * * Contain a single packet transferred as part of a streaming connection. * The data format is as follows:<ul> * <li>{@link #getSendStreamId sendStreamId} [4 byte value]</li> @@ -67,6 +74,8 @@ class Packet { private Destination _optionFrom; private int _optionDelay; private int _optionMaxSize; + private int _localPort; + private int _remotePort; /** * The receiveStreamId will be set to this when the packet doesn't know @@ -148,6 +157,10 @@ class Packet { public static final int DEFAULT_MAX_SIZE = 32*1024; protected static final int MAX_DELAY_REQUEST = 65535; + /** + * Does no initialization. + * See readPacket() for inbound packets, and the setters for outbound packets. + */ public Packet() { } private boolean _sendStreamIdSet = false; @@ -316,6 +329,40 @@ class Packet { _optionMaxSize = numBytes; } + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getLocalPort() { + return _localPort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setLocalPort(int port) { + _localPort = port; + } + + /** + * @return Default I2PSession.PORT_UNSPECIFIED (0) or PORT_ANY (0) + * @since 0.8.9 + */ + public int getRemotePort() { + return _remotePort; + } + + /** + * Must be called to change the port, not set by readPacket() + * as the port is out-of-band in the I2CP header. + * @since 0.8.9 + */ + public void setRemotePort(int port) { + _remotePort = port; + } + /** * Write the packet to the buffer (starting at the offset) and return * the number of bytes written. diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 4f977712c3..ef145179c3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -16,9 +16,9 @@ import net.i2p.util.Log; * */ class PacketHandler { - private ConnectionManager _manager; - private I2PAppContext _context; - private Log _log; + private final ConnectionManager _manager; + private final I2PAppContext _context; + private final Log _log; //private int _lastDelay; //private int _dropped; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index dd5fe1ceb4..ca2e25d42d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -13,13 +13,13 @@ import net.i2p.util.SimpleTimer2; * retries, etc. */ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { - private I2PAppContext _context; - private Log _log; - private Connection _connection; + private final I2PAppContext _context; + private final Log _log; + private final Connection _connection; private Destination _to; private SessionKey _keyUsed; private Set _tagsSent; - private long _createdOn; + private final long _createdOn; private int _numSends; private long _lastSend; private long _acceptedOn; @@ -29,9 +29,11 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; + /** not bound to a connection */ public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); } + public PacketLocal(I2PAppContext ctx, Destination to, Connection con) { _context = ctx; _createdOn = ctx.clock().now(); @@ -40,8 +42,6 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { _connection = con; _lastSend = -1; _cancelledOn = -1; - _nackCount = 0; - _retransmitted = false; } public Destination getTo() { return _to; } @@ -138,6 +138,8 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } + + /** @return null if not bound */ public Connection getConnection() { return _connection; } public void incrementNACKs() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index fa0aa87ce7..4de4c6e163 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -19,11 +19,11 @@ import net.i2p.util.Log; * */ class PacketQueue { - private I2PAppContext _context; - private Log _log; - private I2PSession _session; - private ConnectionManager _connectionManager; - private ByteCache _cache = ByteCache.getInstance(64, 36*1024); + private final I2PAppContext _context; + private final Log _log; + private final I2PSession _session; + private final ConnectionManager _connectionManager; + private final ByteCache _cache = ByteCache.getInstance(64, 36*1024); public PacketQueue(I2PAppContext context, I2PSession session, ConnectionManager mgr) { _context = context; @@ -98,7 +98,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, expires, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); else // I2PSessionImpl2 //sent = _session.sendMessage(packet.getTo(), buf, 0, size, keyUsed, tagsSent, 0); @@ -107,7 +107,7 @@ class PacketQueue { // I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); // I2PSessionMuxedImpl no tags sent = _session.sendMessage(packet.getTo(), buf, 0, size, null, null, - I2PSession.PROTO_STREAMING, I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + I2PSession.PROTO_STREAMING, packet.getLocalPort(), packet.getRemotePort()); end = _context.clock().now(); if ( (end-begin > 1000) && (_log.shouldLog(Log.WARN)) ) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java index 6ba78bfd49..b0ffbf2504 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/StandardSocket.java @@ -107,11 +107,11 @@ class StandardSocket extends Socket { } /** - * @return -1 always + * @return the port or 0 if unknown */ @Override public int getLocalPort() { - return -1; + return _socket.getLocalPort(); } /** @@ -139,11 +139,11 @@ class StandardSocket extends Socket { } /** - * @return 0 always + * @return the port or 0 if unknown */ @Override public int getPort() { - return 0; + return _socket.getPort(); } @Override diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java index d2d02021a0..d33e7a741e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/TCBShare.java @@ -21,10 +21,10 @@ import net.i2p.util.SimpleTimer2; * */ class TCBShare { - private I2PAppContext _context; - private Log _log; - private Map<Destination, Entry> _cache; - private CleanEvent _cleaner; + private final I2PAppContext _context; + private final Log _log; + private final Map<Destination, Entry> _cache; + private final CleanEvent _cleaner; private static final long EXPIRE_TIME = 30*60*1000; private static final long CLEAN_TIME = 10*60*1000; -- GitLab