From c02522b0fed242ee3aa8bb6016b376d1f0fde6d9 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Thu, 29 Jul 2004 05:37:10 +0000 Subject: [PATCH] * track the message progress through the send process more carefully * drop the outbound message as soon as it expires rather than transferring an expired message * drop hard any outbound message that takes us over 5 seconds to process (if we have a 5s message processing time, we do no one any good) * don't try to resend (only useful when dealing with multiple transports - aka insufficiently tested code) * don't republish netDb messages as often --- .../src/net/i2p/router/OutNetMessage.java | 15 ++++- router/java/src/net/i2p/router/Router.java | 2 +- .../networkdb/kademlia/DataPublisherJob.java | 2 +- .../i2p/router/transport/TransportImpl.java | 64 +++++++++++++------ .../router/transport/tcp/TCPConnection.java | 33 ++++++++-- 5 files changed, 87 insertions(+), 29 deletions(-) diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index 93306e1746..4d76ed20e1 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -85,10 +85,21 @@ public class OutNetMessage { "OutNetMessage", new long[] { 5*60*1000, 30*60*1000, 60*60*1000 }); } - public void timestamp(String eventName) { + /** + * Stamp the message's progress + * + * @param eventName what occurred + * @return how long this message has been 'in flight' + */ + public long timestamp(String eventName) { synchronized (_timestamps) { - _timestamps.put(eventName, new Long(_context.clock().now())); + long now = _context.clock().now(); + while (_timestamps.containsKey(eventName)) { + eventName = eventName + '.'; + } + _timestamps.put(eventName, new Long(now)); _timestampOrder.add(eventName); + return now - _created; } } public Map getTimestamps() { diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 05d28709ac..d7a1f7202f 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -286,7 +286,7 @@ public class Router { if ( (notSent > 0) && (notReceived > 0) ) { double notSendKBps = notSent / (lifetime*1024.0); double notReceivedKBps = notReceived / (lifetime*1024.0); - buf.append("<li>Lifetime rate: "); + buf.append("<li>Lifetime unused rate: "); buf.append(fmt.format(notSendKBps)).append("KBps outbound unused "); buf.append(fmt.format(notReceivedKBps)).append("KBps inbound unused"); buf.append("</li>"); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java index 295e0eb4c8..38bccd48ba 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/DataPublisherJob.java @@ -23,7 +23,7 @@ import net.i2p.util.Log; class DataPublisherJob extends JobImpl { private Log _log; private KademliaNetworkDatabaseFacade _facade; - private final static long RERUN_DELAY_MS = 60*1000; + private final static long RERUN_DELAY_MS = 120*1000; private final static int MAX_SEND_PER_RUN = 1; // publish no more than 2 at a time private final static long STORE_TIMEOUT = 60*1000; // give 'er a minute to send the data diff --git a/router/java/src/net/i2p/router/transport/TransportImpl.java b/router/java/src/net/i2p/router/transport/TransportImpl.java index a6f1925600..bcf42d42d7 100644 --- a/router/java/src/net/i2p/router/transport/TransportImpl.java +++ b/router/java/src/net/i2p/router/transport/TransportImpl.java @@ -63,15 +63,28 @@ public abstract class TransportImpl implements Transport { } public void afterSend(OutNetMessage msg, boolean sendSuccessful) { - afterSend(msg, sendSuccessful, true); + afterSend(msg, sendSuccessful, true, 0); } public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue) { + afterSend(msg, sendSuccessful, allowRequeue, 0); + } + public void afterSend(OutNetMessage msg, boolean sendSuccessful, long msToSend) { + afterSend(msg, sendSuccessful, true, msToSend); + } + public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) { boolean log = false; msg.timestamp("afterSend(" + sendSuccessful + ")"); if (!sendSuccessful) msg.transportFailed(getStyle()); + if (msToSend > 1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("afterSend: [success=" + sendSuccessful + "] " + msg.getMessageSize() + "byte " + + msg.getMessageType() + " " + msg.getMessageId() + " from " + + _context.routerHash().toBase64().substring(0,6) + " took " + msToSend); + } + long lifetime = msg.getLifetime(); if (lifetime > 5000) { if (_log.shouldLog(Log.WARN)) @@ -104,23 +117,31 @@ public abstract class TransportImpl implements Transport { _context.statManager().addRateData("transport.expiredOnQueueLifetime", lifetime, lifetime); if (allowRequeue) { - if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) { - // this may not be the last transport available - keep going - _context.outNetMessagePool().add(msg); - // don't discard the data yet! - } else { - if (_log.shouldLog(Log.INFO)) - _log.info("No more time left (" + new Date(msg.getExpiration()) - + ", expiring without sending successfully the " - + msg.getMessageType()); - if (msg.getOnFailedSendJob() != null) - _context.jobQueue().addJob(msg.getOnFailedSendJob()); - MessageSelector selector = msg.getReplySelector(); - if (selector != null) { - _context.messageRegistry().unregisterPending(msg); - } + if (true) { + if (_log.shouldLog(Log.ERROR)) + _log.error("wtf, requeueing message " + msg.getMessageId() + " of type " + msg.getMessageType(), + new Exception("requeued by")); log = true; msg.discardData(); + } else { + if ( (msg.getExpiration() <= 0) || (msg.getExpiration() > _context.clock().now()) ) { + // this may not be the last transport available - keep going + _context.outNetMessagePool().add(msg); + // don't discard the data yet! + } else { + if (_log.shouldLog(Log.INFO)) + _log.info("No more time left (" + new Date(msg.getExpiration()) + + ", expiring without sending successfully the " + + msg.getMessageType()); + if (msg.getOnFailedSendJob() != null) + _context.jobQueue().addJob(msg.getOnFailedSendJob()); + MessageSelector selector = msg.getReplySelector(); + if (selector != null) { + _context.messageRegistry().unregisterPending(msg); + } + log = true; + msg.discardData(); + } } } else { if (_log.shouldLog(Log.INFO)) @@ -211,9 +232,13 @@ public abstract class TransportImpl implements Transport { protected abstract void outboundMessageReady(); public void messageReceived(I2NPMessage inMsg, RouterIdentity remoteIdent, Hash remoteIdentHash, long msToReceive, int bytesReceived) { - if (_log.shouldLog(Log.INFO)) { + int level = Log.INFO; + if (msToReceive > 5000) + level = Log.ERROR; + if (_log.shouldLog(level)) { StringBuffer buf = new StringBuffer(128); buf.append("Message received: ").append(inMsg.getClass().getName()); + buf.append(" / ").append(inMsg.getUniqueId()); buf.append(" in ").append(msToReceive).append("ms containing "); buf.append(bytesReceived).append(" bytes "); buf.append(" from "); @@ -228,7 +253,7 @@ public abstract class TransportImpl implements Transport { if (_listener != null) buf.append(_listener); - _log.info(buf.toString()); + _log.log(level, buf.toString()); } if (remoteIdent != null) @@ -239,8 +264,9 @@ public abstract class TransportImpl implements Transport { } _context.statManager().addRateData("transport.receiveMessageTime", msToReceive, msToReceive); - if (msToReceive > 1000) + if (msToReceive > 1000) { _context.statManager().addRateData("transport.receiveMessageTimeSlow", msToReceive, msToReceive); + } //// this functionality is built into the InNetMessagePool //String type = inMsg.getClass().getName(); diff --git a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java index 98aa1af1b5..72c27cf373 100644 --- a/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java +++ b/router/java/src/net/i2p/router/transport/tcp/TCPConnection.java @@ -309,7 +309,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { for (int i = 0; i < removed.size(); i++) { OutNetMessage cur = (OutNetMessage)removed.get(i); msg.timestamp("TCPConnection.addMessage expired but not our fault"); - _transport.afterSend(cur, false); + _transport.afterSend(cur, false, false); } } @@ -331,7 +331,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { _context.profileManager().commErrorOccurred(_remoteIdentity.getHash()); msg.timestamp("TCPConnection.addMessage saw an expired queued message"); - _transport.afterSend(msg, false); + _transport.afterSend(msg, false, false); // should we really be closing a connection if they're that slow? // yeah, i think we should. closeConnection(); @@ -490,7 +490,19 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { timedOut.add(cur); _toBeSent.remove(i); i--; - } + } else { + long lifetime = cur.timestamp("TCPConnection.runner.locked_expireOldMessages still ok with " + + (i) + " ahead and " + (_toBeSent.size()-i-1) + + " behind on the queue"); + if (lifetime > 5*1000) { + cur.timestamp("TCPConnection.runner.locked_expireOldMessages lifetime too long - " + lifetime); + if (timedOut == null) + timedOut = new ArrayList(2); + timedOut.add(cur); + _toBeSent.remove(i); + i--; + } + } } boolean reallySlowFound = false; @@ -503,8 +515,8 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { + " timed out while sitting on the TCP Connection's queue! was too slow by: " + (now-failed.getExpiration()) + "ms to " + _remoteIdentity.getHash().toBase64() + ": " + failed); - failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired"); - _transport.afterSend(failed, false); + failed.timestamp("TCPConnection.runner.locked_expireOldMessages expired with " + _toBeSent.size() + " left"); + _transport.afterSend(failed, false, false); if (failed.getLifetime() >= MIN_MESSAGE_LIFETIME_FOR_PENALTY) reallySlowFound = true; } @@ -521,6 +533,15 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { msg.timestamp("TCPConnection.runner.doSend fetched"); long afterExpire = _context.clock().now(); + long remaining = msg.getExpiration() - afterExpire; + if (remaining < 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Message " + msg.getMessageType() + "/" + msg.getMessageId() + + " expired before doSend (too slow by " + remaining + "ms)"); + _transport.afterSend(msg, false, false); + return true; + } + byte data[] = msg.getMessageData(); if (data == null) { if (_log.shouldLog(Log.WARN)) @@ -573,7 +594,7 @@ class TCPConnection implements I2NPMessageReader.I2NPMessageEventListener { + "ms) - time left (" + timeLeft + ") to " + _remoteIdentity.getHash().toBase64() + "\n" + msg.toString()); } - _transport.afterSend(msg, true); + _transport.afterSend(msg, true, (end-beforeWrite)); if (_log.shouldLog(Log.DEBUG)) _log.debug("doSend - message sent completely: " -- GitLab