Tunnels: Improve error handling of zero tunnel ID at OBEP

Reduce max time to defragment
Make logging of errors consistent
cleanups, stat tweaks
This commit is contained in:
zzz
2021-01-02 09:57:27 -05:00
parent 139594df8f
commit a481255adb
3 changed files with 45 additions and 21 deletions

View File

@@ -99,8 +99,8 @@ class FragmentHandler {
private final AtomicInteger _completed = new AtomicInteger();
private final AtomicInteger _failed = new AtomicInteger();
/** don't wait more than 60s to defragment the partial message */
static long MAX_DEFRAGMENT_TIME = 60*1000;
/** don't wait more than this long to completely receive a fragmented message */
static long MAX_DEFRAGMENT_TIME = 45*1000;
private static final ByteCache _cache = ByteCache.getInstance(512, TrivialPreprocessor.PREPROCESSED_SIZE);
public FragmentHandler(RouterContext context, DefragmentedReceiver receiver) {
@@ -125,7 +125,7 @@ class FragmentHandler {
_log.warn("Unable to verify preprocessed data (pre.length="
+ preprocessed.length + " off=" +offset + " len=" + length);
_cache.release(new ByteArray(preprocessed));
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
return;
}
offset += HopProcessor.IV_LENGTH; // skip the IV
@@ -136,7 +136,9 @@ class FragmentHandler {
// AIOOBE http://forum.i2p/viewtopic.php?t=3187
if (offset >= TrivialPreprocessor.PREPROCESSED_SIZE) {
_cache.release(new ByteArray(preprocessed));
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
if (_log.shouldWarn())
_log.warn("Corrupt fragment received: off = " + offset);
return;
}
padding++;
@@ -150,21 +152,25 @@ class FragmentHandler {
while (offset < length) {
int off = receiveFragment(preprocessed, offset, length);
if (off < 0) {
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
if (_log.shouldWarn())
_log.warn("Corrupt fragment received: off = " + off);
return;
}
offset = off;
}
} catch (ArrayIndexOutOfBoundsException aioobe) {
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
if (_log.shouldWarn())
_log.warn("Corrupt fragment received: offset = " + offset, aioobe);
} catch (NullPointerException npe) {
if (_log.shouldLog(Log.ERROR))
_log.error("Corrupt fragment received: offset = " + offset, npe);
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
if (_log.shouldWarn())
_log.warn("Corrupt fragment received: offset = " + offset, npe);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
} catch (RuntimeException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("Corrupt fragment received: offset = " + offset, e);
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
if (_log.shouldWarn())
_log.warn("Corrupt fragment received: offset = " + offset, e);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
// java.lang.IllegalStateException: don't get the completed size when we're not complete - null fragment i=0 of 1
// at net.i2p.router.tunnel.FragmentedMessage.getCompleteSize(FragmentedMessage.java:194)
// at net.i2p.router.tunnel.FragmentedMessage.toByteArray(FragmentedMessage.java:223)
@@ -284,7 +290,8 @@ class FragmentHandler {
static final short TYPE_UNDEF = 3;
/**
* @return the offset for the next byte after the received fragment
* @return the offset for the next byte after the received fragment or -1 on error
* @throws RuntimeException
*/
private int receiveFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG))
@@ -299,7 +306,8 @@ class FragmentHandler {
/**
* Handle the initial fragment in a message (or a full message, if it fits)
*
* @return offset after reading the full fragment
* @return offset after reading the full fragment or -1 on error
* @throws RuntimeException
*/
private int receiveInitialFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG))
@@ -317,7 +325,11 @@ class FragmentHandler {
if (offset + 4 >= preprocessed.length)
return -1;
long id = DataHelper.fromLong(preprocessed, offset, 4);
tunnelId = new TunnelId(id);
// i2pd 2.19 bug? 0 will throw IAE.
// message checked and discarded below.
// don't throw so we can process the other fragments if any, if they're from a different message
if (id != 0)
tunnelId = new TunnelId(id);
offset += 4;
}
if ( (type == TYPE_ROUTER) || (type == TYPE_TUNNEL) ) {
@@ -359,6 +371,18 @@ class FragmentHandler {
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg at tunnel endpoint with unsupported delivery instruction type " +
type + " rcvr: " + _receiver);
_context.statManager().addRateData("tunnel.corruptMessage", 1);
} else if (type == TYPE_TUNNEL && tunnelId == null) {
// do this after the above since we have to return offset
// i2pd 2.19 bug? see above
if (_log.shouldLog(Log.WARN))
_log.warn("Dropping msg at tunnel endpoint with delivery instruction to tunnel 0" +
" gw: " + router +
" fragmented? " + fragmented +
" id: " + messageId +
" size: " + size +
" type: " + (preprocessed[offset] & 0xff));
_context.statManager().addRateData("tunnel.corruptMessage", 1);
} else if (fragmented) {
FragmentedMessage msg;
synchronized (_fragmentedMessages) {
@@ -408,7 +432,8 @@ class FragmentHandler {
/**
* Handle a fragment beyond the initial fragment in a message
*
* @return offset after reading the full fragment
* @return offset after reading the full fragment or -1 on error
* @throws RuntimeException
*/
private int receiveSubsequentFragment(byte preprocessed[], int offset, int length) {
if (_log.shouldLog(Log.DEBUG))

View File

@@ -39,7 +39,6 @@ class OutboundTunnelEndpoint {
// If we don't, the data buf won't get released from the cache... that's ok
if (_log.shouldLog(Log.WARN))
_log.warn("Invalid IV, dropping at OBEP " + _config);
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
_handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length);

View File

@@ -85,10 +85,11 @@ class TunnelParticipant {
public void dispatch(TunnelDataMessage msg, Hash recvFrom) {
boolean ok = false;
byte[] data = msg.getData();
if (_processor != null)
ok = _processor.process(msg.getData(), 0, msg.getData().length, recvFrom);
ok = _processor.process(data, 0, data.length, recvFrom);
else if (_inboundEndpointProcessor != null)
ok = _inboundEndpointProcessor.retrievePreprocessedData(msg.getData(), 0, msg.getData().length, recvFrom);
ok = _inboundEndpointProcessor.retrievePreprocessedData(data, 0, data.length, recvFrom);
if (!ok) {
if (_log.shouldLog(Log.WARN))
@@ -96,7 +97,6 @@ class TunnelParticipant {
+ " inboundEndpoint=" + _inboundEndpointProcessor);
if (_config != null)
_config.incrementProcessedMessages();
_context.statManager().addRateData("tunnel.corruptMessage", 1, 1);
return;
}
@@ -125,7 +125,7 @@ class TunnelParticipant {
_inboundEndpointProcessor.getConfig().incrementProcessedMessages();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Receive fragment: on " + _config + ": " + msg);
_handler.receiveTunnelMessage(msg.getData(), 0, msg.getData().length);
_handler.receiveTunnelMessage(data, 0, data.length);
}
}