I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit b5dad73f authored by zzz's avatar zzz
Browse files

* I2PSession:

   - Greatly simplify the VerifyUsage timers
   - Constructor cleanup
parent d8a2e390
No related branches found
No related tags found
No related merge requests found
...@@ -54,7 +54,7 @@ import net.i2p.util.SimpleTimer; ...@@ -54,7 +54,7 @@ import net.i2p.util.SimpleTimer;
* @author jrandom * @author jrandom
*/ */
abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessageEventListener { abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessageEventListener {
protected Log _log; protected final Log _log;
/** who we are */ /** who we are */
private Destination _myDestination; private Destination _myDestination;
/** private key for decryption */ /** private key for decryption */
...@@ -104,7 +104,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -104,7 +104,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2PClientMessageHandlerMap _handlerMap; protected I2PClientMessageHandlerMap _handlerMap;
/** used to seperate things out so we can get rid of singletons */ /** used to seperate things out so we can get rid of singletons */
protected I2PAppContext _context; protected final I2PAppContext _context;
/** monitor for waiting until a lease set has been granted */ /** monitor for waiting until a lease set has been granted */
private final Object _leaseSetWait = new Object(); private final Object _leaseSetWait = new Object();
...@@ -144,6 +144,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -144,6 +144,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** SSL interface (only) @since 0.8.3 */ /** SSL interface (only) @since 0.8.3 */
protected static final String PROP_ENABLE_SSL = "i2cp.SSL"; protected static final String PROP_ENABLE_SSL = "i2cp.SSL";
private static final long VERIFY_USAGE_TIME = 60*1000;
void dateUpdated() { void dateUpdated() {
_dateReceived = true; _dateReceived = true;
synchronized (_dateReceivedLock) { synchronized (_dateReceivedLock) {
...@@ -154,7 +156,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -154,7 +156,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public static final int LISTEN_PORT = 7654; public static final int LISTEN_PORT = 7654;
/** for extension */ /** for extension */
public I2PSessionImpl() {} protected I2PSessionImpl(I2PAppContext context, Properties options) {
_context = context;
_log = context.logManager().getLog(getClass());
_closed = true;
if (options == null)
options = System.getProperties();
loadConfig(options);
}
/** /**
* Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey
...@@ -166,12 +175,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -166,12 +175,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @throws I2PSessionException if there is a problem loading the private keys or * @throws I2PSessionException if there is a problem loading the private keys or
*/ */
public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException { public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException {
_context = context; this(context, options);
_log = context.logManager().getLog(I2PSessionImpl.class);
_handlerMap = new I2PClientMessageHandlerMap(context); _handlerMap = new I2PClientMessageHandlerMap(context);
_closed = true;
_opening = false;
_closing = false;
_producer = new I2CPMessageProducer(context); _producer = new I2CPMessageProducer(context);
_availabilityNotifier = new AvailabilityNotifier(); _availabilityNotifier = new AvailabilityNotifier();
_availableMessages = new ConcurrentHashMap(); _availableMessages = new ConcurrentHashMap();
...@@ -182,18 +187,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -182,18 +187,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} catch (IOException ioe) { } catch (IOException ioe) {
throw new I2PSessionException("Error reading the destination key stream", ioe); throw new I2PSessionException("Error reading the destination key stream", ioe);
} }
if (options == null)
options = System.getProperties();
loadConfig(options);
_sessionId = null;
_leaseSet = null;
} }
/** /**
* Parse the config for anything we know about. * Parse the config for anything we know about.
* Also fill in the authorization properties if missing. * Also fill in the authorization properties if missing.
*/ */
protected void loadConfig(Properties options) { private void loadConfig(Properties options) {
_options = new Properties(); _options = new Properties();
_options.putAll(filter(options)); _options.putAll(filter(options));
if (_context.isRouterContext()) { if (_context.isRouterContext()) {
...@@ -405,6 +405,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -405,6 +405,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
+ (connected - startConnect) + (connected - startConnect)
+ "ms - ready to participate in the network!"); + "ms - ready to participate in the network!");
startIdleMonitor(); startIdleMonitor();
startVerifyUsage();
setOpening(false); setOpening(false);
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
_closed = true; _closed = true;
...@@ -469,16 +470,38 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -469,16 +470,38 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (_log.shouldLog(Log.INFO)) if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id);
} }
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
} }
protected class VerifyUsage implements SimpleTimer.TimedEvent {
private Long _msgId; /**
public VerifyUsage(Long id) { _msgId = id; } * Fire up a periodic task to check for unclamed messages
* @since 0.9.1
*/
private void startVerifyUsage() {
SimpleScheduler.getInstance().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME);
}
/**
* Check for unclaimed messages, without wastefully setting a timer for each
* message. Just copy all unclaimed ones and check 30 seconds later.
*/
private class VerifyUsage implements SimpleTimer.TimedEvent {
private final List<Long> toCheck = new ArrayList();
public void timeReached() { public void timeReached() {
MessagePayloadMessage removed = _availableMessages.remove(_msgId); if (isClosed())
if (removed != null && !isClosed()) return;
_log.error("Message NOT removed! id=" + _msgId + ": " + removed); //if (_log.shouldLog(Log.DEBUG))
// _log.debug(getPrefix() + " VerifyUsage of " + toCheck.size());
if (!toCheck.isEmpty()) {
for (Long msgId : toCheck) {
MessagePayloadMessage removed = _availableMessages.remove(msgId);
if (removed != null)
_log.error("Message NOT removed! id=" + msgId + ": " + removed);
}
toCheck.clear();
}
toCheck.addAll(_availableMessages.keySet());
SimpleScheduler.getInstance().addEvent(this, VERIFY_USAGE_TIME);
} }
} }
......
...@@ -28,6 +28,8 @@ import net.i2p.util.Log; ...@@ -28,6 +28,8 @@ import net.i2p.util.Log;
/** /**
* Thread safe implementation of an I2P session running over TCP. * Thread safe implementation of an I2P session running over TCP.
* *
* Unused directly, see I2PSessionMuxedImpl extension.
*
* @author jrandom * @author jrandom
*/ */
class I2PSessionImpl2 extends I2PSessionImpl { class I2PSessionImpl2 extends I2PSessionImpl {
...@@ -43,7 +45,9 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -43,7 +45,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private boolean _noEffort; private boolean _noEffort;
/** for extension */ /** for extension */
public I2PSessionImpl2() {} protected I2PSessionImpl2(I2PAppContext context, Properties options) {
super(context, options);
}
/** /**
* Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey * Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey
...@@ -56,7 +60,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -56,7 +60,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
*/ */
public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException { public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(ctx, destKeyStream, options); super(ctx, destKeyStream, options);
_log = ctx.logManager().getLog(I2PSessionImpl2.class);
_sendingStates = new HashSet(32); _sendingStates = new HashSet(32);
// default is BestEffort // default is BestEffort
_noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US)); _noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
...@@ -296,10 +299,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -296,10 +299,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
} }
**********/ **********/
if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce"); //if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
long nonce = _context.random().nextInt(Integer.MAX_VALUE); long nonce = _context.random().nextInt(Integer.MAX_VALUE);
if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state"); //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
MessageState state = new MessageState(_context, nonce, getPrefix()); MessageState state = new MessageState(_context, nonce, getPrefix());
//state.setKey(key); //state.setKey(key);
//state.setTags(sentTags); //state.setTags(sentTags);
...@@ -323,7 +326,7 @@ class I2PSessionImpl2 extends I2PSessionImpl { ...@@ -323,7 +326,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
// } // }
//} //}
if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state"); //if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
long beforeSendingSync = _context.clock().now(); long beforeSendingSync = _context.clock().now();
long inSendingSync = 0; long inSendingSync = 0;
synchronized (_sendingStates) { synchronized (_sendingStates) {
......
...@@ -64,7 +64,7 @@ import net.i2p.util.SimpleScheduler; ...@@ -64,7 +64,7 @@ import net.i2p.util.SimpleScheduler;
* *
* @author zzz * @author zzz
*/ */
class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { class I2PSessionMuxedImpl extends I2PSessionImpl2 {
private final I2PSessionDemultiplexer _demultiplexer; private final I2PSessionDemultiplexer _demultiplexer;
...@@ -233,7 +233,6 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { ...@@ -233,7 +233,6 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
} }
((MuxedAvailabilityNotifier)_availabilityNotifier).available(id, size, getProto(msg), ((MuxedAvailabilityNotifier)_availabilityNotifier).available(id, size, getProto(msg),
getFromPort(msg), getToPort(msg)); getFromPort(msg), getToPort(msg));
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
} }
protected class MuxedAvailabilityNotifier extends AvailabilityNotifier { protected class MuxedAvailabilityNotifier extends AvailabilityNotifier {
......
...@@ -34,15 +34,8 @@ class I2PSimpleSession extends I2PSessionImpl2 { ...@@ -34,15 +34,8 @@ class I2PSimpleSession extends I2PSessionImpl2 {
* @throws I2PSessionException if there is a problem * @throws I2PSessionException if there is a problem
*/ */
public I2PSimpleSession(I2PAppContext context, Properties options) throws I2PSessionException { public I2PSimpleSession(I2PAppContext context, Properties options) throws I2PSessionException {
// Warning, does not call super() super(context, options);
_context = context;
_log = context.logManager().getLog(I2PSimpleSession.class);
_handlerMap = new SimpleMessageHandlerMap(context); _handlerMap = new SimpleMessageHandlerMap(context);
_closed = true;
_closing = false;
if (options == null)
options = System.getProperties();
loadConfig(options);
} }
/** /**
...@@ -79,6 +72,7 @@ class I2PSimpleSession extends I2PSessionImpl2 { ...@@ -79,6 +72,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
_reader = new I2CPMessageReader(in, this); _reader = new I2CPMessageReader(in, this);
} }
// we do not receive payload messages, so we do not need an AvailabilityNotifier // we do not receive payload messages, so we do not need an AvailabilityNotifier
// ... or an Idle timer, or a VerifyUsage
_reader.startReading(); _reader.startReading();
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment