Add SSU2 support to OMF

This commit is contained in:
zzz
2022-02-26 17:15:07 -05:00
parent e3db28542c
commit 759f6968f6
3 changed files with 76 additions and 33 deletions

View File

@@ -53,6 +53,8 @@ class OutboundMessageFragments {
private volatile boolean _alive;
private final PacketBuilder _builder;
// null if SSU2 not enabled
private final PacketBuilder2 _builder2;
/** if we can handle more messages explicitly, set this to true */
// private boolean _allowExcess; // LINT not used??
@@ -70,6 +72,7 @@ class OutboundMessageFragments {
// _throttle = throttle;
_activePeers = new ConcurrentHashSet<PeerState>(256);
_builder = transport.getBuilder();
_builder2 = transport.getBuilder2();
_alive = true;
// _allowExcess = false;
_context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", UDPTransport.RATES);
@@ -388,16 +391,33 @@ class OutboundMessageFragments {
if (states == null || peer == null)
return null;
// ok, simplest possible thing is to always tack on the bitfields if
List<Long> msgIds = peer.getCurrentFullACKs();
int newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
List<ACKBitfield> partialACKBitfields = new ArrayList<ACKBitfield>();
peer.fetchPartialACKs(partialACKBitfields);
int piggybackedPartialACK = partialACKBitfields.size();
// getCurrentFullACKs() already makes a copy, do we need to copy again?
// YES because buildPacket() now removes them (maybe)
Set<Long> remaining = new HashSet<Long>(msgIds);
List<Long> msgIds;
int newFullAckCount;
List<ACKBitfield> partialACKBitfields;
int piggybackedPartialACK;
Set<Long> remaining;
int before;
if (peer.getVersion() == 1) {
// ok, simplest possible thing is to always tack on the bitfields if
msgIds = peer.getCurrentFullACKs();
newFullAckCount = msgIds.size();
msgIds.addAll(peer.getCurrentResendACKs());
partialACKBitfields = new ArrayList<ACKBitfield>();
peer.fetchPartialACKs(partialACKBitfields);
piggybackedPartialACK = partialACKBitfields.size();
// getCurrentFullACKs() already makes a copy, do we need to copy again?
// YES because buildPacket() now removes them (maybe)
remaining = new HashSet<Long>(msgIds);
before = remaining.size();
} else {
// all unused
msgIds = null;
newFullAckCount = 0;
partialACKBitfields = null;
piggybackedPartialACK = 0;
remaining = null;
before = 0;
}
// build the list of fragments to send
List<Fragment> toSend = new ArrayList<Fragment>(8);
@@ -439,7 +459,11 @@ class OutboundMessageFragments {
int curTotalDataSize = state.fragmentSize(next.num);
// now stuff in more fragments if they fit
if (i +1 < toSend.size()) {
int maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
int maxAvail;
if (peer.getVersion() == 1)
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
else
maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
for (int j = i + 1; j < toSend.size(); j++) {
next = toSend.get(j);
int nextDataSize = next.state.fragmentSize(next.num);
@@ -451,15 +475,21 @@ class OutboundMessageFragments {
j--;
sendNext.add(next);
curTotalDataSize += nextDataSize;
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
if (peer.getVersion() == 1)
maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
else
maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize);
if (_log.shouldLog(Log.INFO))
_log.info("Adding in additional " + next + " to " + peer);
} // else too big
}
}
int before = remaining.size();
UDPPacket pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields);
UDPPacket pkt;
if (peer.getVersion() == 1)
pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields);
else
pkt = _builder2.buildPacket(sendNext, (PeerState2) peer);
if (pkt != null) {
if (_log.shouldDebug())
_log.debug("Built packet with " + sendNext.size() + " fragments totalling " + curTotalDataSize +
@@ -473,25 +503,25 @@ class OutboundMessageFragments {
}
rv.add(pkt);
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int j = 0; j < msgIds.size(); j++) {
Long id = msgIds.get(j);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;
if (peer.getVersion() == 1) {
int after = remaining.size();
newFullAckCount = Math.max(0, newFullAckCount - (before - after));
int piggybackedAck = 0;
if (msgIds.size() != remaining.size()) {
for (int j = 0; j < msgIds.size(); j++) {
Long id = msgIds.get(j);
if (!remaining.contains(id)) {
peer.removeACKMessage(id);
piggybackedAck++;
}
}
}
if (piggybackedAck > 0)
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck);
if (piggybackedPartialACK - partialACKBitfields.size() > 0)
_context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime());
}
if (piggybackedAck > 0)
_context.statManager().addRateData("udp.sendPiggyback", piggybackedAck);
if (piggybackedPartialACK - partialACKBitfields.size() > 0)
_context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime());
// following for debugging and stats
pkt.setFragmentCount(sendNext.size());
pkt.setMessageType(msgType); //type of first fragment

View File

@@ -100,7 +100,7 @@ class PacketBuilder2 {
*
* @param numFragments &gt;= 1
*/
public static int getMaxAdditionalFragmentSize(PeerState2 peer, int numFragments, int curDataSize) {
public static int getMaxAdditionalFragmentSize(PeerState peer, int numFragments, int curDataSize) {
int available = peer.getMTU() - curDataSize;
if (peer.isIPv6())
available -= MIN_IPV6_DATA_PACKET_OVERHEAD;
@@ -235,6 +235,10 @@ class PacketBuilder2 {
}
packet.setPriority(priority);
if (fragments.isEmpty())
peer.getAckedMessages().set(pktNum); // not ack-eliciting
else
peer.fragmentsSent(pktNum, fragments);
return packet;
}
@@ -256,6 +260,7 @@ class PacketBuilder2 {
encryptDataPacket(packet, peer.getSendCipher(), pktNum, peer.getSendHeaderEncryptKey1(), peer.getSendHeaderEncryptKey2());
setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort());
packet.setPriority(PRIORITY_LOW);
peer.getAckedMessages().set(pktNum); // not ack-eliciting
return packet;
}

View File

@@ -43,7 +43,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
private final byte[] _sendHeaderEncryptKey2;
private final byte[] _rcvHeaderEncryptKey2;
private final SSU2Bitfield _receivedMessages;
private final SSU2Bitfield _sentMessages;
private final SSU2Bitfield _ackedMessages;
public static final int MIN_MTU = 1280;
@@ -64,7 +64,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
_sendHeaderEncryptKey2 = sendHdrKey2;
_rcvHeaderEncryptKey2 = rcvHdrKey2;
_receivedMessages = new SSU2Bitfield(256, 0);
_sentMessages = new SSU2Bitfield(256, 0);
_ackedMessages = new SSU2Bitfield(256, 0);
}
// SSU 1 overrides
@@ -130,7 +130,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
byte[] getSendHeaderEncryptKey2() { return _sendHeaderEncryptKey2; }
byte[] getRcvHeaderEncryptKey2() { return _rcvHeaderEncryptKey2; }
SSU2Bitfield getReceivedMessages() { return _receivedMessages; }
SSU2Bitfield getSentMessages() { return _sentMessages; }
SSU2Bitfield getAckedMessages() { return _ackedMessages; }
void receivePacket(UDPPacket packet) {
DatagramPacket dpacket = packet.getPacket();
@@ -324,4 +324,12 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback
state.releaseResources();
}
}
/**
* Record the mapping of packet number to what fragments were in it,
* so we can process acks.
*/
void fragmentsSent(long pktNum, List<PacketBuilder.Fragment> fragments) {
}
}