diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 667c05d654b2d39e19fdd3a750ba142407ecf43a..fc415b983bb8f8e573065470ae72c5344df62c18 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -92,7 +92,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa protected I2PSessionListener _sessionListener; /** class that generates new messages */ - protected I2CPMessageProducer _producer; + protected final I2CPMessageProducer _producer; /** map of Long --> MessagePayloadMessage */ protected Map<Long, MessagePayloadMessage> _availableMessages; @@ -101,7 +101,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa protected final Object _bwReceivedLock = new Object(); protected volatile int[] _bwLimits; - protected I2PClientMessageHandlerMap _handlerMap; + protected final I2PClientMessageHandlerMap _handlerMap; /** used to seperate things out so we can get rid of singletons */ protected final I2PAppContext _context; @@ -119,7 +119,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa CLOSED } - protected State _state = State.CLOSED; + private State _state = State.CLOSED; protected final Object _stateLock = new Object(); /** have we received the current date from the router yet? */ @@ -172,16 +172,19 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** * for extension by SimpleSession (no dest) */ - protected I2PSessionImpl(I2PAppContext context, Properties options) { - this(context, options, false); + protected I2PSessionImpl(I2PAppContext context, Properties options, + I2PClientMessageHandlerMap handlerMap) { + this(context, options, handlerMap, false); } /** * Basic setup of finals * @since 0.9.7 */ - private I2PSessionImpl(I2PAppContext context, Properties options, boolean hasDest) { + private I2PSessionImpl(I2PAppContext context, Properties options, + I2PClientMessageHandlerMap handlerMap, boolean hasDest) { _context = context; + _handlerMap = handlerMap; _log = context.logManager().getLog(getClass()); if (options == null) options = (Properties) System.getProperties().clone(); @@ -190,10 +193,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _portNum = getPort(); _fastReceive = Boolean.parseBoolean(_options.getProperty(I2PClient.PROP_FAST_RECEIVE)); if (hasDest) { + _producer = new I2CPMessageProducer(context); + _availableMessages = new ConcurrentHashMap(); _myDestination = new Destination(); _privateKey = new PrivateKey(); _signingPrivateKey = new SigningPrivateKey(); } else { + _producer = null; + _availableMessages = null; _myDestination = null; _privateKey = null; _signingPrivateKey = null; @@ -210,11 +217,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * @throws I2PSessionException if there is a problem loading the private keys or */ public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { - this(context, options, true); - _handlerMap = new I2PClientMessageHandlerMap(context); - _producer = new I2CPMessageProducer(context); + this(context, options, new I2PClientMessageHandlerMap(context), true); _availabilityNotifier = new AvailabilityNotifier(); - _availableMessages = new ConcurrentHashMap(); try { readDestination(destKeyStream); } catch (DataFormatException dfe) { @@ -394,8 +398,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa case OPENING: wasOpening = true; try { - _stateLock.wait(); - } catch (InterruptedException ie) {} + _stateLock.wait(10*1000); + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } break; case CLOSING: throw new I2PSessionException("close in progress"); @@ -442,8 +448,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _reader = new I2CPMessageReader(in, this); } } - Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); - notifier.start(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading"); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate"); @@ -452,53 +456,60 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa int waitcount = 0; while (!_dateReceived) { if (waitcount++ > 30) { - closeSocket(); throw new IOException("No handshake received from the router"); } - try { - synchronized (_dateReceivedLock) { - _dateReceivedLock.wait(1000); - } - } catch (InterruptedException ie) { // nop + synchronized (_dateReceivedLock) { + // InterruptedException caught below + _dateReceivedLock.wait(1000); } } if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After received a SetDate response"); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before producer.connect()"); _producer.connect(this); - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After producer.connect()"); + if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After producer.connect()"); // wait until we have created a lease set waitcount = 0; while (_leaseSet == null) { if (waitcount++ > 5*60) { - try { - _producer.disconnect(this); - } catch (I2PSessionException ipe) {} - closeSocket(); throw new IOException("No tunnels built after waiting 5 minutes. Your network connection may be down, or there is severe network congestion."); } synchronized (_leaseSetWait) { - try { - _leaseSetWait.wait(1000); - } catch (InterruptedException ie) { // nop - } + // InterruptedException caught below + _leaseSetWait.wait(1000); } } - long connected = _context.clock().now(); - if (_log.shouldLog(Log.INFO)) + if (_log.shouldLog(Log.INFO)) { + long connected = _context.clock().now(); _log.info(getPrefix() + "Lease set created with inbound tunnels after " + (connected - startConnect) + "ms - ready to participate in the network!"); + } + Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true); + notifier.start(); startIdleMonitor(); startVerifyUsage(); success = true; + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); } catch (UnknownHostException uhe) { throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, uhe); } catch (IOException ioe) { throw new I2PSessionException(getPrefix() + "Cannot connect to the router on " + _hostname + ':' + _portNum, ioe); } finally { - changeState(success ? State.OPEN : State.CLOSED); + if (success) { + changeState(State.OPEN); + } else { + _availabilityNotifier.stopNotifying(); + synchronized(_stateLock) { + changeState(State.CLOSING); + try { + _producer.disconnect(this); + } catch (I2PSessionException ipe) {} + closeSocket(); + } + } } } @@ -723,7 +734,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Has the session been closed (or not yet connected)? * False when open and during transitions. Unsynchronized. */ - public boolean isClosed() { return _state == State.CLOSED; } + public boolean isClosed() { + synchronized (_stateLock) { + return _state == State.CLOSED; + } + } /** * Deliver an I2CP message to the router @@ -740,7 +755,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (!_queue.offer(message, MAX_SEND_WAIT)) throw new I2PSessionException("Timed out waiting while write queue was full"); } catch (InterruptedException ie) { - throw new I2PSessionException("Interrupted while write queue was full", ie); + throw new I2PSessionException("Interrupted", ie); } } else if (_writer == null) { throw new I2PSessionException("Already closed"); @@ -902,7 +917,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa i++; if ( (delay > MAX_RECONNECT_DELAY) || (delay <= 0) ) delay = MAX_RECONNECT_DELAY; - try { Thread.sleep(delay); } catch (InterruptedException ie) {} + try { + Thread.sleep(delay); + } catch (InterruptedException ie) { + return false; + } try { connect(); @@ -1017,7 +1036,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa synchronized (waiter) { waiter.wait(maxWait); } - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } } finally { _pendingLookups.remove(waiter); } @@ -1040,7 +1061,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa synchronized (_bwReceivedLock) { _bwReceivedLock.wait(5*1000); } - } catch (InterruptedException ie) {} + } catch (InterruptedException ie) { + throw new I2PSessionException("Interrupted", ie); + } return _bwLimits; } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index c369f3a62eaeb04869d7e43cb385764eb7d463e6..eb537815edaf3b2f65e20b037889ffd02d549353 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -44,9 +44,12 @@ class I2PSessionImpl2 extends I2PSessionImpl { /** Don't expect any MSMs from the router for outbound traffic @since 0.8.1 */ protected boolean _noEffort; - /** for extension */ - protected I2PSessionImpl2(I2PAppContext context, Properties options) { - super(context, options); + /** + * for extension by SimpleSession (no dest) + */ + protected I2PSessionImpl2(I2PAppContext context, Properties options, + I2PClientMessageHandlerMap handlerMap) { + super(context, options, handlerMap); } /** diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index 66daa01fb362b597049525f1ba077d124c65d823..35b6422e28aa0dcbe0d750d5771634221c2be9d3 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -38,8 +38,7 @@ class I2PSimpleSession extends I2PSessionImpl2 { * @throws I2PSessionException if there is a problem */ public I2PSimpleSession(I2PAppContext context, Properties options) throws I2PSessionException { - super(context, options); - _handlerMap = new SimpleMessageHandlerMap(context); + super(context, options, new SimpleMessageHandlerMap(context)); } /**