propagate from branch 'i2p.i2p' (head 871765966dc474b763ff0d5c017bed7535981c1e)

to branch 'i2p.i2p.zzz.test' (head 096242c22aa550274cb383a6a0c984cef07ae08c)
This commit is contained in:
zzz
2012-08-31 14:41:18 +00:00
83 changed files with 522 additions and 213 deletions

View File

@@ -53,7 +53,7 @@ class KBucketImpl<T extends SimpleDataStructure> implements KBucket<T> {
/** include if no bits higher than this bit (inclusive) are set */
private final int _end;
private final int _max;
private final KBucketSet.KBucketTrimmer _trimmer;
private final KBucketTrimmer _trimmer;
/** when did we last shake things up */
private long _lastChanged;
private final I2PAppContext _context;
@@ -62,7 +62,7 @@ class KBucketImpl<T extends SimpleDataStructure> implements KBucket<T> {
* All entries in this bucket will have at least one bit different
* from us in the range [begin, end] inclusive.
*/
public KBucketImpl(I2PAppContext context, int begin, int end, int max, KBucketSet.KBucketTrimmer trimmer) {
public KBucketImpl(I2PAppContext context, int begin, int end, int max, KBucketTrimmer trimmer) {
if (begin > end)
throw new IllegalArgumentException(begin + " > " + end);
_context = context;

View File

@@ -768,70 +768,6 @@ public class KBucketSet<T extends SimpleDataStructure> {
}
}
/**
* Called when a kbucket can no longer be split and is too big
*/
public interface KBucketTrimmer<K extends SimpleDataStructure> {
/**
* Called from add() just before adding the entry.
* You may call getEntries() and/or remove() from here.
* Do NOT call add().
* To always discard a newer entry, always return false.
*
* @param kbucket the kbucket that is now too big
* @return true to actually add the entry.
*/
public boolean trim(KBucket<K> kbucket, K toAdd);
}
/**
* Removes a random element. Not resistant to flooding.
*/
public static class RandomTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
protected final I2PAppContext _ctx;
private final int _max;
public RandomTrimmer(I2PAppContext ctx, int max) {
_ctx = ctx;
_max = max;
}
public boolean trim(KBucket<T> kbucket, T toAdd) {
List<T> e = new ArrayList(kbucket.getEntries());
int sz = e.size();
// concurrency
if (sz < _max)
return true;
T toRemove = e.get(_ctx.random().nextInt(sz));
return kbucket.remove(toRemove);
}
}
/**
* Removes a random element, but only if the bucket hasn't changed in 5 minutes.
*/
public static class RandomIfOldTrimmer<T extends SimpleDataStructure> extends RandomTrimmer<T> {
public RandomIfOldTrimmer(I2PAppContext ctx, int max) {
super(ctx, max);
}
public boolean trim(KBucket<T> kbucket, T toAdd) {
if (kbucket.getLastChanged() > _ctx.clock().now() - 5*60*1000)
return false;
return super.trim(kbucket, toAdd);
}
}
/**
* Removes nothing and always rejects the add. Flood resistant.
*/
public static class RejectTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
public boolean trim(KBucket<T> kbucket, T toAdd) {
return false;
}
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder(1024);

View File

@@ -0,0 +1,20 @@
package net.i2p.kademlia;
import net.i2p.data.SimpleDataStructure;
/**
 * Called when a kbucket can no longer be split and is too big.
 * Implementations decide whether to make room, discard the new entry, or both.
 * @since 0.9.2
 */
public interface KBucketTrimmer<K extends SimpleDataStructure> {
    /**
     * Called from add() just before adding the entry.
     * You may call getEntries() and/or remove() from here.
     * Do NOT call add().
     * To always discard a newer entry, always return false.
     *
     * @param kbucket the kbucket that is now too big
     * @param toAdd the entry about to be added
     * @return true to actually add the entry.
     */
    public boolean trim(KBucket<K> kbucket, K toAdd);
}

View File

@@ -0,0 +1,21 @@
package net.i2p.kademlia;
import net.i2p.I2PAppContext;
import net.i2p.data.SimpleDataStructure;
/**
* Removes a random element, but only if the bucket hasn't changed in 5 minutes.
* @since 0.9.2
*/
public class RandomIfOldTrimmer<T extends SimpleDataStructure> extends RandomTrimmer<T> {
public RandomIfOldTrimmer(I2PAppContext ctx, int max) {
super(ctx, max);
}
public boolean trim(KBucket<T> kbucket, T toAdd) {
if (kbucket.getLastChanged() > _ctx.clock().now() - 5*60*1000)
return false;
return super.trim(kbucket, toAdd);
}
}

View File

@@ -0,0 +1,31 @@
package net.i2p.kademlia;
import java.util.ArrayList;
import java.util.List;
import net.i2p.I2PAppContext;
import net.i2p.data.SimpleDataStructure;
/**
 * Removes a random element. Not resistant to flooding.
 * @since 0.9.2
 */
public class RandomTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
    protected final I2PAppContext _ctx;
    private final int _max;

    /**
     * @param ctx context supplying the random source
     * @param max bucket size limit; if the bucket has shrunk below this
     *            by the time trim() runs, the add is simply allowed
     */
    public RandomTrimmer(I2PAppContext ctx, int max) {
        _ctx = ctx;
        _max = max;
    }

    /**
     * Remove a randomly-selected existing entry to make room for the new one.
     *
     * @param kbucket the kbucket that is now too big
     * @param toAdd the entry about to be added (unused here)
     * @return true to actually add the entry
     */
    public boolean trim(KBucket<T> kbucket, T toAdd) {
        // parameterized copy (was a raw ArrayList) - snapshot for stable indexing
        List<T> e = new ArrayList<T>(kbucket.getEntries());
        int sz = e.size();
        // concurrency - the bucket may have shrunk since we were called
        if (sz < _max)
            return true;
        T toRemove = e.get(_ctx.random().nextInt(sz));
        return kbucket.remove(toRemove);
    }
}

View File

@@ -0,0 +1,13 @@
package net.i2p.kademlia;
import net.i2p.data.SimpleDataStructure;
/**
 * Removes nothing and always rejects the add. Flood resistant.
 * @since 0.9.2
 */
public class RejectTrimmer<T extends SimpleDataStructure> implements KBucketTrimmer<T> {
    /**
     * Never removes an entry and never permits the add.
     * @return false always
     */
    public boolean trim(KBucket<T> kbucket, T toAdd) {
        return false;
    }
}

View File

@@ -338,7 +338,7 @@ abstract class ExtensionHandler {
System.arraycopy(ids, off, hash, 0, HASH_LENGTH);
if (DataHelper.eq(hash, peer.getPeerID().getDestHash()))
continue;
PeerID pID = new PeerID(hash);
PeerID pID = new PeerID(hash, listener.getUtil());
peers.add(pID);
}
// could include ourselves, listener must remove

View File

@@ -1435,5 +1435,13 @@ class PeerCoordinator implements PeerListener
return listener.overUpBWLimit(total * 1000 / CHECK_PERIOD);
return false;
}
/**
* Convenience
* @since 0.9.2
*/
public I2PSnarkUtil getUtil() {
return _util;
}
}

