diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0725033fa600cfb5180ac82283f5194ee35b34b0..f6e6df3a5c0d10fbf1efc2f7069c342c2fe16f4e 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -18,7 +18,7 @@ public class RouterVersion { /** deprecated */ public final static String ID = "Monotone"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 3; + public final static long BUILD = 4; /** for example "-test" */ public final static String EXTRA = ""; diff --git a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java index 9955268e98d2b8291f6499e67f7efdfbab8aaaa1..5f8466d6e655317c828d410c45e4c3c01b079e01 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EstablishState.java @@ -59,6 +59,9 @@ import net.i2p.util.Log; * */ class EstablishState { + + public static final VerifiedEstablishState VERIFIED = new VerifiedEstablishState(); + private final RouterContext _context; private final Log _log; @@ -107,6 +110,17 @@ class EstablishState { private boolean _verified; private boolean _confirmWritten; private boolean _failedBySkew; + + private EstablishState() { + _context = null; + _log = null; + _X = null; + _hX_xor_bobIdentHash = null; + _curDecrypted = null; + _dh = null; + _transport = null; + _con = null; + } public EstablishState(RouterContext ctx, NTCPTransport transport, NTCPConnection con) { _context = ctx; @@ -773,6 +787,10 @@ class EstablishState { return false; } } + + private static class VerifiedEstablishState extends EstablishState { + @Override public boolean isComplete() { return true; } + } /** @deprecated unused */ /********* diff --git a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java index fdc00de340371de39b47cf68ddd389f78584b65f..000d530471da4e28e6b1db7a793d745fd44ea8cc 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java +++ b/router/java/src/net/i2p/router/transport/ntcp/EventPumper.java @@ -749,8 +749,10 @@ class EventPumper implements Runnable { if (!_wantsWrite.isEmpty()) { for (Iterator<NTCPConnection> iter = _wantsWrite.iterator(); iter.hasNext(); ) { con = iter.next(); - iter.remove(); SelectionKey key = con.getKey(); + if (key == null) + continue; + iter.remove(); try { key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); } catch (CancelledKeyException cke) { diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 39f12dfa0271cf6134a5b4655e682ab0c8535cfa..e4f4a3b0ee4bb7b38947664e14a829d0d88e7164 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -82,7 +82,6 @@ class NTCPConnection { /** Requests that were not granted immediately */ private final Set<FIFOBandwidthLimiter.Request> _bwInRequests; private final Set<FIFOBandwidthLimiter.Request> _bwOutRequests; - private boolean _established; private long _establishedOn; private EstablishState _establishState; private final NTCPTransport _transport; @@ -209,6 +208,7 @@ class NTCPConnection { _inboundListener = new InboundListener(); _outboundListener = new OutboundListener(); initialize(); + _establishState = new EstablishState(ctx, transport, this); } private void initialize() { @@ -232,7 +232,7 @@ class NTCPConnection { public void setChannel(SocketChannel chan) { _chan = chan; } public void setKey(SelectionKey key) { _conKey = key; } public boolean isInbound() { return _isInbound; } - public boolean isEstablished() { return _established; } + public synchronized boolean isEstablished() { return _establishState.isComplete(); } /** * @since IPv6 @@ -269,12 +269,11 @@ class NTCPConnection { System.arraycopy(prevReadEnd, prevReadEnd.length - BLOCK_SIZE, _prevReadBlock, 0, BLOCK_SIZE); //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Inbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); - _established = true; _establishedOn = System.currentTimeMillis(); _transport.inboundEstablished(this); - _establishState = null; _nextMetaTime = System.currentTimeMillis() + (META_FREQUENCY / 2) + _context.random().nextInt(META_FREQUENCY); _nextInfoTime = System.currentTimeMillis() + (INFO_FREQUENCY / 2) + _context.random().nextInt(INFO_FREQUENCY); + _establishState = EstablishState.VERIFIED; } /** @return seconds */ @@ -282,7 +281,7 @@ class NTCPConnection { /** @return milliseconds */ public long getUptime() { - if (!_established) + if (!isEstablished()) return getTimeSinceCreated(); else return System.currentTimeMillis()-_establishedOn; @@ -333,7 +332,7 @@ class NTCPConnection { _closed = true; if (_chan != null) try { _chan.close(); } catch (IOException ioe) { } if (_conKey != null) _conKey.cancel(); - _establishState = null; + _establishState = EstablishState.VERIFIED; _transport.removeCon(this); _transport.getReader().connectionClosed(this); _transport.getWriter().connectionClosed(this); @@ -409,7 +408,7 @@ class NTCPConnection { //_context.statManager().addRateData("ntcp.sendQueueSize", enqueued); boolean noOutbound = (_currentOutbound == null); //if (_log.shouldLog(Log.DEBUG)) _log.debug("messages enqueued on " + toString() + ": " + enqueued + " new one: " + msg.getMessageId() + " of " + msg.getMessageType()); - if (_established && noOutbound) + if (isEstablished() && noOutbound) _transport.getWriter().wantsWrite(this, "enqueued"); } @@ -541,9 +540,8 @@ class NTCPConnection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound established, prevWriteEnd: " + Base64.encode(prevWriteEnd) + " prevReadEnd: " + Base64.encode(prevReadEnd)); - _established = true; _establishedOn = System.currentTimeMillis(); - _establishState = null; + _establishState = EstablishState.VERIFIED; _transport.markReachable(getRemotePeer().calculateHash(), false); //_context.banlist().unbanlistRouter(getRemotePeer().calculateHash(), NTCPTransport.STYLE); boolean msgs = !_outbound.isEmpty(); @@ -693,15 +691,7 @@ class NTCPConnection { return; //if (_log.shouldLog(Log.DEBUG)) // _log.debug("prepare next write w/ isInbound? " + _isInbound + " established? " + _established); - if (!_isInbound && !_established) { - if (_establishState == null) { - // shouldn't happen - _establishState = new EstablishState(_context, _transport, this); - _establishState.prepareOutbound(); - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("prepare next write, but we have already prepared the first outbound and we are not yet established..." + toString()); - } + if (!_isInbound && !isEstablished()) { return; } @@ -1535,7 +1525,7 @@ class NTCPConnection { return "NTCP conn " + (_isInbound ? "from " : "to ") + (_remotePeer == null ? "unknown" : _remotePeer.calculateHash().toBase64().substring(0,6)) + - (_established ? "" : " not established") + + (isEstablished() ? "" : " not established") + " created " + DataHelper.formatDuration(getTimeSinceCreated()) + " ago"; } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index 7fa6c1a9163977e8936c85538d6eb47b0568f651..dce1f34854e9afc05fb7dce67b6b88afb5f75af3 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -231,6 +231,7 @@ public class NTCPTransport extends TransportImpl { con.setChannel(channel); channel.configureBlocking(false); _pumper.registerConnect(con); + con.getEstablishState().prepareOutbound(); } catch (IOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Error opening a channel", ioe); diff --git a/router/java/src/net/i2p/router/transport/ntcp/Reader.java b/router/java/src/net/i2p/router/transport/ntcp/Reader.java index 11dcd07b6d191ca1f7d23ee8a5b2963d1302bfb4..87f332f517c39b9b23627ab96bbfa194c92d4668 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/Reader.java +++ b/router/java/src/net/i2p/router/transport/ntcp/Reader.java @@ -132,30 +132,24 @@ class Reader { * Return read buffers back to the pool as we process them. */ private void processRead(NTCPConnection con) { - if (con.isClosed()) - return; ByteBuffer buf = null; - while (!con.isClosed() && !con.isEstablished() && ( (buf = con.getNextReadBuf()) != null) ) { + while(true) { + synchronized(con) { + if (con.isClosed()) + return; + if (con.isEstablished()) + break; + } + if ((buf = con.getNextReadBuf()) == null) + return; EstablishState est = con.getEstablishState(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Processing read buffer as an establishment for " + con + " with [" + est + "]"); - if (est == null) { - EventPumper.releaseBuf(buf); - if (!con.isEstablished()) { - // establish state is only removed when the connection is fully established, - // yet if that happens, con.isEstablished() should return true... - throw new RuntimeException("connection was not established, yet the establish state is null for " + con); - } else { - // hmm, there shouldn't be a race here - only one reader should - // be running on a con at a time... - _log.error("no establishment state but " + con + " is established... race?"); - break; - } - } + if (est.isComplete()) { // why is it complete yet !con.isEstablished? - _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? " - + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")"); + _log.error("establishment state [" + est + "] is complete, yet the connection isn't established? " + + con.isEstablished() + " (inbound? " + con.isInbound() + " " + con + ")"); EventPumper.releaseBuf(buf); break; } @@ -172,9 +166,6 @@ class Reader { if (est.isComplete() && est.getExtraBytes() != null) con.recvEncryptedI2NP(ByteBuffer.wrap(est.getExtraBytes())); } - // catch race? - if (!con.isEstablished()) - return; while (!con.isClosed() && (buf = con.getNextReadBuf()) != null) { // decrypt the data and push it into an i2np message if (_log.shouldLog(Log.DEBUG))