diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java index 55c0082eba1c630319dcfe67eda314b9bf803a46..872b744926c061d1101ae28ba26f1c61736b6c23 100644 --- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java +++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java @@ -25,6 +25,7 @@ import net.i2p.router.MessageSelector; import net.i2p.router.OutNetMessage; import net.i2p.router.ReplyJob; import net.i2p.router.RouterContext; +import net.i2p.util.ConcurrentHashSet; import net.i2p.util.Log; import net.i2p.util.SimpleTimer; @@ -37,7 +38,14 @@ public class OutboundMessageRegistry { private final List<MessageSelector> _selectors; /** map of active MessageSelector to either an OutNetMessage or a List of OutNetMessages causing it (for quick removal) */ private final Map<MessageSelector, Object> _selectorToMessage; - /** set of active OutNetMessage (for quick removal and selector fetching) */ + /** + * set of active OutNetMessage (for quick removal and selector fetching) + * !! Really? seems only for dup detection in registerPending(). + * Changed to concurrent, but it could perhaps be removed completely, + * It would seem difficult to add a dup since every OutNetMessage is different, + * and it's generally instantiated just before ctx.outNetMessagePool().add(). + * But in TransportImpl.afterSend() it does requeue a previous ONM if allowRequeue=true. + */ private final Set<OutNetMessage> _activeMessages; private final CleanupTask _cleanupTask; private final RouterContext _context; @@ -47,7 +55,7 @@ public class OutboundMessageRegistry { _log = _context.logManager().getLog(OutboundMessageRegistry.class); _selectors = new ArrayList(64); _selectorToMessage = new HashMap(64); - _activeMessages = new HashSet(64); + _activeMessages = new ConcurrentHashSet(64); _cleanupTask = new CleanupTask(); } @@ -63,9 +71,7 @@ public class OutboundMessageRegistry { } // Calling the fail job for every active message would // be way too much at shutdown/restart, right? - synchronized (_activeMessages) { - _activeMessages.clear(); - } + _activeMessages.clear(); } /** @@ -84,6 +90,10 @@ public class OutboundMessageRegistry { * * This is called only by InNetMessagePool. * + * TODO this calls isMatch() in the selectors from inside the lock, which + * can lead to deadlocks if the selector does too much in isMatch(). + * Remove the lock if possible. + * * @param message Payload received that may be a reply to something we sent * @return non-null List of OutNetMessage describing messages that were waiting for * the payload @@ -97,7 +107,7 @@ public class OutboundMessageRegistry { //for (Iterator<MessageSelector> iter = _selectors.iterator(); iter.hasNext(); ) { // MessageSelector sel = iter.next(); for (int i = 0; i < _selectors.size(); i++) { - MessageSelector sel = (MessageSelector)_selectors.get(i); + MessageSelector sel = _selectors.get(i); boolean isMatch = sel.isMatch(message); if (isMatch) { if (matchedSelectors == null) matchedSelectors = new ArrayList(1); @@ -141,13 +151,9 @@ public class OutboundMessageRegistry { } if (removed) { if (msg != null) { - synchronized (_activeMessages) { - _activeMessages.remove(msg); - } + _activeMessages.remove(msg); } else if (msgs != null) { - synchronized (_activeMessages) { - _activeMessages.removeAll(msgs); - } + _activeMessages.removeAll(msgs); } } } @@ -195,11 +201,9 @@ public class OutboundMessageRegistry { MessageSelector sel = msg.getReplySelector(); if (sel == null) throw new IllegalArgumentException("No reply selector? wtf"); - boolean alreadyPending = false; - synchronized (_activeMessages) { - if (!_activeMessages.add(msg)) - return; // dont add dups - } + if (!_activeMessages.add(msg)) + return; // dont add dups + synchronized (_selectorToMessage) { Object oldMsg = _selectorToMessage.put(sel, msg); if (oldMsg != null) { @@ -246,7 +250,7 @@ public class OutboundMessageRegistry { } if (!stillActive) synchronized (_selectors) { _selectors.remove(sel); } - synchronized (_activeMessages) { _activeMessages.remove(msg); } + _activeMessages.remove(msg); } /** @deprecated unused */ @@ -293,16 +297,12 @@ public class OutboundMessageRegistry { } } if (msg != null) { - synchronized (_activeMessages) { - _activeMessages.remove(msg); - } + _activeMessages.remove(msg); Job fail = msg.getOnFailedReplyJob(); if (fail != null) _context.jobQueue().addJob(fail); } else if (msgs != null) { - synchronized (_activeMessages) { - _activeMessages.removeAll(msgs); - } + _activeMessages.removeAll(msgs); for (OutNetMessage m : msgs) { Job fail = m.getOnFailedReplyJob(); if (fail != null)