View File

@@ -52,6 +52,7 @@ public class PeerID implements Comparable
/** whether we have tried to get the dest from the hash - only do once */
private boolean triedDestLookup;
private final int hash;
private final I2PSnarkUtil util;
public PeerID(byte[] id, Destination address)
{
@@ -60,6 +61,7 @@ public class PeerID implements Comparable
this.port = 6881;
this.destHash = address.calculateHash().getData();
hash = calculateHash();
util = null;
}
/**
@@ -93,13 +95,15 @@ public class PeerID implements Comparable
port = 6881;
this.destHash = address.calculateHash().getData();
hash = calculateHash();
util = null;
}
/**
* Creates a PeerID from a destHash
* @param util for eventual destination lookup
* @since 0.8.1
*/
public PeerID(byte[] dest_hash) throws InvalidBEncodingException
public PeerID(byte[] dest_hash, I2PSnarkUtil util) throws InvalidBEncodingException
{
// id and address remain null
port = 6881;
@@ -107,6 +111,7 @@ public class PeerID implements Comparable
throw new InvalidBEncodingException("bad hash length");
destHash = dest_hash;
hash = DataHelper.hashCode(dest_hash);
this.util = util;
}
public byte[] getID()
@@ -131,7 +136,7 @@ public class PeerID implements Comparable
{
if (address == null && destHash != null && !triedDestLookup) {
String b32 = Base32.encode(destHash) + ".b32.i2p";
address = I2PAppContext.getGlobalContext().namingService().lookup(b32);
address = util.getDestination(b32);
triedDestLookup = true;
}
return address;

View File

@@ -207,4 +207,10 @@ interface PeerListener
* @since 0.8.4
*/
void gotPeers(Peer peer, List<PeerID> pIDList);
/**
* Convenience
* @since 0.9.2
*/
public I2PSnarkUtil getUtil();
}

View File

@@ -1484,7 +1484,7 @@ public class SnarkManager implements Snark.CompleteListener {
byte[] ih = Base64.decode(b64);
// ignore value - TODO put tracker URL in value
if (ih != null && ih.length == 20)
addMagnet("* " + _("Magnet") + ' ' + I2PSnarkUtil.toHex(ih), ih, null, false);
addMagnet(_("Magnet") + ' ' + I2PSnarkUtil.toHex(ih), ih, null, false);
// else remove from config?
}
}

View File

