From 4b77ddedccf0fe86aa82cfd124aa3fac74ebd9ba Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Mon, 20 Feb 2006 14:19:52 +0000 Subject: [PATCH] 2006-02-20 jrandom * Major SSU and router tuning to reduce contention, memory usage, and GC churn. There are still issues to be worked out, but this should be a substantial improvement. * Modified the optional netDb harvester task to support choosing whether to use (non-anonymous) direct connections or (anonymous) exploratory tunnels to do the harvesting. Harvesting itself is enabled via the advanced config "netDb.shouldHarvest=true" (default is false) and the connection type can be chosen via "netDb.harvestDirectly=false" (default is false). --- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 4 + .../src/net/i2p/router/web/ReseedHandler.java | 4 +- .../src/net/i2p/client/I2PSessionImpl.java | 32 ++- .../src/net/i2p/crypto/ElGamalAESEngine.java | 8 +- .../src/net/i2p/crypto/ElGamalEngine.java | 2 + .../net/i2p/data/i2cp/I2CPMessageReader.java | 10 +- core/java/src/net/i2p/util/SimpleTimer.java | 28 ++- .../src/net/i2p/data/i2np/DataMessage.java | 21 +- .../src/net/i2p/data/i2np/GarlicMessage.java | 19 +- .../net/i2p/data/i2np/I2NPMessageImpl.java | 8 +- router/java/src/net/i2p/router/JobTiming.java | 2 +- .../src/net/i2p/router/RouterVersion.java | 4 +- .../router/client/ClientConnectionRunner.java | 6 +- .../net/i2p/router/client/ClientManager.java | 39 +--- .../networkdb/kademlia/HarvesterJob.java | 36 +++- .../transport/FIFOBandwidthLimiter.java | 105 +++++----- .../transport/FIFOBandwidthRefiller.java | 10 +- .../i2p/router/transport/TransportImpl.java | 8 +- .../udp/InboundMessageFragments.java | 67 ++++--- .../udp/OutboundMessageFragments.java | 8 +- .../router/transport/udp/PacketBuilder.java | 42 ++-- .../i2p/router/transport/udp/PeerState.java | 186 ++++++++++-------- .../router/transport/udp/UDPEndpointTest.java | 13 +- .../i2p/router/transport/udp/UDPPacket.java | 66 ++++--- .../router/transport/udp/UDPPacketReader.java | 18 +- .../i2p/router/transport/udp/UDPReceiver.java | 21 +- .../i2p/router/transport/udp/UDPSender.java | 6 +- .../router/transport/udp/UDPTransport.java | 14 +- 28 files changed, 469 insertions(+), 318 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index d3a8973d27..77e825e5fb 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -250,6 +250,10 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL + from + " and " + to); } + // boo, hiss! shouldn't need this - the streaming lib should be configurable, but + // somehow the inactivity timer is sometimes failing to get triggered properly + //i2ps.setReadTimeout(2*60*1000); + ByteArray ba = _cache.acquire(); byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE]; try { diff --git a/apps/routerconsole/java/src/net/i2p/router/web/ReseedHandler.java b/apps/routerconsole/java/src/net/i2p/router/web/ReseedHandler.java index c5449a0bca..eb298e7c03 100644 --- a/apps/routerconsole/java/src/net/i2p/router/web/ReseedHandler.java +++ b/apps/routerconsole/java/src/net/i2p/router/web/ReseedHandler.java @@ -71,7 +71,9 @@ public class ReseedHandler { seedURL = DEFAULT_SEED_URL; try { URL dir = new URL(seedURL); - String content = new String(readURL(dir)); + byte contentRaw[] = readURL(dir); + if (contentRaw == null) return; + String content = new String(contentRaw); Set urls = new HashSet(); int cur = 0; while (true) { diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index bb573c76fc..4d952228b3 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -140,6 +140,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa loadConfig(options); _sessionId = null; _leaseSet = null; + _context.statManager().createRateStat("client.availableMessages", "How many messages are available for the current client", "ClientMessages", new long[] { 60*1000, 10*60*1000 }); } /** @@ -299,11 +300,17 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * */ public byte[] receiveMessage(int msgId) throws I2PSessionException { + int remaining = 0; MessagePayloadMessage msg = null; synchronized (_availableMessages) { msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId)); + remaining = _availableMessages.size(); + } + _context.statManager().addRateData("client.availableMessages", remaining, 0); + if (msg == null) { + _log.error("Receive message " + msgId + " had no matches, remaining=" + remaining); + return null; } - if (msg == null) return null; return msg.getPayload().getUnencryptedData(); } @@ -339,9 +346,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Recieve a payload message and let the app know its available */ public void addNewMessage(MessagePayloadMessage msg) { + Long mid = new Long(msg.getMessageId()); + int avail = 0; synchronized (_availableMessages) { - _availableMessages.put(new Long(msg.getMessageId()), msg); + _availableMessages.put(mid, msg); + avail = _availableMessages.size(); } + _context.statManager().addRateData("client.availableMessages", avail, 0); long id = msg.getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { @@ -354,20 +365,23 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); } - SimpleTimer.getInstance().addEvent(new VerifyUsage(id), 30*1000); + SimpleTimer.getInstance().addEvent(new VerifyUsage(mid), 30*1000); } private class VerifyUsage implements SimpleTimer.TimedEvent { - private long _msgId; - public VerifyUsage(long id) { _msgId = id; } + private Long _msgId; + public VerifyUsage(Long id) { _msgId = id; } public void timeReached() { MessagePayloadMessage removed = null; + int remaining = 0; synchronized (_availableMessages) { - removed = (MessagePayloadMessage)_availableMessages.remove(new Long(_msgId)); + removed = (MessagePayloadMessage)_availableMessages.remove(_msgId); + remaining = _availableMessages.size(); + } + if (removed != null) { + _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed + ": remaining: " + remaining); + _context.statManager().addRateData("client.availableMessages", remaining, 0); } - if (removed != null) - _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed); } - } private class AvailabilityNotifier implements Runnable { diff --git a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java index eecaeeff13..31f2f104b0 100644 --- a/core/java/src/net/i2p/crypto/ElGamalAESEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalAESEngine.java @@ -283,11 +283,12 @@ public class ElGamalAESEngine { try { SessionKey newKey = null; Hash readHash = null; - List tags = new ArrayList(); + List tags = null; //ByteArrayInputStream bais = new ByteArrayInputStream(decrypted); int cur = 0; long numTags = DataHelper.fromLong(decrypted, cur, 2); + if (numTags > 0) tags = new ArrayList((int)numTags); cur += 2; //_log.debug("# tags: " + numTags); if ((numTags < 0) || (numTags > 200)) throw new Exception("Invalid number of session tags"); @@ -326,7 +327,8 @@ public class ElGamalAESEngine { if (eq) { // everything matches. w00t. - foundTags.addAll(tags); + if (tags != null) + foundTags.addAll(tags); if (newKey != null) foundKey.setData(newKey.getData()); return unencrData; } @@ -610,4 +612,4 @@ public class ElGamalAESEngine { } } } -} \ No newline at end of file +} diff --git a/core/java/src/net/i2p/crypto/ElGamalEngine.java b/core/java/src/net/i2p/crypto/ElGamalEngine.java index b0f170120a..bb7585b26a 100644 --- a/core/java/src/net/i2p/crypto/ElGamalEngine.java +++ b/core/java/src/net/i2p/crypto/ElGamalEngine.java @@ -129,6 +129,7 @@ public class ElGamalEngine { (ybytes.length > 257 ? 257 : ybytes.length)); System.arraycopy(dbytes, 0, out, (dbytes.length < 257 ? 514 - dbytes.length : 257), (dbytes.length > 257 ? 257 : dbytes.length)); + /* StringBuffer buf = new StringBuffer(1024); buf.append("Timing\n"); buf.append("0-1: ").append(t1 - t0).append('\n'); @@ -142,6 +143,7 @@ public class ElGamalEngine { buf.append("8-9: ").append(t9 - t8).append('\n'); buf.append("9-10: ").append(t10 - t9).append('\n'); //_log.debug(buf.toString()); + */ long end = _context.clock().now(); long diff = end - start; diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java index b4fa132dd4..21dbb5d0f6 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java @@ -151,15 +151,17 @@ public class I2CPMessageReader { _log.debug("After handling the newly received message"); } } catch (I2CPMessageException ime) { - _log.error("Error handling message", ime); + _log.warn("Error handling message", ime); _listener.readError(I2CPMessageReader.this, ime); cancelRunner(); } catch (IOException ioe) { - _log.error("IO Error handling message", ioe); + _log.warn("IO Error handling message", ioe); _listener.disconnected(I2CPMessageReader.this); cancelRunner(); - } catch (Throwable t) { - _log.log(Log.CRIT, "Unhandled error reading I2CP stream", t); + } catch (OutOfMemoryError oom) { + throw oom; + } catch (Exception e) { + _log.log(Log.CRIT, "Unhandled error reading I2CP stream", e); _listener.disconnected(I2CPMessageReader.this); cancelRunner(); } diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index bd840cf901..a5db2561d6 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -45,6 +45,10 @@ public class SimpleTimer { } } + public void reschedule(TimedEvent event, long timeoutMs) { + addEvent(event, timeoutMs, false); + } + /** * Queue up the given event to be fired no sooner than timeoutMs from now. * However, if this event is already scheduled, the event will be scheduled @@ -52,7 +56,12 @@ public class SimpleTimer { * timeout. If this is not the desired behavior, call removeEvent first. * */ - public void addEvent(TimedEvent event, long timeoutMs) { + public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } + /** + * @param useEarliestEventTime if its already scheduled, use the earlier of the + * two timeouts, else use the later + */ + public void addEvent(TimedEvent event, long timeoutMs, boolean useEarliestTime) { int totalEvents = 0; long now = System.currentTimeMillis(); long eventTime = now + timeoutMs; @@ -61,11 +70,20 @@ public class SimpleTimer { // remove the old scheduled position, then reinsert it Long oldTime = (Long)_eventTimes.get(event); if (oldTime != null) { - if (oldTime.longValue() < eventTime) { - _events.notifyAll(); - return; // already scheduled for sooner than requested + if (useEarliestTime) { + if (oldTime.longValue() < eventTime) { + _events.notifyAll(); + return; // already scheduled for sooner than requested + } else { + _events.remove(oldTime); + } } else { - _events.remove(oldTime); + if (oldTime.longValue() > eventTime) { + _events.notifyAll(); + return; // already scheduled for later than the given period + } else { + _events.remove(oldTime); + } } } while (_events.containsKey(time)) diff --git a/router/java/src/net/i2p/data/i2np/DataMessage.java b/router/java/src/net/i2p/data/i2np/DataMessage.java index 9162e41a30..124606cabd 100644 --- a/router/java/src/net/i2p/data/i2np/DataMessage.java +++ b/router/java/src/net/i2p/data/i2np/DataMessage.java @@ -30,10 +30,19 @@ public class DataMessage extends I2NPMessageImpl { _data = null; } - public byte[] getData() { return _data; } - public void setData(byte data[]) { _data = data; } + public byte[] getData() { + verifyUnwritten(); + return _data; + } + public void setData(byte[] data) { + verifyUnwritten(); + _data = data; + } - public int getSize() { return _data.length; } + public int getSize() { + verifyUnwritten(); + return _data.length; + } public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException { if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message"); @@ -55,6 +64,7 @@ public class DataMessage extends I2NPMessageImpl { } /** write the message body to the output array, starting at the given index */ protected int writeMessageBody(byte out[], int curIndex) { + verifyUnwritten(); if (_data == null) { out[curIndex++] = 0x0; out[curIndex++] = 0x0; @@ -70,6 +80,11 @@ public class DataMessage extends I2NPMessageImpl { return curIndex; } + protected void written() { + super.written(); + _data = null; + } + public int getType() { return MESSAGE_TYPE; } public int hashCode() { diff --git a/router/java/src/net/i2p/data/i2np/GarlicMessage.java b/router/java/src/net/i2p/data/i2np/GarlicMessage.java index f0bc92c45d..b27a20dfe9 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicMessage.java +++ b/router/java/src/net/i2p/data/i2np/GarlicMessage.java @@ -28,8 +28,14 @@ public class GarlicMessage extends I2NPMessageImpl { setData(null); } - public byte[] getData() { return _data; } - public void setData(byte[] data) { _data = data; } + public byte[] getData() { + verifyUnwritten(); + return _data; + } + public void setData(byte[] data) { + verifyUnwritten(); + _data = data; + } public void readMessage(byte data[], int offset, int dataSize, int type) throws I2NPMessageException, IOException { if (type != MESSAGE_TYPE) throw new I2NPMessageException("Message type is incorrect for this message"); @@ -43,11 +49,13 @@ public class GarlicMessage extends I2NPMessageImpl { } /** calculate the message body's length (not including the header and footer */ - protected int calculateWrittenLength() { + protected int calculateWrittenLength() { + verifyUnwritten(); return 4 + _data.length; } /** write the message body to the output array, starting at the given index */ protected int writeMessageBody(byte out[], int curIndex) throws I2NPMessageException { + verifyUnwritten(); byte len[] = DataHelper.toLong(4, _data.length); System.arraycopy(len, 0, out, curIndex, 4); curIndex += 4; @@ -62,6 +70,11 @@ public class GarlicMessage extends I2NPMessageImpl { return DataHelper.hashCode(getData()); } + protected void written() { + super.written(); + _data = null; + } + public boolean equals(Object object) { if ( (object != null) && (object instanceof GarlicMessage) ) { GarlicMessage msg = (GarlicMessage)object; diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index 5495004811..f7c3888249 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -32,7 +32,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM protected I2PAppContext _context; private long _expiration; private long _uniqueId; - private byte _data[]; + private boolean _written; public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH; @@ -53,6 +53,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM _log = context.logManager().getLog(I2NPMessageImpl.class); _expiration = _context.clock().now() + DEFAULT_EXPIRATION_MS; _uniqueId = _context.random().nextLong(MAX_ID_VALUE); + _written = false; //_context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); //_context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); } @@ -264,6 +265,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM public int toRawByteArray(byte buffer[]) { + verifyUnwritten(); if (RAW_FULL_SIZE) return toByteArray(buffer); try { @@ -277,6 +279,8 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM _context.logManager().getLog(getClass()).log(Log.CRIT, "Error writing", ime); throw new IllegalStateException("Unable to serialize the message (" + getClass().getName() + "): " + ime.getMessage()); + } finally { + written(); } } @@ -316,6 +320,8 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM } } + protected void verifyUnwritten() { if (_written) throw new RuntimeException("Already written"); } + protected void written() { _written = true; } /** * Yes, this is fairly ugly, but its the only place it ever happens. diff --git a/router/java/src/net/i2p/router/JobTiming.java b/router/java/src/net/i2p/router/JobTiming.java index 52b4885ec0..366587eaa4 100644 --- a/router/java/src/net/i2p/router/JobTiming.java +++ b/router/java/src/net/i2p/router/JobTiming.java @@ -66,7 +66,7 @@ public class JobTiming implements Clock.ClockUpdateListener { */ public void end() { _actualEnd = _context.clock().now(); - _context.clock().removeUpdateListener(this); + //_context.clock().removeUpdateListener(this); } public void offsetChanged(long delta) { diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 0e5cf48630..86df1eab48 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.349 $ $Date: 2006/02/18 22:22:32 $"; + public final static String ID = "$Revision: 1.350 $ $Date: 2006/02/19 07:29:59 $"; public final static String VERSION = "0.6.1.10"; - public final static long BUILD = 5; + public final static long BUILD = 6; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 6178f7ed7e..9702f0ca0c 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -21,6 +21,7 @@ import java.util.Set; import net.i2p.data.Destination; import net.i2p.data.LeaseSet; import net.i2p.data.Payload; +import net.i2p.data.Hash; import net.i2p.data.i2cp.DisconnectMessage; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; @@ -71,6 +72,7 @@ public class ClientConnectionRunner { */ private List _alreadyProcessed; private ClientWriterRunner _writer; + private Hash _destHashCache; /** are we, uh, dead */ private boolean _dead; @@ -144,6 +146,7 @@ public class ClientConnectionRunner { /** currently allocated leaseSet */ public LeaseSet getLeaseSet() { return _currentLeaseSet; } void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } + public Hash getDestHash() { return _destHashCache; } /** current client's sessionId */ SessionId getSessionId() { return _sessionId; } @@ -206,8 +209,9 @@ public class ClientConnectionRunner { } void sessionEstablished(SessionConfig config) { + _destHashCache = config.getDestination().calculateHash(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("SessionEstablished called for destination " + config.getDestination().calculateHash().toBase64()); + _log.debug("SessionEstablished called for destination " + _destHashCache.toBase64()); _config = config; _manager.destinationEstablished(this); } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 9cd6e0c58f..c9acbe430a 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -251,21 +251,11 @@ public class ClientManager { } public boolean isLocal(Hash destHash) { if (destHash == null) return false; - Set dests = new HashSet(); - long beforeLock = _ctx.clock().now(); - long inLock = 0; synchronized (_runners) { - inLock = _ctx.clock().now(); - dests.addAll(_runners.keySet()); - } - long afterLock = _ctx.clock().now(); - if (afterLock - beforeLock > 50) { - _log.warn("isLocal(Hash).locking took too long: " + (afterLock-beforeLock) - + " overall, synchronized took " + (inLock - beforeLock)); - } - for (Iterator iter = dests.iterator(); iter.hasNext();) { - Destination d = (Destination)iter.next(); - if (d.calculateHash().equals(destHash)) return true; + for (Iterator iter = _runners.values().iterator(); iter.hasNext(); ) { + ClientConnectionRunner cur = (ClientConnectionRunner)iter.next(); + if (destHash.equals(cur.getDestHash())) return true; + } } return false; } @@ -324,23 +314,12 @@ public class ClientManager { private ClientConnectionRunner getRunner(Hash destHash) { if (destHash == null) return null; - Set dests = new HashSet(); - long beforeLock = _ctx.clock().now(); - long inLock = 0; synchronized (_runners) { - inLock = _ctx.clock().now(); - dests.addAll(_runners.keySet()); - } - long afterLock = _ctx.clock().now(); - if (afterLock - beforeLock > 50) { - _log.warn("getRunner(Hash).locking took too long: " + (afterLock-beforeLock) - + " overall, synchronized took " + (inLock - beforeLock)); - } - - for (Iterator iter = dests.iterator(); iter.hasNext(); ) { - Destination d = (Destination)iter.next(); - if (d.calculateHash().equals(destHash)) - return getRunner(d); + for (Iterator iter = _runners.values().iterator(); iter.hasNext(); ) { + ClientConnectionRunner cur = (ClientConnectionRunner)iter.next(); + if (cur.getDestHash().equals(destHash)) + return cur; + } } return null; } diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java index f635501e72..aa23dfff5f 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/HarvesterJob.java @@ -13,6 +13,7 @@ import net.i2p.data.i2np.DatabaseLookupMessage; import net.i2p.util.Log; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; +import net.i2p.router.TunnelInfo; import net.i2p.router.message.SendMessageDirectJob; /** @@ -41,6 +42,10 @@ class HarvesterJob extends JobImpl { private static final int PRIORITY = 100; private static final String PROP_ENABLED = "netDb.shouldHarvest"; + + private boolean harvestDirectly() { + return Boolean.valueOf(getContext().getProperty("netDb.harvestDirectly", "false")).booleanValue(); + } public HarvesterJob(RouterContext context, KademliaNetworkDatabaseFacade facade) { super(context); @@ -107,13 +112,28 @@ class HarvesterJob extends JobImpl { */ private void harvest(Hash peer) { long now = getContext().clock().now(); - DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); - msg.setFrom(getContext().routerHash()); - msg.setMessageExpiration(10*1000+now); - msg.setSearchKey(peer); - msg.setReplyTunnel(null); - SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY); - job.runJob(); - //getContext().jobQueue().addJob(job); + if (harvestDirectly()) { + DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); + msg.setFrom(getContext().routerHash()); + msg.setMessageExpiration(10*1000+now); + msg.setSearchKey(peer); + msg.setReplyTunnel(null); + SendMessageDirectJob job = new SendMessageDirectJob(getContext(), msg, peer, 10*1000, PRIORITY); + job.runJob(); + //getContext().jobQueue().addJob(job); + } else { + TunnelInfo replyTunnel = getContext().tunnelManager().selectInboundTunnel(); + TunnelInfo sendTunnel = getContext().tunnelManager().selectOutboundTunnel(); + if ( (replyTunnel != null) && (sendTunnel != null) ) { + DatabaseLookupMessage msg = new DatabaseLookupMessage(getContext(), true); + msg.setFrom(replyTunnel.getPeer(0)); + msg.setMessageExpiration(10*1000+now); + msg.setSearchKey(peer); + msg.setReplyTunnel(replyTunnel.getReceiveTunnelId(0)); + // we don't even bother to register a reply selector, because we don't really care. + // just send it out, and if we get a reply, neat. if not, oh well + getContext().tunnelDispatcher().dispatchOutbound(msg, sendTunnel.getSendTunnelId(0), peer); + } + } } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index 4f5fa7235b..802093afc5 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -119,6 +119,8 @@ public class FIFOBandwidthLimiter { _refiller.reinitialize(); } + public Request createRequest() { return new SimpleRequest(); } + /** * Request some bytes, blocking until they become available * @@ -130,15 +132,21 @@ public class FIFOBandwidthLimiter { } SimpleRequest req = new SimpleRequest(bytesIn, 0, purpose); + requestInbound(req, bytesIn, purpose); + return req; + } + public void requestInbound(Request req, int bytesIn, String purpose) { + req.init(bytesIn, 0, purpose); + if (false) { ((SimpleRequest)req).allocateAll(); return; } int pending = 0; synchronized (_pendingInboundRequests) { pending = _pendingInboundRequests.size(); _pendingInboundRequests.add(req); } - satisfyInboundRequests(); + satisfyInboundRequests(((SimpleRequest)req).satisfiedBuffer); + ((SimpleRequest)req).satisfiedBuffer.clear(); if (pending > 0) _context.statManager().addRateData("bwLimiter.pendingInboundRequests", pending, pending); - return req; } /** * Request some bytes, blocking until they become available @@ -151,15 +159,21 @@ public class FIFOBandwidthLimiter { } SimpleRequest req = new SimpleRequest(0, bytesOut, purpose); + requestOutbound(req, bytesOut, purpose); + return req; + } + public void requestOutbound(Request req, int bytesOut, String purpose) { + req.init(0, bytesOut, purpose); + if (false) { ((SimpleRequest)req).allocateAll(); return; } int pending = 0; synchronized (_pendingOutboundRequests) { pending = _pendingOutboundRequests.size(); _pendingOutboundRequests.add(req); } - satisfyOutboundRequests(); + satisfyOutboundRequests(((SimpleRequest)req).satisfiedBuffer); + ((SimpleRequest)req).satisfiedBuffer.clear(); if (pending > 0) _context.statManager().addRateData("bwLimiter.pendingOutboundRequests", pending, pending); - return req; } void setInboundBurstKBps(int kbytesPerSecond) { @@ -189,7 +203,7 @@ public class FIFOBandwidthLimiter { * @param maxBurstIn allow up to this many bytes in from the burst section for this time period (may be negative) * @param maxBurstOut allow up to this many bytes in from the burst section for this time period (may be negative) */ - final void refillBandwidthQueues(long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) { + final void refillBandwidthQueues(List buf, long bytesInbound, long bytesOutbound, long maxBurstIn, long maxBurstOut) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Refilling the queues with " + bytesInbound + "/" + bytesOutbound + ": " + getStatus().toString()); _availableInbound += bytesInbound; @@ -251,7 +265,7 @@ public class FIFOBandwidthLimiter { } } - satisfyRequests(); + satisfyRequests(buf); updateStats(); } @@ -292,19 +306,20 @@ public class FIFOBandwidthLimiter { * Go through the queue, satisfying as many requests as possible (notifying * each one satisfied that the request has been granted). */ - private final void satisfyRequests() { - satisfyInboundRequests(); - satisfyOutboundRequests(); + private final void satisfyRequests(List buffer) { + buffer.clear(); + satisfyInboundRequests(buffer); + buffer.clear(); + satisfyOutboundRequests(buffer); } - private final void satisfyInboundRequests() { - List satisfied = null; + private final void satisfyInboundRequests(List satisfied) { synchronized (_pendingInboundRequests) { if (_inboundUnlimited) { - satisfied = locked_satisfyInboundUnlimited(); + locked_satisfyInboundUnlimited(satisfied); } else { if (_availableInbound > 0) { - satisfied = locked_satisfyInboundAvailable(); + locked_satisfyInboundAvailable(satisfied); } else { // no bandwidth available if (_log.shouldLog(Log.DEBUG)) @@ -317,8 +332,8 @@ public class FIFOBandwidthLimiter { if (satisfied != null) { for (int i = 0; i < satisfied.size(); i++) { - SimpleRequest req = (SimpleRequest)satisfied.get(i); - req.notifyAllocation(); + SimpleRequest creq = (SimpleRequest)satisfied.get(i); + creq.notifyAllocation(); } } } @@ -353,16 +368,12 @@ public class FIFOBandwidthLimiter { * There are no limits, so just give every inbound request whatever they want * */ - private final List locked_satisfyInboundUnlimited() { - List satisfied = null; - + private final void locked_satisfyInboundUnlimited(List satisfied) { while (_pendingInboundRequests.size() > 0) { SimpleRequest req = (SimpleRequest)_pendingInboundRequests.remove(0); int allocated = req.getPendingInboundRequested(); _totalAllocatedInboundBytes += allocated; req.allocateBytes(allocated, 0); - if (satisfied == null) - satisfied = new ArrayList(2); satisfied.add(req); long waited = now() - req.getRequestTime(); if (_log.shouldLog(Log.DEBUG)) @@ -373,7 +384,6 @@ public class FIFOBandwidthLimiter { if (waited > 10) _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); } - return satisfied; } /** @@ -383,9 +393,7 @@ public class FIFOBandwidthLimiter { * * @return list of requests that were completely satisfied */ - private final List locked_satisfyInboundAvailable() { - List satisfied = null; - + private final void locked_satisfyInboundAvailable(List satisfied) { for (int i = 0; i < _pendingInboundRequests.size(); i++) { if (_availableInbound <= 0) break; SimpleRequest req = (SimpleRequest)_pendingInboundRequests.get(i); @@ -418,8 +426,6 @@ public class FIFOBandwidthLimiter { _availableInbound -= allocated; _totalAllocatedInboundBytes += allocated; req.allocateBytes(allocated, 0); - if (satisfied == null) - satisfied = new ArrayList(2); satisfied.add(req); if (req.getPendingInboundRequested() > 0) { if (_log.shouldLog(Log.DEBUG)) @@ -443,17 +449,15 @@ public class FIFOBandwidthLimiter { _context.statManager().addRateData("bwLimiter.inboundDelayedTime", waited, waited); } } - return satisfied; } - private final void satisfyOutboundRequests() { - List satisfied = null; + private final void satisfyOutboundRequests(List satisfied) { synchronized (_pendingOutboundRequests) { if (_outboundUnlimited) { - satisfied = locked_satisfyOutboundUnlimited(); + locked_satisfyOutboundUnlimited(satisfied); } else { if (_availableOutbound > 0) { - satisfied = locked_satisfyOutboundAvailable(); + locked_satisfyOutboundAvailable(satisfied); } else { // no bandwidth available if (_log.shouldLog(Log.DEBUG)) @@ -466,8 +470,8 @@ public class FIFOBandwidthLimiter { if (satisfied != null) { for (int i = 0; i < satisfied.size(); i++) { - SimpleRequest req = (SimpleRequest)satisfied.get(i); - req.notifyAllocation(); + SimpleRequest creq = (SimpleRequest)satisfied.get(i); + creq.notifyAllocation(); } } } @@ -476,16 +480,12 @@ public class FIFOBandwidthLimiter { * There are no limits, so just give every outbound request whatever they want * */ - private final List locked_satisfyOutboundUnlimited() { - List satisfied = null; - + private final void locked_satisfyOutboundUnlimited(List satisfied) { while (_pendingOutboundRequests.size() > 0) { SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.remove(0); int allocated = req.getPendingOutboundRequested(); _totalAllocatedOutboundBytes += allocated; req.allocateBytes(0, allocated); - if (satisfied == null) - satisfied = new ArrayList(2); satisfied.add(req); long waited = now() - req.getRequestTime(); if (_log.shouldLog(Log.DEBUG)) @@ -497,7 +497,6 @@ public class FIFOBandwidthLimiter { if (waited > 10) _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); } - return satisfied; } /** @@ -507,9 +506,7 @@ public class FIFOBandwidthLimiter { * * @return list of requests that were completely satisfied */ - private final List locked_satisfyOutboundAvailable() { - List satisfied = null; - + private final void locked_satisfyOutboundAvailable(List satisfied) { for (int i = 0; i < _pendingOutboundRequests.size(); i++) { if (_availableOutbound <= 0) break; SimpleRequest req = (SimpleRequest)_pendingOutboundRequests.get(i); @@ -542,8 +539,6 @@ public class FIFOBandwidthLimiter { _availableOutbound -= allocated; _totalAllocatedOutboundBytes += allocated; req.allocateBytes(0, allocated); - if (satisfied == null) - satisfied = new ArrayList(2); satisfied.add(req); if (req.getPendingOutboundRequested() > 0) { if (_log.shouldLog(Log.DEBUG)) @@ -567,7 +562,6 @@ public class FIFOBandwidthLimiter { _context.statManager().addRateData("bwLimiter.outboundDelayedTime", waited, waited); } } - return satisfied; } public void renderStatusHTML(Writer out) throws IOException { @@ -613,14 +607,24 @@ public class FIFOBandwidthLimiter { private String _target; private int _allocationsSinceWait; private boolean _aborted; + List satisfiedBuffer; + public SimpleRequest() { + satisfiedBuffer = new ArrayList(1); + init(0, 0, null); + } public SimpleRequest(int in, int out, String target) { + satisfiedBuffer = new ArrayList(1); + init(in, out, target); + } + public void init(int in, int out, String target) { _inTotal = in; _outTotal = out; _inAllocated = 0; _outAllocated = 0; _aborted = false; _target = target; + satisfiedBuffer.clear(); _requestId = ++__requestId; _requestTime = now(); } @@ -646,6 +650,13 @@ public class FIFOBandwidthLimiter { } catch (InterruptedException ie) {} } int getAllocationsSinceWait() { return _allocationsSinceWait; } + void allocateAll() { + _inAllocated = _inTotal; + _outAllocated = _outTotal; + _outAllocated = _outTotal; + _allocationsSinceWait++; + notifyAllocation(); + } void allocateBytes(int in, int out) { _inAllocated += in; _outAllocated += out; @@ -677,7 +688,8 @@ public class FIFOBandwidthLimiter { public void abort(); /** was this request aborted? */ public boolean getAborted(); - + /** thar be dragons */ + public void init(int in, int out, String target); } private static final NoopRequest _noop = new NoopRequest(); @@ -691,5 +703,6 @@ public class FIFOBandwidthLimiter { public int getTotalInboundRequested() { return 0; } public int getTotalOutboundRequested() { return 0; } public void waitForNextAllocation() {} + public void init(int in, int out, String target) {} } } diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java index 3af79a8e0a..903220bf34 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java @@ -1,5 +1,6 @@ package net.i2p.router.transport; +import java.util.*; import net.i2p.I2PAppContext; import net.i2p.util.Log; @@ -62,6 +63,7 @@ class FIFOBandwidthRefiller implements Runnable { public void run() { // bootstrap 'em with nothing _lastRefillTime = _limiter.now(); + List buffer = new ArrayList(2); while (true) { long now = _limiter.now(); if (now >= _lastCheckConfigTime + _configCheckPeriodMs) { @@ -70,7 +72,7 @@ class FIFOBandwidthRefiller implements Runnable { _lastCheckConfigTime = now; } - boolean updated = updateQueues(now); + boolean updated = updateQueues(buffer, now); if (updated) { _lastRefillTime = now; } @@ -85,7 +87,7 @@ class FIFOBandwidthRefiller implements Runnable { _lastCheckConfigTime = _lastRefillTime; } - private boolean updateQueues(long now) { + private boolean updateQueues(List buffer, long now) { long numMs = (now - _lastRefillTime); if (_log.shouldLog(Log.INFO)) _log.info("Updating bandwidth after " + numMs + " (status: " + _limiter.getStatus().toString() @@ -114,7 +116,7 @@ class FIFOBandwidthRefiller implements Runnable { long maxBurstIn = ((_inboundBurstKBytesPerSecond-_inboundKBytesPerSecond)*1024*numMs)/1000; long maxBurstOut = ((_outboundBurstKBytesPerSecond-_outboundKBytesPerSecond)*1024*numMs)/1000; - _limiter.refillBandwidthQueues(inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut); + _limiter.refillBandwidthQueues(buffer, inboundToAdd, outboundToAdd, maxBurstIn, maxBurstOut); if (_log.shouldLog(Log.DEBUG)) { _log.debug("Adding " + inboundToAdd + " bytes to inboundAvailable"); @@ -331,4 +333,4 @@ class FIFOBandwidthRefiller implements Runnable { int getOutboundKBytesPerSecond() { return _outboundKBytesPerSecond; } int getInboundKBytesPerSecond() { return _inboundKBytesPerSecond; } -} \ No newline at end of file +} diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index 292a12c479..b59a415e4e 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -140,12 +140,12 @@ public abstract class TransportImpl implements Transport { long lifetime = msg.getLifetime(); if (lifetime > 3000) { int level = Log.WARN; - //if (!sendSuccessful) - // level = Log.INFO; + if (!sendSuccessful) + level = Log.INFO; if (_log.shouldLog(level)) - _log.log(level, "afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + _log.log(level, "afterSend slow (" + lifetime + "): [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " + msg.getMessageType() + " " + msg.getMessageId() + " from " + _context.routerHash().toBase64().substring(0,6) - + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + "\n" + msg.toString()); + + " to " + msg.getTarget().getIdentity().calculateHash().toBase64().substring(0,6) + ": " + msg.toString()); } else { if (_log.shouldLog(Log.INFO)) _log.info("afterSend: [success=" + sendSuccessful + "]" + msg.getMessageSize() + "byte " diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 6c9acb3695..ecdce78a41 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -125,8 +125,6 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource boolean fragmentOK = false; boolean partialACK = false; - // perhaps compact the synchronized block further by synchronizing on the - // particular state once its found? synchronized (messages) { state = (InboundMessageState)messages.get(messageId); if (state == null) { @@ -145,36 +143,36 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource } else { partialACK = true; } + } - if (messageComplete) { - _recentlyCompletedMessages.add(mid); - _messageReceiver.receiveMessage(state); - - from.messageFullyReceived(messageId, state.getCompleteSize()); - _ackSender.ackPeer(from); + if (messageComplete) { + _recentlyCompletedMessages.add(mid); + _messageReceiver.receiveMessage(state); - if (_log.shouldLog(Log.INFO)) - _log.info("Message received completely! " + state); + from.messageFullyReceived(messageId, state.getCompleteSize()); + _ackSender.ackPeer(from); - _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); - if (state.getFragmentCount() > 0) - _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); - } else if (messageExpired) { - state.releaseResources(); - if (_log.shouldLog(Log.WARN)) - _log.warn("Message expired while only being partially read: " + state); - _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); - } else if (partialACK) { - // not expired but not yet complete... lets queue up a partial ACK - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Queueing up a partial ACK for peer: " + from + " for " + state); - from.messagePartiallyReceived(); - _ackSender.ackPeer(from); - } + if (_log.shouldLog(Log.INFO)) + _log.info("Message received completely! " + state); - if (!fragmentOK) - break; + _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); + if (state.getFragmentCount() > 0) + _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); + } else if (messageExpired) { + state.releaseResources(); + if (_log.shouldLog(Log.WARN)) + _log.warn("Message expired while only being partially read: " + state); + _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired hile partially read: " + state.toString()); + } else if (partialACK) { + // not expired but not yet complete... lets queue up a partial ACK + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Queueing up a partial ACK for peer: " + from + " for " + state); + from.messagePartiallyReceived(); + _ackSender.ackPeer(from); } + + if (!fragmentOK) + break; } return fragments; } @@ -183,16 +181,17 @@ public class InboundMessageFragments /*implements UDPTransport.PartialACKSource int rv = 0; if (data.readACKsIncluded()) { int fragments = 0; - long acks[] = data.readACKs(); - if (acks != null) { - rv += acks.length; - _context.statManager().addRateData("udp.receivedACKs", acks.length, 0); + int ackCount = data.readACKCount(); + if (ackCount > 0) { + rv += ackCount; + _context.statManager().addRateData("udp.receivedACKs", ackCount, 0); //_context.statManager().getStatLog().addData(from.getRemoteHostId().toString(), "udp.peer.receiveACKCount", acks.length, 0); - for (int i = 0; i < acks.length; i++) { + for (int i = 0; i < ackCount; i++) { + long id = data.readACK(i); if (_log.shouldLog(Log.INFO)) - _log.info("Full ACK of message " + acks[i] + " received!"); - fragments += _outbound.acked(acks[i], from.getRemotePeer()); + _log.info("Full ACK of message " + id + " received!"); + fragments += _outbound.acked(id, from.getRemotePeer()); } } else { _log.error("Received ACKs with no acks?! " + data); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 7f49532150..2e80276be0 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -202,14 +202,16 @@ public class OutboundMessageFragments { */ private void finishMessages() { int rv = 0; - List peers = new ArrayList(); + List peers = null; synchronized (_activePeers) { - peers = new ArrayList(_activePeers); + peers = new ArrayList(_activePeers.size()); for (int i = 0; i < _activePeers.size(); i++) { PeerState state = (PeerState)_activePeers.get(i); if (state.getOutboundMessageCount() <= 0) { _activePeers.remove(i); i--; + } else { + peers.add(state); } } _activePeers.notifyAll(); @@ -297,11 +299,13 @@ public class OutboundMessageFragments { for (int i = 0; packets != null && i < packets.length ; i++) if (packets[i] != null) valid++; + /* state.getMessage().timestamp("sending a volley of " + valid + " lastReceived: " + (_context.clock().now() - peer.getLastReceiveTime()) + " lastSentFully: " + (_context.clock().now() - peer.getLastSendFullyTime())); + */ } return packets; } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index c48f680b4e..21a82f229c 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -56,7 +56,7 @@ public class PacketBuilder { * included, it should be removed from the list. */ public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer, List ackIdsRemaining, List partialACKsRemaining) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); StringBuffer msg = null; boolean acksIncluded = false; @@ -156,8 +156,10 @@ public class PacketBuilder { off++; int size = state.fragmentSize(fragment); - if (size < 0) + if (size < 0) { + packet.release(); return null; + } DataHelper.toLong(data, off, 2, size); data[off] &= (byte)0x3F; // 2 highest bits are reserved off += 2; @@ -166,12 +168,16 @@ public class PacketBuilder { if (sizeWritten != size) { _log.error("Size written: " + sizeWritten + " but size: " + size + " for fragment " + fragment + " of " + state.getMessageId()); + packet.release(); return null; } else if (_log.shouldLog(Log.DEBUG)) _log.debug("Size written: " + sizeWritten + " for fragment " + fragment + " of " + state.getMessageId()); size = sizeWritten; - if (size < 0) return null; + if (size < 0) { + packet.release(); + return null; + } off += size; // we can pad here if we want, maybe randomized? @@ -202,7 +208,7 @@ public class PacketBuilder { * @param ackBitfields list of ACKBitfield instances to either fully or partially ACK */ public UDPPacket buildACK(PeerState peer, List ackBitfields) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); StringBuffer msg = null; if (_log.shouldLog(Log.DEBUG)) { @@ -308,7 +314,7 @@ public class PacketBuilder { * @return ready to send packet, or null if there was a problem */ public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); InetAddress to = null; try { @@ -316,6 +322,7 @@ public class PacketBuilder { } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.ERROR)) _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); + packet.release(); return null; } @@ -337,6 +344,7 @@ public class PacketBuilder { if (_log.shouldLog(Log.ERROR)) _log.error("How did our sent IP become invalid? " + state); state.fail(); + packet.release(); return null; } // now for the body @@ -408,9 +416,10 @@ public class PacketBuilder { * @return ready to send packet, or null if there was a problem */ public UDPPacket buildSessionRequestPacket(OutboundEstablishState state) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte toIP[] = state.getSentIP(); if ( (_transport !=null) && (!_transport.isValid(toIP)) ) { + packet.release(); return null; } InetAddress to = null; @@ -419,6 +428,7 @@ public class PacketBuilder { } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.ERROR)) _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); + packet.release(); return null; } @@ -488,13 +498,14 @@ public class PacketBuilder { * @return ready to send packets, or null if there was a problem */ public UDPPacket buildSessionConfirmedPacket(OutboundEstablishState state, int fragmentNum, int numFragments, byte identity[]) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); InetAddress to = null; try { to = InetAddress.getByAddress(state.getSentIP()); } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.ERROR)) _log.error("How did we think this was a valid IP? " + state.getRemoteHostId().toString()); + packet.release(); return null; } @@ -578,7 +589,7 @@ public class PacketBuilder { return buildPeerTestFromAlice(toIP, toPort, toIntroKey, toIntroKey, nonce, aliceIntroKey); } public UDPPacket buildPeerTestFromAlice(InetAddress toIP, int toPort, SessionKey toCipherKey, SessionKey toMACKey, long nonce, SessionKey aliceIntroKey) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -619,7 +630,7 @@ public class PacketBuilder { * @return ready to send packet, or null if there was a problem */ public UDPPacket buildPeerTestToAlice(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, SessionKey charlieIntroKey, long nonce) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -665,7 +676,7 @@ public class PacketBuilder { public UDPPacket buildPeerTestToCharlie(InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce, InetAddress charlieIP, int charliePort, SessionKey charlieCipherKey, SessionKey charlieMACKey) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -709,7 +720,7 @@ public class PacketBuilder { * @return ready to send packet, or null if there was a problem */ public UDPPacket buildPeerTestToBob(InetAddress bobIP, int bobPort, InetAddress aliceIP, int alicePort, SessionKey aliceIntroKey, long nonce, SessionKey bobCipherKey, SessionKey bobMACKey) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -783,7 +794,7 @@ public class PacketBuilder { } public UDPPacket buildRelayRequest(InetAddress introHost, int introPort, byte introKey[], long introTag, SessionKey ourIntroKey, long introNonce, boolean encrypt) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -851,7 +862,7 @@ public class PacketBuilder { private static final byte PEER_RELAY_INTRO_FLAG_BYTE = (UDPPacket.PAYLOAD_TYPE_RELAY_INTRO << 4); public UDPPacket buildRelayIntro(RemoteHostId alice, PeerState charlie, UDPPacketReader.RelayRequestReader request) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -907,7 +918,7 @@ public class PacketBuilder { return null; } - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -954,7 +965,7 @@ public class PacketBuilder { } public UDPPacket buildHolePunch(UDPPacketReader reader) { - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, false); byte data[] = packet.getPacket().getData(); Arrays.fill(data, 0, data.length, (byte)0x0); int off = UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE; @@ -970,6 +981,7 @@ public class PacketBuilder { } catch (UnknownHostException uhe) { if (_log.shouldLog(Log.WARN)) _log.warn("IP for alice to hole punch to is invalid", uhe); + packet.release(); return null; } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index ab75feefc3..1b5c321246 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -167,10 +167,10 @@ public class PeerState { private long _packetsReceivedDuplicate; private long _packetsReceived; - /** Message (Long) to InboundMessageState for active message */ + /** list of InboundMessageState for active message */ private Map _inboundMessages; - /** Message (Long) to OutboundMessageState */ - private Map _outboundMessages; + /** list of OutboundMessageState */ + private List _outboundMessages; /** which outbound message is currently being retransmitted */ private OutboundMessageState _retransmitter; @@ -262,7 +262,7 @@ public class PeerState { _packetsReceived = 0; _packetsReceivedDuplicate = 0; _inboundMessages = new HashMap(8); - _outboundMessages = new HashMap(8); + _outboundMessages = new ArrayList(32); _dead = false; _context.statManager().createRateStat("udp.congestionOccurred", "How large the cwin was when congestion occurred (duration == sendBps)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.congestedRTO", "retransmission timeout after congestion (duration == rtt dev)", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -603,7 +603,8 @@ public class PeerState { int rv = 0; synchronized (_inboundMessages) { - for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) { + int remaining = _inboundMessages.size(); + for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) { InboundMessageState state = (InboundMessageState)iter.next(); if (state.isExpired()) { iter.remove(); @@ -687,7 +688,7 @@ public class PeerState { List rv = null; int bytesRemaining = countMaxACKData(); synchronized (_currentACKs) { - rv = new ArrayList(_currentACKs.size()); + rv = new ArrayList(16); //_currentACKs.size()); int oldIndex = _currentACKsResend.size(); while ( (bytesRemaining >= 4) && (_currentACKs.size() > 0) ) { Long val = (Long)_currentACKs.remove(0); @@ -701,7 +702,10 @@ public class PeerState { if (alwaysIncludeRetransmissions || rv.size() > 0) { // now repeat by putting in some old ACKs for (int i = 0; (i < oldIndex) && (bytesRemaining >= 4); i++) { - rv.add(new FullACKBitfield(((Long)_currentACKsResend.get(i)).longValue())); + Long cur = (Long)_currentACKsResend.get(i); + long c = cur.longValue(); + FullACKBitfield bf = new FullACKBitfield(c); + rv.add(bf); bytesRemaining -= 4; } } @@ -749,7 +753,9 @@ public class PeerState { int numMessages = _inboundMessages.size(); if (numMessages <= 0) return; - for (Iterator iter = _inboundMessages.values().iterator(); iter.hasNext(); ) { + // todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead? + int remaining = _inboundMessages.size(); + for (Iterator iter = _inboundMessages.values().iterator(); remaining > 0; remaining--) { InboundMessageState state = (InboundMessageState)iter.next(); if (state.isExpired()) { //if (_context instanceof RouterContext) @@ -974,10 +980,10 @@ public class PeerState { state.setPeer(this); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding to " + _remotePeer.toBase64() + ": " + state.getMessageId()); - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (msgs == null) return 0; synchronized (msgs) { - msgs.put(new Long(state.getMessageId()), state); + msgs.add(state); return msgs.size(); } } @@ -985,20 +991,23 @@ public class PeerState { public void dropOutbound() { if (_dead) return; _dead = true; - Map msgs = _outboundMessages; + List msgs = _outboundMessages; //_outboundMessages = null; _retransmitter = null; if (msgs != null) { + List tempList = null; synchronized (msgs) { - for (Iterator iter = msgs.values().iterator(); iter.hasNext();) - _transport.failed((OutboundMessageState)iter.next()); + tempList = new ArrayList(msgs); msgs.clear(); } + int sz = tempList.size(); + for (int i = 0; i < sz; i++) + _transport.failed((OutboundMessageState)tempList.get(i)); } } public int getOutboundMessageCount() { - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return 0; if (msgs != null) { synchronized (msgs) { @@ -1015,29 +1024,35 @@ public class PeerState { */ public int finishMessages() { int rv = 0; - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return 0; List succeeded = null; List failed = null; synchronized (msgs) { - for (Iterator iter = msgs.keySet().iterator(); iter.hasNext(); ) { - Long id = (Long)iter.next(); - OutboundMessageState state = (OutboundMessageState)msgs.get(id); + int size = msgs.size(); + for (int i = 0; i < size; i++) { + OutboundMessageState state = (OutboundMessageState)msgs.get(i); if (state.isComplete()) { - iter.remove(); + msgs.remove(i); + i--; + size--; if (_retransmitter == state) _retransmitter = null; if (succeeded == null) succeeded = new ArrayList(4); succeeded.add(state); } else if (state.isExpired()) { - iter.remove(); + msgs.remove(i); + i--; + size--; if (_retransmitter == state) _retransmitter = null; _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime()); if (failed == null) failed = new ArrayList(4); failed.add(state); } else if (state.getPushCount() > OutboundMessageFragments.MAX_VOLLEYS) { - iter.remove(); + msgs.remove(i); + i--; + size--; if (state == _retransmitter) _retransmitter = null; _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); @@ -1082,11 +1097,12 @@ public class PeerState { */ public OutboundMessageState allocateSend() { int total = 0; - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return null; synchronized (msgs) { - for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { - OutboundMessageState state = (OutboundMessageState)iter.next(); + int size = msgs.size(); + for (int i = 0; i < size; i++) { + OutboundMessageState state = (OutboundMessageState)msgs.get(i); if (locked_shouldSend(state)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Allocate sending to " + _remotePeer.toBase64() + ": " + state.getMessageId()); @@ -1099,11 +1115,11 @@ public class PeerState { } */ return state; - } else { + } /* else { OutNetMessage msg = state.getMessage(); if (msg != null) msg.timestamp("passed over for allocation with " + msgs.size() + " peers"); - } + } */ } total = msgs.size(); } @@ -1118,7 +1134,7 @@ public class PeerState { public int getNextDelay() { int rv = -1; long now = _context.clock().now(); - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return -1; synchronized (msgs) { if (_retransmitter != null) { @@ -1128,8 +1144,9 @@ public class PeerState { else return rv; } - for (Iterator iter = msgs.values().iterator(); iter.hasNext(); ) { - OutboundMessageState state = (OutboundMessageState)iter.next(); + int size = msgs.size(); + for (int i = 0; i < size; i++) { + OutboundMessageState state = (OutboundMessageState)msgs.get(i); int delay = (int)(state.getNextSendTime() - now); if (delay <= 0) delay = 1; @@ -1140,7 +1157,6 @@ public class PeerState { return rv; } - /** * If set to true, we should throttle retransmissions of all but the first message in * flight to a peer. If set to false, we will only throttle the initial flight of a @@ -1182,17 +1198,18 @@ public class PeerState { if ( (_retransmitter != null) && (_retransmitter != state) ) { // choke it, since there's already another message retransmitting to this // peer. - _context.statManager().addRateData("udp.blockedRetransmissions", getPacketsRetransmitted(), getPacketsTransmitted()); - if ( (state.getMaxSends() <= 0) && (!THROTTLE_INITIAL_SEND) ) { - if (state.getMessage() != null) - state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); - } else if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) { - if (state.getMessage() != null) - state.getMessage().timestamp("choked, with another message retransmitting"); + _context.statManager().addRateData("udp.blockedRetransmissions", _packetsRetransmitted, _packetsTransmitted); + int max = state.getMaxSends(); + if ( (max <= 0) && (!THROTTLE_INITIAL_SEND) ) { + //if (state.getMessage() != null) + // state.getMessage().timestamp("another message is retransmitting, but we want to send our first volley..."); + } else if ( (max <= 0) || (THROTTLE_RESENDS) ) { + //if (state.getMessage() != null) + // state.getMessage().timestamp("choked, with another message retransmitting"); return false; } else { - if (state.getMessage() != null) - state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); + //if (state.getMessage() != null) + // state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending..."); } } @@ -1218,8 +1235,8 @@ public class PeerState { return true; } else { _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime()); - if (state.getMessage() != null) - state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); + //if (state.getMessage() != null) + // state.getMessage().timestamp("send rejected, available=" + getSendWindowBytesRemaining()); if (_log.shouldLog(Log.WARN)) _log.warn("Allocation of " + size + " rejected w/ wsize=" + getSendWindowBytes() + " available=" + getSendWindowBytesRemaining() @@ -1229,10 +1246,10 @@ public class PeerState { _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms"); //_throttle.choke(peer.getRemotePeer()); - if (state.getMessage() != null) - state.getMessage().timestamp("choked, not enough available, wsize=" - + getSendWindowBytes() + " available=" - + getSendWindowBytesRemaining()); + //if (state.getMessage() != null) + // state.getMessage().timestamp("choked, not enough available, wsize=" + // + getSendWindowBytes() + " available=" + // + getSendWindowBytesRemaining()); return false; } } // nextTime <= now @@ -1242,23 +1259,32 @@ public class PeerState { public int acked(long messageId) { OutboundMessageState state = null; - Map msgs = _outboundMessages; + List msgs = _outboundMessages; if (_dead) return 0; synchronized (msgs) { - state = (OutboundMessageState)msgs.remove(new Long(messageId)); + int sz = msgs.size(); + for (int i = 0; i < sz; i++) { + state = (OutboundMessageState)msgs.get(i); + if (state.getMessageId() == messageId) { + msgs.remove(i); + break; + } else { + state = null; + } + } if ( (state != null) && (state == _retransmitter) ) _retransmitter = null; } if (state != null) { int numSends = state.getMaxSends(); - if (state.getMessage() != null) { - state.getMessage().timestamp("acked after " + numSends - + " lastReceived: " - + (_context.clock().now() - getLastReceiveTime()) - + " lastSentFully: " - + (_context.clock().now() - getLastSendFullyTime())); - } + //if (state.getMessage() != null) { + // state.getMessage().timestamp("acked after " + numSends + // + " lastReceived: " + // + (_context.clock().now() - getLastReceiveTime()) + // + " lastSentFully: " + // + (_context.clock().now() - getLastSendFullyTime())); + //} if (_log.shouldLog(Log.INFO)) _log.info("Received ack of " + messageId + " by " + _remotePeer.toBase64() @@ -1294,19 +1320,24 @@ public class PeerState { return; } - Map msgs = _outboundMessages; + List msgs = _outboundMessages; OutboundMessageState state = null; boolean isComplete = false; synchronized (msgs) { - state = (OutboundMessageState)msgs.get(new Long(bitfield.getMessageId())); - if (state != null) { - if (state.acked(bitfield)) { - // this partial ack actually clears it fully - isComplete = true; - msgs.remove(new Long(bitfield.getMessageId())); - if (state == _retransmitter) - _retransmitter = null; + for (int i = 0; i < msgs.size(); i++) { + state = (OutboundMessageState)msgs.get(i); + if (state.getMessageId() == bitfield.getMessageId()) { + boolean complete = state.acked(bitfield); + if (complete) { + isComplete = true; + msgs.remove(i); + if (state == _retransmitter) + _retransmitter = null; + } + break; + } else { + state = null; } } } @@ -1333,8 +1364,8 @@ public class PeerState { _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime()); if (numSends > 1) _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount()); - if (state.getMessage() != null) - state.getMessage().timestamp("partial ack to complete after " + numSends); + //if (state.getMessage() != null) + // state.getMessage().timestamp("partial ack to complete after " + numSends); _transport.succeeded(state); // this adjusts the rtt/rto/window/etc @@ -1344,8 +1375,8 @@ public class PeerState { state.releaseResources(); } else { - if (state.getMessage() != null) - state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); + //if (state.getMessage() != null) + // state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString()); } return; } else { @@ -1392,21 +1423,16 @@ public class PeerState { msgs.clear(); OutboundMessageState retransmitter = null; - Map omsgs = oldPeer._outboundMessages; - if (omsgs != null) { - synchronized (omsgs) { - msgs.putAll(omsgs); - omsgs.clear(); - retransmitter = oldPeer._retransmitter; - } + synchronized (oldPeer._outboundMessages) { + tmp.addAll(oldPeer._outboundMessages); + oldPeer._outboundMessages.clear(); + retransmitter = oldPeer._retransmitter; } - omsgs = _outboundMessages; - if (omsgs != null) { - synchronized (omsgs) { - omsgs.putAll(msgs); - _retransmitter = retransmitter; - } + synchronized (_outboundMessages) { + _outboundMessages.addAll(tmp); + _retransmitter = retransmitter; } + tmp.clear(); } public int hashCode() { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java index bf73e1cb6b..ed592212c5 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPEndpointTest.java @@ -108,9 +108,10 @@ public class UDPEndpointTest { curPeer = 0; short priority = 1; long expiration = -1; - UDPPacket packet = UDPPacket.acquire(_context); - try { - packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); + UDPPacket packet = UDPPacket.acquire(_context, true); + //try { + if (true) throw new RuntimeException("fixme"); + //packet.initialize(priority, expiration, InetAddress.getLocalHost(), _endpoints[curPeer].getListenPort()); packet.writeData(data, 0, 1024); packet.getPacket().setLength(1024); int outstanding = _sentNotReceived.size() + 1; @@ -118,9 +119,9 @@ public class UDPEndpointTest { _log.debug("Sending packet " + curPacket + " with outstanding " + outstanding); _endpoint.send(packet); //try { Thread.sleep(10); } catch (InterruptedException ie) {} - } catch (UnknownHostException uhe) { - _log.error("foo!", uhe); - } + //} catch (UnknownHostException uhe) { + // _log.error("foo!", uhe); + //} //if (_log.shouldLog(Log.DEBUG)) { // _log.debug("Sent to " + _endpoints[curPeer].getListenPort() + " from " + _endpoint.getListenPort()); //} diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java index aa5df44198..95b756968f 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacket.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacket.java @@ -40,6 +40,7 @@ public class UDPPacket { private long _receivedTime; private long _beforeReceiveFragments; private long _afterHandlingTime; + private boolean _isInbound; private static final List _packetCache; static { @@ -47,7 +48,8 @@ public class UDPPacket { _log = I2PAppContext.getGlobalContext().logManager().getLog(UDPPacket.class); } - private static final boolean CACHE = false; // TODO: support caching to cut churn down a /lot/ + private static final boolean CACHE = true; // TODO: support caching to cut churn down a /lot/ + private static final int CACHE_SIZE = 64; static final int MAX_PACKET_SIZE = 2048; public static final int IV_SIZE = 16; @@ -75,18 +77,31 @@ public class UDPPacket { private static final int MAX_VALIDATE_SIZE = MAX_PACKET_SIZE; private static final ByteCache _validateCache = ByteCache.getInstance(64, MAX_VALIDATE_SIZE); private static final ByteCache _ivCache = ByteCache.getInstance(64, IV_SIZE); - private static final ByteCache _dataCache = ByteCache.getInstance(128, MAX_PACKET_SIZE); + private static final ByteCache _dataCache = ByteCache.getInstance(64, MAX_PACKET_SIZE); - private UDPPacket(I2PAppContext ctx) { + private UDPPacket(I2PAppContext ctx, boolean inbound) { + ctx.statManager().createRateStat("udp.packetsLiveInbound", "Number of live inbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 }); + ctx.statManager().createRateStat("udp.packetsLiveOutbound", "Number of live outbound packets in memory", "udp", new long[] { 60*1000, 5*60*1000 }); + ctx.statManager().createRateStat("udp.packetsLivePendingRecvInbound", "Number of live inbound packets not yet handled by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 }); + ctx.statManager().createRateStat("udp.packetsLivePendingHandleInbound", "Number of live inbound packets not yet handled fully by the PacketHandler", "udp", new long[] { 60*1000, 5*60*1000 }); + // the data buffer is clobbered on init(..), but we need it to bootstrap + _packet = new DatagramPacket(new byte[MAX_PACKET_SIZE], MAX_PACKET_SIZE); + init(ctx, inbound); + } + private void init(I2PAppContext ctx, boolean inbound) { _context = ctx; _dataBuf = _dataCache.acquire(); _data = _dataBuf.getData(); - _packet = new DatagramPacket(_data, MAX_PACKET_SIZE); + //_packet = new DatagramPacket(_data, MAX_PACKET_SIZE); + _packet.setData(_data); + _isInbound = inbound; _initializeTime = _context.clock().now(); _markedType = -1; _remoteHost = null; + _released = false; } + /* public void initialize(int priority, long expiration, InetAddress host, int port) { _priority = (short)priority; _expiration = expiration; @@ -99,6 +114,7 @@ public class UDPPacket { _released = false; _releasedBy = null; } + */ public void writeData(byte src[], int offset, int len) { verifyNotReleased(); @@ -129,8 +145,12 @@ public class UDPPacket { void setFragmentCount(int count) { _fragmentCount = count; } public RemoteHostId getRemoteHost() { - if (_remoteHost == null) - _remoteHost = new RemoteHostId(_packet.getAddress().getAddress(), _packet.getPort()); + if (_remoteHost == null) { + InetAddress addr = _packet.getAddress(); + byte ip[] = addr.getAddress(); + int port = _packet.getPort(); + _remoteHost = new RemoteHostId(ip, port); + } return _remoteHost; } @@ -234,7 +254,7 @@ public class UDPPacket { return buf.toString(); } - public static UDPPacket acquire(I2PAppContext ctx) { + public static UDPPacket acquire(I2PAppContext ctx, boolean inbound) { UDPPacket rv = null; if (CACHE) { synchronized (_packetCache) { @@ -242,27 +262,12 @@ public class UDPPacket { rv = (UDPPacket)_packetCache.remove(0); } } - /* - if (rv != null) { - rv._context = ctx; - //rv._log = ctx.logManager().getLog(UDPPacket.class); - rv.resetBegin(); - Arrays.fill(rv._data, (byte)0x00); - rv._markedType = -1; - rv._dataBuf.setValid(0); - rv._released = false; - rv._releasedBy = null; - rv._acquiredBy = null; - rv.setPacketDataLength(0); - synchronized (rv._packet) { - //rv._packet.setLength(0); - //rv._packet.setPort(1); - } - } - */ + + if (rv != null) + rv.init(ctx, inbound); } if (rv == null) - rv = new UDPPacket(ctx); + rv = new UDPPacket(ctx, inbound); //if (rv._acquiredBy != null) { // _log.log(Log.CRIT, "Already acquired! current stack trace is:", new Exception()); // _log.log(Log.CRIT, "Earlier acquired:", rv._acquiredBy); @@ -277,15 +282,12 @@ public class UDPPacket { //_releasedBy = new Exception("released by"); //_acquiredBy = null; // - if (!CACHE) { - _dataCache.release(_dataBuf); + _dataCache.release(_dataBuf); + if (!CACHE) return; - } synchronized (_packetCache) { - if (_packetCache.size() <= 64) { + if (_packetCache.size() <= CACHE_SIZE) { _packetCache.add(this); - } else { - _dataCache.release(_dataBuf); } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index b80383e764..85caf53ad6 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -270,17 +270,17 @@ public class UDPPacketReader { public boolean readExtendedDataIncluded() { return flagSet(UDPPacket.DATA_FLAG_EXTENDED); } - public long[] readACKs() { - if (!readACKsIncluded()) return null; + public int readACKCount() { + if (!readACKsIncluded()) return 0; int off = readBodyOffset() + 1; - int num = (int)DataHelper.fromLong(_message, off, 1); + return (int)DataHelper.fromLong(_message, off, 1); + } + public long readACK(int index) { + if (!readACKsIncluded()) return -1; + int off = readBodyOffset() + 1; + //int num = (int)DataHelper.fromLong(_message, off, 1); off++; - long rv[] = new long[num]; - for (int i = 0; i < num; i++) { - rv[i] = DataHelper.fromLong(_message, off, 4); - off += 4; - } - return rv; + return DataHelper.fromLong(_message, off + (4 * index), 4); } public ACKBitfield[] readACKBitfields() { if (!readACKBitfieldsIncluded()) return null; diff --git a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java index 4cde762c6e..34a660d279 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPReceiver.java @@ -42,6 +42,7 @@ public class UDPReceiver { _transport = transport; _runner = new Runner(); _context.statManager().createRateStat("udp.receivePacketSize", "How large packets received are", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.receiveRemaining", "How many packets are left sitting on the receiver's queue", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInbound", "How many packet are queued up but not yet received when we drop", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.droppedInboundProbabalistically", "How many packet we drop probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.acceptedInboundProbabalistically", "How many packet we accept probabalistically (to simulate failures)", "udp", new long[] { 60*1000, 5*60*1000, 10*60*1000, 60*60*1000 }); @@ -145,6 +146,7 @@ public class UDPReceiver { } // rejected + packet.release(); _context.statManager().addRateData("udp.droppedInbound", queueSize, headPeriod); if (_log.shouldLog(Log.WARN)) { StringBuffer msg = new StringBuffer(); @@ -171,31 +173,36 @@ public class UDPReceiver { * */ public UDPPacket receiveNext() { + UDPPacket rv = null; + int remaining = 0; while (_keepRunning) { synchronized (_inboundQueue) { if (_inboundQueue.size() <= 0) try { _inboundQueue.wait(); } catch (InterruptedException ie) {} if (_inboundQueue.size() > 0) { - UDPPacket rv = (UDPPacket)_inboundQueue.remove(0); - if (_inboundQueue.size() > 0) + rv = (UDPPacket)_inboundQueue.remove(0); + remaining = _inboundQueue.size(); + if (remaining > 0) _inboundQueue.notifyAll(); - return rv; + break; } } } - return null; + _context.statManager().addRateData("udp.receiveRemaining", remaining, 0); + return rv; } private class Runner implements Runnable { private boolean _socketChanged; public void run() { _socketChanged = false; + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); while (_keepRunning) { if (_socketChanged) { Thread.currentThread().setName(_name + "." + _id); _socketChanged = false; } - UDPPacket packet = UDPPacket.acquire(_context); + UDPPacket packet = UDPPacket.acquire(_context, true); // block before we read... if (_log.shouldLog(Log.DEBUG)) @@ -217,7 +224,9 @@ public class UDPReceiver { // and block after we know how much we read but before // we release the packet to the inbound queue if (size > 0) { - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); + //FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); + //_context.bandwidthLimiter().requestInbound(req, size, "UDP receiver"); + req = _context.bandwidthLimiter().requestInbound(size, "UDP receiver"); while (req.getPendingInboundRequested() > 0) req.waitForNextAllocation(); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index 5fd02cdfcc..6ae185b6b0 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -178,6 +178,7 @@ public class UDPSender { private class Runner implements Runnable { private boolean _socketChanged; + FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().createRequest(); public void run() { if (_log.shouldLog(Log.DEBUG)) _log.debug("Running the UDP sender"); @@ -196,7 +197,8 @@ public class UDPSender { int size = packet.getPacket().getLength(); int size2 = packet.getPacket().getLength(); if (size > 0) { - FIFOBandwidthLimiter.Request req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); + //_context.bandwidthLimiter().requestOutbound(req, size, "UDP sender"); + req = _context.bandwidthLimiter().requestOutbound(size, "UDP sender"); while (req.getPendingOutboundRequested() > 0) req.waitForNextAllocation(); } @@ -209,7 +211,7 @@ public class UDPSender { //_log.debug("Sending packet: (size="+size + "/"+size2 +")\nraw: " + Base64.encode(packet.getPacket().getData(), 0, size)); } - _context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); + //_context.statManager().addRateData("udp.sendPacketSize." + packet.getMessageType(), size, packet.getFragmentCount()); //packet.getPacket().setLength(size); try { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 3d9fdcf3cd..af87193052 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -961,13 +961,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (msg.getPeer() != null) && ( (msg.getMaxSends() >= OutboundMessageFragments.MAX_VOLLEYS) || (msg.isExpired())) ) { - long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); - long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime(); - if (m != null) - m.timestamp("message failure - volleys = " + msg.getMaxSends() - + " lastReceived: " + recvDelay - + " lastSentFully: " + sendDelay - + " expired? " + msg.isExpired()); + //long recvDelay = _context.clock().now() - msg.getPeer().getLastReceiveTime(); + //long sendDelay = _context.clock().now() - msg.getPeer().getLastSendFullyTime(); + //if (m != null) + // m.timestamp("message failure - volleys = " + msg.getMaxSends() + // + " lastReceived: " + recvDelay + // + " lastSentFully: " + sendDelay + // + " expired? " + msg.isExpired()); consecutive = msg.getPeer().incrementConsecutiveFailedSends(); if (_log.shouldLog(Log.WARN)) _log.warn("Consecutive failure #" + consecutive -- GitLab