diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index 7a64c0efe..8940f6606 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -328,6 +328,18 @@ class OutboundMessageState implements CDPQEntry { return isComplete(); } + /** + * Ack this fragment number. + * For SSU 2 only. + * + * @return true if the message was completely ACKed + * @since 0.9.54 + */ + public synchronized boolean acked(int fragmentNum) { + _fragmentAcks &= ~mask(fragmentNum); + return isComplete(); + } + /** * The max number of sends for any fragment. * As of 0.9.49, may be less than getPushCount() if we pushed only some fragments diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java index 189302bbb..1f5a424a0 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder2.java @@ -264,10 +264,19 @@ class PacketBuilder2 { } packet.setPriority(priority); - if (fragments.isEmpty()) + if (fragments.isEmpty()) { peer.getAckedMessages().set(pktNum); // not ack-eliciting - else + packet.markType(1); + packet.setFragmentCount(-1); + packet.setMessageType(TYPE_ACK); + } else { + // OMF reuses/clears the fragments list, so we must copy it + if (fragments.size() == 1) + fragments = Collections.singletonList(fragments.get(0)); + else + fragments = new ArrayList(fragments); peer.fragmentsSent(pktNum, fragments); + } return packet; } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 58f933663..a10944420 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -70,7 +70,7 @@ public class PeerState { private SessionKey _nextCipherKey; /** when were the current cipher and MAC keys established/rekeyed? */ - private long _keyEstablishedTime; + private final long _keyEstablishedTime; /** * How far off is the remote peer from our clock, in milliseconds? @@ -111,7 +111,7 @@ public class PeerState { /** when did we last send ACKs to the peer? */ private volatile long _lastACKSend; /** when did we decide we need to ACK to this peer? */ - private volatile long _wantACKSendSince; + protected volatile long _wantACKSendSince; /** have we received a packet with the ECN bit set in the current second? */ private boolean _currentSecondECNReceived; /** @@ -169,7 +169,7 @@ public class PeerState { private int _mtuIncreases; private int _mtuDecreases; /** current round trip time estimate */ - private int _rtt; + protected int _rtt; /** smoothed mean deviation in the rtt */ private int _rttDeviation; /** current retransmission timeout */ @@ -228,7 +228,7 @@ public class PeerState { /** how many concurrency rejections have we had in a row */ private int _consecutiveRejections; /** is it inbound? **/ - private final boolean _isInbound; + protected final boolean _isInbound; /** Last time it was made an introducer **/ private long _lastIntroducerTime; @@ -304,7 +304,7 @@ public class PeerState { private static final int INIT_RTT = 0; private static final int MAX_RTO = 60*1000; /** how frequently do we want to send ACKs to a peer? */ - private static final int ACK_FREQUENCY = 150; + protected static final int ACK_FREQUENCY = 150; private static final int CLOCK_SKEW_FUDGE = (ACK_FREQUENCY * 2) / 3; /** @@ -777,7 +777,9 @@ public class PeerState { _receiveBytes = 0; _receivePeriodBegin = now; } - _currentACKs.add(messageId); + // null for PeerState2 + if (_currentACKs != null) + _currentACKs.add(messageId); messagePartiallyReceived(now); } @@ -792,7 +794,7 @@ public class PeerState { * We received a partial message, or we want to send some acks. * @since 0.9.52 */ - private synchronized void messagePartiallyReceived(long now) { + protected synchronized void messagePartiallyReceived(long now) { if (_wantACKSendSince <= 0) { _wantACKSendSince = now; new ACKTimer(); @@ -1180,7 +1182,7 @@ public class PeerState { /** * We sent a message which was ACKed containing the given # of bytes. */ - private void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { + protected void messageACKed(int bytesACKed, long lifetime, int numSends, boolean anyPending, boolean anyQueued) { synchronized(this) { locked_messageACKed(bytesACKed, lifetime, numSends, anyPending, anyQueued); } @@ -1841,12 +1843,15 @@ public class PeerState { long sn = state.getSeqNum(); if (sn > highestSeqNumAcked.value) highestSeqNumAcked.value = sn; - synchronized(_ackedMessages) { - _ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn)); + // null for PS2 + if (_ackedMessages != null) { + synchronized(_ackedMessages) { + _ackedMessages.put(Integer.valueOf((int) messageId), Long.valueOf(sn)); + } } // this adjusts the rtt/rto/window/etc messageACKed(state.getUnackedSize(), lifetime, numSends, anyPending, anyQueued); - } else { + } else if (_ackedMessages != null) { // null for PS2 // dupack, likely Long seq; synchronized(_ackedMessages) { diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index aba46fd9d..31fffcadf 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -7,6 +7,7 @@ import java.net.UnknownHostException; import java.security.GeneralSecurityException; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; import com.southernstorm.noise.protocol.CipherState; @@ -20,9 +21,11 @@ import net.i2p.data.i2np.I2NPMessage; import net.i2p.data.i2np.I2NPMessageException; import net.i2p.data.i2np.I2NPMessageImpl; import net.i2p.router.RouterContext; +import net.i2p.router.transport.udp.InboundMessageFragments.ModifiableLong; import static net.i2p.router.transport.udp.SSU2Util.*; import net.i2p.util.HexDump; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer2; /** * Contain all of the state about a UDP connection to a peer. @@ -34,7 +37,7 @@ import net.i2p.util.Log; * * @since 0.9.54 */ -public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback { +public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback, SSU2Bitfield.Callback { private final long _sendConnID; private final long _rcvConnID; private final AtomicInteger _packetNumber = new AtomicInteger(); @@ -45,8 +48,18 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback private final byte[] _sendHeaderEncryptKey2; private final byte[] _rcvHeaderEncryptKey2; private final SSU2Bitfield _receivedMessages; + /** + * PS1 has _ackedMessages which is a map of message ID to sequence number. + * Here we have the reverse, a bitfield of acked packet (sequence) numbers, + * and map of unacked packet (sequence) numbers to the fragments that packet contained. + */ private final SSU2Bitfield _ackedMessages; + private final ConcurrentHashMap> _sentMessages; + + // Session Confirmed retransmit private byte[] _sessConfForReTX; + private long _sessConfSentTime; + private int _sessConfSentCount; // As SSU public static final int MIN_SSU_IPV4_MTU = 1292; @@ -60,6 +73,11 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback public static final int MAX_MTU = 1500; public static final int DEFAULT_MTU = MAX_MTU; + private static final int MAX_SESS_CONF_RETX = 6; + private static final int SESS_CONF_RETX_TIME = 1000; + + + /** * @param rtt from the EstablishState, or 0 if not available */ @@ -78,6 +96,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback _rcvHeaderEncryptKey2 = rcvHdrKey2; _receivedMessages = new SSU2Bitfield(256, 0); _ackedMessages = new SSU2Bitfield(256, 0); + _sentMessages = new ConcurrentHashMap>(32); if (isInbound) { // Send immediate ack of Session Confirmed _receivedMessages.set(0); @@ -130,18 +149,54 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback */ @Override void clearWantedACKSendSince() { - // TODO - //if ( ) - // _wantACKSendSince = 0; + // race prevention + if (_sentMessages.isEmpty()) + _wantACKSendSince = 0; } /** - * We received the message specified completely. - * @param bytes if less than or equal to zero, message is a duplicate. + * Overridden to use our version of ACKTimer */ @Override - void messageFullyReceived(Long messageId, int bytes) { - // TODO + protected synchronized void messagePartiallyReceived(long now) { + if (_wantACKSendSince <= 0) { + _wantACKSendSince = now; + new ACKTimer(); + } + } + + /** + * Overridden to retransmit SessionConfirmed also + */ + @Override + List allocateSend(long now) { + if (!_isInbound && _ackedMessages.getOffset() == 0 && !_ackedMessages.get(0)) { + UDPPacket[] packets = null; + synchronized(this) { + if (_sessConfForReTX != null) { + // retransmit Session Confirmed when it's time + if (_sessConfSentTime + (SESS_CONF_RETX_TIME << (_sessConfSentCount - 1)) < now) { + if (_sessConfSentCount >= MAX_SESS_CONF_RETX) { + if (_log.shouldWarn()) + _log.warn("Fail, no Sess Conf ACK rcvd on " + this); + _transport.dropPeer(this, false, "No Sess Conf ACK rcvd"); + _sessConfForReTX = null; + return null; + } + _sessConfSentCount++; + packets = getRetransmitSessionConfirmedPackets(); + } + } + } + if (packets != null) { + if (_log.shouldInfo()) + _log.info("ReTX Sess Conf on " + this); + for (int i = 0; i < packets.length; i++) { + _transport.send(packets[i]); + } + } + } + return super.allocateSend(now); } // SSU 1 unsupported things @@ -174,7 +229,12 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback byte[] getRcvHeaderEncryptKey1() { return _rcvHeaderEncryptKey1; } byte[] getSendHeaderEncryptKey2() { return _sendHeaderEncryptKey2; } byte[] getRcvHeaderEncryptKey2() { return _rcvHeaderEncryptKey2; } - SSU2Bitfield getReceivedMessages() { return _receivedMessages; } + + SSU2Bitfield getReceivedMessages() { + if (_log.shouldDebug()) + _log.debug("Sending acks " + _receivedMessages + " on " + this); + return _receivedMessages; + } SSU2Bitfield getAckedMessages() { return _ackedMessages; } /** @@ -337,16 +397,21 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback _context.messageHistory().droppedInboundMessage(state.getMessageId(), state.getFrom(), "expired while partially read: " + state.toString()); // all state access must be before this state.releaseResources(); + } else { + messagePartiallyReceived(); } } public void gotACK(long ackThru, int acks, byte[] ranges) { - if (_log.shouldDebug()) { - if (ranges != null) - _log.debug("Got ACK block: " + SSU2Bitfield.toString(ackThru, acks, ranges, ranges.length / 2)); - else - _log.debug("Got ACK block: " + SSU2Bitfield.toString(ackThru, acks, null, 0)); - } + SSU2Bitfield ackbf; + if (ranges != null) + ackbf = SSU2Bitfield.fromACKBlock(ackThru, acks, ranges, ranges.length / 2); + else + ackbf = SSU2Bitfield.fromACKBlock(ackThru, acks, null, 0); + if (_log.shouldDebug()) + _log.debug("Got ACK block: " + ackbf); + // calls bitSet() below + ackbf.forEachAndNot(_ackedMessages, this); } public void gotTermination(int reason, long count) { @@ -409,7 +474,76 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback * so we can process acks. */ void fragmentsSent(long pktNum, List fragments) { + List old = _sentMessages.putIfAbsent(Long.valueOf(pktNum), fragments); + if (old != null) { + // shouldn't happen + if (_log.shouldWarn()) + _log.warn("Dup send of pkt " + pktNum + " on " + this); + } else { + if (_log.shouldWarn()) + _log.warn("New data pkt " + pktNum + " sent with " + fragments.size() + " fragments on " + this); + } + } + /** + * Callback from SSU2Bitfield.forEachAndNot(). + * A new ack was received. + */ + public void bitSet(long pktNum) { + if (pktNum == 0 && !_isInbound) { + // we don't need to save the Session Confirmed for retransmission any more + synchronized(this) { + _sessConfForReTX = null; + } + if (_log.shouldDebug()) + _log.debug("New ACK of Session Confirmed on " + this); + return; + } + List fragments = _sentMessages.remove(Long.valueOf(pktNum)); + if (fragments == null) { + // shouldn't happen + if (_log.shouldWarn()) + _log.warn("New ACK of pkt " + pktNum + " not found on " + this); + return; + } + if (_log.shouldDebug()) + _log.debug("New ACK of pkt " + pktNum + " containing " + fragments.size() + " fragments on " + this); + ModifiableLong highestSeqNumAcked = new ModifiableLong(-1); + for (PacketBuilder.Fragment f : fragments) { + OutboundMessageState state = f.state; + if (state.isComplete()) { + if (_log.shouldWarn()) + _log.warn("New ACK but state complete? " + state); + continue; + } + int num = f.num; + int ackedSize = state.getUnackedSize(); + boolean complete = state.acked(f.num); + if (complete) { + if (_log.shouldDebug()) + _log.debug("Received ACK of fragment " + num + " of " + state + + ", now complete"); + acked(state.getMessageId(), highestSeqNumAcked); + if (ackedSize > 0) { + // TODO acked() will have an ackedSize of 0, so do it here also + messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false); + } + } else { + ackedSize -= state.getUnackedSize(); + if (_log.shouldDebug()) + _log.debug("Received ACK of fragment " + num + " of " + state + + ", still incomplete"); + if (ackedSize > 0) { + state.clearNACKs(); + // this adjusts the rtt/rto/window/etc + // flags TODO + messageACKed(ackedSize, state.getLifetime(), state.getMaxSends(), false, false); + } + } + } + long highest = highestSeqNumAcked.value; + if (highest >= 0) + highestSeqNumAcked(highest); } /** @@ -419,6 +553,8 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback public synchronized void confirmedPacketsSent(byte[] data) { if (_sessConfForReTX == null) _sessConfForReTX = data; + _sessConfSentTime = _context.clock().now(); + _sessConfSentCount++; } /** @@ -440,4 +576,37 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback packet.setPriority(PacketBuilder2.PRIORITY_HIGH); return rv; } + + /** + * A timer to send an ack-only packet. + */ + private class ACKTimer extends SimpleTimer2.TimedEvent { + public ACKTimer() { + super(_context.simpleTimer2()); + long delta = Math.min(_rtt/2, ACK_FREQUENCY); + if (_log.shouldDebug()) + _log.debug("Sending delayed ack in " + delta + ": " + PeerState2.this); + schedule(delta); + } + + /** + * Send an ack-only packet, unless acks were already sent + * as indicated by _wantACKSendSince == 0. + * Will not requeue unless the acks don't all fit (unlikely). + */ + public void timeReached() { + synchronized(PeerState2.this) { + long wanted = _wantACKSendSince; + if (wanted <= 0) { + if (_log.shouldDebug()) + _log.debug("Already acked:" + PeerState2.this); + return; + } + UDPPacket ack = _transport.getBuilder2().buildACK(PeerState2.this); + if (_log.shouldDebug()) + _log.debug("Sending acks to " + PeerState2.this); + _transport.send(ack); + } + } + } } diff --git a/router/java/src/net/i2p/router/transport/udp/SSU2Bitfield.java b/router/java/src/net/i2p/router/transport/udp/SSU2Bitfield.java index 395d7463d..9028f428f 100644 --- a/router/java/src/net/i2p/router/transport/udp/SSU2Bitfield.java +++ b/router/java/src/net/i2p/router/transport/udp/SSU2Bitfield.java @@ -14,7 +14,7 @@ import net.i2p.router.transport.udp.SSU2Payload.AckBlock; * * @since 0.9.54 */ -public class SSU2Bitfield { +class SSU2Bitfield { private final long[] bitfield; private final int size; @@ -38,7 +38,8 @@ public class SSU2Bitfield { if (size <= 0 || offset < 0) throw new IllegalArgumentException(); // force mult. of 256 - this.size = (size + 255) & 0x7FFFFF00; + size = (size + 255) & 0x7FFFFF00; + this.size = size; this.offset = offset; max_shift = Math.max(1024, size * 8); min_shift = Math.max(8, size / 4); @@ -194,7 +195,7 @@ public class SSU2Bitfield { int t = (int) thru; if (ranges == null || rangeCount == 0) { // easy case, no ranges - SSU2Bitfield rv = new SSU2Bitfield(acnt + 1, t); + SSU2Bitfield rv = new SSU2Bitfield(acnt + 1, thru - acnt); for (int i = t; i >= t - acnt; i--) { rv.set(i); } @@ -206,7 +207,7 @@ public class SSU2Bitfield { min -= ranges[i] & 0xff; } // fixup if the last ack count was zero - // this doesn't handle multple ranges with a zero ack count + // this doesn't handle multiple ranges with a zero ack count if (ranges[(rangeCount * 2) - 1] == 0) min += ranges[(rangeCount * 2) - 2] & 0xff; @@ -228,6 +229,51 @@ public class SSU2Bitfield { return rv; } + public interface Callback { + public void bitSet(long bit); + } + + /** + * Callback for all bits + * set in this bitfield but not set in bf2. + * + * If this offset is greater than bf2's highest bit set, + * i.e. this bitfield is completely newer, + * calls back for all bits in this bitfield. + * + * If this highest bit set is less than than bf2's offset, + * i.e. this bitfield is completely older, + * the callback will not be called. + * + * Synchs on this and then on bf2. + * + * Usage: this is the received acks, bf2 is previously acked, + * callback for each newly acked. + * + */ + public synchronized void forEachAndNot(SSU2Bitfield bf2, Callback cb) { + synchronized(bf2) { + long highest = getHighestSet(); + if (highest < bf2.offset) { + // completely older + return; + } + // We MUST go bottom-up, because bf2 may shift + // overlap portion + long start = Math.max(offset, bf2.offset); + long bf2Highest = bf2.getHighestSet(); + for (long bit = start; bit < bf2Highest && bit <= highest; bit++) { + if (get(bit) && !bf2.set(bit)) + cb.bitSet(bit); + } + // portion that is strictly newer + for (long bit = bf2Highest + 1; bit <= highest; bit++) { + bf2.set(bit); + cb.bitSet(bit); + } + } + } + /** * Pretty print an ACK block * @@ -291,23 +337,35 @@ public class SSU2Bitfield { /**** + private static class CallbackImpl implements Callback { + public void bitSet(long bit) { + System.out.print(" " + bit); + } + } + public static void main(String[] args) { + Callback cbi = new CallbackImpl(); int off = 100; SSU2Bitfield bf = new SSU2Bitfield(256, off); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 1); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 2); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 4); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 5); System.out.println(bf.toString()); bf.toAckBlock(20); @@ -316,18 +374,22 @@ public class SSU2Bitfield { System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 88); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 254); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 255); System.out.println(bf.toString()); bf.toAckBlock(20); + bf.set(off + 300); System.out.println(bf.toString()); bf.toAckBlock(20);