forked from I2P_Developers/i2p.i2p
merge of '1045fe48c576267959eae499a22776d9f9acafc3'
and 'bed1572eff55282ffcb5a2b92d02813eb04c0548'
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
*/
|
||||
package net.i2p.i2ptunnel;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@@ -219,6 +220,8 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
|
||||
// shadows _log in super()
|
||||
private final Log _log;
|
||||
|
||||
private static final int BUF_SIZE = 16*1024;
|
||||
|
||||
public CompressedRequestor(Socket webserver, I2PSocket browser, String headers, I2PAppContext ctx, Log log) {
|
||||
_webserver = webserver;
|
||||
_browser = browser;
|
||||
@@ -259,7 +262,7 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
|
||||
// at java.lang.Thread.run(Thread.java:619)
|
||||
// at net.i2p.util.I2PThread.run(I2PThread.java:71)
|
||||
try {
|
||||
serverin = _webserver.getInputStream();
|
||||
serverin = new BufferedInputStream(_webserver.getInputStream(), BUF_SIZE);
|
||||
} catch (NullPointerException npe) {
|
||||
throw new IOException("getInputStream NPE");
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
*/
|
||||
package net.i2p.i2ptunnel;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InterruptedIOException;
|
||||
@@ -17,6 +18,7 @@ import net.i2p.data.ByteArray;
|
||||
import net.i2p.util.ByteCache;
|
||||
import net.i2p.util.Clock;
|
||||
import net.i2p.util.I2PAppThread;
|
||||
import net.i2p.util.InternalSocket;
|
||||
import net.i2p.util.Log;
|
||||
|
||||
public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
|
||||
@@ -167,6 +169,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
|
||||
_log.debug("Initial data " + (initialI2PData != null ? initialI2PData.length : 0)
|
||||
+ " written to I2P, " + (initialSocketData != null ? initialSocketData.length : 0)
|
||||
+ " written to the socket, starting forwarders");
|
||||
if (!(s instanceof InternalSocket))
|
||||
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
|
||||
Thread t1 = new StreamForwarder(in, i2pout, true);
|
||||
Thread t2 = new StreamForwarder(i2pin, out, false);
|
||||
synchronized (finishLock) {
|
||||
|
||||
@@ -9,6 +9,7 @@ package net.i2p.client;
|
||||
*
|
||||
*/
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@@ -116,7 +117,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
|
||||
protected volatile boolean _closing;
|
||||
|
||||
/** have we received the current date from the router yet? */
|
||||
private boolean _dateReceived;
|
||||
private volatile boolean _dateReceived;
|
||||
/** lock that we wait upon, that the SetDateMessageHandler notifies */
|
||||
private final Object _dateReceivedLock = new Object();
|
||||
|
||||
@@ -154,6 +155,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
|
||||
}
|
||||
|
||||
public static final int LISTEN_PORT = 7654;
|
||||
|
||||
private static final int BUF_SIZE = 32*1024;
|
||||
|
||||
/** for extension */
|
||||
protected I2PSessionImpl(I2PAppContext context, Properties options) {
|
||||
@@ -353,7 +356,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
|
||||
_out.write(I2PClient.PROTOCOL_BYTE);
|
||||
_out.flush();
|
||||
_writer = new ClientWriterRunner(_out, this);
|
||||
InputStream in = _socket.getInputStream();
|
||||
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
|
||||
_reader = new I2CPMessageReader(in, this);
|
||||
}
|
||||
Thread notifier = new I2PAppThread(_availabilityNotifier, "ClientNotifier " + getPrefix(), true);
|
||||
|
||||
@@ -5,6 +5,7 @@ package net.i2p.client;
|
||||
* with no warranty of any kind, either expressed or implied.
|
||||
*/
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.Socket;
|
||||
@@ -28,6 +29,8 @@ import net.i2p.internal.QueuedI2CPMessageReader;
|
||||
*/
|
||||
class I2PSimpleSession extends I2PSessionImpl2 {
|
||||
|
||||
private static final int BUF_SIZE = 1024;
|
||||
|
||||
/**
|
||||
* Create a new session for doing naming and bandwidth queries only. Do not create a destination.
|
||||
*
|
||||
@@ -68,7 +71,7 @@ class I2PSimpleSession extends I2PSessionImpl2 {
|
||||
_out.write(I2PClient.PROTOCOL_BYTE);
|
||||
_out.flush();
|
||||
_writer = new ClientWriterRunner(_out, this);
|
||||
InputStream in = _socket.getInputStream();
|
||||
InputStream in = new BufferedInputStream(_socket.getInputStream(), BUF_SIZE);
|
||||
_reader = new I2CPMessageReader(in, this);
|
||||
}
|
||||
// we do not receive payload messages, so we do not need an AvailabilityNotifier
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package net.i2p.util;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
@@ -1057,6 +1058,8 @@ public class EepGet {
|
||||
//}
|
||||
}
|
||||
_proxyIn = _proxy.getInputStream();
|
||||
if (!(_proxy instanceof InternalSocket))
|
||||
_proxyIn = new BufferedInputStream(_proxyIn);
|
||||
_proxyOut = _proxy.getOutputStream();
|
||||
|
||||
if (timeout != null)
|
||||
|
||||
@@ -27,7 +27,7 @@ public class InternalServerSocket extends ServerSocket {
|
||||
private static final ConcurrentHashMap<Integer, InternalServerSocket> _sockets = new ConcurrentHashMap(4);
|
||||
private final BlockingQueue<InternalSocket> _acceptQueue;
|
||||
private final Integer _port;
|
||||
private boolean _running;
|
||||
private volatile boolean _running;
|
||||
//private static Log _log = I2PAppContext.getGlobalContext().logManager().getLog(InternalServerSocket.class);
|
||||
|
||||
/**
|
||||
|
||||
@@ -37,6 +37,7 @@ package net.i2p.util;
|
||||
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
@@ -675,6 +676,8 @@ public class SSLEepGet extends EepGet {
|
||||
// this is an IOE
|
||||
throw sslhe;
|
||||
}
|
||||
|
||||
_proxyIn = new BufferedInputStream(_proxyIn);
|
||||
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Request flushed");
|
||||
|
||||
11
history.txt
11
history.txt
@@ -1,3 +1,14 @@
|
||||
2012-08-05 zzz
|
||||
* I2PSessionImpl: One more volatile (ticket #659)
|
||||
* i2ptunnel, I2CP, EepGet: Buffer socket input streams (ticket #666)
|
||||
* UDP:
|
||||
- Limit PacketHandler threads to 1 (ticket #660)
|
||||
- Limit queue sizes between UDPReceiver and PacketHandler,
|
||||
and between PacketHandler and MessageReceiver, to prevent OOMs
|
||||
and/or excessive queue delays
|
||||
- Increase UDPPacket cache size based on max mem
|
||||
- Remove more stats
|
||||
|
||||
2012-08-03 zzz
|
||||
* build.xml: Add buildI2PTunnelJar target for Android
|
||||
* i2psnark: Finish migration to I2P logging to reduce object churn (ticket #673)
|
||||
|
||||
@@ -18,7 +18,7 @@ public class RouterVersion {
|
||||
/** deprecated */
|
||||
public final static String ID = "Monotone";
|
||||
public final static String VERSION = CoreVersion.VERSION;
|
||||
public final static long BUILD = 2;
|
||||
public final static long BUILD = 3;
|
||||
|
||||
/** for example "-test" */
|
||||
public final static String EXTRA = "";
|
||||
|
||||
@@ -8,6 +8,7 @@ package net.i2p.router.client;
|
||||
*
|
||||
*/
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.EOFException;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
@@ -96,6 +97,8 @@ class ClientConnectionRunner {
|
||||
// e.g. on local access
|
||||
private static final int MAX_MESSAGE_ID = 0x4000000;
|
||||
|
||||
private static final int BUF_SIZE = 32*1024;
|
||||
|
||||
/** @since 0.9.2 */
|
||||
private static final String PROP_TAGS = "crypto.tagsToSend";
|
||||
private static final String PROP_THRESH = "crypto.lowTagThreshold";
|
||||
@@ -124,7 +127,8 @@ class ClientConnectionRunner {
|
||||
*/
|
||||
public void startRunning() {
|
||||
try {
|
||||
_reader = new I2CPMessageReader(_socket.getInputStream(), new ClientMessageEventListener(_context, this, true));
|
||||
_reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE),
|
||||
new ClientMessageEventListener(_context, this, true));
|
||||
_writer = new ClientWriterRunner(_context, this);
|
||||
I2PThread t = new I2PThread(_writer);
|
||||
t.setName("I2CP Writer " + ++__id);
|
||||
|
||||
@@ -43,8 +43,8 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
_context.statManager().createRateStat("udp.receivedCompleteFragments", "How many fragments go in a fully received message", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receivedACKs", "How many messages were ACKed at a time", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.ignoreRecentDuplicate", "Take note that we received a packet for a recently completed message", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES);
|
||||
//_context.statManager().createRateStat("udp.receiveMessagePeriod", "How long it takes to pull the message fragments out of a packet", "udp", UDPTransport.RATES);
|
||||
//_context.statManager().createRateStat("udp.receiveACKPeriod", "How long it takes to pull the ACKs out of a packet", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.receivePiggyback", "How many acks were included in a packet with data fragments (time == # data fragments)", "udp", UDPTransport.RATES);
|
||||
}
|
||||
|
||||
@@ -71,15 +71,16 @@ class InboundMessageFragments /*implements UDPTransport.PartialACKSource */{
|
||||
* Pull the fragments and ACKs out of the authenticated data packet
|
||||
*/
|
||||
public void receiveData(PeerState from, UDPPacketReader.DataReader data) {
|
||||
long beforeMsgs = _context.clock().now();
|
||||
//long beforeMsgs = _context.clock().now();
|
||||
int fragmentsIncluded = receiveMessages(from, data);
|
||||
long afterMsgs = _context.clock().now();
|
||||
//long afterMsgs = _context.clock().now();
|
||||
int acksIncluded = receiveACKs(from, data);
|
||||
long afterACKs = _context.clock().now();
|
||||
//long afterACKs = _context.clock().now();
|
||||
|
||||
from.packetReceived(data.getPacketSize());
|
||||
_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
|
||||
_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
|
||||
// each of these was less than 0.1 ms
|
||||
//_context.statManager().addRateData("udp.receiveMessagePeriod", afterMsgs-beforeMsgs, afterACKs-beforeMsgs);
|
||||
//_context.statManager().addRateData("udp.receiveACKPeriod", afterACKs-afterMsgs, afterACKs-beforeMsgs);
|
||||
if ( (fragmentsIncluded > 0) && (acksIncluded > 0) )
|
||||
_context.statManager().addRateData("udp.receivePiggyback", acksIncluded, fragmentsIncluded);
|
||||
}
|
||||
|
||||
@@ -28,8 +28,11 @@ class MessageReceiver {
|
||||
private final BlockingQueue<InboundMessageState> _completeMessages;
|
||||
private boolean _alive;
|
||||
//private ByteCache _cache;
|
||||
|
||||
private static final int MIN_THREADS = 2; // unless < 32MB
|
||||
private static final int MAX_THREADS = 5;
|
||||
private static final int MIN_QUEUE_SIZE = 32; // unless < 32MB
|
||||
private static final int MAX_QUEUE_SIZE = 128;
|
||||
private final int _threadCount;
|
||||
private static final long POISON_IMS = -99999999999l;
|
||||
|
||||
@@ -37,17 +40,22 @@ class MessageReceiver {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(MessageReceiver.class);
|
||||
_transport = transport;
|
||||
_completeMessages = new LinkedBlockingQueue();
|
||||
|
||||
long maxMemory = Runtime.getRuntime().maxMemory();
|
||||
if (maxMemory == Long.MAX_VALUE)
|
||||
maxMemory = 96*1024*1024l;
|
||||
if (maxMemory < 32*1024*1024)
|
||||
int qsize;
|
||||
if (maxMemory < 32*1024*1024) {
|
||||
_threadCount = 1;
|
||||
else if (maxMemory < 64*1024*1024)
|
||||
qsize = 16;
|
||||
} else if (maxMemory < 64*1024*1024) {
|
||||
_threadCount = 2;
|
||||
else
|
||||
qsize = 32;
|
||||
} else {
|
||||
_threadCount = Math.max(MIN_THREADS, Math.min(MAX_THREADS, ctx.bandwidthLimiter().getInboundKBytesPerSecond() / 20));
|
||||
qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
|
||||
}
|
||||
_completeMessages = new LinkedBlockingQueue(qsize);
|
||||
|
||||
// the runners run forever, no need to have a cache
|
||||
//_cache = ByteCache.getInstance(64, I2NPMessage.MAX_SIZE);
|
||||
@@ -56,7 +64,7 @@ class MessageReceiver {
|
||||
_context.statManager().createRateStat("udp.inboundReady", "How many messages were ready when a message is added to the complete queue?", "udp", UDPTransport.RATES);
|
||||
//_context.statManager().createRateStat("udp.inboundReadTime", "How long it takes to parse in the completed fragments into a message?", "udp", UDPTransport.RATES);
|
||||
//_context.statManager().createRateStat("udp.inboundReceiveProcessTime", "How long it takes to add the message to the transport?", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.inboundLag", "How long the olded ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
|
||||
_context.statManager().createRateStat("udp.inboundLag", "How long the oldest ready message has been sitting on the queue (period is the queue size)?", "udp", UDPTransport.RATES);
|
||||
|
||||
_alive = true;
|
||||
}
|
||||
@@ -93,12 +101,18 @@ class MessageReceiver {
|
||||
/**
|
||||
* This queues the message for processing.
|
||||
* Processing will call state.releaseResources(), do not access state after calling this.
|
||||
* BLOCKING if queue is full.
|
||||
*/
|
||||
public void receiveMessage(InboundMessageState state) {
|
||||
//int total = 0;
|
||||
//long lag = -1;
|
||||
if (_alive)
|
||||
_completeMessages.offer(state);
|
||||
if (_alive) {
|
||||
try {
|
||||
_completeMessages.put(state);
|
||||
} catch (InterruptedException ie) {
|
||||
_alive = false;
|
||||
}
|
||||
}
|
||||
//total = _completeMessages.size();
|
||||
//if (total > 1)
|
||||
// lag = ((InboundMessageState)_completeMessages.get(0)).getLifetime();
|
||||
|
||||
@@ -31,8 +31,8 @@ class PacketHandler {
|
||||
private volatile boolean _keepReading;
|
||||
private final Handler[] _handlers;
|
||||
|
||||
private static final int MIN_NUM_HANDLERS = 2; // unless < 32MB
|
||||
private static final int MAX_NUM_HANDLERS = 5;
|
||||
private static final int MIN_NUM_HANDLERS = 1; // unless < 32MB
|
||||
private static final int MAX_NUM_HANDLERS = 1;
|
||||
/** let packets be up to 30s slow */
|
||||
private static final long GRACE_PERIOD = Router.CLOCK_FUDGE_FACTOR + 30*1000;
|
||||
|
||||
|
||||
@@ -42,12 +42,18 @@ class UDPPacket {
|
||||
// Warning - this mixes contexts in a multi-router JVM
|
||||
private static final Queue<UDPPacket> _packetCache;
|
||||
private static final boolean CACHE = true;
|
||||
private static final int CACHE_SIZE = 64;
|
||||
private static final int MIN_CACHE_SIZE = 64;
|
||||
private static final int MAX_CACHE_SIZE = 256;
|
||||
static {
|
||||
if (CACHE)
|
||||
_packetCache = new LinkedBlockingQueue(CACHE_SIZE);
|
||||
else
|
||||
if (CACHE) {
|
||||
long maxMemory = Runtime.getRuntime().maxMemory();
|
||||
if (maxMemory == Long.MAX_VALUE)
|
||||
maxMemory = 96*1024*1024l;
|
||||
int csize = (int) Math.max(MIN_CACHE_SIZE, Math.min(MAX_CACHE_SIZE, maxMemory / (1024*1024)));
|
||||
_packetCache = new LinkedBlockingQueue(csize);
|
||||
} else {
|
||||
_packetCache = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -30,14 +30,21 @@ class UDPReceiver {
|
||||
private final UDPTransport _transport;
|
||||
private static int __id;
|
||||
private final int _id;
|
||||
|
||||
private static final int TYPE_POISON = -99999;
|
||||
private static final int MIN_QUEUE_SIZE = 16;
|
||||
private static final int MAX_QUEUE_SIZE = 192;
|
||||
|
||||
public UDPReceiver(RouterContext ctx, UDPTransport transport, DatagramSocket socket, String name) {
|
||||
_context = ctx;
|
||||
_log = ctx.logManager().getLog(UDPReceiver.class);
|
||||
_id = ++__id;
|
||||
_name = name;
|
||||
_inboundQueue = new LinkedBlockingQueue();
|
||||
long maxMemory = Runtime.getRuntime().maxMemory();
|
||||
if (maxMemory == Long.MAX_VALUE)
|
||||
maxMemory = 96*1024*1024l;
|
||||
int qsize = (int) Math.max(MIN_QUEUE_SIZE, Math.min(MAX_QUEUE_SIZE, maxMemory / (2*1024*1024)));
|
||||
_inboundQueue = new LinkedBlockingQueue(qsize);
|
||||
_socket = socket;
|
||||
_transport = transport;
|
||||
_runner = new Runner();
|
||||
@@ -138,7 +145,11 @@ class UDPReceiver {
|
||||
return doReceive(packet);
|
||||
}
|
||||
|
||||
/** @return zero (was queue size) */
|
||||
/**
|
||||
* BLOCKING if queue between here and PacketHandler is full.
|
||||
*
|
||||
* @return zero (was queue size)
|
||||
*/
|
||||
private final int doReceive(UDPPacket packet) {
|
||||
if (!_keepRunning)
|
||||
return 0;
|
||||
@@ -168,7 +179,12 @@ class UDPReceiver {
|
||||
}
|
||||
}
|
||||
if (!rejected) {
|
||||
_inboundQueue.offer(packet);
|
||||
try {
|
||||
_inboundQueue.put(packet);
|
||||
} catch (InterruptedException ie) {
|
||||
packet.release();
|
||||
_keepRunning = false;
|
||||
}
|
||||
//return queueSize + 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user