From 2e8fd23f2b732d16857084816ed1b6fe6ac5cbbb Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Thu, 18 Mar 2010 12:32:01 +0000 Subject: [PATCH] concurrent --- .../net/i2p/client/streaming/Connection.java | 45 ++++--- .../client/streaming/ConnectionManager.java | 122 +++++++----------- .../i2p/client/streaming/MessageHandler.java | 23 ++-- 3 files changed, 79 insertions(+), 111 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index d9dd9b4886..f825f25a72 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -6,6 +6,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; @@ -29,7 +30,7 @@ public class Connection { private long _sendStreamId; private long _receiveStreamId; private long _lastSendTime; - private long _lastSendId; + private AtomicLong _lastSendId; private boolean _resetReceived; private boolean _resetSent; private long _resetSentOn; @@ -49,7 +50,7 @@ public class Connection { private boolean _isInbound; private boolean _updatedShareOpts; /** Packet ID (Long) to PacketLocal for sent but unacked packets */ - private final Map _outboundPackets; + private final Map<Long, PacketLocal> _outboundPackets; private PacketQueue _outboundQueue; private ConnectionPacketHandler _handler; private ConnectionOptions _options; @@ -102,7 +103,7 @@ public class Connection { _options = (opts != null ? opts : new ConnectionOptions()); _outputStream.setWriteTimeout((int)_options.getWriteTimeout()); _inputStream.setReadTimeout((int)_options.getReadTimeout()); - _lastSendId = -1; + _lastSendId = new AtomicLong(-1); _nextSendTime = -1; _ackedPackets = 0; _createdOn = _context.clock().now(); @@ -137,9 +138,7 @@ public class Connection { } public long getNextOutboundPacketNum() { - synchronized (this) { - return ++_lastSendId; - } + return _lastSendId.incrementAndGet(); } void closeReceived() { @@ -175,7 +174,7 @@ public class Connection { return false; started = true; if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) || - (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) { + (_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) { if (timeoutMs > 0) { if (timeLeft <= 0) { if (_log.shouldLog(Log.INFO)) @@ -211,10 +210,10 @@ public class Connection { void ackImmediately() { PacketLocal packet = null; synchronized (_outboundPackets) { - if (_outboundPackets.size() > 0) { + if (!_outboundPackets.isEmpty()) { // ordered, so pick the lowest to retransmit - Iterator iter = _outboundPackets.values().iterator(); - packet = (PacketLocal)iter.next(); + Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); + packet = iter.next(); //iter.remove(); } } @@ -403,10 +402,10 @@ public class Connection { } } - List acked = null; + List<PacketLocal> acked = null; synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { - Long id = (Long)iter.next(); + for (Iterator<Long> iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { + Long id = iter.next(); if (id.longValue() <= ackThrough) { boolean nacked = false; if (nacks != null) { @@ -414,7 +413,7 @@ public class Connection { for (int i = 0; i < nacks.length; i++) { if (nacks[i] == id.longValue()) { nacked = true; - PacketLocal nackedPacket = (PacketLocal)_outboundPackets.get(id); + PacketLocal nackedPacket = _outboundPackets.get(id); nackedPacket.incrementNACKs(); break; // NACKed } @@ -423,7 +422,7 @@ public class Connection { if (!nacked) { // aka ACKed if (acked == null) acked = new ArrayList(1); - PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id); + PacketLocal ackedPacket = _outboundPackets.get(id); ackedPacket.ackReceived(); acked.add(ackedPacket); } @@ -433,7 +432,7 @@ public class Connection { } if (acked != null) { for (int i = 0; i < acked.size(); i++) { - PacketLocal p = (PacketLocal)acked.get(i); + PacketLocal p = acked.get(i); _outboundPackets.remove(new Long(p.getSequenceNum())); _ackedPackets++; if (p.getNumSends() > 1) { @@ -443,7 +442,7 @@ public class Connection { } } } - if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) { + if ( (_outboundPackets.isEmpty()) && (_activeResends != 0) ) { if (_log.shouldLog(Log.INFO)) _log.info("All outbound packets acked, clearing " + _activeResends); _activeResends = 0; @@ -570,8 +569,8 @@ public class Connection { private void killOutstandingPackets() { //boolean tagsCancelled = false; synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { - PacketLocal pl = (PacketLocal)iter.next(); + for (Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { + PacketLocal pl = iter.next(); //if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) ) // tagsCancelled = true; pl.cancelled(); @@ -652,11 +651,11 @@ public class Connection { /** What was the last packet Id sent to the peer? * @return The last sent packet ID */ - public long getLastSendId() { return _lastSendId; } + public long getLastSendId() { return _lastSendId.get(); } /** Set the packet Id that was sent to a peer. * @param id The packet ID */ - public void setLastSendId(long id) { _lastSendId = id; } + public void setLastSendId(long id) { _lastSendId.set(id); } /** * Retrieve the current ConnectionOptions. @@ -783,7 +782,7 @@ public class Connection { if (_ackSinceCongestion) { _lastCongestionSeenAt = _options.getWindowSize(); _lastCongestionTime = _context.clock().now(); - _lastCongestionHighestUnacked = _lastSendId; + _lastCongestionHighestUnacked = _lastSendId.get(); _ackSinceCongestion = false; } } @@ -1022,7 +1021,7 @@ public class Connection { } if (getCloseReceivedOn() > 0) buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago"); - buf.append(" sent: ").append(1 + _lastSendId); + buf.append(" sent: ").append(1 + _lastSendId.get()); if (_inputStream != null) buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index af44c41f24..685b87b061 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -1,10 +1,10 @@ package net.i2p.client.streaming; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.I2PAppContext; import net.i2p.I2PException; @@ -32,14 +32,13 @@ public class ConnectionManager { private ConnectionPacketHandler _conPacketHandler; private TCBShare _tcbShare; /** Inbound stream ID (Long) to Connection map */ - private Map _connectionByInboundId; + private ConcurrentHashMap<Long, Connection> _connectionByInboundId; /** Ping ID (Long) to PingRequest */ - private final Map _pendingPings; + private final Map<Long, PingRequest> _pendingPings; private boolean _allowIncoming; private int _maxConcurrentStreams; private ConnectionOptions _defaultOptions; private volatile int _numWaiting; - private final Object _connectionLock; private long SoTimeout; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { @@ -48,9 +47,8 @@ public class ConnectionManager { _maxConcurrentStreams = maxConcurrent; _defaultOptions = defaultOptions; _log = _context.logManager().getLog(ConnectionManager.class); - _connectionByInboundId = new HashMap(32); - _pendingPings = new HashMap(4); - _connectionLock = new Object(); + _connectionByInboundId = new ConcurrentHashMap(32); + _pendingPings = new ConcurrentHashMap(4); _messageHandler = new MessageHandler(_context, this); _packetHandler = new PacketHandler(_context, this); _connectionHandler = new ConnectionHandler(_context, this); @@ -77,22 +75,17 @@ public class ConnectionManager { } Connection getConnectionByInboundId(long id) { - synchronized (_connectionLock) { - return (Connection)_connectionByInboundId.get(new Long(id)); - } + return _connectionByInboundId.get(Long.valueOf(id)); } /** * not guaranteed to be unique, but in case we receive more than one packet * on an inbound connection that we havent ack'ed yet... */ Connection getConnectionByOutboundId(long id) { - synchronized (_connectionLock) { - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); + for (Connection con : _connectionByInboundId.values()) { if (DataHelper.eq(con.getSendStreamId(), id)) return con; } - } return null; } @@ -135,27 +128,26 @@ public class ConnectionManager { boolean reject = false; int active = 0; int total = 0; - synchronized (_connectionLock) { - total = _connectionByInboundId.size(); - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - if ( ((Connection)iter.next()).getIsConnected() ) - active++; - } + + // just for the stat + //total = _connectionByInboundId.size(); + //for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + // if ( ((Connection)iter.next()).getIsConnected() ) + // active++; + //} if (locked_tooManyStreams()) { reject = true; } else { while (true) { - Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); + Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con); if (oldCon == null) { break; } else { - _connectionByInboundId.put(new Long(receiveId), oldCon); // receiveId already taken, try another receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; } } } - } _context.statManager().addRateData("stream.receiveActive", active, total); @@ -179,9 +171,7 @@ public class ConnectionManager { try { con.getPacketHandler().receivePacket(synPacket, con); } catch (I2PException ie) { - synchronized (_connectionLock) { - _connectionByInboundId.remove(new Long(receiveId)); - } + _connectionByInboundId.remove(Long.valueOf(receiveId)); return null; } @@ -215,8 +205,7 @@ public class ConnectionManager { _numWaiting--; return null; } - boolean reject = false; - synchronized (_connectionLock) { + if (locked_tooManyStreams()) { // allow a full buffer of pending/waiting streams if (_numWaiting > _maxConcurrentStreams) { @@ -227,27 +216,30 @@ public class ConnectionManager { _numWaiting--; return null; } - + // no remaining streams, lets wait a bit - try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} + // got rid of the lock, so just sleep (fixme?) + // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} + try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {} } else { con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con.setRemotePeer(peer); - while (_connectionByInboundId.containsKey(new Long(receiveId))) { + while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) { receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; } - _connectionByInboundId.put(new Long(receiveId), con); + _connectionByInboundId.put(Long.valueOf(receiveId), con); break; // stop looping as a psuedo-wait } - } + } // ok we're in... con.setReceiveStreamId(receiveId); con.eventOccurred(); - _log.debug("Connect() conDelay = " + opts.getConnectDelay()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Connect() conDelay = " + opts.getConnectDelay()); if (opts.getConnectDelay() <= 0) { con.waitForConnect(); } @@ -258,12 +250,15 @@ public class ConnectionManager { return con; } + /** + * Doesn't need to be locked any more + * @return too many + */ private boolean locked_tooManyStreams() { if (_maxConcurrentStreams <= 0) return false; if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; int active = 0; - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); + for (Connection con : _connectionByInboundId.values()) { if (con.getIsConnected()) active++; } @@ -293,13 +288,10 @@ public class ConnectionManager { * */ public void disconnectAllHard() { - synchronized (_connectionLock) { - for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - con.disconnect(false, false); - } - _connectionByInboundId.clear(); - _connectionLock.notifyAll(); + for (Iterator<Connection> iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = iter.next(); + con.disconnect(false, false); + iter.remove(); } _tcbShare.stop(); } @@ -310,17 +302,15 @@ public class ConnectionManager { * @param con Connection to drop. */ public void removeConnection(Connection con) { - boolean removed = false; - synchronized (_connectionLock) { - Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); - removed = (o == con); + + Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId())); + boolean removed = (o == con); if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con); if (!removed && _log.shouldLog(Log.DEBUG)) _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); - _connectionLock.notifyAll(); - } + if (removed) { _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime()); MessageInputStream stream = con.getInputStream(); @@ -344,10 +334,8 @@ public class ConnectionManager { /** return a set of Connection objects * @return set of Connection objects */ - public Set listConnections() { - synchronized (_connectionLock) { + public Set<Connection> listConnections() { return new HashSet(_connectionByInboundId.values()); - } } /** blocking */ @@ -368,7 +356,7 @@ public class ConnectionManager { } public boolean ping(Destination peer, long timeoutMs, boolean blocking, PingNotifier notifier) { - Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); + Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); PacketLocal packet = new PacketLocal(_context, peer); packet.setSendStreamId(id.longValue()); packet.setFlag(Packet.FLAG_ECHO); @@ -381,9 +369,7 @@ public class ConnectionManager { PingRequest req = new PingRequest(peer, packet, notifier); - synchronized (_pendingPings) { - _pendingPings.put(id, req); - } + _pendingPings.put(id, req); _outboundQueue.enqueue(packet); packet.releasePayload(); @@ -393,10 +379,7 @@ public class ConnectionManager { if (!req.pongReceived()) try { req.wait(timeoutMs); } catch (InterruptedException ie) {} } - - synchronized (_pendingPings) { - _pendingPings.remove(id); - } + _pendingPings.remove(id); } else { SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); } @@ -418,13 +401,8 @@ public class ConnectionManager { } public void timeReached() { - boolean removed = false; - synchronized (_pendingPings) { - Object o = _pendingPings.remove(_id); - if (o != null) - removed = true; - } - if (removed) { + PingRequest pr = _pendingPings.remove(_id); + if (pr != null) { if (_notifier != null) _notifier.pingComplete(false); if (_log.shouldLog(Log.INFO)) @@ -433,7 +411,7 @@ public class ConnectionManager { } } - private class PingRequest { + private static class PingRequest { private boolean _ponged; private Destination _peer; private PacketLocal _packet; @@ -445,7 +423,8 @@ public class ConnectionManager { _notifier = notifier; } public void pong() { - _log.debug("Ping successful"); + // static, no log + //_log.debug("Ping successful"); //_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); synchronized (ConnectionManager.PingRequest.this) { _ponged = true; @@ -458,10 +437,7 @@ public class ConnectionManager { } void receivePong(long pingId) { - PingRequest req = null; - synchronized (_pendingPings) { - req = (PingRequest)_pendingPings.remove(new Long(pingId)); - } + PingRequest req = _pendingPings.remove(Long.valueOf(pingId)); if (req != null) req.pong(); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 98165cf7de..cfd208c014 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -1,13 +1,14 @@ package net.i2p.client.streaming; -import java.util.ArrayList; -import java.util.List; +import java.util.Iterator; +import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.client.I2PSessionListener; import net.i2p.util.Log; +import net.i2p.util.ConcurrentHashSet; /** * Receive raw information from the I2PSession and turn it into @@ -18,12 +19,12 @@ public class MessageHandler implements I2PSessionListener { private ConnectionManager _manager; private I2PAppContext _context; private Log _log; - private final List _listeners; + private final Set<I2PSocketManager.DisconnectListener> _listeners; public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { _manager = mgr; _context = ctx; - _listeners = new ArrayList(1); + _listeners = new ConcurrentHashSet(1); _log = ctx.logManager().getLog(MessageHandler.class); _context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } @@ -77,14 +78,10 @@ public class MessageHandler implements I2PSessionListener { _log.warn("I2PSession disconnected"); _manager.disconnectAllHard(); - List listeners = null; - synchronized (_listeners) { - listeners = new ArrayList(_listeners); - _listeners.clear(); - } - for (int i = 0; i < listeners.size(); i++) { - I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i); + for (Iterator<I2PSocketManager.DisconnectListener> iter = _listeners.iterator(); iter.hasNext(); ) { + I2PSocketManager.DisconnectListener lsnr = iter.next(); lsnr.sessionDisconnected(); + iter.remove(); } } @@ -104,13 +101,9 @@ public class MessageHandler implements I2PSessionListener { } public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - synchronized (_listeners) { _listeners.add(lsnr); - } } public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { - synchronized (_listeners) { _listeners.remove(lsnr); - } } } -- GitLab