diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index aeaea7d34c5a0354038a9f7b897c744f58e64d77..bc34f88f00eee31693f2454f1f38cc66e6346a28 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -81,7 +81,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 /** this session's Id */ private SessionId _sessionId; /** currently granted lease set, or null */ - private volatile LeaseSet _leaseSet; + protected volatile LeaseSet _leaseSet; // subsession stuff // registered subsessions @@ -130,7 +130,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 protected final I2PAppContext _context; /** monitor for waiting until a lease set has been granted */ - private final Object _leaseSetWait = new Object(); + protected final Object _leaseSetWait = new Object(); /** * @since 0.9.8 diff --git a/core/java/src/net/i2p/client/SubSession.java b/core/java/src/net/i2p/client/SubSession.java index eb2a2364f7d0f8905292d9713130b4974b544c68..0e2fca90b683ca20ca695130eeb8ddc4953ef624 100644 --- a/core/java/src/net/i2p/client/SubSession.java +++ b/core/java/src/net/i2p/client/SubSession.java @@ -101,12 +101,36 @@ class SubSession extends I2PSessionMuxedImpl { _state = State.OPENING; } } - _primary.connect(); - synchronized(_stateLock) { - if (_state != State.OPEN) { - Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); - notifier.start(); - _state = State.OPEN; + boolean success = false; + try { + _primary.connect(); + // wait until we have created a lease set + int waitcount = 0; + while (_leaseSet == null) { + if (waitcount++ > 5*60) { + throw new IOException("No tunnels built after waiting 5 minutes. Your network connection may be down, or there is severe network congestion."); + } + synchronized (_leaseSetWait) { + // InterruptedException caught below + _leaseSetWait.wait(1000); + } + } + synchronized(_stateLock) { + if (_state != State.OPEN) { + Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); + notifier.start(); + _state = State.OPEN; + } + } + success = true; + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } catch (IOException ioe) { + throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe); + } finally { + if (!success) { + _availabilityNotifier.stopNotifying(); + changeState(State.CLOSED); } } } diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java index 72148a55dab2a1f0f41f69c36793b32fac35d8ac..4dfbb8501c76193c31f5a0eb54aa95c55ac4a2db 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java @@ -158,6 +158,20 @@ public class I2CPMessageReader { } public void run() { + try { + run2(); + } catch (Exception e) { + _log.log(Log.CRIT, "Uncaught I2CP error", e); + _listener.readError(I2CPMessageReader.this, e); + cancelRunner(); + } + } + + /** + * Called by run() + * @since 0.9.21 + */ + protected void run2() { while (_stayAlive) { while (_doRun) { // do read diff --git a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java index 3e87a8e2f21a3517aecda3e204cc5b618a5ef86a..ba218631f405739fe47fd79aa0d97bdfc91eed73 100644 --- a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java +++ b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java @@ -42,7 +42,7 @@ public class QueuedI2CPMessageReader extends I2CPMessageReader { * Pumps messages from the incoming message queue to the listener. */ @Override - public void run() { + protected void run2() { while (_stayAlive) { while (_doRun) { // do read diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index bc92e33be3a6f0a40e20b0725832c91f405ee16b..3e212d081cfb697baccfaa045b3c01493e89f19b 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -228,9 +228,12 @@ class ClientConnectionRunner { * Current client's config, * will be null if session not found * IS subsession aware. + * Returns null if id is null. * @since 0.9.21 added id param */ public SessionConfig getConfig(SessionId id) { + if (id == null) + return null; for (SessionParams sp : _sessions.values()) { if (id.equals(sp.sessionId)) return sp.config; @@ -317,6 +320,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ public Hash getDestHash(SessionId id) { + if (id == null) + return null; for (Map.Entry<Hash, SessionParams> e : _sessions.entrySet()) { if (id.equals(e.getValue().sessionId)) return e.getKey(); @@ -330,6 +335,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ public Destination getDestination(SessionId id) { + if (id == null) + return null; for (SessionParams sp : _sessions.values()) { if (id.equals(sp.sessionId)) return sp.dest; @@ -391,6 +398,8 @@ class ClientConnectionRunner { void setSessionId(Hash hash, SessionId id) { if (hash == null) throw new IllegalStateException(); + if (id == null) + throw new NullPointerException(); SessionParams sp = _sessions.get(hash); if (sp == null || sp.sessionId != null) throw new IllegalStateException(); @@ -403,6 +412,8 @@ class ClientConnectionRunner { * @since 0.9.21 */ void removeSession(SessionId id) { + if (id == null) + return; boolean isPrimary = false; for (Iterator<SessionParams> iter = _sessions.values().iterator(); iter.hasNext(); ) { SessionParams sp = iter.next(); @@ -813,18 +824,20 @@ class ClientConnectionRunner { synchronized (this) { state = sp.leaseRequest; if (state != null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Already requesting " + state); LeaseSet requested = state.getRequested(); LeaseSet granted = state.getGranted(); long ours = set.getEarliestLeaseDate(); if ( ( (requested != null) && (requested.getEarliestLeaseDate() > ours) ) || ( (granted != null) && (granted.getEarliestLeaseDate() > ours) ) ) { // theirs is newer + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Already requesting, theirs newer, do nothing: " + state); } else { // ours is newer, so wait a few secs and retry set.setDestination(dest); _context.simpleTimer2().addEvent(new Rerequest(set, expirationTime, onCreateJob, onFailedJob), 3*1000); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Already requesting, ours newer, wait 3 sec: " + state); } // fire onCreated? return; // already requesting diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index 7bfd342b147881751846393e1d2a3b2c4fab3002..d2a6ceeb038ad363fc6eed3c313e6d5a97a5673e 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -80,7 +80,11 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * */ public void messageReceived(I2CPMessageReader reader, I2CPMessage message) { - if (_runner.isDead()) return; + if (_runner.isDead()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received but runner dead: \n" + message); + return; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Message received: \n" + message); int type = message.getType(); @@ -362,8 +366,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(sid); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("SendMessage w/o session"); - _runner.disconnectClient("SendMessage w/o session"); + _log.error("SendMessage w/o session: " + sid); + _runner.disconnectClient("SendMessage w/o session: " + sid); return; } if (_log.shouldLog(Log.DEBUG)) @@ -372,10 +376,10 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi MessageId id = _runner.distributeMessage(message); long timeToDistribute = _context.clock().now() - beforeDistribute; // TODO validate session id - _runner.ackSendMessage(message.getSessionId(), id, message.getNonce()); + _runner.ackSendMessage(sid, id, message.getNonce()); _context.statManager().addRateData("client.distributeTime", timeToDistribute); - if ( (timeToDistribute > 50) && (_log.shouldLog(Log.INFO)) ) - _log.info("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); + if ( (timeToDistribute > 50) && (_log.shouldLog(Log.DEBUG)) ) + _log.debug("Took too long to distribute the message (which holds up the ack): " + timeToDistribute); } @@ -386,7 +390,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi private void handleReceiveBegin(ReceiveMessageBeginMessage message) { if (_runner.isDead()) return; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handling recieve begin: id = " + message.getMessageId()); + _log.debug("Handling receive begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); // TODO validate session id @@ -409,7 +413,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi } /** - * The client told us that the message has been recieved completely. This currently + * The client told us that the message has been received completely. This currently * does not do any security checking prior to removing the message from the * pending queue, though it should. * @@ -443,8 +447,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("CreateLeaseSet w/o session"); - _runner.disconnectClient("CreateLeaseSet w/o session"); + _log.error("CreateLeaseSet w/o session: " + id); + _runner.disconnectClient("CreateLeaseSet w/o session: " + id); return; } Destination dest = cfg.getDestination(); @@ -533,9 +537,9 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi SessionConfig cfg = _runner.getConfig(id); if (cfg == null) { if (_log.shouldLog(Log.ERROR)) - _log.error("ReconfigureSession w/o session"); + _log.error("ReconfigureSession w/o session: " + id); //sendStatusMessage(id, SessionStatusMessage.STATUS_INVALID); - _runner.disconnectClient("ReconfigureSession w/o session"); + _runner.disconnectClient("ReconfigureSession w/o session: " + id); return; } if (_log.shouldLog(Log.INFO))