* ProfileManager: Make some profile update methods non-blocking to reduce

stalls in the transports during profile reorg
    - Make isFailing() non-blocking since it's always false
This commit is contained in:
zzz
2011-12-17 14:02:01 +00:00
parent bf45e31c62
commit 48841481f0
3 changed files with 94 additions and 36 deletions

View File

@@ -21,6 +21,10 @@ import net.i2p.router.ProfileManager;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Methods to update profiles.
* Unless otherwise noted, methods are blocking on the reorganize lock.
*/
public class ProfileManagerImpl implements ProfileManager {
private final Log _log;
private final RouterContext _context;
@@ -33,31 +37,31 @@ public class ProfileManagerImpl implements ProfileManager {
/**
* Note that it took msToSend to send a message of size bytesSent to the peer over the transport.
* This should only be called if the transport considered the send successful.
*
* Non-blocking. Will not update the profile if we can't get the lock.
*/
public void messageSent(Hash peer, String transport, long msToSend, long bytesSent) {
PeerProfile data = getProfile(peer);
PeerProfile data = getProfileNonblocking(peer);
if (data == null) return;
data.setLastSendSuccessful(_context.clock().now());
//data.getSendSuccessSize().addData(bytesSent, msToSend);
}
/**
* Note that the router failed to send a message to the peer over the transport specified
*
* Note that the router failed to send a message to the peer over the transport specified.
* Non-blocking. Will not update the profile if we can't get the lock.
*/
public void messageFailed(Hash peer, String transport) {
PeerProfile data = getProfile(peer);
PeerProfile data = getProfileNonblocking(peer);
if (data == null) return;
data.setLastSendFailed(_context.clock().now());
}
/**
* Note that the router failed to send a message to the peer over any transport
*
* Note that the router failed to send a message to the peer over any transport.
* Non-blocking. Will not update the profile if we can't get the lock.
*/
public void messageFailed(Hash peer) {
PeerProfile data = getProfile(peer);
PeerProfile data = getProfileNonblocking(peer);
if (data == null) return;
data.setLastSendFailed(_context.clock().now());
}
@@ -70,7 +74,7 @@ public class ProfileManagerImpl implements ProfileManager {
if (_log.shouldLog(Log.INFO))
_log.info("Comm error occurred for peer " + peer.toBase64(), new Exception("Comm error"));
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastSendFailed(_context.clock().now());
}
@@ -80,7 +84,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void tunnelJoined(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.getTunnelCreateResponseTime().addData(responseTimeMs, responseTimeMs);
data.setLastHeardFrom(_context.clock().now());
data.getTunnelHistory().incrementAgreedTo();
@@ -95,7 +99,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void tunnelRejected(Hash peer, long responseTimeMs, int severity) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
data.getTunnelHistory().incrementRejected(severity);
}
@@ -108,7 +112,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void tunnelTimedOut(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.getTunnelHistory().incrementRejected(TunnelHistory.TUNNEL_REJECT_BANDWIDTH);
}
@@ -119,7 +123,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void tunnelTestSucceeded(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.updateTunnelTestTimeAverage(responseTimeMs);
data.getTunnelTestResponseTime().addData(responseTimeMs, responseTimeMs);
}
@@ -128,14 +132,15 @@ public class ProfileManagerImpl implements ProfileManager {
if (_context.routerHash().equals(peer))
return;
PeerProfile data = getProfile(peer);
if (data != null)
//if (data != null)
data.dataPushed(size); // ignore rtt, as we are averaging over a minute
}
public void tunnelDataPushed1m(Hash peer, int size) {
if (_context.routerHash().equals(peer))
return;
PeerProfile data = getProfile(peer);
if (data != null)
//if (data != null)
data.dataPushed1m(size);
}
@@ -144,7 +149,7 @@ public class ProfileManagerImpl implements ProfileManager {
if (_context.routerHash().equals(peer))
return;
PeerProfile data = getProfile(peer);
if (data != null)
//if (data != null)
data.tunnelDataTransferred(size);
}
@@ -162,7 +167,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void tunnelFailed(Hash peer, int pct) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
data.getTunnelHistory().incrementFailed(pct);
}
@@ -174,7 +179,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbLookupSuccessful(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
if (!data.getIsExpandedDB())
data.expandDBProfile();
@@ -191,7 +196,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbLookupFailed(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
if (!data.getIsExpandedDB())
data.expandDBProfile();
DBHistory hist = data.getDBHistory();
@@ -208,7 +213,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbLookupReply(Hash peer, int newPeers, int oldPeers, int invalid, int duplicate, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
if (!data.getIsExpandedDB())
return;
@@ -224,7 +229,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbLookupReceived(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
if (!data.getIsExpandedDB())
return;
@@ -238,7 +243,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbStoreReceived(Hash peer, boolean wasNewKey) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
if (!data.getIsExpandedDB())
return;
@@ -257,7 +262,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbStoreSent(Hash peer, long responseTimeMs) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
long now = _context.clock().now();
data.setLastHeardFrom(now);
data.setLastSendSuccessful(now);
@@ -275,7 +280,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbStoreSuccessful(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
long now = _context.clock().now();
data.setLastHeardFrom(now);
data.setLastSendSuccessful(now);
@@ -293,7 +298,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void dbStoreFailed(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
if (!data.getIsExpandedDB())
data.expandDBProfile();
DBHistory hist = data.getDBHistory();
@@ -308,7 +313,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void heardAbout(Hash peer) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
data.setLastHeardAbout(_context.clock().now());
}
@@ -318,7 +323,7 @@ public class ProfileManagerImpl implements ProfileManager {
*/
public void heardAbout(Hash peer, long when) {
PeerProfile data = getProfile(peer);
if (data == null) return;
//if (data == null) return;
if (when > data.getLastHeardAbout())
data.setLastHeardAbout(when);
}
@@ -327,16 +332,21 @@ public class ProfileManagerImpl implements ProfileManager {
* Note that the router received a message from the given peer on the specified
* transport. Messages received without any "from" information aren't recorded
* through this metric. If msToReceive is negative, there was no timing information
* available
*
* available.
* Non-blocking. Will not update the profile if we can't get the lock.
*/
public void messageReceived(Hash peer, String style, long msToReceive, int bytesRead) {
PeerProfile data = getProfile(peer);
PeerProfile data = getProfileNonblocking(peer);
if (data == null) return;
data.setLastHeardFrom(_context.clock().now());
//data.getReceiveSize().addData(bytesRead, msToReceive);
}
/**
* Blocking.
* Creates a new profile if it didn't exist.
* @return non-null
*/
private PeerProfile getProfile(Hash peer) {
PeerProfile prof = _context.profileOrganizer().getProfile(peer);
if (prof == null) {
@@ -347,14 +357,28 @@ public class ProfileManagerImpl implements ProfileManager {
return prof;
}
/**
* Non-blocking.
* @return null if the profile doesn't exist, or the fetch would have blocked
* @since 0.8.12
*/
private PeerProfile getProfileNonblocking(Hash peer) {
return _context.profileOrganizer().getProfileNonblocking(peer);
}
/** provide a simple summary of a number of peers, suitable for publication in the netDb */
/**
* provide a simple summary of a number of peers, suitable for publication in the netDb
* @deprecated unused
*/
public Properties summarizePeers(int numPeers) {
/****
Set peers = new HashSet(numPeers);
// lets get the fastest ones we've got (this fails over to include just plain reliable,
// or even notFailing peers if there aren't enough fast ones)
_context.profileOrganizer().selectFastPeers(numPeers, null, peers);
****/
Properties props = new Properties();
/****
for (Iterator iter = peers.iterator(); iter.hasNext(); ) {
Hash peer = (Hash)iter.next();
PeerProfile prof = getProfile(peer);
@@ -384,11 +408,14 @@ public class ProfileManagerImpl implements ProfileManager {
props.setProperty("profile." + peer.toBase64().replace('=', '_'), buf.toString());
}
****/
return props;
}
/****
private final static DecimalFormat _fmt = new DecimalFormat("##0.00", new DecimalFormatSymbols(Locale.UK));
private final static String num(double val) {
synchronized (_fmt) { return _fmt.format(val); }
}
****/
}

View File

@@ -52,7 +52,7 @@ public class ProfileOrganizer {
private final Map<Hash, PeerProfile> _notFailingPeers;
/** H(routerIdnetity), containing elements in _notFailingPeers */
private final List<Hash> _notFailingPeersList;
/** H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */
/** TO BE REMOVED H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */
private final Map<Hash, PeerProfile> _failingPeers;
/** who are we? */
private Hash _us;
@@ -120,6 +120,15 @@ public class ProfileOrganizer {
_reorganizeLock.readLock().lock();
}
/**
* Get the lock if we can. Non-blocking.
* @return true if the lock was acquired
* @since 0.8.12
*/
private boolean tryReadLock() {
return _reorganizeLock.readLock().tryLock();
}
private void releaseReadLock() {
_reorganizeLock.readLock().unlock();
}
@@ -147,8 +156,8 @@ public class ProfileOrganizer {
public double getIntegrationThreshold() { return _thresholdIntegrationValue; }
/**
* Retrieve the profile for the given peer, if one exists (else null)
*
* Retrieve the profile for the given peer, if one exists (else null).
* Blocking if a reorganize is happening.
*/
public PeerProfile getProfile(Hash peer) {
getReadLock();
@@ -157,6 +166,20 @@ public class ProfileOrganizer {
} finally { releaseReadLock(); }
}
/**
* Retrieve the profile for the given peer, if one exists (else null).
* Non-blocking. Returns null if a reorganize is happening.
* @since 0.8.12
*/
public PeerProfile getProfileNonblocking(Hash peer) {
if (tryReadLock()) {
try {
return locked_getProfile(peer);
} finally { releaseReadLock(); }
}
return null;
}
/**
* Add the new profile, returning the old value (or null if no profile existed)
*
@@ -243,7 +266,15 @@ public class ProfileOrganizer {
public boolean isFast(Hash peer) { return isX(_fastPeers, peer); }
public boolean isHighCapacity(Hash peer) { return isX(_highCapacityPeers, peer); }
public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); }
public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); }
/**
* Deprecated for now, always false
*/
public boolean isFailing(Hash peer) {
// Always false so skip the lock
//return isX(_failingPeers, peer);
return false;
}
/** @since 0.8.8 */
void clearProfiles() {

View File

@@ -126,7 +126,7 @@ class IntroductionManager {
_log.info("Picked peer has no SSU address: " + ri);
continue;
}
if (_context.profileOrganizer().isFailing(cur.getRemotePeer()) ||
if ( /* _context.profileOrganizer().isFailing(cur.getRemotePeer()) || */
_context.shitlist().isShitlisted(cur.getRemotePeer()) ||
_transport.wasUnreachable(cur.getRemotePeer())) {
if (_log.shouldLog(Log.INFO))