From ff0023a889615b65b1c2fc906d103cdfe65e7572 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Sun, 16 May 2004 04:54:50 +0000 Subject: [PATCH] big ol' memory, cpu usage, and shutdown handling update. main changes include: * rather than have all jobs created hooked into the clock for offset updates, have the jobQueue stay hooked up and update any active jobs accordingly (killing a memory leak of a JobTiming objects - one per job) * dont go totally insane during shutdown and log like mad (though the clientApp things still log like mad, since they don't know the router is going down) * adjust memory buffer sizes based on real world values so we don't have to expand/contract a lot * dont display things that are completely useless (who cares what the first 32 bytes of a public key are?) * reduce temporary object creation * use more efficient collections at times * on shutdown, log some state information (ready/timed jobs, pending messages, etc) * explicit GC every 10 jobs. yeah, not efficient, but just for now we'll keep 'er in there * only reread the router config file if it changes (duh) --- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 2 + .../i2p/client/streaming/ByteCollector.java | 2 +- core/java/src/net/i2p/data/PrivateKey.java | 8 +-- core/java/src/net/i2p/data/PublicKey.java | 8 +-- core/java/src/net/i2p/data/RouterInfo.java | 2 +- core/java/src/net/i2p/data/SessionKey.java | 8 +-- core/java/src/net/i2p/data/Signature.java | 8 +-- .../src/net/i2p/data/SigningPrivateKey.java | 8 +-- .../src/net/i2p/data/SigningPublicKey.java | 8 +-- core/java/src/net/i2p/stat/Rate.java | 2 +- core/java/src/net/i2p/stat/RateStat.java | 6 +- core/java/src/net/i2p/stat/StatManager.java | 26 +++++---- core/java/src/net/i2p/util/LogManager.java | 21 ++++--- .../src/net/i2p/util/LogRecordFormatter.java | 2 +- core/java/src/net/i2p/util/LogWriter.java | 8 +-- .../src/net/i2p/data/i2np/TunnelMessage.java | 2 +- router/java/src/net/i2p/router/JobImpl.java | 4 +- router/java/src/net/i2p/router/JobQueue.java | 56 +++++++++++++++++-- .../src/net/i2p/router/JobQueueRunner.java | 16 ++++-- router/java/src/net/i2p/router/JobTiming.java | 2 +- .../src/net/i2p/router/MessageHistory.java | 10 ++-- .../src/net/i2p/router/MessageValidator.java | 8 ++- .../src/net/i2p/router/OutNetMessage.java | 4 +- router/java/src/net/i2p/router/Router.java | 4 +- .../src/net/i2p/router/StatisticsManager.java | 2 +- .../router/client/ClientConnectionRunner.java | 5 +- .../router/client/ClientListenerRunner.java | 16 ++++-- .../router/peermanager/ProfileOrganizer.java | 14 ++--- .../net/i2p/router/startup/ReadConfigJob.java | 15 ++++- .../transport/OutboundMessageRegistry.java | 9 +++ .../i2p/router/transport/VMCommSystem.java | 5 +- .../i2p/router/tunnelmanager/TunnelPool.java | 3 +- 32 files changed, 195 insertions(+), 99 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index deb56127eb..6394e925e4 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -121,6 +121,8 @@ public class I2PTunnelRunner extends I2PThread { } catch (IOException ex) { ex.printStackTrace(); _log.debug("Error forwarding", ex); + } catch (Exception e) { + _log.error("Internal error", e); } finally { try { if (s != null) s.close(); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java b/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java index 912914bf55..f35b0a2215 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java @@ -6,7 +6,7 @@ public class ByteCollector { int size; public ByteCollector() { - contents = new byte[80]; + contents = new byte[1024]; size = 0; } diff --git a/core/java/src/net/i2p/data/PrivateKey.java b/core/java/src/net/i2p/data/PrivateKey.java index 5bbebc2b54..1462795004 100644 --- a/core/java/src/net/i2p/data/PrivateKey.java +++ b/core/java/src/net/i2p/data/PrivateKey.java @@ -69,10 +69,10 @@ public class PrivateKey extends DataStructureImpl { buf.append("null key"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/data/PublicKey.java b/core/java/src/net/i2p/data/PublicKey.java index 6713febe50..d182d99331 100644 --- a/core/java/src/net/i2p/data/PublicKey.java +++ b/core/java/src/net/i2p/data/PublicKey.java @@ -68,10 +68,10 @@ public class PublicKey extends DataStructureImpl { buf.append("null key"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/data/RouterInfo.java b/core/java/src/net/i2p/data/RouterInfo.java index 89f332e27e..2f36d79b5b 100644 --- a/core/java/src/net/i2p/data/RouterInfo.java +++ b/core/java/src/net/i2p/data/RouterInfo.java @@ -422,7 +422,7 @@ public class RouterInfo extends DataStructureImpl { public String toString() { if (_stringified != null) return _stringified; - StringBuffer buf = new StringBuffer(128); + StringBuffer buf = new StringBuffer(5*1024); buf.append("[RouterInfo: "); buf.append("\n\tIdentity: ").append(getIdentity()); buf.append("\n\tSignature: ").append(getSignature()); diff --git a/core/java/src/net/i2p/data/SessionKey.java b/core/java/src/net/i2p/data/SessionKey.java index c8e6ca56dd..423b47a241 100644 --- a/core/java/src/net/i2p/data/SessionKey.java +++ b/core/java/src/net/i2p/data/SessionKey.java @@ -67,10 +67,10 @@ public class SessionKey extends DataStructureImpl { buf.append("null key"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/data/Signature.java b/core/java/src/net/i2p/data/Signature.java index b389b2d59f..b0a2a95238 100644 --- a/core/java/src/net/i2p/data/Signature.java +++ b/core/java/src/net/i2p/data/Signature.java @@ -73,10 +73,10 @@ public class Signature extends DataStructureImpl { buf.append("null signature"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/data/SigningPrivateKey.java b/core/java/src/net/i2p/data/SigningPrivateKey.java index fe0a2a0f93..8e58f146df 100644 --- a/core/java/src/net/i2p/data/SigningPrivateKey.java +++ b/core/java/src/net/i2p/data/SigningPrivateKey.java @@ -69,10 +69,10 @@ public class SigningPrivateKey extends DataStructureImpl { buf.append("null key"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/data/SigningPublicKey.java b/core/java/src/net/i2p/data/SigningPublicKey.java index d320589e3d..2af8e5a474 100644 --- a/core/java/src/net/i2p/data/SigningPublicKey.java +++ b/core/java/src/net/i2p/data/SigningPublicKey.java @@ -69,10 +69,10 @@ public class SigningPublicKey extends DataStructureImpl { buf.append("null key"); } else { buf.append("size: ").append(_data.length); - int len = 32; - if (len > _data.length) len = _data.length; - buf.append(" first ").append(len).append(" bytes: "); - buf.append(DataHelper.toString(_data, len)); + //int len = 32; + //if (len > _data.length) len = _data.length; + //buf.append(" first ").append(len).append(" bytes: "); + //buf.append(DataHelper.toString(_data, len)); } buf.append("]"); return buf.toString(); diff --git a/core/java/src/net/i2p/stat/Rate.java b/core/java/src/net/i2p/stat/Rate.java index fb2dae5767..efe1574c26 100644 --- a/core/java/src/net/i2p/stat/Rate.java +++ b/core/java/src/net/i2p/stat/Rate.java @@ -350,7 +350,7 @@ public class Rate { } public void store(OutputStream out, String prefix) throws IOException { - StringBuffer buf = new StringBuffer(2048); + StringBuffer buf = new StringBuffer(16*1048); PersistenceHelper.add(buf, prefix, ".period", "Number of milliseconds in the period", _period); PersistenceHelper.add(buf, prefix, ".creationDate", "When was this rate created? (milliseconds since the epoch, GMT)", _creationDate); diff --git a/core/java/src/net/i2p/stat/RateStat.java b/core/java/src/net/i2p/stat/RateStat.java index f770f0eb9f..1938a4a99c 100644 --- a/core/java/src/net/i2p/stat/RateStat.java +++ b/core/java/src/net/i2p/stat/RateStat.java @@ -76,7 +76,7 @@ public class RateStat { private final static String NL = System.getProperty("line.separator"); public String toString() { - StringBuffer buf = new StringBuffer(512); + StringBuffer buf = new StringBuffer(4096); buf.append(getGroupName()).append('.').append(getName()).append(": ").append(getDescription()).append('\n'); long periods[] = getPeriods(); Arrays.sort(periods); @@ -103,7 +103,7 @@ public class RateStat { } public void store(OutputStream out, String prefix) throws IOException { - StringBuffer buf = new StringBuffer(128); + StringBuffer buf = new StringBuffer(1024); buf.append(NL); buf.append("################################################################################").append(NL); buf.append("# Rate: ").append(_groupName).append(": ").append(_statName).append(NL); @@ -112,7 +112,7 @@ public class RateStat { out.write(buf.toString().getBytes()); buf = null; for (int i = 0; i < _rates.length; i++) { - StringBuffer rbuf = new StringBuffer(256); + StringBuffer rbuf = new StringBuffer(1024); rbuf.append("#######").append(NL); rbuf.append("# Period : ").append(DataHelper.formatDuration(_rates[i].getPeriod())).append(" for rate ") .append(_groupName).append(" - ").append(_statName).append(NL); diff --git a/core/java/src/net/i2p/stat/StatManager.java b/core/java/src/net/i2p/stat/StatManager.java index 72ea068233..f768fdafcf 100644 --- a/core/java/src/net/i2p/stat/StatManager.java +++ b/core/java/src/net/i2p/stat/StatManager.java @@ -80,18 +80,20 @@ public class StatManager { } public void coallesceStats() { - for (Iterator iter = getFrequencyNames().iterator(); iter.hasNext();) { - String name = (String) iter.next(); - FrequencyStat stat = getFrequency(name); - if (stat != null) { - stat.coallesceStats(); + synchronized (_frequencyStats) { + for (Iterator iter = _frequencyStats.values().iterator(); iter.hasNext();) { + FrequencyStat stat = (FrequencyStat)iter.next(); + if (stat != null) { + stat.coallesceStats(); + } } } - for (Iterator iter = getRateNames().iterator(); iter.hasNext();) { - String name = (String) iter.next(); - RateStat stat = getRate(name); - if (stat != null) { - stat.coallesceStats(); + synchronized (_rateStats) { + for (Iterator iter = _rateStats.values().iterator(); iter.hasNext();) { + RateStat stat = (RateStat)iter.next(); + if (stat != null) { + stat.coallesceStats(); + } } } } @@ -105,11 +107,11 @@ public class StatManager { } public Set getFrequencyNames() { - return Collections.unmodifiableSet(new HashSet(_frequencyStats.keySet())); + return new HashSet(_frequencyStats.keySet()); } public Set getRateNames() { - return Collections.unmodifiableSet(new HashSet(_rateStats.keySet())); + return new HashSet(_rateStats.keySet()); } /** is the given stat a monitored rate? */ diff --git a/core/java/src/net/i2p/util/LogManager.java b/core/java/src/net/i2p/util/LogManager.java index 4e94d63e44..d4b1238af4 100644 --- a/core/java/src/net/i2p/util/LogManager.java +++ b/core/java/src/net/i2p/util/LogManager.java @@ -18,7 +18,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Properties; -import java.util.Set; +import java.util.ArrayList; import java.util.Map; import java.util.HashMap; @@ -68,7 +68,7 @@ public class LogManager { private String _location; private List _records; - private Set _limits; + private List _limits; private Map _logs; private LogWriter _writer; @@ -88,7 +88,7 @@ public class LogManager { public LogManager(I2PAppContext context) { _displayOnScreen = true; _records = new ArrayList(); - _limits = new HashSet(); + _limits = new ArrayList(128); _logs = new HashMap(128); _defaultLimit = Log.DEBUG; _configLastRead = 0; @@ -197,7 +197,6 @@ public class LogManager { // private void loadConfig() { - Properties p = new Properties(); File cfgFile = new File(_location); if ((_configLastRead > 0) && (_configLastRead >= cfgFile.lastModified())) { if (_log.shouldLog(Log.DEBUG)) @@ -207,6 +206,7 @@ public class LogManager { if (_log.shouldLog(Log.DEBUG)) _log.debug("Loading config from " + _location); } + Properties p = new Properties(); FileInputStream fis = null; try { fis = new FileInputStream(cfgFile); @@ -293,7 +293,8 @@ public class LogManager { LogLimit lim = new LogLimit(name, Log.getLevel(val)); //_log.debug("Limit found for " + name + " as " + val); synchronized (_limits) { - _limits.add(lim); + if (!_limits.contains(lim)) + _limits.add(lim); } } } @@ -366,10 +367,10 @@ public class LogManager { } private List getLimits(Log log) { - ArrayList limits = new ArrayList(); + ArrayList limits = new ArrayList(4); synchronized (_limits) { - for (Iterator iter = _limits.iterator(); iter.hasNext();) { - LogLimit limit = (LogLimit) iter.next(); + for (int i = 0; i < _limits.size(); i++) { + LogLimit limit = (LogLimit)_limits.get(i); if (limit.matches(log)) limits.add(limit); } } @@ -395,6 +396,8 @@ public class LogManager { List _removeAll() { List vals = null; synchronized (_records) { + if (_records.size() <= 0) + return null; vals = new ArrayList(_records); _records.clear(); } @@ -431,7 +434,7 @@ public class LogManager { } public void shutdown() { - _log.log(Log.CRIT, "Shutting down logger", new Exception("Shutdown")); + _log.log(Log.CRIT, "Shutting down logger"); _writer.flushRecords(); } diff --git a/core/java/src/net/i2p/util/LogRecordFormatter.java b/core/java/src/net/i2p/util/LogRecordFormatter.java index 52ce0426e7..ddf950c272 100644 --- a/core/java/src/net/i2p/util/LogRecordFormatter.java +++ b/core/java/src/net/i2p/util/LogRecordFormatter.java @@ -27,7 +27,7 @@ class LogRecordFormatter { private final static int MAX_PRIORITY_LENGTH = 5; public static String formatRecord(LogManager manager, LogRecord rec) { - StringBuffer buf = new StringBuffer(); + StringBuffer buf = new StringBuffer(1024); char format[] = manager._getFormat(); for (int i = 0; i < format.length; ++i) { switch ((int) format[i]) { diff --git a/core/java/src/net/i2p/util/LogWriter.java b/core/java/src/net/i2p/util/LogWriter.java index 1dd472aac0..716b01b2f9 100644 --- a/core/java/src/net/i2p/util/LogWriter.java +++ b/core/java/src/net/i2p/util/LogWriter.java @@ -53,6 +53,7 @@ class LogWriter implements Runnable { public void flushRecords() { try { List records = _manager._removeAll(); + if (records == null) return; for (int i = 0; i < records.size(); i++) { LogRecord rec = (LogRecord) records.get(i); writeRecord(rec); @@ -64,13 +65,10 @@ class LogWriter implements Runnable { System.err.println("Error flushing the records"); } } - records.clear(); - try { - Thread.sleep(30); - } catch (InterruptedException ie) { - } } catch (Throwable t) { t.printStackTrace(); + } finally { + try { Thread.sleep(100); } catch (InterruptedException ie) {} } long now = Clock.getInstance().now(); if (now - _lastReadConfig > CONFIG_READ_ITERVAL) { diff --git a/router/java/src/net/i2p/data/i2np/TunnelMessage.java b/router/java/src/net/i2p/data/i2np/TunnelMessage.java index d8bb55ff55..5463a35c7e 100644 --- a/router/java/src/net/i2p/data/i2np/TunnelMessage.java +++ b/router/java/src/net/i2p/data/i2np/TunnelMessage.java @@ -89,7 +89,7 @@ public class TunnelMessage extends I2NPMessageImpl { if ( (_tunnelId == null) || (_data == null) || (_data.length <= 0) ) throw new I2NPMessageException("Not enough data to write out"); - ByteArrayOutputStream os = new ByteArrayOutputStream(32); + ByteArrayOutputStream os = new ByteArrayOutputStream(4096); try { _tunnelId.writeBytes(os); if (_log.shouldLog(Log.DEBUG)) diff --git a/router/java/src/net/i2p/router/JobImpl.java b/router/java/src/net/i2p/router/JobImpl.java index 6cc9d5f273..c08502fe64 100644 --- a/router/java/src/net/i2p/router/JobImpl.java +++ b/router/java/src/net/i2p/router/JobImpl.java @@ -9,6 +9,7 @@ package net.i2p.router; */ import net.i2p.util.Clock; +import net.i2p.util.Log; /** * Base implementation of a Job */ @@ -39,7 +40,8 @@ public abstract class JobImpl implements Job { } void addedToQueue() { - _addedBy = new Exception(); + if (_context.logManager().getLog(JobImpl.class).shouldLog(Log.DEBUG)) + _addedBy = new Exception(); } public Exception getAddedBy() { return _addedBy; } diff --git a/router/java/src/net/i2p/router/JobQueue.java b/router/java/src/net/i2p/router/JobQueue.java index bd02d75e12..1fa56e297a 100644 --- a/router/java/src/net/i2p/router/JobQueue.java +++ b/router/java/src/net/i2p/router/JobQueue.java @@ -90,9 +90,6 @@ public class JobQueue { private final static int DEFAULT_MAX_WAITING_JOBS = 20; private final static String PROP_MAX_WAITING_JOBS = "router.maxWaitingJobs"; - static { - } - /** * queue runners wait on this whenever they're not doing anything, and * this gets notified *once* whenever there are ready jobs @@ -229,7 +226,17 @@ public class JobQueue { } public void allowParallelOperation() { _allowParallelOperation = true; } - void shutdown() { _alive = false; } + void shutdown() { + _alive = false; + StringBuffer buf = new StringBuffer(1024); + buf.append("jobs: \nready jobs: ").append(_readyJobs.size()).append("\n\t"); + for (int i = 0; i < _readyJobs.size(); i++) + buf.append(_readyJobs.get(i).toString()).append("\n\t"); + buf.append("\n\ntimed jobs: ").append(_timedJobs.size()).append("\n\t"); + for (int i = 0; i < _timedJobs.size(); i++) + buf.append(_timedJobs.get(i).toString()).append("\n\t"); + _log.log(Log.CRIT, buf.toString()); + } boolean isAlive() { return _alive; } /** @@ -276,7 +283,7 @@ public class JobQueue { private int checkJobTimings() { boolean newJobsReady = false; long now = _context.clock().now(); - ArrayList toAdd = new ArrayList(4); + ArrayList toAdd = null; synchronized (_timedJobs) { for (int i = 0; i < _timedJobs.size(); i++) { Job j = (Job)_timedJobs.get(i); @@ -285,6 +292,7 @@ public class JobQueue { if (j instanceof JobImpl) ((JobImpl)j).madeReady(); + if (toAdd == null) toAdd = new ArrayList(4); toAdd.add(j); _timedJobs.remove(i); i--; // so the index stays consistent @@ -294,7 +302,15 @@ public class JobQueue { int ready = 0; synchronized (_readyJobs) { - _readyJobs.addAll(toAdd); + if (toAdd != null) { + // rather than addAll, which allocs a byte array rv before adding, + // we iterate, since toAdd is usually going to only be 1 or 2 entries + // and since readyJobs will often have the space, we can avoid the + // extra alloc. (no, i'm not just being insane - i'm updating this based + // on some profiling data ;) + for (int i = 0; i < toAdd.size(); i++) + _readyJobs.add(toAdd.get(i)); + } ready = _readyJobs.size(); } @@ -399,10 +415,38 @@ public class JobQueue { public void offsetChanged(long delta) { if (_lastLimitUpdated > 0) _lastLimitUpdated += delta; + updateJobTimings(delta); } } + /** + * Update the clock data for all jobs in process or scheduled for + * completion. + */ + private void updateJobTimings(long delta) { + synchronized (_timedJobs) { + for (int i = 0; i < _timedJobs.size(); i++) { + Job j = (Job)_timedJobs.get(i); + j.getTiming().offsetChanged(delta); + } + } + synchronized (_readyJobs) { + for (int i = 0; i < _readyJobs.size(); i++) { + Job j = (Job)_readyJobs.get(i); + j.getTiming().offsetChanged(delta); + } + } + synchronized (_runnerLock) { + for (Iterator iter = _queueRunners.values().iterator(); iter.hasNext(); ) { + JobQueueRunner runner = (JobQueueRunner)iter.next(); + Job job = runner.getCurrentJob(); + if (job != null) + job.getTiming().offsetChanged(delta); + } + } + } + /** * calculate and update the job timings * if it was lagged too much or took too long to run, spit out diff --git a/router/java/src/net/i2p/router/JobQueueRunner.java b/router/java/src/net/i2p/router/JobQueueRunner.java index 3f0d34826c..134fc465bb 100644 --- a/router/java/src/net/i2p/router/JobQueueRunner.java +++ b/router/java/src/net/i2p/router/JobQueueRunner.java @@ -34,12 +34,14 @@ class JobQueueRunner implements Runnable { public void stopRunning() { _keepRunning = false; } public void run() { long lastActive = _context.clock().now(); + long jobNum = 0; while ( (_keepRunning) && (_context.jobQueue().isAlive()) ) { try { Job job = _context.jobQueue().getNext(); if (job == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("getNext returned null - dead?"); + if (_context.router().isAlive()) + if (_log.shouldLog(Log.ERROR)) + _log.error("getNext returned null - dead?"); continue; } long now = _context.clock().now(); @@ -85,13 +87,18 @@ class JobQueueRunner implements Runnable { lastActive = _context.clock().now(); _lastJob = _currentJob; _currentJob = null; + jobNum++; + + if ( (jobNum % 10) == 0) + System.gc(); } catch (Throwable t) { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "WTF, error running?", t); } } - if (_log.shouldLog(Log.CRIT)) - _log.log(Log.CRIT, "Queue runner " + _id + " exiting"); + if (_context.router().isAlive()) + if (_log.shouldLog(Log.CRIT)) + _log.log(Log.CRIT, "Queue runner " + _id + " exiting"); _context.jobQueue().removeRunner(_id); } @@ -102,6 +109,7 @@ class JobQueueRunner implements Runnable { try { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, "Router ran out of memory, shutting down", oom); + _log.log(Log.CRIT, _currentJob.getClass().getName()); _context.router().shutdown(); } catch (Throwable t) { System.err.println("***Router ran out of memory, shutting down hard"); diff --git a/router/java/src/net/i2p/router/JobTiming.java b/router/java/src/net/i2p/router/JobTiming.java index c623c38d99..8f2e939ac8 100644 --- a/router/java/src/net/i2p/router/JobTiming.java +++ b/router/java/src/net/i2p/router/JobTiming.java @@ -25,7 +25,7 @@ public class JobTiming implements Clock.ClockUpdateListener { _start = context.clock().now(); _actualStart = 0; _actualEnd = 0; - context.clock().addUpdateListener(this); + //context.clock().addUpdateListener(this); } /** diff --git a/router/java/src/net/i2p/router/MessageHistory.java b/router/java/src/net/i2p/router/MessageHistory.java index c2cf7edb54..420e2b4bfd 100644 --- a/router/java/src/net/i2p/router/MessageHistory.java +++ b/router/java/src/net/i2p/router/MessageHistory.java @@ -5,7 +5,7 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Iterator; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.TimeZone; @@ -94,7 +94,7 @@ public class MessageHistory { _doLog = DEFAULT_KEEP_MESSAGE_HISTORY; _historyFile = filename; _localIdent = getName(_context.routerHash()); - _unwrittenEntries = new LinkedList(); + _unwrittenEntries = new ArrayList(64); updateSettings(); addEntry(getPrefix() + "** Router initialized (started up or changed identities)"); _context.jobQueue().addJob(_writeJob); @@ -338,7 +338,7 @@ public class MessageHistory { */ public void sendMessage(String messageType, long messageId, Date expiration, Hash peer, boolean sentOk) { if (!_doLog) return; - StringBuffer buf = new StringBuffer(128); + StringBuffer buf = new StringBuffer(256); buf.append(getPrefix()); buf.append("send [").append(messageType).append("] message [").append(messageId).append("] "); buf.append("to [").append(getName(peer)).append("] "); @@ -363,7 +363,7 @@ public class MessageHistory { */ public void receiveMessage(String messageType, long messageId, Date expiration, Hash from, boolean isValid) { if (!_doLog) return; - StringBuffer buf = new StringBuffer(128); + StringBuffer buf = new StringBuffer(256); buf.append(getPrefix()); buf.append("receive [").append(messageType).append("] with id [").append(messageId).append("] "); if (from != null) @@ -473,7 +473,7 @@ public class MessageHistory { if (_doPause) return; List entries = null; synchronized (_unwrittenEntries) { - entries = new LinkedList(_unwrittenEntries); + entries = new ArrayList(_unwrittenEntries); _unwrittenEntries.clear(); } writeEntries(entries); diff --git a/router/java/src/net/i2p/router/MessageValidator.java b/router/java/src/net/i2p/router/MessageValidator.java index 8dd8bc0500..4347e8f7a9 100644 --- a/router/java/src/net/i2p/router/MessageValidator.java +++ b/router/java/src/net/i2p/router/MessageValidator.java @@ -35,7 +35,7 @@ public class MessageValidator { public MessageValidator(RouterContext context) { _log = context.logManager().getLog(MessageValidator.class); _receivedIdExpirations = new TreeMap(); - _receivedIds = new HashSet(1024); + _receivedIds = new HashSet(32*1024); _receivedIdLock = new Object(); _context = context; } @@ -130,4 +130,10 @@ public class MessageValidator { if (_log.shouldLog(Log.INFO)) _log.info("Cleaned out " + toRemoveDates.size() + " expired messageIds, leaving " + _receivedIds.size() + " remaining"); } + + void shutdown() { + StringBuffer buf = new StringBuffer(1024); + buf.append("Validated messages: ").append(_receivedIds.size()); + _log.log(Log.CRIT, buf.toString()); + } } diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 2639283c23..0df345b720 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -115,7 +115,7 @@ public class OutNetMessage { public long getMessageSize() { if (_messageSize <= 0) { try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); // large enough to hold most messages + ByteArrayOutputStream baos = new ByteArrayOutputStream(2048); // large enough to hold most messages _message.writeBytes(baos); long sz = baos.size(); baos.reset(); @@ -136,7 +136,7 @@ public class OutNetMessage { ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); // large enough to hold most messages _message.writeBytes(baos); byte data[] = baos.toByteArray(); - baos.reset(); + _messageSize = data.length; return data; } catch (DataFormatException dfe) { _log.error("Error serializing the I2NPMessage for the OutNetMessage", dfe); diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 5c2dcfeff6..304434a297 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -392,6 +392,8 @@ public class Router { try { _context.netDb().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the networkDb", t); } try { _context.commSystem().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the comm system", t); } try { _context.peerManager().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the peer manager", t); } + try { _context.messageRegistry().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message registry", t); } + try { _context.messageValidator().shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the message validator", t); } try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); } dumpStats(); _log.log(Log.CRIT, "Shutdown complete", new Exception("Shutdown")); @@ -413,7 +415,7 @@ public class Router { private class ShutdownHook extends Thread { public void run() { - _log.log(Log.CRIT, "Shutting down the router...", new Exception("Shutting down")); + _log.log(Log.CRIT, "Shutting down the router..."); shutdown(); } } diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java index 435da9fabb..c18298e7ef 100644 --- a/router/java/src/net/i2p/router/StatisticsManager.java +++ b/router/java/src/net/i2p/router/StatisticsManager.java @@ -162,7 +162,7 @@ public class StatisticsManager implements Service { } private static String renderRate(Rate rate, boolean fudgeQuantity) { - StringBuffer buf = new StringBuffer(255); + StringBuffer buf = new StringBuffer(128); buf.append(num(rate.getAverageValue())).append(';'); buf.append(num(rate.getExtremeAverageValue())).append(';'); buf.append(pct(rate.getPercentageOfLifetimeValue())).append(';'); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 659c13864e..af9c9b36d0 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -109,8 +109,9 @@ public class ClientConnectionRunner { /** die a horrible death */ void stopRunning() { if (_dead) return; - _log.error("Stop the I2CP connection! current leaseSet: " - + _currentLeaseSet, new Exception("Stop client connection")); + if (_context.router().isAlive()) + _log.error("Stop the I2CP connection! current leaseSet: " + + _currentLeaseSet, new Exception("Stop client connection")); _dead = true; // we need these keys to unpublish the leaseSet if (_reader != null) _reader.stopReading(); diff --git a/router/java/src/net/i2p/router/client/ClientListenerRunner.java b/router/java/src/net/i2p/router/client/ClientListenerRunner.java index 310750c859..a314ce8106 100644 --- a/router/java/src/net/i2p/router/client/ClientListenerRunner.java +++ b/router/java/src/net/i2p/router/client/ClientListenerRunner.java @@ -73,28 +73,34 @@ public class ClientListenerRunner implements Runnable { socket.close(); } } catch (IOException ioe) { - _log.error("Server error accepting", ioe); + if (_context.router().isAlive()) + _log.error("Server error accepting", ioe); } catch (Throwable t) { - _log.error("Fatal error running client listener - killing the thread!", t); + if (_context.router().isAlive()) + _log.error("Fatal error running client listener - killing the thread!", t); return; } } } catch (IOException ioe) { - _log.error("Error listening on port " + _port, ioe); + if (_context.router().isAlive()) + _log.error("Error listening on port " + _port, ioe); } if (_socket != null) { try { _socket.close(); } catch (IOException ioe) {} _socket = null; } - + + if (!_context.router().isAlive()) break; + _log.error("Error listening, waiting " + _nextFailDelay + "ms before we try again"); try { Thread.sleep(_nextFailDelay); } catch (InterruptedException ie) {} curDelay += _nextFailDelay; _nextFailDelay *= 5; } - _log.error("CANCELING I2CP LISTEN. delay = " + curDelay, new Exception("I2CP Listen cancelled!!!")); + if (_context.router().isAlive()) + _log.error("CANCELING I2CP LISTEN. delay = " + curDelay, new Exception("I2CP Listen cancelled!!!")); _running = false; } diff --git a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java index 22afaaef4e..5cdbb49d5e 100644 --- a/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java +++ b/router/java/src/net/i2p/router/peermanager/ProfileOrganizer.java @@ -75,11 +75,11 @@ public class ProfileOrganizer { public ProfileOrganizer(RouterContext context) { _context = context; _log = context.logManager().getLog(ProfileOrganizer.class); - _fastAndReliablePeers = new HashMap(64); - _reliablePeers = new HashMap(512); - _wellIntegratedPeers = new HashMap(256); - _notFailingPeers = new HashMap(1024); - _failingPeers = new HashMap(4096); + _fastAndReliablePeers = new HashMap(16); + _reliablePeers = new HashMap(16); + _wellIntegratedPeers = new HashMap(16); + _notFailingPeers = new HashMap(16); + _failingPeers = new HashMap(16); _strictReliabilityOrder = new TreeSet(new InverseReliabilityComparator()); _thresholdSpeedValue = 0.0d; _thresholdReliabilityValue = 0.0d; @@ -466,11 +466,9 @@ public class ProfileOrganizer { all.remove(_us); howMany -= matches.size(); Collections.shuffle(all, _random); - Set rv = new HashSet(howMany); for (int i = 0; i < howMany && i < all.size(); i++) { - rv.add(all.get(i)); + matches.add(all.get(i)); } - matches.addAll(rv); } /** diff --git a/router/java/src/net/i2p/router/startup/ReadConfigJob.java b/router/java/src/net/i2p/router/startup/ReadConfigJob.java index e1ccb69499..b92cd67d6d 100644 --- a/router/java/src/net/i2p/router/startup/ReadConfigJob.java +++ b/router/java/src/net/i2p/router/startup/ReadConfigJob.java @@ -26,6 +26,7 @@ import net.i2p.router.RouterContext; */ public class ReadConfigJob extends JobImpl { private final static long DELAY = 30*1000; // reread every 30 seconds + private long _lastRead = -1; public ReadConfigJob(RouterContext ctx) { super(ctx); @@ -33,11 +34,23 @@ public class ReadConfigJob extends JobImpl { public String getName() { return "Read Router Configuration"; } public void runJob() { - doRead(_context); + if (shouldReread()) { + doRead(_context); + _lastRead = _context.clock().now(); + } getTiming().setStartAfter(_context.clock().now() + DELAY); _context.jobQueue().addJob(this); } + private boolean shouldReread() { + File configFile = new File(_context.router().getConfigFilename()); + if (!configFile.exists()) return false; + if (configFile.lastModified() > _lastRead) + return true; + else + return false; + } + public static void doRead(RouterContext ctx) { Router r = ctx.router(); String f = r.getConfigFilename(); diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java index 50bcf4b112..1a0f9282a0 100644 --- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java +++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java @@ -41,6 +41,15 @@ public class OutboundMessageRegistry { _context.jobQueue().addJob(new CleanupPendingMessagesJob()); } + public void shutdown() { + StringBuffer buf = new StringBuffer(1024); + buf.append("Pending messages: ").append(_pendingMessages.size()).append("\n"); + for (Iterator iter = _pendingMessages.values().iterator(); iter.hasNext(); ) { + buf.append(iter.next().toString()).append("\n\t"); + } + _log.log(Log.CRIT, buf.toString()); + } + public List getOriginalMessages(I2NPMessage message) { HashSet matches = new HashSet(4); long beforeSync = _context.clock().now(); diff --git a/router/java/src/net/i2p/router/transport/VMCommSystem.java b/router/java/src/net/i2p/router/transport/VMCommSystem.java index 1d576c6940..9622247aa4 100644 --- a/router/java/src/net/i2p/router/transport/VMCommSystem.java +++ b/router/java/src/net/i2p/router/transport/VMCommSystem.java @@ -61,8 +61,9 @@ public class VMCommSystem extends CommSystemFacade { } else { _context.jobQueue().addJob(msg.getOnSendJob()); _context.profileManager().messageSent(msg.getTarget().getIdentity().getHash(), "vm", sendTime, msg.getMessageSize()); - _context.statManager().addRateData("transport.sendMessageSize", msg.getMessageSize(), sendTime); - peerSys.receive(msg.getMessage().toByteArray(), _context.routerHash()); + byte data[] = msg.getMessageData(); + _context.statManager().addRateData("transport.sendMessageSize", data.length, sendTime); + peerSys.receive(data, _context.routerHash()); //_context.jobQueue().addJob(new SendJob(peerSys, msg.getMessage(), _context)); sendSuccessful = true; } diff --git a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java index 83fac4dd64..819e8c2806 100644 --- a/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java +++ b/router/java/src/net/i2p/router/tunnelmanager/TunnelPool.java @@ -568,7 +568,8 @@ class TunnelPool { public void shutdown() { if (_log.shouldLog(Log.INFO)) _log.info("Shutting down tunnel pool"); - _persistenceHelper.writePool(this); + if (_persistenceHelper != null) + _persistenceHelper.writePool(this); _isLive = false; // the subjobs [should] check getIsLive() on each run _outboundTunnels = null; _freeInboundTunnels = null; -- GitLab