@@ -487,7 +487,7 @@ public class TrackerClient implements Runnable {
if ((!stop) && !hashes.isEmpty()) {
List<Peer> peers = new ArrayList(hashes.size());
for (Hash h : hashes) {
PeerID pID = new PeerID(h.getData());
PeerID pID = new PeerID(h.getData(), _util);
peers.add(new Peer(pID, snark.getID(), snark.getInfoHash(), snark.getMetaInfo()));
}
Collections.shuffle(peers, r);
@@ -651,7 +651,7 @@ public class TrackerClient implements Runnable {
in = new FileInputStream(fetched);
TrackerInfo info = new TrackerInfo(in, snark.getID(),
snark.getInfoHash(), snark.getMetaInfo());
snark.getInfoHash(), snark.getMetaInfo(), _util);
if (_log.shouldLog(Log.INFO))
_log.info("TrackerClient response: " + info);

View File

@@ -47,19 +47,19 @@ class TrackerInfo
private int incomplete;
/** @param metainfo may be null */
public TrackerInfo(InputStream in, byte[] my_id, byte[] infohash, MetaInfo metainfo)
public TrackerInfo(InputStream in, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
{
this(new BDecoder(in), my_id, infohash, metainfo);
this(new BDecoder(in), my_id, infohash, metainfo, util);
}
private TrackerInfo(BDecoder be, byte[] my_id, byte[] infohash, MetaInfo metainfo)
private TrackerInfo(BDecoder be, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
{
this(be.bdecodeMap().getMap(), my_id, infohash, metainfo);
this(be.bdecodeMap().getMap(), my_id, infohash, metainfo, util);
}
private TrackerInfo(Map m, byte[] my_id, byte[] infohash, MetaInfo metainfo)
private TrackerInfo(Map m, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
{
BEValue reason = (BEValue)m.get("failure reason");
@@ -85,10 +85,10 @@ class TrackerInfo
Set<Peer> p;
try {
// One big string (the official compact format)
p = getPeers(bePeers.getBytes(), my_id, infohash, metainfo);
p = getPeers(bePeers.getBytes(), my_id, infohash, metainfo, util);
} catch (InvalidBEncodingException ibe) {
// List of Dictionaries or List of Strings
p = getPeers(bePeers.getList(), my_id, infohash, metainfo);
p = getPeers(bePeers.getList(), my_id, infohash, metainfo, util);
}
peers = p;
}
@@ -124,7 +124,7 @@ class TrackerInfo
******/
/** List of Dictionaries or List of Strings */
private static Set<Peer> getPeers(List<BEValue> l, byte[] my_id, byte[] infohash, MetaInfo metainfo)
private static Set<Peer> getPeers(List<BEValue> l, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
{
Set<Peer> peers = new HashSet(l.size());
@@ -138,7 +138,7 @@ class TrackerInfo
try {
// Case 2 - compact - A list of 32-byte binary strings (hashes)
// This was just for testing and is not the official format
peerID = new PeerID(bev.getBytes());
peerID = new PeerID(bev.getBytes(), util);
} catch (InvalidBEncodingException ibe2) {
// don't let one bad entry spoil the whole list
//Snark.debug("Discarding peer from list: " + ibe, Snark.ERROR);
@@ -157,7 +157,7 @@ class TrackerInfo
* One big string of concatenated 32-byte hashes
* @since 0.8.1
*/
private static Set<Peer> getPeers(byte[] l, byte[] my_id, byte[] infohash, MetaInfo metainfo)
private static Set<Peer> getPeers(byte[] l, byte[] my_id, byte[] infohash, MetaInfo metainfo, I2PSnarkUtil util)
throws IOException
{
int count = l.length / HASH_LENGTH;
@@ -168,7 +168,7 @@ class TrackerInfo
byte[] hash = new byte[HASH_LENGTH];
System.arraycopy(l, i * HASH_LENGTH, hash, 0, HASH_LENGTH);
try {
peerID = new PeerID(hash);
peerID = new PeerID(hash, util);
} catch (InvalidBEncodingException ibe) {
// won't happen
continue;

View File

@@ -12,6 +12,7 @@ import net.i2p.data.Hash;
/**
* Stub for KRPC
* @since 0.8.4
*/
public interface DHT {

View File

@@ -25,7 +25,7 @@ import net.i2p.util.SimpleTimer2;
*
* And a real Kademlia routing table, which stores node IDs only.
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class DHTNodes {
@@ -44,13 +44,17 @@ class DHTNodes {
private static final long MIN_EXPIRE_TIME = 10*60*1000;
private static final long DELTA_EXPIRE_TIME = 3*60*1000;
private static final int MAX_PEERS = 799;
/** Buckets older than this are refreshed - BEP 5 says 15 minutes */
private static final long MAX_BUCKET_AGE = 15*60*1000;
private static final int KAD_K = 8;
private static final int KAD_B = 1;
public DHTNodes(I2PAppContext ctx, NID me) {
_context = ctx;
_expireTime = MAX_EXPIRE_TIME;
_log = _context.logManager().getLog(DHTNodes.class);
_nodeMap = new ConcurrentHashMap();
_kad = new KBucketSet(ctx, me, 8, 1);
_kad = new KBucketSet(ctx, me, KAD_K, KAD_B, new KBTrimmer(ctx, KAD_K));
}
public void start() {
@@ -121,7 +125,7 @@ class DHTNodes {
* DHT - get random keys to explore
*/
public List<NID> getExploreKeys() {
return _kad.getExploreKeys(15*60*1000);
return _kad.getExploreKeys(MAX_BUCKET_AGE);
}
/** */

View File

@@ -17,7 +17,7 @@ import net.i2p.util.SimpleTimer2;
/**
* The tracker stores peers, i.e. Dest hashes (not nodes).
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class DHTTracker {

View File

@@ -9,7 +9,7 @@ import org.klomp.snark.I2PSnarkUtil;
/**
* A 20-byte SHA1 info hash
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class InfoHash extends SHA1Hash {

View File

@@ -0,0 +1,38 @@
package org.klomp.snark.dht;
import java.util.Set;
import net.i2p.I2PAppContext;
import net.i2p.kademlia.KBucket;
import net.i2p.kademlia.KBucketTrimmer;
/**
 * Trim policy for the snark DHT: evicts a node not seen for 15 minutes,
 * but only from a bucket that has been quiet for at least 5 minutes.
 * @since 0.9.2
 */
class KBTrimmer implements KBucketTrimmer<NID> {
    /** don't touch a bucket modified more recently than this */
    private static final long MIN_BUCKET_AGE = 5*60*1000;
    /** nodes unseen for longer than this are eviction candidates */
    private static final long MAX_NODE_AGE = 15*60*1000;

    private final I2PAppContext _ctx;
    private final int _max;

    public KBTrimmer(I2PAppContext ctx, int max) {
        _ctx = ctx;
        _max = max;
    }

    /**
     * @param kbucket the kbucket that is now too big
     * @param toAdd the entry about to be added
     * @return true to actually add the entry
     */
    public boolean trim(KBucket<NID> kbucket, NID toAdd) {
        long now = _ctx.clock().now();
        // leave recently-modified buckets alone and reject the add
        if (kbucket.getLastChanged() > now - MIN_BUCKET_AGE)
            return false;
        Set<NID> current = kbucket.getEntries();
        long staleCutoff = now - MAX_NODE_AGE;
        for (NID node : current) {
            // evict the first sufficiently-stale node we can actually remove
            if (node.lastSeen() < staleCutoff && kbucket.remove(node))
                return true;
        }
        // nothing evictable; allow the add only if there is still room
        return current.size() < _max;
    }
}

View File

@@ -79,7 +79,7 @@ import org.klomp.snark.bencode.InvalidBEncodingException;
* - nodes (in the find_node and get_peers response) is one concatenated string, not a list of strings, right?
* - Node ID enforcement, keyspace rotation?
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
public class KRPC implements I2PSessionMuxedListener, DHT {
@@ -133,6 +133,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private static final int REPLY_PONG = 1;
private static final int REPLY_PEERS = 2;
private static final int REPLY_NODES = 3;
private static final int REPLY_NETWORK_FAIL = 4;
public static final boolean SECURE_NID = true;
@@ -272,6 +273,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (! (ni.equals(_myNodeInfo) || (toTry.contains(ni) && tried.contains(ni))))
toTry.add(ni);
}
} else if (replyType == REPLY_NETWORK_FAIL) {
break;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject());
@@ -370,6 +373,8 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
if (! (ni.equals(_myNodeInfo) || tried.contains(ni) || toTry.contains(ni)))
toTry.add(ni);
}
} else if (replyType == REPLY_NETWORK_FAIL) {
break;
} else {
if (_log.shouldLog(Log.INFO))
_log.info("Got unexpected reply " + replyType + ": " + waiter.getReplyObject());
@@ -564,7 +569,11 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
_tracker.stop();
PersistDHT.saveDHT(_knownNodes, _dhtFile);
_knownNodes.stop();
_sentQueries.clear();
for (Iterator<ReplyWaiter> iter = _sentQueries.values().iterator(); iter.hasNext(); ) {
ReplyWaiter waiter = iter.next();
iter.remove();
waiter.networkFail();
}
_outgoingTokens.clear();
_incomingTokens.clear();
}
@@ -1317,7 +1326,7 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
private final NodeInfo sentTo;
private final Runnable onReply;
private final Runnable onTimeout;
private int replyCode;
private volatile int replyCode;
private Object sentObject;
private Object replyObject;
@@ -1400,6 +1409,18 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
this.notifyAll();
}
}
/**
* Will notify this but not run onReply or onTimeout,
* or remove from _sentQueries, or call heardFrom().
*/
public void networkFail() {
cancel();
replyCode = REPLY_NETWORK_FAIL;
synchronized(this) {
this.notifyAll();
}
}
}
// I2PSessionMuxedListener interface ----------------
@@ -1532,22 +1553,24 @@ public class KRPC implements I2PSessionMuxedListener, DHT {
return;
if (!_hasBootstrapped) {
if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap start size: " + _knownNodes.size());
_log.info("Bootstrap start, size: " + _knownNodes.size());
explore(_myNID, 8, 60*1000, 1);
if (_log.shouldLog(Log.INFO))
_log.info("Bootstrap done size: " + _knownNodes.size());
_log.info("Bootstrap done, size: " + _knownNodes.size());
_hasBootstrapped = true;
}
if (!_isRunning)
return;
if (_log.shouldLog(Log.INFO))
_log.info("Explore start size: " + _knownNodes.size());
_log.info("Explore start. size: " + _knownNodes.size());
List<NID> keys = _knownNodes.getExploreKeys();
for (NID nid : keys) {
explore(nid, 8, 60*1000, 1);
if (!_isRunning)
return;
}
if (_log.shouldLog(Log.INFO))
_log.info("Explore done size: " + _knownNodes.size());
_log.info("Explore of " + keys.size() + " buckets done, new size: " + _knownNodes.size());
new Explorer(EXPLORE_TIME);
}
}

View File

@@ -9,12 +9,14 @@ import net.i2p.data.ByteArray;
/**
* Used for both incoming and outgoing message IDs
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class MsgID extends ByteArray {
/** BEP 5: 2 bytes, incremented */
private static final int MY_TOK_LEN = 8;
private static final int MAX_TOK_LEN = 16;
/** outgoing - generate a random ID */
public MsgID(I2PAppContext ctx) {
@@ -28,5 +30,8 @@ class MsgID extends ByteArray {
/** incoming - save the ID (arbitrary length) */
public MsgID(byte[] data) {
super(data);
// lets not get carried away
if (data.length > MAX_TOK_LEN)
throw new IllegalArgumentException();
}
}

View File

@@ -10,7 +10,7 @@ import net.i2p.util.Clock;
* A 20-byte peer ID, used as a Map key in lots of places.
* Must be public for constructor in KBucketSet.generateRandomKey()
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
public class NID extends SHA1Hash {

View File

@@ -20,7 +20,7 @@ import net.i2p.util.RandomSource;
* always have the Destination.
The compact info is immutable. The Destination may be added later.
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/

View File

@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
* Closest to a InfoHash or NID key.
* Use for NodeInfos.
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class NodeInfoComparator implements Comparator<NodeInfo> {

View File

@@ -9,7 +9,7 @@ import net.i2p.data.Hash;
* A single peer for a single torrent.
* This is what the DHT tracker remembers.
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class Peer extends Hash {

View File

@@ -10,7 +10,7 @@ import net.i2p.data.Hash;
/**
* All the peers for a single torrent
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class Peers extends ConcurrentHashMap<Hash, Peer> {

View File

@@ -17,6 +17,7 @@ import net.i2p.util.SecureFileOutputStream;
/**
* Retrieve / Store the local DHT in a file
*
* @since 0.9.2
*/
abstract class PersistDHT {

View File

@@ -12,7 +12,7 @@ import net.i2p.data.DataHelper;
/**
* Used for Both outgoing and incoming tokens
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class Token extends ByteArray {

View File

@@ -9,7 +9,7 @@ import net.i2p.data.DataHelper;
/**
* Used to index incoming Tokens
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class TokenKey extends SHA1Hash {

View File

@@ -8,7 +8,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* All the torrents
*
* @since 0.8.4
* @since 0.9.2
* @author zzz
*/
class Torrents extends ConcurrentHashMap<InfoHash, Peers> {

View File

@@ -72,7 +72,7 @@ public class FetchAndAdd extends Snark implements EepGet.StatusListener, Runnabl
_log = ctx.logManager().getLog(FetchAndAdd.class);
_mgr = mgr;
_url = url;
_name = "* " + _("Download torrent file from {0}", url);
_name = _("Download torrent file from {0}", url);
byte[] fake = null;
try {
fake = SHA1.getInstance().digest(url.getBytes("ISO-8859-1"));

View File

@@ -89,7 +89,8 @@ public class I2PSnarkServlet extends DefaultServlet {
@Override
public void destroy() {
_manager.stop();
if (_manager != null)
_manager.stop();
super.destroy();
}
@@ -911,11 +912,17 @@ public class I2PSnarkServlet extends DefaultServlet {
TreeSet<String> fileNames = new TreeSet(new TorrentNameComparator());
fileNames.addAll(files);
ArrayList<Snark> rv = new ArrayList(fileNames.size());
int magnet = 0;
for (Iterator iter = fileNames.iterator(); iter.hasNext(); ) {
String name = (String)iter.next();
Snark snark = _manager.getTorrent(name);
if (snark != null)
rv.add(snark);
if (snark != null) {
// put downloads and magnets first
if (snark.getStorage() == null)
rv.add(magnet++, snark);
else
rv.add(snark);
}
}
return rv;
}
@@ -1782,7 +1789,7 @@ public class I2PSnarkServlet extends DefaultServlet {
}
ihash = xt.substring("urn:btih:".length());
trackerURL = getTrackerParam(url);
name = "* " + _("Magnet") + ' ' + ihash;
name = _("Magnet") + ' ' + ihash;
String dn = getParam("dn", url);
if (dn != null)
name += " (" + Storage.filterName(dn) + ')';
@@ -1792,7 +1799,7 @@ public class I2PSnarkServlet extends DefaultServlet {
int col = ihash.indexOf(':');
if (col >= 0)
ihash = ihash.substring(0, col);
name = "* " + _("Magnet") + ' ' + ihash;
name = _("Magnet") + ' ' + ihash;
} else {
return;
}

View File

@@ -100,6 +100,8 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
private static final boolean DEFAULT_ANSWER_PINGS = true;
private static final int DEFAULT_INACTIVITY_TIMEOUT = 90*1000;
private static final int DEFAULT_INACTIVITY_ACTION = INACTIVITY_ACTION_SEND;
private static final int DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR = 1;
private static final int DEFAULT_SLOW_START_GROWTH_RATE_FACTOR = 1;
/**
@@ -327,8 +329,10 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setInactivityTimeout(getInt(opts, PROP_INACTIVITY_TIMEOUT, DEFAULT_INACTIVITY_TIMEOUT));
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 1));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 1));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR,
DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR,
DEFAULT_SLOW_START_GROWTH_RATE_FACTOR));
// overrides default in super()
setConnectTimeout(getInt(opts, PROP_CONNECT_TIMEOUT, Connection.DISCONNECT_TIMEOUT));
setAnswerPings(getBool(opts, PROP_ANSWER_PINGS, DEFAULT_ANSWER_PINGS));
@@ -378,9 +382,11 @@ class ConnectionOptions extends I2PSocketOptionsImpl {
setInactivityAction(getInt(opts, PROP_INACTIVITY_ACTION, DEFAULT_INACTIVITY_ACTION));
setInboundBufferSize(getMaxMessageSize() * (Connection.MAX_WINDOW_SIZE + 2));
if (opts.contains(PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR))
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR, 2));
setCongestionAvoidanceGrowthRateFactor(getInt(opts, PROP_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR,
DEFAULT_CONGESTION_AVOIDANCE_GROWTH_RATE_FACTOR));
if (opts.contains(PROP_SLOW_START_GROWTH_RATE_FACTOR))
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR, 2));
setSlowStartGrowthRateFactor(getInt(opts, PROP_SLOW_START_GROWTH_RATE_FACTOR,
DEFAULT_SLOW_START_GROWTH_RATE_FACTOR));
if (opts.containsKey(PROP_CONNECT_TIMEOUT))
// wow 5 minutes!!! FIXME!!
// overrides default in super()

View File

@@ -23,6 +23,8 @@ import net.i2p.util.SimpleTimer;
class ConnectionPacketHandler {
private final I2PAppContext _context;
private final Log _log;
public static final int MAX_SLOW_START_WINDOW = 24;
public ConnectionPacketHandler(I2PAppContext context) {
_context = context;
@@ -367,9 +369,16 @@ class ConnectionPacketHandler {
// grow acked/N times (where N = the slow start factor)
// always grow at least 1
int factor = con.getOptions().getSlowStartGrowthRateFactor();
if (factor <= 1)
newWindowSize += acked;
else if (acked < factor)
if (factor <= 1) {
// above a certain point, don't grow exponentially
// as it often leads to a big packet loss (30-50) all at once that
// takes quite a while (a minute or more) to recover from,
// especially if crypto tags are lost
if (newWindowSize >= MAX_SLOW_START_WINDOW)
newWindowSize++;
else
newWindowSize = Math.min(MAX_SLOW_START_WINDOW, newWindowSize + acked);
} else if (acked < factor)
newWindowSize++;
else
newWindowSize += acked / factor;

View File

@@ -31,7 +31,7 @@ class TCBShare {
private static final double RTT_DAMPENING = 0.75;
private static final double WDW_DAMPENING = 0.75;
private static final int MAX_RTT = ((int) Connection.MAX_RESEND_DELAY) / 2;
private static final int MAX_WINDOW_SIZE = Connection.MAX_WINDOW_SIZE / 4;
private static final int MAX_WINDOW_SIZE = ConnectionPacketHandler.MAX_SLOW_START_WINDOW;
public TCBShare(I2PAppContext ctx, SimpleTimer2 timer) {
_context = ctx;
@@ -45,6 +45,7 @@ class TCBShare {
_cleaner.cancel();
}
/** retrieve from cache */
public void updateOptsFromShare(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)
@@ -65,6 +66,7 @@ class TCBShare {
opts.setWindowSize(e.getWindowSize());
}
/** store to cache */
public void updateShareOpts(Connection con) {
Destination dest = con.getRemotePeer();
if (dest == null)

View File

@@ -1,3 +1,44 @@
2012-08-31 zzz
* i2psnark: Remove * from magnet and download names
* Router: Lengthen shutdown spinner life
* Startup: Don't complain about clients.config missing on Android
2012-08-29 zzz
* ClientManager: Cleanups
* i2psnark:
- Fix NPE on destroy() if init() failed
- Add new flood-resistant KBucket trim policy
- Limit received MsgID size
* NTCP: Reduce lock contention (ticket #697)
* RandomIterator: Workaround for Android bug (ticket #703)
2012-08-27 zzz
* i2psnark:
- Notify threads awaiting DHT replies at shutdown
- Fix cases where we weren't using the session for b32 lookup
* Reseed: Remove forum.i2p2.de
* Streaming: Limit amount of slow-start exponential growth
* SSU:
- Limit UDPSender queue size
- Increase UDPSender max packet lifetime
- Clear UDPSender queue before sending destroys to all
- Increase PeerState queue size so large streaming windows
don't get dropped right away, especially at slow start
- Various improvements on iterating over pending outbound
messages in PeerState
* Wrapper: Update armv7 to 3.5.15
2012-08-27 kytv
* Update Java Service Wrapper to v3.5.15.
- Windows: Self-compiled with VS2010 in Windows 7. The icon has been
changed from Tanuki's default to Itoopie.
- FreeBSD: Self-compiled in FreeBSD 7.4 to eliminate the dependency on the
compat6x port.
- Linux ARMv5, Linux PPC32: Self-compiled in Debian Squeeze
- Linux x86, Linux x64, MacOSX &amp; Solaris: Binaries are from the "community
edition" deltapack offered by Tanuki. The x86 and x64 binaries for Linux
have been stripped.
2012-08-26 zzz
* DataHelper: Trim trailing whitespace when loading properties
* NetDB: Increase floodfills, decrease flood redundancy

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -18,7 +18,7 @@ public class RouterVersion {
/** deprecated */
public final static String ID = "Monotone";
public final static String VERSION = CoreVersion.VERSION;
public final static long BUILD = 19;
public final static long BUILD = 22;
/** for example "-test" */
public final static String EXTRA = "";

View File

@@ -86,7 +86,7 @@ class ClientConnectionRunner {
private ClientWriterRunner _writer;
private Hash _destHashCache;
/** are we, uh, dead */
private boolean _dead;
private volatile boolean _dead;
/** For outbound traffic. true if i2cp.messageReliability = "none"; @since 0.8.1 */
private boolean _dontSendMSM;
private final AtomicInteger _messageId; // messageId counter
@@ -463,10 +463,10 @@ class ClientConnectionRunner {
}
private class Rerequest implements SimpleTimer.TimedEvent {
private LeaseSet _ls;
private long _expirationTime;
private Job _onCreate;
private Job _onFailed;
private final LeaseSet _ls;
private final long _expirationTime;
private final Job _onCreate;
private final Job _onFailed;
public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) {
_ls = ls;
_expirationTime = expirationTime;

View File

@@ -31,8 +31,8 @@ class ClientListenerRunner implements Runnable {
protected ServerSocket _socket;
protected final int _port;
protected final boolean _bindAllInterfaces;
protected boolean _running;
protected boolean _listening;
protected volatile boolean _running;
protected volatile boolean _listening;
public static final String BIND_ALL_INTERFACES = "i2cp.tcp.bindAllInterfaces";

View File

@@ -45,7 +45,7 @@ class ClientManager {
private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner
private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet
private final RouterContext _ctx;
private boolean _isStarted;
private volatile boolean _isStarted;
/** Disable external interface, allow internal clients only @since 0.8.3 */
private static final String PROP_DISABLE_EXTERNAL = "i2cp.disableInterface";

View File

@@ -38,7 +38,7 @@ import net.i2p.util.Log;
public class ClientManagerFacadeImpl extends ClientManagerFacade implements InternalClientManager {
private final Log _log;
private ClientManager _manager;
private RouterContext _context;
private final RouterContext _context;
/** note that this is different than the property the client side uses, i2cp.tcp.port */
public final static String PROP_CLIENT_PORT = "i2cp.port";
public final static int DEFAULT_PORT = 7654;

View File

@@ -17,10 +17,10 @@ import net.i2p.util.Log;
* @author zzz modded to use concurrent
*/
class ClientWriterRunner implements Runnable {
private BlockingQueue<I2CPMessage> _messagesToWrite;
private ClientConnectionRunner _runner;
private Log _log;
private long _id;
private final BlockingQueue<I2CPMessage> _messagesToWrite;
private final ClientConnectionRunner _runner;
private final Log _log;
private final long _id;
private static long __id = 0;
public ClientWriterRunner(RouterContext context, ClientConnectionRunner runner) {

View File

@@ -24,8 +24,8 @@ import net.i2p.util.Log;
*
*/
class CreateSessionJob extends JobImpl {
private Log _log;
private ClientConnectionRunner _runner;
private final Log _log;
private final ClientConnectionRunner _runner;
public CreateSessionJob(RouterContext context, ClientConnectionRunner runner) {
super(context);

View File

@@ -20,12 +20,12 @@ import net.i2p.router.Job;
*/
class LeaseRequestState {
private LeaseSet _grantedLeaseSet;
private LeaseSet _requestedLeaseSet;
private PrivateKey _leaseSetPrivateKey;
private SigningPrivateKey _leaseSetSigningPrivateKey;
private Job _onGranted;
private Job _onFailed;
private long _expiration;
private final LeaseSet _requestedLeaseSet;
//private PrivateKey _leaseSetPrivateKey;
//private SigningPrivateKey _leaseSetSigningPrivateKey;
private final Job _onGranted;
private final Job _onFailed;
private final long _expiration;
private boolean _successful;
public LeaseRequestState(Job onGranted, Job onFailed, long expiration, LeaseSet requested) {
@@ -35,26 +35,34 @@ class LeaseRequestState {
_requestedLeaseSet = requested;
}
/** created lease set from client */
/** created lease set from client - FIXME always null */
public LeaseSet getGranted() { return _grantedLeaseSet; }
/** FIXME unused - why? */
public void setGranted(LeaseSet ls) { _grantedLeaseSet = ls; }
/** lease set that is being requested */
public LeaseSet getRequested() { return _requestedLeaseSet; }
public void setRequested(LeaseSet ls) { _requestedLeaseSet = ls; }
//public void setRequested(LeaseSet ls) { _requestedLeaseSet = ls; }
/** the private encryption key received regarding the lease set */
public PrivateKey getPrivateKey() { return _leaseSetPrivateKey; }
public void getPrivateKey(PrivateKey pk) { _leaseSetPrivateKey = pk; }
//public PrivateKey getPrivateKey() { return _leaseSetPrivateKey; }
//public void setPrivateKey(PrivateKey pk) { _leaseSetPrivateKey = pk; }
/** the private signing key received regarding the lease set (for revocation) */
public SigningPrivateKey getSigningPrivateKey() { return _leaseSetSigningPrivateKey; }
public void getSigningPrivateKey(SigningPrivateKey spk) { _leaseSetSigningPrivateKey = spk; }
//public SigningPrivateKey getSigningPrivateKey() { return _leaseSetSigningPrivateKey; }
//public void setSigningPrivateKey(SigningPrivateKey spk) { _leaseSetSigningPrivateKey = spk; }
/** what to do once the lease set is created */
public Job getOnGranted() { return _onGranted; }
public void setOnGranted(Job jb) { _onGranted = jb; }
//public void setOnGranted(Job jb) { _onGranted = jb; }
/** what to do if the lease set create fails / times out */
public Job getOnFailed() { return _onFailed; }
public void setOnFailed(Job jb) { _onFailed = jb; }
//public void setOnFailed(Job jb) { _onFailed = jb; }
/** when the request for the lease set expires */
public long getExpiration() { return _expiration; }
/** whether the request was successful in the time allotted */
public boolean getIsSuccessful() { return _successful; }
public void setIsSuccessful(boolean is) { _successful = is; }

View File

@@ -16,8 +16,8 @@ import net.i2p.router.RouterContext;
* Look up the lease of a hash, to convert it to a Destination for the client
*/
class LookupDestJob extends JobImpl {
private ClientConnectionRunner _runner;
private Hash _hash;
private final ClientConnectionRunner _runner;
private final Hash _hash;
public LookupDestJob(RouterContext context, ClientConnectionRunner runner, Hash h) {
super(context);

View File

@@ -22,9 +22,9 @@ import net.i2p.util.Log;
*
*/
class MessageReceivedJob extends JobImpl {
private Log _log;
private ClientConnectionRunner _runner;
private Payload _payload;
private final Log _log;
private final ClientConnectionRunner _runner;
private final Payload _payload;
public MessageReceivedJob(RouterContext ctx, ClientConnectionRunner runner, Destination toDest, Destination fromDest, Payload payload) {
super(ctx);
_log = ctx.logManager().getLog(MessageReceivedJob.class);

View File

@@ -21,10 +21,10 @@ import net.i2p.util.Log;
*
*/
class ReportAbuseJob extends JobImpl {
private Log _log;
private ClientConnectionRunner _runner;
private String _reason;
private int _severity;
private final Log _log;
private final ClientConnectionRunner _runner;
private final String _reason;
private final int _severity;
public ReportAbuseJob(RouterContext context, ClientConnectionRunner runner, String reason, int severity) {
super(context);
_log = context.logManager().getLog(ReportAbuseJob.class);

View File

@@ -62,7 +62,7 @@ public class Reseeder {
public static final String DEFAULT_SEED_URL =
"http://netdb.i2p2.de/" + "," +
"http://reseed.i2p-projekt.de/" + "," +
"http://forum.i2p2.de/netdb/" + "," +
// "http://forum.i2p2.de/netdb/" + "," +
"http://euve5653.vserver.de/netDb/" + "," +
// "http://r31453.ovh.net/static_media/files/netDb/" + "," +
"http://cowpuncher.drollette.com/netdb/" + "," +
@@ -73,7 +73,7 @@ public class Reseeder {
/** @since 0.8.2 */
public static final String DEFAULT_SSL_SEED_URL =
"https://netdb.i2p2.de/" + "," +
"https://forum.i2p2.de/netdb/" + "," +
// "https://forum.i2p2.de/netdb/" + "," +
"https://euve5653.vserver.de/netDb/" + "," +
"https://reseed.i2p-projekt.de/" + "," +
// "https://r31453.ovh.net/static_media/files/netDb/" + "," +

View File

@@ -33,7 +33,8 @@ public class StartupJob extends JobImpl {
public String getName() { return "Startup Router"; }
public void runJob() {
getContext().jobQueue().addJob(new LoadClientAppsJob(getContext()));
if (!System.getProperty("java.vendor").contains("Android"))
getContext().jobQueue().addJob(new LoadClientAppsJob(getContext()));
getContext().statPublisher().startup();
getContext().jobQueue().addJob(new LoadRouterInfoJob(getContext()));
}

View File

@@ -17,7 +17,7 @@ public class Spinner extends Thread {
@Override
public void run() {
try {
sleep(60*1000);
sleep(5*60*1000);
} catch (InterruptedException ie) {}
}
}

View File

@@ -64,8 +64,9 @@ class Reader {
already = true;
} else {
_pendingConnections.add(con);
// only notify here if added?
}
_pendingConnections.notifyAll();
_pendingConnections.notify();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("wantsRead: " + con + " already live? " + already);
@@ -75,7 +76,8 @@ class Reader {
synchronized (_pendingConnections) {
_readAfterLive.remove(con);
_pendingConnections.remove(con);
_pendingConnections.notifyAll();
// necessary?
_pendingConnections.notify();
}
}

View File

@@ -62,8 +62,9 @@ class Writer {
already = true;
} else {
pending = _pendingConnections.add(con);
// only notify here if added?
}
_pendingConnections.notifyAll();
_pendingConnections.notify();
}
if (_log.shouldLog(Log.DEBUG))
_log.debug("wantsWrite: " + con + " already live? " + already + " added to pending? " + pending + ": " + source);
@@ -73,7 +74,8 @@ class Writer {
synchronized (_pendingConnections) {
_writeAfterLive.remove(con);
_pendingConnections.remove(con);
_pendingConnections.notifyAll();
// necessary?
_pendingConnections.notify();
}
}

View File

@@ -42,6 +42,8 @@ class OutboundMessageState {
private static final int MAX_ENTRIES = 64;
/** would two caches, one for small and one for large messages, be better? */
private static final ByteCache _cache = ByteCache.getInstance(MAX_ENTRIES, MAX_MSG_SIZE);
private static final long EXPIRATION = 10*1000;
public OutboundMessageState(I2PAppContext context) {
_context = context;
@@ -64,6 +66,7 @@ class OutboundMessageState {
/**
* Called from UDPTransport
* TODO make two constructors, remove this, and make more things final
* @return success
*/
public boolean initialize(I2NPMessage msg, PeerState peer) {
@@ -82,6 +85,7 @@ class OutboundMessageState {
/**
* Called from OutboundMessageFragments
* TODO make two constructors, remove this, and make more things final
* @return success
*/
public boolean initialize(OutNetMessage m, I2NPMessage msg) {
@@ -121,7 +125,7 @@ class OutboundMessageState {
_startedOn = _context.clock().now();
_nextSendTime = _startedOn;
_expiration = _startedOn + 10*1000;
_expiration = _startedOn + EXPIRATION;
//_expiration = msg.getExpiration();
if (_log.shouldLog(Log.DEBUG))

View File

@@ -38,7 +38,7 @@ class PacketPusher implements Runnable {
if (packets != null) {
for (int i = 0; i < packets.length; i++) {
if (packets[i] != null) // null for ACKed fragments
//_sender.add(packets[i], 0); // 0 does not block //100); // blocks for up to 100ms
// BLOCKING if queue is full
_sender.add(packets[i]);
}
}

View File

@@ -217,6 +217,13 @@ class PeerState {
private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES;
private static final int MAX_SEND_WINDOW_BYTES = 1024*1024;
/**
* Was 32 before 0.9.2, but since the streaming lib goes up to 128,
* we would just drop our own msgs right away during slow start.
* May need to adjust based on memory.
*/
private static final int MAX_SEND_MSGS_PENDING = 128;
/*
* 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message,
* 522 fragment bytes, which is enough to send a tunnel data message in 2
@@ -1181,6 +1188,14 @@ class PeerState {
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
* TODO should this use a queue, separate from the list of msgs pending an ack?
* TODO bring back tail drop?
* TODO priority queue? (we don't implement priorities in SSU now)
* TODO backlog / pushback / block instead of dropping? Can't really block here.
* TODO SSU does not support isBacklogged() now
* @return total pending messages
*/
public int add(OutboundMessageState state) {
if (_dead) {
_transport.failed(state, false);
@@ -1193,8 +1208,8 @@ class PeerState {
boolean fail = false;
synchronized (_outboundMessages) {
rv = _outboundMessages.size() + 1;
if (rv > 32) {
// 32 queued messages? to *one* peer? nuh uh.
if (rv > MAX_SEND_MSGS_PENDING) {
// too many queued messages to one peer? nuh uh.
fail = true;
rv--;
@@ -1240,8 +1255,11 @@ class PeerState {
_outboundMessages.add(state);
}
}
if (fail)
if (fail) {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg, OB queue full for " + toString());
_transport.failed(state, false);
}
return rv;
}
@@ -1278,6 +1296,10 @@ class PeerState {
/**
* Expire / complete any outbound messages
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 1st.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return number of active outbound messages remaining
*/
public int finishMessages() {
@@ -1350,14 +1372,20 @@ class PeerState {
/**
* Pick a message we want to send and allocate it out of our window
* @return allocated message to send, or null if no messages or no resources
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 2nd, if finishMessages() returned > 0.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return allocated message to send, or null if no messages or no resources
*/
public OutboundMessageState allocateSend() {
if (_dead) return null;
synchronized (_outboundMessages) {
for (OutboundMessageState state : _outboundMessages) {
if (locked_shouldSend(state)) {
// We have 3 return values, because if allocateSendingBytes() returns false,
// then we can stop iterating.
ShouldSend should = locked_shouldSend(state);
if (should == ShouldSend.YES) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Allocate sending to " + _remotePeer + ": " + state.getMessageId());
/*
@@ -1369,6 +1397,12 @@ class PeerState {
}
*/
return state;
} else if (should == ShouldSend.NO_BW) {
// no more bandwidth available
// we don't bother looking for a smaller msg that would fit.
// By not looking further, we keep strict sending order, and that allows
// some efficiency in acked() below.
break;
} /* else {
OutNetMessage msg = state.getMessage();
if (msg != null)
@@ -1382,6 +1416,10 @@ class PeerState {
}
/**
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
* TODO combine finishMessages(), allocateSend(), and getNextDelay() so we don't iterate 3 times.
*
* @return how long to wait before sending, or Integer.MAX_VALUE if we have nothing to send.
* If ready now, will return 0 or a negative value.
*/
@@ -1396,6 +1434,9 @@ class PeerState {
}
for (OutboundMessageState state : _outboundMessages) {
int delay = (int)(state.getNextSendTime() - now);
// short circuit once we hit something ready to go
if (delay <= 0)
return delay;
if (delay < rv)
rv = delay;
}
@@ -1435,7 +1476,13 @@ class PeerState {
return mtu - (PacketBuilder.MIN_DATA_PACKET_OVERHEAD + MIN_ACK_SIZE);
}
private boolean locked_shouldSend(OutboundMessageState state) {
private enum ShouldSend { YES, NO, NO_BW };
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();
if (state.getNextSendTime() <= now) {
if (!state.isFragmented()) {
@@ -1465,7 +1512,7 @@ class PeerState {
} else if ( (max <= 0) || (THROTTLE_RESENDS) ) {
//if (state.getMessage() != null)
// state.getMessage().timestamp("choked, with another message retransmitting");
return false;
return ShouldSend.NO;
} else {
//if (state.getMessage() != null)
// state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
@@ -1491,7 +1538,7 @@ class PeerState {
//if (peer.getSendWindowBytesRemaining() > 0)
// _throttle.unchoke(peer.getRemotePeer());
return true;
return ShouldSend.YES;
} else {
_context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
//if (state.getMessage() != null)
@@ -1510,15 +1557,16 @@ class PeerState {
// state.getMessage().timestamp("choked, not enough available, wsize="
// + getSendWindowBytes() + " available="
// + getSendWindowBytesRemaining());
return false;
return ShouldSend.NO_BW;
}
} // nextTime <= now
return false;
return ShouldSend.NO;
}
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashSet this would be faster.
*
* @return true if the message was acked for the first time
*/
@@ -1531,6 +1579,11 @@ class PeerState {
if (state.getMessageId() == messageId) {
iter.remove();
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}
@@ -1600,6 +1653,11 @@ class PeerState {
_retransmitter = null;
}
break;
} else if (state.getPushCount() <= 0) {
// _outboundMessages is ordered, so once we get to a msg that
// hasn't been transmitted yet, we can stop
state = null;
break;
} else {
state = null;
}

View File

@@ -136,13 +136,10 @@ class UDPEndpoint {
/**
* Add the packet to the outobund queue to be sent ASAP (as allowed by
* the bandwidth limiter)
*
* @return ZERO (used to be number of packets in the queue)
* BLOCKING if queue is full.
*/
public int send(UDPPacket packet) {
if (_sender == null)
return 0;
return _sender.add(packet);
public void send(UDPPacket packet) {
_sender.add(packet);
}
/**
@@ -154,4 +151,12 @@ class UDPEndpoint {
return null;
return _receiver.receiveNext();
}
/**
* Clear outbound queue, probably in preparation for sending destroy() to everybody.
* @since 0.9.2
*/
public void clearOutbound() {
_sender.clear();
}
}

View File

@@ -25,12 +25,17 @@ class UDPSender {
private final Runner _runner;
private static final int TYPE_POISON = 99999;
//private static final int MAX_QUEUED = 4;
private static final int MIN_QUEUE_SIZE = 64;
private static final int MAX_QUEUE_SIZE = 384;
public UDPSender(RouterContext ctx, DatagramSocket socket, String name) {
_context = ctx;
_log = ctx.logManager().getLog(UDPSender.class);
_outboundQueue = new LinkedBlockingQueue();
long maxMemory = Runtime.getRuntime().maxMemory();
if (maxMemory == Long.MAX_VALUE)
maxMemory = 96*1024*1024l;
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (1024*1024)));
_outboundQueue = new LinkedBlockingQueue(qsize);
_socket = socket;
_runner = new Runner();
_name = name;
@@ -81,6 +86,14 @@ class UDPSender {
_outboundQueue.clear();
}
/**
* Clear outbound queue, probably in preparation for sending destroy() to everybody.
* @since 0.9.2
*/
public void clear() {
_outboundQueue.clear();
}
/*********
public DatagramSocket updateListeningPort(DatagramSocket socket, int newPort) {
return _runner.updateListeningPort(socket, newPort);
@@ -93,10 +106,9 @@ class UDPSender {
* available, if requested, otherwise it returns immediately
*
* @param blockTime how long to block IGNORED
* @return ZERO (used to be number of packets in the queue)
* @deprecated use add(packet)
*/
public int add(UDPPacket packet, int blockTime) {
public void add(UDPPacket packet, int blockTime) {
/********
//long expiration = _context.clock().now() + blockTime;
int remaining = -1;
@@ -148,31 +160,32 @@ class UDPSender {
_log.debug("Added the packet onto the queue with " + remaining + " remaining and a lifetime of " + lifetime);
return remaining;
********/
return add(packet);
add(packet);
}
private static final int MAX_HEAD_LIFETIME = 1000;
private static final int MAX_HEAD_LIFETIME = 3*1000;
/**
* Put it on the queue
* @return ZERO (used to be number of packets in the queue)
* Put it on the queue.
* BLOCKING if queue is full (backs up PacketPusher thread)
*/
public int add(UDPPacket packet) {
if (packet == null || !_keepRunning) return 0;
int size = 0;
public void add(UDPPacket packet) {
if (packet == null || !_keepRunning) return;
int psz = packet.getPacket().getLength();
if (psz > PeerState.LARGE_MTU) {
_log.error("Dropping large UDP packet " + psz + " bytes: " + packet);
return 0;
return;
}
try {
_outboundQueue.put(packet);
} catch (InterruptedException ie) {
return;
}
_outboundQueue.offer(packet);
//size = _outboundQueue.size();
//_context.statManager().addRateData("udp.sendQueueSize", size, lifetime);
if (_log.shouldLog(Log.DEBUG)) {
size = _outboundQueue.size();
_log.debug("Added the packet onto the queue with " + size + " remaining and a lifetime of " + packet.getLifetime());
_log.debug("Added the packet onto the queue with a lifetime of " + packet.getLifetime());
}
return size;
}
private class Runner implements Runnable {

View File

@@ -1119,17 +1119,17 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/**
* This sends it directly out, bypassing OutboundMessageFragments
* and the PacketPusher. The only queueing is for the bandwidth limiter.
*
* @return ZERO (used to be number of packets in the queue)
* BLOCKING if OB queue is full.
*/
int send(UDPPacket packet) {
void send(UDPPacket packet) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending packet " + packet);
return _endpoint.send(packet);
_endpoint.send(packet);
}
/**
* Send a session destroy message, bypassing OMF and PacketPusher.
* BLOCKING if OB queue is full.
*
* @since 0.8.9
*/
@@ -1145,10 +1145,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
/**
* Send a session destroy message to everybody
* BLOCKING if OB queue is full.
*
* @since 0.8.9
*/
private void destroyAll() {
_endpoint.clearOutbound();
int howMany = _peersByIdent.size();
if (_log.shouldLog(Log.WARN))
_log.warn("Sending destroy to : " + howMany + " peers");

View File

@@ -6,7 +6,7 @@ package net.i2p.router.util;
* No license, free to use
*/
//import java.util.ArrayList;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Iterator;
@@ -89,6 +89,13 @@ public class RandomIterator<E> implements Iterator<E> {
/** Used to narrow the range to take random indexes from */
private int lower, upper;
private static final boolean isAndroid = System.getProperty("java.vendor").contains("Android");
static {
if (isAndroid)
testAndroid();
}
public RandomIterator(List<E> list){
this.list = list;
LIST_SIZE = list.size();
@@ -129,9 +136,10 @@ public class RandomIterator<E> implements Iterator<E> {
// I2P - ensure lower and upper are always clear
if (hasNext()) {
if (index == lower)
lower = served.nextClearBit(lower);
// workaround for Android ICS bug - see below
lower = isAndroid ? nextClearBit(index) : served.nextClearBit(index);
else if (index == upper)
upper = previousClearBit(upper - 1);
upper = previousClearBit(index - 1);
}
return list.get(index);
}
@@ -146,6 +154,20 @@ public class RandomIterator<E> implements Iterator<E> {
return -1;
}
/**
* Workaround for bug in Android (ICS only?)
* http://code.google.com/p/android/issues/detail?id=31036
* @since 0.9.2
*/
private int nextClearBit(int n) {
for (int i = n; i <= upper; i++) {
if (!served.get(i)) {
return i;
}
}
return -1;
}
/**
* @throws UnsupportedOperationException always
*/
@@ -153,19 +175,16 @@ public class RandomIterator<E> implements Iterator<E> {
throw new UnsupportedOperationException();
}
/*****
public static void main(String[] args) {
System.out.println("\n testing with 0");
testAndroid();
test(0);
System.out.println("\n testing with 1");
test(1);
System.out.println("\n testing with 2");
test(2);
System.out.println("\n testing with 1000");
test(1000);
}
public static void test(int n) {
private static void test(int n) {
System.out.println("testing with " + n);
List<Integer> l = new ArrayList(n);
for (int i = 0; i < n; i++) {
l.add(Integer.valueOf(i));
@@ -174,5 +193,23 @@ public class RandomIterator<E> implements Iterator<E> {
System.out.println(iter.next().toString());
}
}
*****/
/**
* Test case from android ticket above
* @since 0.9.2
*/
private static void testAndroid() {
System.out.println("checking for Android bug");
BitSet theBitSet = new BitSet(864);
for (int exp =0; exp < 864; exp++) {
int act = theBitSet.nextClearBit(0);
if (exp != act) {
System.err.println(String.format("Test failed for: exp=%d, act=%d", exp, act));
System.err.println("Android BitSet bug detected, workaround implemented!");
return;
}
theBitSet.set(exp);
}
System.err.println("Android BitSet bug NOT detected, no workaround needed!");
}
}