diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 6abcf6b7af0354af9289cc419dcaf9ff1ab615b8..4bf1be1a616c89f7bfa9a706f6c817c2c5984b9d 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -59,7 +59,7 @@ class Connection {
     private I2PSocketFull _socket;
     /** set to an error cause if the connection could not be established */
     private String _connectionError;
-    private long _disconnectScheduledOn;
+    private final AtomicLong _disconnectScheduledOn = new AtomicLong();
     private long _lastReceivedOn;
     private final ActivityTimer _activityTimer;
     /** window size when we last saw congestion */
@@ -138,7 +138,6 @@ class Connection {
         _lastCongestionTime = -1;
         _lastCongestionHighestUnacked = -1;
         _connected = true;
-        _disconnectScheduledOn = -1;
         _lastReceivedOn = -1;
         _activityTimer = new ActivityTimer();
         _ackSinceCongestion = true;
@@ -283,10 +282,7 @@ class Connection {
      *
      */
     void sendReset() {
-        if (_disconnectScheduledOn < 0) {
-            _disconnectScheduledOn = _context.clock().now();
-            _context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
-        }
+        scheduleDisconnectEvent();
         long now = _context.clock().now();
         if (_resetSentOn + 10*1000 > now) return; // don't send resets too fast
         if (_resetReceived) return;
@@ -538,10 +534,7 @@ class Connection {
     }
     
     void resetReceived() {
-        if (_disconnectScheduledOn < 0) {
-            _disconnectScheduledOn = _context.clock().now();
-            _context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
-        }
+        scheduleDisconnectEvent();
         _resetReceived = true;
         IOException ioe = new IOException("Reset received");
         _outputStream.streamErrorOccurred(ioe);
@@ -565,7 +558,9 @@ class Connection {
     public boolean getHardDisconnected() { return _hardDisconnected; }
     public boolean getResetSent() { return _resetSent; }
     public long getResetSentOn() { return _resetSentOn; }
-    public long getDisconnectScheduledOn() { return _disconnectScheduledOn; }
+
+    /** @return 0 if not scheduled */
+    public long getDisconnectScheduledOn() { return _disconnectScheduledOn.get(); }
 
     void disconnect(boolean cleanDisconnect) {
         disconnect(cleanDisconnect, true);
@@ -591,10 +586,7 @@ class Connection {
             killOutstandingPackets();
         }
         if (removeFromConMgr) {
-            if (_disconnectScheduledOn < 0) {
-                _disconnectScheduledOn = _context.clock().now();
-                _context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
-            }
+            scheduleDisconnectEvent();
         }
         _connected = false;
     }
@@ -611,9 +603,7 @@ class Connection {
         _activityTimer.cancel();
         _inputStream.streamErrorOccurred(new IOException("disconnected!"));
         
-        if (_disconnectScheduledOn < 0) {
-            _disconnectScheduledOn = _context.clock().now();
-            
+        if (_disconnectScheduledOn.compareAndSet(0, _context.clock().now())) {
             if (_log.shouldLog(Log.INFO))
                 _log.info("Connection disconnect complete from dead, drop the con "
                           + toString());
@@ -640,6 +630,19 @@ class Connection {
         //    _context.sessionKeyManager().failTags(_remotePeer.getPublicKey());
     }
     
+    /**
+     *  Schedule the end of the TIME-WAIT state,
+     *  but only if not previously scheduled.
+     *  @return true if a new event was scheduled; false if already scheduled
+     *  @since 0.9.9
+     */
+    private boolean scheduleDisconnectEvent() {
+        if (!_disconnectScheduledOn.compareAndSet(0, _context.clock().now()))
+            return false;
+        _context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
+        return true;
+    }
+
     private class DisconnectEvent implements SimpleTimer.TimedEvent {
         public DisconnectEvent() {
             if (_log.shouldLog(Log.INFO))
@@ -809,10 +812,7 @@ class Connection {
     public long getCloseSentOn() { return _closeSentOn; }
     public void setCloseSentOn(long when) { 
         _closeSentOn = when;
-        if (_disconnectScheduledOn < 0) {
-            _disconnectScheduledOn = _context.clock().now();
-            _context.simpleScheduler().addEvent(new DisconnectEvent(), DISCONNECT_TIMEOUT);
-        }
+        scheduleDisconnectEvent();
     }
     public long getCloseReceivedOn() { return _closeReceivedOn; }
     public void setCloseReceivedOn(long when) { _closeReceivedOn = when; }
@@ -1005,7 +1005,7 @@ class Connection {
                     _outputStream.streamErrorOccurred(ioe);
                     // Clean disconnect if we have already scheduled one
                     // (generally because we already sent a close)
-                    disconnect(_disconnectScheduledOn >= 0);
+                    disconnect(_disconnectScheduledOn.get() > 0);
                     break;
             }
         }