diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java index 229391cd71f804c394ac3047515f9428c72c2db6..3ee8932f61f1653d5ba9dc3143fdd5c833001ee7 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerMonitorTask.java @@ -23,6 +23,8 @@ package org.klomp.snark; import java.util.Iterator; import java.util.TimerTask; +import net.i2p.data.DataHelper; + /** * TimerTask that monitors the peers and total up/download speeds. * Works together with the main Snark class to report periodical statistics. @@ -83,21 +85,12 @@ class PeerMonitorTask extends TimerTask // Print some statistics long downloaded = coordinator.getDownloaded(); - String totalDown; - if (downloaded >= 10 * 1024 * 1024) - totalDown = (downloaded / (1024 * 1024)) + "MB"; - else - totalDown = (downloaded / 1024 )+ "KB"; + String totalDown = DataHelper.formatSize(downloaded) + "B"; long uploaded = coordinator.getUploaded(); - String totalUp; - if (uploaded >= 10 * 1024 * 1024) - totalUp = (uploaded / (1024 * 1024)) + "MB"; - else - totalUp = (uploaded / 1024) + "KB"; + String totalUp = DataHelper.formatSize(uploaded) + "B"; int needP = coordinator.storage.needed(); - long needMB - = needP * coordinator.metainfo.getPieceLength(0) / (1024 * 1024); + long needMB = needP * coordinator.metainfo.getPieceLength(0) / (1024 * 1024); int totalP = coordinator.metainfo.getPieces(); long totalMB = coordinator.metainfo.getTotalLength() / (1024 * 1024); diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java index 054b58262b3ed013db52214738a6f76766cad0e5..378cd8758c94dbed20080b2fc6e2fd40c1ffe4b0 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerState.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerState.java @@ -60,9 +60,9 @@ class PeerState // If we have te resend outstanding requests (true after we got choked). private boolean resend = false; - private final static int MAX_PIPELINE = 3; // this is for outbound requests + private final static int MAX_PIPELINE = 5; // this is for outbound requests private final static int MAX_PIPELINE_BYTES = 128*1024; // this is for inbound requests - public final static int PARTSIZE = 32*1024; // Snark was 16K, i2p-bt uses 64KB + public final static int PARTSIZE = 16*1024; // outbound request private final static int MAX_PARTSIZE = 64*1024; // Don't let anybody request more than this PeerState(Peer peer, PeerListener listener, MetaInfo metainfo, diff --git a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java index 60c44f3d5976fa2a9fa9d805757d8e2e42261be6..a96793a35fe08a1efdc336182af2427b2a9a9142 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java +++ b/apps/i2psnark/java/src/org/klomp/snark/SnarkManager.java @@ -362,7 +362,7 @@ public class SnarkManager implements Snark.CompleteListener { public Properties getConfig() { return _config; } /** hardcoded for sanity. perhaps this should be customizable, for people who increase their ulimit, etc. */ - private static final int MAX_FILES_PER_TORRENT = 256; + private static final int MAX_FILES_PER_TORRENT = 512; /** set of filenames that we are dealing with */ public Set listTorrentFiles() { synchronized (_snarks) { return new HashSet(_snarks.keySet()); } } @@ -543,16 +543,18 @@ public class SnarkManager implements Snark.CompleteListener { return "Too many files in " + info.getName() + " (" + files.size() + "), deleting it"; } else if (info.getPieces() <= 0) { return "No pieces in " + info.getName() + "? deleting it"; - } else if (info.getPieceLength(0) > 1*1024*1024) { - return "Pieces are too large in " + info.getName() + " (" + info.getPieceLength(0)/1024 + "KB), deleting it"; - } else if (info.getTotalLength() > 10*1024*1024*1024l) { + } else if (info.getPieceLength(0) > Storage.MAX_PIECE_SIZE) { + return "Pieces are too large in " + info.getName() + " (" + DataHelper.formatSize(info.getPieceLength(0)) + + "B), deleting it"; + } else if (info.getTotalLength() > Storage.MAX_TOTAL_SIZE) { System.out.println("torrent info: " + info.toString()); List lengths = info.getLengths(); if (lengths != null) for (int i = 0; i < lengths.size(); i++) System.out.println("File " + i + " is " + lengths.get(i) + " long"); - return "Torrents larger than 10GB are not supported yet (because we're paranoid): " + info.getName() + ", deleting it"; + return "Torrents larger than " + DataHelper.formatSize(Storage.MAX_TOTAL_SIZE) + + "B are not supported yet (because we're paranoid): " + info.getName() + ", deleting it"; } else { // ok return null; @@ -637,8 +639,7 @@ public class SnarkManager implements Snark.CompleteListener { public void torrentComplete(Snark snark) { File f = new File(snark.torrent); long len = snark.meta.getTotalLength(); - addMessage("Download complete of " + f.getName() - + (len < 5*1024*1024 ? " (size: " + (len/1024) + "KB)" : " (size: " + (len/(1024*1024l)) + "MB)")); + addMessage("Download complete of " + f.getName() + " (size: " + DataHelper.formatSize(len) + "B)"); updateStatus(snark); } diff --git a/apps/i2psnark/java/src/org/klomp/snark/Storage.java b/apps/i2psnark/java/src/org/klomp/snark/Storage.java index 69e5a198f56a5117b800c44200554b9937f60811..51b4f97ebb67776c353070279af35ab057418368 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Storage.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Storage.java @@ -56,10 +56,11 @@ public class Storage boolean changed; /** The default piece size. */ - private static int MIN_PIECE_SIZE = 256*1024; - private static int MAX_PIECE_SIZE = 1024*1024; + private static final int MIN_PIECE_SIZE = 256*1024; + public static final int MAX_PIECE_SIZE = 1024*1024; /** The maximum number of pieces in a torrent. */ - private static long MAX_PIECES = 100*1024/20; + public static final int MAX_PIECES = 10*1024; + public static final long MAX_TOTAL_SIZE = MAX_PIECE_SIZE * (long) MAX_PIECES; /** * Creates a new storage based on the supplied MetaInfo. This will diff --git a/core/java/src/net/i2p/util/EepGet.java b/core/java/src/net/i2p/util/EepGet.java index 5f0e8d5e9fcdfcf9ed1470c3d477324ef6cc1b8b..c3c18866a7d251cde86d187735d02fa4cb80c668 100644 --- a/core/java/src/net/i2p/util/EepGet.java +++ b/core/java/src/net/i2p/util/EepGet.java @@ -159,6 +159,9 @@ public class EepGet { markSize = Integer.parseInt(args[i+1]); lineLen = Integer.parseInt(args[i+2]); i += 2; + } else if (args[i].startsWith("-")) { + usage(); + return; } else { url = args[i]; } diff --git a/router/java/src/net/i2p/router/Blocklist.java b/router/java/src/net/i2p/router/Blocklist.java index 791cc333c0189cbda314d0b7b52e9046b031263a..7546ac4a713494b9b61de6da9efeda60996bb70a 100644 --- a/router/java/src/net/i2p/router/Blocklist.java +++ b/router/java/src/net/i2p/router/Blocklist.java @@ -19,6 +19,7 @@ import net.i2p.data.Hash; import net.i2p.data.RouterAddress; import net.i2p.data.RouterInfo; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; /** @@ -55,20 +56,16 @@ public class Blocklist { private int _blocklistSize; private final Object _lock = new Object(); private Entry _wrapSave; - private final Set _inProcess = new HashSet(0); - private Map _peerBlocklist = new HashMap(0); - private final Set _singleIPBlocklist = new HashSet(0); + private final Set<Hash> _inProcess = new HashSet(0); + private Map<Hash, String> _peerBlocklist = new HashMap(0); + private final Set<Integer> _singleIPBlocklist = new ConcurrentHashSet(0); public Blocklist(RouterContext context) { _context = context; _log = context.logManager().getLog(Blocklist.class); _blocklist = null; _blocklistSize = 0; - // _lock = new Object(); _wrapSave = null; - // _inProcess = new HashSet(0); - // _peerBlocklist = new HashMap(0); - // _singleIPBlocklist = new HashSet(0); } public Blocklist() { @@ -446,15 +443,11 @@ public class Blocklist { } private boolean add(int ip) { - synchronized(_singleIPBlocklist) { - return _singleIPBlocklist.add(new Integer(ip)); - } + return _singleIPBlocklist.add(Integer.valueOf(ip)); } private boolean isOnSingleList(int ip) { - synchronized(_singleIPBlocklist) { - return _singleIPBlocklist.contains(new Integer(ip)); - } + return _singleIPBlocklist.contains(Integer.valueOf(ip)); } /** @@ -586,11 +579,11 @@ public class Blocklist { // methods to get and store the from/to values in the array - private int getFrom(long entry) { + private static int getFrom(long entry) { return (int) ((entry >> 32) & 0xffffffff); } - private int getTo(long entry) { + private static int getTo(long entry) { return (int) (entry & 0xffffffff); } @@ -602,7 +595,7 @@ public class Blocklist { * So the size is (cough) almost 2MB for the 240,000 line splist.txt. * */ - private long toEntry(byte ip1[], byte ip2[]) { + private static long toEntry(byte ip1[], byte ip2[]) { long entry = 0; for (int i = 0; i < 4; i++) entry |= ((long) (ip2[i] & 0xff)) << ((3-i)*8); @@ -621,14 +614,18 @@ public class Blocklist { _blocklist[idx] = entry; } - private int toInt(byte ip[]) { + private static int toInt(byte ip[]) { int rv = 0; for (int i = 0; i < 4; i++) rv |= (ip[i] & 0xff) << ((3-i)*8); return rv; } - private String toStr(long entry) { + public static String toStr(byte[] ip) { + return toStr(toInt(ip)); + } + + private static String toStr(long entry) { StringBuffer buf = new StringBuffer(32); for (int i = 7; i >= 0; i--) { buf.append((entry >> (8*i)) & 0xff); @@ -640,7 +637,7 @@ public class Blocklist { return buf.toString(); } - private String toStr(int ip) { + private static String toStr(int ip) { StringBuffer buf = new StringBuffer(16); for (int i = 3; i >= 0; i--) { buf.append((ip >> (8*i)) & 0xff); @@ -756,9 +753,7 @@ public class Blocklist { public void renderStatusHTML(Writer out) throws IOException { out.write("<h2>IP Blocklist</h2>"); Set singles = new TreeSet(); - synchronized(_singleIPBlocklist) { - singles.addAll(_singleIPBlocklist); - } + singles.addAll(_singleIPBlocklist); if (singles.size() > 0) { out.write("<table><tr><td><b>Transient IPs</b></td></tr>"); for (Iterator iter = singles.iterator(); iter.hasNext(); ) { diff --git a/router/java/src/net/i2p/router/CommSystemFacade.java b/router/java/src/net/i2p/router/CommSystemFacade.java index 6d0927c63ec51cb039a6120ffecf3cf0a92211f9..f3535cf4c89b108292fdd7cbcd1202a7e8510982 100644 --- a/router/java/src/net/i2p/router/CommSystemFacade.java +++ b/router/java/src/net/i2p/router/CommSystemFacade.java @@ -58,6 +58,7 @@ public abstract class CommSystemFacade implements Service { public boolean isBacklogged(Hash dest) { return false; } public boolean wasUnreachable(Hash dest) { return false; } public boolean isEstablished(Hash dest) { return false; } + public byte[] getIP(Hash dest) { return null; } /** * Tell other transports our address changed diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 089d5f1e7b4109104f255781735621ace530df07..906df20368d1e6700f196450a7fd911b0a43bf6a 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -19,6 +19,8 @@ import java.util.Properties; import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantReadWriteLock; import net.i2p.data.Hash; import net.i2p.data.RouterAddress; @@ -40,17 +42,17 @@ public class ProfileOrganizer { private Log _log; private RouterContext _context; /** H(routerIdentity) to PeerProfile for all peers that are fast and high capacity*/ - private Map _fastPeers; + private Map<Hash, PeerProfile> _fastPeers; /** H(routerIdentity) to PeerProfile for all peers that have high capacities */ - private Map _highCapacityPeers; + private Map<Hash, PeerProfile> _highCapacityPeers; /** H(routerIdentity) to PeerProfile for all peers that well integrated into the network and not failing horribly */ - private Map _wellIntegratedPeers; + private Map<Hash, PeerProfile> _wellIntegratedPeers; /** H(routerIdentity) to PeerProfile for all peers that are not failing horribly */ - private Map _notFailingPeers; + private Map<Hash, PeerProfile> _notFailingPeers; /** H(routerIdnetity), containing elements in _notFailingPeers */ - private List _notFailingPeersList; + private List<Hash> _notFailingPeersList; /** H(routerIdentity) to PeerProfile for all peers that ARE failing horribly (but that we haven't dropped reference to yet) */ - private Map _failingPeers; + private Map<Hash, PeerProfile> _failingPeers; /** who are we? */ private Hash _us; private ProfilePersistenceHelper _persistenceHelper; @@ -84,7 +86,7 @@ public class ProfileOrganizer { public static final int DEFAULT_MINIMUM_HIGH_CAPACITY_PEERS = 10; /** synchronized against this lock when updating the tier that peers are located in (and when fetching them from a peer) */ - private final Object _reorganizeLock = new Object(); + private final ReentrantReadWriteLock _reorganizeLock = new ReentrantReadWriteLock(true); /** incredibly weak PRNG, just used for shuffling peers. no need to waste the real PRNG on this */ private Random _random = new Random(); @@ -112,6 +114,29 @@ public class ProfileOrganizer { _context.statManager().createRateStat("peer.profileReorgTime", "How long the reorg takes overall", "Peers", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); } + private void getReadLock() { + _reorganizeLock.readLock().lock(); + } + + private void releaseReadLock() { + _reorganizeLock.readLock().unlock(); + } + + /** @return true if the lock was acquired */ + private boolean getWriteLock() { + try { + boolean rv = _reorganizeLock.writeLock().tryLock(5000, TimeUnit.MILLISECONDS); + if (!rv) + _log.error("no lock, size is: " + _reorganizeLock.getQueueLength(), new Exception("rats")); + return rv; + } catch (InterruptedException ie) {} + return false; + } + + private void releaseWriteLock() { + _reorganizeLock.writeLock().unlock(); + } + public void setUs(Hash us) { _us = us; } Hash getUs() { return _us; } @@ -124,42 +149,52 @@ public class ProfileOrganizer { * */ public PeerProfile getProfile(Hash peer) { - synchronized (_reorganizeLock) { + getReadLock(); + try { return locked_getProfile(peer); - } + } finally { releaseReadLock(); } } /** * Add the new profile, returning the old value (or null if no profile existed) * */ - public PeerProfile addProfile(PeerProfile profile) throws IllegalStateException { + public PeerProfile addProfile(PeerProfile profile) { if ( (profile == null) || (profile.getPeer() == null) ) return null; if (_log.shouldLog(Log.DEBUG)) _log.debug("New profile created for " + profile.getPeer().toBase64()); - synchronized (_reorganizeLock) { - PeerProfile old = locked_getProfile(profile.getPeer()); - profile.coalesceStats(); + PeerProfile old = getProfile(profile.getPeer()); + profile.coalesceStats(); + if (!getWriteLock()) + return old; + try { locked_placeProfile(profile); _strictCapacityOrder.add(profile); - return old; - } + } finally { releaseWriteLock(); } + return old; } - public int countFastPeers() { synchronized (_reorganizeLock) { return _fastPeers.size(); } } - public int countHighCapacityPeers() { synchronized (_reorganizeLock) { return _highCapacityPeers.size(); } } - public int countWellIntegratedPeers() { synchronized (_reorganizeLock) { return _wellIntegratedPeers.size(); } } - public int countNotFailingPeers() { synchronized (_reorganizeLock) { return _notFailingPeers.size(); } } - public int countFailingPeers() { synchronized (_reorganizeLock) { return _failingPeers.size(); } } + private int count(Map m) { + getReadLock(); + try { + return m.size(); + } finally { releaseReadLock(); } + } + + public int countFastPeers() { return count(_fastPeers); } + public int countHighCapacityPeers() { return count(_highCapacityPeers); } + public int countWellIntegratedPeers() { return count(_wellIntegratedPeers); } + public int countNotFailingPeers() { return count(_notFailingPeers); } + public int countFailingPeers() { return count(_failingPeers); } public int countActivePeers() { - synchronized (_reorganizeLock) { - int activePeers = 0; - - long hideBefore = _context.clock().now() - 6*60*60*1000; - + int activePeers = 0; + long hideBefore = _context.clock().now() - 6*60*60*1000; + + getReadLock(); + try { for (Iterator iter = _failingPeers.values().iterator(); iter.hasNext(); ) { PeerProfile profile = (PeerProfile)iter.next(); if (profile.getLastSendSuccessful() >= hideBefore) @@ -174,15 +209,21 @@ public class ProfileOrganizer { else if (profile.getLastHeardFrom() >= hideBefore) activePeers++; } - - return activePeers; - } + } finally { releaseReadLock(); } + return activePeers; } - public boolean isFast(Hash peer) { synchronized (_reorganizeLock) { return _fastPeers.containsKey(peer); } } - public boolean isHighCapacity(Hash peer) { synchronized (_reorganizeLock) { return _highCapacityPeers.containsKey(peer); } } - public boolean isWellIntegrated(Hash peer) { synchronized (_reorganizeLock) { return _wellIntegratedPeers.containsKey(peer); } } - public boolean isFailing(Hash peer) { synchronized (_reorganizeLock) { return _failingPeers.containsKey(peer); } } + private boolean isX(Map m, Hash peer) { + getReadLock(); + try { + return m.containsKey(peer); + } finally { releaseReadLock(); } + } + + public boolean isFast(Hash peer) { return isX(_fastPeers, peer); } + public boolean isHighCapacity(Hash peer) { return isX(_highCapacityPeers, peer); } + public boolean isWellIntegrated(Hash peer) { return isX(_wellIntegratedPeers, peer); } + public boolean isFailing(Hash peer) { return isX(_failingPeers, peer); } /** * if a peer sends us more than 5 replies in a searchReply that we cannot @@ -236,9 +277,10 @@ public class ProfileOrganizer { selectFastPeers(howMany, exclude, matches, 0); } public void selectFastPeers(int howMany, Set exclude, Set matches, int mask) { - synchronized (_reorganizeLock) { + getReadLock(); + try { locked_selectPeers(_fastPeers, howMany, exclude, matches, mask); - } + } finally { releaseReadLock(); } if (matches.size() < howMany) { if (_log.shouldLog(Log.INFO)) _log.info("selectFastPeers("+howMany+"), not enough fast (" + matches.size() + ") going on to highCap"); @@ -258,7 +300,8 @@ public class ProfileOrganizer { selectHighCapacityPeers(howMany, exclude, matches, 0); } public void selectHighCapacityPeers(int howMany, Set exclude, Set matches, int mask) { - synchronized (_reorganizeLock) { + getReadLock(); + try { // we only use selectHighCapacityPeers when we are selecting for PURPOSE_TEST // or we are falling back due to _fastPeers being too small, so we can always // exclude the fast peers @@ -269,7 +312,7 @@ public class ProfileOrganizer { exclude.addAll(_fastPeers.keySet()); */ locked_selectPeers(_highCapacityPeers, howMany, exclude, matches, mask); - } + } finally { releaseReadLock(); } if (matches.size() < howMany) { if (_log.shouldLog(Log.INFO)) _log.info("selectHighCap("+howMany+"), not enough fast (" + matches.size() + ") going on to notFailing"); @@ -288,9 +331,10 @@ public class ProfileOrganizer { selectWellIntegratedPeers(howMany, exclude, matches, 0); } public void selectWellIntegratedPeers(int howMany, Set exclude, Set matches, int mask) { - synchronized (_reorganizeLock) { + getReadLock(); + try { locked_selectPeers(_wellIntegratedPeers, howMany, exclude, matches, mask); - } + } finally { releaseReadLock(); } if (matches.size() < howMany) { if (_log.shouldLog(Log.INFO)) _log.info("selectWellIntegrated("+howMany+"), not enough integrated (" + matches.size() + ") going on to notFailing"); @@ -375,7 +419,8 @@ public class ProfileOrganizer { int needed = howMany - orig; int start = 0; List selected = new ArrayList(needed); - synchronized (_reorganizeLock) { + getReadLock(); + try { // we randomize the whole list when rebuilding it, but randomizing // the entire list on each peer selection is a bit crazy start = _context.random().nextInt(_notFailingPeersList.size()); @@ -397,7 +442,7 @@ public class ProfileOrganizer { _log.debug("Not selectable: " + cur.toBase64()); } } - } + } finally { releaseReadLock(); } if (_log.shouldLog(Log.INFO)) _log.info("Selecting all not failing (strict? " + onlyNotFailing + " start=" + start + ") found " + selected.size() + " new peers: " + selected + " all=" + _notFailingPeersList.size() + " strict=" + _strictCapacityOrder.size()); @@ -418,25 +463,27 @@ public class ProfileOrganizer { * */ public void selectFailingPeers(int howMany, Set exclude, Set matches) { - synchronized (_reorganizeLock) { + getReadLock(); + try { locked_selectPeers(_failingPeers, howMany, exclude, matches); - } - return; - } - - /** + } finally { releaseReadLock(); } + return; + } + + /** * Get the peers the transport layer thinks are unreachable, and * add in the peers with the SSU peer testing bug, * and peers requiring introducers. - * - */ + * + */ public List selectPeersLocallyUnreachable() { List n; int count; - synchronized (_reorganizeLock) { + getReadLock(); + try { count = _notFailingPeers.size(); n = new ArrayList(_notFailingPeers.keySet()); - } + } finally { releaseReadLock(); } List l = new ArrayList(count / 4); for (Iterator iter = n.iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); @@ -483,7 +530,8 @@ public class ProfileOrganizer { * */ public List selectPeersRecentlyRejecting() { - synchronized (_reorganizeLock) { + getReadLock(); + try { long cutoff = _context.clock().now() - (20*1000); int count = _notFailingPeers.size(); List l = new ArrayList(count / 128); @@ -493,7 +541,7 @@ public class ProfileOrganizer { l.add(prof.getPeer()); } return l; - } + } finally { releaseReadLock(); } } /** @@ -501,14 +549,15 @@ public class ProfileOrganizer { * */ public Set selectAllPeers() { - synchronized (_reorganizeLock) { + getReadLock(); + try { Set allPeers = new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); allPeers.addAll(_failingPeers.keySet()); allPeers.addAll(_notFailingPeers.keySet()); allPeers.addAll(_highCapacityPeers.keySet()); allPeers.addAll(_fastPeers.keySet()); return allPeers; - } + } finally { releaseReadLock(); } } /** @@ -532,8 +581,10 @@ public class ProfileOrganizer { expireOlderThan = _context.clock().now() - 6*60*60*1000; } + if (!getWriteLock()) + return; long start = System.currentTimeMillis(); - synchronized (_reorganizeLock) { + try { Set allPeers = _strictCapacityOrder; //new HashSet(_failingPeers.size() + _notFailingPeers.size() + _highCapacityPeers.size() + _fastPeers.size()); //allPeers.addAll(_failingPeers.values()); //allPeers.addAll(_notFailingPeers.values()); @@ -557,35 +608,37 @@ public class ProfileOrganizer { } sortTime = System.currentTimeMillis() - sortStart; _strictCapacityOrder = reordered; - + long thresholdStart = System.currentTimeMillis(); locked_calculateThresholds(allPeers); thresholdTime = System.currentTimeMillis()-thresholdStart; - + _failingPeers.clear(); _fastPeers.clear(); _highCapacityPeers.clear(); _notFailingPeers.clear(); _notFailingPeersList.clear(); _wellIntegratedPeers.clear(); - + long placeStart = System.currentTimeMillis(); - + for (Iterator iter = allPeers.iterator(); iter.hasNext(); ) { PeerProfile profile = (PeerProfile)iter.next(); locked_placeProfile(profile); } - + locked_unfailAsNecessary(); locked_promoteFastAsNecessary(); Collections.shuffle(_notFailingPeersList, _context.random()); - + placeTime = System.currentTimeMillis()-placeStart; + } finally { releaseWriteLock(); } - if (_log.shouldLog(Log.INFO)) - _log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue - + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]"); + + if (_log.shouldLog(Log.INFO)) + _log.info("Profiles reorganized. averages: [integration: " + _thresholdIntegrationValue + + ", capacity: " + _thresholdCapacityValue + ", speed: " + _thresholdSpeedValue + "]"); /***** if (_log.shouldLog(Log.DEBUG)) { StringBuffer buf = new StringBuffer(512); @@ -597,7 +650,6 @@ public class ProfileOrganizer { _log.debug("fast: " + _fastPeers.values()); } *****/ - } long total = System.currentTimeMillis()-start; _context.statManager().addRateData("peer.profileSortTime", sortTime, profileCount); @@ -899,11 +951,12 @@ public class ProfileOrganizer { all.removeAll(matches); all.remove(_us); Collections.shuffle(all, _random); + Set IPSet = new HashSet(8); for (int i = 0; (matches.size() < howMany) && (i < all.size()); i++) { Hash peer = (Hash)all.get(i); boolean ok = isSelectable(peer); if (ok) { - ok = mask <= 0 || notRestricted(peer, matches, mask); + ok = mask <= 0 || notRestricted(peer, IPSet, mask); if ((!ok) && _log.shouldLog(Log.WARN)) _log.warn("IP restriction prevents " + peer + " from joining " + matches); } @@ -917,77 +970,67 @@ public class ProfileOrganizer { /** * Does the peer's IP address NOT match the IP address of any peer already in the set, * on any transport, within a given mask? - * mask is 1-4 (number of bytes to match) or 0 to disable - * Perhaps rewrite this to just make a set of all the IP addresses rather than loop. + * @param mask is 1-4 (number of bytes to match) + * @param IPMatches all IPs so far, modified by this routine */ - private boolean notRestricted(Hash peer, Set matches, int mask) { - if (mask <= 0) return true; - if (matches.size() <= 0) return true; - RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer); - if (pinfo == null) return false; - Set paddr = pinfo.getAddresses(); - if (paddr == null || paddr.size() == 0) + private boolean notRestricted(Hash peer, Set IPSet, int mask) { + Set peerIPs = maskedIPSet(peer, mask); + if (containsAny(IPSet, peerIPs)) return false; - List pladdr = new ArrayList(paddr); - List lmatches = new ArrayList(matches); - // for each match - for (int i = 0; i < matches.size(); i++) { - RouterInfo minfo = _context.netDb().lookupRouterInfoLocally((Hash) lmatches.get(i)); - if (minfo == null) continue; - Set maddr = minfo.getAddresses(); - if (maddr == null || maddr.size() == 0) + IPSet.addAll(peerIPs); + return true; + } + + /** + * The Set of IPs for this peer, with a given mask. + * Includes the comm system's record of the IP, and all netDb addresses. + * + * @return an opaque set of masked IPs for this peer + */ + private Set maskedIPSet(Hash peer, int mask) { + Set rv = new HashSet(2); + byte[] commIP = _context.commSystem().getIP(peer); + if (commIP != null) + rv.add(maskedIP(commIP, mask)); + RouterInfo pinfo = _context.netDb().lookupRouterInfoLocally(peer); + if (pinfo == null) + return rv; + Set<RouterAddress> paddr = pinfo.getAddresses(); + if (paddr == null) + return rv; + for (RouterAddress pa : paddr) { + Properties pprops = pa.getOptions(); + if (pprops == null) continue; + String phost = pprops.getProperty("host"); + if (phost == null) continue; + InetAddress pi; + try { + pi = InetAddress.getByName(phost); + } catch (UnknownHostException uhe) { continue; - List mladdr = new ArrayList(maddr); - String oldphost = null; - // for each peer address - for (int j = 0; j < paddr.size(); j++) { - RouterAddress pa = (RouterAddress) pladdr.get(j); - if (pa == null) continue; - Properties pprops = pa.getOptions(); - if (pprops == null) continue; - String phost = pprops.getProperty("host"); - if (phost == null) continue; - if (oldphost != null && oldphost.equals(phost)) continue; - oldphost = phost; - InetAddress pi; - try { - pi = InetAddress.getByName(phost); - } catch (UnknownHostException uhe) { - continue; - } - if (pi == null) continue; - byte[] pib = pi.getAddress(); - String oldmhost = null; - // for each match address - for (int k = 0; k < maddr.size(); k++) { - RouterAddress ma = (RouterAddress) mladdr.get(k); - if (ma == null) continue; - Properties mprops = ma.getOptions(); - if (mprops == null) continue; - String mhost = mprops.getProperty("host"); - if (mhost == null) continue; - if (oldmhost != null && oldmhost.equals(mhost)) continue; - oldmhost = mhost; - InetAddress mi; - try { - mi = InetAddress.getByName(mhost); - } catch (UnknownHostException uhe) { - continue; - } - if (mi == null) continue; - byte[] mib = mi.getAddress(); - // assume ipv4, compare 1 to 4 bytes - // log.info("Comparing " + pi + " with " + mi); - for (int m = 0; m < mask; m++) { - if (pib[m] != mib[m]) - break; - if (m == mask-1) - return false; // IP match - } - } } + if (pi == null) continue; + byte[] pib = pi.getAddress(); + rv.add(maskedIP(pib, mask)); } - return true; + return rv; + } + + /** generate an arbitrary unique value for this ip/mask (mask = 1-4) */ + private Integer maskedIP(byte[] ip, int mask) { + int rv = 0; + for (int i = 0; i < mask; i++) + rv = (rv << 8) | (ip[i] & 0xff); + return Integer.valueOf(rv); + } + + /** does a contain any of the elements in b? */ + private boolean containsAny(Set a, Set b) { + for (Object o : b) { + if (a.contains(o)) + return true; + } + return false; } public boolean isSelectable(Hash peer) { diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 48864d3efce05457bcae7c6565103e2e7d8c9c6f..2d3a8504d0f8712d26256f7faa7907db33511eaf 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -36,6 +36,8 @@ import net.i2p.router.RouterContext; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; +import net.i2p.util.SimpleScheduler; +import net.i2p.util.SimpleTimer; /** * Defines a way to send a message to another peer and start listening for messages @@ -72,6 +74,7 @@ public abstract class TransportImpl implements Transport { _unreachableEntries = new HashMap(16); _wasUnreachableEntries = new ConcurrentHashSet(16); _currentAddress = null; + SimpleScheduler.getInstance().addPeriodicEvent(new CleanupUnreachable(), 2 * UNREACHABLE_PERIOD, UNREACHABLE_PERIOD / 2); } /** @@ -462,13 +465,10 @@ public abstract class TransportImpl implements Transport { if (!isInbound) markWasUnreachable(peer, false); } - private class CleanupUnreachable extends JobImpl { - public CleanupUnreachable(RouterContext ctx) { - super(ctx); - } - public String getName() { return "Cleanup " + getStyle() + " unreachable list"; } - public void runJob() { - long now = getContext().clock().now(); + + private class CleanupUnreachable implements SimpleTimer.TimedEvent { + public void timeReached() { + long now = _context.clock().now(); synchronized (_unreachableEntries) { for (Iterator iter = _unreachableEntries.keySet().iterator(); iter.hasNext(); ) { Hash peer = (Hash)iter.next(); @@ -477,7 +477,6 @@ public abstract class TransportImpl implements Transport { iter.remove(); } } - requeue(60*1000); } } diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 95152c6a07519c3c00f302bb384619835c495deb..3acabafdc48a809aa199d060c90230684eda25d0 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -398,6 +398,8 @@ public class NTCPConnection implements FIFOBandwidthLimiter.CompleteListener { continue; RouterInfo info = fac.lookupRouterInfoLocally(peer); + if (info == null) + continue; OutNetMessage infoMsg = new OutNetMessage(_context); infoMsg.setExpiration(_context.clock().now()+10*1000); diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 7261845c1a90892c9ba5b7384bc14b0bb5b4c521..43e1d13144de93911a3c84eccdb6c1fac94a16ff 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -106,7 +106,18 @@ public class FragmentHandler { if (_log.shouldLog(Log.ERROR)) _log.error("Corrupt fragment received: offset = " + offset, e); _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); - throw e; + // java.lang.IllegalStateException: wtf, don't get the completed size when we're not complete - null fragment i=0 of 1 + // at net.i2p.router.tunnel.FragmentedMessage.getCompleteSize(FragmentedMessage.java:194) + // at net.i2p.router.tunnel.FragmentedMessage.toByteArray(FragmentedMessage.java:223) + // at net.i2p.router.tunnel.FragmentHandler.receiveComplete(FragmentHandler.java:380) + // at net.i2p.router.tunnel.FragmentHandler.receiveSubsequentFragment(FragmentHandler.java:353) + // at net.i2p.router.tunnel.FragmentHandler.receiveFragment(FragmentHandler.java:208) + // at net.i2p.router.tunnel.FragmentHandler.receiveTunnelMessage(FragmentHandler.java:92) + // ... + // still trying to find root cause + // let's limit the damage here and skip the: + // .transport.udp.MessageReceiver: b0rked receiving a message.. wazza huzza hmm? + //throw e; } finally { // each of the FragmentedMessages populated make a copy out of the // payload, which they release separately, so we can release