From 8afd387ca6168a90c999ece65182d3858994b6ea Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Mon, 11 Nov 2019 13:21:16 +0000 Subject: [PATCH] hook up chat components with core --- .../main/groovy/com/muwire/core/Core.groovy | 19 +++++- .../com/muwire/core/chat/ChatManager.groovy | 63 +++++++++++++++++++ .../core/chat/UIConnectChatEvent.groovy | 8 +++ .../core/chat/UIDisconnectChatEvent.groovy | 8 +++ .../core/connection/ConnectionAcceptor.groovy | 18 +++++- 5 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/chat/ChatManager.groovy create mode 100644 core/src/main/groovy/com/muwire/core/chat/UIConnectChatEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/chat/UIDisconnectChatEvent.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index a16040da..8e5e5999 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -3,6 +3,11 @@ package com.muwire.core import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicBoolean +import com.muwire.core.chat.ChatManager +import com.muwire.core.chat.ChatMessageEvent +import com.muwire.core.chat.ChatServer +import com.muwire.core.chat.UIConnectChatEvent +import com.muwire.core.chat.UIDisconnectChatEvent import com.muwire.core.connection.ConnectionAcceptor import com.muwire.core.connection.ConnectionEstablisher import com.muwire.core.connection.ConnectionEvent @@ -302,11 +307,23 @@ public class Core { log.info("initializing connection establisher") connectionEstablisher = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) + log.info("initializing chat server") + ChatServer chatServer = new ChatServer(eventBus, props, trustService, me) + eventBus.register(ChatMessageEvent.class, chatServer) + + log.info("initializing chat manager") + ChatManager chatManager = new ChatManager(eventBus, me, i2pConnector, trustService, props) + eventBus.with { + register(UIConnectChatEvent.class, chatManager) + register(UIDisconnectChatEvent.class, chatManager) + register(ChatMessageEvent.class, chatManager) + } + log.info("initializing acceptor") I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager) connectionAcceptor = new ConnectionAcceptor(eventBus, connectionManager, props, i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher, - certificateManager) + certificateManager, chatServer) log.info("initializing directory watcher") directoryWatcher = new DirectoryWatcher(eventBus, fileManager, home, props) diff --git a/core/src/main/groovy/com/muwire/core/chat/ChatManager.groovy b/core/src/main/groovy/com/muwire/core/chat/ChatManager.groovy new file mode 100644 index 00000000..81bc5ae6 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/chat/ChatManager.groovy @@ -0,0 +1,63 @@ +package com.muwire.core.chat + +import java.util.concurrent.ConcurrentHashMap + +import com.muwire.core.EventBus +import com.muwire.core.MuWireSettings +import com.muwire.core.Persona +import com.muwire.core.connection.I2PConnector +import com.muwire.core.trust.TrustService + +class ChatManager { + private final EventBus eventBus + private final Persona me + private final I2PConnector connector + private final TrustService trustService + private final MuWireSettings settings + + private final Map clients = new ConcurrentHashMap<>() + + ChatManager(EventBus eventBus, Persona me, I2PConnector connector, TrustService trustService, + MuWireSettings settings) { + this.eventBus = eventBus + this.me = me + this.connector = connector + this.trustService = trustService + this.settings = settings + + Timer timer = new Timer("chat-connector", true) + timer.schedule({connect()} as TimerTask, 1000, 1000) + } + + void onUIConnectChatEvent(UIConnectChatEvent e) { + if (e.host == me) + return + ChatClient client = new ChatClient(connector, eventBus, e.host, me, trustService, settings) + clients.put(e.host, client) + } + + void onUIDisconnectChatEvent(UIDisconnectChatEvent e) { + if (e.host == me) + return + ChatClient client = clients.remove(e.host) + client?.close() + } + + void onChatMessageEvent(ChatMessageEvent e) { + if (e.host == me) + return + if (e.sender != me) + return + clients[e.host]?.connection?.sendChat(e) + } + + private void connect() { + clients.each { k, v -> v.connectIfNeeded() } + } + + void shutdown() { + clients.each { k, v -> + v.close() + } + } +} diff --git a/core/src/main/groovy/com/muwire/core/chat/UIConnectChatEvent.groovy b/core/src/main/groovy/com/muwire/core/chat/UIConnectChatEvent.groovy new file mode 100644 index 00000000..c643d787 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/chat/UIConnectChatEvent.groovy @@ -0,0 +1,8 @@ +package com.muwire.core.chat + +import com.muwire.core.Event +import com.muwire.core.Persona + +class UIConnectChatEvent extends Event { + Persona host +} diff --git a/core/src/main/groovy/com/muwire/core/chat/UIDisconnectChatEvent.groovy b/core/src/main/groovy/com/muwire/core/chat/UIDisconnectChatEvent.groovy new file mode 100644 index 00000000..9f18bf72 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/chat/UIDisconnectChatEvent.groovy @@ -0,0 +1,8 @@ +package com.muwire.core.chat + +import com.muwire.core.Event +import com.muwire.core.Persona + +class UIDisconnectChatEvent extends Event { + Persona host +} diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index 63826ab8..56a19319 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -15,6 +15,7 @@ import com.muwire.core.EventBus import com.muwire.core.InfoHash import com.muwire.core.MuWireSettings import com.muwire.core.Persona +import com.muwire.core.chat.ChatServer import com.muwire.core.filecert.Certificate import com.muwire.core.filecert.CertificateManager import com.muwire.core.files.FileManager @@ -50,6 +51,7 @@ class ConnectionAcceptor { final FileManager fileManager final ConnectionEstablisher establisher final CertificateManager certificateManager + final ChatServer chatServer final ExecutorService acceptorThread final ExecutorService handshakerThreads @@ -61,7 +63,8 @@ class ConnectionAcceptor { ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager, MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache, TrustService trustService, SearchManager searchManager, UploadManager uploadManager, - FileManager fileManager, ConnectionEstablisher establisher, CertificateManager certificateManager) { + FileManager fileManager, ConnectionEstablisher establisher, CertificateManager certificateManager, + ChatServer chatServer) { this.eventBus = eventBus this.manager = manager this.settings = settings @@ -73,6 +76,7 @@ class ConnectionAcceptor { this.uploadManager = uploadManager this.establisher = establisher this.certificateManager = certificateManager + this.chatServer = chatServer acceptorThread = Executors.newSingleThreadExecutor { r -> def rv = new Thread(r) @@ -154,6 +158,9 @@ class ConnectionAcceptor { case (byte)'C': processCERTIFICATES(e) break + case (byte)'I': + processIRC(e) + break default: throw new Exception("Invalid read $read") } @@ -498,5 +505,14 @@ class ConnectionAcceptor { e.close() } } + + private void processIRC(Endpoint e) { + byte[] IRC = new byte[4] + DataInputStream dis = new DataInputStream(e.getInputStream()) + dis.readFully(IRC) + if (IRC != "RC\r\n".getBytes(StandardCharsets.US_ASCII)) + throw new Exception("Invalid IRC connection") + chatServer.handle(e) + } }