propagate from branch 'i2p.i2p' (head 687bab4d9de8e6686e2734069dfb72f6f2134124)

to branch 'i2p.i2p.zzz.test4' (head 9580436bb581971920ea96e58861142c546c861b)
This commit is contained in:
zzz
2011-01-07 00:37:51 +00:00
96 changed files with 1519 additions and 776 deletions

View File

@@ -63,7 +63,7 @@ import net.i2p.util.SecureDirectory;
*/
public class I2PAppContext {
/** the context that components without explicit root are bound */
protected static I2PAppContext _globalAppContext;
protected static volatile I2PAppContext _globalAppContext;
private Properties _overrideProps;
@@ -117,7 +117,8 @@ public class I2PAppContext {
*
*/
public static I2PAppContext getGlobalContext() {
// skip the global lock
// skip the global lock - _gAC must be volatile
// http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
I2PAppContext rv = _globalAppContext;
if (rv != null)
return rv;
@@ -474,6 +475,9 @@ public class I2PAppContext {
* provided during the context construction, as well as the ones included in
* System.getProperties.
*
* WARNING - not overridden in RouterContext, doesn't contain router config settings,
* use getProperties() instead.
*
* @return set of Strings containing the names of defined system properties
*/
public Set getPropertyNames() {
@@ -483,6 +487,21 @@ public class I2PAppContext {
return names;
}
/**
* Access the configuration attributes of this context, listing the properties
* provided during the context construction, as well as the ones included in
* System.getProperties.
*
* @return new Properties with system and context properties
* @since 0.8.4
*/
public Properties getProperties() {
Properties rv = new Properties();
rv.putAll(System.getProperties());
rv.putAll(_overrideProps);
return rv;
}
/**
* The statistics component with which we can track various events
* over time.

View File

@@ -12,6 +12,8 @@ package net.i2p.client;
import java.util.Date;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import net.i2p.I2PAppContext;
import net.i2p.data.DataFormatException;
@@ -41,22 +43,51 @@ import net.i2p.util.Log;
* @author jrandom
*/
class I2CPMessageProducer {
private final static Log _log = new Log(I2CPMessageProducer.class);
private final Log _log;
private final I2PAppContext _context;
private int _sendBps;
private long _sendPeriodBytes;
private long _sendPeriodBeginTime;
private int _maxBytesPerSecond;
private volatile int _sendPeriodBytes;
private volatile long _sendPeriodBeginTime;
private final ReentrantLock _lock;
private static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond";
/** see ConnectionOptions in streaming - MTU + streaming overhead + gzip overhead */
private static final int TYP_SIZE = 1730 + 28 + 23;
private static final int MIN_RATE = 2 * TYP_SIZE;
public I2CPMessageProducer(I2PAppContext context) {
_context = context;
context.statManager().createRateStat("client.sendBpsRaw", "How fast we pump out I2CP data messages", "ClientMessages", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 });
_log = context.logManager().getLog(I2CPMessageProducer.class);
_lock = new ReentrantLock(true);
context.statManager().createRateStat("client.sendThrottled", "Times waited for bandwidth", "ClientMessages", new long[] { 60*1000 });
context.statManager().createRateStat("client.sendDropped", "Length of msg dropped waiting for bandwidth", "ClientMessages", new long[] { 60*1000 });
}
/**
* Update the bandwidth setting
* @since 0.8.4
*/
public void updateBandwidth(I2PSessionImpl session) {
String max = session.getOptions().getProperty(PROP_MAX_BW);
if (max != null) {
try {
int iMax = Integer.parseInt(max);
if (iMax > 0)
// round up to next higher TYP_SIZE for efficiency, then add some fudge for small messages
_maxBytesPerSecond = 256 + Math.max(MIN_RATE, TYP_SIZE * ((iMax + TYP_SIZE - 1) / TYP_SIZE));
else
_maxBytesPerSecond = 0;
} catch (NumberFormatException nfe) {}
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("Setting " + _maxBytesPerSecond + " BPS max");
}
/**
* Send all the messages that a client needs to send to a router to establish
* a new session.
*/
public void connect(I2PSessionImpl session) throws I2PSessionException {
updateBandwidth(session);
CreateSessionMessage msg = new CreateSessionMessage();
SessionConfig cfg = new SessionConfig(session.getMyDestination());
cfg.setOptions(session.getOptions());
@@ -99,6 +130,9 @@ class I2CPMessageProducer {
*/
public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag,
SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException {
if (!updateBps(payload.length, expires))
// drop the message... send fail notification?
return;
SendMessageMessage msg;
if (expires > 0) {
msg = new SendMessageExpiresMessage();
@@ -111,20 +145,108 @@ class I2CPMessageProducer {
Payload data = createPayload(dest, payload, tag, key, tags, newKey);
msg.setPayload(data);
session.sendMessage(msg);
updateBps(payload.length);
}
private void updateBps(int len) {
long now = _context.clock().now();
float period = ((float)now-_sendPeriodBeginTime)/1000f;
if (period >= 1f) {
// first term decays on slow transmission
_sendBps = (int)(((float)0.9f * (float)_sendBps) + ((float)0.1f*((float)_sendPeriodBytes)/period));
_sendPeriodBytes = len;
_sendPeriodBeginTime = now;
_context.statManager().addRateData("client.sendBpsRaw", _sendBps, 0);
} else {
_sendPeriodBytes += len;
/**
* Super-simple bandwidth throttler.
* We only calculate on a one-second basis, so large messages
* (compared to the one-second limit) may exceed the limits.
* Tuned for streaming, may not work well for large datagrams.
*
* This does poorly with low rate limits since it doesn't credit
* bandwidth across two periods. So the limit is rounded up,
* and the min limit is set to 2x the typ size, above.
*
* Blocking so this could be very bad for retransmissions,
* as it could clog StreamingTimer.
* Waits are somewhat "fair" using ReentrantLock.
* While out-of-order transmission is acceptable, fairness
* reduces the chance of starvation. ReentrantLock does not
* guarantee in-order execution due to thread priority issues,
* so out-of-order may still occur. But shouldn't happen within
* the same thread anyway... Also note that small messages may
* go ahead of large ones that are waiting for the next window.
* Also, threads waiting a second time go to the back of the line.
*
* Since this is at the I2CP layer, it includes streaming overhead,
* streaming acks and retransmissions,
* gzip overhead (or "underhead" for compression),
* repliable datagram overhead, etc.
* However, it does not, of course, include the substantial overhead
* imposed by the router for the leaseset, tags, encryption,
* and fixed-size tunnel messages.
*
* @param expires if > 0, an expiration date
* @return true if we should send the message, false to drop it
*/
private boolean updateBps(int len, long expires) {
if (_maxBytesPerSecond <= 0)
return true;
//synchronized(this) {
_lock.lock();
try {
int waitCount = 0;
while (true) {
long now = _context.clock().now();
if (waitCount > 0 && expires > 0 && expires < now) {
// just say no to bufferbloat... drop the message right here
_context.statManager().addRateData("client.sendDropped", len, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping " + len + " byte msg expired in queue");
return false;
}
long period = now - _sendPeriodBeginTime;
if (period >= 2000) {
// start new period, always let it through no matter how big
_sendPeriodBytes = len;
_sendPeriodBeginTime = now;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New period after idle, " + len + " bytes");
return true;
}
if (period >= 1000) {
// start new period
// Allow burst within 2 sec, only advance window by 1 sec, and
// every other second give credit for unused bytes in previous period
if (_sendPeriodBytes > 0 && ((_sendPeriodBeginTime / 1000) & 0x01) == 0)
_sendPeriodBytes += len - _maxBytesPerSecond;
else
_sendPeriodBytes = len;
_sendPeriodBeginTime += 1000;
if (_log.shouldLog(Log.DEBUG))
_log.debug("New period, " + len + " bytes");
return true;
}
if (_sendPeriodBytes + len <= _maxBytesPerSecond) {
// still bytes available in this period
_sendPeriodBytes += len;
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending " + len + ", Elapsed " + period + "ms, total " + _sendPeriodBytes + " bytes");
return true;
}
if (waitCount >= 2) {
// just say no to bufferbloat... drop the message right here
_context.statManager().addRateData("client.sendDropped", len, 0);
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping " + len + " byte msg after waiting " + waitCount + " times");
return false;
}
// wait until next period
_context.statManager().addRateData("client.sendThrottled", ++waitCount, 0);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Throttled " + len + " bytes, wait #" + waitCount + ' ' + (1000 - period) + "ms" /*, new Exception()*/);
try {
//this.wait(1000 - period);
_lock.newCondition().await(1000 - period, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {}
}
} finally {
_lock.unlock();
}
}

View File

@@ -9,6 +9,7 @@ package net.i2p.client;
*
*/
import java.util.Properties;
import java.util.Set;
import net.i2p.data.Destination;
@@ -151,6 +152,13 @@ public interface I2PSession {
*/
public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException;
/**
* Does not remove properties previously present but missing from this options parameter.
* @param options non-null
* @since 0.8.4
*/
public void updateOptions(Properties options);
/**
* Get the current bandwidth limits. Blocking.
*/

View File

@@ -221,20 +221,32 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
}
}
/** save some memory, don't pass along the pointless properties */
private Properties filter(Properties options) {
Properties rv = new Properties();
for (Iterator iter = options.keySet().iterator(); iter.hasNext();) {
String key = (String) iter.next();
String val = options.getProperty(key);
if (key.startsWith("java") ||
key.startsWith("user") ||
key.startsWith("os") ||
key.startsWith("sun") ||
key.startsWith("file") ||
key.startsWith("line") ||
key.startsWith("wrapper")) {
if (key.startsWith("java.") ||
key.startsWith("user.") ||
key.startsWith("os.") ||
key.startsWith("sun.") ||
key.startsWith("file.") ||
key.equals("line.separator") ||
key.equals("path.separator") ||
key.equals("prng.buffers") ||
key.equals("router.trustedUpdateKeys") ||
key.startsWith("router.update") ||
key.startsWith("routerconsole.") ||
key.startsWith("time.") ||
key.startsWith("stat.") ||
key.startsWith("gnu.") || // gnu JVM
key.startsWith("net.i2p.router.web.") || // console nonces
key.startsWith("wrapper.")) {
if (_log.shouldLog(Log.DEBUG)) _log.debug("Skipping property: " + key);
} else if ((key.length() > 255) || (val.length() > 255)) {
continue;
}
String val = options.getProperty(key);
if ((key.length() > 255) || (val.length() > 255)) {
if (_log.shouldLog(Log.WARN))
_log.warn(getPrefix() + "Not passing on property ["
+ key
@@ -247,6 +259,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
return rv;
}
/**
* Update the tunnel and bandwidth settings
* @since 0.8.4
*/
public void updateOptions(Properties options) {
_options.putAll(filter(options));
_producer.updateBandwidth(this);
try {
_producer.updateTunnels(this, 0);
} catch (I2PSessionException ise) {}
}
void setLeaseSet(LeaseSet ls) {
_leaseSet = ls;
if (ls != null) {
@@ -397,7 +421,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
*
*/
public byte[] receiveMessage(int msgId) throws I2PSessionException {
MessagePayloadMessage msg = _availableMessages.remove(new Long(msgId));
MessagePayloadMessage msg = _availableMessages.remove(Long.valueOf(msgId));
if (msg == null) {
_log.error("Receive message " + msgId + " had no matches");
return null;
@@ -444,7 +468,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* Recieve a payload message and let the app know its available
*/
public void addNewMessage(MessagePayloadMessage msg) {
Long mid = new Long(msg.getMessageId());
Long mid = Long.valueOf(msg.getMessageId());
_availableMessages.put(mid, msg);
long id = msg.getMessageId();
byte data[] = msg.getPayload().getUnencryptedData();
@@ -494,7 +518,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
public void available(long msgId, int size) {
synchronized (AvailabilityNotifier.this) {
_pendingIds.add(new Long(msgId));
_pendingIds.add(Long.valueOf(msgId));
_pendingSizes.add(Integer.valueOf(size));
AvailabilityNotifier.this.notifyAll();
}

View File

@@ -191,7 +191,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession {
*/
@Override
public void addNewMessage(MessagePayloadMessage msg) {
Long mid = new Long(msg.getMessageId());
Long mid = Long.valueOf(msg.getMessageId());
_availableMessages.put(mid, msg);
long id = msg.getMessageId();
byte data[] = msg.getPayload().getUnencryptedData();

View File

@@ -98,10 +98,17 @@ class I2PSimpleSession extends I2PSessionImpl2 {
}
}
/**
* Ignore, does nothing
* @since 0.8.4
*/
@Override
public void updateOptions(Properties options) {}
/**
* Only map message handlers that we will use
*/
class SimpleMessageHandlerMap extends I2PClientMessageHandlerMap {
private static class SimpleMessageHandlerMap extends I2PClientMessageHandlerMap {
public SimpleMessageHandlerMap(I2PAppContext context) {
int highest = Math.max(DestReplyMessage.MESSAGE_TYPE, BandwidthLimitsMessage.MESSAGE_TYPE);
_handlers = new I2CPMessageHandler[highest+1];

View File

@@ -149,7 +149,7 @@ public class CryptixAESEngine extends AESEngine {
@Override
public final void decryptBlock(byte payload[], int inIndex, SessionKey sessionKey, byte rv[], int outIndex) {
if ( (payload == null) || (rv == null) )
throw new IllegalArgumentException("null block args [payload=" + payload + " rv="+rv);
throw new IllegalArgumentException("null block args");
if (payload.length - inIndex > rv.length - outIndex)
throw new IllegalArgumentException("bad block args [payload.len=" + payload.length
+ " inIndex=" + inIndex + " rv.len=" + rv.length

View File

@@ -72,13 +72,13 @@ public class Base32 {
}
private static void runApp(String args[]) {
if ("encodestring".equalsIgnoreCase(args[0])) {
System.out.println(encode(args[1].getBytes()));
return;
}
InputStream in = System.in;
OutputStream out = System.out;
try {
if ("encodestring".equalsIgnoreCase(args[0])) {
System.out.println(encode(args[1].getBytes()));
return;
}
InputStream in = System.in;
OutputStream out = System.out;
if (args.length >= 3) {
out = new FileOutputStream(args[2]);
}
@@ -95,6 +95,9 @@ public class Base32 {
}
} catch (IOException ioe) {
ioe.printStackTrace(System.err);
} finally {
try { in.close(); } catch (IOException e) {}
try { out.close(); } catch (IOException e) {}
}
}

View File

@@ -178,13 +178,13 @@ public class Base64 {
}
private static void runApp(String args[]) {
if ("encodestring".equalsIgnoreCase(args[0])) {
System.out.println(encode(args[1].getBytes()));
return;
}
InputStream in = System.in;
OutputStream out = System.out;
try {
if ("encodestring".equalsIgnoreCase(args[0])) {
System.out.println(encode(args[1].getBytes()));
return;
}
InputStream in = System.in;
OutputStream out = System.out;
if (args.length >= 3) {
out = new FileOutputStream(args[2]);
}
@@ -201,6 +201,9 @@ public class Base64 {
}
} catch (IOException ioe) {
ioe.printStackTrace(System.err);
} finally {
try { in.close(); } catch (IOException e) {}
try { out.close(); } catch (IOException e) {}
}
}

View File

@@ -844,7 +844,7 @@ public class DataHelper {
*/
public final static void xor(byte lhs[], int startLeft, byte rhs[], int startRight, byte out[], int startOut, int len) {
if ( (lhs == null) || (rhs == null) || (out == null) )
throw new NullPointerException("Invalid params to xor (" + lhs + ", " + rhs + ", " + out + ")");
throw new NullPointerException("Null params to xor");
if (lhs.length < startLeft + len)
throw new IllegalArgumentException("Left hand side is too short");
if (rhs.length < startRight + len)

View File

@@ -133,9 +133,15 @@ public class PrivateKeyFile {
*/
public Destination createIfAbsent() throws I2PException, IOException, DataFormatException {
if(!this.file.exists()) {
FileOutputStream out = new FileOutputStream(this.file);
this.client.createDestination(out);
out.close();
FileOutputStream out = null;
try {
out = new FileOutputStream(this.file);
this.client.createDestination(out);
} finally {
if (out != null) {
try { out.close(); } catch (IOException ioe) {}
}
}
}
return getDestination();
}
@@ -243,29 +249,36 @@ public class PrivateKeyFile {
public I2PSession open() throws I2PSessionException, IOException {
return this.open(new Properties());
}
public I2PSession open(Properties opts) throws I2PSessionException, IOException {
// open input file
FileInputStream in = new FileInputStream(this.file);
// create sesssion
I2PSession s = this.client.createSession(in, opts);
// close file
in.close();
return s;
FileInputStream in = null;
try {
in = new FileInputStream(this.file);
I2PSession s = this.client.createSession(in, opts);
return s;
} finally {
if (in != null) {
try { in.close(); } catch (IOException ioe) {}
}
}
}
/**
* Copied from I2PClientImpl.createDestination()
*/
public void write() throws IOException, DataFormatException {
FileOutputStream out = new FileOutputStream(this.file);
this.dest.writeBytes(out);
this.privKey.writeBytes(out);
this.signingPrivKey.writeBytes(out);
out.flush();
out.close();
FileOutputStream out = null;
try {
out = new FileOutputStream(this.file);
this.dest.writeBytes(out);
this.privKey.writeBytes(out);
this.signingPrivKey.writeBytes(out);
out.flush();
} finally {
if (out != null) {
try { out.close(); } catch (IOException ioe) {}
}
}
}
@Override
@@ -377,7 +390,8 @@ public class PrivateKeyFile {
}
}
}
} catch (Exception ioe) {
} catch (DataFormatException dfe) {
} catch (IOException ioe) {
}
// not found, continue to the next file
}

View File

@@ -74,10 +74,11 @@ public class DestReplyMessage extends I2CPMessageImpl {
}
protected byte[] doWriteMessage() throws I2CPMessageException, IOException {
if (_dest == null && _hash == null)
return new byte[0]; // null response allowed
if (_dest == null && _hash != null)
if (_dest == null) {
if (_hash == null)
return new byte[0]; // null response allowed
return _hash.getData();
}
ByteArrayOutputStream os = new ByteArrayOutputStream(_dest.size());
try {
_dest.writeBytes(os);

View File

@@ -89,7 +89,7 @@ public class FrequencyStat {
/** @since 0.8.2 */
@Override
public boolean equals(Object obj) {
if ((obj == null) || (obj.getClass() != FrequencyStat.class)) return false;
if ((obj == null) || !(obj instanceof FrequencyStat)) return false;
return _statName.equals(((FrequencyStat)obj)._statName);
}

View File

@@ -473,7 +473,7 @@ public class Rate {
@Override
public boolean equals(Object obj) {
if ((obj == null) || (obj.getClass() != Rate.class)) return false;
if ((obj == null) || !(obj instanceof Rate)) return false;
if (obj == this) return true;
Rate r = (Rate) obj;
return _period == r.getPeriod() && _creationDate == r.getCreationDate() &&

View File

@@ -108,7 +108,7 @@ public class RateStat {
@Override
public boolean equals(Object obj) {
if ((obj == null) || (obj.getClass() != RateStat.class)) return false;
if ((obj == null) || !(obj instanceof RateStat)) return false;
RateStat rs = (RateStat) obj;
if (DataHelper.eq(getGroupName(), rs.getGroupName()) && DataHelper.eq(getDescription(), rs.getDescription())
&& DataHelper.eq(getName(), rs.getName())) {