I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit d236b9b4 authored by zzz's avatar zzz
Browse files

more concurrent

parent 7ec29b0c
No related branches found
No related tags found
No related merge requests found
package net.i2p.util;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* Implement on top of a ConcurrentHashMap with a dummy value.
*
* @author zzz
*/
public class ConcurrentHashSet<E> extends AbstractSet<E> implements Set<E> {
private static final Object DUMMY = new Object();
private Map<E, Object> _map;
public ConcurrentHashSet() {
_map = new ConcurrentHashMap();
}
public ConcurrentHashSet(int capacity) {
_map = new ConcurrentHashMap(capacity);
}
public boolean add(E o) {
return _map.put(o, DUMMY) == null;
}
public void clear() {
_map.clear();
}
public boolean contains(Object o) {
return _map.containsKey(o);
}
public boolean isEmpty() {
return _map.isEmpty();
}
public boolean remove(Object o) {
return _map.remove(o) != null;
}
public int size() {
return _map.size();
}
public Iterator<E> iterator() {
return _map.keySet().iterator();
}
}
......@@ -14,7 +14,6 @@ import java.net.Socket;
import java.util.concurrent.ConcurrentHashMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
......@@ -36,6 +35,7 @@ import net.i2p.data.i2cp.SessionId;
import net.i2p.router.Job;
import net.i2p.router.JobImpl;
import net.i2p.router.RouterContext;
import net.i2p.util.ConcurrentHashSet;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
import net.i2p.util.RandomSource;
......@@ -66,7 +66,7 @@ public class ClientConnectionRunner {
/** currently allocated leaseSet, or null if none is allocated */
private LeaseSet _currentLeaseSet;
/** set of messageIds created but not yet ACCEPTED */
private Set _acceptedPending;
private Set<MessageId> _acceptedPending;
/** thingy that does stuff */
private I2CPMessageReader _reader;
/**
......@@ -91,7 +91,7 @@ public class ClientConnectionRunner {
_config = null;
_messages = new ConcurrentHashMap();
_alreadyProcessed = new ArrayList();
_acceptedPending = new HashSet();
_acceptedPending = new ConcurrentHashSet();
_dead = false;
}
......@@ -242,19 +242,8 @@ public class ClientConnectionRunner {
long expiration = 0;
if (message instanceof SendMessageExpiresMessage)
expiration = ((SendMessageExpiresMessage) message).getExpiration().getTime();
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_acceptedPending) {
inLock = _context.clock().now();
_acceptedPending.add(id);
}
long afterLock = _context.clock().now();
if (_log.shouldLog(Log.DEBUG)) {
_log.warn("distributeMessage.locking took: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
_acceptedPending.add(id);
if (_log.shouldLog(Log.DEBUG))
_log.debug("** Receiving message [" + id.getMessageId() + "] with payload of size ["
+ payload.getSize() + "]" + " for session [" + _sessionId.getSessionId()
......@@ -291,18 +280,7 @@ public class ClientConnectionRunner {
status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED);
try {
doSend(status);
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_acceptedPending) {
inLock = _context.clock().now();
_acceptedPending.remove(id);
}
long afterLock = _context.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("ackSendMessage.locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
_acceptedPending.remove(id);
} catch (I2CPMessageException ime) {
_log.error("Error writing out the message status message: " + ime);
}
......@@ -504,28 +482,7 @@ public class ClientConnectionRunner {
*/
private boolean alreadyAccepted(MessageId id) {
if (_dead) return false;
boolean isPending = false;
int pending = 0;
String buf = null;
long beforeLock = _context.clock().now();
long inLock = 0;
synchronized (_acceptedPending) {
inLock = _context.clock().now();
if (_acceptedPending.contains(id))
isPending = true;
pending = _acceptedPending.size();
buf = _acceptedPending.toString();
}
long afterLock = _context.clock().now();
if (afterLock - beforeLock > 50) {
_log.warn("alreadyAccepted.locking took too long: " + (afterLock-beforeLock)
+ " overall, synchronized took " + (inLock - beforeLock));
}
if (pending >= 1) {
_log.warn("Pending acks: " + pending + ": " + buf);
}
return !isPending;
return !_acceptedPending.contains(id);
}
/**
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment