diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index a5c219022163d39bb462430af1d9b1e07c72d879..39091606f823639de6691992087dedcee6e78857 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -60,11 +60,13 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { //local is fast, so synchronously. Does not need that many //threads. try { - socket.setReadTimeout(readTimeout); + // give them 5 seconds to send in the HTTP request + socket.setReadTimeout(5*1000); String modifiedHeader = getModifiedHeader(socket); if (_log.shouldLog(Log.DEBUG)) _log.debug("Modified header: [" + modifiedHeader + "]"); + socket.setReadTimeout(readTimeout); Socket s = new Socket(remoteHost, remotePort); afterSocket = getTunnel().getContext().clock().now(); new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null); diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 5e0a8702e410b5c43d30ef18aaffbd6ebdb47faf..f6d1c2ea0a6cbc884a54b9e8d7ebea7ca481544a 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -11,6 +11,7 @@ import java.io.InputStream; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.net.ConnectException; import java.util.Iterator; import java.util.Properties; @@ -39,6 +40,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { protected InetAddress remoteHost; protected int remotePort; + private boolean _usePool; private Logging l; @@ -46,15 +48,27 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { /** default timeout to 3 minutes - override if desired */ protected long readTimeout = DEFAULT_READ_TIMEOUT; + private static final boolean DEFAULT_USE_POOL = false; + public I2PTunnelServer(InetAddress host, int port, String privData, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host + ":" + port + " <- " + privData, notifyThis, tunnel); ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decode(privData)); + String usePool = tunnel.getClientOptions().getProperty("i2ptunnel.usePool"); + if (usePool != null) + _usePool = "true".equalsIgnoreCase(usePool); + else + _usePool = DEFAULT_USE_POOL; init(host, port, bais, privData, l); } public I2PTunnelServer(InetAddress host, int port, File privkey, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host + ":" + port + " <- " + privkeyname, notifyThis, tunnel); + String usePool = tunnel.getClientOptions().getProperty("i2ptunnel.usePool"); + if (usePool != null) + _usePool = "true".equalsIgnoreCase(usePool); + else + _usePool = DEFAULT_USE_POOL; try { init(host, port, new FileInputStream(privkey), privkeyname, l); } catch (IOException ioe) { @@ -65,6 +79,11 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { public I2PTunnelServer(InetAddress host, int port, InputStream privData, String privkeyname, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host + ":" + port + " <- " + privkeyname, notifyThis, tunnel); + String usePool = tunnel.getClientOptions().getProperty("i2ptunnel.usePool"); + if (usePool != null) + _usePool = "true".equalsIgnoreCase(usePool); + else + 
_usePool = DEFAULT_USE_POOL; init(host, port, privData, privkeyname, l); } @@ -178,22 +197,34 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } public void run() { + if (shouldUsePool()) { I2PServerSocket i2pss = sockMgr.getServerSocket(); int handlers = getHandlerCount(); for (int i = 0; i < handlers; i++) { I2PThread handler = new I2PThread(new Handler(i2pss), "Handle Server " + i); handler.start(); } - /* + } else { + I2PServerSocket i2pss = sockMgr.getServerSocket(); while (true) { - I2PSocket i2ps = i2pss.accept(); - if (i2ps == null) throw new I2PException("I2PServerSocket closed"); - I2PThread t = new I2PThread(new Handler(i2ps)); - t.start(); + try { + final I2PSocket i2ps = i2pss.accept(); + if (i2ps == null) throw new I2PException("I2PServerSocket closed"); + new I2PThread(new Runnable() { public void run() { blockingHandle(i2ps); } }).start(); + } catch (I2PException ipe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error accepting - KILLING THE TUNNEL SERVER", ipe); + return; + } catch (ConnectException ce) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error accepting", ce); + // not killing the server.. + } } - */ + } } + public boolean shouldUsePool() { return _usePool; } /** * minor thread pool to pull off the accept() concurrently. there are still lots diff --git a/core/java/src/net/i2p/data/RouterInfo.java b/core/java/src/net/i2p/data/RouterInfo.java index f1faec759a2f31a20c22bb8d79f24655b5a879ab..aa2bcc1ce78ec35b385ee6b8e8ae282927abcc3b 100644 --- a/core/java/src/net/i2p/data/RouterInfo.java +++ b/core/java/src/net/i2p/data/RouterInfo.java @@ -50,6 +50,7 @@ public class RouterInfo extends DataStructureImpl { private volatile boolean _hashCodeInitialized; public static final String PROP_NETWORK_ID = "netId"; + public static final String PROP_CAPABILITIES = "caps"; public RouterInfo() { setIdentity(null); @@ -298,6 +299,22 @@ public class RouterInfo extends DataStructureImpl { } return -1; } + + /** + * what special capabilities this router offers + * + */ + public String getCapabilities() { + if (_options == null) return ""; + String capabilities = null; + synchronized (_options) { + capabilities = _options.getProperty(PROP_CAPABILITIES); + } + if (capabilities != null) + return capabilities; + else + return ""; + } /** * Get the routing key for the structure using the current modifier in the RoutingKeyGenerator. 
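The run() change above switches between a fixed pool of handler threads and a thread-per-connection accept loop based on the "i2ptunnel.usePool" option (default false). A minimal sketch of the same two modes, using plain java.net sockets rather than the I2P streaming API; the class name, HANDLERS, and serve() are illustrative only, not the router's code:

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class AcceptModeSketch {
        static final int HANDLERS = 3; // hypothetical pool size

        // pooled mode: each handler thread accepts and serves inline, so if every
        // handler is blocked on a slow client, no new connections get served
        static void runPooled(final ServerSocket ss) {
            for (int i = 0; i < HANDLERS; i++) {
                new Thread("Handle Server " + i) {
                    public void run() {
                        while (true) {
                            try { serve(ss.accept()); }
                            catch (IOException ioe) { return; } // server socket closed
                        }
                    }
                }.start();
            }
        }

        // unpooled mode (the new default): one accept loop, one short-lived thread
        // per connection, so a stalled client only ties up its own thread
        static void runUnpooled(ServerSocket ss) {
            while (true) {
                final Socket s;
                try { s = ss.accept(); }
                catch (IOException ioe) { return; } // server socket closed
                new Thread() { public void run() { serve(s); } }.start();
            }
        }

        static void serve(Socket s) {
            // placeholder for the real per-connection handler (blockingHandle() in the patch)
            try { s.close(); } catch (IOException ignored) { }
        }

        public static void main(String[] args) throws IOException {
            runUnpooled(new ServerSocket(0)); // ephemeral port, just to exercise the sketch
        }
    }

The unpooled mode becomes the default because a handler blocked by the streaming lib only costs its own thread, whereas a saturated fixed pool stops serving new clients entirely.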
diff --git a/core/java/src/net/i2p/stat/RateStat.java b/core/java/src/net/i2p/stat/RateStat.java index 16f23765aa2adaef3d229fda5c27b29bdfa18f13..960721fa9625ff6bedab89bd2148bbeee1e9b5f1 100644 --- a/core/java/src/net/i2p/stat/RateStat.java +++ b/core/java/src/net/i2p/stat/RateStat.java @@ -66,6 +66,15 @@ public class RateStat { return rv; } + public double getLifetimeAverageValue() { + if ( (_rates == null) || (_rates.length <= 0) ) return 0; + return _rates[0].getLifetimeAverageValue(); + } + public double getLifetimeEventCount() { + if ( (_rates == null) || (_rates.length <= 0) ) return 0; + return _rates[0].getLifetimeEventCount(); + } + public Rate getRate(long period) { for (int i = 0; i < _rates.length; i++) { if (_rates[i].getPeriod() == period) return _rates[i]; diff --git a/history.txt b/history.txt index 879e8c1bbffc5b776a2144ab362bb3918977adb9..6108993756c65d677a10630e00f5b4c5d2b72187 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,23 @@ -$Id: history.txt,v 1.228 2005/08/21 13:39:06 jrandom Exp $ +$Id: history.txt,v 1.229 2005/08/23 16:25:49 jrandom Exp $ + +2005-08-24 jrandom + * Catch errors with corrupt tunnel messages more gracefully (no need to + kill the thread and cause an OOM...) + * Don't skip shitlisted peers for netDb store messages, as they aren't + necessarily shitlisted by other people (though they probably are). + * Adjust the netDb store per-peer timeout based on each particular peer's + profile (timeout = 4x their average netDb store response time) + * Don't republish leaseSets to *failed* peers - send them to peers who + replied but just didn't know the value. + * Set a 5 second timeout on the I2PTunnelHTTPServer reading the client's + HTTP headers, rather than blocking indefinitely. HTTP headers should be + sent entirely within the first streaming packet anyway, so this won't be + a problem. + * Don't use the I2PTunnel*Server handler thread pool by default, as it may + prevent any clients from accessing the server if the handlers get + blocked by the streaming lib or other issues. + * Don't overwrite a known status (OK/ERR-Reject/ERR-SymmetricNAT) with + Unknown. 
2005-08-23 jrandom * Removed the concept of "no bandwidth limit" - if none is specified, its diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 41f84e589dfc7a873d8220eff1a6c7c19e8e2c0c..b002dc5e5834479b7d62402ea910baf3a5c7ef3b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.217 $ $Date: 2005/08/21 13:39:05 $"; + public final static String ID = "$Revision: 1.218 $ $Date: 2005/08/23 16:25:49 $"; public final static String VERSION = "0.6.0.3"; - public final static long BUILD = 1; + public final static long BUILD = 2; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java index dbb68411986db712019e50ec645e61d6e735f60a..f4d3481ebaf27331767bcda32527b79aa954554e 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/KademliaNetworkDatabaseFacade.java @@ -38,6 +38,7 @@ import net.i2p.router.RouterContext; import net.i2p.router.networkdb.DatabaseLookupMessageHandler; import net.i2p.router.networkdb.DatabaseStoreMessageHandler; import net.i2p.router.networkdb.PublishLocalRouterInfoJob; +import net.i2p.router.peermanager.PeerProfile; import net.i2p.util.Log; /** @@ -804,6 +805,20 @@ public class KademliaNetworkDatabaseFacade extends NetworkDatabaseFacade { } return routers; } + + /** smallest allowed period */ + private static final int MIN_PER_PEER_TIMEOUT = 1*1000; + private static final int MAX_PER_PEER_TIMEOUT = 5*1000; + + public int getPeerTimeout(Hash peer) { + PeerProfile prof = _context.profileOrganizer().getProfile(peer); + double responseTime = prof.getDbResponseTime().getLifetimeAverageValue(); + if (responseTime < MIN_PER_PEER_TIMEOUT) + responseTime = MIN_PER_PEER_TIMEOUT; + else if (responseTime > MAX_PER_PEER_TIMEOUT) + responseTime = MAX_PER_PEER_TIMEOUT; + return 4 * (int)responseTime; // give it up to 4x the average response time + } public void renderStatusHTML(Writer out) throws IOException { StringBuffer buf = new StringBuffer(10*1024); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java index 7b90d2d8444ca9bb802a05635ebb654fcc90b954..0282a5514154dc25ec35fb4023358ca342fdc7d9 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java @@ -135,10 +135,8 @@ class SearchJob extends JobImpl { protected int getPerPeerTimeoutMs() { int rv = -1; RateStat rs = getContext().statManager().getRate("netDb.successTime"); - if (rs != null) { - Rate r = rs.getRate(rs.getPeriods()[0]); - rv = (int)r.getLifetimeAverageValue(); - } + if (rs != null) + rv = (int)rs.getLifetimeAverageValue(); rv <<= 1; // double it to give some leeway. 
(bah, too lazy to record stdev) if ( (rv <= 0) || (rv > PER_PEER_TIMEOUT) ) @@ -344,7 +342,8 @@ class SearchJob extends JobImpl { // return; //} - long expiration = getContext().clock().now() + getPerPeerTimeoutMs(); + int timeout = _facade.getPeerTimeout(router.getIdentity().getHash()); + long expiration = getContext().clock().now() + timeout; DatabaseLookupMessage msg = buildMessage(inTunnelId, inTunnel.getPeer(0), expiration); @@ -366,13 +365,14 @@ class SearchJob extends JobImpl { SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); - getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), getPerPeerTimeoutMs()); + getContext().messageRegistry().registerPending(sel, reply, new FailedJob(getContext(), router), timeout); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnelId, router.getIdentity().getHash()); } /** we're searching for a router, so we can just send direct */ protected void sendRouterSearch(RouterInfo router) { - long expiration = getContext().clock().now() + getPerPeerTimeoutMs(); + int timeout = _facade.getPeerTimeout(router.getIdentity().getHash()); + long expiration = getContext().clock().now() + timeout; DatabaseLookupMessage msg = buildMessage(expiration); @@ -383,7 +383,7 @@ class SearchJob extends JobImpl { SearchMessageSelector sel = new SearchMessageSelector(getContext(), router, _expiration, _state); SearchUpdateReplyFoundJob reply = new SearchUpdateReplyFoundJob(getContext(), router, _state, _facade, this); SendMessageDirectJob j = new SendMessageDirectJob(getContext(), msg, router.getIdentity().getHash(), - reply, new FailedJob(getContext(), router), sel, getPerPeerTimeoutMs(), SEARCH_PRIORITY); + reply, new FailedJob(getContext(), router), sel, timeout, SEARCH_PRIORITY); getContext().jobQueue().addJob(j); } @@ -662,7 +662,7 @@ class SearchJob extends JobImpl { _state.getSuccessful())); } } else { - Set sendTo = _state.getFailed(); + Set sendTo = _state.getRepliedPeers(); // _state.getFailed(); sendTo.addAll(_state.getPending()); int numSent = 0; for (Iterator iter = sendTo.iterator(); iter.hasNext(); ) { diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java index bf2e161aa7357983b3084781febfce1987ebd700..829d7434a71adb2738ec536a01de2133723631da 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchState.java @@ -22,6 +22,7 @@ class SearchState { private HashSet _attemptedPeers; private HashSet _failedPeers; private HashSet _successfulPeers; + private HashSet _repliedPeers; private Hash _searchKey; private volatile long _completed; private volatile long _started; @@ -34,6 +35,7 @@ class SearchState { _failedPeers = new HashSet(16); _successfulPeers = new HashSet(16); _pendingPeerTimes = new HashMap(16); + _repliedPeers = new HashSet(16); _completed = -1; _started = _context.clock().now(); } @@ -120,6 +122,9 @@ class SearchState { /** how long did it take to get the reply, or -1 if we dont know */ public long replyFound(Hash peer) { + synchronized (_repliedPeers) { + _repliedPeers.add(peer); + } synchronized (_pendingPeers) { _pendingPeers.remove(peer); Long when = (Long)_pendingPeerTimes.remove(peer); @@ -130,6 +135,8 @@ class SearchState { } } + public Set getRepliedPeers() { 
synchronized (_repliedPeers) { return (Set)_repliedPeers.clone(); } } + public void replyTimeout(Hash peer) { synchronized (_pendingPeers) { _pendingPeers.remove(peer); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index cce346bbb59bdddc8d11630279bf653be6d892d2..5ef07d1a600f7cae79988e474f0e9528e496bde5 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -24,6 +24,7 @@ import net.i2p.router.JobImpl; import net.i2p.router.ReplyJob; import net.i2p.router.RouterContext; import net.i2p.router.TunnelInfo; +import net.i2p.router.peermanager.PeerProfile; import net.i2p.stat.Rate; import net.i2p.stat.RateStat; import net.i2p.util.Log; @@ -38,7 +39,7 @@ class StoreJob extends JobImpl { private long _expiration; private PeerSelector _peerSelector; - private final static int PARALLELIZATION = 6; // how many sent at a time + private final static int PARALLELIZATION = 3; // how many sent at a time private final static int REDUNDANCY = 6; // we want the data sent to 6 peers /** * additionally send to 1 outlier(s), in case all of the routers chosen in our @@ -52,11 +53,6 @@ class StoreJob extends JobImpl { private final static int EXPLORATORY_REDUNDANCY = 1; private final static int STORE_PRIORITY = 100; - /** default period we allow for an ACK to take after a store */ - private final static int PER_PEER_TIMEOUT = 5*1000; - /** smallest allowed period */ - private static final int MIN_PER_PEER_TIMEOUT = 1*1000; - /** * Create a new search for the routingKey specified * @@ -157,12 +153,22 @@ class StoreJob extends JobImpl { _log.warn(getJobId() + ": Error selecting closest hash that wasnt a router! 
" + peer + " : " + ds); _state.addSkipped(peer); } else { - if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) { - _state.addSkipped(peer); - } else { + int peerTimeout = _facade.getPeerTimeout(peer); + //RateStat failing = prof.getDBHistory().getFailedLookupRate(); + //Rate failed = failing.getRate(60*60*1000); + //if (failed.getCurrentEventCount() + failed.getLastEventCount() > avg) { + // _state.addSkipped(peer); + //} + + // we don't want to filter out peers based on our local shitlist, as that opens an avenue for + // manipulation (since a peer can get us to shitlist them by, well, being shitty, and that + // in turn would let them assume that a netDb store received didn't come from us) + //if (getContext().shitlist().isShitlisted(((RouterInfo)ds).getIdentity().calculateHash())) { + // _state.addSkipped(peer); + //} else { _state.addPending(peer); - sendStore((RouterInfo)ds); - } + sendStore((RouterInfo)ds, peerTimeout); + //} } } } @@ -189,7 +195,7 @@ class StoreJob extends JobImpl { * DeliveryStatusMessage so we know it got there * */ - private void sendStore(RouterInfo router) { + private void sendStore(RouterInfo router, int responseTime) { DatabaseStoreMessage msg = new DatabaseStoreMessage(getContext()); msg.setKey(_state.getTarget()); if (_state.getData() instanceof RouterInfo) @@ -210,7 +216,7 @@ class StoreJob extends JobImpl { // _log.debug(getJobId() + ": Send store to " + router.getIdentity().getHash().toBase64()); } - sendStore(msg, router, getContext().clock().now() + getPerPeerTimeoutMs()); + sendStore(msg, router, getContext().clock().now() + responseTime); } private void sendStore(DatabaseStoreMessage msg, RouterInfo peer, long expiration) { @@ -257,7 +263,7 @@ class StoreJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("sending store to " + peer.getIdentity().getHash() + " through " + outTunnel + ": " + msg); - getContext().messageRegistry().registerPending(selector, onReply, onFail, getPerPeerTimeoutMs()); + getContext().messageRegistry().registerPending(selector, onReply, onFail, (int)(expiration - getContext().clock().now())); getContext().tunnelDispatcher().dispatchOutbound(msg, outTunnel.getSendTunnelId(0), null, peer.getIdentity().getHash()); } else { if (_log.shouldLog(Log.ERROR)) @@ -360,27 +366,4 @@ class StoreJob extends JobImpl { _state.complete(true); getContext().statManager().addRateData("netDb.storeFailedPeers", _state.getAttempted().size(), _state.getWhenCompleted()-_state.getWhenStarted()); } - - /** - * Let each peer take up to the average successful search RTT - * - */ - private int getPerPeerTimeoutMs() { - int rv = -1; - RateStat rs = getContext().statManager().getRate("netDb.ackTime"); - if (rs != null) { - Rate r = rs.getRate(rs.getPeriods()[0]); - rv = (int)r.getLifetimeAverageValue(); - } - - rv <<= 1; // double it to give some leeway. 
(bah, too lazy to record stdev) - if (rv <= 0) - return PER_PEER_TIMEOUT; - else if (rv < MIN_PER_PEER_TIMEOUT) - return MIN_PER_PEER_TIMEOUT; - else if (rv > PER_PEER_TIMEOUT) - return PER_PEER_TIMEOUT; - else - return rv; - } } \ No newline at end of file diff --git a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java index aa73ded7748d55f7cc1ecc85ac59b969c31c8d18..4b80c32e12e66d8b450ff1d07f2092077f7c1aea 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerTestManager.java @@ -352,8 +352,8 @@ class PeerTestManager { PeerState bob = _transport.getPeerState(from); if (bob == null) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Received from bob (" + from + ") who hasn't established a session with us, refusing to help him test " + aliceIP +":" + alicePort); + if (_log.shouldLog(Log.WARN)) + _log.warn("Received from bob (" + from + ") who hasn't established a session with us, refusing to help him test " + aliceIP +":" + alicePort); return; } else { state.setBobCipherKey(bob.getCurrentCipherKey()); diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 9dcc8af137a1df1539a362a3d104e0d24e3de2a1..99ec4e59545e3aeb163967673d2e872015807ced 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -1080,13 +1080,13 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority case CommSystemFacade.STATUS_UNKNOWN: default: _context.statManager().addRateData("udp.statusUnknown", 1, 0); - if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) { - _testEvent.forceRun(); - SimpleTimer.getInstance().addEvent(_testEvent, 5*1000); - } else { - _reachabilityStatus = status; - _reachabilityStatusLastUpdated = now; - } + //if (now - _reachabilityStatusLastUpdated < STATUS_GRACE_PERIOD) { + // _testEvent.forceRun(); + // SimpleTimer.getInstance().addEvent(_testEvent, 5*1000); + //} else { + // _reachabilityStatus = status; + // _reachabilityStatusLastUpdated = now; + //} break; } } diff --git a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java index 12ff97c634474d78a67a8a15bbe740692c4bf063..61d8a08497521904b2b4b2f66924108b2ba5bc35 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentHandler.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentHandler.java @@ -82,8 +82,14 @@ public class FragmentHandler { //_log.debug("fragments: " + Base64.encode(preprocessed, offset, preprocessed.length-offset)); } try { - while (offset < length) - offset = receiveFragment(preprocessed, offset, length); + while (offset < length) { + int off = receiveFragment(preprocessed, offset, length); + if (off < 0) { + _context.statManager().addRateData("tunnel.corruptMessage", 1, 1); + return; + } + offset = off; + } } catch (RuntimeException e) { if (_log.shouldLog(Log.ERROR)) _log.error("Corrupt fragment received: offset = " + offset, e); @@ -253,7 +259,8 @@ public class FragmentHandler { msg = new FragmentedMessage(_context); } - msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId); + boolean ok = msg.receive(messageId, preprocessed, offset, size, !fragmented, router, tunnelId); + if (!ok) return -1; if (msg.isComplete()) { if (fragmented) { synchronized 
(_fragmentedMessages) { @@ -315,7 +322,8 @@ public class FragmentHandler { } } - msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast); + boolean ok = msg.receive(messageId, fragmentNum, preprocessed, offset, size, isLast); + if (!ok) return -1; if (msg.isComplete()) { synchronized (_fragmentedMessages) { diff --git a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java index c5a314014560ae79a87a441531ffc7538e3b7146..d39a5e56b2b950c86f1c0f516db6620b27083366 100644 --- a/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java +++ b/router/java/src/net/i2p/router/tunnel/FragmentedMessage.java @@ -65,11 +65,27 @@ public class FragmentedMessage { * @param length how much past the offset should we snag? * @param isLast is this the last fragment in the message? */ - public void receive(long messageId, int fragmentNum, byte payload[], int offset, int length, boolean isLast) { - if (fragmentNum < 0) throw new RuntimeException("Fragment # == " + fragmentNum + " for messageId " + messageId); - if (payload == null) throw new RuntimeException("Payload is null for messageId " + messageId); - if (length <= 0) throw new RuntimeException("Length is impossible (" + length + ") for messageId " + messageId); - if (offset + length > payload.length) throw new RuntimeException("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId); + public boolean receive(long messageId, int fragmentNum, byte payload[], int offset, int length, boolean isLast) { + if (fragmentNum < 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Fragment # == " + fragmentNum + " for messageId " + messageId); + return false; + } + if (payload == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Payload is null for messageId " + messageId); + return false; + } + if (length <= 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Length is impossible (" + length + ") for messageId " + messageId); + return false; + } + if (offset + length > payload.length) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId); + return false; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive message " + messageId + " fragment " + fragmentNum + " with " + length + " bytes (last? 
" + isLast + ") offset = " + offset); _messageId = messageId; @@ -87,8 +103,12 @@ public class FragmentedMessage { _lastReceived = _lastReceived || isLast; if (fragmentNum > _highFragmentNum) _highFragmentNum = fragmentNum; - if (isLast && fragmentNum <= 0) - throw new RuntimeException("hmm, isLast and fragmentNum=" + fragmentNum + " for message " + messageId); + if (isLast && fragmentNum <= 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("hmm, isLast and fragmentNum=" + fragmentNum + " for message " + messageId); + return false; + } + return true; } /** @@ -103,10 +123,22 @@ public class FragmentedMessage { * @param toRouter what router is this destined for (may be null) * @param toTunnel what tunnel is this destined for (may be null) */ - public void receive(long messageId, byte payload[], int offset, int length, boolean isLast, Hash toRouter, TunnelId toTunnel) { - if (payload == null) throw new RuntimeException("Payload is null for messageId " + messageId); - if (length <= 0) throw new RuntimeException("Length is impossible (" + length + ") for messageId " + messageId); - if (offset + length > payload.length) throw new RuntimeException("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId); + public boolean receive(long messageId, byte payload[], int offset, int length, boolean isLast, Hash toRouter, TunnelId toTunnel) { + if (payload == null) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Payload is null for messageId " + messageId); + return false; + } + if (length <= 0) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Length is impossible (" + length + ") for messageId " + messageId); + return false; + } + if (offset + length > payload.length) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Length is impossible (" + length + "/" + offset + " out of " + payload.length + ") for messageId " + messageId); + return false; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("Receive message " + messageId + " with " + length + " bytes (last? " + isLast + ") targetting " + toRouter + " / " + toTunnel + " offset=" + offset); _messageId = messageId; @@ -124,6 +156,7 @@ public class FragmentedMessage { _toTunnel = toTunnel; if (_highFragmentNum < 0) _highFragmentNum = 0; + return true; } public long getMessageId() { return _messageId; }