diff --git a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java index 6e70e924135b72b01213a5d1305c52220f18b771..ead02c0ae9b2146fe999a70043d829c78cc4ab5d 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundEstablishState2.java @@ -615,6 +615,7 @@ class InboundEstablishState2 extends InboundEstablishState implements SSU2Payloa byte data[] = pkt.getData(); int off = pkt.getOffset(); System.arraycopy(_sessCrForReTX, 0, data, off, _sessCrForReTX.length); + pkt.setLength(_sessCrForReTX.length); pkt.setSocketAddress(_aliceSocketAddress); packet.setMessageType(PacketBuilder2.TYPE_CONF); packet.setPriority(PacketBuilder2.PRIORITY_HIGH); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java index 3d4896a6b0dcf436b3b4dcbc550c38aacd2594f5..45d9cfb0cbe37ca975a4029c69644b9d792e6e7f 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundEstablishState2.java @@ -514,6 +514,7 @@ class OutboundEstablishState2 extends OutboundEstablishState implements SSU2Payl byte data[] = pkt.getData(); int off = pkt.getOffset(); System.arraycopy(_sessReqForReTX, 0, data, off, _sessReqForReTX.length); + pkt.setLength(_sessReqForReTX.length); pkt.setSocketAddress(_bobSocketAddress); packet.setMessageType(PacketBuilder2.TYPE_SREQ); packet.setPriority(PacketBuilder2.PRIORITY_HIGH); diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 11f6573691eb8a442ee1a1ee4fb8029451e8cfd3..9920602804fe85cef791ada7c9e8091ad434894a 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -70,7 +70,7 @@ public class PeerState { private SessionKey _nextCipherKey; /** when were the current cipher and MAC keys established/rekeyed? */ - private final long _keyEstablishedTime; + protected final long _keyEstablishedTime; /** * How far off is the remote peer from our clock, in milliseconds? diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState2.java b/router/java/src/net/i2p/router/transport/udp/PeerState2.java index 439226083eb575793416bc2f2e126dd2417d6202..4d9b1a1d578e7e1e1cdf3ebb1a039cf4d5a261c6 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState2.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState2.java @@ -5,6 +5,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.security.GeneralSecurityException; +import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentHashMap; @@ -55,6 +56,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback */ private final SSU2Bitfield _ackedMessages; private final ConcurrentHashMap<Long, List<PacketBuilder.Fragment>> _sentMessages; + private long _sentMessagesLastExpired; // Session Confirmed retransmit private byte[] _sessConfForReTX; @@ -76,7 +78,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback private static final int BITFIELD_SIZE = 512; private static final int MAX_SESS_CONF_RETX = 6; private static final int SESS_CONF_RETX_TIME = 1000; - + private static final long SENT_MESSAGES_CLEAN_TIME = 60*1000; /** @@ -98,6 +100,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback _receivedMessages = new SSU2Bitfield(BITFIELD_SIZE, 0); _ackedMessages = new SSU2Bitfield(BITFIELD_SIZE, 0); _sentMessages = new ConcurrentHashMap<Long, List<PacketBuilder.Fragment>>(32); + _sentMessagesLastExpired = _keyEstablishedTime; if (isInbound) { // Send immediate ack of Session Confirmed _receivedMessages.set(0); @@ -168,6 +171,37 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback } } + /** + * Overridden to expire unacked packets in _sentMessages. + * These will remain unacked if lost; fragments will be retransmitted + * in a new packet. + * + * @return number of active outbound messages remaining + */ + @Override + int finishMessages(long now) { + if (now >= _sentMessagesLastExpired + SENT_MESSAGES_CLEAN_TIME) { + _sentMessagesLastExpired = now; + if (!_sentMessages.isEmpty()) { + if (_log.shouldDebug()) + _log.debug("finishMessages() over " + _sentMessages.size() + " pending acks"); + loop: + for (Iterator<List<PacketBuilder.Fragment>> iter = _sentMessages.values().iterator(); iter.hasNext(); ) { + List<PacketBuilder.Fragment> frags = iter.next(); + for (PacketBuilder.Fragment f : frags) { + OutboundMessageState state = f.state; + if (!state.isComplete() && !state.isExpired(now)) + continue loop; + } + iter.remove(); + if (_log.shouldWarn()) + _log.warn("Cleaned from sentMessages: " + frags); + } + } + } + return super.finishMessages(now); + } + /** * Overridden to retransmit SessionConfirmed also */ @@ -570,6 +604,7 @@ public class PeerState2 extends PeerState implements SSU2Payload.PayloadCallback byte data[] = pkt.getData(); int off = pkt.getOffset(); System.arraycopy(_sessConfForReTX, 0, data, off, _sessConfForReTX.length); + pkt.setLength(_sessConfForReTX.length); pkt.setAddress(_remoteIPAddress); pkt.setPort(_remotePort); packet.setMessageType(PacketBuilder2.TYPE_CONF);