diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index b5f4e8ab3bf30af02355969fd08e7e35860eb48a..00da88aa222b1b9b3ebd07c283843e42c3b79b25 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -14,6 +14,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; +import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -81,7 +82,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** class that generates new messages */ protected I2CPMessageProducer _producer; /** map of Long --> MessagePayloadMessage */ - private Map _availableMessages; + private Map<Long, MessagePayloadMessage> _availableMessages; protected I2PClientMessageHandlerMap _handlerMap; @@ -139,7 +140,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _closing = false; _producer = new I2CPMessageProducer(context); _availabilityNotifier = new AvailabilityNotifier(); - _availableMessages = new HashMap(); + _availableMessages = new ConcurrentHashMap(); try { readDestination(destKeyStream); } catch (DataFormatException dfe) { @@ -152,7 +153,6 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa loadConfig(options); _sessionId = null; _leaseSet = null; - _context.statManager().createRateStat("client.availableMessages", "How many messages are available for the current client", "ClientMessages", new long[] { 60*1000, 10*60*1000 }); } /** @@ -309,15 +309,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * */ public byte[] receiveMessage(int msgId) throws I2PSessionException { - int remaining = 0; - MessagePayloadMessage msg = null; - synchronized (_availableMessages) { - msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId)); - remaining = _availableMessages.size(); - } - _context.statManager().addRateData("client.availableMessages", remaining, 0); + MessagePayloadMessage msg = _availableMessages.remove(new Long(msgId)); if (msg == null) { - _log.error("Receive message " + msgId + " had no matches, remaining=" + remaining); + _log.error("Receive message " + msgId + " had no matches"); return null; } updateActivity(); @@ -357,12 +351,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ public void addNewMessage(MessagePayloadMessage msg) { Long mid = new Long(msg.getMessageId()); - int avail = 0; - synchronized (_availableMessages) { - _availableMessages.put(mid, msg); - avail = _availableMessages.size(); - } - _context.statManager().addRateData("client.availableMessages", avail, 0); + _availableMessages.put(mid, msg); long id = msg.getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { @@ -382,16 +371,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public VerifyUsage(Long id) { _msgId = id; } public void timeReached() { - MessagePayloadMessage removed = null; - int remaining = 0; - synchronized (_availableMessages) { - removed = (MessagePayloadMessage)_availableMessages.remove(_msgId); - remaining = _availableMessages.size(); - } - if (removed != null) { - _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed + ": remaining: " + remaining); - _context.statManager().addRateData("client.availableMessages", remaining, 0); - } + MessagePayloadMessage removed = _availableMessages.remove(_msgId); + if (removed != null && !isClosed()) + _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed); } }