wip on handling search results

This commit is contained in:
Zlatin Balevsky
2019-05-24 13:06:42 +01:00
parent 0fa913ff89
commit e2acf7fddc
8 changed files with 115 additions and 9 deletions

View File

@@ -11,9 +11,21 @@ import com.muwire.core.connection.I2PAcceptor
import com.muwire.core.connection.I2PConnector import com.muwire.core.connection.I2PConnector
import com.muwire.core.connection.LeafConnectionManager import com.muwire.core.connection.LeafConnectionManager
import com.muwire.core.connection.UltrapeerConnectionManager import com.muwire.core.connection.UltrapeerConnectionManager
import com.muwire.core.files.FileDownloadedEvent
import com.muwire.core.files.FileHashedEvent
import com.muwire.core.files.FileHasher
import com.muwire.core.files.FileLoadedEvent
import com.muwire.core.files.FileManager
import com.muwire.core.files.FileSharedEvent
import com.muwire.core.files.FileUnsharedEvent
import com.muwire.core.files.HasherService
import com.muwire.core.hostcache.CacheClient import com.muwire.core.hostcache.CacheClient
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.hostcache.HostDiscoveredEvent import com.muwire.core.hostcache.HostDiscoveredEvent
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.ResultsEvent
import com.muwire.core.search.ResultsSender
import com.muwire.core.search.SearchManager
import com.muwire.core.trust.TrustEvent import com.muwire.core.trust.TrustEvent
import com.muwire.core.trust.TrustService import com.muwire.core.trust.TrustService
@@ -144,6 +156,26 @@ class Core {
ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache)
connector.start() connector.start()
log.info("initializing hasher service")
HasherService hasherService = new HasherService(new FileHasher(), eventBus)
eventBus.register(FileSharedEvent.class, hasherService)
hasherService.start()
log.info "initializing file manager"
FileManager fileManager = new FileManager(eventBus)
eventBus.register(FileHashedEvent.class, fileManager)
eventBus.register(FileLoadedEvent.class, fileManager)
eventBus.register(FileDownloadedEvent.class, fileManager)
eventBus.register(FileUnsharedEvent.class, fileManager)
log.info "initializing results sender"
ResultsSender resultsSender = new ResultsSender()
log.info "initializing search manager"
SearchManager searchManager = new SearchManager(eventBus, resultsSender)
eventBus.register(QueryEvent.class, searchManager)
eventBus.register(ResultsEvent.class, searchManager)
// ... at the end, sleep or execute script // ... at the end, sleep or execute script
if (args.length == 0) { if (args.length == 0) {
log.info("initialized everything, sleeping") log.info("initialized everything, sleeping")

View File

@@ -8,6 +8,8 @@ import java.util.logging.Level
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.hostcache.HostDiscoveredEvent import com.muwire.core.hostcache.HostDiscoveredEvent
import com.muwire.core.search.QueryEvent
import com.muwire.core.search.SearchEvent
import groovy.util.logging.Log import groovy.util.logging.Log
import net.i2p.data.Destination import net.i2p.data.Destination
@@ -128,4 +130,19 @@ abstract class Connection implements Closeable {
eventBus.publish(new HostDiscoveredEvent(destination: dest)) eventBus.publish(new HostDiscoveredEvent(destination: dest))
} }
} }
protected void handleSearch(def search) {
UUID uuid = UUID.fromString(search.uuid)
if (search.infohash != null)
search.keywords = null
Destination replyTo = new Destination(search.replyTo)
SearchEvent searchEvent = new SearchEvent(searchTerms : search.keywords,
searchHash : search.infohash,
uuid : uuid)
QueryEvent event = new QueryEvent ( searchEvent : searchEvent,
replyTo : replyTo,
receivedOn : endpoint.destination )
eventBus.publish(event)
}
} }

View File

