SSU2: Fixes part 3

Handle acks
Set ack timer
Retransmit session confirmed
Fix bitfield constructor when no ranges
This commit is contained in:
zzz
2022-03-07 09:55:10 -05:00
parent 3ce669575f
commit 25cdc988e1
5 changed files with 289 additions and 32 deletions

View File

@@ -328,6 +328,18 @@ class OutboundMessageState implements CDPQEntry {
return isComplete();
}
/**
* Ack this fragment number.
* For SSU 2 only.
*
* @return true if the message was completely ACKed
* @since 0.9.54
*/
public synchronized boolean acked(int fragmentNum) {
_fragmentAcks &= ~mask(fragmentNum);
return isComplete();
}
/**
* The max number of sends for any fragment.
* As of 0.9.49, may be less than getPushCount() if we pushed only some fragments

View File

@@ -264,10 +264,19 @@ class PacketBuilder2 {
}
packet.setPriority(priority);
if (fragments.isEmpty())
if (fragments.isEmpty()) {
peer.getAckedMessages().set(pktNum); // not ack-eliciting
else
packet.markType(1);
packet.setFragmentCount(-1);
packet.setMessageType(TYPE_ACK);
} else {
// OMF reuses/clears the fragments list, so we must copy it
if (fragments.size() == 1)
fragments = Collections.singletonList(fragments.get(0));
else
fragments = new ArrayList<Fragment>(fragments);
peer.fragmentsSent(pktNum, fragments);
}
return packet;
}

View File

@@ -70,7 +70,7 @@ public class PeerState {
private SessionKey _nextCipherKey;
/** when were the current cipher and MAC keys established/rekeyed? */
private long _keyEstablishedTime;
private final long _keyEstablishedTime;
/**
* How far off is the remote peer from our clock, in milliseconds?
@@ -111,7 +111,7 @@ public class PeerState {
/** when did we last send ACKs to the peer? */
private volatile long _lastACKSend;
/** when did we decide we need to ACK to this peer? */
private volatile long _wantACKSendSince;
protected volatile long _wantACKSendSince;
/** have we received a packet with the ECN bit set in the current second? */
private boolean _currentSecondECNReceived;
/**
@@ -169,7 +169,7 @@ public class PeerState {
private int _mtuIncreases;
private int _mtuDecreases;
/** current round trip time estimate */
private int _rtt;
protected int _rtt;
/** smoothed mean deviation in the rtt */
private int _rttDeviation;
/** current retransmission timeout */
@@ -228,7 +228,7 @@ public class PeerState {
/** how many concurrency rejections have we had in a row */
private int _consecutiveRejections;
/** is it inbound? **/
private final boolean _isInbound;
protected final boolean _isInbound;
/** Last time it was made an introducer **/
private long _lastIntroducerTime;
@@ -304,7 +304,7 @@ public class PeerState {
private static final int INIT_RTT = 0;
private static final int MAX_RTO = 60*1000;
/** how frequently do we want to send ACKs to a peer? */
private static final int ACK_FREQUENCY = 150;
protected static final int ACK_FREQUENCY = 150;
private static final int CLOCK_SKEW_FUDGE = (ACK_FREQUENCY * 2) / 3;
/**
@@ -777,7 +777,9 @@ public class PeerState {
_receiveBytes = 0;
_receivePeriodBegin = now;
}
_currentACKs.add(messageId);
// null for PeerState2
if (_currentACKs != null)
_currentACKs.add(messageId);
messagePartiallyReceived(now);
}
@@ -792,7 +794,7 @@ public class PeerState {
* We received a partial message, or we want to send some acks.
* @since 0.9.52
*/
private synchronized void messagePartiallyReceived(long now) {
protected synchronized void messagePartiallyReceived(long now) {
if (_wantACKSendSince <= 0) {
_wantACKSendSince = now;
new ACKTimer();
@@ -1180,7 +1182,7 @@ public class PeerState {
/**
* We sent a message which was ACKed containing the given # of bytes.
*/
private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) {
protected void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) {
synchronized(this) {
locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued);
}
@@ -1841,12 +1843,15 @@ public class PeerState {
long sn = state.getSeqNum();
if (sn > highestSeqNumAcked.value)
highestSeqNumAcked.value = sn;
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
// null for PS2
if (_ackedMessages != null) {
synchronized(_ackedMessages) {
_ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn));
}
}
// this adjusts the rtt/rto/window/etc
messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued);
} else {
} else if (_ackedMessages != null) { // null for PS2
// dupack, likely
Long seq;
synchronized(_ackedMessages) {

View File

@@ -7,6 +7,7 @@ import java.net.UnknownHostException;
import java.security.GeneralSecurityException;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import com.southernstorm.noise.protocol.CipherState;
@@ -20,9 +21,11 @@ import net.i2p.data.i2np.I2NPMessage;
import net.i2p.data.i2np.I2NPMessageException;
import net.i2p.data.i2np.I2NPMessageImpl;
import net.i2p.router.RouterContext;
import net.i2p.router.transport.udp.InboundMessageFragments.ModifiableLong;
import static net.i2p.router.transport.udp.SSU2Util.*;
import net.i2p.util.HexDump;
import net.i2p.util.Log;
import net.i2p.util.SimpleTimer2;
/**
* Contain all of the state about a UDP connection to a peer.
@@ -34,7 +37,7 @@ import net.i2p.util.Log;
*
* @since 0.9.54
*/
public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback {
public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback, SSU2Bitfield.Callback {
private final long _sendConnID;
private final long _rcvConnID;
private final AtomicInteger _packetNumber = new AtomicInteger();
@@ -45,8 +48,18 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
private final byte[] _sendHeaderEncryptKey2;
private final byte[] _rcvHeaderEncryptKey2;
private final SSU2Bitfield _receivedMessages;
/**
* PS1 has _ackedMessages which is a map of message ID to sequence number.
* Here we have the reverse, a bitfield of acked packet (sequence) numbers,
* and map of unacked packet (sequence) numbers to the fragments that packet contained.
*/
private final SSU2Bitfield _ackedMessages;
private final ConcurrentHashMap<Long, List<PacketBuilder.Fragment>> _sentMessages;
// Session Confirmed retransmit
private byte[] _sessConfForReTX;
private long _sessConfSentTime;
private int _sessConfSentCount;
// As SSU
public static final int MIN_SSU_IPV4_MTU = 1292;
@@ -60,6 +73,11 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
public static final int MAX_MTU = 1500;
public static final int DEFAULT_MTU = MAX_MTU;
private static final int MAX_SESS_CONF_RETX = 6;
private static final int SESS_CONF_RETX_TIME = 1000;
/**
* @param rtt from the EstablishState, or 0 if not available
*/
@@ -78,6 +96,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
_rcvHeaderEncryptKey2 = rcvHdrKey2;
_receivedMessages = new SSU2Bitfield(256, 0);
_ackedMessages = new SSU2Bitfield(256, 0);
_sentMessages = new ConcurrentHashMap<Long, List<PacketBuilder.Fragment>>(32);
if (isInbound) {
// Send immediate ack of Session Confirmed
_receivedMessages.set(0);
@@ -130,18 +149,54 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
*/
@Override
void clearWantedACKSendSince() {
// TODO
//if ( )
// _wantACKSendSince = 0;
// race prevention
if (_sentMessages.isEmpty())
_wantACKSendSince = 0;
}
/**
* We received the message specified completely.
* @param bytes if less than or equal to zero, message is a duplicate.
* Overridden to use our version of ACKTimer
*/
@Override
void messageFullyReceived(Long messageId, int bytes) {
// TODO
protected synchronized void messagePartiallyReceived(long now) {
if (_wantACKSendSince <= 0) {
_wantACKSendSince = now;
new ACKTimer();
}
}
/**
* Overridden to retransmit SessionConfirmed also
*/
@Override
List<OutboundMessageState> allocateSend(long now) {
if (!_isInbound && _ackedMessages.getOffset() == 0 && !_ackedMessages.get(0)) {
UDPPacket[] packets = null;
synchronized(this) {
if (_sessConfForReTX != null) {
// retransmit Session Confirmed when it's time
if (_sessConfSentTime + (SESS_CONF_RETX_TIME << (_sessConfSentCount - 1)) < now) {
if (_sessConfSentCount >= MAX_SESS_CONF_RETX) {
if (_log.shouldWarn())
_log.warn("Fail, no Sess Conf ACK rcvd on " + this);
_transport.dropPeer(this, false, "No Sess Conf ACK rcvd");
_sessConfForReTX = null;
return null;
}
_sessConfSentCount++;
packets = getRetransmitSessionConfirmedPackets();
}
}
}
if (packets != null) {
if (_log.shouldInfo())
_log.info("ReTX Sess Conf on " + this);
for (int i = 0; i < packets.length; i++) {
_transport.send(packets[i]);
}
}
}
return super.allocateSend(now);
}
// SSU 1 unsupported things
@@ -174,7 +229,12 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
byte[] getRcvHeaderEncryptKey1() { return _rcvHeaderEncryptKey1; }
byte[] getSendHeaderEncryptKey2() { return _sendHeaderEncryptKey2; }
byte[] getRcvHeaderEncryptKey2() { return _rcvHeaderEncryptKey2; }
SSU2Bitfield getReceivedMessages() { return _receivedMessages; }
SSU2Bitfield getReceivedMessages() {
if (_log.shouldDebug())
_log.debug("Sending acks " + _receivedMessages + " on " + this);
return _receivedMessages;
}
SSU2Bitfield getAckedMessages() { return _ackedMessages; }
/**
@@ -337,16 +397,21 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
_context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString());
// all state access must be before this
state.releaseResources();
} else {
messagePartiallyReceived();
}
}
public void gotACK(long ackThru, int acks, byte[] ranges) {
if (_log.shouldDebug()) {
if (ranges != null)
_log.debug("Got ACK block: " + SSU2Bitfield.toString(ackThru, acks, ranges, ranges.length / 2));
else
_log.debug("Got ACK block: " + SSU2Bitfield.toString(ackThru, acks, null, 0));
}
SSU2Bitfield ackbf;
if (ranges != null)
ackbf = SSU2Bitfield.fromACKBlock(ackThru, acks, ranges, ranges.length / 2);
else
ackbf = SSU2Bitfield.fromACKBlock(ackThru, acks, null, 0);
if (_log.shouldDebug())
_log.debug("Got ACK block: " + ackbf);
// calls bitSet() below
ackbf.forEachAndNot(_ackedMessages, this);
}
public void gotTermination(int reason, long count) {
@@ -409,7 +474,76 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
* so we can process acks.
*/
void fragmentsSent(long pktNum, List<PacketBuilder.Fragment> fragments) {
List<PacketBuilder.Fragment> old = _sentMessages.putIfAbsent(Long.valueOf(pktNum), fragments);
if (old != null) {
// shouldn't happen
if (_log.shouldWarn())
_log.warn("Dup send of pkt " + pktNum + " on " + this);
} else {
if (_log.shouldWarn())
_log.warn("New data pkt " + pktNum + " sent with " + fragments.size() + " fragments on " + this);
}
}
/**
* Callback from SSU2Bitfield.forEachAndNot().
* A new ack was received.
*/
public void bitSet(long pktNum) {
if (pktNum == 0 && !_isInbound) {
// we don't need to save the Session Confirmed for retransmission any more
synchronized(this) {
_sessConfForReTX = null;
}
if (_log.shouldDebug())
_log.debug("New ACK of Session Confirmed on " + this);
return;
}
List<PacketBuilder.Fragment> fragments = _sentMessages.remove(Long.valueOf(pktNum));
if (fragments == null) {
// shouldn't happen
if (_log.shouldWarn())
_log.warn("New ACK of pkt " + pktNum + " not found on " + this);
return;
}
if (_log.shouldDebug())
_log.debug("New ACK of pkt " + pktNum + " containing " + fragments.size() + " fragments on " + this);
ModifiableLong highestSeqNumAcked = new ModifiableLong(-1);
for (PacketBuilder.Fragment f : fragments) {
OutboundMessageState state = f.state;
if (state.isComplete()) {
if (_log.shouldWarn())
_log.warn("New ACK but state complete? " + state);
continue;
}
int num = f.num;
int ackedSize = state.getUnackedSize();
boolean complete = state.acked(f.num);
if (complete) {
if (_log.shouldDebug())
_log.debug("Received ACK of fragment " + num + " of " + state +
", now complete");
acked(state.getMessageId(), highestSeqNumAcked);
if (ackedSize > 0) {
// TODO acked() will have an ackedSize of 0, so do it here also
messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false);
}
} else {
ackedSize -= state.getUnackedSize();
if (_log.shouldDebug())
_log.debug("Received ACK of fragment " + num + " of " + state +
", still incomplete");
if (ackedSize > 0) {
state.clearNACKs();
// this adjusts the rtt/rto/window/etc
// flags TODO
messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false);
}
}
}
long highest = highestSeqNumAcked.value;
if (highest >= 0)
highestSeqNumAcked(highest);
}
/**
@@ -419,6 +553,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
public synchronized void confirmedPacketsSent(byte[] data) {
if (_sessConfForReTX == null)
_sessConfForReTX = data;
_sessConfSentTime = _context.clock().now();
_sessConfSentCount++;
}
/**
@@ -440,4 +576,37 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
packet.setPriority(PacketBuilder2.PRIORITY_HIGH);
return rv;
}
/**
* A timer to send an ack-only packet.
*/
private class ACKTimer extends SimpleTimer2.TimedEvent {
public ACKTimer() {
super(_context.simpleTimer2());
long delta = Math.min(_rtt/2, ACK_FREQUENCY);
if (_log.shouldDebug())
_log.debug("Sending delayed ack in " + delta + ": " + PeerState2.this);
schedule(delta);
}
/**
* Send an ack-only packet, unless acks were already sent
* as indicated by _wantACKSendSince == 0.
* Will not requeue unless the acks don't all fit (unlikely).
*/
public void timeReached() {
synchronized(PeerState2.this) {
long wanted = _wantACKSendSince;
if (wanted <= 0) {
if (_log.shouldDebug())
_log.debug("Already acked:" + PeerState2.this);
return;
}
UDPPacket ack = _transport.getBuilder2().buildACK(PeerState2.this);
if (_log.shouldDebug())
_log.debug("Sending acks to " + PeerState2.this);
_transport.send(ack);
}
}
}
}

View File

@@ -14,7 +14,7 @@ import net.i2p.router.transport.udp.SSU2Payload.AckBlock;
*
* @since 0.9.54
*/
public class SSU2Bitfield {
class SSU2Bitfield {
private final long[] bitfield;
private final int size;
@@ -38,7 +38,8 @@ public class SSU2Bitfield {
if (size <= 0 || offset < 0)
throw new IllegalArgumentException();
// force mult. of 256
this.size = (size + 255) & 0x7FFFFF00;
size = (size + 255) & 0x7FFFFF00;
this.size = size;
this.offset = offset;
max_shift = Math.max(1024, size * 8);
min_shift = Math.max(8, size / 4);
@@ -194,7 +195,7 @@ public class SSU2Bitfield {
int t = (int) thru;
if (ranges == null || rangeCount == 0) {
// easy case, no ranges
SSU2Bitfield rv = new SSU2Bitfield(acnt + 1, t);
SSU2Bitfield rv = new SSU2Bitfield(acnt + 1, thru - acnt);
for (int i = t; i >= t - acnt; i--) {
rv.set(i);
}
@@ -206,7 +207,7 @@ public class SSU2Bitfield {
min -= ranges[i] & 0xff;
}
// fixup if the last ack count was zero
// this doesn't handle multple ranges with a zero ack count
// this doesn't handle multiple ranges with a zero ack count
if (ranges[(rangeCount * 2) - 1] == 0)
min += ranges[(rangeCount * 2) - 2] & 0xff;
@@ -228,6 +229,51 @@ public class SSU2Bitfield {
return rv;
}
public interface Callback {
public void bitSet(long bit);
}
/**
* Callback for all bits
* set in this bitfield but not set in bf2.
*
* If this offset is greater than bf2's highest bit set,
* i.e. this bitfield is completely newer,
* calls back for all bits in this bitfield.
*
* If this highest bit set is less than than bf2's offset,
* i.e. this bitfield is completely older,
* the callback will not be called.
*
* Synchs on this and then on bf2.
*
* Usage: this is the received acks, bf2 is previously acked,
* callback for each newly acked.
*
*/
public synchronized void forEachAndNot(SSU2Bitfield bf2, Callback cb) {
synchronized(bf2) {
long highest = getHighestSet();
if (highest < bf2.offset) {
// completely older
return;
}
// We MUST go bottom-up, because bf2 may shift
// overlap portion
long start = Math.max(offset, bf2.offset);
long bf2Highest = bf2.getHighestSet();
for (long bit = start; bit < bf2Highest && bit <= highest; bit++) {
if (get(bit) && !bf2.set(bit))
cb.bitSet(bit);
}
// portion that is strictly newer
for (long bit = bf2Highest + 1; bit <= highest; bit++) {
bf2.set(bit);
cb.bitSet(bit);
}
}
}
/**
* Pretty print an ACK block
*
@@ -291,23 +337,35 @@ public class SSU2Bitfield {
/****
private static class CallbackImpl implements Callback {
public void bitSet(long bit) {
System.out.print(" " + bit);
}
}
public static void main(String[] args) {
Callback cbi = new CallbackImpl();
int off = 100;
SSU2Bitfield bf = new SSU2Bitfield(256, off);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 1);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 2);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 4);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 5);
System.out.println(bf.toString());
bf.toAckBlock(20);
@@ -316,18 +374,22 @@ public class SSU2Bitfield {
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 88);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 254);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 255);
System.out.println(bf.toString());
bf.toAckBlock(20);
bf.set(off + 300);
System.out.println(bf.toString());
bf.toAckBlock(20);