propagate from branch 'i2p.i2p' (head d2198c4bc21a9d06194cdb2dce24945ebc9d1542)

to branch 'i2p.i2p.zzz.update' (head 88ac67dc4e166b7e9dec0d3224e58bec4894440d)
This commit is contained in:
zzz
2012-08-03 18:30:39 +00:00
517 changed files with 57103 additions and 45584 deletions

View File

@@ -16,7 +16,7 @@ package net.i2p;
public class CoreVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = "0.9";
public final static String VERSION = "0.9.1";
public static void main(String args[]) {
System.out.println("I2P Core version: " + VERSION);

View File

@@ -38,6 +38,9 @@ public interface I2PSession {
/** Send a new message to the given destination, containing the specified
* payload, returning true if the router feels confident that the message
* was delivered.
*
* WARNING: It is recommended that you use a method that specifies the protocol and ports.
*
* @param dest location to send the message
* @param payload body of the message to be sent (unencrypted)
* @return whether it was accepted by the router for delivery or not
@@ -149,6 +152,9 @@ public interface I2PSession {
public void reportAbuse(int msgId, int severity) throws I2PSessionException;
/** Instruct the I2PSession where it should send event notifications
*
* WARNING: It is recommended that you use a method that specifies the protocol and ports.
*
* @param lsnr listener to retrieve events
*/
public void setSessionListener(I2PSessionListener lsnr);

View File

@@ -43,7 +43,8 @@ public class I2PSessionDemultiplexer implements I2PSessionMuxedListener {
if (_log.shouldLog(Log.WARN))
_log.warn("No listeners for incoming message");
} else {
_log.error("No listener found for proto: " + proto + " port: " + toport + " msg id: " + msgId +
if (_log.shouldLog(Log.WARN))
_log.warn("No listener found for proto: " + proto + " port: " + toport + " msg id: " + msgId +
" from pool of " + _listeners.size() + " listeners");
}
try {

View File

@@ -54,7 +54,7 @@ import net.i2p.util.SimpleTimer;
* @author jrandom
*/
abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessageEventListener {
protected Log _log;
protected final Log _log;
/** who we are */
private Destination _myDestination;
/** private key for decryption */
@@ -104,16 +104,16 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
protected I2PClientMessageHandlerMap _handlerMap;
/** used to seperate things out so we can get rid of singletons */
protected I2PAppContext _context;
protected final I2PAppContext _context;
/** monitor for waiting until a lease set has been granted */
private final Object _leaseSetWait = new Object();
/** whether the session connection has already been closed (or not yet opened) */
protected boolean _closed;
protected volatile boolean _closed;
/** whether the session connection is in the process of being closed */
protected boolean _closing;
protected volatile boolean _closing;
/** have we received the current date from the router yet? */
private boolean _dateReceived;
@@ -121,7 +121,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
private final Object _dateReceivedLock = new Object();
/** whether the session connection is in the process of being opened */
protected boolean _opening;
protected volatile boolean _opening;
/** monitor for waiting until opened */
private final Object _openingWait = new Object();
@@ -144,6 +144,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** SSL interface (only) @since 0.8.3 */
protected static final String PROP_ENABLE_SSL = "i2cp.SSL";
private static final long VERIFY_USAGE_TIME = 60*1000;
void dateUpdated() {
_dateReceived = true;
synchronized (_dateReceivedLock) {
@@ -154,7 +156,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public static final int LISTEN_PORT = 7654;
/** for extension */
public I2PSessionImpl() {}
protected I2PSessionImpl(I2PAppContext context, Properties options) {
_context = context;
_log = context.logManager().getLog(getClass());
_closed = true;
if (options == null)
options = System.getProperties();
loadConfig(options);
}
/**
* Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey
@@ -166,12 +175,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @throws I2PSessionException if there is a problem loading the private keys or
*/
public I2PSessionImpl(I2PAppContext context, InputStream destKeyStream, Properties options) throws I2PSessionException {
_context = context;
_log = context.logManager().getLog(I2PSessionImpl.class);
this(context, options);
_handlerMap = new I2PClientMessageHandlerMap(context);
_closed = true;
_opening = false;
_closing = false;
_producer = new I2CPMessageProducer(context);
_availabilityNotifier = new AvailabilityNotifier();
_availableMessages = new ConcurrentHashMap();
@@ -182,18 +187,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} catch (IOException ioe) {
throw new I2PSessionException("Error reading the destination key stream", ioe);
}
if (options == null)
options = System.getProperties();
loadConfig(options);
_sessionId = null;
_leaseSet = null;
}
/**
* Parse the config for anything we know about.
* Also fill in the authorization properties if missing.
*/
protected void loadConfig(Properties options) {
private void loadConfig(Properties options) {
_options = new Properties();
_options.putAll(filter(options));
if (_context.isRouterContext()) {
@@ -405,6 +405,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
+ (connected - startConnect)
+ "ms - ready to participate in the network!");
startIdleMonitor();
startVerifyUsage();
setOpening(false);
} catch (UnknownHostException uhe) {
_closed = true;
@@ -469,16 +470,38 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
if (_log.shouldLog(Log.INFO))
_log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id);
}
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
}
protected class VerifyUsage implements SimpleTimer.TimedEvent {
private Long _msgId;
public VerifyUsage(Long id) { _msgId = id; }
/**
* Fire up a periodic task to check for unclamed messages
* @since 0.9.1
*/
private void startVerifyUsage() {
_context.simpleScheduler().addEvent(new VerifyUsage(), VERIFY_USAGE_TIME);
}
/**
* Check for unclaimed messages, without wastefully setting a timer for each
* message. Just copy all unclaimed ones and check 30 seconds later.
*/
private class VerifyUsage implements SimpleTimer.TimedEvent {
private final List<Long> toCheck = new ArrayList();
public void timeReached() {
MessagePayloadMessage removed = _availableMessages.remove(_msgId);
if (removed != null && !isClosed())
_log.error("Message NOT removed! id=" + _msgId + ": " + removed);
if (isClosed())
return;
//if (_log.shouldLog(Log.DEBUG))
// _log.debug(getPrefix() + " VerifyUsage of " + toCheck.size());
if (!toCheck.isEmpty()) {
for (Long msgId : toCheck) {
MessagePayloadMessage removed = _availableMessages.remove(msgId);
if (removed != null)
_log.error("Message NOT removed! id=" + msgId + ": " + removed);
}
toCheck.clear();
}
toCheck.addAll(_availableMessages.keySet());
_context.simpleScheduler().addEvent(this, VERIFY_USAGE_TIME);
}
}
@@ -561,7 +584,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug(getPrefix() + "Message received of type " + message.getType()
+ " to be handled by " + handler);
+ " to be handled by " + handler.getClass().getSimpleName());
handler.handleMessage(message, this);
}
}
@@ -944,7 +967,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
boolean close = Boolean.valueOf(_options.getProperty("i2cp.closeOnIdle")).booleanValue();
if (reduce || close) {
updateActivity();
SimpleScheduler.getInstance().addEvent(new SessionIdleTimer(_context, this, reduce, close), SessionIdleTimer.MINIMUM_TIME);
_context.simpleScheduler().addEvent(new SessionIdleTimer(_context, this, reduce, close), SessionIdleTimer.MINIMUM_TIME);
}
}

View File

@@ -28,6 +28,8 @@ import net.i2p.util.Log;
/**
* Thread safe implementation of an I2P session running over TCP.
*
* Unused directly, see I2PSessionMuxedImpl extension.
*
* @author jrandom
*/
class I2PSessionImpl2 extends I2PSessionImpl {
@@ -43,7 +45,9 @@ class I2PSessionImpl2 extends I2PSessionImpl {
private boolean _noEffort;
/** for extension */
public I2PSessionImpl2() {}
protected I2PSessionImpl2(I2PAppContext context, Properties options) {
super(context, options);
}
/**
* Create a new session, reading the Destination, PrivateKey, and SigningPrivateKey
@@ -56,7 +60,6 @@ class I2PSessionImpl2 extends I2PSessionImpl {
*/
public I2PSessionImpl2(I2PAppContext ctx, InputStream destKeyStream, Properties options) throws I2PSessionException {
super(ctx, destKeyStream, options);
_log = ctx.logManager().getLog(I2PSessionImpl2.class);
_sendingStates = new HashSet(32);
// default is BestEffort
_noEffort = "none".equals(getOptions().getProperty(I2PClient.PROP_RELIABILITY, "").toLowerCase(Locale.US));
@@ -296,10 +299,10 @@ class I2PSessionImpl2 extends I2PSessionImpl {
}
**********/
if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before creating nonce");
long nonce = _context.random().nextInt(Integer.MAX_VALUE);
if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
MessageState state = new MessageState(_context, nonce, getPrefix());
//state.setKey(key);
//state.setTags(sentTags);
@@ -323,7 +326,7 @@ class I2PSessionImpl2 extends I2PSessionImpl {
// }
//}
if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
//if (_log.shouldLog(Log.DEBUG)) _log.debug("before sync state");
long beforeSendingSync = _context.clock().now();
long inSendingSync = 0;
synchronized (_sendingStates) {

View File

@@ -64,7 +64,7 @@ import net.i2p.util.SimpleScheduler;
*
* @author zzz
*/
class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
class I2PSessionMuxedImpl extends I2PSessionImpl2 {
private final I2PSessionDemultiplexer _demultiplexer;
@@ -233,7 +233,6 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
}
((MuxedAvailabilityNotifier)_availabilityNotifier).available(id, size, getProto(msg),
getFromPort(msg), getToPort(msg));
SimpleScheduler.getInstance().addEvent(new VerifyUsage(mid), 30*1000);
}
protected class MuxedAvailabilityNotifier extends AvailabilityNotifier {

View File

@@ -34,15 +34,8 @@ class I2PSimpleSession extends I2PSessionImpl2 {
* @throws I2PSessionException if there is a problem
*/
public I2PSimpleSession(I2PAppContext context, Properties options) throws I2PSessionException {
// Warning, does not call super()
_context = context;
_log = context.logManager().getLog(I2PSimpleSession.class);
super(context, options);
_handlerMap = new SimpleMessageHandlerMap(context);
_closed = true;
_closing = false;
if (options == null)
options = System.getProperties();
loadConfig(options);
}
/**
@@ -79,6 +72,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
_reader = new I2CPMessageReader(in, this);
}
// we do not receive payload messages, so we do not need an AvailabilityNotifier
// ... or an Idle timer, or a VerifyUsage
_reader.startReading();
} catch (UnknownHostException uhe) {

View File

@@ -112,6 +112,6 @@ class SessionIdleTimer implements SimpleTimer.TimedEvent {
} else {
nextDelay = _minimumTime - (now - lastActivity);
}
SimpleScheduler.getInstance().addEvent(this, nextDelay);
_context.simpleScheduler().addEvent(this, nextDelay);
}
}

View File

@@ -61,7 +61,8 @@ public class ElGamalAESEngine {
}
/**
* Decrypt the message using the given private key using tags from the default key manager.
* Decrypt the message using the given private key using tags from the default key manager,
* which is the router's key manager. Use extreme care if you aren't the router.
*
* @deprecated specify the key manager!
*/
@@ -75,6 +76,10 @@ public class ElGamalAESEngine {
* This works according to the
* ElGamal+AES algorithm in the data structure spec.
*
* Warning - use the correct SessionKeyManager. Clients should instantiate their own.
* Clients using I2PAppContext.sessionKeyManager() may be correlated with the router,
* unless you are careful to use different keys.
*
* @return decrypted data or null on failure
*/
public byte[] decrypt(byte data[], PrivateKey targetPrivateKey, SessionKeyManager keyManager) throws DataFormatException {
@@ -100,7 +105,7 @@ public class ElGamalAESEngine {
//if (_log.shouldLog(Log.DEBUG)) _log.debug("Key is known for tag " + st);
long id = _context.random().nextLong();
if (_log.shouldLog(Log.DEBUG))
_log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes: " + Base64.encode(data, 0, 64));
_log.debug(id + ": Decrypting existing session encrypted with tag: " + st.toString() + ": key: " + key.toBase64() + ": " + data.length + " bytes " /* + Base64.encode(data, 0, 64) */ );
decrypted = decryptExistingSession(data, key, targetPrivateKey, foundTags, usedKey, foundKey);
if (decrypted != null) {
@@ -389,7 +394,8 @@ public class ElGamalAESEngine {
*
* @param target public key to which the data should be encrypted.
* @param key session key to use during encryption
* @param tagsForDelivery session tags to be associated with the key (or newKey if specified), or null
* @param tagsForDelivery session tags to be associated with the key (or newKey if specified), or null;
* 200 max enforced at receiver
* @param currentTag sessionTag to use, or null if it should use ElG (i.e. new session)
* @param newKey key to be delivered to the target, with which the tagsForDelivery should be associated, or null
* @param paddedSize minimum size in bytes of the body after padding it (if less than the
@@ -410,7 +416,7 @@ public class ElGamalAESEngine {
_context.statManager().updateFrequency("crypto.elGamalAES.encryptExistingSession");
byte rv[] = encryptExistingSession(data, target, key, tagsForDelivery, currentTag, newKey, paddedSize);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() + ": " + Base64.encode(rv, 0, 64));
_log.debug("Existing session encrypted with tag: " + currentTag.toString() + ": " + rv.length + " bytes and key: " + key.toBase64() /* + ": " + Base64.encode(rv, 0, 64) */);
return rv;
}
@@ -418,6 +424,30 @@ public class ElGamalAESEngine {
* Encrypt the data to the target using the given key and deliver the specified tags
* No new session key
* This is the one called from GarlicMessageBuilder and is the primary entry point.
*
* Re: padded size: The AES block adds at least 39 bytes of overhead to the data, and
* that is included in the minimum size calculation.
*
* In the router, we always use garlic messages. A garlic message with a single
* clove and zero data is about 84 bytes, so that's 123 bytes minimum. So any paddingSize
* <= 128 is a no-op as every message will be at least 128 bytes
* (Streaming, if used, adds more overhead).
*
* Outside the router, with a client using its own message format, the minimum size
* is 48, so any paddingSize <= 48 is a no-op.
*
* Not included in the minimum is a 32-byte session tag for an existing session,
* or a 514-byte ElGamal block and several 32-byte session tags for a new session.
* So the returned encrypted data will be at least 32 bytes larger than paddedSize.
*
* @param target public key to which the data should be encrypted.
* @param key session key to use during encryption
* @param tagsForDelivery session tags to be associated with the key or null;
* 200 max enforced at receiver
* @param currentTag sessionTag to use, or null if it should use ElG (i.e. new session)
* @param paddedSize minimum size in bytes of the body after padding it (if less than the
* body's real size, no bytes are appended but the body is not truncated)
*
*/
public byte[] encrypt(byte data[], PublicKey target, SessionKey key, Set tagsForDelivery,
SessionTag currentTag, long paddedSize) {
@@ -599,7 +629,6 @@ public class ElGamalAESEngine {
//_log.debug("Encrypting AES");
if (tagsForDelivery == null) tagsForDelivery = Collections.EMPTY_SET;
int size = 2 // sizeof(tags)
+ tagsForDelivery.size()
+ SessionTag.BYTE_LENGTH*tagsForDelivery.size()
+ 4 // payload length
+ Hash.HASH_LENGTH

View File

@@ -24,14 +24,14 @@ import net.i2p.data.SessionTag;
* unknown (and hence always forces a full ElGamal encryption for each message).
* A more intelligent subclass should manage and persist keys and tags.
*
* TODO if we aren't going to use this for testing, make it abstract.
*/
public class SessionKeyManager {
/** session key managers must be created through an app context */
protected SessionKeyManager(I2PAppContext context) { // nop
}
/** see above */
private SessionKeyManager() { // nop
/**
* Make this public if you need a dummy SessionKeyManager for testing
*/
protected SessionKeyManager(I2PAppContext context) { // nop
}
/**
@@ -59,7 +59,8 @@ public class SessionKeyManager {
* Associate a new session key with the specified target. Metrics to determine
* when to expire that key begin with this call.
*
* @deprecated racy
* Racy if called after getCurrentKey() to check for a current session;
* use getCurrentOrNewKey() in that case.
*/
public void createSession(PublicKey target, SessionKey key) { // nop
}
@@ -67,7 +68,8 @@ public class SessionKeyManager {
/**
* Generate a new session key and associate it with the specified target.
*
* @deprecated racy
* Racy if called after getCurrentKey() to check for a current session;
* use getCurrentOrNewKey() in that case.
*/
public SessionKey createSession(PublicKey target) {
SessionKey key = KeyGenerator.getInstance().generateSessionKey();
@@ -86,6 +88,31 @@ public class SessionKeyManager {
return null;
}
/**
* How many to send, IF we need to.
* @since 0.9.2
*/
public int getTagsToSend() { return 0; };
/**
* @since 0.9.2
*/
public int getLowThreshold() { return 0; };
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.2
*/
public boolean shouldSendTags(PublicKey target, SessionKey key) {
return shouldSendTags(target, key, getLowThreshold());
}
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.2
*/
public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) { return false; }
/**
* Determine (approximately) how many available session tags for the current target
* have been confirmed and are available

View File

@@ -85,27 +85,74 @@ public class TransientSessionKeyManager extends SessionKeyManager {
/** for debugging */
private final AtomicInteger _rcvTagSetID = new AtomicInteger();
private final AtomicInteger _sentTagSetID = new AtomicInteger();
private final int _tagsToSend;
private final int _lowThreshold;
/**
* Let session tags sit around for 10 minutes before expiring them. We can now have such a large
* Let session tags sit around for this long before expiring them. We can now have such a large
* value since there is the persistent session key manager. This value is for outbound tags -
* inbound tags are managed by SESSION_LIFETIME_MAX_MS
*
*/
public final static long SESSION_TAG_DURATION_MS = 10 * 60 * 1000;
private final static long SESSION_TAG_DURATION_MS = 12 * 60 * 1000;
/**
* Keep unused inbound session tags around for up to 12 minutes (2 minutes longer than
* Keep unused inbound session tags around for this long (a few minutes longer than
* session tags are used on the outbound side so that no reasonable network lag
* can cause failed decrypts)
*
*/
public final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 5 * 60 * 1000;
private final static long SESSION_LIFETIME_MAX_MS = SESSION_TAG_DURATION_MS + 3 * 60 * 1000;
/**
* Time to send more if we are this close to expiration
*/
private static final long SESSION_TAG_EXPIRATION_WINDOW = 90 * 1000;
/**
* a few MB? how about 16MB!
* This is the max size of _inboundTagSets.
*/
public final static int MAX_INBOUND_SESSION_TAGS = 500 * 1000; // this will consume at most a few MB
/**
* This was 100 since 0.6.1.10 (50 before that). It's important because:
* <pre>
* - Tags are 32 bytes. So it previously added 3200 bytes to an initial message.
* - Too many tags adds a huge overhead to short-duration connections
* (like http, datagrams, etc.)
* - Large messages have a much higher chance of being dropped due to
* one of their 1KB fragments being discarded by a tunnel participant.
* - This reduces the effective maximum datagram size because the client
* doesn't know when tags will be bundled, so the tag size must be
* subtracted from the maximum I2NP size or transport limit.
* </pre>
*
* Issues with too small a value:
* <pre>
* - When tags are sent, a reply leaseset (~1KB) is always bundled.
* Maybe don't need to bundle more than every minute or so
* rather than every time?
* - Does the number of tags (and the threshold of 20) limit the effective
* streaming lib window size? Should the threshold and the number of
* sent tags be variable based on the message rate?
* </pre>
*
* We have to be very careful if we implement an adaptive scheme,
* since the key manager is per-router, not per-local-dest.
* Or maybe that's a bad idea, and we need to move to a per-dest manager.
* This needs further investigation.
*
* So a value somewhat higher than the low threshold
* seems appropriate.
*
* Use care when adjusting these values. See ConnectionOptions in streaming,
* and TransientSessionKeyManager in crypto, for more information.
*
* @since 0.9.2 moved from GarlicMessageBuilder to per-SKM config
*/
public static final int DEFAULT_TAGS = 40;
/** ditto */
public static final int LOW_THRESHOLD = 30;
/**
* The session key manager should only be constructed and accessed through the
* application context. This constructor should only be used by the
@@ -113,15 +160,28 @@ public class TransientSessionKeyManager extends SessionKeyManager {
*
*/
public TransientSessionKeyManager(I2PAppContext context) {
this(context, DEFAULT_TAGS, LOW_THRESHOLD);
}
/**
* @param tagsToSend how many to send at a time, may be lower or higher than lowThreshold. 1-128
* @param lowThreshold below this, send more. 1-128
* @since 0.9.2
*/
public TransientSessionKeyManager(I2PAppContext context, int tagsToSend, int lowThreshold) {
super(context);
if (tagsToSend <= 0 || tagsToSend > 128 || lowThreshold <= 0 || lowThreshold > 128)
throw new IllegalArgumentException();
_tagsToSend = tagsToSend;
_lowThreshold = lowThreshold;
_log = context.logManager().getLog(TransientSessionKeyManager.class);
_context = context;
_outboundSessions = new HashMap(64);
_inboundTagSets = new HashMap(1024);
_inboundTagSets = new HashMap(128);
context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 });
_alive = true;
SimpleScheduler.getInstance().addEvent(new CleanupEvent(), 60*1000);
_context.simpleScheduler().addEvent(new CleanupEvent(), 60*1000);
}
@Override
@@ -143,7 +203,7 @@ public class TransientSessionKeyManager extends SessionKeyManager {
int expired = aggressiveExpire();
long expireTime = _context.clock().now() - beforeExpire;
_context.statManager().addRateData("crypto.sessionTagsExpired", expired, expireTime);
SimpleScheduler.getInstance().addEvent(this, 60*1000);
_context.simpleScheduler().addEvent(this, 60*1000);
}
}
@@ -243,7 +303,8 @@ public class TransientSessionKeyManager extends SessionKeyManager {
* Associate a new session key with the specified target. Metrics to determine
* when to expire that key begin with this call.
*
* @deprecated racy
* Racy if called after getCurrentKey() to check for a current session;
* use getCurrentOrNewKey() in that case.
*/
@Override
public void createSession(PublicKey target, SessionKey key) {
@@ -291,6 +352,31 @@ public class TransientSessionKeyManager extends SessionKeyManager {
return null;
}
/**
* How many to send, IF we need to.
* @return the configured value (not adjusted for current available)
* @since 0.9.2
*/
@Override
public int getTagsToSend() { return _tagsToSend; };
/**
* @return the configured value
* @since 0.9.2
*/
@Override
public int getLowThreshold() { return _lowThreshold; };
/**
* @return true if we have less than the threshold or what we have is about to expire
* @since 0.9.2
*/
@Override
public boolean shouldSendTags(PublicKey target, SessionKey key, int lowThreshold) {
return getAvailableTags(target, key) < lowThreshold ||
getAvailableTimeLeft(target, key) < SESSION_TAG_EXPIRATION_WINDOW;
}
/**
* Determine (approximately) how many available session tags for the current target
* have been confirmed and are available

View File

@@ -57,7 +57,9 @@ public class RoutingKeyGenerator {
private volatile long _lastChanged;
private final static Calendar _cal = GregorianCalendar.getInstance(TimeZone.getTimeZone("GMT"));
private final static SimpleDateFormat _fmt = new SimpleDateFormat("yyyyMMdd");
private static final String FORMAT = "yyyyMMdd";
private static final int LENGTH = FORMAT.length();
private final static SimpleDateFormat _fmt = new SimpleDateFormat(FORMAT);
public byte[] getModData() {
return _currentModData;
@@ -74,9 +76,7 @@ public class RoutingKeyGenerator {
* @return true if changed
*/
public synchronized boolean generateDateBasedModData() {
Date today = null;
long now = _context.clock().now();
synchronized (_cal) {
_cal.setTime(new Date(now));
_cal.set(Calendar.YEAR, _cal.get(Calendar.YEAR)); // gcj <= 4.0 workaround
_cal.set(Calendar.DAY_OF_YEAR, _cal.get(Calendar.DAY_OF_YEAR)); // gcj <= 4.0 workaround
@@ -84,12 +84,13 @@ public class RoutingKeyGenerator {
_cal.set(Calendar.MINUTE, 0);
_cal.set(Calendar.SECOND, 0);
_cal.set(Calendar.MILLISECOND, 0);
today = _cal.getTime();
}
Date today = _cal.getTime();
String modVal = _fmt.format(today);
byte[] mod = new byte[modVal.length()];
for (int i = 0; i < modVal.length(); i++)
if (modVal.length() != LENGTH)
throw new IllegalStateException();
byte[] mod = new byte[LENGTH];
for (int i = 0; i < LENGTH; i++)
mod[i] = (byte)(modVal.charAt(i) & 0xFF);
boolean changed = !DataHelper.eq(_currentModData, mod);
if (changed) {
@@ -112,9 +113,9 @@ public class RoutingKeyGenerator {
*/
public Hash getRoutingKey(Hash origKey) {
if (origKey == null) throw new IllegalArgumentException("Original key is null");
byte modVal[] = new byte[Hash.HASH_LENGTH + _currentModData.length];
byte modVal[] = new byte[Hash.HASH_LENGTH + LENGTH];
System.arraycopy(origKey.getData(), 0, modVal, 0, Hash.HASH_LENGTH);
System.arraycopy(_currentModData, 0, modVal, Hash.HASH_LENGTH, _currentModData.length);
System.arraycopy(_currentModData, 0, modVal, Hash.HASH_LENGTH, LENGTH);
return SHA256Generator.getInstance().calculateHash(modVal);
}

View File

@@ -1,455 +0,0 @@
package net.i2p.util;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
import org.xlattice.crypto.filters.BloomSHA1;
/**
* Series of bloom filters which decay over time, allowing their continual use
* for time sensitive data. This has a fixed size (per
* period, using two periods overall), allowing this to pump through hundreds of
* entries per second with virtually no false positive rate. Down the line,
* this may be refactored to allow tighter control of the size necessary for the
* contained bloom filters.
*
* Deprecated for use outside of the router; to be moved to router.jar.
*
* See main() for an analysis of false positive rate.
* See BloomFilterIVValidator for instantiation parameters.
* See DecayingHashSet for a smaller and simpler version.
* @see net.i2p.router.tunnel.BloomFilterIVValidator
* @see net.i2p.util.DecayingHashSet
*/
public class DecayingBloomFilter {
protected final I2PAppContext _context;
protected final Log _log;
private BloomSHA1 _current;
private BloomSHA1 _previous;
protected final int _durationMs;
protected final int _entryBytes;
private final byte _extenders[][];
private final byte _extended[];
private final byte _longToEntry[];
private final long _longToEntryMask;
protected long _currentDuplicates;
protected volatile boolean _keepDecaying;
protected final SimpleTimer.TimedEvent _decayEvent;
/** just for logging */
protected final String _name;
/** synchronize against this lock when switching double buffers */
protected final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock();
private static final int DEFAULT_M = 23;
private static final int DEFAULT_K = 11;
/** true for debugging */
private static final boolean ALWAYS_MISS = false;
/** only for extension by DHS */
protected DecayingBloomFilter(int durationMs, int entryBytes, String name, I2PAppContext context) {
_context = context;
_log = context.logManager().getLog(getClass());
_entryBytes = entryBytes;
_name = name;
_durationMs = durationMs;
// all final
_extenders = null;
_extended = null;
_longToEntry = null;
_longToEntryMask = 0;
context.addShutdownTask(new Shutdown());
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
}
/**
* Create a bloom filter that will decay its entries over time.
*
* @param durationMs entries last for at least this long, but no more than twice this long
* @param entryBytes how large are the entries to be added? if this is less than 32 bytes,
* the entries added will be expanded by concatenating their XORing
* against with sufficient random values.
*/
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes) {
this(context, durationMs, entryBytes, "DBF");
}
/** @param name just for logging / debugging / stats */
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name) {
// this is instantiated in four different places, they may have different
// requirements, but for now use this as a gross method of memory reduction.
// m == 23 => 1MB each BloomSHA1 (4 pairs = 8MB total)
this(context, durationMs, entryBytes, name, context.getProperty("router.decayingBloomFilterM", DEFAULT_M));
}
/** @param m filter size exponent */
public DecayingBloomFilter(I2PAppContext context, int durationMs, int entryBytes, String name, int m) {
_context = context;
_log = context.logManager().getLog(DecayingBloomFilter.class);
_entryBytes = entryBytes;
_name = name;
int k = DEFAULT_K;
// max is (23,11) or (26,10); see KeySelector for details
if (m > DEFAULT_M)
k--;
_current = new BloomSHA1(m, k);
_previous = new BloomSHA1(m, k);
_durationMs = durationMs;
int numExtenders = (32+ (entryBytes-1))/entryBytes - 1;
if (numExtenders < 0)
numExtenders = 0;
_extenders = new byte[numExtenders][entryBytes];
for (int i = 0; i < numExtenders; i++)
_context.random().nextBytes(_extenders[i]);
if (numExtenders > 0) {
_extended = new byte[32];
_longToEntry = new byte[_entryBytes];
_longToEntryMask = (1l << (_entryBytes * 8l)) -1;
} else {
// final
_extended = null;
_longToEntry = null;
_longToEntryMask = 0;
}
_decayEvent = new DecayEvent();
_keepDecaying = true;
SimpleTimer.getInstance().addEvent(_decayEvent, _durationMs);
if (_log.shouldLog(Log.WARN))
_log.warn("New DBF " + name + " m = " + m + " k = " + k + " entryBytes = " + entryBytes +
" numExtenders = " + numExtenders + " cycle (s) = " + (durationMs / 1000));
// try to get a handle on memory usage vs. false positives
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".size",
"Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".dups",
"1000000 * Duplicates/Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingBloomFilter." + name + ".log10(falsePos)",
"log10 of the false positive rate (must have net.i2p.util.DecayingBloomFilter=DEBUG)",
"Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.addShutdownTask(new Shutdown());
}
/**
* @since 0.8.8
*/
private class Shutdown implements Runnable {
public void run() {
clear();
}
}
public long getCurrentDuplicateCount() { return _currentDuplicates; }
/** unsynchronized but only used for logging elsewhere */
public int getInsertedCount() {
return _current.size() + _previous.size();
}
/** unsynchronized, only used for logging elsewhere */
public double getFalsePositiveRate() {
return _current.falsePositives();
}
/**
* @return true if the entry added is a duplicate
*/
public boolean add(byte entry[]) {
return add(entry, 0, entry.length);
}
/**
* @return true if the entry added is a duplicate
*/
public boolean add(byte entry[], int off, int len) {
if (ALWAYS_MISS) return false;
if (entry == null)
throw new IllegalArgumentException("Null entry");
if (len != _entryBytes)
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
+ _entryBytes + "]");
getReadLock();
try {
return locked_add(entry, off, len, true);
} finally { releaseReadLock(); }
}
/**
* @return true if the entry added is a duplicate. the number of low order
* bits used is determined by the entryBytes parameter used on creation of the
* filter.
*
*/
public boolean add(long entry) {
if (ALWAYS_MISS) return false;
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
//entry &= _longToEntryMask;
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
getReadLock();
try {
return locked_add(_longToEntry, 0, _longToEntry.length, true);
} finally { releaseReadLock(); }
}
/**
* @return true if the entry is already known. this does NOT add the
* entry however.
*
*/
public boolean isKnown(long entry) {
if (ALWAYS_MISS) return false;
if (_entryBytes <= 7)
entry = ((entry ^ _longToEntryMask) & ((1 << 31)-1)) | (entry ^ _longToEntryMask);
if (entry < 0) {
DataHelper.toLong(_longToEntry, 0, _entryBytes, 0-entry);
_longToEntry[0] |= (1 << 7);
} else {
DataHelper.toLong(_longToEntry, 0, _entryBytes, entry);
}
getReadLock();
try {
return locked_add(_longToEntry, 0, _longToEntry.length, false);
} finally { releaseReadLock(); }
}
private boolean locked_add(byte entry[], int offset, int len, boolean addIfNew) {
if (_extended != null) {
// extend the entry to 32 bytes
System.arraycopy(entry, offset, _extended, 0, len);
for (int i = 0; i < _extenders.length; i++)
DataHelper.xor(entry, offset, _extenders[i], 0, _extended, _entryBytes * (i+1), _entryBytes);
BloomSHA1.FilterKey key = _current.getFilterKey(_extended, 0, 32);
boolean seen = _current.locked_member(key);
if (!seen)
seen = _previous.locked_member(key);
if (seen) {
_currentDuplicates++;
_current.release(key);
return true;
} else {
if (addIfNew) {
_current.locked_insert(key);
}
_current.release(key);
return false;
}
} else {
BloomSHA1.FilterKey key = _current.getFilterKey(entry, offset, len);
boolean seen = _current.locked_member(key);
if (!seen)
seen = _previous.locked_member(key);
if (seen) {
_currentDuplicates++;
_current.release(key);
return true;
} else {
if (addIfNew) {
_current.locked_insert(key);
}
_current.release(key);
return false;
}
}
}
public void clear() {
if (!getWriteLock())
return;
try {
_current.clear();
_previous.clear();
_currentDuplicates = 0;
} finally { releaseWriteLock(); }
}
public void stopDecaying() {
_keepDecaying = false;
SimpleTimer.getInstance().removeEvent(_decayEvent);
}
protected void decay() {
int currentCount = 0;
long dups = 0;
double fpr = 0d;
if (!getWriteLock())
return;
try {
BloomSHA1 tmp = _previous;
currentCount = _current.size();
if (_log.shouldLog(Log.DEBUG) && currentCount > 0)
fpr = _current.falsePositives();
_previous = _current;
_current = tmp;
_current.clear();
dups = _currentDuplicates;
_currentDuplicates = 0;
} finally { releaseWriteLock(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
+ " elements and " + dups + " false positives with FPR = " + fpr);
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".size",
currentCount);
if (currentCount > 0)
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".dups",
1000l*1000*dups/currentCount);
if (fpr > 0d) {
// only if log.shouldLog(Log.DEBUG) ...
long exponent = (long) Math.log10(fpr);
_context.statManager().addRateData("router.decayingBloomFilter." + _name + ".log10(falsePos)",
exponent);
}
}
private class DecayEvent implements SimpleTimer.TimedEvent {
public void timeReached() {
if (_keepDecaying) {
decay();
SimpleTimer.getInstance().addEvent(DecayEvent.this, _durationMs);
}
}
}
/** @since 0.8.11 moved from DecayingHashSet */
protected void getReadLock() {
_reorganizeLock.readLock().lock();
}
/** @since 0.8.11 moved from DecayingHashSet */
protected void releaseReadLock() {
_reorganizeLock.readLock().unlock();
}
/**
* @return true if the lock was acquired
* @since 0.8.11 moved from DecayingHashSet
*/
protected boolean getWriteLock() {
try {
boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS);
if (!rv)
_log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats"));
return rv;
} catch (InterruptedException ie) {}
return false;
}
/** @since 0.8.11 moved from DecayingHashSet */
protected void releaseWriteLock() {
_reorganizeLock.writeLock().unlock();
}
/**
* This filter is used only for participants and OBEPs, not
* IBGWs, so depending on your assumptions of avg. tunnel length,
* the performance is somewhat better than the gross share BW
* would indicate.
*
*<pre>
* Following stats for m=23, k=11:
* Theoretical false positive rate for 16 KBps: 1.17E-21
* Theoretical false positive rate for 24 KBps: 9.81E-20
* Theoretical false positive rate for 32 KBps: 2.24E-18
* Theoretical false positive rate for 256 KBps: 7.45E-9
* Theoretical false positive rate for 512 KBps: 5.32E-6
* Theoretical false positive rate for 1024 KBps: 1.48E-3
* Then it gets bad: 1280 .67%; 1536 2.0%; 1792 4.4%; 2048 8.2%.
*
* Following stats for m=24, k=10:
* 1280 4.5E-5; 1792 5.6E-4; 2048 0.14%
*
* Following stats for m=25, k=10:
* 1792 2.4E-6; 4096 0.14%; 5120 0.6%; 6144 1.7%; 8192 6.8%; 10240 15%
*</pre>
*/
public static void main(String args[]) {
System.out.println("Usage: DecayingBloomFilter [kbps [m [iterations]]] (default 256 23 10)");
int kbps = 256;
if (args.length >= 1) {
try {
kbps = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) {}
}
int m = DEFAULT_M;
if (args.length >= 2) {
try {
m = Integer.parseInt(args[1]);
} catch (NumberFormatException nfe) {}
}
int iterations = 10;
if (args.length >= 3) {
try {
iterations = Integer.parseInt(args[2]);
} catch (NumberFormatException nfe) {}
}
testByLong(kbps, m, iterations);
testByBytes(kbps, m, iterations);
}
private static void testByLong(int kbps, int m, int numRuns) {
int messages = 60 * 10 * kbps;
Random r = new Random();
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 8, "test", m);
int falsePositives = 0;
long totalTime = 0;
double fpr = 0d;
for (int j = 0; j < numRuns; j++) {
long start = System.currentTimeMillis();
for (int i = 0; i < messages; i++) {
if (filter.add(r.nextLong())) {
falsePositives++;
//System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;
fpr = filter.getFalsePositiveRate();
filter.clear();
}
filter.stopDecaying();
System.out.println("False postive rate should be " + fpr);
System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+ falsePositives + " false positives");
}
private static void testByBytes(int kbps, int m, int numRuns) {
byte iv[][] = new byte[60*10*kbps][16];
Random r = new Random();
for (int i = 0; i < iv.length; i++)
r.nextBytes(iv[i]);
DecayingBloomFilter filter = new DecayingBloomFilter(I2PAppContext.getGlobalContext(), 600*1000, 16, "test", m);
int falsePositives = 0;
long totalTime = 0;
double fpr = 0d;
for (int j = 0; j < numRuns; j++) {
long start = System.currentTimeMillis();
for (int i = 0; i < iv.length; i++) {
if (filter.add(iv[i])) {
falsePositives++;
//System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;
fpr = filter.getFalsePositiveRate();
filter.clear();
}
filter.stopDecaying();
System.out.println("False postive rate should be " + fpr);
System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+ falsePositives + " false positives");
//System.out.println("inserted: " + bloom.size() + " with " + bloom.capacity()
// + " (" + bloom.falsePositives()*100.0d + "% false positive)");
}
}

View File

@@ -1,335 +0,0 @@
package net.i2p.util;
import java.util.Random;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
/**
* Double buffered hash set.
* Since DecayingBloomFilter was instantiated 4 times for a total memory usage
* of 8MB, it seemed like we could do a lot better, given these usage stats
* on a class L router:
*
* ./router/java/src/net/i2p/router/tunnel/BuildMessageProcessor.java:
* 32 bytes, peak 10 entries in 1m
* (320 peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java:
* 4 bytes, peak 150 entries in 10s
* (1600 peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/MessageValidator.java:
* 8 bytes, peak 1K entries in 2m
* (36K peak entries seen on fast router)
*
* ./router/java/src/net/i2p/router/tunnel/BloomFilterIVValidator.java:
* 16 bytes, peak 15K entries in 10m
*
* If the ArrayWrapper object in the HashSet is 50 bytes, and BloomSHA1(23, 11) is 1MB,
* then for less than 20K entries this is smaller.
* And this uses space proportional to traffiic, so it doesn't penalize small routers
* with a fixed 8MB.
* So let's try it for the first 2 or 3, for now.
*
* Also, DBF is syncrhonized, and uses SimpleTimer.
* Here we use a read/write lock, with synchronization only
* when switching double buffers, and we use SimpleScheduler.
*
* Yes, we could stare at stats all day, and try to calculate an acceptable
* false-positive rate for each of the above uses, then estimate the DBF size
* required to meet that rate for a given usage. Or even start adjusting the
* Bloom filter m and k values on a per-DBF basis. But it's a whole lot easier
* to implement something with a zero false positive rate, and uses less memory
* for almost all bandwidth classes.
*
* This has a strictly zero false positive rate for <= 8 byte keys.
* For larger keys, it is 1 / (2**64) ~= 5E-20, which is better than
* DBF for any entry count greater than about 14K.
*
* DBF has a zero false negative rate over the period
* 2 * durationMs. And a 100% false negative rate beyond that period.
* This has the same properties.
*
* This performs about twice as fast as DBF in the test below.
*
* Deprecated for use outside of the router; to be moved to router.jar.
*
* @author zzz
*/
public class DecayingHashSet extends DecayingBloomFilter {
private ConcurrentHashSet<ArrayWrapper> _current;
private ConcurrentHashSet<ArrayWrapper> _previous;
/**
* Create a double-buffered hash set that will decay its entries over time.
*
* @param durationMs entries last for at least this long, but no more than twice this long
* @param entryBytes how large are the entries to be added? 1 to 32 bytes
*/
public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes) {
this(context, durationMs, entryBytes, "DHS");
}
/** @param name just for logging / debugging / stats */
public DecayingHashSet(I2PAppContext context, int durationMs, int entryBytes, String name) {
super(durationMs, entryBytes, name, context);
if (entryBytes <= 0 || entryBytes > 32)
throw new IllegalArgumentException("Bad size");
_current = new ConcurrentHashSet(128);
_previous = new ConcurrentHashSet(128);
if (_log.shouldLog(Log.WARN))
_log.warn("New DHS " + name + " entryBytes = " + entryBytes +
" cycle (s) = " + (durationMs / 1000));
// try to get a handle on memory usage vs. false positives
context.statManager().createRateStat("router.decayingHashSet." + name + ".size",
"Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
context.statManager().createRateStat("router.decayingHashSet." + name + ".dups",
"1000000 * Duplicates/Size", "Router", new long[] { 10 * Math.max(60*1000, durationMs) });
}
/** unsynchronized but only used for logging elsewhere */
@Override
public int getInsertedCount() {
return _current.size() + _previous.size();
}
/** pointless, only used for logging elsewhere */
@Override
public double getFalsePositiveRate() {
if (_entryBytes <= 8)
return 0d;
return 1d / Math.pow(2d, 64d); // 5.4E-20
}
/**
* @return true if the entry added is a duplicate
*/
@Override
public boolean add(byte entry[], int off, int len) {
if (entry == null)
throw new IllegalArgumentException("Null entry");
if (len != _entryBytes)
throw new IllegalArgumentException("Bad entry [" + len + ", expected "
+ _entryBytes + "]");
ArrayWrapper w = new ArrayWrapper(entry, off, len);
getReadLock();
try {
return locked_add(w, true);
} finally { releaseReadLock(); }
}
/**
* @return true if the entry added is a duplicate. the number of low order
* bits used is determined by the entryBytes parameter used on creation of the
* filter.
*
*/
@Override
public boolean add(long entry) {
return add(entry, true);
}
/**
* @return true if the entry is already known. this does NOT add the
* entry however.
*
*/
@Override
public boolean isKnown(long entry) {
return add(entry, false);
}
private boolean add(long entry, boolean addIfNew) {
ArrayWrapper w = new ArrayWrapper(entry);
getReadLock();
try {
return locked_add(w, addIfNew);
} finally { releaseReadLock(); }
}
/**
* @param addIfNew if true, add the element to current if it is not already there or in previous;
* if false, only check
* @return if the element is in either the current or previous set
*/
private boolean locked_add(ArrayWrapper w, boolean addIfNew) {
boolean seen = _previous.contains(w);
// only access _current once.
if (!seen) {
if (addIfNew)
seen = !_current.add(w);
else
seen = _current.contains(w);
}
if (seen) {
// why increment if addIfNew == false? Only used for stats...
_currentDuplicates++;
}
return seen;
}
@Override
public void clear() {
_current.clear();
_previous.clear();
_currentDuplicates = 0;
}
/** super doesn't call clear, but neither do the users, so it seems like we should here */
@Override
public void stopDecaying() {
_keepDecaying = false;
clear();
}
@Override
protected void decay() {
int currentCount = 0;
long dups = 0;
if (!getWriteLock())
return;
try {
ConcurrentHashSet<ArrayWrapper> tmp = _previous;
currentCount = _current.size();
_previous = _current;
_current = tmp;
_current.clear();
dups = _currentDuplicates;
_currentDuplicates = 0;
} finally { releaseWriteLock(); }
if (_log.shouldLog(Log.DEBUG))
_log.debug("Decaying the filter " + _name + " after inserting " + currentCount
+ " elements and " + dups + " false positives");
_context.statManager().addRateData("router.decayingHashSet." + _name + ".size",
currentCount);
if (currentCount > 0)
_context.statManager().addRateData("router.decayingHashSet." + _name + ".dups",
1000l*1000*dups/currentCount);
}
/**
* This saves the data as-is if the length is <= 8 bytes,
* otherwise it stores an 8-byte hash.
* Hash function is from DataHelper, modded to get
* the maximum entropy given the length of the data.
*/
private static class ArrayWrapper {
private final long _longhashcode;
public ArrayWrapper(byte[] b, int offset, int len) {
int idx = offset;
int shift = Math.min(8, 64 / len);
long lhc = 0;
for (int i = 0; i < len; i++) {
// xor better than + in tests
lhc ^= (((long) b[idx++]) << (i * shift));
}
_longhashcode = lhc;
}
/** faster version for when storing <= 8 bytes */
public ArrayWrapper(long b) {
_longhashcode = b;
}
public int hashCode() {
return (int) _longhashcode;
}
public long longHashCode() {
return _longhashcode;
}
public boolean equals(Object o) {
if (o == null || !(o instanceof ArrayWrapper))
return false;
return ((ArrayWrapper) o).longHashCode() == _longhashcode;
}
}
/**
* vs. DBF, this measures 1.93x faster for testByLong and 2.46x faster for testByBytes.
*/
public static void main(String args[]) {
/** KBytes per sec, 1 message per KByte */
int kbps = 256;
int iterations = 10;
//testSize();
testByLong(kbps, iterations);
testByBytes(kbps, iterations);
}
/** and the answer is: 49.9 bytes. The ArrayWrapper alone measured 16, so that's 34 for the HashSet entry. */
/*****
private static void testSize() {
int qty = 256*1024;
byte b[] = new byte[8];
Random r = new Random();
long old = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
ConcurrentHashSet foo = new ConcurrentHashSet(qty);
for (int i = 0; i < qty; i++) {
r.nextBytes(b);
foo.add(new ArrayWrapper(b, 0, 8));
}
long used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
System.out.println("Memory per ArrayWrapper: " + (((double) (used - old)) / qty));
}
*****/
/** 8 bytes, simulate the router message validator */
private static void testByLong(int kbps, int numRuns) {
int messages = 60 * 10 * kbps;
Random r = new Random();
DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 8);
int falsePositives = 0;
long totalTime = 0;
for (int j = 0; j < numRuns; j++) {
long start = System.currentTimeMillis();
for (int i = 0; i < messages; i++) {
if (filter.add(r.nextLong())) {
falsePositives++;
System.out.println("False positive " + falsePositives + " (testByLong j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;
filter.clear();
}
System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
filter.stopDecaying();
System.out.println("After " + numRuns + " runs pushing " + messages + " entries in "
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+ falsePositives + " false positives");
}
/** 16 bytes, simulate the tunnel IV validator */
private static void testByBytes(int kbps, int numRuns) {
byte iv[][] = new byte[60*10*kbps][16];
Random r = new Random();
for (int i = 0; i < iv.length; i++)
r.nextBytes(iv[i]);
DecayingBloomFilter filter = new DecayingHashSet(I2PAppContext.getGlobalContext(), 600*1000, 16);
int falsePositives = 0;
long totalTime = 0;
for (int j = 0; j < numRuns; j++) {
long start = System.currentTimeMillis();
for (int i = 0; i < iv.length; i++) {
if (filter.add(iv[i])) {
falsePositives++;
System.out.println("False positive " + falsePositives + " (testByBytes j=" + j + " i=" + i + ")");
}
}
totalTime += System.currentTimeMillis() - start;
filter.clear();
}
System.out.println("False postive rate should be " + filter.getFalsePositiveRate());
filter.stopDecaying();
System.out.println("After " + numRuns + " runs pushing " + iv.length + " entries in "
+ DataHelper.formatDuration(totalTime/numRuns) + " per run, there were "
+ falsePositives + " false positives");
}
}

View File

@@ -458,19 +458,32 @@ public class EepGet {
}
public void stopFetching() { _keepFetching = false; }
/**
* Blocking fetch, returning true if the URL was retrieved, false if all retries failed
* Blocking fetch, returning true if the URL was retrieved, false if all retries failed.
*
* Header timeout default 45 sec, total timeout default none, inactivity timeout default 60 sec.
*/
public boolean fetch() { return fetch(_fetchHeaderTimeout); }
/**
* Blocking fetch, timing out individual attempts if the HTTP response headers
* don't come back in the time given. If the timeout is zero or less, this will
* wait indefinitely.
*
* Total timeout default none, inactivity timeout default 60 sec.
*/
public boolean fetch(long fetchHeaderTimeout) {
return fetch(fetchHeaderTimeout, -1, -1);
}
/**
* Blocking fetch.
*
* @param fetchHeaderTimeout <= 0 for none (proxy will timeout if none, none isn't recommended if no proxy)
* @param totalTimeout <= 0 for default none
* @param inactivityTimeout <= 0 for default 60 sec
*/
public boolean fetch(long fetchHeaderTimeout, long totalTimeout, long inactivityTimeout) {
_fetchHeaderTimeout = fetchHeaderTimeout;
_fetchEndTime = (totalTimeout > 0 ? System.currentTimeMillis() + totalTimeout : -1);

View File

@@ -9,12 +9,13 @@ import net.i2p.I2PAppContext;
*/
class Executor implements Runnable {
private final I2PAppContext _context;
private Log _log;
private final List _readyEvents;
private final Log _log;
private final List<SimpleTimer.TimedEvent> _readyEvents;
private final SimpleStore runn;
public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) {
public Executor(I2PAppContext ctx, Log log, List<SimpleTimer.TimedEvent> events, SimpleStore x) {
_context = ctx;
_log = log;
_readyEvents = events;
runn = x;
}
@@ -26,7 +27,7 @@ class Executor implements Runnable {
if (_readyEvents.isEmpty())
try { _readyEvents.wait(); } catch (InterruptedException ie) {}
if (!_readyEvents.isEmpty())
evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0);
evt = _readyEvents.remove(0);
}
if (evt != null) {
@@ -34,21 +35,12 @@ class Executor implements Runnable {
try {
evt.timeReached();
} catch (Throwable t) {
log("Executing task " + evt + " exited unexpectedly, please report", t);
_log.error("Executing task " + evt + " exited unexpectedly, please report", t);
}
long time = _context.clock().now() - before;
// FIXME _log won't be non-null unless we already had a CRIT
if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) )
if ( (time > 1000) && (_log.shouldLog(Log.WARN)) )
_log.warn("wtf, event execution took " + time + ": " + evt);
}
}
}
private void log(String msg, Throwable t) {
synchronized (this) {
if (_log == null)
_log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class);
}
_log.log(Log.CRIT, msg, t);
}
}

View File

@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import net.i2p.I2PAppContext;
import net.i2p.data.DataHelper;
@@ -53,6 +54,10 @@ public class LogManager {
public final static String PROP_CONSOLEBUFFERSIZE = "logger.consoleBufferSize";
public final static String PROP_DISPLAYONSCREENLEVEL = "logger.minimumOnScreenLevel";
public final static String PROP_DEFAULTLEVEL = "logger.defaultLevel";
/** @since 0.9.2 */
private static final String PROP_LOG_BUFFER_SIZE = "logger.logBufferSize";
/** @since 0.9.2 */
private static final String PROP_DROP = "logger.dropOnOverflow";
public final static String PROP_RECORD_PREFIX = "logger.record.";
public final static String DEFAULT_FORMAT = DATE + " " + PRIORITY + " [" + THREAD + "] " + CLASS + ": " + MESSAGE;
@@ -110,16 +115,18 @@ public class LogManager {
/** whether or not we even want to display anything on stdout */
private boolean _displayOnScreen;
/** how many records we want to buffer in the "recent logs" list */
private int _consoleBufferSize;
private int _consoleBufferSize = DEFAULT_CONSOLEBUFFERSIZE;
/** the actual "recent logs" list */
private final LogConsoleBuffer _consoleBuffer;
private int _logBufferSize = MAX_BUFFER;
private boolean _dropOnOverflow;
private final AtomicLong _droppedRecords = new AtomicLong();
private boolean _alreadyNoticedMissingConfig;
public LogManager(I2PAppContext context) {
_displayOnScreen = true;
_alreadyNoticedMissingConfig = false;
_records = new LinkedBlockingQueue(MAX_BUFFER);
_limits = new ConcurrentHashSet();
_logs = new ConcurrentHashMap(128);
_defaultLimit = Log.ERROR;
@@ -127,6 +134,7 @@ public class LogManager {
_log = getLog(LogManager.class);
String location = context.getProperty(CONFIG_LOCATION_PROP, CONFIG_LOCATION_DEFAULT);
setConfig(location);
_records = new LinkedBlockingQueue(_logBufferSize);
_consoleBuffer = new LogConsoleBuffer(_consoleBufferSize);
// If we aren't in the router context, delay creating the LogWriter until required,
// so it doesn't create a log directory and log files unless there is output.
@@ -243,6 +251,11 @@ public class LogManager {
boolean success = _records.offer(record);
if (!success) {
if (_dropOnOverflow) {
// TODO use the counter in a periodic drop msg
_droppedRecords.incrementAndGet();
return;
}
// the writer waits 10 seconds *or* until we tell them to wake up
// before rereading the config and writing out any log messages
synchronized (_writer) {
@@ -345,15 +358,17 @@ public class LogManager {
try {
String str = config.getProperty(PROP_CONSOLEBUFFERSIZE);
if (str == null)
_consoleBufferSize = DEFAULT_CONSOLEBUFFERSIZE;
else
if (str != null)
_consoleBufferSize = Integer.parseInt(str);
} catch (NumberFormatException nfe) {
System.err.println("Invalid console buffer size");
nfe.printStackTrace();
_consoleBufferSize = DEFAULT_CONSOLEBUFFERSIZE;
}
} catch (NumberFormatException nfe) {}
try {
String str = config.getProperty(PROP_LOG_BUFFER_SIZE);
if (str != null)
_logBufferSize = Integer.parseInt(str);
} catch (NumberFormatException nfe) {}
_dropOnOverflow = Boolean.valueOf(config.getProperty(PROP_DROP)).booleanValue();
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("Log set to use the base log file as " + _baseLogfilename);

View File

@@ -44,7 +44,7 @@ public class LookaheadInputStream extends FilterInputStream {
Arrays.fill(_footerLookahead, (byte)0x00);
int footerRead = 0;
while (footerRead < _footerLookahead.length) {
int read = in.read(_footerLookahead);
int read = in.read(_footerLookahead, footerRead, _footerLookahead.length - footerRead);
if (read == -1) throw new IOException("EOF reading the footer lookahead");
footerRead += read;
}

View File

@@ -90,6 +90,8 @@ public class SSLEepGet extends EepGet {
/** may be null if init failed */
private SavingTrustManager _stm;
private static final boolean _isAndroid = System.getProperty("java.vendor").contains("Android");
/**
* A new SSLEepGet with a new SSLState
*/
@@ -192,12 +194,23 @@ public class SSLEepGet extends EepGet {
String override = System.getProperty("javax.net.ssl.keyStore");
if (override != null)
success = loadCerts(new File(override), ks);
if (!success)
success = loadCerts(new File(System.getProperty("java.home"), "lib/security/jssecacerts"), ks);
if (!success)
success = loadCerts(new File(System.getProperty("java.home"), "lib/security/cacerts"), ks);
if (!success) {
if (_isAndroid) {
// thru API 13. As of API 14 (ICS), the file is gone, but
// ks.load(null, pw) will bring in the default certs?
success = loadCerts(new File(System.getProperty("java.home"), "etc/security/cacerts.bks"), ks);
} else {
success = loadCerts(new File(System.getProperty("java.home"), "lib/security/jssecacerts"), ks);
if (!success)
success = loadCerts(new File(System.getProperty("java.home"), "lib/security/cacerts"), ks);
}
}
if (!success) {
try {
// must be initted
ks.load(null, "changeit".toCharArray());
} catch (Exception e) {}
_log.error("All key store loads failed, will only load local certificates");
} else if (_log.shouldLog(Log.INFO)) {
int count = 0;

View File

@@ -146,7 +146,7 @@ public class SimpleScheduler {
* Same as SimpleTimer.TimedEvent but use run() instead of timeReached(), and remembers the time
*/
private class RunnableEvent implements Runnable {
protected SimpleTimer.TimedEvent _timedEvent;
protected final SimpleTimer.TimedEvent _timedEvent;
protected long _scheduled;
public RunnableEvent(SimpleTimer.TimedEvent t, long timeoutMs) {