@@ -52,6 +52,7 @@ class PeerConnection extends Connection {
switch(json.type) { switch(json.type) {
case "Ping" : handlePing(); break; case "Ping" : handlePing(); break;
case "Pong" : handlePong(json); break; case "Pong" : handlePong(json); break;
case "Search": handleSearch(json); break
default : default :
throw new Exception("unknown json type ${json.type}") throw new Exception("unknown json type ${json.type}")
} }

View File

@@ -3,17 +3,18 @@ package com.muwire.core.files
import java.util.concurrent.Executor import java.util.concurrent.Executor
import java.util.concurrent.Executors import java.util.concurrent.Executors
import com.muwire.core.EventBus
import com.muwire.core.SharedFile import com.muwire.core.SharedFile
class HasherService { class HasherService {
final FileHasher hasher final FileHasher hasher
final def listener final EventBus eventBus
Executor executor Executor executor
HasherService(FileHasher hasher, def listener) { HasherService(FileHasher hasher, EventBus eventBus) {
this.hasher = hasher this.hasher = hasher
this.listener = listener this.eventBus = eventBus
} }
void start() { void start() {
@@ -30,12 +31,12 @@ class HasherService {
f.listFiles().each {onFileSharedEvent new FileSharedEvent(file: it) } f.listFiles().each {onFileSharedEvent new FileSharedEvent(file: it) }
} else { } else {
if (f.length() == 0) { if (f.length() == 0) {
listener.publish new FileHashedEvent(error: "Not sharing empty file $f") eventBus.publish new FileHashedEvent(error: "Not sharing empty file $f")
} else if (f.length() > FileHasher.MAX_SIZE) { } else if (f.length() > FileHasher.MAX_SIZE) {
listener.publish new FileHashedEvent(error: "$f is too large to be shared ${f.length()}") eventBus.publish new FileHashedEvent(error: "$f is too large to be shared ${f.length()}")
} else { } else {
def hash = hasher.hashFile f def hash = hasher.hashFile f
listener.publish new FileHashedEvent(sharedFile: new SharedFile(f, hash)) eventBus.publish new FileHashedEvent(sharedFile: new SharedFile(f, hash))
} }
} }
} }

View File

@@ -2,8 +2,9 @@ package com.muwire.core.search
import net.i2p.data.Destination import net.i2p.data.Destination
class QueryEvent extends SearchEvent { class QueryEvent {
SearchEvent searchEvent
boolean firstHop boolean firstHop
Destination replyTo Destination replyTo
Destination receivedOn Destination receivedOn

View File

@@ -0,0 +1,14 @@
package com.muwire.core.search
import com.muwire.core.SharedFile
import groovy.util.logging.Log
import net.i2p.data.Destination
@Log
class ResultsSender {
void sendResults(UUID uuid, SharedFile[] results, Destination target) {
log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()}")
}
}

View File

@@ -0,0 +1,35 @@
package com.muwire.core.search
import com.muwire.core.EventBus
import groovy.util.logging.Log
import net.i2p.data.Destination
@Log
public class SearchManager {
private final EventBus eventBus
private final ResultsSender resultsSender
private final Map<UUID, Destination> responderAddress = new HashMap<>()
SearchManager(EventBus eventBus, ResultsSender resultsSender) {
this.eventBus = eventBus
this.resultsSender = resultsSender
}
void onQueryEvent(QueryEvent event) {
responderAddress.put(event.searchEvent.uuid, event.replyTo)
eventBus.publish(event.searchEvent)
}
void onResultsEvent(ResultsEvent event) {
Destination target = responderAddress.get(event.uuid)
if (target == null)
throw new IllegalStateException("UUID unknown $event.uuid")
if (event.results.length == 0) {
log.info("No results for search uuid $event.uuid")
return
}
resultsSender.sendResults(event.uuid, event.results, target)
}
}

View File

@@ -7,20 +7,25 @@ import org.junit.After
import org.junit.Before import org.junit.Before
import org.junit.Test import org.junit.Test
import com.muwire.core.EventBus
class HasherServiceTest { class HasherServiceTest {
HasherService service HasherService service
FileHasher hasher FileHasher hasher
EventBus eventBus
def listener = new ArrayBlockingQueue(100) { def listener = new ArrayBlockingQueue(100) {
void publish(def evt) { void onFileHashedEvent(FileHashedEvent evt) {
offer evt offer evt
} }
} }
@Before @Before
void before() { void before() {
eventBus = new EventBus()
hasher = new FileHasher() hasher = new FileHasher()
service = new HasherService(hasher, listener) service = new HasherService(hasher, eventBus)
eventBus.register(FileHashedEvent.class, listener)
service.start() service.start()
} }