From ef230cfa3d4394bf1655dfca067db0e3b1680199 Mon Sep 17 00:00:00 2001 From: jrandom Date: Thu, 3 Mar 2005 03:36:52 +0000 Subject: [PATCH] 2005-03-02 jrandom * Fix one substantial OOM cause (session tag manager was only dropping tags once the critical limit was met, rather than honoring their expiration) (duh) * Lots of small memory fixes * Double the allowable concurrent outstanding tunnel build tasks (20) --- .../i2p/client/streaming/PacketHandler.java | 4 +- .../net/i2p/client/streaming/PacketQueue.java | 4 +- .../src/net/i2p/client/I2PSessionImpl.java | 21 +++++++- .../client/RequestLeaseSetMessageHandler.java | 3 +- .../crypto/TransientSessionKeyManager.java | 48 ++++++++++--------- core/java/src/net/i2p/data/Payload.java | 6 ++- .../data/i2cp/ReceiveMessageBeginMessage.java | 12 ++--- .../data/i2cp/ReceiveMessageEndMessage.java | 12 ++--- history.txt | 9 +++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../router/tunnel/pool/TunnelPoolManager.java | 6 +-- 11 files changed, 76 insertions(+), 53 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 083ef6844..2aba60361 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -106,12 +106,12 @@ public class PacketHandler { private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS"); void displayPacket(Packet packet, String prefix, String suffix) { + if (!_log.shouldLog(Log.DEBUG)) return; String msg = null; synchronized (_fmt) { msg = _fmt.format(new Date()) + ": " + prefix + " " + packet.toString() + (suffix != null ? " " + suffix : ""); } - if (_log.shouldLog(Log.DEBUG)) - System.out.println(msg); + System.out.println(msg); } private void receiveKnownCon(Connection con, Packet packet) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 9d36d7e5e..4c2841be5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -48,7 +48,9 @@ class PacketQueue { tagsSent = new HashSet(0); // cache this from before sendMessage - String conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); + String conStr = null; + if (_log.shouldLog(Log.DEBUG)) + conStr = (packet.getConnection() != null ? packet.getConnection().toString() : ""); if (packet.getAckTime() > 0) { _log.debug("Not resending " + packet); return; diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index fa28afce1..cd25bdd4c 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -39,6 +39,7 @@ import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; import net.i2p.util.I2PThread; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Implementation of an I2P session running over TCP. This class is NOT thread safe - @@ -348,8 +349,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa int id = msg.getMessageId().getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { - if (_log.shouldLog(Log.ERROR)) - _log.error(getPrefix() + "addNewMessage of a message with no unencrypted data", + if (_log.shouldLog(Log.CRIT)) + _log.log(Log.CRIT, getPrefix() + "addNewMessage of a message with no unencrypted data", new Exception("Empty message")); } else { int size = data.length; @@ -357,6 +358,20 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa if (_log.shouldLog(Log.INFO)) _log.info(getPrefix() + "Notified availability for session " + _sessionId + ", message " + id); } + SimpleTimer.getInstance().addEvent(new VerifyUsage(id), 30*1000); + } + private class VerifyUsage implements SimpleTimer.TimedEvent { + private int _msgId; + public VerifyUsage(int id) { _msgId = id; } + public void timeReached() { + MessagePayloadMessage removed = null; + synchronized (_availableMessages) { + removed = (MessagePayloadMessage)_availableMessages.remove(new Integer(_msgId)); + } + if (removed != null) + _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed); + } + } private class AvailabilityNotifier implements Runnable { @@ -407,6 +422,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } catch (Exception e) { _log.log(Log.CRIT, "Error notifying app of message availability", e); } + } else { + _log.log(Log.CRIT, "Unable to notify an app that " + msgId + " of size " + size + " is available!"); } } } diff --git a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java index 67cf88ea8..41e9f118b 100644 --- a/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java +++ b/core/java/src/net/i2p/client/RequestLeaseSetMessageHandler.java @@ -41,7 +41,8 @@ class RequestLeaseSetMessageHandler extends HandlerImpl { } public void handleMessage(I2CPMessage message, I2PSessionImpl session) { - _log.debug("Handle message " + message); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handle message " + message); RequestLeaseSetMessage msg = (RequestLeaseSetMessage) message; LeaseSet leaseSet = new LeaseSet(); for (int i = 0; i < msg.getEndpoints(); i++) { diff --git a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java index 2a7c3ed57..e406fdda8 100644 --- a/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java +++ b/core/java/src/net/i2p/crypto/TransientSessionKeyManager.java @@ -24,6 +24,7 @@ import net.i2p.data.PublicKey; import net.i2p.data.SessionKey; import net.i2p.data.SessionTag; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Implement the session key management, but keep everything in memory (don't write @@ -67,8 +68,21 @@ class TransientSessionKeyManager extends SessionKeyManager { _context = context; _outboundSessions = new HashMap(1024); _inboundTagSets = new HashMap(64*1024); + context.statManager().createRateStat("crypto.sessionTagsExpired", "How many tags/sessions are expired?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 }); + context.statManager().createRateStat("crypto.sessionTagsRemaining", "How many tags/sessions are remaining after a cleanup?", "Encryption", new long[] { 10*60*1000, 60*60*1000, 3*60*60*1000 }); + SimpleTimer.getInstance().addEvent(new CleanupEvent(), 60*1000); } private TransientSessionKeyManager() { this(null); } + + private class CleanupEvent implements SimpleTimer.TimedEvent { + public void timeReached() { + long beforeExpire = _context.clock().now(); + int expired = aggressiveExpire(); + long expireTime = _context.clock().now() - beforeExpire; + _context.statManager().addRateData("crypto.sessionTagsExpired", expired, expireTime); + SimpleTimer.getInstance().addEvent(CleanupEvent.this, 60*1000); + } + } /** TagSet */ protected Set getInboundTagSets() { @@ -247,6 +261,7 @@ class TransientSessionKeyManager extends SessionKeyManager { overage = _inboundTagSets.size() - MAX_INBOUND_SESSION_TAGS; } } + if (overage > 0) clearExcess(overage); @@ -361,45 +376,32 @@ class TransientSessionKeyManager extends SessionKeyManager { */ public int aggressiveExpire() { int removed = 0; + int remaining = 0; long now = _context.clock().now(); - Set tagsToDrop = null; // new HashSet(64); synchronized (_inboundTagSets) { for (Iterator iter = _inboundTagSets.keySet().iterator(); iter.hasNext();) { SessionTag tag = (SessionTag) iter.next(); TagSet ts = (TagSet) _inboundTagSets.get(tag); if (ts.getDate() < now - SESSION_LIFETIME_MAX_MS) { - if (tagsToDrop == null) - tagsToDrop = new HashSet(4); - tagsToDrop.add(tag); + iter.remove(); + removed++; } } - if (tagsToDrop != null) { - removed += tagsToDrop.size(); - for (Iterator iter = tagsToDrop.iterator(); iter.hasNext();) - _inboundTagSets.remove(iter.next()); - } + remaining = _inboundTagSets.size(); } + _context.statManager().addRateData("crypto.sessionTagsRemaining", remaining, 0); + + //_log.warn("Expiring tags: [" + tagsToDrop + "]"); synchronized (_outboundSessions) { - Set sessionsToDrop = null; for (Iterator iter = _outboundSessions.keySet().iterator(); iter.hasNext();) { PublicKey key = (PublicKey) iter.next(); OutboundSession sess = (OutboundSession) _outboundSessions.get(key); removed += sess.expireTags(); - if (sess.getTagSets().size() <= 0) { - if (sessionsToDrop == null) - sessionsToDrop = new HashSet(4); - sessionsToDrop.add(key); - } - } - if (sessionsToDrop != null) { - for (Iterator iter = sessionsToDrop.iterator(); iter.hasNext();) { - OutboundSession cur = (OutboundSession)_outboundSessions.remove(iter.next()); - if ( (cur != null) && (_log.shouldLog(Log.WARN)) ) - _log.warn("Removing session tags with " + cur.availableTags() + " available for " - + (cur.getLastExpirationDate()-_context.clock().now()) - + "ms more", new Exception("Removed by")); + if (sess.availableTags() <= 0) { + iter.remove(); + removed++; } } } diff --git a/core/java/src/net/i2p/data/Payload.java b/core/java/src/net/i2p/data/Payload.java index 8ba93d0e9..2b77614d1 100644 --- a/core/java/src/net/i2p/data/Payload.java +++ b/core/java/src/net/i2p/data/Payload.java @@ -73,14 +73,16 @@ public class Payload extends DataStructureImpl { _encryptedData = new byte[size]; int read = read(in, _encryptedData); if (read != size) throw new DataFormatException("Incorrect number of bytes read in the payload structure"); - _log.debug("read payload: " + read + " bytes"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read payload: " + read + " bytes"); } public void writeBytes(OutputStream out) throws DataFormatException, IOException { if (_encryptedData == null) throw new DataFormatException("Not yet encrypted. Please set the encrypted data"); DataHelper.writeLong(out, 4, _encryptedData.length); out.write(_encryptedData); - _log.debug("wrote payload: " + _encryptedData.length); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("wrote payload: " + _encryptedData.length); } public int writeBytes(byte target[], int offset) { if (_encryptedData == null) throw new IllegalStateException("Not yet encrypted. Please set the encrypted data"); diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java index eb56896d7..4c27ca9cc 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java @@ -64,14 +64,10 @@ public class ReceiveMessageBeginMessage extends I2CPMessageImpl { protected byte[] doWriteMessage() throws I2CPMessageException, IOException { if ((_sessionId == null) || (_messageId == null)) throw new I2CPMessageException("Unable to write out the message as there is not enough data"); - ByteArrayOutputStream os = new ByteArrayOutputStream(64); - try { - _sessionId.writeBytes(os); - _messageId.writeBytes(os); - } catch (DataFormatException dfe) { - throw new I2CPMessageException("Error writing out the message data", dfe); - } - return os.toByteArray(); + byte rv[] = new byte[2+4]; + DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId()); + DataHelper.toLong(rv, 2, 4, _messageId.getMessageId()); + return rv; } public int getType() { diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java index 5257c5fe9..eb7adccad 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java @@ -64,14 +64,10 @@ public class ReceiveMessageEndMessage extends I2CPMessageImpl { protected byte[] doWriteMessage() throws I2CPMessageException, IOException { if ((_sessionId == null) || (_messageId == null)) throw new I2CPMessageException("Unable to write out the message as there is not enough data"); - ByteArrayOutputStream os = new ByteArrayOutputStream(64); - try { - _sessionId.writeBytes(os); - _messageId.writeBytes(os); - } catch (DataFormatException dfe) { - throw new I2CPMessageException("Error writing out the message data", dfe); - } - return os.toByteArray(); + byte rv[] = new byte[2+4]; + DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId()); + DataHelper.toLong(rv, 2, 4, _messageId.getMessageId()); + return rv; } public int getType() { diff --git a/history.txt b/history.txt index a584c0b22..758c9a87c 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,11 @@ -$Id: history.txt,v 1.160 2005/02/27 17:09:37 jrandom Exp $ +$Id: history.txt,v 1.161 2005/03/01 12:50:54 jrandom Exp $ + +2005-03-02 jrandom + * Fix one substantial OOM cause (session tag manager was only dropping + tags once the critical limit was met, rather than honoring their + expiration) (duh) + * Lots of small memory fixes + * Double the allowable concurrent outstanding tunnel build tasks (20) 2005-03-01 jrandom * Really disable the streaming lib packet caching diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 464a5c899..b32052883 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.155 $ $Date: 2005/02/27 17:09:37 $"; + public final static String ID = "$Revision: 1.156 $ $Date: 2005/03/01 12:50:54 $"; public final static String VERSION = "0.5.0.1"; - public final static long BUILD = 7; + public final static long BUILD = 8; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java index 9a8f94468..8da0b6659 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TunnelPoolManager.java @@ -54,13 +54,13 @@ public class TunnelPoolManager implements TunnelManagerFacade { _clientInboundPools = new HashMap(4); _clientOutboundPools = new HashMap(4); _outstandingBuilds = 0; - _maxOutstandingBuilds = 10; - String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "10"); + _maxOutstandingBuilds = 20; + String max = ctx.getProperty("router.tunnel.maxConcurrentBuilds", "20"); if (max != null) { try { _maxOutstandingBuilds = Integer.parseInt(max); } catch (NumberFormatException nfe) { - _maxOutstandingBuilds = 10; + _maxOutstandingBuilds = 20; } }