From 532c9d3fc59a649b51fdbee158001478e63f2c81 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Mon, 3 Jan 2011 15:56:02 +0000 Subject: [PATCH] * I2CP: - Add experimental bandwidth limiter - Add I2PSession API method to update tunnel and bandwidth configuration on an existing session - Filter more system properties before passing them to the router --- .../net/i2p/client/I2CPMessageProducer.java | 156 ++++++++++++++++-- core/java/src/net/i2p/client/I2PSession.java | 8 + .../src/net/i2p/client/I2PSessionImpl.java | 42 ++++- .../src/net/i2p/client/I2PSimpleSession.java | 7 + 4 files changed, 187 insertions(+), 26 deletions(-) diff --git a/core/java/src/net/i2p/client/I2CPMessageProducer.java b/core/java/src/net/i2p/client/I2CPMessageProducer.java index 0fa35d8cff..f9840c5c34 100644 --- a/core/java/src/net/i2p/client/I2CPMessageProducer.java +++ b/core/java/src/net/i2p/client/I2CPMessageProducer.java @@ -12,6 +12,8 @@ package net.i2p.client; import java.util.Date; import java.util.Properties; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import net.i2p.I2PAppContext; import net.i2p.data.DataFormatException; @@ -41,22 +43,51 @@ import net.i2p.util.Log; * @author jrandom */ class I2CPMessageProducer { - private final static Log _log = new Log(I2CPMessageProducer.class); + private final Log _log; private final I2PAppContext _context; - private int _sendBps; - private long _sendPeriodBytes; - private long _sendPeriodBeginTime; + private int _maxBytesPerSecond; + private volatile int _sendPeriodBytes; + private volatile long _sendPeriodBeginTime; + private final ReentrantLock _lock; + private static final String PROP_MAX_BW = "i2cp.outboundBytesPerSecond"; + /** see ConnectionOptions in streaming - MTU + streaming overhead + gzip overhead */ + private static final int TYP_SIZE = 1730 + 28 + 23; + private static final int MIN_RATE = 2 * TYP_SIZE; public I2CPMessageProducer(I2PAppContext context) { _context = context; - context.statManager().createRateStat("client.sendBpsRaw", "How fast we pump out I2CP data messages", "ClientMessages", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); + _log = context.logManager().getLog(I2CPMessageProducer.class); + _lock = new ReentrantLock(true); + context.statManager().createRateStat("client.sendThrottled", "Times waited for bandwidth", "ClientMessages", new long[] { 60*1000 }); + context.statManager().createRateStat("client.sendDropped", "Length of msg dropped waiting for bandwidth", "ClientMessages", new long[] { 60*1000 }); } + /** + * Update the bandwidth setting + * @since 0.8.4 + */ + public void updateBandwidth(I2PSessionImpl session) { + String max = session.getOptions().getProperty(PROP_MAX_BW); + if (max != null) { + try { + int iMax = Integer.parseInt(max); + if (iMax > 0) + // round up to next higher TYP_SIZE for efficiency, then add some fudge for small messages + _maxBytesPerSecond = 256 + Math.max(MIN_RATE, TYP_SIZE * ((iMax + TYP_SIZE - 1) / TYP_SIZE)); + else + _maxBytesPerSecond = 0; + } catch (NumberFormatException nfe) {} + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Setting " + _maxBytesPerSecond + " BPS max"); + } + /** * Send all the messages that a client needs to send to a router to establish * a new session. */ public void connect(I2PSessionImpl session) throws I2PSessionException { + updateBandwidth(session); CreateSessionMessage msg = new CreateSessionMessage(); SessionConfig cfg = new SessionConfig(session.getMyDestination()); cfg.setOptions(session.getOptions()); @@ -99,6 +130,9 @@ class I2CPMessageProducer { */ public void sendMessage(I2PSessionImpl session, Destination dest, long nonce, byte[] payload, SessionTag tag, SessionKey key, Set tags, SessionKey newKey, long expires) throws I2PSessionException { + if (!updateBps(payload.length, expires)) + // drop the message... send fail notification? + return; SendMessageMessage msg; if (expires > 0) { msg = new SendMessageExpiresMessage(); @@ -111,20 +145,108 @@ class I2CPMessageProducer { Payload data = createPayload(dest, payload, tag, key, tags, newKey); msg.setPayload(data); session.sendMessage(msg); - updateBps(payload.length); } - private void updateBps(int len) { - long now = _context.clock().now(); - float period = ((float)now-_sendPeriodBeginTime)/1000f; - if (period >= 1f) { - // first term decays on slow transmission - _sendBps = (int)(((float)0.9f * (float)_sendBps) + ((float)0.1f*((float)_sendPeriodBytes)/period)); - _sendPeriodBytes = len; - _sendPeriodBeginTime = now; - _context.statManager().addRateData("client.sendBpsRaw", _sendBps, 0); - } else { - _sendPeriodBytes += len; + /** + * Super-simple bandwidth throttler. + * We only calculate on a one-second basis, so large messages + * (compared to the one-second limit) may exceed the limits. + * Tuned for streaming, may not work well for large datagrams. + * + * This does poorly with low rate limits since it doesn't credit + * bandwidth across two periods. So the limit is rounded up, + * and the min limit is set to 2x the typ size, above. + * + * Blocking so this could be very bad for retransmissions, + * as it could clog StreamingTimer. + * Waits are somewhat "fair" using ReentrantLock. + * While out-of-order transmission is acceptable, fairness + * reduces the chance of starvation. ReentrantLock does not + * guarantee in-order execution due to thread priority issues, + * so out-of-order may still occur. But shouldn't happen within + * the same thread anyway... Also note that small messages may + * go ahead of large ones that are waiting for the next window. + * Also, threads waiting a second time go to the back of the line. + * + * Since this is at the I2CP layer, it includes streaming overhead, + * streaming acks and retransmissions, + * gzip overhead (or "underhead" for compression), + * repliable datagram overhead, etc. + * However, it does not, of course, include the substantial overhead + * imposed by the router for the leaseset, tags, encryption, + * and fixed-size tunnel messages. + * + * @param expires if > 0, an expiration date + * @return true if we should send the message, false to drop it + */ + private boolean updateBps(int len, long expires) { + if (_maxBytesPerSecond <= 0) + return true; + //synchronized(this) { + _lock.lock(); + try { + int waitCount = 0; + while (true) { + long now = _context.clock().now(); + if (waitCount > 0 && expires > 0 && expires < now) { + // just say no to bufferbloat... drop the message right here + _context.statManager().addRateData("client.sendDropped", len, 0); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping " + len + " byte msg expired in queue"); + return false; + } + + long period = now - _sendPeriodBeginTime; + if (period >= 2000) { + // start new period, always let it through no matter how big + _sendPeriodBytes = len; + _sendPeriodBeginTime = now; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New period after idle, " + len + " bytes"); + return true; + } + + if (period >= 1000) { + // start new period + // Allow burst within 2 sec, only advance window by 1 sec, and + // every other second give credit for unused bytes in previous period + if (_sendPeriodBytes > 0 && ((_sendPeriodBeginTime / 1000) & 0x01) == 0) + _sendPeriodBytes += len - _maxBytesPerSecond; + else + _sendPeriodBytes = len; + _sendPeriodBeginTime += 1000; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("New period, " + len + " bytes"); + return true; + } + + if (_sendPeriodBytes + len <= _maxBytesPerSecond) { + // still bytes available in this period + _sendPeriodBytes += len; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending " + len + ", Elapsed " + period + "ms, total " + _sendPeriodBytes + " bytes"); + return true; + } + + if (waitCount >= 2) { + // just say no to bufferbloat... drop the message right here + _context.statManager().addRateData("client.sendDropped", len, 0); + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping " + len + " byte msg after waiting " + waitCount + " times"); + return false; + } + + // wait until next period + _context.statManager().addRateData("client.sendThrottled", ++waitCount, 0); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Throttled " + len + " bytes, wait #" + waitCount + ' ' + (1000 - period) + "ms" /*, new Exception()*/); + try { + //this.wait(1000 - period); + _lock.newCondition().await(1000 - period, TimeUnit.MILLISECONDS); + } catch (InterruptedException ie) {} + } + } finally { + _lock.unlock(); } } diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index cd20cfc2d6..27138b8842 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -9,6 +9,7 @@ package net.i2p.client; * */ +import java.util.Properties; import java.util.Set; import net.i2p.data.Destination; @@ -151,6 +152,13 @@ public interface I2PSession { */ public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException; + /** + * Does not remove properties previously present but missing from this options parameter. + * @param options non-null + * @since 0.8.4 + */ + public void updateOptions(Properties options); + /** * Get the current bandwidth limits. Blocking. */ diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 8b4389e47d..10e7695c04 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -221,20 +221,32 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } } + /** save some memory, don't pass along the pointless properties */ private Properties filter(Properties options) { Properties rv = new Properties(); for (Iterator iter = options.keySet().iterator(); iter.hasNext();) { String key = (String) iter.next(); - String val = options.getProperty(key); - if (key.startsWith("java") || - key.startsWith("user") || - key.startsWith("os") || - key.startsWith("sun") || - key.startsWith("file") || - key.startsWith("line") || - key.startsWith("wrapper")) { + if (key.startsWith("java.") || + key.startsWith("user.") || + key.startsWith("os.") || + key.startsWith("sun.") || + key.startsWith("file.") || + key.equals("line.separator") || + key.equals("path.separator") || + key.equals("prng.buffers") || + key.equals("router.trustedUpdateKeys") || + key.startsWith("router.update") || + key.startsWith("routerconsole.") || + key.startsWith("time.") || + key.startsWith("stat.") || + key.startsWith("gnu.") || // gnu JVM + key.startsWith("net.i2p.router.web.") || // console nonces + key.startsWith("wrapper.")) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Skipping property: " + key); - } else if ((key.length() > 255) || (val.length() > 255)) { + continue; + } + String val = options.getProperty(key); + if ((key.length() > 255) || (val.length() > 255)) { if (_log.shouldLog(Log.WARN)) _log.warn(getPrefix() + "Not passing on property [" + key @@ -247,6 +259,18 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa return rv; } + /** + * Update the tunnel and bandwidth settings + * @since 0.8.4 + */ + public void updateOptions(Properties options) { + _options.putAll(filter(options)); + _producer.updateBandwidth(this); + try { + _producer.updateTunnels(this, 0); + } catch (I2PSessionException ise) {} + } + void setLeaseSet(LeaseSet ls) { _leaseSet = ls; if (ls != null) { diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index e984b6d307..1c564f0db4 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -98,6 +98,13 @@ class I2PSimpleSession extends I2PSessionImpl2 { } } + /** + * Ignore, does nothing + * @since 0.8.4 + */ + @Override + public void updateOptions(Properties options) {} + /** * Only map message handlers that we will use */ -- GitLab