From 36c1a1a28804f484ce48d65d24b5e45aea4f0067 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Mon, 4 Nov 2019 17:17:57 +0000 Subject: [PATCH] core side of certificate exchange --- .../main/groovy/com/muwire/core/Core.groovy | 5 +- .../core/connection/ConnectionAcceptor.groovy | 64 +++++++++++++- .../core/filecert/CertificateClient.groovy | 88 +++++++++++++++++++ .../filecert/CertificateFetchEvent.groovy | 8 ++ .../core/filecert/CertificateFetchStatus.java | 5 ++ .../filecert/CertificateFetchedEvent.groovy | 7 ++ .../core/filecert/CertificateManager.groovy | 65 +++++++++----- .../filecert/UIFetchCertificatesEvent.groovy | 10 +++ .../filecert/UIImportCertificateEvent.groovy | 7 ++ .../muwire/core/search/ResultsParser.groovy | 7 +- .../muwire/core/search/ResultsSender.groovy | 18 ++-- .../muwire/core/search/UIResultEvent.groovy | 1 + 12 files changed, 254 insertions(+), 31 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/filecert/CertificateClient.groovy create mode 100644 core/src/main/groovy/com/muwire/core/filecert/CertificateFetchEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/filecert/CertificateFetchStatus.java create mode 100644 core/src/main/groovy/com/muwire/core/filecert/CertificateFetchedEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/filecert/UIFetchCertificatesEvent.groovy create mode 100644 core/src/main/groovy/com/muwire/core/filecert/UIImportCertificateEvent.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 2a8a042d..ddb72694 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -270,7 +270,7 @@ public class Core { I2PConnector i2pConnector = new I2PConnector(socketManager) log.info "initializing results sender" - ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props) + ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me, props, certificateManager) log.info "initializing search manager" SearchManager searchManager = new SearchManager(eventBus, me, resultsSender) @@ -296,7 +296,8 @@ public class Core { log.info("initializing acceptor") I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager) connectionAcceptor = new ConnectionAcceptor(eventBus, connectionManager, props, - i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher) + i2pAcceptor, hostCache, trustService, searchManager, uploadManager, fileManager, connectionEstablisher, + certificateManager) log.info("initializing directory watcher") directoryWatcher = new DirectoryWatcher(eventBus, fileManager, home, props) diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index 2d725003..7b19a98d 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -1,6 +1,7 @@ package com.muwire.core.connection import java.nio.charset.StandardCharsets +import java.nio.file.attribute.DosFileAttributes import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.logging.Level @@ -11,8 +12,11 @@ import java.util.zip.InflaterInputStream import com.muwire.core.Constants import com.muwire.core.EventBus +import com.muwire.core.InfoHash import com.muwire.core.MuWireSettings import com.muwire.core.Persona +import com.muwire.core.filecert.Certificate +import com.muwire.core.filecert.CertificateManager import com.muwire.core.files.FileManager import com.muwire.core.hostcache.HostCache import com.muwire.core.trust.TrustLevel @@ -45,6 +49,7 @@ class ConnectionAcceptor { final UploadManager uploadManager final FileManager fileManager final ConnectionEstablisher establisher + final CertificateManager certificateManager final ExecutorService acceptorThread final ExecutorService handshakerThreads @@ -56,7 +61,7 @@ class ConnectionAcceptor { ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager, MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache, TrustService trustService, SearchManager searchManager, UploadManager uploadManager, - FileManager fileManager, ConnectionEstablisher establisher) { + FileManager fileManager, ConnectionEstablisher establisher, CertificateManager certificateManager) { this.eventBus = eventBus this.manager = manager this.settings = settings @@ -67,6 +72,7 @@ class ConnectionAcceptor { this.fileManager = fileManager this.uploadManager = uploadManager this.establisher = establisher + this.certificateManager = certificateManager acceptorThread = Executors.newSingleThreadExecutor { r -> def rv = new Thread(r) @@ -145,6 +151,9 @@ class ConnectionAcceptor { case (byte)'B': processBROWSE(e) break + case (byte)'C': + processCERTIFICATES(e) + break default: throw new Exception("Invalid read $read") } @@ -353,7 +362,8 @@ class ConnectionAcceptor { JsonOutput jsonOutput = new JsonOutput() sharedFiles.each { it.hit() - def obj = ResultsSender.sharedFileToObj(it, false) + int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size() + def obj = ResultsSender.sharedFileToObj(it, false, certificates) def json = jsonOutput.toJson(obj) dos.writeShort((short)json.length()) dos.write(json.getBytes(StandardCharsets.US_ASCII)) @@ -406,5 +416,55 @@ class ConnectionAcceptor { e.close() } } + + private void processCERTIFICATES(Endpoint e) { + try { + byte [] ERTIFICATES = new byte[12] + DataInputStream dis = new DataInputStream(e.getInputStream()) + dis.readFully(ERTIFICATES) + if (ERTIFICATES != "ERTIFICATES ".getBytes(StandardCharsets.US_ASCII)) + throw new IOException("Invalid CERTIFICATES connection") + + byte [] infoHashStringBytes = new byte[44] + dis.readFully(infoHashStringBytes) + String infoHashString = new String(infoHashStringBytes, StandardCharsets.US_ASCII) + + byte[] rn = new byte[2] + dis.readFully(rn) + if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII)) + throw new IOException("Malformed CERTIFICATES request") + + String header + while ((header = DataUtil.readTillRN(dis)) != ""); // ignore headers for now + + log.info("responding to certificates request for $infoHashString") + byte [] root = Base64.decode(infoHashString) + + Set certs = certificateManager.getByInfoHash(new InfoHash(root)) + if (certs.isEmpty()) { + log.info("certs not found") + e.getOutputStream().write("404 Certs Not Found\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + e.getOutputStream().flush() + return + } + + OutputStream os = e.getOutputStream() + os.write("200 OK\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("Count: ${certs.size()}\r\n".getBytes(StandardCharsets.US_ASCII)) + os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + + DataOutputStream dos = new DataOutputStream(os) + certs.each { + ByteArrayOutputStream baos = new ByteArrayOutputStream() + it.write(baos) + byte [] payload = baos.toByteArray() + dos.writeShort(payload.length) + dos.write(payload) + } + dos.close() + } finally { + e.close() + } + } } diff --git a/core/src/main/groovy/com/muwire/core/filecert/CertificateClient.groovy b/core/src/main/groovy/com/muwire/core/filecert/CertificateClient.groovy new file mode 100644 index 00000000..1a33bea3 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/CertificateClient.groovy @@ -0,0 +1,88 @@ +package com.muwire.core.filecert + +import java.nio.charset.StandardCharsets +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors +import java.util.logging.Level + +import net.i2p.data.Base64 + +import com.muwire.core.Constants +import com.muwire.core.EventBus +import com.muwire.core.InvalidSignatureException +import com.muwire.core.connection.Endpoint +import com.muwire.core.connection.I2PConnector +import com.muwire.core.util.DataUtil + +import groovy.util.logging.Log + +@Log +class CertificateClient { + private final EventBus eventBus + private final I2PConnector connector + + private final ExecutorService fetcherThread = Executors.newSingleThreadExecutor() + + CertificateClient(EventBus eventBus, I2PConnector connector) { + this.eventBus = eventBus + this.connector = connector + } + + void onUIFetchCertificatesEvent(UIFetchCertificatesEvent e) { + fetcherThread.execute({ + Endpoint endpoint = null + try { + eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.CONNECTING)) + endpoint = connector.connect(e.host.destination) + + String infoHashString = Base64.encode(e.infoHash.getRoot()) + OutputStream os = endpoint.getOutputStream() + os.write("CERTIFICATES ${infoHashString}\r\n\r\n".getBytes(StandardCharsets.US_ASCII)) + + InputStream is = endpoint.getInputStream() + String code = DataUtil.readTillRN(is) + if (!code.startsWith("200")) + throw new IOException("invalid code $code") + + // parse all headers + Map headers = new HashMap<>() + String header + while((header = DataUtil.readTillRN(is)) != "" && headers.size() < Constants.MAX_HEADERS) { + int colon = header.indexOf(':') + if (colon == -1 || colon == header.length() - 1) + throw new IOException("invalid header $header") + String key = header.substring(0, colon) + String value = header.substring(colon + 1) + headers[key] = value.trim() + } + + if (!headers.containsKey("Count")) + throw new IOException("No count header") + + int count = Integer.parseInt(headers['Count']) + + // start pulling the certs + eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.FETCHING, count : count)) + + DataInputStream dis = new DataInputStream(is) + for (int i = 0; i < count; i++) { + int size = dis.readUnsignedShort() + byte [] tmp = new byte[size] + dis.readFully(tmp) + Certificate cert = null + try { + cert = new Certificate(new ByteArrayInputStream(tmp)) + } catch (IOException | InvalidSignatureException ignore) { + continue + } + eventBus.publish(new CertificateFetchedEvent(certificate : cert)) + } + } catch (Exception bad) { + log.log(Level.WARNING,"Fetching certificates failed", bad) + eventBus.publish(new CertificateFetchEvent(status : CertificateFetchStatus.FAILED)) + } finally { + endpoint?.close() + } + }) + } +} diff --git a/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchEvent.groovy b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchEvent.groovy new file mode 100644 index 00000000..29e93391 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchEvent.groovy @@ -0,0 +1,8 @@ +package com.muwire.core.filecert + +import com.muwire.core.Event + +class CertificateFetchEvent extends Event { + CertificateFetchStatus status + int count +} diff --git a/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchStatus.java b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchStatus.java new file mode 100644 index 00000000..454de936 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchStatus.java @@ -0,0 +1,5 @@ +package com.muwire.core.filecert; + +public enum CertificateFetchStatus { + CONNECTING, FETCHING, DONE, FAILED +} diff --git a/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchedEvent.groovy b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchedEvent.groovy new file mode 100644 index 00000000..869673cc --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/CertificateFetchedEvent.groovy @@ -0,0 +1,7 @@ +package com.muwire.core.filecert + +import com.muwire.core.Event + +class CertificateFetchedEvent extends Event { + Certificate certificate +} diff --git a/core/src/main/groovy/com/muwire/core/filecert/CertificateManager.groovy b/core/src/main/groovy/com/muwire/core/filecert/CertificateManager.groovy index f36f1c31..ca2f1870 100644 --- a/core/src/main/groovy/com/muwire/core/filecert/CertificateManager.groovy +++ b/core/src/main/groovy/com/muwire/core/filecert/CertificateManager.groovy @@ -12,6 +12,7 @@ import com.muwire.core.Persona import groovy.util.logging.Log import net.i2p.data.Base64 import net.i2p.data.SigningPrivateKey +import net.i2p.util.ConcurrentHashSet @Log class CertificateManager { @@ -51,14 +52,14 @@ class CertificateManager { Set existing = byInfoHash.get(cert.infoHash) if (existing == null) { - existing = new HashSet<>() + existing = new ConcurrentHashSet<>() byInfoHash.put(cert.infoHash, existing) } existing.add(cert) existing = byIssuer.get(cert.issuer) if (existing == null) { - existing = new HashSet<>() + existing = new ConcurrentHashSet<>() byIssuer.put(cert.issuer, existing) } existing.add(cert) @@ -73,30 +74,45 @@ class CertificateManager { long timestamp = System.currentTimeMillis() Certificate cert = new Certificate(infoHash, name, timestamp, me, spk) - boolean added = true - Set existing = byInfoHash.get(cert.infoHash) - if (existing == null) { - existing = new HashSet<>() - byInfoHash.put(cert.infoHash, existing) - } - added &= existing.add(cert) - - existing = byIssuer.get(cert.issuer) - if (existing == null) { - existing = new HashSet<>() - byIssuer.put(cert.issuer, existing) - } - added &= existing.add(cert) - - if (added) { - String infoHashString = Base64.encode(infoHash.getRoot()) - File certFile = new File(certDir, "${infoHashString}${name}.mwcert") - certFile.withOutputStream { cert.write(it) } + if (addToMaps(cert)) { + saveCert(cert) eventBus.publish(new CertificateCreatedEvent(certificate : cert)) } } + + void onUIImportCertificateEvent(UIImportCertificateEvent e) { + Certificate cert = e.certificate + if (!addToMaps(cert)) + return + saveCert(cert) + } + private void saveCert(Certificate cert) { + String infoHashString = Base64.encode(cert.infoHash.getRoot()) + File certFile = new File(certDir, "${infoHashString}_${cert.issuer.getHumanReadableName()}.mwcert") + certFile.withOutputStream { cert.write(it) } + } + + private boolean addToMaps(Certificate cert) { + boolean added = true + + Set existing = byInfoHash.get(cert.infoHash) + if (existing == null) { + existing = new ConcurrentHashSet<>() + byInfoHash.put(cert.infoHash, existing) + } + added &= existing.add(cert) + + existing = byIssuer.get(cert.issuer) + if (existing == null) { + existing = new ConcurrentHashSet<>() + byIssuer.put(cert.issuer, existing) + } + added &= existing.add(cert) + added + } + boolean hasLocalCertificate(InfoHash infoHash) { if (!byInfoHash.containsKey(infoHash)) return false @@ -106,5 +122,12 @@ class CertificateManager { return true } return false + } + + Set getByInfoHash(InfoHash infoHash) { + Set rv = new HashSet<>() + if (byInfoHash.containsKey(infoHash)) + rv.addAll(byInfoHash.get(infoHash)) + rv } } diff --git a/core/src/main/groovy/com/muwire/core/filecert/UIFetchCertificatesEvent.groovy b/core/src/main/groovy/com/muwire/core/filecert/UIFetchCertificatesEvent.groovy new file mode 100644 index 00000000..b46edc18 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/UIFetchCertificatesEvent.groovy @@ -0,0 +1,10 @@ +package com.muwire.core.filecert + +import com.muwire.core.Event +import com.muwire.core.InfoHash +import com.muwire.core.Persona + +class UIFetchCertificatesEvent extends Event { + Persona host + InfoHash infoHash +} diff --git a/core/src/main/groovy/com/muwire/core/filecert/UIImportCertificateEvent.groovy b/core/src/main/groovy/com/muwire/core/filecert/UIImportCertificateEvent.groovy new file mode 100644 index 00000000..a58fae8f --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/filecert/UIImportCertificateEvent.groovy @@ -0,0 +1,7 @@ +package com.muwire.core.filecert + +import com.muwire.core.Event + +class UIImportCertificateEvent extends Event { + Certificate certificate +} diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy index 198c1aee..53993ca2 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy @@ -99,6 +99,10 @@ class ResultsParser { boolean browse = false if (json.browse != null) browse = json.browse + + int certificates = 0 + if (json.certificates != null) + certificates = json.certificates return new UIResultEvent( sender : p, name : name, @@ -108,7 +112,8 @@ class ResultsParser { sources : sources, comment : comment, browse : browse, - uuid: uuid) + uuid: uuid, + certificates : certificates) } catch (Exception e) { throw new InvalidSearchResultException("parsing search result failed",e) } diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy index 763464cb..f1030cc1 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy @@ -3,6 +3,7 @@ package com.muwire.core.search import com.muwire.core.SharedFile import com.muwire.core.connection.Endpoint import com.muwire.core.connection.I2PConnector +import com.muwire.core.filecert.CertificateManager import com.muwire.core.files.FileHasher import com.muwire.core.util.DataUtil import com.muwire.core.Persona @@ -46,12 +47,14 @@ class ResultsSender { private final Persona me private final EventBus eventBus private final MuWireSettings settings + private final CertificateManager certificateManager - ResultsSender(EventBus eventBus, I2PConnector connector, Persona me, MuWireSettings settings) { + ResultsSender(EventBus eventBus, I2PConnector connector, Persona me, MuWireSettings settings, CertificateManager certificateManager) { this.connector = connector; this.eventBus = eventBus this.me = me this.settings = settings + this.certificateManager = certificateManager } void sendResults(UUID uuid, SharedFile[] results, Destination target, boolean oobInfohash, boolean compressedResults) { @@ -70,6 +73,7 @@ class ResultsSender { if (it.getComment() != null) { comment = DataUtil.readi18nString(Base64.decode(it.getComment())) } + int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size() def uiResultEvent = new UIResultEvent( sender : me, name : it.getFile().getName(), size : length, @@ -77,7 +81,8 @@ class ResultsSender { pieceSize : pieceSize, uuid : uuid, sources : suggested, - comment : comment + comment : comment, + certificates : certificates ) uiResultEvents << uiResultEvent } @@ -108,7 +113,8 @@ class ResultsSender { me.write(os) os.writeShort((short)results.length) results.each { - def obj = sharedFileToObj(it, settings.browseFiles) + int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size() + def obj = sharedFileToObj(it, settings.browseFiles, certificates) def json = jsonOutput.toJson(obj) os.writeShort((short)json.length()) os.write(json.getBytes(StandardCharsets.US_ASCII)) @@ -125,9 +131,10 @@ class ResultsSender { os.write("Sender: ${me.toBase64()}\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("Count: $results.length\r\n".getBytes(StandardCharsets.US_ASCII)) os.write("\r\n".getBytes(StandardCharsets.US_ASCII)) + int certificates = certificateManager.getByInfoHash(it.getInfoHash()).size() DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(os)) results.each { - def obj = sharedFileToObj(it, settings.browseFiles) + def obj = sharedFileToObj(it, settings.browseFiles, certificates) def json = jsonOutput.toJson(obj) dos.writeShort((short)json.length()) dos.write(json.getBytes(StandardCharsets.US_ASCII)) @@ -143,7 +150,7 @@ class ResultsSender { } } - public static def sharedFileToObj(SharedFile sf, boolean browseFiles) { + public static def sharedFileToObj(SharedFile sf, boolean browseFiles, int certificates) { byte [] name = sf.getFile().getName().getBytes(StandardCharsets.UTF_8) def baos = new ByteArrayOutputStream() def daos = new DataOutputStream(baos) @@ -166,6 +173,7 @@ class ResultsSender { obj.comment = sf.getComment() obj.browse = browseFiles + obj.certificates = certificates obj } } diff --git a/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy b/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy index 01ecf573..7a148744 100644 --- a/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/search/UIResultEvent.groovy @@ -16,6 +16,7 @@ class UIResultEvent extends Event { int pieceSize String comment boolean browse + int certificates @Override public String toString() {