I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 4824cae3 authored by zab's avatar zab
Browse files

Properly synchronize the Rate class

	Add a new class to store results from rate calculations
	Add a new method to compute average, last and current measurements
	Use the new method in RouterThrottleImpl
parent 99179eda
No related branches found
No related tags found
No related merge requests found
......@@ -14,25 +14,25 @@ import net.i2p.data.DataHelper;
*/
public class Rate {
//private final static Log _log = new Log(Rate.class);
private volatile double _currentTotalValue;
private double _currentTotalValue;
// was long, save space
private volatile int _currentEventCount;
private volatile long _currentTotalEventTime;
private volatile double _lastTotalValue;
private int _currentEventCount;
private long _currentTotalEventTime;
private double _lastTotalValue;
// was long, save space
private volatile int _lastEventCount;
private volatile long _lastTotalEventTime;
private volatile double _extremeTotalValue;
private int _lastEventCount;
private long _lastTotalEventTime;
private double _extremeTotalValue;
// was long, save space
private volatile int _extremeEventCount;
private volatile long _extremeTotalEventTime;
private volatile double _lifetimeTotalValue;
private volatile long _lifetimeEventCount;
private volatile long _lifetimeTotalEventTime;
private int _extremeEventCount;
private long _extremeTotalEventTime;
private double _lifetimeTotalValue;
private long _lifetimeEventCount;
private long _lifetimeTotalEventTime;
private RateSummaryListener _summaryListener;
private RateStat _stat;
private volatile long _lastCoalesceDate;
private long _lastCoalesceDate;
private long _creationDate;
// was long, save space
private int _period;
......@@ -41,37 +41,37 @@ public class Rate {
// private final Object _lock = new Object();
/** in the current (partial) period, what is the total value acrued through all events? */
public double getCurrentTotalValue() {
public synchronized double getCurrentTotalValue() {
return _currentTotalValue;
}
/** in the current (partial) period, how many events have occurred? */
public long getCurrentEventCount() {
public synchronized long getCurrentEventCount() {
return _currentEventCount;
}
/** in the current (partial) period, how much of the time has been spent doing the events? */
public long getCurrentTotalEventTime() {
public synchronized long getCurrentTotalEventTime() {
return _currentTotalEventTime;
}
/** in the last full period, what was the total value acrued through all events? */
public double getLastTotalValue() {
public synchronized double getLastTotalValue() {
return _lastTotalValue;
}
/** in the last full period, how many events occurred? */
public long getLastEventCount() {
public synchronized long getLastEventCount() {
return _lastEventCount;
}
/** in the last full period, how much of the time was spent doing the events? */
public long getLastTotalEventTime() {
public synchronized long getLastTotalEventTime() {
return _lastTotalEventTime;
}
/** what was the max total value acrued in any period? */
public double getExtremeTotalValue() {
public synchronized double getExtremeTotalValue() {
return _extremeTotalValue;
}
......@@ -79,42 +79,42 @@ public class Rate {
* when the max(totalValue) was achieved, how many events occurred in that period?
* Note that this is not necesarily the highest event count; that isn't tracked.
*/
public long getExtremeEventCount() {
public synchronized long getExtremeEventCount() {
return _extremeEventCount;
}
/** when the max(totalValue) was achieved, how much of the time was spent doing the events? */
public long getExtremeTotalEventTime() {
public synchronized long getExtremeTotalEventTime() {
return _extremeTotalEventTime;
}
/** since rate creation, what was the total value acrued through all events? */
public double getLifetimeTotalValue() {
public synchronized double getLifetimeTotalValue() {
return _lifetimeTotalValue;
}
/** since rate creation, how many events have occurred? */
public long getLifetimeEventCount() {
public synchronized long getLifetimeEventCount() {
return _lifetimeEventCount;
}
/** since rate creation, how much of the time was spent doing the events? */
public long getLifetimeTotalEventTime() {
public synchronized long getLifetimeTotalEventTime() {
return _lifetimeTotalEventTime;
}
/** when was the rate last coalesced? */
public long getLastCoalesceDate() {
public synchronized long getLastCoalesceDate() {
return _lastCoalesceDate;
}
/** when was this rate created? */
public long getCreationDate() {
public synchronized long getCreationDate() {
return _creationDate;
}
/** how large should this rate's cycle be? */
public long getPeriod() {
public synchronized long getPeriod() {
return _period;
}
......@@ -160,13 +160,11 @@ public class Rate {
* If you always use this call, eventDuration is always zero,
* and the various get*Saturation*() and get*EventTime() methods will return zero.
*/
public void addData(long value) {
synchronized (this) {
_currentTotalValue += value;
_currentEventCount++;
_lifetimeTotalValue += value;
_lifetimeEventCount++;
}
public synchronized void addData(long value) {
_currentTotalValue += value;
_currentEventCount++;
_lifetimeTotalValue += value;
_lifetimeEventCount++;
}
/**
......@@ -202,16 +200,14 @@ public class Rate {
* @param value value to accrue in the current period
* @param eventDuration how long it took to accrue this data (set to 0 if it was instantaneous)
*/
public void addData(long value, long eventDuration) {
synchronized (this) {
_currentTotalValue += value;
_currentEventCount++;
_currentTotalEventTime += eventDuration;
public synchronized void addData(long value, long eventDuration) {
_currentTotalValue += value;
_currentEventCount++;
_currentTotalEventTime += eventDuration;
_lifetimeTotalValue += value;
_lifetimeEventCount++;
_lifetimeTotalEventTime += eventDuration;
}
_lifetimeTotalValue += value;
_lifetimeEventCount++;
_lifetimeTotalEventTime += eventDuration;
}
/** 2s is plenty of slack to deal with slow coalescing (across many stats) */
......@@ -261,10 +257,8 @@ public class Rate {
/**
* What was the average value across the events in the last period?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*/
public double getAverageValue() {
public synchronized double getAverageValue() {
int lec = _lastEventCount; // avoid race NPE
if ((_lastTotalValue != 0) && (lec > 0))
return _lastTotalValue / lec;
......@@ -275,10 +269,8 @@ public class Rate {
/**
* During the extreme period (i.e. the period with the highest total value),
* what was the average value?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*/
public double getExtremeAverageValue() {
public synchronized double getExtremeAverageValue() {
if ((_extremeTotalValue != 0) && (_extremeEventCount > 0))
return _extremeTotalValue / _extremeEventCount;
......@@ -287,10 +279,8 @@ public class Rate {
/**
* What was the average value across the events since the stat was created?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*/
public double getLifetimeAverageValue() {
public synchronized double getLifetimeAverageValue() {
if ((_lifetimeTotalValue != 0) && (_lifetimeEventCount > 0))
return _lifetimeTotalValue / _lifetimeEventCount;
......@@ -303,7 +293,7 @@ public class Rate {
*
* @return ratio, or 0 if event times aren't used
*/
public double getLastEventSaturation() {
public synchronized double getLastEventSaturation() {
if ((_lastEventCount > 0) && (_lastTotalEventTime > 0)) {
/*double eventTime = (double) _lastTotalEventTime / (double) _lastEventCount;
double maxEvents = _period / eventTime;
......@@ -321,11 +311,9 @@ public class Rate {
* how much of the time was spent actually processing events
* in proportion to how many events could have occurred if there were no intervals?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*
* @return ratio, or 0 if the statistic doesn't use event times
*/
public double getExtremeEventSaturation() {
public synchronized double getExtremeEventSaturation() {
if ((_extremeEventCount > 0) && (_extremeTotalEventTime > 0)) {
double eventTime = (double) _extremeTotalEventTime / (double) _extremeEventCount;
double maxEvents = _period / eventTime;
......@@ -338,11 +326,9 @@ public class Rate {
* During the lifetime of this stat, how much of the time was spent actually processing events in proportion
* to how many events could have occurred if there were no intervals?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*
* @return ratio, or 0 if event times aren't used
*/
public double getLifetimeEventSaturation() {
public synchronized double getLifetimeEventSaturation() {
if ((_lastEventCount > 0) && (_lifetimeTotalEventTime > 0)) {
double eventTime = (double) _lifetimeTotalEventTime / (double) _lifetimeEventCount;
double maxEvents = _period / eventTime;
......@@ -354,7 +340,7 @@ public class Rate {
}
/** how many periods have we already completed? */
public long getLifetimePeriods() {
public synchronized long getLifetimePeriods() {
long lifetime = now() - _creationDate;
double periods = lifetime / (double) _period;
return (long) Math.floor(periods);
......@@ -364,11 +350,9 @@ public class Rate {
* using the last period's rate, what is the total value that could have been sent
* if events were constant?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*
* @return max total value, or 0 if event times aren't used
*/
public double getLastSaturationLimit() {
public synchronized double getLastSaturationLimit() {
if ((_lastTotalValue != 0) && (_lastEventCount > 0) && (_lastTotalEventTime > 0)) {
double saturation = getLastEventSaturation();
if (saturation != 0.0D) return _lastTotalValue / saturation;
......@@ -383,11 +367,9 @@ public class Rate {
* what is the total value that could have been
* sent if events were constant?
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*
* @return event total at saturation, or 0 if no event times are measured
*/
public double getExtremeSaturationLimit() {
public synchronized double getExtremeSaturationLimit() {
if ((_extremeTotalValue != 0) && (_extremeEventCount > 0) && (_extremeTotalEventTime > 0)) {
double saturation = getExtremeEventSaturation();
if (saturation != 0.0d) return _extremeTotalValue / saturation;
......@@ -402,10 +384,8 @@ public class Rate {
* What was the total value, compared to the total value in
* the extreme period (i.e. the period with the highest total value),
* Warning- returns ratio, not percentage (i.e. it is not multiplied by 100 here)
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*/
public double getPercentageOfExtremeValue() {
public synchronized double getPercentageOfExtremeValue() {
if ((_lastTotalValue != 0) && (_extremeTotalValue != 0))
return _lastTotalValue / _extremeTotalValue;
......@@ -415,10 +395,8 @@ public class Rate {
/**
* How large was the last period's value as compared to the lifetime average value?
* Warning- returns ratio, not percentage (i.e. it is not multiplied by 100 here)
*
* Warning - unsynchronized, might glitch during coalesce, caller may prevent by synchronizing on this.
*/
public double getPercentageOfLifetimeValue() {
public synchronized double getPercentageOfLifetimeValue() {
if ((_lastTotalValue != 0) && (_lifetimeTotalValue != 0)) {
double lifetimePeriodValue = _period * (_lifetimeTotalValue / (now() - _creationDate));
return _lastTotalValue / lifetimePeriodValue;
......@@ -426,8 +404,27 @@ public class Rate {
return 0.0D;
}
/**
* @param out where to store the computed averages.
*/
public synchronized void computeAverages(RateAverages out) {
final long total = _currentEventCount + _lastEventCount;
if (total <= 0) {
out.setAverage(getAverageValue());
return;
}
if (_currentEventCount > 0)
out.setCurrent( getCurrentTotalValue() / _currentEventCount );
if (_lastEventCount > 0)
out.setLast( getLastTotalValue() / _lastEventCount );
out.setAverage( ( getCurrentTotalValue() + getLastTotalValue() ) / total );
}
public void store(String prefix, StringBuilder buf) throws IOException {
public synchronized void store(String prefix, StringBuilder buf) throws IOException {
PersistenceHelper.addTime(buf, prefix, ".period", "Length of the period:", _period);
PersistenceHelper.addDate(buf, prefix, ".creationDate",
"When was this rate created?", _creationDate);
......@@ -476,7 +473,7 @@ public class Rate {
* treat the data with as much freshness (or staleness) as appropriate.
* @throws IllegalArgumentException if the data was formatted incorrectly
*/
public void load(Properties props, String prefix, boolean treatAsCurrent) throws IllegalArgumentException {
public synchronized void load(Properties props, String prefix, boolean treatAsCurrent) throws IllegalArgumentException {
_period = PersistenceHelper.getInt(props, prefix, ".period");
_creationDate = PersistenceHelper.getLong(props, prefix, ".creationDate");
_lastCoalesceDate = PersistenceHelper.getLong(props, prefix, ".lastCoalesceDate");
......@@ -504,7 +501,7 @@ public class Rate {
* We base it on the stat we are tracking, not the stored data.
*/
@Override
public boolean equals(Object obj) {
public synchronized boolean equals(Object obj) {
if ((obj == null) || !(obj instanceof Rate)) return false;
if (obj == this) return true;
Rate r = (Rate) obj;
......@@ -519,12 +516,12 @@ public class Rate {
* (RateStat stores in an array) so let's make this easy.
*/
@Override
public int hashCode() {
public synchronized int hashCode() {
return DataHelper.hashCode(_stat) ^ _period ^ ((int) _creationDate);
}
@Override
public String toString() {
public synchronized String toString() {
StringBuilder buf = new StringBuilder(2048);
buf.append("\n\t total value: ").append(getLastTotalValue());
buf.append("\n\t highest total value: ").append(getExtremeTotalValue());
......@@ -554,39 +551,4 @@ public class Rate {
// skew periodically
return System.currentTimeMillis(); //Clock.getInstance().now();
}
/******
public static void main(String args[]) {
Rate rate = new Rate(1000);
for (int i = 0; i < 50; i++) {
try {
Thread.sleep(20);
} catch (InterruptedException ie) { // nop
}
rate.addData(i * 100, 20);
}
rate.coalesce();
StringBuilder buf = new StringBuilder(1024);
try {
rate.store("rate.test", buf);
byte data[] = buf.toString().getBytes();
_log.error("Stored rate: size = " + data.length + "\n" + buf.toString());
Properties props = new Properties();
props.load(new java.io.ByteArrayInputStream(data));
//_log.error("Properties loaded: \n" + props);
Rate r = new Rate(props, "rate.test", true);
_log.error("Comparison after store/load: " + r.equals(rate));
} catch (Throwable t) {
_log.error("b0rk", t);
}
try {
Thread.sleep(5000);
} catch (InterruptedException ie) { // nop
}
}
******/
}
package net.i2p.stat;
public class RateAverages {
private double average, current, last;
public void reset() {
average = 0;
current = 0;
last = 0;
}
public double getAverage() {
return average;
}
public void setAverage(double average) {
this.average = average;
}
public double getCurrent() {
return current;
}
public void setCurrent(double current) {
this.current = current;
}
public double getLast() {
return last;
}
public void setLast(double last) {
this.last = last;
}
}
......@@ -3,6 +3,7 @@ package net.i2p.router;
import net.i2p.data.Hash;
import net.i2p.router.peermanager.TunnelHistory;
import net.i2p.stat.Rate;
import net.i2p.stat.RateAverages;
import net.i2p.stat.RateStat;
import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler;
......@@ -40,6 +41,15 @@ class RouterThrottleImpl implements RouterThrottle {
private static final int PREPROCESSED_SIZE = 1024;
private static final long REJECT_STARTUP_TIME = 20*60*1000;
/** scratch space for calculations of rate averages */
private static final ThreadLocal<RateAverages> RATE_AVERAGES =
new ThreadLocal<RateAverages>() {
@Override
public RateAverages initialValue() {
return new RateAverages();
}
};
public RouterThrottleImpl(RouterContext context) {
_context = context;
......@@ -119,6 +129,9 @@ class RouterThrottleImpl implements RouterThrottle {
//long lag = _context.jobQueue().getMaxLag();
// reject here if lag too high???
RateAverages ra = RATE_AVERAGES.get();
ra.reset();
// TODO
// This stat is highly dependent on transport mix.
// For NTCP, it is queueing delay only, ~25ms
......@@ -133,37 +146,19 @@ class RouterThrottleImpl implements RouterThrottle {
//Reject tunnels if the time to process messages and send them is too large. Too much time implies congestion.
if(r != null) {
long current = r.getCurrentEventCount();
long last = r.getLastEventCount();
long total = current + last;
double avgSendProcessingTime = 0;
double currentSendProcessingTime = 0;
double lastSendProcessingTime = 0;
r.computeAverages(ra);
//Calculate times
if(total > 0) {
if(current > 0)
currentSendProcessingTime = r.getCurrentTotalValue() / current;
if(last > 0)
lastSendProcessingTime = r.getLastTotalValue() / last;
avgSendProcessingTime = (r.getCurrentTotalValue() + r.getLastTotalValue()) / total;
} else {
avgSendProcessingTime = r.getAverageValue();
//if(_log.shouldLog(Log.WARN))
// _log.warn("No events occurred. Using 1 minute average to look at message delay.");
}
int maxProcessingTime = _context.getProperty(PROP_MAX_PROCESSINGTIME, DEFAULT_MAX_PROCESSINGTIME);
//Set throttling if necessary
if((avgSendProcessingTime > maxProcessingTime*0.9
|| currentSendProcessingTime > maxProcessingTime
|| lastSendProcessingTime > maxProcessingTime)) {
if((ra.getAverage() > maxProcessingTime*0.9
|| ra.getCurrent() > maxProcessingTime
|| ra.getLast() > maxProcessingTime)) {
if(_log.shouldLog(Log.WARN)) {
_log.warn("Refusing tunnel request due to sendProcessingTime " +
((int)currentSendProcessingTime) + " / " +
((int)lastSendProcessingTime) + " / " +
((int)avgSendProcessingTime) + " / " +
((int)ra.getCurrent()) + " / " +
((int)ra.getLast()) + " / " +
((int)ra.getAverage()) + " / " +
maxProcessingTime +
" current/last/avg/max ms");
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment