End-to-end working sending and receiving of results

This commit is contained in:
Zlatin Balevsky
2019-05-25 00:34:18 +01:00
parent f4dab915f4
commit 36e1e82fa3
7 changed files with 38 additions and 14 deletions

View File

@@ -141,6 +141,7 @@ class Core {
eventBus.register(TrustEvent.class, connectionManager)
eventBus.register(ConnectionEvent.class, connectionManager)
eventBus.register(DisconnectionEvent.class, connectionManager)
eventBus.register(QueryEvent.class, connectionManager)
connectionManager.start()
log.info("initializing cache client")
@@ -154,7 +155,7 @@ class Core {
ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me)
log.info "initializing search manager"
SearchManager searchManager = new SearchManager(eventBus, resultsSender)
SearchManager searchManager = new SearchManager(eventBus, me, resultsSender)
eventBus.register(QueryEvent.class, searchManager)
eventBus.register(ResultsEvent.class, searchManager)

View File

@@ -115,6 +115,17 @@ abstract class Connection implements Closeable {
lastPingSentTime = System.currentTimeMillis()
}
void sendQuery(QueryEvent e) {
def query = [:]
query.type = "Search"
query.version = 1
query.uuid = e.searchEvent.getUuid()
// TODO: first hop figure out
query.keywords = e.searchEvent.getSearchTerms()
query.replyTo = e.getReceivedOn().toBase64()
messages.put(query)
}
protected void handlePing() {
log.fine("$name received ping")
def pong = [:]

View File

@@ -176,13 +176,13 @@ class ConnectionAcceptor {
throw new IOException("Invalid POST connection")
JsonSlurper slurper = new JsonSlurper()
try {
byte uuid = new byte[36]
byte[] uuid = new byte[36]
dis.readFully(uuid)
UUID resultsUUID = UUID.fromString(new String(uuid, StandardCharsets.US_ASCII))
if (!searchManager.hasLocalSearch(resultsUUID))
throw new UnexpectedResultsException(resultsUUID.toString())
byte rn = new byte[2]
byte[] rn = new byte[2]
dis.readFully(rn)
if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII))
throw new IOException("invalid request header")
@@ -199,9 +199,9 @@ class ConnectionAcceptor {
eventBus.publish(ResultsParser.parse(sender, json))
}
} catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) {
log.warning(bad)
log.log(Level.WARNING, "failed to process POST", bad)
} finally {
e.closeQuietly()
e.close()
}
}

View File

@@ -2,6 +2,7 @@ package com.muwire.core.connection
import com.muwire.core.EventBus
import com.muwire.core.hostcache.HostCache
import com.muwire.core.search.QueryEvent
import com.muwire.core.trust.TrustEvent
import com.muwire.core.trust.TrustLevel
@@ -39,6 +40,13 @@ abstract class ConnectionManager {
drop(e.destination)
}
void onQueryEvent(QueryEvent e) {
getConnections().each {
if (e.getReceivedOn() != it.getEndpoint().getDestination())
it.sendQuery(e)
}
}
abstract void drop(Destination d)
abstract Collection<Connection> getConnections()

View File

@@ -26,11 +26,11 @@ class ResultsParser {
throw new InvalidSearchResultException("hashlist not a list")
try {
String name = DataUtil.readi18nString(Base64.decode(json.name))
long size = Long.parseLong(json.size)
long size = json.size
byte [] infoHash = Base64.decode(json.infohash)
if (infoHash.length != InfoHash.SIZE)
throw new InvalidSearchResultException("invalid infohash size $infoHash.length")
int pieceSize = Integer.parseInt(json.pieceSize)
int pieceSize = json.pieceSize
byte [] hashList = new byte[json.hashList.size() * InfoHash.SIZE]
json.hashList.eachWithIndex { string, index ->
byte [] hashPiece = Base64.decode(string)

View File

@@ -29,7 +29,8 @@ class ResultsSender {
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread rv = new Thread("Results Sender "+THREAD_NO.incrementAndGet(), r)
Thread rv = new Thread(r)
rv.setName("Results Sender "+THREAD_NO.incrementAndGet())
rv.setDaemon(true)
rv
}
@@ -86,11 +87,11 @@ class ResultsSender {
daos.writeShort((short) name.length)
daos.write(name)
daos.flush()
name = Base64.encode(baos.toByteArray())
String encodedName = Base64.encode(baos.toByteArray())
def obj = [:]
obj.type = "Result"
obj.version = 1
obj.name = name
obj.name = encodedName
obj.infohash = Base64.encode(it.getInfoHash().getRoot())
obj.size = it.getFile().length()
obj.pieceSize = FileHasher.getPieceSize(it.getFile().length())
@@ -108,8 +109,7 @@ class ResultsSender {
}
os.flush()
} finally {
if (endpoint != null)
endpoint.closeQuietly()
endpoint?.close()
}
}
}

View File

@@ -1,6 +1,7 @@
package com.muwire.core.search
import com.muwire.core.EventBus
import com.muwire.core.Persona
import groovy.util.logging.Log
import net.i2p.data.Destination
@@ -9,17 +10,20 @@ import net.i2p.data.Destination
public class SearchManager {
private final EventBus eventBus
private final Persona me
private final ResultsSender resultsSender
private final Map<UUID, Destination> responderAddress = new HashMap<>()
SearchManager(){}
SearchManager(EventBus eventBus, ResultsSender resultsSender) {
SearchManager(EventBus eventBus, Persona me, ResultsSender resultsSender) {
this.eventBus = eventBus
this.me = me
this.resultsSender = resultsSender
}
void onQueryEvent(QueryEvent event) {
// TODO: duplicate UUID check
responderAddress.put(event.searchEvent.uuid, event.replyTo)
eventBus.publish(event.searchEvent)
}
@@ -36,6 +40,6 @@ public class SearchManager {
}
boolean hasLocalSearch(UUID uuid) {
false
me.destination.equals(responderAddress.get(uuid))
}
}