diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index d9dd9b48869c7fce1e8ebcd41ea15932aefbf9ce..f825f25a726a661bae8040b6cd34cd070130b550 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -6,6 +6,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import net.i2p.I2PAppContext;
 import net.i2p.client.I2PSession;
@@ -29,7 +30,7 @@ public class Connection {
     private long _sendStreamId;
     private long _receiveStreamId;
     private long _lastSendTime;
-    private long _lastSendId;
+    private AtomicLong _lastSendId;
     private boolean _resetReceived;
     private boolean _resetSent;
     private long _resetSentOn;
@@ -49,7 +50,7 @@ public class Connection {
     private boolean _isInbound;
     private boolean _updatedShareOpts;
     /** Packet ID (Long) to PacketLocal for sent but unacked packets */
-    private final Map _outboundPackets;
+    private final Map<Long, PacketLocal> _outboundPackets;
     private PacketQueue _outboundQueue;
     private ConnectionPacketHandler _handler;
     private ConnectionOptions _options;
@@ -102,7 +103,7 @@ public class Connection {
         _options = (opts != null ? opts : new ConnectionOptions());
         _outputStream.setWriteTimeout((int)_options.getWriteTimeout());
         _inputStream.setReadTimeout((int)_options.getReadTimeout());
-        _lastSendId = -1;
+        _lastSendId = new AtomicLong(-1);
         _nextSendTime = -1;
         _ackedPackets = 0;
         _createdOn = _context.clock().now();
@@ -137,9 +138,7 @@ public class Connection {
     }
     
     public long getNextOutboundPacketNum() { 
-        synchronized (this) {
-            return ++_lastSendId;
-        }
+        return _lastSendId.incrementAndGet();
     }
     
     void closeReceived() {
@@ -175,7 +174,7 @@ public class Connection {
                        return false;
                 started = true;
                 if ( (_outboundPackets.size() >= _options.getWindowSize()) || (_activeResends > 0) ||
-                     (_lastSendId - _highestAckedThrough > _options.getWindowSize()) ) {
+                     (_lastSendId.get() - _highestAckedThrough > _options.getWindowSize()) ) {
                     if (timeoutMs > 0) {
                         if (timeLeft <= 0) {
                             if (_log.shouldLog(Log.INFO))
@@ -211,10 +210,10 @@ public class Connection {
     void ackImmediately() {
         PacketLocal packet = null;
         synchronized (_outboundPackets) {
-            if (_outboundPackets.size() > 0) {
+            if (!_outboundPackets.isEmpty()) {
                 // ordered, so pick the lowest to retransmit
-                Iterator iter = _outboundPackets.values().iterator();
-                packet = (PacketLocal)iter.next();
+                Iterator<PacketLocal> iter = _outboundPackets.values().iterator();
+                packet = iter.next();
                 //iter.remove();
             }
         }
@@ -403,10 +402,10 @@ public class Connection {
             }
         }
         
-        List acked = null;
+        List<PacketLocal> acked = null;
         synchronized (_outboundPackets) {
-            for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
-                Long id = (Long)iter.next();
+            for (Iterator<Long> iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) {
+                Long id = iter.next();
                 if (id.longValue() <= ackThrough) {
                     boolean nacked = false;
                     if (nacks != null) {
@@ -414,7 +413,7 @@ public class Connection {
                         for (int i = 0; i < nacks.length; i++) {
                             if (nacks[i] == id.longValue()) {
                                 nacked = true;
-                                PacketLocal nackedPacket = (PacketLocal)_outboundPackets.get(id);
+                                PacketLocal nackedPacket = _outboundPackets.get(id);
                                 nackedPacket.incrementNACKs();
                                 break; // NACKed
                             }
@@ -423,7 +422,7 @@ public class Connection {
                     if (!nacked) { // aka ACKed
                         if (acked == null) 
                             acked = new ArrayList(1);
-                        PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id);
+                        PacketLocal ackedPacket = _outboundPackets.get(id);
                         ackedPacket.ackReceived();
                         acked.add(ackedPacket);
                     }
@@ -433,7 +432,7 @@ public class Connection {
             }
             if (acked != null) {
                 for (int i = 0; i < acked.size(); i++) {
-                    PacketLocal p = (PacketLocal)acked.get(i);
+                    PacketLocal p = acked.get(i);
                     _outboundPackets.remove(new Long(p.getSequenceNum()));
                     _ackedPackets++;
                     if (p.getNumSends() > 1) {
@@ -443,7 +442,7 @@ public class Connection {
                     }
                 }
             }
-            if ( (_outboundPackets.size() <= 0) && (_activeResends != 0) ) {
+            if ( (_outboundPackets.isEmpty()) && (_activeResends != 0) ) {
                 if (_log.shouldLog(Log.INFO))
                     _log.info("All outbound packets acked, clearing " + _activeResends);
                 _activeResends = 0;
@@ -570,8 +569,8 @@ public class Connection {
     private void killOutstandingPackets() {
         //boolean tagsCancelled = false;
         synchronized (_outboundPackets) {
-            for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
-                PacketLocal pl = (PacketLocal)iter.next();
+            for (Iterator<PacketLocal> iter = _outboundPackets.values().iterator(); iter.hasNext(); ) {
+                PacketLocal pl = iter.next();
                 //if ( (pl.getTagsSent() != null) && (pl.getTagsSent().size() > 0) )
                 //    tagsCancelled = true;
                 pl.cancelled();
@@ -652,11 +651,11 @@ public class Connection {
     /** What was the last packet Id sent to the peer?
      * @return The last sent packet ID
      */
-    public long getLastSendId() { return _lastSendId; }
+    public long getLastSendId() { return _lastSendId.get(); }
     /** Set the packet Id that was sent to a peer.
      * @param id The packet ID
      */
-    public void setLastSendId(long id) { _lastSendId = id; }
+    public void setLastSendId(long id) { _lastSendId.set(id); }
     
     /**
      * Retrieve the current ConnectionOptions.
@@ -783,7 +782,7 @@ public class Connection {
         if (_ackSinceCongestion) {
             _lastCongestionSeenAt = _options.getWindowSize();
             _lastCongestionTime = _context.clock().now();
-            _lastCongestionHighestUnacked = _lastSendId;
+            _lastCongestionHighestUnacked = _lastSendId.get();
             _ackSinceCongestion = false;
         }
     }
@@ -1022,7 +1021,7 @@ public class Connection {
         }
         if (getCloseReceivedOn() > 0)
             buf.append(" close received ").append(DataHelper.formatDuration(_context.clock().now() - getCloseReceivedOn())).append(" ago");
-        buf.append(" sent: ").append(1 + _lastSendId);
+        buf.append(" sent: ").append(1 + _lastSendId.get());
         if (_inputStream != null)
             buf.append(" rcvd: ").append(1 + _inputStream.getHighestBlockId() - missing);
         
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index af44c41f24b1f0d02552f9241b5700732bb3a4f4..685b87b06132f9ceccae1e9cce787d570efe300c 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -1,10 +1,10 @@
 package net.i2p.client.streaming;
 
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import net.i2p.I2PAppContext;
 import net.i2p.I2PException;
@@ -32,14 +32,13 @@ public class ConnectionManager {
     private ConnectionPacketHandler _conPacketHandler;
     private TCBShare _tcbShare;
     /** Inbound stream ID (Long) to Connection map */
-    private Map _connectionByInboundId;
+    private ConcurrentHashMap<Long, Connection> _connectionByInboundId;
     /** Ping ID (Long) to PingRequest */
-    private final Map _pendingPings;
+    private final Map<Long, PingRequest> _pendingPings;
     private boolean _allowIncoming;
     private int _maxConcurrentStreams;
     private ConnectionOptions _defaultOptions;
     private volatile int _numWaiting;
-    private final Object _connectionLock;
     private long SoTimeout;
     
     public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) {
@@ -48,9 +47,8 @@ public class ConnectionManager {
         _maxConcurrentStreams = maxConcurrent;
         _defaultOptions = defaultOptions;
         _log = _context.logManager().getLog(ConnectionManager.class);
-        _connectionByInboundId = new HashMap(32);
-        _pendingPings = new HashMap(4);
-        _connectionLock = new Object();
+        _connectionByInboundId = new ConcurrentHashMap(32);
+        _pendingPings = new ConcurrentHashMap(4);
         _messageHandler = new MessageHandler(_context, this);
         _packetHandler = new PacketHandler(_context, this);
         _connectionHandler = new ConnectionHandler(_context, this);
@@ -77,22 +75,17 @@ public class ConnectionManager {
     }
     
     Connection getConnectionByInboundId(long id) {
-        synchronized (_connectionLock) {
-            return (Connection)_connectionByInboundId.get(new Long(id));
-        }
+        return _connectionByInboundId.get(Long.valueOf(id));
     }
     /** 
      * not guaranteed to be unique, but in case we receive more than one packet
      * on an inbound connection that we havent ack'ed yet...
      */
     Connection getConnectionByOutboundId(long id) {
-        synchronized (_connectionLock) {
-            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
-                Connection con = (Connection)iter.next();
+            for (Connection con : _connectionByInboundId.values()) {
                 if (DataHelper.eq(con.getSendStreamId(), id))
                     return con;
             }
-        }
         return null;
     }
     
@@ -135,27 +128,26 @@ public class ConnectionManager {
         boolean reject = false;
         int active = 0;
         int total = 0;
-        synchronized (_connectionLock) {
-            total = _connectionByInboundId.size();
-            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
-                if ( ((Connection)iter.next()).getIsConnected() )
-                    active++;
-            }
+
+            // just for the stat
+            //total = _connectionByInboundId.size();
+            //for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+            //    if ( ((Connection)iter.next()).getIsConnected() )
+            //        active++;
+            //}
             if (locked_tooManyStreams()) {
                 reject = true;
             } else { 
                 while (true) {
-                    Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
+                    Connection oldCon = _connectionByInboundId.putIfAbsent(Long.valueOf(receiveId), con);
                     if (oldCon == null) {
                         break;
                     } else { 
-                        _connectionByInboundId.put(new Long(receiveId), oldCon);
                         // receiveId already taken, try another
                         receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
                     }
                 }
             }
-        }
         
         _context.statManager().addRateData("stream.receiveActive", active, total);
         
@@ -179,9 +171,7 @@ public class ConnectionManager {
         try {
             con.getPacketHandler().receivePacket(synPacket, con);
         } catch (I2PException ie) {
-            synchronized (_connectionLock) {
-                _connectionByInboundId.remove(new Long(receiveId));
-            }
+            _connectionByInboundId.remove(Long.valueOf(receiveId));
             return null;
         }
         
@@ -215,8 +205,7 @@ public class ConnectionManager {
                 _numWaiting--;
                 return null;
             }
-            boolean reject = false;
-            synchronized (_connectionLock) {
+
                 if (locked_tooManyStreams()) {
                     // allow a full buffer of pending/waiting streams
                     if (_numWaiting > _maxConcurrentStreams) {
@@ -227,27 +216,30 @@ public class ConnectionManager {
                         _numWaiting--;
                         return null;
                     }
-                    
+
                     // no remaining streams, lets wait a bit
-                    try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
+                    // got rid of the lock, so just sleep (fixme?)
+                    // try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {}
+                    try { Thread.sleep(remaining/4); } catch (InterruptedException ie) {}
                 } else { 
                     con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
                     con.setRemotePeer(peer);
             
-                    while (_connectionByInboundId.containsKey(new Long(receiveId))) {
+                    while (_connectionByInboundId.containsKey(Long.valueOf(receiveId))) {
                         receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
                     }
-                    _connectionByInboundId.put(new Long(receiveId), con);
+                    _connectionByInboundId.put(Long.valueOf(receiveId), con);
                     break; // stop looping as a psuedo-wait
                 }
-            }
+
         }
 
         // ok we're in...
         con.setReceiveStreamId(receiveId);        
         con.eventOccurred();
         
-        _log.debug("Connect() conDelay = " + opts.getConnectDelay());
+        if (_log.shouldLog(Log.DEBUG))
+            _log.debug("Connect() conDelay = " + opts.getConnectDelay());
         if (opts.getConnectDelay() <= 0) {
             con.waitForConnect();
         }
@@ -258,12 +250,15 @@ public class ConnectionManager {
         return con;
     }
 
+    /**
+     *  Doesn't need to be locked any more
+     *  @return too many
+     */
     private boolean locked_tooManyStreams() {
         if (_maxConcurrentStreams <= 0) return false;
         if (_connectionByInboundId.size() < _maxConcurrentStreams) return false;
         int active = 0;
-        for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
-            Connection con = (Connection)iter.next();
+        for (Connection con : _connectionByInboundId.values()) {
             if (con.getIsConnected())
                 active++;
         }
@@ -293,13 +288,10 @@ public class ConnectionManager {
      *
      */
     public void disconnectAllHard() {
-        synchronized (_connectionLock) {
-            for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
-                Connection con = (Connection)iter.next();
-                con.disconnect(false, false);
-            }
-            _connectionByInboundId.clear();
-            _connectionLock.notifyAll();
+        for (Iterator<Connection> iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
+            Connection con = iter.next();
+            con.disconnect(false, false);
+            iter.remove();
         }
         _tcbShare.stop();
     }
@@ -310,17 +302,15 @@ public class ConnectionManager {
      * @param con Connection to drop.
      */
     public void removeConnection(Connection con) {
-        boolean removed = false;
-        synchronized (_connectionLock) {
-            Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
-            removed = (o == con);
+
+            Object o = _connectionByInboundId.remove(Long.valueOf(con.getReceiveStreamId()));
+            boolean removed = (o == con);
             if (_log.shouldLog(Log.DEBUG))
                 _log.debug("Connection removed? " + removed + " remaining: " 
                            + _connectionByInboundId.size() + ": " + con);
             if (!removed && _log.shouldLog(Log.DEBUG))
                 _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values());
-            _connectionLock.notifyAll();
-        }
+
         if (removed) {
             _context.statManager().addRateData("stream.con.lifetimeMessagesSent", 1+con.getLastSendId(), con.getLifetime());
             MessageInputStream stream = con.getInputStream();
@@ -344,10 +334,8 @@ public class ConnectionManager {
     /** return a set of Connection objects
      * @return set of Connection objects
      */
-    public Set listConnections() {
-        synchronized (_connectionLock) {
+    public Set<Connection> listConnections() {
             return new HashSet(_connectionByInboundId.values());
-        }
     }
 
     /** blocking */
@@ -368,7 +356,7 @@ public class ConnectionManager {
     }
 
     public boolean ping(Destination peer, long timeoutMs, boolean blocking, PingNotifier notifier) {
-        Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
+        Long id = Long.valueOf(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
         PacketLocal packet = new PacketLocal(_context, peer);
         packet.setSendStreamId(id.longValue());
         packet.setFlag(Packet.FLAG_ECHO);
@@ -381,9 +369,7 @@ public class ConnectionManager {
         
         PingRequest req = new PingRequest(peer, packet, notifier);
         
-        synchronized (_pendingPings) {
-            _pendingPings.put(id, req);
-        }
+        _pendingPings.put(id, req);
         
         _outboundQueue.enqueue(packet);
         packet.releasePayload();
@@ -393,10 +379,7 @@ public class ConnectionManager {
                 if (!req.pongReceived())
                     try { req.wait(timeoutMs); } catch (InterruptedException ie) {}
             }
-            
-            synchronized (_pendingPings) {
-                _pendingPings.remove(id);
-            }
+            _pendingPings.remove(id);
         } else {
             SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
         }
@@ -418,13 +401,8 @@ public class ConnectionManager {
         }
         
         public void timeReached() {
-            boolean removed = false;
-            synchronized (_pendingPings) {
-                Object o = _pendingPings.remove(_id);
-                if (o != null)
-                    removed = true;
-            }
-            if (removed) {
+            PingRequest pr = _pendingPings.remove(_id);
+            if (pr != null) {
                 if (_notifier != null)
                     _notifier.pingComplete(false);
                 if (_log.shouldLog(Log.INFO))
@@ -433,7 +411,7 @@ public class ConnectionManager {
         }
     }
     
-    private class PingRequest {
+    private static class PingRequest {
         private boolean _ponged;
         private Destination _peer;
         private PacketLocal _packet;
@@ -445,7 +423,8 @@ public class ConnectionManager {
             _notifier = notifier;
         }
         public void pong() { 
-            _log.debug("Ping successful");
+            // static, no log
+            //_log.debug("Ping successful");
             //_context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent());
             synchronized (ConnectionManager.PingRequest.this) {
                 _ponged = true; 
@@ -458,10 +437,7 @@ public class ConnectionManager {
     }
     
     void receivePong(long pingId) {
-        PingRequest req = null;
-        synchronized (_pendingPings) {
-            req = (PingRequest)_pendingPings.remove(new Long(pingId));
-        }
+        PingRequest req = _pendingPings.remove(Long.valueOf(pingId));
         if (req != null) 
             req.pong();
     }
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java
index 98165cf7de165a0b21bb518d12f191f532689caa..cfd208c014bf89f95a2e5ee6e567971a08fd8bcb 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java
@@ -1,13 +1,14 @@
 package net.i2p.client.streaming;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Iterator;
+import java.util.Set;
 
 import net.i2p.I2PAppContext;
 import net.i2p.client.I2PSession;
 import net.i2p.client.I2PSessionException;
 import net.i2p.client.I2PSessionListener;
 import net.i2p.util.Log;
+import net.i2p.util.ConcurrentHashSet;
 
 /**
  * Receive raw information from the I2PSession and turn it into
@@ -18,12 +19,12 @@ public class MessageHandler implements I2PSessionListener {
     private ConnectionManager _manager;
     private I2PAppContext _context;
     private Log _log;
-    private final List _listeners;
+    private final Set<I2PSocketManager.DisconnectListener> _listeners;
     
     public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) {
         _manager = mgr;
         _context = ctx;
-        _listeners = new ArrayList(1);
+        _listeners = new ConcurrentHashSet(1);
         _log = ctx.logManager().getLog(MessageHandler.class);
         _context.statManager().createRateStat("stream.packetReceiveFailure", "When do we fail to decrypt or otherwise receive a packet sent to us?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
     }
@@ -77,14 +78,10 @@ public class MessageHandler implements I2PSessionListener {
             _log.warn("I2PSession disconnected");
         _manager.disconnectAllHard();
         
-        List listeners = null;
-        synchronized (_listeners) {
-            listeners = new ArrayList(_listeners);
-            _listeners.clear();
-        }
-        for (int i = 0; i < listeners.size(); i++) {
-            I2PSocketManager.DisconnectListener lsnr = (I2PSocketManager.DisconnectListener)listeners.get(i);
+        for (Iterator<I2PSocketManager.DisconnectListener> iter = _listeners.iterator(); iter.hasNext(); ) {
+            I2PSocketManager.DisconnectListener lsnr = iter.next();
             lsnr.sessionDisconnected();
+            iter.remove();
         }
     }
 
@@ -104,13 +101,9 @@ public class MessageHandler implements I2PSessionListener {
     }
     
     public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { 
-        synchronized (_listeners) {
             _listeners.add(lsnr);
-        }
     }
     public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) {
-        synchronized (_listeners) {
             _listeners.remove(lsnr);
-        }
     }
 }