From 36e1e82fa35fbeed91c0d19e8d5068d6df0f7ee2 Mon Sep 17 00:00:00 2001 From: Zlatin Balevsky Date: Sat, 25 May 2019 00:34:18 +0100 Subject: [PATCH] End-to-end working sending and receiving of results --- core/src/main/groovy/com/muwire/core/Core.groovy | 3 ++- .../com/muwire/core/connection/Connection.groovy | 11 +++++++++++ .../muwire/core/connection/ConnectionAcceptor.groovy | 8 ++++---- .../muwire/core/connection/ConnectionManager.groovy | 8 ++++++++ .../com/muwire/core/search/ResultsParser.groovy | 4 ++-- .../com/muwire/core/search/ResultsSender.groovy | 10 +++++----- .../com/muwire/core/search/SearchManager.groovy | 8 ++++++-- 7 files changed, 38 insertions(+), 14 deletions(-) diff --git a/core/src/main/groovy/com/muwire/core/Core.groovy b/core/src/main/groovy/com/muwire/core/Core.groovy index c7f4f239..3e9a089c 100644 --- a/core/src/main/groovy/com/muwire/core/Core.groovy +++ b/core/src/main/groovy/com/muwire/core/Core.groovy @@ -141,6 +141,7 @@ class Core { eventBus.register(TrustEvent.class, connectionManager) eventBus.register(ConnectionEvent.class, connectionManager) eventBus.register(DisconnectionEvent.class, connectionManager) + eventBus.register(QueryEvent.class, connectionManager) connectionManager.start() log.info("initializing cache client") @@ -154,7 +155,7 @@ class Core { ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me) log.info "initializing search manager" - SearchManager searchManager = new SearchManager(eventBus, resultsSender) + SearchManager searchManager = new SearchManager(eventBus, me, resultsSender) eventBus.register(QueryEvent.class, searchManager) eventBus.register(ResultsEvent.class, searchManager) diff --git a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy index 1f0b0155..8463fbec 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Connection.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Connection.groovy @@ -114,6 +114,17 @@ abstract class Connection implements Closeable { messages.put(ping) lastPingSentTime = System.currentTimeMillis() } + + void sendQuery(QueryEvent e) { + def query = [:] + query.type = "Search" + query.version = 1 + query.uuid = e.searchEvent.getUuid() + // TODO: first hop figure out + query.keywords = e.searchEvent.getSearchTerms() + query.replyTo = e.getReceivedOn().toBase64() + messages.put(query) + } protected void handlePing() { log.fine("$name received ping") diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy index 9609fcff..4fbb53d6 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionAcceptor.groovy @@ -176,13 +176,13 @@ class ConnectionAcceptor { throw new IOException("Invalid POST connection") JsonSlurper slurper = new JsonSlurper() try { - byte uuid = new byte[36] + byte[] uuid = new byte[36] dis.readFully(uuid) UUID resultsUUID = UUID.fromString(new String(uuid, StandardCharsets.US_ASCII)) if (!searchManager.hasLocalSearch(resultsUUID)) throw new UnexpectedResultsException(resultsUUID.toString()) - byte rn = new byte[2] + byte[] rn = new byte[2] dis.readFully(rn) if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII)) throw new IOException("invalid request header") @@ -199,9 +199,9 @@ class ConnectionAcceptor { eventBus.publish(ResultsParser.parse(sender, json)) } } catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) { - log.warning(bad) + log.log(Level.WARNING, "failed to process POST", bad) } finally { - e.closeQuietly() + e.close() } } diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy index efa96c15..c7b82862 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionManager.groovy @@ -2,6 +2,7 @@ package com.muwire.core.connection import com.muwire.core.EventBus import com.muwire.core.hostcache.HostCache +import com.muwire.core.search.QueryEvent import com.muwire.core.trust.TrustEvent import com.muwire.core.trust.TrustLevel @@ -38,6 +39,13 @@ abstract class ConnectionManager { if (e.level == TrustLevel.DISTRUSTED) drop(e.destination) } + + void onQueryEvent(QueryEvent e) { + getConnections().each { + if (e.getReceivedOn() != it.getEndpoint().getDestination()) + it.sendQuery(e) + } + } abstract void drop(Destination d) diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy index 813e499a..6024bf63 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsParser.groovy @@ -26,11 +26,11 @@ class ResultsParser { throw new InvalidSearchResultException("hashlist not a list") try { String name = DataUtil.readi18nString(Base64.decode(json.name)) - long size = Long.parseLong(json.size) + long size = json.size byte [] infoHash = Base64.decode(json.infohash) if (infoHash.length != InfoHash.SIZE) throw new InvalidSearchResultException("invalid infohash size $infoHash.length") - int pieceSize = Integer.parseInt(json.pieceSize) + int pieceSize = json.pieceSize byte [] hashList = new byte[json.hashList.size() * InfoHash.SIZE] json.hashList.eachWithIndex { string, index -> byte [] hashPiece = Base64.decode(string) diff --git a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy index a1b2b32b..31359679 100644 --- a/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy +++ b/core/src/main/groovy/com/muwire/core/search/ResultsSender.groovy @@ -29,7 +29,8 @@ class ResultsSender { new ThreadFactory() { @Override public Thread newThread(Runnable r) { - Thread rv = new Thread("Results Sender "+THREAD_NO.incrementAndGet(), r) + Thread rv = new Thread(r) + rv.setName("Results Sender "+THREAD_NO.incrementAndGet()) rv.setDaemon(true) rv } @@ -86,11 +87,11 @@ class ResultsSender { daos.writeShort((short) name.length) daos.write(name) daos.flush() - name = Base64.encode(baos.toByteArray()) + String encodedName = Base64.encode(baos.toByteArray()) def obj = [:] obj.type = "Result" obj.version = 1 - obj.name = name + obj.name = encodedName obj.infohash = Base64.encode(it.getInfoHash().getRoot()) obj.size = it.getFile().length() obj.pieceSize = FileHasher.getPieceSize(it.getFile().length()) @@ -108,8 +109,7 @@ class ResultsSender { } os.flush() } finally { - if (endpoint != null) - endpoint.closeQuietly() + endpoint?.close() } } } diff --git a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy index 8719305e..11886f05 100644 --- a/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy +++ b/core/src/main/groovy/com/muwire/core/search/SearchManager.groovy @@ -1,6 +1,7 @@ package com.muwire.core.search import com.muwire.core.EventBus +import com.muwire.core.Persona import groovy.util.logging.Log import net.i2p.data.Destination @@ -9,17 +10,20 @@ import net.i2p.data.Destination public class SearchManager { private final EventBus eventBus + private final Persona me private final ResultsSender resultsSender private final Map responderAddress = new HashMap<>() SearchManager(){} - SearchManager(EventBus eventBus, ResultsSender resultsSender) { + SearchManager(EventBus eventBus, Persona me, ResultsSender resultsSender) { this.eventBus = eventBus + this.me = me this.resultsSender = resultsSender } void onQueryEvent(QueryEvent event) { + // TODO: duplicate UUID check responderAddress.put(event.searchEvent.uuid, event.replyTo) eventBus.publish(event.searchEvent) } @@ -36,6 +40,6 @@ public class SearchManager { } boolean hasLocalSearch(UUID uuid) { - false + me.destination.equals(responderAddress.get(uuid)) } }