From 036b77746bc384692bb63a1ea7a5efb30768fb58 Mon Sep 17 00:00:00 2001 From: zzz Date: Wed, 17 Jun 2015 02:16:06 +0000 Subject: [PATCH] Catch uncaught exceptions in ClientConnectionRunner and stop connection Catch null SessionId in messages and stop connection instead of NPE Wait for LS in SubSession in connect() so we don't send data w/o a session ID and leaseset Log tweaks --- .../src/net/i2p/client/I2PSessionImpl.java | 4 +-- core/java/src/net/i2p/client/SubSession.java | 36 +++++++++++++++---- .../net/i2p/data/i2cp/I2CPMessageReader.java | 14 ++++++++ .../i2p/internal/QueuedI2CPMessageReader.java | 2 +- .../router/client/ClientConnectionRunner.java | 17 +++++++-- .../client/ClientMessageEventListener.java | 28 ++++++++------- 6 files changed, 78 insertions(+), 23 deletions(-) diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index aeaea7d34..bc34f88f0 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -81,7 +81,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 /** this session's Id */ private SessionId _sessionId; /** currently granted lease set, or null */ - private volatile LeaseSet _leaseSet; + protected volatile LeaseSet _leaseSet; // subsession stuff // registered subsessions @@ -130,7 +130,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 protected final I2PAppContext _context; /** monitor for waiting until a lease set has been granted */ - private final Object _leaseSetWait = new Object(); + protected final Object _leaseSetWait = new Object(); /** * @since 0.9.8 diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java index eb2a2364f..0e2fca90b 100644 --- a/core/java/src/net/i2p/client/SubSession.java +++ b/core/java/src/net/i2p/client/SubSession.java @@ -101,12 +101,36 @@ class SubSession extends I2PSessionMuxedImpl { _state = State.OPENING; } } - _primary.connect(); - synchronized(_stateLock) { - if (_state != State.OPEN) { - Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); - notifier.start(); - _state = State.OPEN; + boolean success = false; + try { + _primary.connect(); + // wait until we have created a lease set + int waitcount = 0; + while (_leaseSet == null) { + if (waitcount++ > 5*60) { + throw new IOException("No tunnels built after waiting 5 minutes. Your network connection may be down, or there is severe network congestion."); + } + synchronized (_leaseSetWait) { + // InterruptedException caught below + _leaseSetWait.wait(1000); + } + } + synchronized(_stateLock) { + if (_state != State.OPEN) { + Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); + notifier.start(); + _state = State.OPEN; + } + } + success = true; + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } catch (IOException ioe) { + throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe); + } finally { + if (!success) { + _availabilityNotifier.stopNotifying(); + changeState(State.CLOSED); } } } diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java index 72148a55d..4dfbb8501 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java @@ -158,6 +158,20 @@ public class I2CPMessageReader { } public void run() { + try { + run2(); + } catch (Exception e) { + _log.log(Log.CRIT, "Uncaught I2CP error", e); + _listener.readError(I2CPMessageReader.this, e); + cancelRunner(); + } + } + + /** + * Called by run() + * @since 0.9.21 + */ + protected void run2() { while (_stayAlive) { while (_doRun) { // do read diff --git a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java index 3e87a8e2f..ba218631f 100644 --- a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java +++ b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java @@ -42,7 +42,7 @@ public class QueuedI2CPMessageReader extends I2CPMessageReader { * Pumps messages from the incoming message queue to the listener. */ @Override - public void run() { + protected void run2() { while (_stayAlive) { while (_doRun) { // do read diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index bc92e33be..3e212d081 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -228,9 +228,12 @@ class ClientConnectionRunner { * Current client's config, * will be null if session not found * IS subsession aware. + * Returns null if id is null. * @since 0.9.21 added id param */ public SessionConfig getConfig(SessionId id) { + if (id == null) + return null; for (SessionParams sp : _sessions.values()) { if (id.equals(sp.sessionId)) return sp.config; @@ -317,6 +320,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ public Hash getDestHash(SessionId id) { + if (id == null) + return null; for (Map.Entry e : _sessions.entrySet()) { if (id.equals(e.getValue().sessionId)) return e.getKey(); @@ -330,6 +335,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ public Destination getDestination(SessionId id) { + if (id == null) + return null; for (SessionParams sp : _sessions.values()) { if (id.equals(sp.sessionId)) return sp.dest; @@ -391,6 +398,8 @@ class ClientConnectionRunner { void setSessionId(Hash hash, SessionId id) { if (hash == null) throw new IllegalStateException(); + if (id == null) + throw new NullPointerException(); SessionParams sp = _sessions.get(hash); if (sp == null || sp.sessionId != null) throw new IllegalStateException(); @@ -403,6 +412,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ void removeSession(SessionId id) { + if (id == null) + return; boolean isPrimary = false; for (Iterator iter = _sessions.values().iterator(); iter.hasNext(); ) { SessionParams sp = iter.next(); @@ -813,18 +824,20 @@ class ClientConnectionRunner { synchronized (this) { state = sp.leaseRequest; if (state != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Already requesting " + state); LeaseSet requested = state.getRequested(); LeaseSet granted = state.getGranted(); long ours = set.getEarliestLeaseDate(); if ( ( (requested != null) && (requested.getEarliestLeaseDate() > ours) ) || ( (granted != null) && (granted.getEarliestLeaseDate() > ours) ) ) { // theirs is newer + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Already requesting, theirs newer, do nothing: " + state); } else { // ours is newer, so wait a few secs and retry set.setDestination(dest); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Already requesting, ours newer, wait 3 sec: " + state); } // fire onCreated? return; // already requesting diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 7bfd342b1..d2a6ceeb0 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -80,7 +80,11 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { - if (_runner.isDead()) return; + if (_runner.isDead()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received but runner dead: \n" + message); + return; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Message received: \n" + message); int type = message.getType(); @@ -362,8 +366,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(sid); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("SendMessage w/o session"); - _runner.disconnectClient("SendMessage w/o session"); + _log.error("SendMessage w/o session: " + sid); + _runner.disconnectClient("SendMessage w/o session: " + sid); return; } if (_log.shouldLog(Log.DEBUG)) @@ -372,10 +376,10 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; // TODO validate session id - _runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); + _runner.ackSendMessage(sid, id, message.getNonce()); _context.statManager().addRateData("client.distributeTime", timeToDistribute); - if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) - _log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); + if ( (timeToDistribute > 50) && (_log.shouldLog(Log.DEBUG)) ) + _log.debug("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); } @@ -386,7 +390,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi private void handleReceiveBegin(ReceiveMessageBeginMessage message) { if (_runner.isDead()) return; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling recieve begin: id = " + message.getMessageId()); + _log.debug("Handling receive begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); // TODO validate session id @@ -409,7 +413,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi } /** - * The client told us that the message has been recieved completely. This currently + * The client told us that the message has been received completely. This currently * does not do any security checking prior to removing the message from the * pending queue, though it should. * @@ -443,8 +447,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("CreateLeaseSet w/o session"); - _runner.disconnectClient("CreateLeaseSet w/o session"); + _log.error("CreateLeaseSet w/o session: " + id); + _runner.disconnectClient("CreateLeaseSet w/o session: " + id); return; } Destination dest = cfg.getDestination(); @@ -533,9 +537,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("ReconfigureSession w/o session"); + _log.error("ReconfigureSession w/o session: " + id); //sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); - _runner.disconnectClient("ReconfigureSession w/o session"); + _runner.disconnectClient("ReconfigureSession w/o session: " + id); return; } if (_log.shouldLog(Log.INFO))