diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java index fde13148cf853e10b8c810f47dd5c92796cf96b2..1c3e5cf77321c53cbd69e6fab2da9fc2aa1a50cc 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPTransport.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; import net.i2p.data.DataHelper; import net.i2p.data.Hash; @@ -140,7 +141,7 @@ public class NTCPTransport extends TransportImpl { _context.statManager().createRateStat("ntcp.writeError", "", "ntcp", RATES); _establishing = new ConcurrentHashSet(16); _conLock = new Object(); - _conByIdent = new HashMap(64); + _conByIdent = new ConcurrentHashMap(64); _finisher = new NTCPSendFinisher(ctx, this); @@ -160,7 +161,7 @@ public class NTCPTransport extends TransportImpl { _context.statManager().addRateData("ntcp.inboundEstablished", 1); markReachable(con.getRemotePeer().calculateHash(), true); //_context.shitlist().unshitlistRouter(con.getRemotePeer().calculateHash()); - NTCPConnection old = null; + NTCPConnection old; synchronized (_conLock) { old = _conByIdent.put(con.getRemotePeer().calculateHash(), con); } @@ -263,6 +264,7 @@ public class NTCPTransport extends TransportImpl { public void afterSend(OutNetMessage msg, boolean sendSuccessful, boolean allowRequeue, long msToSend) { super.afterSend(msg, sendSuccessful, allowRequeue, msToSend); } + public TransportBid bid(RouterInfo toAddress, long dataSize) { if (!isAlive()) return null; @@ -354,26 +356,23 @@ public class NTCPTransport extends TransportImpl { @Override public boolean isEstablished(Hash dest) { - synchronized (_conLock) { NTCPConnection con = _conByIdent.get(dest); return (con != null) && con.isEstablished() && !con.isClosed(); - } } @Override public boolean isBacklogged(Hash dest) { - synchronized (_conLock) { NTCPConnection con = _conByIdent.get(dest); return (con != null) && con.isEstablished() && con.tooBacklogged(); - } } void removeCon(NTCPConnection con) { NTCPConnection removed = null; - synchronized (_conLock) { - RouterIdentity ident = con.getRemotePeer(); - if (ident != null) + RouterIdentity ident = con.getRemotePeer(); + if (ident != null) { + synchronized (_conLock) { removed = _conByIdent.remove(ident.calculateHash()); + } } if ( (removed != null) && (removed != con) ) {// multiple cons, close 'em both if (_log.shouldLog(Log.WARN)) @@ -388,19 +387,17 @@ public class NTCPTransport extends TransportImpl { * */ @Override - public int countActivePeers() { synchronized (_conLock) { return _conByIdent.size(); } } + public int countActivePeers() { return _conByIdent.size(); } + /** * How many peers are we actively sending messages to (this minute) */ @Override public int countActiveSendPeers() { int active = 0; - synchronized (_conLock) { - for (Iterator iter = _conByIdent.values().iterator(); iter.hasNext(); ) { - NTCPConnection con = (NTCPConnection)iter.next(); + for (NTCPConnection con : _conByIdent.values()) { if ( (con.getTimeSinceSend() <= 60*1000) || (con.getTimeSinceReceive() <= 60*1000) ) active++; - } } return active; } @@ -416,16 +413,9 @@ public class NTCPTransport extends TransportImpl { */ @Override public Vector<Long> getClockSkews() { - - Vector<NTCPConnection> peers = new Vector(); Vector<Long> skews = new Vector(); - synchronized (_conLock) { - peers.addAll(_conByIdent.values()); - } - - for (Iterator<NTCPConnection> iter = peers.iterator(); iter.hasNext(); ) { - NTCPConnection con = iter.next(); + for (NTCPConnection con : _conByIdent.values()) { if (con.isEstablished()) skews.addElement(Long.valueOf(con.getClockSkew())); } @@ -603,6 +593,7 @@ public class NTCPTransport extends TransportImpl { void establishing(NTCPConnection con) { _establishing.add(con); } + /** * called in the EventPumper no more than once a second or so, closing * any unconnected/unestablished connections @@ -694,12 +685,10 @@ public class NTCPTransport extends TransportImpl { @Override public short getReachabilityStatus() { if (isAlive() && _myAddress != null) { - synchronized (_conLock) { for (NTCPConnection con : _conByIdent.values()) { if (con.isInbound()) return CommSystemFacade.STATUS_OK; } - } } return CommSystemFacade.STATUS_UNKNOWN; } @@ -727,17 +716,17 @@ public class NTCPTransport extends TransportImpl { // will this work? replaceAddress(null); } + public static final String STYLE = "NTCP"; public void renderStatusHTML(java.io.Writer out, int sortFlags) throws IOException {} + @Override public void renderStatusHTML(java.io.Writer out, String urlBase, int sortFlags) throws IOException { TreeSet peers = new TreeSet(getComparator(sortFlags)); - synchronized (_conLock) { - peers.addAll(_conByIdent.values()); - } - long offsetTotal = 0; + peers.addAll(_conByIdent.values()); + long offsetTotal = 0; float bpsSend = 0; float bpsRecv = 0; long totalUptime = 0; @@ -838,6 +827,7 @@ public class NTCPTransport extends TransportImpl { } private static final NumberFormat _rateFmt = new DecimalFormat("#,##0.00"); + private static String formatRate(float rate) { synchronized (_rateFmt) { return _rateFmt.format(rate); } } @@ -858,14 +848,12 @@ public class NTCPTransport extends TransportImpl { public static final AlphaComparator instance() { return _instance; } } - private static class PeerComparator implements Comparator { - public int compare(Object lhs, Object rhs) { - if ( (lhs == null) || (rhs == null) || !(lhs instanceof NTCPConnection) || !(rhs instanceof NTCPConnection)) - throw new IllegalArgumentException("rhs = " + rhs + " lhs = " + lhs); - return compare((NTCPConnection)lhs, (NTCPConnection)rhs); - } - protected int compare(NTCPConnection l, NTCPConnection r) { + private static class PeerComparator implements Comparator<NTCPConnection> { + public int compare(NTCPConnection l, NTCPConnection r) { + if (l == null || r == null) + throw new IllegalArgumentException(); // base64 retains binary ordering + // UM, no it doesn't, but close enough return l.getRemotePeer().calculateHash().toBase64().compareTo(r.getRemotePeer().calculateHash().toBase64()); } }