diff --git a/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java new file mode 100644 index 000000000..48669acb0 --- /dev/null +++ b/router/java/src/net/i2p/router/peermanager/CapacityCalculator.java @@ -0,0 +1,111 @@ +package net.i2p.router.peermanager; + +import net.i2p.router.RouterContext; +import net.i2p.stat.RateStat; +import net.i2p.stat.Rate; +import net.i2p.util.Log; + +/** + * Estimate how many of our tunnels the peer can join per hour. + * Pseudocode: + *
+ * int growthFactor = 5;
+ * int capacity = 0;
+ * foreach i (10, 30, 60) {
+ * if (# tunnels rejected in last $i minutes > 0) continue;
+ * int val = (# tunnels joined in last $i minutes) * (60 / $i);
+ * val -= (# tunnels failed in last $i minutes) * (60 / $i);
+ * if (val >= 0) // if we're failing lots of tunnels, dont grow
+ * capacity += ((val + growthFactor) * periodWeight($i));
+ * }
+ *
+ * periodWeight(int curWeight) {
+ * switch (curWeight) {
+ * case 10: return .6;
+ * case 30: return .3;
+ * case 60: return .1;
+ * }
+ * }
+ *
+ *
+ */
+public class CapacityCalculator extends Calculator {
+ private Log _log;
+ private RouterContext _context;
+
+ public CapacityCalculator(RouterContext context) {
+ _context = context;
+ _log = context.logManager().getLog(CapacityCalculator.class);
+ }
+
+ /** used to adjust each period so that we keep trying to expand the peer's capacity */
+ private static long GROWTH_FACTOR = 5;
+
+ /** the calculator estimates over a 1 hour period */
+ private static long ESTIMATE_PERIOD = 60*60*1000;
+
+ public double calc(PeerProfile profile) {
+ double capacity = 0;
+
+ RateStat acceptStat = profile.getTunnelCreateResponseTime();
+ RateStat rejectStat = profile.getTunnelHistory().getRejectionRate();
+ RateStat failedStat = profile.getTunnelHistory().getFailedRate();
+
+ capacity += estimatePartial(acceptStat, rejectStat, failedStat, 10*60*1000);
+ capacity += estimatePartial(acceptStat, rejectStat, failedStat, 30*60*1000);
+ capacity += estimatePartial(acceptStat, rejectStat, failedStat, 60*60*1000);
+ capacity += estimatePartial(acceptStat, rejectStat, failedStat, 24*60*60*1000);
+
+ if (tooOld(profile))
+ capacity = 1;
+
+ capacity += profile.getReliabilityBonus();
+ return capacity;
+ }
+
+ /**
+ * If we haven't heard from them in an hour, they aren't too useful.
+ *
+ */
+ private boolean tooOld(PeerProfile profile) {
+ if (profile.getIsActive())
+ return false;
+ else
+ return true;
+ }
+
+ private double estimatePartial(RateStat acceptStat, RateStat rejectStat, RateStat failedStat, int period) {
+ Rate curAccepted = acceptStat.getRate(period);
+ Rate curRejected = rejectStat.getRate(period);
+ Rate curFailed = failedStat.getRate(period);
+ if (curRejected.getCurrentEventCount() + curRejected.getLastEventCount() > 0)
+ return 0.0d;
+
+ long eventCount = 0;
+ if (curAccepted != null)
+ eventCount = curAccepted.getCurrentEventCount() + curAccepted.getLastEventCount();
+ double stretch = ESTIMATE_PERIOD / period;
+ double val = eventCount * stretch;
+ long failed = 0;
+ if (curFailed != null)
+ failed = curFailed.getCurrentEventCount() + curFailed.getLastEventCount();
+ if (failed > 0)
+ val -= failed * stretch;
+ if (val >= 0) {
+ return (val + GROWTH_FACTOR) * periodWeight(period);
+ } else {
+ // failed too much, don't grow
+ return 0.0d;
+ }
+ }
+
+ private double periodWeight(int period) {
+ switch (period) {
+ case 10*60*1000: return .4;
+ case 30*60*1000: return .3;
+ case 60*60*1000: return .2;
+ case 24*60*60*1000: return .1;
+ default: throw new IllegalArgumentException("wtf, period [" + period + "]???");
+ }
+ }
+}
diff --git a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
index 70df31b14..728487762 100644
--- a/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
+++ b/router/java/src/net/i2p/router/peermanager/IsFailingCalculator.java
@@ -1,6 +1,7 @@
package net.i2p.router.peermanager;
import net.i2p.router.RouterContext;
+import net.i2p.stat.Rate;
import net.i2p.util.Log;
/**
@@ -31,40 +32,33 @@ public class IsFailingCalculator extends Calculator {
public boolean calcBoolean(PeerProfile profile) {
// have we failed in the last 119 seconds?
if ( (profile.getCommError().getRate(60*1000).getCurrentEventCount() > 0) ||
- (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ) {
+ (profile.getCommError().getRate(60*1000).getLastEventCount() > 0) ||
+ (profile.getCommError().getRate(10*60*1000).getCurrentEventCount() > 0) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64()
- + " is failing because it had a comm error in the last 2 minutes");
+ + " is failing because it had a comm error recently ");
return true;
} else {
+
//if ( (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getCurrentEventCount() > 0) ||
// (profile.getDBHistory().getFailedLookupRate().getRate(60*1000).getLastEventCount() > 0) ) {
// // are they overloaded (or disconnected)?
// return true;
//}
- long recently = _context.clock().now() - GRACE_PERIOD;
-
- if (false && (profile.getTunnelHistory().getLastRejected() >= recently) ) {
- // have they refused to participate in a tunnel in the last 5 minutes?
+ Rate rejectRate = profile.getTunnelHistory().getRejectionRate().getRate(10*60*1000);
+ if (rejectRate.getCurrentEventCount() >= 2) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64()
- + " is failing because it refused to participate in a tunnel in the last few minutes");
+ + " is failing because they rejected some tunnels recently");
return true;
}
- if (false && (profile.getTunnelHistory().getLastFailed() >= recently) ) {
- // has a tunnel they participate in failed in the last 5 minutes?
+ Rate failedRate = profile.getTunnelHistory().getFailedRate().getRate(60*1000);
+ if (failedRate.getCurrentEventCount() >= 2) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Peer " + profile.getPeer().toBase64()
- + " is failing because it one of their tunnels failed in the last few minutes");
- return true;
- }
-
- if (false && profile.getLastSendFailed() >= recently) {
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("Peer " + profile.getPeer().toBase64()
- + " is failing because we couldnt send to it recently");
+ + " is failing because too many of their tunnels failed recently");
return true;
}
diff --git a/router/java/src/net/i2p/router/peermanager/PeerManager.java b/router/java/src/net/i2p/router/peermanager/PeerManager.java
index 98fa86d52..fed203ffe 100644
--- a/router/java/src/net/i2p/router/peermanager/PeerManager.java
+++ b/router/java/src/net/i2p/router/peermanager/PeerManager.java
@@ -84,16 +84,16 @@ class PeerManager {
case PeerSelectionCriteria.PURPOSE_TEST:
// for now, the peers we test will be the reliable ones
//_organizer.selectWellIntegratedPeers(criteria.getMinimumRequired(), exclude, curVals);
- _organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals);
+ _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break;
case PeerSelectionCriteria.PURPOSE_TUNNEL:
- _organizer.selectFastAndReliablePeers(criteria.getMinimumRequired(), exclude, curVals);
+ _organizer.selectFastPeers(criteria.getMinimumRequired(), exclude, curVals);
break;
case PeerSelectionCriteria.PURPOSE_SOURCE_ROUTE:
- _organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals);
+ _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break;
case PeerSelectionCriteria.PURPOSE_GARLIC:
- _organizer.selectReliablePeers(criteria.getMinimumRequired(), exclude, curVals);
+ _organizer.selectHighCapacityPeers(criteria.getMinimumRequired(), exclude, curVals);
break;
default:
break;
@@ -103,8 +103,8 @@ class PeerManager {
_log.warn("We ran out of peers when looking for reachable ones after finding "
+ rv.size() + " with "
+ _organizer.countWellIntegratedPeers() + "/"
- + _organizer.countReliablePeers() + "/"
- + _organizer.countFastAndReliablePeers() + " integrated/reliable/fast peers");
+ + _organizer.countHighCapacityPeers() + "/"
+ + _organizer.countFastPeers() + " integrated/high capacity/fast peers");
break;
} else {
for (Iterator iter = curVals.iterator(); iter.hasNext(); ) {
diff --git a/router/java/src/net/i2p/router/peermanager/PeerProfile.java b/router/java/src/net/i2p/router/peermanager/PeerProfile.java
index 5d8890c1d..168890bf6 100644
--- a/router/java/src/net/i2p/router/peermanager/PeerProfile.java
+++ b/router/java/src/net/i2p/router/peermanager/PeerProfile.java
@@ -32,10 +32,12 @@ public class PeerProfile {
// calculation bonuses
private long _speedBonus;
private long _reliabilityBonus;
+ private long _capacityBonus;
private long _integrationBonus;
// calculation values
private double _speedValue;
private double _reliabilityValue;
+ private double _capacityValue;
private double _integrationValue;
private boolean _isFailing;
// good vs bad behavior
@@ -56,6 +58,7 @@ public class PeerProfile {
_expanded = false;
_speedValue = 0;
_reliabilityValue = 0;
+ _capacityValue = 0;
_integrationValue = 0;
_isFailing = false;
_peer = peer;
@@ -151,6 +154,14 @@ public class PeerProfile {
public long getReliabilityBonus() { return _reliabilityBonus; }
public void setReliabilityBonus(long bonus) { _reliabilityBonus = bonus; }
+ /**
+ * extra factor added to the capacity ranking - this can be updated in the profile
+ * written to disk to affect how the algorithm ranks capacity. Negative values are
+ * penalties
+ */
+ public double getCapacityBonus() { return _capacityBonus; }
+ public void setCapacityBonus(long bonus) { _capacityBonus = bonus; }
+
/**
* extra factor added to the integration ranking - this can be updated in the profile
* written to disk to affect how the algorithm ranks integration. Negative values are
@@ -173,6 +184,11 @@ public class PeerProfile {
*
*/
public double getReliabilityValue() { return _reliabilityValue; }
+ /**
+ * How many tunnels do we think this peer can handle over the next hour?
+ *
+ */
+ public double getCapacityValue() { return _capacityValue; }
/**
* How well integrated into the network is this peer (as measured by how much they've
* told us that we didn't already know). Higher numbers means better integrated
@@ -226,7 +242,7 @@ public class PeerProfile {
if (_tunnelTestResponseTime == null)
_tunnelTestResponseTime = new RateStat("tunnelTestResponseTime", "how long it takes to successfully test a tunnel this peer participates in (in milliseconds)", "profile", new long[] { 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_commError == null)
- _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000 } );
+ _commError = new RateStat("commErrorRate", "how long between communication errors with the peer (e.g. disconnection)", "profile", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000 } );
if (_dbIntroduction == null)
_dbIntroduction = new RateStat("dbIntroduction", "how many new peers we get from dbSearchReplyMessages or dbStore messages", "profile", new long[] { 60*60*1000l, 24*60*60*1000l, 7*24*60*60*1000l });
@@ -250,18 +266,21 @@ public class PeerProfile {
_tunnelCreateResponseTime.coallesceStats();
_tunnelTestResponseTime.coallesceStats();
_dbHistory.coallesceStats();
+ _tunnelHistory.coallesceStats();
_speedValue = calculateSpeed();
_reliabilityValue = calculateReliability();
+ _capacityValue = calculateCapacity();
_integrationValue = calculateIntegration();
_isFailing = calculateIsFailing();
if (_log.shouldLog(Log.DEBUG))
- _log.debug("Coallesced: speed [" + _speedValue + "] reliability [" + _reliabilityValue + "] integration [" + _integrationValue + "] failing? [" + _isFailing + "]");
+ _log.debug("Coallesced: speed [" + _speedValue + "] reliability [" + _reliabilityValue + "] capacity [" + _capacityValue + "] integration [" + _integrationValue + "] failing? [" + _isFailing + "]");
}
private double calculateSpeed() { return _context.speedCalculator().calc(this); }
private double calculateReliability() { return _context.reliabilityCalculator().calc(this); }
+ private double calculateCapacity() { return _context.capacityCalculator().calc(this); }
private double calculateIntegration() { return _context.integrationCalculator().calc(this); }
private boolean calculateIsFailing() { return _context.isFailingCalculator().calcBoolean(this); }
void setIsFailing(boolean val) { _isFailing = val; }
@@ -314,6 +333,7 @@ public class PeerProfile {
buf.append("Peer " + profile.getPeer().toBase64()
+ ":\t Speed:\t" + fmt.format(profile.calculateSpeed())
+ " Reliability:\t" + fmt.format(profile.calculateReliability())
+ + " Capacity:\t" + fmt.format(profile.calculateCapacity())
+ " Integration:\t" + fmt.format(profile.calculateIntegration())
+ " Active?\t" + profile.getIsActive()
+ " Failing?\t" + profile.calculateIsFailing()
diff --git a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java
index 6694f7f1f..970f30204 100644
--- a/router/java/src/net/i2p/router/peermanager/PeerTestJob.java
+++ b/router/java/src/net/i2p/router/peermanager/PeerTestJob.java
@@ -230,6 +230,10 @@ public class PeerTestJob extends JobImpl {
return false;
}
}
+
+
+ private boolean getShouldFailTunnels() { return true; }
+
/**
* Called when the peer's response is found
*/
@@ -256,22 +260,25 @@ public class PeerTestJob extends JobImpl {
+ _replyTunnel.getTunnelId().getTunnelId());
getContext().profileManager().dbLookupSuccessful(_peer.getIdentity().getHash(), responseTime);
- _sendTunnel.setLastTested(getContext().clock().now());
- _replyTunnel.setLastTested(getContext().clock().now());
-
- TunnelInfo cur = _replyTunnel;
- while (cur != null) {
- Hash peer = cur.getThisHop();
- if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
- getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
- cur = cur.getNextHopInfo();
- }
- cur = _sendTunnel;
- while (cur != null) {
- Hash peer = cur.getThisHop();
- if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
- getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
- cur = cur.getNextHopInfo();
+ // only honor success if we also honor failure
+ if (getShouldFailTunnels()) {
+ _sendTunnel.setLastTested(getContext().clock().now());
+ _replyTunnel.setLastTested(getContext().clock().now());
+
+ TunnelInfo cur = _replyTunnel;
+ while (cur != null) {
+ Hash peer = cur.getThisHop();
+ if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
+ getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
+ cur = cur.getNextHopInfo();
+ }
+ cur = _sendTunnel;
+ while (cur != null) {
+ Hash peer = cur.getThisHop();
+ if ( (peer != null) && (!getContext().routerHash().equals(peer)) )
+ getContext().profileManager().tunnelTestSucceeded(peer, responseTime);
+ cur = cur.getNextHopInfo();
+ }
}
}
@@ -294,7 +301,6 @@ public class PeerTestJob extends JobImpl {
_sendTunnel = sendTunnel;
}
public String getName() { return "Peer test failed"; }
- private boolean getShouldFailTunnels() { return true; }
private boolean getShouldFailPeer() { return true; }
public void runJob() {
if (getShouldFailPeer())
@@ -307,7 +313,6 @@ public class PeerTestJob extends JobImpl {
+ _replyTunnel.getTunnelId().getTunnelId());
if (getShouldFailTunnels()) {
-
_sendTunnel.setLastTested(getContext().clock().now());
_replyTunnel.setLastTested(getContext().clock().now());
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java
index ee0ef5e93..00162e0b5 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileManagerImpl.java
@@ -258,7 +258,7 @@ public class ProfileManagerImpl implements ProfileManager {
Set peers = new HashSet(numPeers);
// lets get the fastest ones we've got (this fails over to include just plain reliable,
// or even notFailing peers if there aren't enough fast ones)
- _context.profileOrganizer().selectFastAndReliablePeers(numPeers, null, peers);
+ _context.profileOrganizer().selectFastPeers(numPeers, null, peers);
Properties props = new Properties();
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
@@ -268,10 +268,10 @@ public class ProfileManagerImpl implements ProfileManager {
StringBuffer buf = new StringBuffer(64);
buf.append("status: ");
- if (_context.profileOrganizer().isFastAndReliable(peer)) {
- buf.append("fastReliable");
- } else if (_context.profileOrganizer().isReliable(peer)) {
- buf.append("reliable");
+ if (_context.profileOrganizer().isFast(peer)) {
+ buf.append("fast");
+ } else if (_context.profileOrganizer().isHighCapacity(peer)) {
+ buf.append("highCapacity");
} else if (_context.profileOrganizer().isFailing(peer)) {
buf.append("failing");
} else {
@@ -283,7 +283,7 @@ public class ProfileManagerImpl implements ProfileManager {
else
buf.append(" ");
- buf.append("reliability: ").append(num(prof.getReliabilityValue())).append(" ");
+ buf.append("capacity: ").append(num(prof.getCapacityValue())).append(" ");
buf.append("speed: ").append(num(prof.getSpeedValue())).append(" ");
buf.append("integration: ").append(num(prof.getIntegrationValue()));
diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
index 080ab92bd..6449ba585 100644
--- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
+++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java
@@ -32,10 +32,10 @@ import net.i2p.util.Log;
public class ProfileOrganizer {
private Log _log;
private RouterContext _context;
- /** H(routerIdentity) to PeerProfile for all peers that are fast and reliable */
- private Map _fastAndReliablePeers;
- /** H(routerIdentity) to PeerProfile for all peers that are reliable */
- private Map _reliablePeers;
+ /** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/
+ private Map _fastPeers;
+ /** H(routerIdentity) to PeerProfile for all peers that have high capacities */
+ private Map _highCapacityPeers;
/** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */
private Map _wellIntegratedPeers;
/** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */
@@ -46,27 +46,18 @@ public class ProfileOrganizer {
private Hash _us;
private ProfilePersistenceHelper _persistenceHelper;
- /** PeerProfile objects for all peers profiled, orderd by most reliable first */
- private Set _strictReliabilityOrder;
+ /** PeerProfile objects for all peers profiled, orderd by the ones with the highest capacity first */
+ private Set _strictCapacityOrder;
/** threshold speed value, seperating fast from slow */
private double _thresholdSpeedValue;
/** threshold reliability value, seperating reliable from unreliable */
- private double _thresholdReliabilityValue;
+ private double _thresholdCapacityValue;
/** integration value, seperating well integrated from not well integrated */
private double _thresholdIntegrationValue;
- private InverseReliabilityComparator _calc;
-
- /**
- * Defines what percentage of the average reliability will be used as the
- * reliability threshold. For example, .5 means all peers with the reliability
- * greater than half of the average will be considered "reliable".
- *
- */
- public static final String PROP_RELIABILITY_THRESHOLD_FACTOR = "profileOrganizer.reliabilityThresholdFactor";
- public static final double DEFAULT_RELIABILITY_THRESHOLD_FACTOR = .5d;
-
+ private InverseCapacityComparator _comp;
+
/**
* Defines the minimum number of 'fast' peers that the organizer should select. See
* {@link ProfileOrganizer#getMinimumFastPeers}
@@ -84,28 +75,28 @@ public class ProfileOrganizer {
public ProfileOrganizer(RouterContext context) {
_context = context;
_log = context.logManager().getLog(ProfileOrganizer.class);
- _calc = new InverseReliabilityComparator();
- _fastAndReliablePeers = new HashMap(16);
- _reliablePeers = new HashMap(16);
+ _comp = new InverseCapacityComparator();
+ _fastPeers = new HashMap(16);
+ _highCapacityPeers = new HashMap(16);
_wellIntegratedPeers = new HashMap(16);
_notFailingPeers = new HashMap(16);
_failingPeers = new HashMap(16);
- _strictReliabilityOrder = new TreeSet(_calc);
+ _strictCapacityOrder = new TreeSet(_comp);
_thresholdSpeedValue = 0.0d;
- _thresholdReliabilityValue = 0.0d;
+ _thresholdCapacityValue = 0.0d;
_thresholdIntegrationValue = 0.0d;
_persistenceHelper = new ProfilePersistenceHelper(_context);
}
/**
- * Order profiles by their reliability, but backwards (most reliable / highest value first).
+ * Order profiles by their capacity, but backwards (highest capacity / value first).
*
*/
- private final class InverseReliabilityComparator implements Comparator {
+ private final class InverseCapacityComparator implements Comparator {
/**
* Compare the two objects backwards. The standard comparator returns
* -1 if lhs is less than rhs, 1 if lhs is greater than rhs, or 0 if they're
- * equal. To keep a strict ordering, we measure peers with equal reliability
+ * equal. To keep a strict ordering, we measure peers with equal capacity
* values according to their hashes
*
* @return -1 if the right hand side is smaller, 1 if the left hand side is
@@ -117,8 +108,8 @@ public class ProfileOrganizer {
PeerProfile left = (PeerProfile)lhs;
PeerProfile right= (PeerProfile)rhs;
- double rval = right.getReliabilityValue();
- double lval = left.getReliabilityValue();
+ double rval = right.getCapacityValue();
+ double lval = left.getCapacityValue();
if (lval == rval) // note the following call inverts right and left (see: classname)
return DataHelper.compareTo(right.getPeer().getData(), left.getPeer().getData());
@@ -126,10 +117,11 @@ public class ProfileOrganizer {
boolean rightBigger = rval > lval;
if (_log.shouldLog(Log.DEBUG))
- _log.debug("The reliability of " + right.getPeer().toBase64()
+ _log.debug("The capacity of " + right.getPeer().toBase64()
+ " and " + left.getPeer().toBase64() + " marks " + (rightBigger ? "right" : "left")
- + " as larger: r=" + right.getReliabilityValue() + " l="
- + left.getReliabilityValue());
+ + " as larger: r=" + right.getCapacityValue()
+ + " l="
+ + left.getCapacityValue());
if (rightBigger)
return 1;
@@ -164,26 +156,26 @@ public class ProfileOrganizer {
PeerProfile old = locked_getProfile(profile.getPeer());
profile.coallesceStats();
locked_placeProfile(profile);
- _strictReliabilityOrder.add(profile);
+ _strictCapacityOrder.add(profile);
return old;
}
}
- public int countFastAndReliablePeers() { synchronized (_reorganizeLock) { return _fastAndReliablePeers.size(); } }
- public int countReliablePeers() { synchronized (_reorganizeLock) { return _reliablePeers.size(); } }
+ public int countFastPeers() { synchronized (_reorganizeLock) { return _fastPeers.size(); } }
+ public int countHighCapacityPeers() { synchronized (_reorganizeLock) { return _highCapacityPeers.size(); } }
public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } }
public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } }
public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } }
- public boolean isFastAndReliable(Hash peer) { synchronized (_reorganizeLock) { return _fastAndReliablePeers.containsKey(peer); } }
- public boolean isReliable(Hash peer) { synchronized (_reorganizeLock) { return _reliablePeers.containsKey(peer); } }
+ public boolean isFast(Hash peer) { synchronized (_reorganizeLock) { return _fastPeers.containsKey(peer); } }
+ public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } }
public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } }
public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } }
/**
* Return a set of Hashes for peers that are both fast and reliable. If an insufficient
- * number of peers are both fast and reliable, fall back onto reliable peers, and if reliable
- * peers doesn't contain sufficient peers, fall back onto not failing peers, and even THAT doesn't
+ * number of peers are both fast and reliable, fall back onto high capacity peers, and if that
+ * doesn't contain sufficient peers, fall back onto not failing peers, and even THAT doesn't
* have sufficient peers, fall back onto failing peers.
*
* @param howMany how many peers are desired
@@ -191,22 +183,26 @@ public class ProfileOrganizer {
* @param matches set to store the return value in
*
*/
- public void selectFastAndReliablePeers(int howMany, Set exclude, Set matches) {
+ public void selectFastPeers(int howMany, Set exclude, Set matches) {
synchronized (_reorganizeLock) {
- locked_selectPeers(_fastAndReliablePeers, howMany, exclude, matches);
+ locked_selectPeers(_fastPeers, howMany, exclude, matches);
}
if (matches.size() < howMany)
- selectReliablePeers(howMany, exclude, matches);
+ selectHighCapacityPeers(howMany, exclude, matches);
return;
}
/**
- * Return a set of Hashes for peers that are reliable.
+ * Return a set of Hashes for peers that have a high capacity
*
*/
- public void selectReliablePeers(int howMany, Set exclude, Set matches) {
+ public void selectHighCapacityPeers(int howMany, Set exclude, Set matches) {
synchronized (_reorganizeLock) {
- locked_selectPeers(_reliablePeers, howMany, exclude, matches);
+ // we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST
+ // or we are falling back due to _fastPeers being too small, so we can always
+ // exclude the fast peers
+ exclude.addAll(_fastPeers.keySet());
+ locked_selectPeers(_highCapacityPeers, howMany, exclude, matches);
}
if (matches.size() < howMany)
selectNotFailingPeers(howMany, exclude, matches);
@@ -274,7 +270,7 @@ public class ProfileOrganizer {
int needed = howMany - orig;
List selected = new ArrayList(needed);
synchronized (_reorganizeLock) {
- for (Iterator iter = _strictReliabilityOrder.iterator(); selected.size() < needed && iter.hasNext(); ) {
+ for (Iterator iter = _strictCapacityOrder.iterator(); selected.size() < needed && iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
if (matches.contains(prof.getPeer()) ||
(exclude != null && exclude.contains(prof.getPeer())) ||
@@ -309,11 +305,11 @@ public class ProfileOrganizer {
*/
public Set selectAllPeers() {
synchronized (_reorganizeLock) {
- Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _reliablePeers.size() + _fastAndReliablePeers.size());
+ Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.keySet());
allPeers.addAll(_notFailingPeers.keySet());
- allPeers.addAll(_reliablePeers.keySet());
- allPeers.addAll(_fastAndReliablePeers.keySet());
+ allPeers.addAll(_highCapacityPeers.keySet());
+ allPeers.addAll(_fastPeers.keySet());
return allPeers;
}
}
@@ -326,23 +322,23 @@ public class ProfileOrganizer {
*/
public void reorganize() {
synchronized (_reorganizeLock) {
- Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _reliablePeers.size() + _fastAndReliablePeers.size());
+ Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size());
allPeers.addAll(_failingPeers.values());
allPeers.addAll(_notFailingPeers.values());
- allPeers.addAll(_reliablePeers.values());
- allPeers.addAll(_fastAndReliablePeers.values());
+ allPeers.addAll(_highCapacityPeers.values());
+ allPeers.addAll(_fastPeers.values());
_failingPeers.clear();
_notFailingPeers.clear();
- _reliablePeers.clear();
- _fastAndReliablePeers.clear();
+ _highCapacityPeers.clear();
+ _fastPeers.clear();
- Set reordered = new TreeSet(_calc);
- for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
+ Set reordered = new TreeSet(_comp);
+ for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
reordered.add(prof);
}
- _strictReliabilityOrder = reordered;
+ _strictCapacityOrder = reordered;
calculateThresholds(allPeers);
@@ -355,14 +351,15 @@ public class ProfileOrganizer {
locked_promoteFastAsNecessary();
if (_log.shouldLog(Log.DEBUG)) {
- _log.debug("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue + ", reliability: " + _thresholdReliabilityValue + ", speed: " + _thresholdSpeedValue + "]");
+ _log.debug("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue
+ + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]");
StringBuffer buf = new StringBuffer(512);
- for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
+ for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile prof = (PeerProfile)iter.next();
- buf.append('[').append(prof.toString()).append('=').append(prof.getReliabilityValue()).append("] ");
+ buf.append('[').append(prof.toString()).append('=').append(prof.getCapacityValue()).append("] ");
}
- _log.debug("Strictly organized (most reliable first): " + buf.toString());
- _log.debug("fast and reliable: " + _fastAndReliablePeers.values());
+ _log.debug("Strictly organized (highest capacity first): " + buf.toString());
+ _log.debug("fast: " + _fastPeers.values());
}
}
}
@@ -370,23 +367,23 @@ public class ProfileOrganizer {
/**
* As with locked_unfailAsNecessary, I'm not sure how much I like this - if there
* aren't enough fast peers, move some of the not-so-fast peers into the fast group.
- * This picks the not-so-fast peers based on reliability, not speed, and skips over any
+ * This picks the not-so-fast peers based on capacity, not speed, and skips over any
* failing peers. Perhaps it should build a seperate strict ordering by speed? Nah, not
* worth the maintenance and memory overhead, at least not for now.
*
*/
private void locked_promoteFastAsNecessary() {
int minFastPeers = getMinimumFastPeers();
- int numToPromote = minFastPeers - _fastAndReliablePeers.size();
+ int numToPromote = minFastPeers - _fastPeers.size();
if (numToPromote > 0) {
if (_log.shouldLog(Log.DEBUG))
- _log.debug("Need to explicitly promote " + numToPromote + " peers to the fast+reliable group");
- for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
+ _log.debug("Need to explicitly promote " + numToPromote + " peers to the fast group");
+ for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile cur = (PeerProfile)iter.next();
- if ( (!_fastAndReliablePeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
- _fastAndReliablePeers.put(cur.getPeer(), cur);
+ if ( (!_fastPeers.containsKey(cur.getPeer())) && (!cur.getIsFailing()) ) {
+ _fastPeers.put(cur.getPeer(), cur);
// no need to remove it from any of the other groups, since if it is
- // fast and reliable, it is reliable, and it is not failing
+ // fast, it has a high capacity, and it is not failing
numToPromote--;
if (numToPromote <= 0)
break;
@@ -421,7 +418,7 @@ public class ProfileOrganizer {
int needToUnfail = MIN_NOT_FAILING_ACTIVE - notFailingActive;
if (needToUnfail > 0) {
int unfailed = 0;
- for (Iterator iter = _strictReliabilityOrder.iterator(); iter.hasNext(); ) {
+ for (Iterator iter = _strictCapacityOrder.iterator(); iter.hasNext(); ) {
PeerProfile best = (PeerProfile)iter.next();
if ( (best.getIsActive()) && (best.getIsFailing()) ) {
if (_log.shouldLog(Log.WARN))
@@ -436,19 +433,23 @@ public class ProfileOrganizer {
}
}
+ public double getSpeedThreshold() { return _thresholdSpeedValue; }
+ public double getCapacityThreshold() { return _thresholdCapacityValue; }
+
////////
// no more public stuff below
////////
/**
* Update the thresholds based on the profiles in this set. currently
- * implements the reliability threshold based on the median reliability (ignoring
- * failing peers) with integration and speed thresholds being derived from the average
- * of the active reliable peers.
+ * implements the capacity threshold based on the median capacity (ignoring
+ * failing or inactive peers), using the median speed from that group to
+ * define the speed threshold, and use the mean integration value from the
+ * high capacity group to define the integration threshold.
*
*/
private void calculateThresholds(Set allPeers) {
- Set reordered = new TreeSet(_calc);
+ Set reordered = new TreeSet(_comp);
for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) {
PeerProfile profile = (PeerProfile)iter.next();
@@ -460,42 +461,54 @@ public class ProfileOrganizer {
reordered.add(profile);
}
- int numNotFailing = reordered.size();
- // how many are in the "top half" of the reliability peers?
- int topCount = 0;
- if (numNotFailing != 0)
- topCount = (int)(numNotFailing / 2);
-
- if (_log.shouldLog(Log.DEBUG))
- _log.debug("top count is " + topCount + " out of " + numNotFailing);
+ int numNotFailing = reordered.size();
+ // how many are in the "top half" of the high capacity peers?
+ int i = 0;
+ int threshold = 0;
+ if (numNotFailing > 0)
+ threshold = numNotFailing / 2;
+ for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
+ PeerProfile profile = (PeerProfile)iter.next();
+ if (i >= threshold) {
+ _thresholdCapacityValue = profile.getCapacityValue();
+ break;
+ }
+ }
+
+ Set speeds = new TreeSet();
int numActive = 0;
double totalIntegration = 0;
double totalSpeed = 0;
- int i = 0;
for (Iterator iter = reordered.iterator(); iter.hasNext(); i++) {
PeerProfile profile = (PeerProfile)iter.next();
- if (i < topCount) {
+ if (profile.getCapacityValue() >= _thresholdCapacityValue) {
if (profile.getIsActive()) {
numActive++;
if (profile.getIntegrationValue() > 0)
totalIntegration += profile.getIntegrationValue();
- if (profile.getSpeedValue() > 0)
- totalSpeed += profile.getSpeedValue();
+ // duplicates being clobbered is fine by us
+ speeds.add(new Double(0-profile.getSpeedValue()));
}
- } else if (i == topCount) {
- if (profile.getReliabilityValue() < 0)
- _thresholdReliabilityValue = 0;
- else
- _thresholdReliabilityValue = profile.getReliabilityValue();
- break;
} else {
+ // its ordered
break;
}
}
+
+ // calc the median speed of high capacity peers
+ i = 0;
+ for (Iterator iter = speeds.iterator(); iter.hasNext(); i++) {
+ Double speed = (Double)iter.next();
+ if (i >= (speeds.size() / 2)) {
+ _thresholdSpeedValue = 0-speed.doubleValue();
+ break;
+ }
+ }
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Threshold value for speed: " + _thresholdSpeedValue + " with speeds: " + speeds);
_thresholdIntegrationValue = 1.0d * avg(totalIntegration, numActive);
- _thresholdSpeedValue = 1.0d * avg(totalSpeed, numActive);
}
/** simple average, or 0 if NaN */
@@ -543,23 +556,23 @@ public class ProfileOrganizer {
if (profile.getIsFailing()) {
if (!shouldDrop(profile))
_failingPeers.put(profile.getPeer(), profile);
- _fastAndReliablePeers.remove(profile.getPeer());
- _reliablePeers.remove(profile.getPeer());
+ _fastPeers.remove(profile.getPeer());
+ _highCapacityPeers.remove(profile.getPeer());
_wellIntegratedPeers.remove(profile.getPeer());
_notFailingPeers.remove(profile.getPeer());
} else {
_failingPeers.remove(profile.getPeer());
- _fastAndReliablePeers.remove(profile.getPeer());
- _reliablePeers.remove(profile.getPeer());
+ _fastPeers.remove(profile.getPeer());
+ _highCapacityPeers.remove(profile.getPeer());
_wellIntegratedPeers.remove(profile.getPeer());
_notFailingPeers.put(profile.getPeer(), profile);
- if (_thresholdReliabilityValue <= profile.getReliabilityValue()) {
- _reliablePeers.put(profile.getPeer(), profile);
+ if (_thresholdCapacityValue <= profile.getCapacityValue()) {
+ _highCapacityPeers.put(profile.getPeer(), profile);
if (_log.shouldLog(Log.DEBUG))
- _log.debug("Reliable: \t" + profile.getPeer().toBase64());
+ _log.debug("High capacity: \t" + profile.getPeer().toBase64());
if (_thresholdSpeedValue <= profile.getSpeedValue()) {
- _fastAndReliablePeers.put(profile.getPeer(), profile);
+ _fastPeers.put(profile.getPeer(), profile);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Fast: \t" + profile.getPeer().toBase64());
}
@@ -570,7 +583,7 @@ public class ProfileOrganizer {
_log.debug("Integrated: \t" + profile.getPeer().toBase64());
}
} else {
- // not reliable, but not failing (yet)
+ // not high capacity, but not failing (yet)
}
}
}
@@ -615,9 +628,10 @@ public class ProfileOrganizer {
buf.append("+ * ProfileOrganizer [filename]* + *+ */ + public static void main(String args[]) { + RouterContext ctx = new RouterContext(new net.i2p.router.Router()); + ProfileOrganizer organizer = new ProfileOrganizer(ctx); + organizer.setUs(Hash.FAKE_HASH); + ProfilePersistenceHelper helper = new ProfilePersistenceHelper(ctx); + for (int i = 0; i < args.length; i++) { + PeerProfile profile = helper.readProfile(new java.io.File(args[i])); + if (profile == null) { + System.err.println("Could not load profile " + args[i]); + continue; + } + organizer.addProfile(profile); + } + organizer.reorganize(); + DecimalFormat fmt = new DecimalFormat("0,000.0"); + fmt.setPositivePrefix("+"); + + for (Iterator iter = organizer.selectAllPeers().iterator(); iter.hasNext(); ) { + Hash peer = (Hash)iter.next(); + PeerProfile profile = organizer.getProfile(peer); + if (!profile.getIsActive()) continue; + System.out.println("Peer " + profile.getPeer().toBase64().substring(0,4) + + " [" + (organizer.isFast(peer) ? "F+R" : + organizer.isHighCapacity(peer) ? "R " : + organizer.isFailing(peer) ? "X " : " ") + "]: " + + "\t Speed:\t" + fmt.format(profile.getSpeedValue()) + + " Reliability:\t" + fmt.format(profile.getReliabilityValue()) + + " Capacity:\t" + fmt.format(profile.getCapacityValue()) + + " Integration:\t" + fmt.format(profile.getIntegrationValue()) + + " Active?\t" + profile.getIsActive() + + " Failing?\t" + profile.getIsFailing()); + + } + + System.out.println("Thresholds:"); + System.out.println("Speed: " + num(organizer.getSpeedThreshold()) + " (" + organizer.countFastPeers() + " fast peers)"); + System.out.println("Capacity: " + num(organizer.getCapacityThreshold()) + " (" + organizer.countHighCapacityPeers() + " reliable peers)"); + } + } diff --git a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java index d4ed3423a..26926f47d 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java +++ b/router/java/src/net/i2p/router/peermanager/ProfilePersistenceHelper.java @@ -65,10 +65,10 @@ class ProfilePersistenceHelper { String groups = null; if (_context.profileOrganizer().isFailing(profile.getPeer())) { groups = "failing"; - } else if (!_context.profileOrganizer().isReliable(profile.getPeer())) { + } else if (!_context.profileOrganizer().isHighCapacity(profile.getPeer())) { groups = "not failing"; } else { - if (_context.profileOrganizer().isFastAndReliable(profile.getPeer())) + if (_context.profileOrganizer().isFast(profile.getPeer())) groups = "fast and reliable"; else groups = "reliable"; diff --git a/router/java/src/net/i2p/router/peermanager/TunnelHistory.java b/router/java/src/net/i2p/router/peermanager/TunnelHistory.java index 3f05973d1..d9c498e6f 100644 --- a/router/java/src/net/i2p/router/peermanager/TunnelHistory.java +++ b/router/java/src/net/i2p/router/peermanager/TunnelHistory.java @@ -22,6 +22,7 @@ public class TunnelHistory { private volatile long _lifetimeFailed; private volatile long _lastFailed; private RateStat _rejectRate; + private RateStat _failRate; public TunnelHistory(RouterContext context) { _context = context; @@ -36,7 +37,8 @@ public class TunnelHistory { } private void createRates() { - _rejectRate = new RateStat("tunnelHistory.rejectRate", "How often does this peer reject a tunnel request?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _rejectRate = new RateStat("tunnelHistory.rejectRate", "How often does this peer reject a tunnel request?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l }); + _failRate = new RateStat("tunnelHistory.failRate", "How often do tunnels this peer accepts fail?", "tunnelHistory", new long[] { 60*1000l, 10*60*1000l, 30*60*1000l, 60*60*1000l, 24*60*60*1000l }); } /** total tunnels the peer has agreed to participate in */ @@ -63,6 +65,7 @@ public class TunnelHistory { } public void incrementFailed() { _lifetimeFailed++; + _failRate.addData(1, 1); _lastFailed = _context.clock().now(); } @@ -74,6 +77,14 @@ public class TunnelHistory { public void setLastFailed(long when) { _lastFailed = when; } public RateStat getRejectionRate() { return _rejectRate; } + public RateStat getFailedRate() { return _failRate; } + + public void coallesceStats() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Coallescing stats"); + _rejectRate.coallesceStats(); + _failRate.coallesceStats(); + } private final static String NL = System.getProperty("line.separator"); @@ -91,6 +102,7 @@ public class TunnelHistory { add(buf, "lifetimeRejected", _lifetimeRejected, "How many tunnels has the peer ever refused to participate in?"); out.write(buf.toString().getBytes()); _rejectRate.store(out, "tunnelHistory.rejectRate"); + _rejectRate.store(out, "tunnelHistory.failRate"); } private void add(StringBuffer buf, String name, long val, String description) { @@ -107,12 +119,15 @@ public class TunnelHistory { _lifetimeRejected = getLong(props, "tunnels.lifetimeRejected"); try { _rejectRate.load(props, "tunnelHistory.rejectRate", true); - _log.debug("Loading tunnelHistory.rejectRate"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Loading tunnelHistory.rejectRate"); + _rejectRate.load(props, "tunnelHistory.failRate", true); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Loading tunnelHistory.failRate"); } catch (IllegalArgumentException iae) { - _log.warn("TunnelHistory reject rate is corrupt, resetting", iae); + _log.warn("TunnelHistory rates are corrupt, resetting", iae); createRates(); } - } private final static long getLong(Properties props, String key) {