diff --git a/history.txt b/history.txt index 8b7fd2ec7..22a2023a2 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.410 2006/02/20 11:42:36 jrandom Exp $ +$Id: history.txt,v 1.411 2006/02/20 13:12:47 jrandom Exp $ + +2006-02-20 jrandom + * Throttle the outbound SSU establishment queue, so it doesn't fill up the + heap when backlogged (and so that the messages queued up on it don't sit + there forever) + * Further SSU memory cleanup 2006-02-20 jrandom * Properly enable TCP this time (oops) diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index f7c388824..58a0e9c50 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -33,6 +33,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM private long _expiration; private long _uniqueId; private boolean _written; + private boolean _read; public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH; @@ -54,6 +55,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM _expiration = _context.clock().now() + DEFAULT_EXPIRATION_MS; _uniqueId = _context.random().nextLong(MAX_ID_VALUE); _written = false; + _read = false; //_context.statManager().createRateStat("i2np.writeTime", "How long it takes to write an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); //_context.statManager().createRateStat("i2np.readTime", "How long it takes to read an I2NP message", "I2NP", new long[] { 10*60*1000, 60*60*1000 }); } @@ -105,6 +107,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM long time = _context.clock().now() - start; //if (time > 50) // _context.statManager().addRateData("i2np.readTime", time, time); + _read = true; return size + Hash.HASH_LENGTH + 1 + 4 + DataHelper.DATE_LENGTH; } catch (DataFormatException dfe) { throw new I2NPMessageException("Error reading the message header", dfe); @@ -149,6 +152,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM long time = _context.clock().now() - start; //if (time > 50) // _context.statManager().addRateData("i2np.readTime", time, time); + _read = true; return cur - offset; } @@ -296,7 +300,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM public static I2NPMessage fromRawByteArray(I2PAppContext ctx, byte buffer[], int offset, int len, I2NPMessageHandler handler) throws I2NPMessageException { int type = (int)DataHelper.fromLong(buffer, offset, 1); offset++; - I2NPMessage msg = createMessage(ctx, type); + I2NPMessageImpl msg = (I2NPMessageImpl)createMessage(ctx, type); if (msg == null) throw new I2NPMessageException("Unknown message type: " + type); if (RAW_FULL_SIZE) { @@ -305,6 +309,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM } catch (IOException ioe) { throw new I2NPMessageException("Error reading the " + msg, ioe); } + msg.read(); return msg; } @@ -314,6 +319,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM try { msg.readMessage(buffer, offset, dataSize, type, handler); msg.setMessageExpiration(expiration); + msg.read(); return msg; } catch (IOException ioe) { throw new I2NPMessageException("IO error reading raw message", ioe); @@ -322,6 +328,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM protected void verifyUnwritten() { if (_written) throw new RuntimeException("Already written"); } protected void written() { _written = true; } + protected void read() { _read = true; } /** * Yes, this is fairly ugly, but its the only place it ever happens. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 30901398f..221ba136f 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.352 $ $Date: 2006/02/20 11:42:33 $"; + public final static String ID = "$Revision: 1.353 $ $Date: 2006/02/20 13:12:48 $"; public final static String VERSION = "0.6.1.10"; - public final static long BUILD = 8; + public final static long BUILD = 9; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index fc7b8de17..84e17c6d6 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -47,7 +47,7 @@ public class EstablishmentManager { private Object _activityLock; private int _activity; - private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 4; + private static final int DEFAULT_MAX_CONCURRENT_ESTABLISH = 10; public static final String PROP_MAX_CONCURRENT_ESTABLISH = "i2np.udp.maxConcurrentEstablish"; public EstablishmentManager(RouterContext ctx, UDPTransport transport) { @@ -67,6 +67,8 @@ public class EstablishmentManager { _context.statManager().createRateStat("udp.sendIntroRelayRequest", "How often we send a relay request to reach a peer", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendIntroRelayTimeout", "How often a relay request times out before getting a response (due to the target or intro peer being offline)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.receiveIntroRelayResponse", "How long it took to receive a relay response", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.establishRejected", "How many pending outbound connections are there when we refuse to add any more?", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.establishOverflow", "How many messages were queued up on a pending connection when it was too much?", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); } public void startup() { @@ -112,6 +114,9 @@ public class EstablishmentManager { } return DEFAULT_MAX_CONCURRENT_ESTABLISH; } + + private static final int MAX_QUEUED_OUTBOUND = 10*1000; + private static final int MAX_QUEUED_PER_PEER = 3; /** * Send the message to its specified recipient by establishing a connection @@ -153,16 +158,24 @@ public class EstablishmentManager { OutboundEstablishState state = null; int deferred = 0; + boolean rejected = false; + int queueCount = 0; synchronized (_outboundStates) { state = (OutboundEstablishState)_outboundStates.get(to); if (state == null) { if (_outboundStates.size() >= getMaxConcurrentEstablish()) { List queued = (List)_queuedOutbound.get(to); if (queued == null) { - queued = new ArrayList(1); - _queuedOutbound.put(to, queued); + if (_queuedOutbound.size() > MAX_QUEUED_OUTBOUND) { + rejected = true; + } else { + queued = new ArrayList(1); + _queuedOutbound.put(to, queued); + } } - queued.add(msg); + queueCount = queued.size(); + if ( (queueCount < MAX_QUEUED_PER_PEER) && (!rejected) ) + queued.add(msg); deferred = _queuedOutbound.size(); } else { state = new OutboundEstablishState(_context, remAddr, port, @@ -181,6 +194,17 @@ public class EstablishmentManager { } } + if (rejected) { + _transport.failed(msg, "Too many pending outbound connections"); + _context.statManager().addRateData("udp.establishRejected", deferred, 0); + return; + } + if (queueCount >= MAX_QUEUED_PER_PEER) { + _transport.failed(msg, "Too many pending messages for the given peer"); + _context.statManager().addRateData("udp.establishOverflow", queueCount, deferred); + return; + } + if (deferred > 0) msg.timestamp("too many deferred establishers: " + deferred); else if (state != null) @@ -199,7 +223,7 @@ public class EstablishmentManager { Object removed = null; synchronized (_outboundStates) { removed = _outboundStates.remove(_to); - if (removed != _state) { // oops, we must have failed, then retried + if ( (removed != null) && (removed != _state) ) { // oops, we must have failed, then retried _outboundStates.put(_to, removed); removed = null; }/* else { @@ -388,6 +412,8 @@ public class EstablishmentManager { * */ private void handleCompletelyEstablished(InboundEstablishState state) { + if (state.complete()) return; + long now = _context.clock().now(); RouterIdentity remote = state.getConfirmedIdentity(); PeerState peer = new PeerState(_context, _transport); @@ -423,6 +449,12 @@ public class EstablishmentManager { * */ private PeerState handleCompletelyEstablished(OutboundEstablishState state) { + if (state.complete()) { + RouterIdentity rem = state.getRemoteIdentity(); + if (rem != null) + return _transport.getPeerState(rem.getHash()); + } + long now = _context.clock().now(); RouterIdentity remote = state.getRemoteIdentity(); PeerState peer = new PeerState(_context, _transport); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java index bd2c7a5cc..8ccfd748c 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState.java @@ -54,6 +54,7 @@ public class InboundEstablishState { private long _nextSend; private RemoteHostId _remoteHostId; private int _currentState; + private boolean _complete; /** nothin known yet */ public static final int STATE_UNKNOWN = 0; @@ -77,11 +78,17 @@ public class InboundEstablishState { _bobPort = localPort; _keyBuilder = null; _verificationAttempted = false; + _complete = false; _currentState = STATE_UNKNOWN; _establishBegin = ctx.clock().now(); } public synchronized int getState() { return _currentState; } + public synchronized boolean complete() { + boolean already = _complete; + _complete = true; + return already; + } public synchronized void receiveSessionRequest(UDPPacketReader.SessionRequestReader req) { if (_receivedX == null) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java index 23f311c50..882a48dde 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState.java @@ -59,6 +59,7 @@ public class OutboundEstablishState { private long _introductionNonce; // intro private UDPAddress _remoteAddress; + private boolean _complete; /** nothin sent yet */ public static final int STATE_UNKNOWN = 0; @@ -94,6 +95,7 @@ public class OutboundEstablishState { _establishBegin = ctx.clock().now(); _remoteAddress = addr; _introductionNonce = -1; + _complete = false; prepareSessionRequest(); if ( (addr != null) && (addr.getIntroducerCount() > 0) ) { if (_log.shouldLog(Log.DEBUG)) @@ -103,6 +105,11 @@ public class OutboundEstablishState { } public synchronized int getState() { return _currentState; } + public synchronized boolean complete() { + boolean already = _complete; + _complete = true; + return already; + } public UDPAddress getRemoteAddress() { return _remoteAddress; } public void setIntroNonce(long nonce) { _introductionNonce = nonce; } diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index d6baef7a8..1b942d808 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -34,7 +34,7 @@ public class OutboundMessageState { private int _nextSendFragment; public static final int MAX_FRAGMENTS = 32; - private static final ByteCache _cache = ByteCache.getInstance(128, MAX_FRAGMENTS*1024); + private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024); public OutboundMessageState(I2PAppContext context) { _context = context; diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 07f47bc39..73fe48d71 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -413,22 +413,24 @@ public class PacketHandler { state = _establisher.receiveData(outState); if (_log.shouldLog(Log.DEBUG)) _log.debug("Received new DATA packet from " + state + ": " + packet); - UDPPacketReader.DataReader dr = reader.getDataReader(); - if (_log.shouldLog(Log.INFO)) { - StringBuffer msg = new StringBuffer(); - msg.append("Receive ").append(System.identityHashCode(packet)); - msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId()); - for (int i = 0; i < dr.readFragmentCount(); i++) { - msg.append(" msg ").append(dr.readMessageId(i)); - msg.append(":").append(dr.readMessageFragmentNum(i)); - if (dr.readMessageIsLast(i)) - msg.append("*"); + if (state != null) { + UDPPacketReader.DataReader dr = reader.getDataReader(); + if (_log.shouldLog(Log.INFO)) { + StringBuffer msg = new StringBuffer(); + msg.append("Receive ").append(System.identityHashCode(packet)); + msg.append(" from ").append(state.getRemotePeer().toBase64()).append(" ").append(state.getRemoteHostId()); + for (int i = 0; i < dr.readFragmentCount(); i++) { + msg.append(" msg ").append(dr.readMessageId(i)); + msg.append(":").append(dr.readMessageFragmentNum(i)); + if (dr.readMessageIsLast(i)) + msg.append("*"); + } + msg.append(": ").append(dr.toString()); + _log.info(msg.toString()); } - msg.append(": ").append(dr.toString()); - _log.info(msg.toString()); + packet.beforeReceiveFragments(); + _inbound.receiveData(state, dr); } - packet.beforeReceiveFragments(); - _inbound.receiveData(state, dr); break; case UDPPacket.PAYLOAD_TYPE_TEST: _state = 51; diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 1b5c32124..9a33e5b62 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -1427,6 +1427,7 @@ public class PeerState { tmp.addAll(oldPeer._outboundMessages); oldPeer._outboundMessages.clear(); retransmitter = oldPeer._retransmitter; + oldPeer._retransmitter = null; } synchronized (_outboundMessages) { _outboundMessages.addAll(tmp); @@ -1434,7 +1435,8 @@ public class PeerState { } tmp.clear(); } - + + /* public int hashCode() { if (_remotePeer != null) return _remotePeer.hashCode(); @@ -1454,6 +1456,7 @@ public class PeerState { return false; } } + */ public String toString() { StringBuffer buf = new StringBuffer(64); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index af8719305..a59ecb107 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -483,6 +483,11 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + if (oldPeer != null) { + oldPeer.dropOutbound(); + _introManager.remove(oldPeer); + _expireEvent.remove(oldPeer); + } oldPeer = null; RemoteHostId remoteId = peer.getRemoteHostId(); @@ -497,6 +502,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority } } + if (oldPeer != null) { + oldPeer.dropOutbound(); + _introManager.remove(oldPeer); + _expireEvent.remove(oldPeer); + } + if ( (oldPeer != null) && (_log.shouldLog(Log.WARN)) ) _log.warn("Peer already connected: old=" + oldPeer + " new=" + peer, new Exception("dup")); @@ -624,6 +635,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _log.warn(buf.toString(), new Exception("Dropped by")); } + peer.dropOutbound(); _introManager.remove(peer); _fragments.dropPeer(peer); // a bit overzealous - perhaps we should only rebuild the external if the peer being dropped @@ -1384,67 +1396,53 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int EXPIRE_TIMEOUT = 10*60*1000; private class ExpirePeerEvent implements SimpleTimer.TimedEvent { - private List _peers; - // toAdd and toRemove are kept separate from _peers so that add and - // remove calls won't block packet handling while the big iteration is - // in process - private List _toAdd; - private List _toRemove; + private List _expirePeers; + private List _expireBuffer; private boolean _alive; public ExpirePeerEvent() { - _peers = new ArrayList(128); - _toAdd = new ArrayList(4); - _toRemove = new ArrayList(4); + _expirePeers = new ArrayList(128); + _expireBuffer = new ArrayList(128); } public void timeReached() { long inactivityCutoff = _context.clock().now() - EXPIRE_TIMEOUT; - for (int i = 0; i < _peers.size(); i++) { - PeerState peer = (PeerState)_peers.get(i); - if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { - dropPeer(peer, false); - _peers.remove(i); - i--; + _expireBuffer.clear(); + synchronized (_expirePeers) { + int sz = _expirePeers.size(); + for (int i = 0; i < sz; i++) { + PeerState peer = (PeerState)_expirePeers.get(i); + if ( (peer.getLastReceiveTime() < inactivityCutoff) && (peer.getLastSendTime() < inactivityCutoff) ) { + _expireBuffer.add(peer); + _expirePeers.remove(i); + i--; + sz--; + } } } - synchronized (_toAdd) { - for (int i = 0; i < _toAdd.size(); i++) { - PeerState peer = (PeerState)_toAdd.get(i); - _peers.remove(peer); // in case we are switching peers - _peers.add(peer); - } - _toAdd.clear(); - } - synchronized (_toRemove) { - for (int i = 0; i < _toRemove.size(); i++) { - PeerState peer = (PeerState)_toRemove.get(i); - _peers.remove(peer); - } - _toRemove.clear(); - } + for (int i = 0; i < _expireBuffer.size(); i++) + dropPeer((PeerState)_expireBuffer.get(i), false); + _expireBuffer.clear(); + if (_alive) - SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000); + SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000); } public void add(PeerState peer) { - synchronized (_toAdd) { - _toAdd.add(peer); + synchronized (_expirePeers) { + _expirePeers.add(peer); } } public void remove(PeerState peer) { - synchronized (_toRemove) { - _toRemove.add(peer); + synchronized (_expirePeers) { + _expirePeers.remove(peer); } } public void setIsAlive(boolean isAlive) { _alive = isAlive; if (isAlive) { - SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 5*60*1000); + SimpleTimer.getInstance().addEvent(ExpirePeerEvent.this, 30*1000); } else { SimpleTimer.getInstance().removeEvent(ExpirePeerEvent.this); - synchronized (_toAdd) { - _toAdd.clear(); - } - synchronized (_peers) { - _peers.clear(); + synchronized (_expirePeers) { + _expirePeers.clear(); } } }