forked from I2P_Developers/i2p.i2p
SSU2: Fail session if first outbound message fails
Reduce max consecutive message failures to fail session (SSU 1/2) Set peer as unreachable for these failures Reduce max consectutive session confirmed retransmissions Fix session confirmed retransmission timing Respond to relay tag request in data phase Log tweaks
This commit is contained in:
@@ -99,8 +99,8 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
|
||||
int type = data[off + TYPE_OFFSET] & 0xff;
|
||||
long token = DataHelper.fromLong8(data, off + TOKEN_OFFSET);
|
||||
if (type == TOKEN_REQUEST_FLAG_BYTE) {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got token request from: " + _aliceSocketAddress);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Got token request from: " + _aliceSocketAddress);
|
||||
_currentState = InboundState.IB_STATE_TOKEN_REQUEST_RECEIVED;
|
||||
// decrypt in-place
|
||||
ChaChaPolyCipherState chacha = new ChaChaPolyCipherState();
|
||||
@@ -472,8 +472,8 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa
|
||||
public synchronized void receiveSessionRequestAfterRetry(UDPPacket packet) throws GeneralSecurityException {
|
||||
if (_currentState != InboundState.IB_STATE_RETRY_SENT)
|
||||
throw new GeneralSecurityException("Bad state for Session Request after Retry: " + _currentState);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got session request after retry from: " + _aliceSocketAddress);
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Got session request after retry from: " + _aliceSocketAddress);
|
||||
DatagramPacket pkt = packet.getPacket();
|
||||
SocketAddress from = pkt.getSocketAddress();
|
||||
if (!from.equals(_aliceSocketAddress))
|
||||
|
||||
@@ -1434,8 +1434,8 @@ public class PeerState {
|
||||
_transport.failed(state, false);
|
||||
return;
|
||||
}
|
||||
if (_log.shouldLog(Log.DEBUG))
|
||||
_log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
|
||||
//if (_log.shouldLog(Log.DEBUG))
|
||||
// _log.debug("Adding to " + _remotePeer + ": " + state.getMessageId());
|
||||
int rv = 0;
|
||||
// will never fail for CDPQ
|
||||
boolean fail;
|
||||
@@ -1548,6 +1548,7 @@ public class PeerState {
|
||||
if (failed != null) {
|
||||
int failedSize = 0;
|
||||
int failedCount = 0;
|
||||
boolean totalFail = false;
|
||||
for (int i = 0; i < failed.size(); i++) {
|
||||
OutboundMessageState state = failed.get(i);
|
||||
failedSize += state.getUnackedSize();
|
||||
@@ -1558,6 +1559,8 @@ public class PeerState {
|
||||
_transport.failed(state);
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Message expired: " + state + " to: " + this);
|
||||
if (!_isInbound && msg.getSeqNum() == 0)
|
||||
totalFail = true; // see below
|
||||
} else {
|
||||
// it can not have an OutNetMessage if the source is the
|
||||
// final after establishment message
|
||||
@@ -1566,6 +1569,17 @@ public class PeerState {
|
||||
}
|
||||
}
|
||||
if (failedSize > 0) {
|
||||
if (totalFail) {
|
||||
// first outbound message failed
|
||||
// This also ensures that in SSU2 if we never get an ACK of the
|
||||
// Session Confirmed, we will fail quickly (because we don't have
|
||||
// a separate timer for retransmitting it)
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("First message failed on " + this);
|
||||
_transport.sendDestroy(this, SSU2Util.REASON_FRAME_TIMEOUT);
|
||||
_transport.dropPeer(this, true, "OB First Message Fail");
|
||||
return 0;
|
||||
}
|
||||
// restore the window
|
||||
synchronized(_sendWindowBytesRemainingLock) {
|
||||
// this isn't exactly right, because some fragments may not have been sent at all,
|
||||
|
||||
@@ -81,7 +81,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
public static final int DEFAULT_MTU = MAX_MTU;
|
||||
|
||||
private static final int BITFIELD_SIZE = 512;
|
||||
private static final int MAX_SESS_CONF_RETX = 6;
|
||||
private static final int MAX_SESS_CONF_RETX = 4;
|
||||
private static final int SESS_CONF_RETX_TIME = 1000;
|
||||
private static final long SENT_MESSAGES_CLEAN_TIME = 60*1000;
|
||||
|
||||
@@ -218,16 +218,20 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
if (_sessConfForReTX != null) {
|
||||
// retransmit Session Confirmed when it's time
|
||||
if (_sessConfSentTime + (SESS_CONF_RETX_TIME << (_sessConfSentCount - 1)) < now) {
|
||||
// note: we generally won't get here, because the
|
||||
// first outbound message will timeout before this
|
||||
// and close the session in super.finishMessages()
|
||||
if (_sessConfSentCount >= MAX_SESS_CONF_RETX) {
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Fail, no Sess Conf ACK rcvd on " + this);
|
||||
UDPPacket pkt = _transport.getBuilder2().buildSessionDestroyPacket(SSU2Util.REASON_FRAME_TIMEOUT, this);
|
||||
_transport.send(pkt);
|
||||
_transport.dropPeer(this, false, "No Sess Conf ACK rcvd");
|
||||
_transport.dropPeer(this, true, "No Sess Conf ACK rcvd");
|
||||
_sessConfForReTX = null;
|
||||
return null;
|
||||
}
|
||||
_sessConfSentCount++;
|
||||
_sessConfSentTime = now;
|
||||
packets = getRetransmitSessionConfirmedPackets();
|
||||
}
|
||||
}
|
||||
@@ -414,6 +418,23 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
public void gotRelayTagRequest() {
|
||||
if (!ENABLE_RELAY)
|
||||
return;
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got RELAY TAG REQUEST on " + this);
|
||||
long tag = getWeRelayToThemAs();
|
||||
if (tag <= 0) {
|
||||
if (_transport.canIntroduce(isIPv6())) {
|
||||
tag = 1 + _context.random().nextLong(EstablishmentManager.MAX_TAG_VALUE);
|
||||
setWeRelayToThemAs(tag);
|
||||
_transport.getIntroManager().add(this);
|
||||
}
|
||||
}
|
||||
if (tag > 0) {
|
||||
SSU2Payload.Block block = new SSU2Payload.RelayTagBlock(tag);
|
||||
UDPPacket pkt = _transport.getBuilder2().buildPacket(Collections.emptyList(),
|
||||
Collections.singletonList(block),
|
||||
this);
|
||||
_transport.send(pkt);
|
||||
}
|
||||
}
|
||||
|
||||
public void gotRelayTag(long tag) {
|
||||
@@ -483,8 +504,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
}
|
||||
|
||||
public void gotFragment(byte[] data, int off, int len, long messageId, int frag, boolean isLast) throws DataFormatException {
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Got FRAGMENT block: " + messageId + " fragment " + frag + " len " + len +
|
||||
if (_log.shouldDebug())
|
||||
_log.debug("Got FRAGMENT block: " + messageId + " fragment " + frag + " len " + len +
|
||||
" isLast? " + isLast + " on " + _remotePeer.toBase64());
|
||||
InboundMessageState state;
|
||||
boolean messageComplete = false;
|
||||
@@ -681,8 +702,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
_log.debug("New ACK of fragment " + f.num + " of " + state);
|
||||
} else {
|
||||
// will happen with retransmission as a different packet number
|
||||
if (_log.shouldWarn())
|
||||
_log.warn("Dup ACK of fragment " + f.num + " of " + state);
|
||||
if (_log.shouldInfo())
|
||||
_log.info("Dup ACK of fragment " + f.num + " of " + state + " on " + this);
|
||||
}
|
||||
long sn = state.getSeqNum();
|
||||
if (sn > highest)
|
||||
@@ -695,7 +716,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
|
||||
/**
|
||||
* Note that we just sent the SessionConfirmed packets
|
||||
* and save them for retransmission.
|
||||
*
|
||||
* This is only called the first time.
|
||||
* For retransmit see allocateSend() above.
|
||||
*/
|
||||
synchronized void confirmedPacketsSent(byte[][] data) {
|
||||
if (_sessConfForReTX == null)
|
||||
|
||||
@@ -236,7 +236,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
/** should we flood all UDP peers with the configured rate? This is for testing only! */
|
||||
//private static final boolean SHOULD_FLOOD_PEERS = false;
|
||||
|
||||
private static final int MAX_CONSECUTIVE_FAILED = 5;
|
||||
private static final int MAX_CONSECUTIVE_FAILED = 3;
|
||||
|
||||
public static final int DEFAULT_COST = 5;
|
||||
private static final int SSU_OUTBOUND_COST = 14;
|
||||
@@ -3244,7 +3244,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
|
||||
} else {
|
||||
_context.statManager().addRateData("udp.dropPeerConsecutiveFailures", consecutive, msg.getPeer().getInactivityTime());
|
||||
sendDestroy(msg.getPeer(), SSU2Util.REASON_FRAME_TIMEOUT);
|
||||
dropPeer(msg.getPeer(), false, "too many failures");
|
||||
dropPeer(msg.getPeer(), true, "too many failures");
|
||||
}
|
||||
//if ( (consecutive > MAX_CONSECUTIVE_FAILED) && (msg.getPeer().getInactivityTime() > DROP_INACTIVITY_TIME))
|
||||
// dropPeer(msg.getPeer(), false);
|
||||
|
||||
Reference in New Issue
Block a user