From 5b78b53fe8e4c6dc61fc3d8d08c7726414fa9de7 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Fri, 8 Feb 2019 13:24:42 +0000 Subject: [PATCH] SSU: EstablishmentManager fixes (ticket #2397) --- .../transport/udp/EstablishmentManager.java | 66 +++++++------------ .../transport/udp/InboundEstablishState.java | 11 +++- .../transport/udp/OutboundEstablishState.java | 13 +++- 3 files changed, 45 insertions(+), 45 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 645d2f8096..d0b999604f 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -477,8 +477,8 @@ class EstablishmentManager { byte[] fromIP = from.getIP(); state = new InboundEstablishState(_context, fromIP, from.getPort(), _transport.getExternalPort(fromIP.length == 16), - _transport.getDHBuilder()); - state.receiveSessionRequest(reader.getSessionRequestReader()); + _transport.getDHBuilder(), + reader.getSessionRequestReader()); if (_replayFilter.add(state.getReceivedX(), 0, 8)) { if (_log.shouldLog(Log.WARN)) @@ -652,7 +652,6 @@ class EstablishmentManager { } catch (IllegalStateException ise) { continue; } - RemoteHostId to = entry.getKey(); List<OutNetMessage> allQueued = entry.getValue(); List<OutNetMessage> queued = new ArrayList<OutNetMessage>(); long now = _context.clock().now(); @@ -1134,11 +1133,11 @@ class EstablishmentManager { /** * Drive through the inbound establishment states, adjusting one of them * as necessary. Called from Establisher thread only. - * @return next requested time or -1 + * @return next requested time or Long.MAX_VALUE */ private long handleInbound() { long now = _context.clock().now(); - long nextSendTime = -1; + long nextSendTime = Long.MAX_VALUE; InboundEstablishState inboundState = null; boolean expired = false; @@ -1164,7 +1163,9 @@ class EstablishmentManager { iter.remove(); //_context.statManager().addRateData("udp.inboundEstablishFailedState", cur.getState(), cur.getLifetime()); } else { - if (cur.getNextSendTime() <= now) { + // this will always be > 0 + long next = cur.getNextSendTime(); + if (next <= now) { // our turn... inboundState = cur; // if (_log.shouldLog(Log.DEBUG)) @@ -1174,14 +1175,8 @@ class EstablishmentManager { // nothin to do but wait for them to send us // stuff, so lets move on to the next one being // established - long when = -1; - if (cur.getNextSendTime() <= 0) { - when = cur.getEstablishBeginTime() + MAX_IB_ESTABLISH_TIME; - } else { - when = cur.getNextSendTime(); - } - if (when < nextSendTime) - nextSendTime = when; + if (next < nextSendTime) + nextSendTime = next; } } } @@ -1251,11 +1246,11 @@ class EstablishmentManager { /** * Drive through the outbound establishment states, adjusting one of them * as necessary. Called from Establisher thread only. - * @return next requested time or -1 + * @return next requested time or Long.MAX_VALUE */ private long handleOutbound() { long now = _context.clock().now(); - long nextSendTime = -1; + long nextSendTime = Long.MAX_VALUE; OutboundEstablishState outboundState = null; //int admitted = 0; //int remaining = 0; @@ -1278,7 +1273,9 @@ class EstablishmentManager { // _log.debug("Removing expired outbound: " + cur); break; } else { - if (cur.getNextSendTime() <= now) { + // this will be 0 for a new OES that needs sending, > 0 for others + long next = cur.getNextSendTime(); + if (next <= now) { // our turn... outboundState = cur; // if (_log.shouldLog(Log.DEBUG)) @@ -1288,14 +1285,8 @@ class EstablishmentManager { // nothin to do but wait for them to send us // stuff, so lets move on to the next one being // established - long when = -1; - if (cur.getNextSendTime() <= 0) { - when = cur.getEstablishBeginTime() + MAX_OB_ESTABLISH_TIME; - } else { - when = cur.getNextSendTime(); - } - if ( (nextSendTime <= 0) || (when < nextSendTime) ) - nextSendTime = when; + if (next < nextSendTime) + nextSendTime = next; // if (_log.shouldLog(Log.DEBUG)) // _log.debug("Outbound doesn't want activity: " + cur + " (next=" + (when-now) + ")"); } @@ -1461,8 +1452,9 @@ class EstablishmentManager { private static final long PRINT_INTERVAL = 5*1000; private void doPass() { - if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < _context.clock().now()) { - _lastPrinted = _context.clock().now(); + long now = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG) && _lastPrinted + PRINT_INTERVAL < now) { + _lastPrinted = now; int iactive = _inboundStates.size(); int oactive = _outboundStates.size(); if (iactive > 0 || oactive > 0) { @@ -1476,31 +1468,21 @@ class EstablishmentManager { } } _activity = 0; - long now = _context.clock().now(); - if (_lastFailsafe + FAILSAFE_INTERVAL < _context.clock().now()) { - _lastFailsafe = _context.clock().now(); + if (_lastFailsafe + FAILSAFE_INTERVAL < now) { + _lastFailsafe = now; doFailsafe(); } - long nextSendTime = -1; - long nextSendInbound = handleInbound(); - long nextSendOutbound = handleOutbound(); - if (nextSendInbound > 0) - nextSendTime = nextSendInbound; - if ( (nextSendTime < 0) || (nextSendOutbound < nextSendTime) ) - nextSendTime = nextSendOutbound; + long nextSendTime = Math.min(handleInbound(), handleOutbound()); long delay = nextSendTime - now; - if ( (nextSendTime == -1) || (delay > 0) ) { + if (delay > 0) { if (delay > 1000) delay = 1000; try { synchronized (_activityLock) { if (_activity > 0) return; - if (nextSendTime == -1) - _activityLock.wait(1000); - else - _activityLock.wait(delay); + _activityLock.wait(delay); } } catch (InterruptedException ie) { } diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index dd108bdb37..418876cc09 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -97,7 +97,7 @@ class InboundEstablishState { * SessionCreated message will be bad if the external port != the internal port. */ public InboundEstablishState(RouterContext ctx, byte remoteIP[], int remotePort, int localPort, - DHSessionKeyBuilder dh) { + DHSessionKeyBuilder dh, UDPPacketReader.SessionRequestReader req) { _context = ctx; _log = ctx.logManager().getLog(InboundEstablishState.class); _aliceIP = remoteIP; @@ -108,6 +108,7 @@ class InboundEstablishState { _establishBegin = ctx.clock().now(); _keyBuilder = dh; _queuedMessages = new LinkedBlockingQueue<OutNetMessage>(); + receiveSessionRequest(req); } public synchronized InboundState getState() { return _currentState; } @@ -288,6 +289,11 @@ class InboundEstablishState { /** how long have we been trying to establish this session? */ public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; } + + /** + * @return rcv time after receiving a packet (including after constructor), + * send time + delay after sending a packet + */ public synchronized long getNextSendTime() { return _nextSend; } /** RemoteHostId, uniquely identifies an attempt */ @@ -478,6 +484,9 @@ class InboundEstablishState { } } + /** + * Call from synchronized method only + */ private void packetReceived() { _nextSend = _context.clock().now(); } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index 80932a7009..dd8fb6ac8b 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -653,6 +653,12 @@ class OutboundEstablishState { /** how long have we been trying to establish this session? */ public long getLifetime() { return _context.clock().now() - _establishBegin; } public long getEstablishBeginTime() { return _establishBegin; } + + /** + * @return 0 at initialization (to force sending session request), + * rcv time after receiving a packet, + * send time + delay after sending a packet (including session request) + */ public synchronized long getNextSendTime() { return _nextSend; } /** @@ -678,10 +684,13 @@ class OutboundEstablishState { _currentState = OutboundState.OB_STATE_CONFIRMED_COMPLETELY; } + /** + * Call from synchronized method only + */ private void packetReceived() { _nextSend = _context.clock().now(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Got a packet, nextSend == now"); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Got a packet, nextSend == now"); } /** @since 0.8.9 */ -- GitLab