diff --git a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy index a31384a5..6618f9ac 100644 --- a/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy +++ b/core/src/main/groovy/com/muwire/core/MuWireSettings.groovy @@ -32,6 +32,7 @@ class MuWireSettings { boolean searchComments boolean browseFiles boolean enableChat + int maxChatConnections Set watchedDirectories float downloadSequentialRatio int hostClearInterval, hostHopelessInterval, hostRejectInterval @@ -81,6 +82,7 @@ class MuWireSettings { totalUploadSlots = Integer.valueOf(props.getProperty("totalUploadSlots","-1")) uploadSlotsPerUser = Integer.valueOf(props.getProperty("uploadSlotsPerUser","-1")) enableChat = Boolean.valueOf(props.getProperty("enableChat","false")) + maxChatConnections = Integer.valueOf(props.get("maxChatConnections", "-1")) watchedDirectories = DataUtil.readEncodedSet(props, "watchedDirectories") watchedKeywords = DataUtil.readEncodedSet(props, "watchedKeywords") @@ -130,6 +132,7 @@ class MuWireSettings { props.setProperty("totalUploadSlots", String.valueOf(totalUploadSlots)) props.setProperty("uploadSlotsPerUser", String.valueOf(uploadSlotsPerUser)) props.setProperty("enableChat", String.valueOf(enableChat)) + props.setProperty("maxChatConnectios", String.valueOf(maxChatConnections)) DataUtil.writeEncodedSet(watchedDirectories, "watchedDirectories", props) DataUtil.writeEncodedSet(watchedKeywords, "watchedKeywords", props) diff --git a/core/src/main/groovy/com/muwire/core/chat/ChatClient.groovy b/core/src/main/groovy/com/muwire/core/chat/ChatClient.groovy new file mode 100644 index 00000000..8ac54e1b --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/chat/ChatClient.groovy @@ -0,0 +1,108 @@ +package com.muwire.core.chat + +import java.nio.charset.StandardCharsets +import java.util.concurrent.Executor +import java.util.concurrent.Executors + +import com.muwire.core.Constants +import com.muwire.core.EventBus +import com.muwire.core.MuWireSettings +import com.muwire.core.Persona +import com.muwire.core.connection.Endpoint +import com.muwire.core.connection.I2PConnector +import com.muwire.core.trust.TrustService +import com.muwire.core.util.DataUtil + +import groovy.util.logging.Log + +@Log +class ChatClient implements Closeable { + + private static final long REJECTION_BACKOFF = 60 * 1000 + + private static final Executor CONNECTOR = Executors.newCachedThreadPool() + + private final I2PConnector connector + private final EventBus eventBus + private final Persona host, me + private final TrustService trustService + private final MuWireSettings settings + + private volatile ChatConnection connection + private volatile boolean connectInProgress + private volatile long lastRejectionTime + private volatile Thread connectThread + + ChatClient(I2PConnector connector, EventBus eventBus, Persona host, Persona me, TrustService trustService, + MuWireSettings settings) { + this.connector = connector + this.eventBus = eventBus + this.host = host + this.me = me + this.trustService = trustService + this.settings = settings + } + + void connectIfNeeded() { + if (connection != null || connectInProgress || (System.currentTimeMillis() - lastRejectionTime < REJECTION_BACKOFF)) + return + CONNECTOR.execute({connect()}) + } + + private void connect() { + connectInProgress = true + connectThread = Thread.currentThread() + Endpoint endpoint = null + try { + eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.CONNECTING, persona : host)) + endpoint = connector.connect(host.destination) + DataOutputStream dos = new DataOutputStream(endpoint.getOutputStream()) + DataInputStream dis = new DataInputStream(endpoint.getInputStream()) + + dos.with { + write("IRC\r\n".getBytes(StandardCharsets.US_ASCII)) + write("Version:${Constants.CHAT_VERSION}\r\n".getBytes(StandardCharsets.US_ASCII)) + write("Persona:${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII)) + write("\r\n".getBytes(StandardCharsets.US_ASCII)) + flush() + } + + String codeString = DataUtil.readTillRN(dis) + int code = Integer.parseInt(codeString.split(" ")[0]) + + if (code == 429) { + eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.REJECTED, persona : host)) + endpoint.close() + lastRejectionTime = System.currentTimeMillis() + return + } + + if (code != 200) + throw new Exception("unknown code $code") + + Map headers = DataUtil.readAllHeaders(dis) + if (!headers.containsKey('Version')) + throw new Exception("Version header missing") + + int version = Integer.parseInt(headers['Version']) + if (version != Constants.CHAT_VERSION) + throw new Exception("Unknown chat version $version") + + connection = new ChatConnection(eventBus, endpoint, host, false, trustService, settings) + eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.SUCCESSFUL, persona : host, + connection : connection)) + } catch (Exception e) { + eventBus.publish(new ChatConnectionEvent(status : ChatConnectionAttemptStatus.FAILED, persona : host)) + endpoint?.close() + } finally { + connectInProgress = false + connectThread = null + } + } + + @Override + public void close() { + connectThread?.interrupt() + connection?.close() + } +} diff --git a/core/src/main/groovy/com/muwire/core/chat/ChatConnectionAttemptStatus.java b/core/src/main/groovy/com/muwire/core/chat/ChatConnectionAttemptStatus.java index d736eca4..8ef9b1ef 100644 --- a/core/src/main/groovy/com/muwire/core/chat/ChatConnectionAttemptStatus.java +++ b/core/src/main/groovy/com/muwire/core/chat/ChatConnectionAttemptStatus.java @@ -1,5 +1,5 @@ package com.muwire.core.chat; public enum ChatConnectionAttemptStatus { - SUCCESSFUL, REJECTED, FAILED + CONNECTING, SUCCESSFUL, REJECTED, FAILED } diff --git a/core/src/main/groovy/com/muwire/core/chat/ChatConnectionEvent.groovy b/core/src/main/groovy/com/muwire/core/chat/ChatConnectionEvent.groovy index 65b314ec..4809d782 100644 --- a/core/src/main/groovy/com/muwire/core/chat/ChatConnectionEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/chat/ChatConnectionEvent.groovy @@ -6,4 +6,5 @@ import com.muwire.core.Persona class ChatConnectionEvent extends Event { ChatConnectionAttemptStatus status Persona persona + ChatConnection connection } diff --git a/core/src/main/groovy/com/muwire/core/chat/ChatServer.groovy b/core/src/main/groovy/com/muwire/core/chat/ChatServer.groovy new file mode 100644 index 00000000..cdb53a88 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/chat/ChatServer.groovy @@ -0,0 +1,77 @@ +package com.muwire.core.chat + +import java.nio.charset.StandardCharsets +import java.util.concurrent.ConcurrentHashMap + +import com.muwire.core.Constants +import com.muwire.core.EventBus +import com.muwire.core.MuWireSettings +import com.muwire.core.Persona +import com.muwire.core.connection.Endpoint +import com.muwire.core.trust.TrustService +import com.muwire.core.util.DataUtil + +import net.i2p.data.Base64 +import net.i2p.data.Destination +import net.i2p.util.ConcurrentHashSet + +class ChatServer { + private final EventBus eventBus + private final MuWireSettings settings + private final TrustService trustService + + private final Map connections = new ConcurrentHashMap() + + ChatServer(EventBus eventBus, MuWireSettings settings, TrustService trustService) { + this.eventBus = eventBus + this.settings = settings + this.trustService = trustService + } + + public void handle(Endpoint endpoint) { + InputStream is = endpoint.getInputStream() + OutputStream os = endpoint.getOutputStream() + + Map headers = DataUtil.readAllHeaders(is) + + if (!headers.containsKey("Version")) + throw new Exception("Version header missing") + + int version = Integer.parseInt(headers['Version']) + if (version != Constants.CHAT_VERSION) + throw new Exception("Unknown chat version $version") + + if (!headers.containsKey('Persona')) + throw new Exception("Persona header missing") + + Persona client = new Persona(new ByteArrayInputStream(Base64.decode(headers['Persona']))) + if (client.destination != endpoint.destination) + throw new Exception("Client destination mismatch") + + if (!settings.enableChat) { + os.write("400 Chat Not Enabled\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.close() + endpoint.close() + return + } + + if (connections.containsKey(client.destination) || connections.size() == settings.maxChatConnections) { + os.write("429 Rejected\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + os.close() + endpoint.close() + return + } + + os.with { + write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) + write("Version:${Constants.CHAT_VERSION}\r\n".getBytes(StandardCharsets.US_ASCII)) + write("\r\n".getBytes(StandardCharsets.US_ASCII)) + flush() + } + + ChatConnection connection = new ChatConnection(eventBus, endpoint, client, true, trustService, settings) + connections.put(endpoint.destination, connection) + connection.start() + eventBus.publish(new ChatConnectionEvent(connection : connection, status : ChatConnectionAttemptStatus.SUCCESSFUL, persona : client)) + } +} diff --git a/core/src/main/java/com/muwire/core/Constants.java b/core/src/main/java/com/muwire/core/Constants.java index 7c17236f..40ef5744 100644 --- a/core/src/main/java/com/muwire/core/Constants.java +++ b/core/src/main/java/com/muwire/core/Constants.java @@ -5,6 +5,8 @@ import net.i2p.crypto.SigType; public class Constants { public static final byte PERSONA_VERSION = (byte)1; public static final byte FILE_CERT_VERSION = (byte)2; + public static final int CHAT_VERSION = 1; + public static final SigType SIG_TYPE = SigType.EdDSA_SHA512_Ed25519; public static final int MAX_HEADER_SIZE = 0x1 << 14;