Whitespace

This commit is contained in:
Zlatin Balevsky
2020-12-21 23:00:12 +00:00
parent 82e93a53a3
commit e88f40cd95
2 changed files with 153 additions and 153 deletions

View File

@@ -41,11 +41,11 @@ class OutboundMessageState implements CDPQEntry {
private long _seqNum;
/** how many bytes push() is allowed to send */
private int _allowedSendBytes;
public static final int MAX_MSG_SIZE = 32 * 1024;
private static final long EXPIRATION = 10*1000;
/**
* "injected" message from the establisher.
@@ -56,7 +56,7 @@ class OutboundMessageState implements CDPQEntry {
public OutboundMessageState(I2PAppContext context, I2NPMessage msg, PeerState peer) {
this(context, null, msg, peer);
}
/**
* Normal constructor.
*
@@ -66,7 +66,7 @@ class OutboundMessageState implements CDPQEntry {
public OutboundMessageState(I2PAppContext context, OutNetMessage m, PeerState peer) {
this(context, m, m.getMessage(), peer);
}
/**
* Internal.
* @param m null if msg is "injected"
@@ -102,7 +102,7 @@ class OutboundMessageState implements CDPQEntry {
_fragmentAcks = _numFragments < 64 ? mask(_numFragments) - 1L : -1L;
_fragmentSends = (numFragments > 1) ? new byte[numFragments] : null;
}
/**
* @param fragment 0-63
*/
@@ -117,14 +117,14 @@ class OutboundMessageState implements CDPQEntry {
public PeerState getPeer() { return _peer; }
public boolean isExpired() {
return _expiration < _context.clock().now();
return _expiration < _context.clock().now();
}
/**
* @since 0.9.38
*/
public boolean isExpired(long now) {
return _expiration < now;
return _expiration < now;
}
public synchronized boolean isComplete() {
@@ -282,7 +282,7 @@ class OutboundMessageState implements CDPQEntry {
}
public long getLifetime() { return _context.clock().now() - _startedOn; }
/**
* Ack all the fragments in the ack list.
*
@@ -297,7 +297,7 @@ class OutboundMessageState implements CDPQEntry {
}
return isComplete();
}
/**
* The max number of sends for any fragment.
* As of 0.9.49, may be less than getPushCount() if we pushed only some fragments
@@ -321,7 +321,7 @@ class OutboundMessageState implements CDPQEntry {
* @return the number of Fragments added
* @since 0.9.49
*/
public synchronized int push(List<Fragment> toSend) {
public synchronized int push(List<Fragment> toSend) {
int rv = 0;
if (_allowedSendBytes <= 0 || _numFragments == 1) {
// easy way
@@ -383,10 +383,10 @@ class OutboundMessageState implements CDPQEntry {
}
/**
* How many fragments in the message.
* How many fragments in the message
*/
public int getFragmentCount() {
return _numFragments;
public int getFragmentCount() {
return _numFragments;
}
/**
@@ -398,7 +398,7 @@ class OutboundMessageState implements CDPQEntry {
* The size in bytes of the fragment.
* Does NOT include any SSU overhead.
*
* @param fragmentNum the number of the fragment
* @param fragmentNum the number of the fragment
* @return the size of the fragment specified by the number
*/
public int fragmentSize(int fragmentNum) {
@@ -435,7 +435,7 @@ class OutboundMessageState implements CDPQEntry {
}
return -1;
}
/**
* For CDQ
* @since 0.9.3

View File

@@ -41,24 +41,24 @@ public class PeerState {
* is established.
*/
private final Hash _remotePeer;
/**
/**
* The AES key used to verify packets, set only after the connection is
* established.
* established.
*/
private SessionKey _currentMACKey;
/**
* The AES key used to encrypt/decrypt packets, set only after the
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*/
private SessionKey _currentCipherKey;
/**
* The pending AES key for verifying packets if we are rekeying the
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*/
private SessionKey _nextMACKey;
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
*/
private SessionKey _nextCipherKey;
@@ -95,7 +95,7 @@ public class PeerState {
*/
private final Set<Long> _currentACKs;
/**
/**
* list of the most recent messageIds (Long) that we have received and sent
* an ACK for. We keep a few of these around to retransmit with _currentACKs,
* hopefully saving some spurious retransmissions
@@ -108,7 +108,7 @@ public class PeerState {
private volatile long _wantACKSendSince;
/** have we received a packet with the ECN bit set in the current second? */
private boolean _currentSecondECNReceived;
/**
/**
* have all of the packets received in the current second requested that
* the previous second's ACKs be sent?
*/
@@ -123,7 +123,7 @@ public class PeerState {
private int _receiveBytes;
private long _receivePeriodBegin;
private volatile long _lastCongestionOccurred;
/**
/**
* when sendWindowBytes is below this, grow the window size quickly,
* but after we reach it, grow it slowly
*
@@ -141,10 +141,10 @@ public class PeerState {
/** if we need to contact them, do we need to talk to an introducer? */
//private boolean _remoteRequiresIntroduction;
/**
/**
* if we are serving as an introducer to them, this is the the tag that
* they can publish that, when presented to us, will cause us to send
* a relay introduction to the current peer
* a relay introduction to the current peer
*/
private long _weRelayToThemAs;
/**
@@ -167,10 +167,10 @@ public class PeerState {
private int _rttDeviation;
/** current retransmission timeout */
private int _rto;
/** how many packets will be considered within the retransmission rate calculation */
static final long RETRANSMISSION_PERIOD_WIDTH = 100;
private int _messagesReceived;
private int _messagesSent;
private int _packetsTransmitted;
@@ -181,7 +181,7 @@ public class PeerState {
private int _packetsReceivedDuplicate;
private int _packetsReceived;
private boolean _mayDisconnect;
/** list of InboundMessageState for active message */
private final Map<Long, InboundMessageState> _inboundMessages;
@@ -200,9 +200,9 @@ public class PeerState {
/** when the retransmit timer is about to trigger */
private long _retransmitTimer;
private final UDPTransport _transport;
/** have we migrated away from this peer to another newer one? */
private volatile boolean _dead;
@@ -233,10 +233,10 @@ public class PeerState {
/**
* IPv4 Min MTU
*
* 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message,
* 522 fragment bytes, which is enough to send a tunnel data message in 2
* packets. A tunnel data message sent over the wire is 1044 bytes, meaning
* we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20
* 596 gives us 588 IP byes, 568 UDP bytes, and with an SSU data message,
* 522 fragment bytes, which is enough to send a tunnel data message in 2
* packets. A tunnel data message sent over the wire is 1044 bytes, meaning
* we need 522 fragment bytes to fit it in 2 packets - add 46 for SSU, 20
* for UDP, and 8 for IP, giving us 596. round up to mod 16, giving a total
* of 608
*
@@ -281,19 +281,19 @@ public class PeerState {
* and so PacketBuilder.buildPacket() works correctly.
*/
public static final int LARGE_MTU = 1484;
/**
* Max of IPv4 and IPv6 max MTUs
* @since 0.9.28
*/
public static final int MAX_MTU = Math.max(LARGE_MTU, MAX_IPV6_MTU);
private static final int MIN_RTO = 1000;
private static final int INIT_RTO = 1000;
private static final int INIT_RTT = 0;
private static final int MAX_RTO = 60*1000;
private static final int CLOCK_SKEW_FUDGE = (ACKSender.ACK_FREQUENCY * 2) / 3;
/**
* The max number of acks we save to send as duplicates
*/
@@ -309,7 +309,7 @@ public class PeerState {
private static final long RESEND_ACK_TIMEOUT = 5*60*1000;
/**
* @param rtt from the EstablishState, or 0 if not available
*/
@@ -347,11 +347,11 @@ public class PeerState {
_rto = INIT_RTO;
_rtt = INIT_RTT;
if (rtt > 0)
if (rtt > 0)
recalculateTimeouts(rtt);
else
_rttDeviation = _rtt;
_inboundMessages = new HashMap<Long, InboundMessageState>(8);
_outboundMessages = new CachedIteratorCollection<OutboundMessageState>();
//_outboundQueue = new CoDelPriorityBlockingQueue(ctx, "UDP-PeerState", 32);
@@ -363,8 +363,8 @@ public class PeerState {
_remoteHostId = new RemoteHostId(remoteIP, remotePort);
_bwEstimator = new SimpleBandwidthEstimator(ctx, this);
}
/**
/**
* Caller should sync; UDPTransport must remove and add to peersByRemoteHost map
* @since 0.9.3
*/
@@ -382,28 +382,28 @@ public class PeerState {
* is established.
*/
public Hash getRemotePeer() { return _remotePeer; }
/**
/**
* The AES key used to verify packets, set only after the connection is
* established.
* established.
*/
SessionKey getCurrentMACKey() { return _currentMACKey; }
/**
* The AES key used to encrypt/decrypt packets, set only after the
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*/
SessionKey getCurrentCipherKey() { return _currentCipherKey; }
/**
* The pending AES key for verifying packets if we are rekeying the
/**
* The pending AES key for verifying packets if we are rekeying the
* connection, or null if we are not in the process of rekeying.
*
* @return null always, rekeying unimplemented
*/
SessionKey getNextMACKey() { return _nextMACKey; }
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
/**
* The pending AES key for encrypting/decrypting packets if we are
* rekeying the connection, or null if we are not in the process
* of rekeying.
*
* @return null always, rekeying unimplemented
@@ -467,10 +467,10 @@ public class PeerState {
/** what port is the peer sending and receiving packets on? */
public int getRemotePort() { return _remotePort; }
/**
/**
* if we are serving as an introducer to them, this is the the tag that
* they can publish that, when presented to us, will cause us to send
* a relay introduction to the current peer
* a relay introduction to the current peer
* @return 0 (no relay) if unset previously
*/
public long getWeRelayToThemAs() { return _weRelayToThemAs; }
@@ -492,14 +492,14 @@ public class PeerState {
*/
public int getReceiveMTU() { return _mtuReceive; }
/**
/**
* The AES key used to verify packets, set only after the connection is
* established.
* established.
*/
void setCurrentMACKey(SessionKey key) { _currentMACKey = key; }
/**
* The AES key used to encrypt/decrypt packets, set only after the
* The AES key used to encrypt/decrypt packets, set only after the
* connection is established.
*/
void setCurrentCipherKey(SessionKey key) { _currentCipherKey = key; }
@@ -510,23 +510,23 @@ public class PeerState {
* A positive number means our clock is ahead of theirs.
* @param skew milliseconds, NOT adjusted for RTT.
*/
void adjustClockSkew(long skew) {
void adjustClockSkew(long skew) {
// the real one-way delay is much less than RTT / 2, due to ack delays,
// so add a fudge factor
long actualSkew = skew + CLOCK_SKEW_FUDGE - (_rtt / 2);
long actualSkew = skew + CLOCK_SKEW_FUDGE - (_rtt / 2);
// First time...
// This is important because we need accurate
// skews right from the beginning, since the median is taken
// and fed to the timestamper. Lots of connections only send a few packets.
if (_packetsReceived <= 1) {
synchronized(_clockSkewLock) {
_clockSkew = actualSkew;
_clockSkew = actualSkew;
}
return;
}
double adj = 0.1 * actualSkew;
double adj = 0.1 * actualSkew;
synchronized(_clockSkewLock) {
_clockSkew = (long) (0.9*_clockSkew + adj);
_clockSkew = (long) (0.9*_clockSkew + adj);
}
}
@@ -561,14 +561,14 @@ public class PeerState {
*/
public synchronized int getReceiveBps() { return _receiveBps; }
int incrementConsecutiveFailedSends() {
int incrementConsecutiveFailedSends() {
synchronized(_outboundMessages) {
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
}
@@ -579,11 +579,11 @@ public class PeerState {
long lastActivity = Math.max(_lastReceiveTime, _lastSendFullyTime);
return now - lastActivity;
}
/**
/**
* Decrement the remaining bytes in the current period's window,
* returning true if the full size can be decremented, false if it
* cannot. If it is not decremented, the window size remaining is
* cannot. If it is not decremented, the window size remaining is
* not adjusted at all.
*
* Caller should synch
@@ -602,22 +602,22 @@ public class PeerState {
if (size > 0) {
if (messagePushCount == 0) {
_context.statManager().addRateData("udp.allowConcurrentActive", _outboundMessages.size(), _concurrentMessagesAllowed);
if (_consecutiveRejections > 0)
if (_consecutiveRejections > 0)
_context.statManager().addRateData("udp.rejectConcurrentSequence", _consecutiveRejections, _outboundMessages.size());
_consecutiveRejections = 0;
}
_sendWindowBytesRemaining -= size;
_sendWindowBytesRemaining -= size;
_lastSendTime = now;
return true;
} else {
return false;
}
}
/**
/**
* if we are serving as an introducer to them, this is the the tag that
* they can publish that, when presented to us, will cause us to send
* a relay introduction to the current peer
* a relay introduction to the current peer
* @param tag 1 to Integer.MAX_VALUE, or 0 if relaying disabled
*/
void setWeRelayToThemAs(long tag) { _weRelayToThemAs = tag; }
@@ -677,8 +677,8 @@ public class PeerState {
/** set the last time we used them as an introducer to now */
void setIntroducerTime() { _lastIntroducerTime = _context.clock().now(); }
/**
/**
* We received the message specified completely.
* @param bytes if less than or equal to zero, message is a duplicate.
*/
@@ -689,7 +689,7 @@ public class PeerState {
} else {
_packetsReceivedDuplicate++;
}
long now = _context.clock().now();
long duration = now - _receivePeriodBegin;
if (duration >= 1000) {
@@ -697,18 +697,18 @@ public class PeerState {
_receiveBytes = 0;
_receivePeriodBegin = now;
}
if (_wantACKSendSince <= 0)
_wantACKSendSince = now;
_currentACKs.add(messageId);
}
void messagePartiallyReceived() {
if (_wantACKSendSince <= 0)
_wantACKSendSince = _context.clock().now();
}
/**
/**
* Fetch the internal id (Long) to InboundMessageState for incomplete inbound messages.
* Access to this map must be synchronized explicitly!
*/
@@ -720,9 +720,9 @@ public class PeerState {
* try to send them any messages (and don't receive any messages from them either)
*
*/
int expireInboundMessages() {
int expireInboundMessages() {
int rv = 0;
synchronized (_inboundMessages) {
for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
InboundMessageState state = iter.next();
@@ -742,10 +742,10 @@ public class PeerState {
}
return rv;
}
/**
* either they told us to back off, or we had to resend to get
* the data through.
/**
* either they told us to back off, or we had to resend to get
* the data through.
* Caller should synch on this
*/
private void congestionOccurred() {
@@ -775,10 +775,10 @@ public class PeerState {
" SST: " + oldsst + " -> " + _slowStartThreshold +
" BWE: " + DataHelper.formatSize2Decimal((long) (bwe * 1000), false) + "bps");
}
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list. If the message id is transmitted to the peer,
* removeACKMessage(Long) should be called.
*
@@ -797,7 +797,7 @@ public class PeerState {
/**
* Grab a list of message ids (Long) that we want to send to the remote
* peer, regardless of the packet size, but don't remove it from our
* peer, regardless of the packet size, but don't remove it from our
* "want to send" list.
*
* The returned list contains
@@ -850,10 +850,10 @@ public class PeerState {
// should we only do this if removed?
_lastACKSend = _context.clock().now();
}
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.
/**
* grab a list of ACKBitfield instances, some of which may fully
* ACK a message while others may only partially ACK a message.
* the values returned are limited in size so that they will fit within
* the peer's current MTU as an ACK - as such, not all messages may be
* ACKed with this call. Be sure to check getWantedACKSendSince() which
@@ -935,7 +935,7 @@ public class PeerState {
int partialIncluded = 0;
if (bytesRemaining > 4) {
// ok, there's room to *try* to fit in some partial ACKs, so
// we should try to find some packets to partially ACK
// we should try to find some packets to partially ACK
// (preferably the ones which have the most received fragments)
List<ACKBitfield> partial = new ArrayList<ACKBitfield>();
fetchPartialACKs(partial);
@@ -948,7 +948,7 @@ public class PeerState {
bytesRemaining -= bytes + 4;
partialIncluded++;
} else {
// continue on to another partial, in case there's a
// continue on to another partial, in case there's a
// smaller one that will fit
}
}
@@ -960,7 +960,7 @@ public class PeerState {
_context.statManager().addRateData("udp.sendACKPartial", partialIncluded, rv.size() - partialIncluded);
return rv;
}
/**
* @param rv out parameter, populated with true partial ACKBitfields.
* no full bitfields are included.
@@ -970,7 +970,7 @@ public class PeerState {
int curState = 0;
synchronized (_inboundMessages) {
int numMessages = _inboundMessages.size();
if (numMessages <= 0)
if (numMessages <= 0)
return;
// todo: make this a list instead of a map, so we can iterate faster w/out the memory overhead?
for (Iterator<InboundMessageState> iter = _inboundMessages.values().iterator(); iter.hasNext(); ) {
@@ -996,7 +996,7 @@ public class PeerState {
}
}
}
/**
* A dummy "partial" ack which represents a full ACK of a message
*/
@@ -1021,7 +1021,7 @@ public class PeerState {
@Override
public String toString() { return "Full ACK " + _msgId; }
}
/**
* We sent a message which was ACKed containing the given # of bytes.
* Caller should synch on this
@@ -1032,7 +1032,7 @@ public class PeerState {
if (numSends < 2) {
if (_context.random().nextInt(_concurrentMessagesAllowed) <= 0)
_concurrentMessagesAllowed++;
if (_sendWindowBytes <= _slowStartThreshold) {
_sendWindowBytes += bytesACKed;
_sendWindowBytesRemaining += bytesACKed;
@@ -1055,17 +1055,17 @@ public class PeerState {
_sendWindowBytes = MAX_SEND_WINDOW_BYTES;
_lastReceiveTime = _context.clock().now();
_lastSendFullyTime = _lastReceiveTime;
_sendWindowBytesRemaining += bytesACKed;
if (_sendWindowBytesRemaining > _sendWindowBytes)
_sendWindowBytesRemaining = _sendWindowBytes;
if (numSends < 2) {
// caller synchs
recalculateTimeouts(lifetime);
adjustMTU();
}
if (!anyPending) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " nothing pending, cancelling timer");
@@ -1096,7 +1096,7 @@ public class PeerState {
/** This is the value specified in RFC 2988 */
private static final float RTT_DAMPENING = 0.125f;
/**
* Adjust the tcp-esque timeouts.
* Caller should synch on this
@@ -1117,7 +1117,7 @@ public class PeerState {
// _log.debug("Recalculating timeouts w/ lifetime=" + lifetime + ": rtt=" + _rtt
// + " rttDev=" + _rttDeviation + " rto=" + _rto);
}
/**
* Caller should synch on this
*/
@@ -1153,9 +1153,9 @@ public class PeerState {
if (mtu < _mtu)
_mtu = mtu;
}
/** we are resending a packet, so lets jack up the rto */
synchronized void messageRetransmitted(int packets) {
synchronized void messageRetransmitted(int packets) {
_context.statManager().addRateData("udp.congestionOccurred", _sendWindowBytes);
_context.statManager().addRateData("udp.congestedRTO", _rto, _rttDeviation);
_packetsRetransmitted += packets;
@@ -1163,8 +1163,8 @@ public class PeerState {
adjustMTU();
}
synchronized void packetsTransmitted(int packets) {
_packetsTransmitted += packets;
synchronized void packetsTransmitted(int packets) {
_packetsTransmitted += packets;
}
/** how long does it usually take to get a message ACKed? */
@@ -1173,7 +1173,7 @@ public class PeerState {
public synchronized int getRTO() { return _rto; }
/** how skewed are the measured RTTs? */
public synchronized int getRTTDeviation() { return _rttDeviation; }
/**
* I2NP messages sent.
* Does not include duplicates.
@@ -1184,7 +1184,7 @@ public class PeerState {
return _messagesSent;
}
}
/**
* I2NP messages received.
* As of 0.9.24, does not include duplicates.
@@ -1205,11 +1205,11 @@ public class PeerState {
private static final int IPV6_OVERHEAD_SIZE = PacketBuilder.IPV6_HEADER_SIZE + PacketBuilder.UDP_HEADER_SIZE +
UDPPacket.MAC_SIZE + UDPPacket.IV_SIZE;
/**
/**
* @param size not including IP header, UDP header, MAC or IV
*/
synchronized void packetReceived(int size) {
_packetsReceived++;
synchronized void packetReceived(int size) {
_packetsReceived++;
int minMTU;
if (_remoteIP.length == 4) {
size += OVERHEAD_SIZE;
@@ -1228,8 +1228,8 @@ public class PeerState {
_mtuReceive = size;
}
}
/**
/**
* We received a backoff request, so cut our send window.
* NOTE: ECN sending is unimplemented, this is never called.
*/
@@ -1241,11 +1241,11 @@ public class PeerState {
_currentSecondECNReceived = true;
_lastReceiveTime = _context.clock().now();
}
void dataReceived() {
_lastReceiveTime = _context.clock().now();
}
/** when did we last send an ACK to the peer? */
public long getLastACKSend() { return _lastACKSend; }
@@ -1273,10 +1273,10 @@ public class PeerState {
*/
private int countMaxACKData() {
return Math.min(PacketBuilder.ABSOLUTE_MAX_ACKS * 4,
_mtu
_mtu
- (_remoteIP.length == 4 ? PacketBuilder.IP_HEADER_SIZE : PacketBuilder.IPV6_HEADER_SIZE)
- PacketBuilder.UDP_HEADER_SIZE
- UDPPacket.IV_SIZE
- UDPPacket.IV_SIZE
- UDPPacket.MAC_SIZE
- 1 // type flag
- 4 // timestamp
@@ -1287,7 +1287,7 @@ public class PeerState {
/** @return non-null */
RemoteHostId getRemoteHostId() { return _remoteHostId; }
/**
* TODO should this use a queue, separate from the list of msgs pending an ack?
* TODO bring back tail drop?
@@ -1296,7 +1296,7 @@ public class PeerState {
* TODO SSU does not support isBacklogged() now
*/
void add(OutboundMessageState state) {
if (_dead) {
if (_dead) {
_transport.failed(state, false);
return;
}
@@ -1343,7 +1343,7 @@ public class PeerState {
// so the ACKSender will drop this peer from its queue
_wantACKSendSince = -1;
}
/**
* @return number of active outbound messages remaining (unsynchronized)
*/
@@ -1363,7 +1363,7 @@ public class PeerState {
*/
public boolean getMayDisconnect() { return _mayDisconnect; }
/**
* Expire / complete any outbound messages
* High usage -
@@ -1406,7 +1406,7 @@ public class PeerState {
} // end iterating over outbound messages
rv = _outboundMessages.size();
}
for (int i = 0; succeeded != null && i < succeeded.size(); i++) {
OutboundMessageState state = succeeded.get(i);
_transport.succeeded(state);
@@ -1414,7 +1414,7 @@ public class PeerState {
if (msg != null)
msg.timestamp("sending complete");
}
if (failed != null) {
int failedSize = 0;
int failedCount = 0;
@@ -1448,10 +1448,10 @@ public class PeerState {
}
}
}
return rv + _outboundQueue.size();
}
/**
* Pick one or more messages we want to send and allocate them out of our window
* Adjusts the retransmit timer if necessary.
@@ -1536,11 +1536,11 @@ public class PeerState {
if (rv == null)
_log.debug("Nothing to send (BW) to " + _remotePeer + ", with " + _outboundMessages.size() +
" / " + _outboundQueue.size() + " remaining");
else
else
_log.debug(_remotePeer + " ran out of BW, but managed to send " + rv.size());
}
return rv;
}
}
}
// fall through to new messages
}
@@ -1581,7 +1581,7 @@ public class PeerState {
return rv;
}
/**
* High usage -
* OutboundMessageFragments.getNextVolley() calls this 3rd, if allocateSend() returned null.
@@ -1595,7 +1595,7 @@ public class PeerState {
int rv = Integer.MAX_VALUE;
if (_dead) return rv;
synchronized(this) {
if (_retransmitTimer >= now)
if (_retransmitTimer >= now)
return (int) (_retransmitTimer - now);
}
return rv;
@@ -1640,17 +1640,17 @@ public class PeerState {
return (_remoteIP.length == 4 ? PacketBuilder.MIN_DATA_PACKET_OVERHEAD : PacketBuilder.MIN_IPV6_DATA_PACKET_OVERHEAD) +
MIN_ACK_SIZE;
}
/**
* Locks this.
* Locks this
*/
private boolean locked_shouldSend(OutboundMessageState state, long now) {
synchronized(this) {
if (allocateSendingBytes(state, now)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(_remotePeer + " Allocation allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
_log.debug(_remotePeer + " Allocation allowed with "
+ getSendWindowBytesRemaining()
+ "/" + getSendWindowBytes()
+ " remaining"
+ " for message " + state.getMessageId() + ": " + state);
if (state.getPushCount() == 0)
@@ -1666,7 +1666,7 @@ public class PeerState {
}
}
}
/**
* A full ACK was received.
* TODO if messages awaiting ack were a HashMap&lt;Long, OutboundMessageState&gt; this would be faster.
@@ -1694,7 +1694,7 @@ public class PeerState {
}
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
int numSends = state.getMaxSends();
long lifetime = state.getLifetime();
@@ -1724,7 +1724,7 @@ public class PeerState {
}
return state != null;
}
/**
* A partial ACK was received. This is much less common than full ACKs.
*
@@ -1733,12 +1733,12 @@ public class PeerState {
boolean acked(ACKBitfield bitfield) {
if (_dead)
return false;
final long messageId = bitfield.getMessageId();
if (bitfield.receivedComplete()) {
return acked(messageId);
}
OutboundMessageState state = null;
boolean isComplete = false;
boolean anyPending;
@@ -1767,13 +1767,13 @@ public class PeerState {
}
anyPending = !_outboundMessages.isEmpty();
}
if (state != null) {
int numSends = state.getMaxSends();
int numACKed = bitfield.ackCount();
_context.statManager().addRateData("udp.partialACKReceived", numACKed);
long lifetime = state.getLifetime();
if (isComplete) {
_context.statManager().addRateData("udp.sendConfirmTime", lifetime);
@@ -1815,7 +1815,7 @@ public class PeerState {
return false;
}
}
/**
* Transfer the basic activity/state from the old peer to the current peer
*
@@ -1828,7 +1828,7 @@ public class PeerState {
_slowStartThreshold = oldPeer._slowStartThreshold;
_sendWindowBytes = oldPeer._sendWindowBytes;
oldPeer._dead = true;
List<Long> tmp = new ArrayList<Long>();
// AIOOBE from concurrent access
//tmp.addAll(oldPeer._currentACKs);
@@ -1840,7 +1840,7 @@ public class PeerState {
if (!_dead) {
_currentACKs.addAll(tmp);
}
List<ResendACK> tmp3 = new ArrayList<ResendACK>();
tmp3.addAll(oldPeer._currentACKsResend);
oldPeer._currentACKsResend.clear();
@@ -1848,7 +1848,7 @@ public class PeerState {
if (!_dead) {
_currentACKsResend.addAll(tmp3);
}
Map<Long, InboundMessageState> msgs = new HashMap<Long, InboundMessageState>();
synchronized (oldPeer._inboundMessages) {
msgs.putAll(oldPeer._inboundMessages);
@@ -1858,7 +1858,7 @@ public class PeerState {
synchronized (_inboundMessages) { _inboundMessages.putAll(msgs); }
}
msgs.clear();
List<OutboundMessageState> tmp2 = new ArrayList<OutboundMessageState>();
OutboundMessageState retransmitter = null;
synchronized (oldPeer._outboundMessages) {
@@ -1898,9 +1898,9 @@ public class PeerState {
/*
public int hashCode() {
if (_remotePeer != null)
if (_remotePeer != null)
return _remotePeer.hashCode();
else
else
return super.hashCode();
}
public boolean equals(Object o) {
@@ -1917,7 +1917,7 @@ public class PeerState {
}
}
*/
@Override
public String toString() {
StringBuilder buf = new StringBuilder(256);