From 759f6968f6acb40bc0aa9c3b9f4badd37a8dc5e9 Mon Sep 17 00:00:00 2001 From: zzz <zzz@i2pmail.org> Date: Sat, 26 Feb 2022 17:15:07 -0500 Subject: [PATCH] Add SSU2 support to OMF --- .../udp/OutboundMessageFragments.java | 88 +++++++++++++------ .../router/transport/udp/PacketBuilder2.java | 7 +- .../i2p/router/transport/udp/PeerState2.java | 14 ++- 3 files changed, 76 insertions(+), 33 deletions(-) diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 87043ce416..8e4896c1c1 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -53,6 +53,8 @@ class OutboundMessageFragments { private volatile boolean _alive; private final PacketBuilder _builder; + // null if SSU2 not enabled + private final PacketBuilder2 _builder2; /** if we can handle more messages explicitly, set this to true */ // private boolean _allowExcess; // LINT not used?? @@ -70,6 +72,7 @@ class OutboundMessageFragments { // _throttle = throttle; _activePeers = new ConcurrentHashSet<PeerState>(256); _builder = transport.getBuilder(); + _builder2 = transport.getBuilder2(); _alive = true; // _allowExcess = false; _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", UDPTransport.RATES); @@ -388,16 +391,33 @@ class OutboundMessageFragments { if (states == null || peer == null) return null; - // ok, simplest possible thing is to always tack on the bitfields if - List<Long> msgIds = peer.getCurrentFullACKs(); - int newFullAckCount = msgIds.size(); - msgIds.addAll(peer.getCurrentResendACKs()); - List<ACKBitfield> partialACKBitfields = new ArrayList<ACKBitfield>(); - peer.fetchPartialACKs(partialACKBitfields); - int piggybackedPartialACK = partialACKBitfields.size(); - // getCurrentFullACKs() already makes a copy, do we need to copy again? - // YES because buildPacket() now removes them (maybe) - Set<Long> remaining = new HashSet<Long>(msgIds); + List<Long> msgIds; + int newFullAckCount; + List<ACKBitfield> partialACKBitfields; + int piggybackedPartialACK; + Set<Long> remaining; + int before; + if (peer.getVersion() == 1) { + // ok, simplest possible thing is to always tack on the bitfields if + msgIds = peer.getCurrentFullACKs(); + newFullAckCount = msgIds.size(); + msgIds.addAll(peer.getCurrentResendACKs()); + partialACKBitfields = new ArrayList<ACKBitfield>(); + peer.fetchPartialACKs(partialACKBitfields); + piggybackedPartialACK = partialACKBitfields.size(); + // getCurrentFullACKs() already makes a copy, do we need to copy again? + // YES because buildPacket() now removes them (maybe) + remaining = new HashSet<Long>(msgIds); + before = remaining.size(); + } else { + // all unused + msgIds = null; + newFullAckCount = 0; + partialACKBitfields = null; + piggybackedPartialACK = 0; + remaining = null; + before = 0; + } // build the list of fragments to send List<Fragment> toSend = new ArrayList<Fragment>(8); @@ -439,7 +459,11 @@ class OutboundMessageFragments { int curTotalDataSize = state.fragmentSize(next.num); // now stuff in more fragments if they fit if (i +1 < toSend.size()) { - int maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); + int maxAvail; + if (peer.getVersion() == 1) + maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); + else + maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); for (int j = i + 1; j < toSend.size(); j++) { next = toSend.get(j); int nextDataSize = next.state.fragmentSize(next.num); @@ -451,15 +475,21 @@ class OutboundMessageFragments { j--; sendNext.add(next); curTotalDataSize += nextDataSize; - maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); + if (peer.getVersion() == 1) + maxAvail = PacketBuilder.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); + else + maxAvail = PacketBuilder2.getMaxAdditionalFragmentSize(peer, sendNext.size(), curTotalDataSize); if (_log.shouldLog(Log.INFO)) _log.info("Adding in additional " + next + " to " + peer); } // else too big } } - int before = remaining.size(); - UDPPacket pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields); + UDPPacket pkt; + if (peer.getVersion() == 1) + pkt = _builder.buildPacket(sendNext, peer, remaining, newFullAckCount, partialACKBitfields); + else + pkt = _builder2.buildPacket(sendNext, (PeerState2) peer); if (pkt != null) { if (_log.shouldDebug()) _log.debug("Built packet with " + sendNext.size() + " fragments totalling " + curTotalDataSize + @@ -473,25 +503,25 @@ class OutboundMessageFragments { } rv.add(pkt); - int after = remaining.size(); - newFullAckCount = Math.max(0, newFullAckCount - (before - after)); - - int piggybackedAck = 0; - if (msgIds.size() != remaining.size()) { - for (int j = 0; j < msgIds.size(); j++) { - Long id = msgIds.get(j); - if (!remaining.contains(id)) { - peer.removeACKMessage(id); - piggybackedAck++; + if (peer.getVersion() == 1) { + int after = remaining.size(); + newFullAckCount = Math.max(0, newFullAckCount - (before - after)); + int piggybackedAck = 0; + if (msgIds.size() != remaining.size()) { + for (int j = 0; j < msgIds.size(); j++) { + Long id = msgIds.get(j); + if (!remaining.contains(id)) { + peer.removeACKMessage(id); + piggybackedAck++; + } } } + if (piggybackedAck > 0) + _context.statManager().addRateData("udp.sendPiggyback", piggybackedAck); + if (piggybackedPartialACK - partialACKBitfields.size() > 0) + _context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime()); } - if (piggybackedAck > 0) - _context.statManager().addRateData("udp.sendPiggyback", piggybackedAck); - if (piggybackedPartialACK - partialACKBitfields.size() > 0) - _context.statManager().addRateData("udp.sendPiggybackPartial", piggybackedPartialACK - partialACKBitfields.size(), state.getLifetime()); - // following for debugging and stats pkt.setFragmentCount(sendNext.size()); pkt.setMessageType(msgType); //type of first fragment diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java index 93992cacdb..1fdef299e8 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java @@ -100,7 +100,7 @@ class PacketBuilder2 { * * @param numFragments >= 1 */ - public static int getMaxAdditionalFragmentSize(PeerState2 peer, int numFragments, int curDataSize) { + public static int getMaxAdditionalFragmentSize(PeerState peer, int numFragments, int curDataSize) { int available = peer.getMTU() - curDataSize; if (peer.isIPv6()) available -= MIN_IPV6_DATA_PACKET_OVERHEAD; @@ -235,6 +235,10 @@ class PacketBuilder2 { } packet.setPriority(priority); + if (fragments.isEmpty()) + peer.getAckedMessages().set(pktNum); // not ack-eliciting + else + peer.fragmentsSent(pktNum, fragments); return packet; } @@ -256,6 +260,7 @@ class PacketBuilder2 { encryptDataPacket(packet, peer.getSendCipher(), pktNum, peer.getSendHeaderEncryptKey1(), peer.getSendHeaderEncryptKey2()); setTo(packet, peer.getRemoteIPAddress(), peer.getRemotePort()); packet.setPriority(PRIORITY_LOW); + peer.getAckedMessages().set(pktNum); // not ack-eliciting return packet; } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index b05bf0de54..8b98c9e14a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -43,7 +43,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback private final byte[] _sendHeaderEncryptKey2; private final byte[] _rcvHeaderEncryptKey2; private final SSU2Bitfield _receivedMessages; - private final SSU2Bitfield _sentMessages; + private final SSU2Bitfield _ackedMessages; public static final int MIN_MTU = 1280; @@ -64,7 +64,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback _sendHeaderEncryptKey2 = sendHdrKey2; _rcvHeaderEncryptKey2 = rcvHdrKey2; _receivedMessages = new SSU2Bitfield(256, 0); - _sentMessages = new SSU2Bitfield(256, 0); + _ackedMessages = new SSU2Bitfield(256, 0); } // SSU 1 overrides @@ -130,7 +130,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback byte[] getSendHeaderEncryptKey2() { return _sendHeaderEncryptKey2; } byte[] getRcvHeaderEncryptKey2() { return _rcvHeaderEncryptKey2; } SSU2Bitfield getReceivedMessages() { return _receivedMessages; } - SSU2Bitfield getSentMessages() { return _sentMessages; } + SSU2Bitfield getAckedMessages() { return _ackedMessages; } void receivePacket(UDPPacket packet) { DatagramPacket dpacket = packet.getPacket(); @@ -324,4 +324,12 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback state.releaseResources(); } } + + /** + * Record the mapping of packet number to what fragments were in it, + * so we can process acks. + */ + void fragmentsSent(long pktNum, List<PacketBuilder.Fragment> fragments) { + + } } -- GitLab