diff --git a/core/java/src/net/i2p/util/ConcurrentHashSet.java b/core/java/src/net/i2p/util/ConcurrentHashSet.java new file mode 100644 index 0000000000000000000000000000000000000000..b7bea9ee103eaea2a0c351e5361f950b9aed397e --- /dev/null +++ b/core/java/src/net/i2p/util/ConcurrentHashSet.java @@ -0,0 +1,53 @@ +package net.i2p.util; + +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Implement on top of a ConcurrentHashMap with a dummy value. + * + * @author zzz + */ +public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E> { + private static final Object DUMMY = new Object(); + private Map<E, Object> _map; + + public ConcurrentHashSet() { + _map = new ConcurrentHashMap(); + } + public ConcurrentHashSet(int capacity) { + _map = new ConcurrentHashMap(capacity); + } + + public boolean add(E o) { + return _map.put(o, DUMMY) == null; + } + + public void clear() { + _map.clear(); + } + + public boolean contains(Object o) { + return _map.containsKey(o); + } + + public boolean isEmpty() { + return _map.isEmpty(); + } + + public boolean remove(Object o) { + return _map.remove(o) != null; + } + + public int size() { + return _map.size(); + } + + public Iterator<E> iterator() { + return _map.keySet().iterator(); + } +} diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index af30a5d3da96646efead08542c41c35112a6902d..4f979c5c937a28a5638ebbf46a3398e1a33d3e80 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -14,7 +14,6 @@ import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -36,6 +35,7 @@ import net.i2p.data.i2cp.SessionId; import net.i2p.router.Job; import net.i2p.router.JobImpl; import net.i2p.router.RouterContext; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.I2PThread; import net.i2p.util.Log; import net.i2p.util.RandomSource; @@ -66,7 +66,7 @@ public class ClientConnectionRunner { /** currently allocated leaseSet, or null if none is allocated */ private LeaseSet _currentLeaseSet; /** set of messageIds created but not yet ACCEPTED */ - private Set _acceptedPending; + private Set<MessageId> _acceptedPending; /** thingy that does stuff */ private I2CPMessageReader _reader; /** @@ -91,7 +91,7 @@ public class ClientConnectionRunner { _config = null; _messages = new ConcurrentHashMap(); _alreadyProcessed = new ArrayList(); - _acceptedPending = new HashSet(); + _acceptedPending = new ConcurrentHashSet(); _dead = false; } @@ -242,19 +242,8 @@ public class ClientConnectionRunner { long expiration = 0; if (message instanceof SendMessageExpiresMessage) expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime(); - long beforeLock = _context.clock().now(); - long inLock = 0; - synchronized (_acceptedPending) { - inLock = _context.clock().now(); - _acceptedPending.add(id); - } - long afterLock = _context.clock().now(); - - if (_log.shouldLog(Log.DEBUG)) { - _log.warn("distributeMessage.locking took: " + (afterLock-beforeLock) - + " overall, synchronized took " + (inLock - beforeLock)); - } - + _acceptedPending.add(id); + if (_log.shouldLog(Log.DEBUG)) _log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size [" + payload.getSize() + "]" + " for session [" + _sessionId.getSessionId() @@ -291,18 +280,7 @@ public class ClientConnectionRunner { status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED); try { doSend(status); - long beforeLock = _context.clock().now(); - long inLock = 0; - synchronized (_acceptedPending) { - inLock = _context.clock().now(); - _acceptedPending.remove(id); - } - long afterLock = _context.clock().now(); - - if (afterLock - beforeLock > 50) { - _log.warn("ackSendMessage.locking took too long: " + (afterLock-beforeLock) - + " overall, synchronized took " + (inLock - beforeLock)); - } + _acceptedPending.remove(id); } catch (I2CPMessageException ime) { _log.error("Error writing out the message status message: " + ime); } @@ -504,28 +482,7 @@ public class ClientConnectionRunner { */ private boolean alreadyAccepted(MessageId id) { if (_dead) return false; - boolean isPending = false; - int pending = 0; - String buf = null; - long beforeLock = _context.clock().now(); - long inLock = 0; - synchronized (_acceptedPending) { - inLock = _context.clock().now(); - if (_acceptedPending.contains(id)) - isPending = true; - pending = _acceptedPending.size(); - buf = _acceptedPending.toString(); - } - long afterLock = _context.clock().now(); - - if (afterLock - beforeLock > 50) { - _log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock) - + " overall, synchronized took " + (inLock - beforeLock)); - } - if (pending >= 1) { - _log.warn("Pending acks: " + pending + ": " + buf); - } - return !isPending; + return !_acceptedPending.contains(id); } /**