forked from I2P_Developers/i2p.i2p
2005-11-05  jrandom
    * Include the most recent ACKs with packets, rather than only sending an
      ACK exactly once. SSU differs from TCP in this regard: TCP has ever-
      increasing sequence numbers, while each message ID in SSU is random, so
      we don't get the benefit of later ACKs implicitly ACKing earlier
      messages.
    * Reduced the max retransmission timeout for SSU.
    * Don't try to send messages that have been queued up for a long time
      waiting for establishment.
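
Because each SSU message ID is random, an ACK for one message implies nothing
about any other, so if the single ACK packet is lost the sender keeps
retransmitting until a fresh ACK arrives. A minimal sketch of the piggybacking
idea (hypothetical names, not the actual SSU classes): remember recently ACKed
IDs and repeat them on every outgoing packet.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;

    class RecentAcks {
        private static final int MAX_RECENT = 16; // assumed bound on repeated ACKs
        private final Deque<Long> _recent = new ArrayDeque<Long>();

        /** remember an inbound message we just ACKed so later packets can repeat it */
        synchronized void ackReceived(long messageId) {
            _recent.addFirst(Long.valueOf(messageId));
            if (_recent.size() > MAX_RECENT)
                _recent.removeLast();
        }

        /** IDs to piggyback on the next outgoing packet, newest first */
        synchronized List<Long> toPiggyback() {
            return new ArrayList<Long>(_recent);
        }
    }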
@@ -147,6 +147,7 @@ public class OutboundMessageFragments {
             active = _activeMessages.size();
             _activeMessages.notifyAll();
         }
+        msg.timestamp("made active along with: " + active);
         _context.statManager().addRateData("udp.outboundActiveCount", active, 0);
     }

@@ -181,6 +182,8 @@ public class OutboundMessageFragments {
                     if (_nextPacketMessage < 0)
                         _nextPacketMessage = 0;
                 }
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("sending complete");
                 i--;
             } else if (state.isExpired()) {
                 _activeMessages.remove(i);
@@ -188,6 +191,7 @@ public class OutboundMessageFragments {
                 _context.statManager().addRateData("udp.sendFailed", state.getPushCount(), state.getLifetime());

                 if (state.getMessage() != null) {
+                    state.getMessage().timestamp("expired in the active pool");
                     _transport.failed(state);
                 } else {
                     // it can not have an OutNetMessage if the source is the
@@ -212,6 +216,7 @@ public class OutboundMessageFragments {
                 // state.getPeer().congestionOccurred();

                 if (state.getMessage() != null) {
+                    state.getMessage().timestamp("too many sends");
                     _transport.failed(state);
                 } else {
                     // it can not have an OutNetMessage if the source is the
@@ -291,6 +296,8 @@ public class OutboundMessageFragments {
                 // peer disconnected
                 _activeMessages.remove(cur);
                 locked_removeRetransmitter(state);
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("peer disconnected");
                 _transport.failed(state);
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Peer disconnected for " + state);
@@ -331,14 +338,35 @@ public class OutboundMessageFragments {
             } catch (InterruptedException ie) {}
         } // end of the while (alive && !found)

-        return preparePackets(state, peer);
+        UDPPacket packets[] = preparePackets(state, peer);
+        if ( (state != null) && (state.getMessage() != null) ) {
+            int valid = 0;
+            for (int i = 0; packets != null && i < packets.length ; i++)
+                if (packets[i] != null)
+                    valid++;
+            state.getMessage().timestamp("sending a volley of " + valid
+                                         + " lastReceived: "
+                                         + (_context.clock().now() - peer.getLastReceiveTime())
+                                         + " lastSentFully: "
+                                         + (_context.clock().now() - peer.getLastSendFullyTime()));
+        }
+        return packets;
     }

+    /**
+     * If set to true, we should throttle retransmissions of all but the first message in
+     * flight to a peer.  If set to false, we will only throttle the initial flight of a
+     * message to a peer while a retransmission is going on.
+     */
+    private static final boolean THROTTLE_RESENDS = true;
+
     private boolean locked_shouldSend(OutboundMessageState state, PeerState peer) {
         long now = _context.clock().now();
         if (state.getNextSendTime() <= now) {
             if (!state.isFragmented()) {
                 state.fragment(fragmentSize(peer.getMTU()));
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("fragment into " + state.getFragmentCount());

                 if (_log.shouldLog(Log.INFO))
                     _log.info("Fragmenting " + state);
@@ -349,7 +377,16 @@ public class OutboundMessageFragments {
                 // choke it, since there's already another message retransmitting to this
                 // peer.
                 _context.statManager().addRateData("udp.blockedRetransmissions", peer.getPacketsRetransmitted(), peer.getPacketsTransmitted());
-                return false;
+                if ( (state.getMaxSends() <= 0) || (THROTTLE_RESENDS) ) {
+                    if (state.getMessage() != null)
+                        state.getMessage().timestamp("choked, with another message retransmitting");
+                    return false;
+                } else {
+                    if (curRetransMsg.isExpired() || curRetransMsg.isComplete())
+                        _retransmitters.remove(peer);
+                    if (state.getMessage() != null)
+                        state.getMessage().timestamp("another message is retransmitting, but since we've already begun sending...");
+                }
             }

             int size = state.getUnackedSize();
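
The THROTTLE_RESENDS flag introduced above selects between two choke policies
when another message is already retransmitting to the same peer: throttle
everything but the message being retransmitted, or throttle only messages that
have not begun sending yet. A sketch of that decision (simplified inputs, not
the real locked_shouldSend signature):

    // true means "choke: do not send this message now"
    static boolean choke(boolean retransmitInFlight, int maxSends,
                         boolean throttleResends) {
        if (!retransmitInFlight)
            return false;           // no competing retransmission to this peer
        if (maxSends <= 0)
            return true;            // never sent at all: wait for the retransmitter
        return throttleResends;     // already in flight: the flag decides
    }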
@@ -390,6 +427,8 @@ public class OutboundMessageFragments {
                 return true;
             } else {
                 _context.statManager().addRateData("udp.sendRejected", state.getPushCount(), state.getLifetime());
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("send rejected, available=" + peer.getSendWindowBytesRemaining());
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Allocation of " + size + " rejected w/ wsize=" + peer.getSendWindowBytes()
                               + " available=" + peer.getSendWindowBytesRemaining()
@@ -398,8 +437,17 @@ public class OutboundMessageFragments {
                 if (_log.shouldLog(Log.WARN))
                     _log.warn("Retransmit after choke for next send time in " + (state.getNextSendTime()-now) + "ms");
                 _throttle.choke(peer.getRemotePeer());
+
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("choked, not enough available, wsize="
+                                                 + peer.getSendWindowBytes() + " available="
+                                                 + peer.getSendWindowBytesRemaining());
                 return false;
             }
         } // nextTime <= now
+
+        //if (state.getMessage() != null)
+        //    state.getMessage().timestamp("choked, time remaining to retransmit: " + (state.getNextSendTime() - now));
+
         return false;
     }
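
The two hunks above gate every transmission on the peer's send window: the
volley goes out only when the message's unacked bytes fit within
getSendWindowBytesRemaining(), and otherwise the peer is choked and the send
is retried at the next send time. A toy version of that gate (assumed
parameter names):

    // window gate sketch: fits -> send now; else the caller chokes the peer
    // and reschedules for state.getNextSendTime()
    static boolean allowSend(int unackedBytes, int windowBytesRemaining) {
        return unackedBytes <= windowBytesRemaining;
    }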
@@ -424,6 +472,8 @@ public class OutboundMessageFragments {
             else
                 sparseCount++;
         }
+        if (sparseCount > 0)
+            remaining.clear();

         int piggybackedAck = 0;
         if (msgIds.size() != remaining.size()) {
@@ -501,6 +551,14 @@ public class OutboundMessageFragments {

         if (state != null) {
             int numSends = state.getMaxSends();
+            if (state.getMessage() != null)
+                state.getMessage().timestamp("acked after " + numSends
+                                             + " lastReceived: "
+                                             + (_context.clock().now() - state.getPeer().getLastReceiveTime())
+                                             + " lastSentFully: "
+                                             + (_context.clock().now() - state.getPeer().getLastSendFullyTime()));
+
+
             if (_log.shouldLog(Log.INFO))
                 _log.info("Received ack of " + messageId + " by " + ackedBy.toBase64()
                           + " after " + state.getLifetime() + " and " + numSends + " sends");
@@ -594,6 +652,8 @@ public class OutboundMessageFragments {
                 _context.statManager().addRateData("udp.sendConfirmFragments", state.getFragmentCount(), state.getLifetime());
                 if (numSends > 1)
                     _context.statManager().addRateData("udp.sendConfirmVolley", numSends, state.getFragmentCount());
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("partial ack to complete after " + numSends);
                 _transport.succeeded(state.getMessage());

                 if (state.getPeer() != null) {
@@ -604,6 +664,9 @@ public class OutboundMessageFragments {
                 }

                 state.releaseResources();
+            } else {
+                if (state.getMessage() != null)
+                    state.getMessage().timestamp("partial ack after " + numSends + ": " + bitfield.toString());
             }
             return;
         } else {
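
The partial-ACK path in the final hunks completes a message only when the ACK
bitfield shows every fragment received; anything short of that is merely
timestamped as a partial ack and the fragments stay queued. A minimal sketch
of that bookkeeping (hypothetical type, not the real bitfield class):

    class FragmentBitfield {
        private final boolean[] _received;

        FragmentBitfield(int fragments) { _received = new boolean[fragments]; }

        /** mark one fragment as ACKed by the peer */
        void markReceived(int fragment) { _received[fragment] = true; }

        /** complete only when every fragment's bit is set */
        boolean isComplete() {
            for (boolean r : _received)
                if (!r)
                    return false;
            return true;
        }
    }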