From dd287bae18cdd6645d6a7a0c3213b9841b822ae2 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Sat, 28 Jul 2018 20:19:02 +0100 Subject: [PATCH] add disconnection event, publish it on closing connections, handle it in connection manager --- core/src/main/groovy/com/muwire/core/Core.groovy | 2 ++ .../com/muwire/core/connection/Connection.groovy | 3 +-- .../core/connection/ConnectionManager.groovy | 2 ++ .../core/connection/DisconnectionEvent.groovy | 15 +++++++++++++++ .../core/connection/LeafConnectionManager.groovy | 7 +++++++ .../connection/UltrapeerConnectionManager.groovy | 9 +++++++++ 6 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/connection/DisconnectionEvent.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 3658067b..7e278ba4 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -4,6 +4,7 @@ import com.muwire.core.connection.ConnectionAcceptor import com.muwire.core.connection.ConnectionEstablisher import com.muwire.core.connection.ConnectionEvent import com.muwire.core.connection.ConnectionManager +import com.muwire.core.connection.DisconnectionEvent import com.muwire.core.connection.I2PAcceptor import com.muwire.core.connection.I2PConnector import com.muwire.core.connection.LeafConnectionManager @@ -88,6 +89,7 @@ class Core { new LeafConnectionManager(eventBus,3, hostCache) : new UltrapeerConnectionManager(eventBus, 512, 512, hostCache) eventBus.register(TrustEvent.class, connectionManager) eventBus.register(ConnectionEvent.class, connectionManager) + eventBus.register(DisconnectionEvent.class, connectionManager) connectionManager.start() log.info("initializing cache client") diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 6eaa649f..e4c00c9a 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -65,8 +65,7 @@ abstract class Connection implements Closeable { } reader.interrupt() writer.interrupt() - reader.join() - writer.join() + eventBus.publish(new DisconnectionEvent(destination: endpoint.destination)) } protected void readLoop() { diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index 1c031085..efa96c15 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -53,6 +53,8 @@ abstract class ConnectionManager { abstract void onConnectionEvent(ConnectionEvent e) + abstract void onDisconnectionEvent(DisconnectionEvent e) + protected void sendPings() { final long now = System.currentTimeMillis() getConnections().each { diff --git a/core/src/main/groovy/com/muwire/core/connection/DisconnectionEvent.groovy b/core/src/main/groovy/com/muwire/core/connection/DisconnectionEvent.groovy new file mode 100644 index 00000000..ef8fe59a --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/connection/DisconnectionEvent.groovy @@ -0,0 +1,15 @@ +package com.muwire.core.connection + +import com.muwire.core.Event + +import net.i2p.data.Destination + +class DisconnectionEvent extends Event { + + Destination destination + + @Override + public String toString() { + "DisconnectionEvent ${super.toString()} destination:${destianation.toBase32()}" + } +} diff --git a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy index 4be9de32..19f9a79f 100644 --- a/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/LeafConnectionManager.groovy @@ -55,4 +55,11 @@ class LeafConnectionManager extends ConnectionManager { c.start() } + @Override + public void onDisconnectionEvent(DisconnectionEvent e) { + def removed = connections.remove(e.destination) + if (removed == null) + log.severe("removed destination not present in connection manager ${e.destination.toBase32()}") + } + } diff --git a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy index 2a4f5eea..b19ca890 100644 --- a/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/UltrapeerConnectionManager.groovy @@ -72,4 +72,13 @@ class UltrapeerConnectionManager extends ConnectionManager { map.put(e.endpoint.destination, c) c.start() } + + @Override + public void onDisconnectionEvent(DisconnectionEvent e) { + def removed = peerConnections.remove(e.destination) + if (removed == null) + removed = leafConnections.remove(e.destination) + if (removed == null) + log.severe("Removed connection not present in either leaf or peer map ${e.destination.toBase32()}") + } }