From e2acf7fddc08bab5b8e4beaab2dd71b1318e75d2 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Fri, 24 May 2019 13:06:42 +0100 Subject: [PATCH] wip on handling search results --- .../main/groovy/com/muwire/core/Core.groovy | 32 +++++++++++++++++ .../muwire/core/connection/Connection.groovy | 17 +++++++++ .../core/connection/PeerConnection.groovy | 1 + .../muwire/core/files/HasherService.groovy | 13 +++---- .../com/muwire/core/search/QueryEvent.groovy | 3 +- .../muwire/core/search/ResultsSender.groovy | 14 ++++++++ .../muwire/core/search/SearchManager.groovy | 35 +++++++++++++++++++ .../core/files/HasherServiceTest.groovy | 9 +++-- 8 files changed, 115 insertions(+), 9 deletions(-) create mode 100644 core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy create mode 100644 core/src/main/groovy/com/muwire/core/search/SearchManager.groovy diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index 28881e3c..9ef87367 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -11,9 +11,21 @@ import com.muwire.core.connection.I2PAcceptor import com.muwire.core.connection.I2PConnector import com.muwire.core.connection.LeafConnectionManager import com.muwire.core.connection.UltrapeerConnectionManager +import com.muwire.core.files.FileDownloadedEvent +import com.muwire.core.files.FileHashedEvent +import com.muwire.core.files.FileHasher +import com.muwire.core.files.FileLoadedEvent +import com.muwire.core.files.FileManager +import com.muwire.core.files.FileSharedEvent +import com.muwire.core.files.FileUnsharedEvent +import com.muwire.core.files.HasherService import com.muwire.core.hostcache.CacheClient import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostDiscoveredEvent +import com.muwire.core.search.QueryEvent +import com.muwire.core.search.ResultsEvent +import com.muwire.core.search.ResultsSender +import com.muwire.core.search.SearchManager import com.muwire.core.trust.TrustEvent import com.muwire.core.trust.TrustService @@ -143,6 +155,26 @@ class Core { I2PConnector i2pConnector = new I2PConnector(socketManager) ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) connector.start() + + log.info("initializing hasher service") + HasherService hasherService = new HasherService(new FileHasher(), eventBus) + eventBus.register(FileSharedEvent.class, hasherService) + hasherService.start() + + log.info "initializing file manager" + FileManager fileManager = new FileManager(eventBus) + eventBus.register(FileHashedEvent.class, fileManager) + eventBus.register(FileLoadedEvent.class, fileManager) + eventBus.register(FileDownloadedEvent.class, fileManager) + eventBus.register(FileUnsharedEvent.class, fileManager) + + log.info "initializing results sender" + ResultsSender resultsSender = new ResultsSender() + + log.info "initializing search manager" + SearchManager searchManager = new SearchManager(eventBus, resultsSender) + eventBus.register(QueryEvent.class, searchManager) + eventBus.register(ResultsEvent.class, searchManager) // ... at the end, sleep or execute script if (args.length == 0) { diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index e3dad356..e990e8de 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -8,6 +8,8 @@ import java.util.logging.Level import com.muwire.core.EventBus import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostDiscoveredEvent +import com.muwire.core.search.QueryEvent +import com.muwire.core.search.SearchEvent import groovy.util.logging.Log import net.i2p.data.Destination @@ -128,4 +130,19 @@ abstract class Connection implements Closeable { eventBus.publish(new HostDiscoveredEvent(destination: dest)) } } + + protected void handleSearch(def search) { + UUID uuid = UUID.fromString(search.uuid) + if (search.infohash != null) + search.keywords = null + Destination replyTo = new Destination(search.replyTo) + SearchEvent searchEvent = new SearchEvent(searchTerms : search.keywords, + searchHash : search.infohash, + uuid : uuid) + QueryEvent event = new QueryEvent ( searchEvent : searchEvent, + replyTo : replyTo, + receivedOn : endpoint.destination ) + eventBus.publish(event) + + } } diff --git a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy index c88b70b6..df97a84d 100644 --- a/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/PeerConnection.groovy @@ -52,6 +52,7 @@ class PeerConnection extends Connection { switch(json.type) { case "Ping" : handlePing(); break; case "Pong" : handlePong(json); break; + case "Search": handleSearch(json); break default : throw new Exception("unknown json type ${json.type}") } diff --git a/core/src/main/groovy/com/muwire/core/files/HasherService.groovy b/core/src/main/groovy/com/muwire/core/files/HasherService.groovy index fd570d15..e5829086 100644 --- a/core/src/main/groovy/com/muwire/core/files/HasherService.groovy +++ b/core/src/main/groovy/com/muwire/core/files/HasherService.groovy @@ -3,17 +3,18 @@ package com.muwire.core.files import java.util.concurrent.Executor import java.util.concurrent.Executors +import com.muwire.core.EventBus import com.muwire.core.SharedFile class HasherService { final FileHasher hasher - final def listener + final EventBus eventBus Executor executor - HasherService(FileHasher hasher, def listener) { + HasherService(FileHasher hasher, EventBus eventBus) { this.hasher = hasher - this.listener = listener + this.eventBus = eventBus } void start() { @@ -30,12 +31,12 @@ class HasherService { f.listFiles().each {onFileSharedEvent new FileSharedEvent(file: it) } } else { if (f.length() == 0) { - listener.publish new FileHashedEvent(error: "Not sharing empty file $f") + eventBus.publish new FileHashedEvent(error: "Not sharing empty file $f") } else if (f.length() > FileHasher.MAX_SIZE) { - listener.publish new FileHashedEvent(error: "$f is too large to be shared ${f.length()}") + eventBus.publish new FileHashedEvent(error: "$f is too large to be shared ${f.length()}") } else { def hash = hasher.hashFile f - listener.publish new FileHashedEvent(sharedFile: new SharedFile(f, hash)) + eventBus.publish new FileHashedEvent(sharedFile: new SharedFile(f, hash)) } } } diff --git a/core/src/main/groovy/com/muwire/core/search/QueryEvent.groovy b/core/src/main/groovy/com/muwire/core/search/QueryEvent.groovy index 4e532683..4e6023f5 100644 --- a/core/src/main/groovy/com/muwire/core/search/QueryEvent.groovy +++ b/core/src/main/groovy/com/muwire/core/search/QueryEvent.groovy @@ -2,8 +2,9 @@ package com.muwire.core.search import net.i2p.data.Destination -class QueryEvent extends SearchEvent { +class QueryEvent { + SearchEvent searchEvent boolean firstHop Destination replyTo Destination receivedOn diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy new file mode 100644 index 00000000..158aa4da --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy @@ -0,0 +1,14 @@ +package com.muwire.core.search + +import com.muwire.core.SharedFile + +import groovy.util.logging.Log +import net.i2p.data.Destination + +@Log +class ResultsSender { + + void sendResults(UUID uuid, SharedFile[] results, Destination target) { + log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()}") + } +} diff --git a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy new file mode 100644 index 00000000..e7ff6737 --- /dev/null +++ b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy @@ -0,0 +1,35 @@ +package com.muwire.core.search + +import com.muwire.core.EventBus + +import groovy.util.logging.Log +import net.i2p.data.Destination + +@Log +public class SearchManager { + + private final EventBus eventBus + private final ResultsSender resultsSender + private final Map responderAddress = new HashMap<>() + + SearchManager(EventBus eventBus, ResultsSender resultsSender) { + this.eventBus = eventBus + this.resultsSender = resultsSender + } + + void onQueryEvent(QueryEvent event) { + responderAddress.put(event.searchEvent.uuid, event.replyTo) + eventBus.publish(event.searchEvent) + } + + void onResultsEvent(ResultsEvent event) { + Destination target = responderAddress.get(event.uuid) + if (target == null) + throw new IllegalStateException("UUID unknown $event.uuid") + if (event.results.length == 0) { + log.info("No results for search uuid $event.uuid") + return + } + resultsSender.sendResults(event.uuid, event.results, target) + } +} diff --git a/core/src/test/groovy/com/muwire/core/files/HasherServiceTest.groovy b/core/src/test/groovy/com/muwire/core/files/HasherServiceTest.groovy index 7944d978..d15d8888 100644 --- a/core/src/test/groovy/com/muwire/core/files/HasherServiceTest.groovy +++ b/core/src/test/groovy/com/muwire/core/files/HasherServiceTest.groovy @@ -7,20 +7,25 @@ import org.junit.After import org.junit.Before import org.junit.Test +import com.muwire.core.EventBus + class HasherServiceTest { HasherService service FileHasher hasher + EventBus eventBus def listener = new ArrayBlockingQueue(100) { - void publish(def evt) { + void onFileHashedEvent(FileHashedEvent evt) { offer evt } } @Before void before() { + eventBus = new EventBus() hasher = new FileHasher() - service = new HasherService(hasher, listener) + service = new HasherService(hasher, eventBus) + eventBus.register(FileHashedEvent.class, listener) service.start() }