NTCP reader: parallelize operation
Opened 20 months ago
Last modified 19 months ago
#2620 new enhancement
NTCP reader: parallelize operation
Reported by: jogger Owned by: zzz Priority: minor Milestone: undecided Component: router/transport Version: 0.9.42 Keywords:
Cc:
Parent Tickets:
Sensitive: no
Description
The NTCP reader has the same unnecessary notify() calls as the writer. Just applying the same changes here is already an improvement.
However, here we have the same situation as with #2617: the reader is just too slow to fill the pumper queue. The inbound code provided in #2432 comment 11 just works for the UDP message receiver; the liveReads logic here makes the NTCP reader run single-threaded, causing all messages to trickle through the router one at a time. It takes ages to get a 16k torrent piece to the local client.
Here the opposite is needed: if a read is in progress, prioritize the same connection (con) for reading on another reader (ideally up to the number of processors). This way we have a chance to really get a bunch of messages to pump(). The load-tested patch follows:
--- i2p-0.9.42/router/java/src/net/i2p/router/transport/ntcp/Reader.java 2019-08-27 14:34:07.000000000 +0200
+++ 42dev/router/java/src/net/i2p/router/transport/ntcp/Reader.java 2019-09-12 09:07:08.000000000 +0200
@@ -7,6 +7,8 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
import net.i2p.router.RouterContext;
import net.i2p.util.I2PThread;
@@ -23,17 +25,15 @@
private final Log _log;
// TODO change to LBQ ??
private final Set<NTCPConnection> _pendingConnections;
- private final Set<NTCPConnection> _liveReads;
- private final Set<NTCPConnection> _readAfterLive;
+ private final Map<NTCPConnection, Integer> _liveReads;
private final List<Runner> _runners;
public Reader(RouterContext ctx) {
_context = ctx;
_log = ctx.logManager().getLog(getClass());
- _pendingConnections = new LinkedHashSet<NTCPConnection>(16);
+ _pendingConnections = new HashSet<NTCPConnection>(16);
_runners = new ArrayList<Runner>(8);
- _liveReads = new HashSet<NTCPConnection>(8);
- _readAfterLive = new HashSet<NTCPConnection>(8);
+ _liveReads = new HashMap<NTCPConnection, Integer>(8);
}
public synchronized void startReading(int numReaders) {
@@ -51,21 +51,17 @@
r.stop();
}
synchronized (_pendingConnections) {
- _readAfterLive.clear();
+ _liveReads.clear();
+ _pendingConnections.clear();
_pendingConnections.notifyAll();
}
}
public void wantsRead(NTCPConnection con) {
- boolean already = false;
+ boolean already;
synchronized (_pendingConnections) {
- if (_liveReads.contains(con)) {
- _readAfterLive.add(con);
- already = true;
- } else {
+ already = _liveReads.containsKey(con);
_pendingConnections.add(con);
- // only notify here if added?
- }
_pendingConnections.notify();
}
if (_log.shouldLog(Log.DEBUG))
@@ -74,10 +70,8 @@
public void connectionClosed(NTCPConnection con) {
synchronized (_pendingConnections) {
- _readAfterLive.remove(con);
+ _liveReads.remove(con);
_pendingConnections.remove(con);
- // necessary?
- _pendingConnections.notify();
}
}
@@ -94,25 +88,30 @@
while (!_stop) {
try {
synchronized (_pendingConnections) {
- boolean keepReading = (con != null) && _readAfterLive.remove(con);
- if (keepReading) {
- // keep on reading the same one
- } else {
- if (con != null) {
+ Integer num = _liveReads.get(con);
+ if (num != null) {
+ int val = num.intValue();
+ if (val <= 1)
_liveReads.remove(con);
- con = null;
+ else
+ _liveReads.replace(con, Integer.valueOf(val - 1));
}
- if (_pendingConnections.isEmpty()) {
- _pendingConnections.wait();
- } else {
+ con = null;
Iterator<NTCPConnection> iter = _pendingConnections.iterator();
+ while (iter.hasNext()) {
con = iter.next();
- iter.remove();
- _liveReads.add(con);
- }
+ num = _liveReads.get(con);
+ if (num != null)
+ break;
}
+ if (con == null)
+ _pendingConnections.wait();
+ else
+ _liveReads.put(con, Integer.valueOf(num == null ? 1 : num.intValue() + 1));
+ _pendingConnections.remove(con);
}
} catch (InterruptedException ie) {}
+
if (!_stop && (con != null) ) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("begin read for " + con);