UDP PeerState findbugs volatile/atomic/synch

This commit is contained in:
zzz
2013-11-23 14:37:33 +00:00
parent 74f2fd06cc
commit f42ac71fe0

View File

@@ -11,6 +11,7 @@ import java.util.Map;
import java.util.Set;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.i2p.data.Hash;
import net.i2p.data.SessionKey;
@@ -119,9 +120,9 @@ class PeerState {
*/
//private boolean _remoteWantsPreviousACKs;
/** how many bytes should we send to the peer in a second */
private volatile int _sendWindowBytes;
private int _sendWindowBytes;
/** how many bytes can we send to the peer in the current second */
private volatile int _sendWindowBytesRemaining;
private int _sendWindowBytesRemaining;
private long _lastSendRefill;
private int _sendBps;
private int _sendBytes;
@@ -225,13 +226,13 @@ class PeerState {
/** Make sure a 4229 byte TunnelBuildMessage can be sent in one volley with small MTU */
private static final int MIN_CONCURRENT_MSGS = 8;
/** how many concurrent outbound messages do we allow throws OutboundMessageFragments to send */
private volatile int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
private int _concurrentMessagesAllowed = MIN_CONCURRENT_MSGS;
/**
* how many outbound messages are currently being transmitted. Not thread safe, as we're not strict
*/
private volatile int _concurrentMessagesActive = 0;
private int _concurrentMessagesActive;
/** how many concurrency rejections have we had in a row */
private volatile int _consecutiveRejections = 0;
private int _consecutiveRejections;
/** is it inbound? **/
private final boolean _isInbound;
/** Last time it was made an introducer **/
@@ -436,9 +437,19 @@ class PeerState {
//public boolean getRemoteWantsPreviousACKs() { return _remoteWantsPreviousACKs; }
/** how many bytes should we send to the peer in a second */
public int getSendWindowBytes() { return _sendWindowBytes; }
public int getSendWindowBytes() {
synchronized(_outboundMessages) {
return _sendWindowBytes;
}
}
/** how many bytes can we send to the peer in the current second */
public int getSendWindowBytesRemaining() { return _sendWindowBytesRemaining; }
public int getSendWindowBytesRemaining() {
synchronized(_outboundMessages) {
return _sendWindowBytesRemaining;
}
}
/** what IP is the peer sending and receiving packets on? */
public byte[] getRemoteIP() { return _remoteIP; }
@@ -580,20 +591,24 @@ class PeerState {
/** return the smoothed send transfer rate */
public int getSendBps() { return _sendBps; }
public int getReceiveBps() { return _receiveBps; }
public int incrementConsecutiveFailedSends() {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
synchronized(_outboundMessages) {
_concurrentMessagesActive--;
if (_concurrentMessagesActive < 0)
_concurrentMessagesActive = 0;
//long now = _context.clock().now()/(10*1000);
//if (_lastFailedSendPeriod >= now) {
// // ignore... too fast
//} else {
// _lastFailedSendPeriod = now;
_consecutiveFailedSends++;
//}
return _consecutiveFailedSends;
}
}
public long getInactivityTime() {
long now = _context.clock().now();
long lastActivity = Math.max(_lastReceiveTime, _lastSendFullyTime);
@@ -620,15 +635,17 @@ class PeerState {
* returning true if the full size can be decremented, false if it
* cannot. If it is not decremented, the window size remaining is
* not adjusted at all.
*
* Caller should synch
*/
public boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
private boolean allocateSendingBytes(int size, int messagePushCount) { return allocateSendingBytes(size, false, messagePushCount); }
public boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
//private boolean allocateSendingBytes(int size, boolean isForACK) { return allocateSendingBytes(size, isForACK, -1); }
/**
* Caller should synch
*/
public boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
private boolean allocateSendingBytes(int size, boolean isForACK, int messagePushCount) {
long now = _context.clock().now();
long duration = now - _lastSendRefill;
if (duration >= 1000) {
@@ -694,9 +711,25 @@ class PeerState {
****/
public int getSlowStartThreshold() { return _slowStartThreshold; }
public int getConcurrentSends() { return _concurrentMessagesActive; }
public int getConcurrentSendWindow() { return _concurrentMessagesAllowed; }
public int getConsecutiveSendRejections() { return _consecutiveRejections; }
public int getConcurrentSends() {
synchronized(_outboundMessages) {
return _concurrentMessagesActive;
}
}
public int getConcurrentSendWindow() {
synchronized(_outboundMessages) {
return _concurrentMessagesAllowed;
}
}
public int getConsecutiveSendRejections() {
synchronized(_outboundMessages) {
return _consecutiveRejections;
}
}
public boolean isInbound() { return _isInbound; }
/** @since IPv6 */
@@ -1674,6 +1707,8 @@ class PeerState {
/**
* Have 3 return values, because if allocateSendingBytes() returns false,
* then allocateSend() can stop iterating
*
* Caller should synch
*/
private ShouldSend locked_shouldSend(OutboundMessageState state) {
long now = _context.clock().now();