wip on receiving results

This commit is contained in:
Zlatin Balevsky
2019-05-24 17:44:41 +01:00
parent d36f41d563
commit 0a5c5ad520
10 changed files with 148 additions and 35 deletions

View File

@@ -147,13 +147,23 @@ class Core {
CacheClient cacheClient = new CacheClient(eventBus,hostCache, connectionManager, i2pSession, props, 10000) CacheClient cacheClient = new CacheClient(eventBus,hostCache, connectionManager, i2pSession, props, 10000)
cacheClient.start() cacheClient.start()
log.info("initializing acceptor")
I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager)
ConnectionAcceptor acceptor = new ConnectionAcceptor(eventBus, connectionManager, props, i2pAcceptor, hostCache, trustService)
acceptor.start()
log.info("initializing connector") log.info("initializing connector")
I2PConnector i2pConnector = new I2PConnector(socketManager) I2PConnector i2pConnector = new I2PConnector(socketManager)
log.info "initializing results sender"
ResultsSender resultsSender = new ResultsSender(eventBus, i2pConnector, me)
log.info "initializing search manager"
SearchManager searchManager = new SearchManager(eventBus, resultsSender)
eventBus.register(QueryEvent.class, searchManager)
eventBus.register(ResultsEvent.class, searchManager)
log.info("initializing acceptor")
I2PAcceptor i2pAcceptor = new I2PAcceptor(socketManager)
ConnectionAcceptor acceptor = new ConnectionAcceptor(eventBus, connectionManager, props, i2pAcceptor, hostCache, trustService, searchManager)
acceptor.start()
ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache) ConnectionEstablisher connector = new ConnectionEstablisher(eventBus, i2pConnector, props, connectionManager, hostCache)
connector.start() connector.start()
@@ -170,13 +180,6 @@ class Core {
eventBus.register(FileUnsharedEvent.class, fileManager) eventBus.register(FileUnsharedEvent.class, fileManager)
eventBus.register(SearchEvent.class, fileManager) eventBus.register(SearchEvent.class, fileManager)
log.info "initializing results sender"
ResultsSender resultsSender = new ResultsSender(eventBus, me)
log.info "initializing search manager"
SearchManager searchManager = new SearchManager(eventBus, resultsSender)
eventBus.register(QueryEvent.class, searchManager)
eventBus.register(ResultsEvent.class, searchManager)
// ... at the end, sleep or execute script // ... at the end, sleep or execute script
if (args.length == 0) { if (args.length == 0) {

View File

@@ -1,5 +1,6 @@
package com.muwire.core.connection package com.muwire.core.connection
import java.nio.charset.StandardCharsets
import java.util.concurrent.ExecutorService import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors import java.util.concurrent.Executors
import java.util.logging.Level import java.util.logging.Level
@@ -8,11 +9,17 @@ import java.util.zip.InflaterInputStream
import com.muwire.core.EventBus import com.muwire.core.EventBus
import com.muwire.core.MuWireSettings import com.muwire.core.MuWireSettings
import com.muwire.core.Persona
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustLevel
import com.muwire.core.trust.TrustService import com.muwire.core.trust.TrustService
import com.muwire.core.search.InvalidSearchResultException
import com.muwire.core.search.ResultsParser
import com.muwire.core.search.SearchManager
import com.muwire.core.search.UnexpectedResultsException
import groovy.json.JsonOutput import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import groovy.util.logging.Log import groovy.util.logging.Log
@Log @Log
@@ -24,19 +31,21 @@ class ConnectionAcceptor {
final I2PAcceptor acceptor final I2PAcceptor acceptor
final HostCache hostCache final HostCache hostCache
final TrustService trustService final TrustService trustService
final SearchManager searchManager
final ExecutorService acceptorThread final ExecutorService acceptorThread
final ExecutorService handshakerThreads final ExecutorService handshakerThreads
ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager, ConnectionAcceptor(EventBus eventBus, UltrapeerConnectionManager manager,
MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache, MuWireSettings settings, I2PAcceptor acceptor, HostCache hostCache,
TrustService trustService) { TrustService trustService, searchManager) {
this.eventBus = eventBus this.eventBus = eventBus
this.manager = manager this.manager = manager
this.settings = settings this.settings = settings
this.acceptor = acceptor this.acceptor = acceptor
this.hostCache = hostCache this.hostCache = hostCache
this.trustService = trustService this.trustService = trustService
this.searchManager = searchManager
acceptorThread = Executors.newSingleThreadExecutor { r -> acceptorThread = Executors.newSingleThreadExecutor { r ->
def rv = new Thread(r) def rv = new Thread(r)
@@ -86,11 +95,16 @@ class ConnectionAcceptor {
int read = is.read() int read = is.read()
switch(read) { switch(read) {
case (byte)'M': case (byte)'M':
if (settings.isLeaf())
throw new IOException("Incoming connection as leaf")
processMuWire(e) processMuWire(e)
break break
case (byte)'G': case (byte)'G':
processGET(e) processGET(e)
break break
case (byte)'P':
processPOST(e)
break
default: default:
throw new Exception("Invalid read $read") throw new Exception("Invalid read $read")
} }
@@ -114,24 +128,13 @@ class ConnectionAcceptor {
DataInputStream dis = new DataInputStream(e.inputStream) DataInputStream dis = new DataInputStream(e.inputStream)
dis.readFully(type) dis.readFully(type)
if (settings.isLeaf()) { if (type == "leaf".bytes)
if (type != "resu".bytes) { handleIncoming(e, true)
throw new IOException("Received incoming non-results connection as leaf") else if (type == "peer".bytes)
} handleIncoming(e, false)
byte [] lts = new byte[3] else
dis.readFully(lts) throw new IOException("unknown connection type $type")
if (lts != "lts".bytes) }
throw new IOException("malformed results connection")
// TODO: hand-off results connection
} else {
if (type == "leaf".bytes)
handleIncoming(e, true)
else if (type == "peer".bytes)
handleIncoming(e, false)
else
throw new IOException("unknown connection type $type")
}
}
private void handleIncoming(Endpoint e, boolean leaf) { private void handleIncoming(Endpoint e, boolean leaf) {
boolean accept = leaf ? manager.hasLeafSlots() : manager.hasPeerSlots() boolean accept = leaf ? manager.hasLeafSlots() : manager.hasPeerSlots()
@@ -165,4 +168,41 @@ class ConnectionAcceptor {
// TODO: implement // TODO: implement
} }
private void processPOST(final Endpoint e) throws IOException {
byte [] ost = new byte[4]
final DataInputStream dis = new DataInputStream(e.getInputStream())
dis.readFully(ost)
if (ost != "OST ".getBytes(StandardCharsets.US_ASCII))
throw new IOException("Invalid POST connection")
handshakerThreads.execute({
JsonSlurper slurper = new JsonSlurper()
try {
byte uuid = new byte[36]
dis.readFully(uuid)
UUID resultsUUID = UUID.fromString(new String(uuid, StandardCharsets.US_ASCII))
if (!searchManager.hasLocalSearch(resultsUUID))
throw new UnexpectedResultsException(resultsUUID.toString())
byte rn = new byte[2]
dis.readFully(rn)
if (rn != "\r\n".getBytes(StandardCharsets.US_ASCII))
throw new IOException("invalid request header")
Persona sender = new Persona(dis)
int nResults = dis.readUnsignedShort()
for (int i = 0; i < nResults; i++) {
int jsonSize = dis.readUnsignedShort()
byte [] payload = new byte[jsonSize]
dis.readFully(payload)
def json = slurper.parse(payload)
eventBus.publish(ResultsParser.parse(sender, json))
}
} catch (IOException | UnexpectedResultsException | InvalidSearchResultException bad) {
log.warning(bad)
} finally {
e.closeQuietly()
}
} as Runnable)
}
} }

View File

@@ -0,0 +1,25 @@
package com.muwire.core.search
class InvalidSearchResultException extends Exception {
public InvalidSearchResultException() {
super();
// TODO Auto-generated constructor stub
}
public InvalidSearchResultException(String message, Throwable cause) {
super(message, cause);
// TODO Auto-generated constructor stub
}
public InvalidSearchResultException(String message) {
super(message);
// TODO Auto-generated constructor stub
}
public InvalidSearchResultException(Throwable cause) {
super(cause);
// TODO Auto-generated constructor stub
}
}

View File

@@ -7,5 +7,4 @@ class ResultsEvent extends Event {
SharedFile[] results SharedFile[] results
UUID uuid UUID uuid
} }

View File

@@ -0,0 +1,9 @@
package com.muwire.core.search
import com.muwire.core.Persona
class ResultsParser {
public static UIResultEvent parse(Persona p, def json) throws InvalidSearchResultException {
null
}
}

View File

@@ -49,7 +49,7 @@ class ResultsSender {
log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()}") log.info("Sending $results.length results for uuid $uuid to ${target.toBase32()}")
if (target.equals(me.destination)) { if (target.equals(me.destination)) {
def resultEvent = new ResultsEvent( uuid : uuid, results : results ) def resultEvent = new ResultsEvent( uuid : uuid, results : results )
def uiResultEvent = new UIResultEvent(resultsEvent : resultEvent) def uiResultEvent = new UIResultEvent(sender: me, resultsEvent : resultEvent)
eventBus.publish(uiResultEvent) eventBus.publish(uiResultEvent)
} else { } else {
executor.execute(new ResultSendJob(uuid : uuid, results : results, target: target)) executor.execute(new ResultSendJob(uuid : uuid, results : results, target: target))

View File

@@ -12,6 +12,8 @@ public class SearchManager {
private final ResultsSender resultsSender private final ResultsSender resultsSender
private final Map<UUID, Destination> responderAddress = new HashMap<>() private final Map<UUID, Destination> responderAddress = new HashMap<>()
SearchManager(){}
SearchManager(EventBus eventBus, ResultsSender resultsSender) { SearchManager(EventBus eventBus, ResultsSender resultsSender) {
this.eventBus = eventBus this.eventBus = eventBus
this.resultsSender = resultsSender this.resultsSender = resultsSender
@@ -32,4 +34,8 @@ public class SearchManager {
} }
resultsSender.sendResults(event.uuid, event.results, target) resultsSender.sendResults(event.uuid, event.results, target)
} }
boolean hasLocalSearch(UUID uuid) {
false
}
} }

View File

@@ -1,7 +1,9 @@
package com.muwire.core.search package com.muwire.core.search
import com.muwire.core.Event import com.muwire.core.Event
import com.muwire.core.Persona
class UIResultEvent extends Event { class UIResultEvent extends Event {
Persona sender
ResultsEvent resultsEvent ResultsEvent resultsEvent
} }

View File

@@ -0,0 +1,22 @@
package com.muwire.core.search
class UnexpectedResultsException extends Exception {
public UnexpectedResultsException() {
super();
}
public UnexpectedResultsException(String message, Throwable cause, boolean enableSuppression,
boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
public UnexpectedResultsException(String message, Throwable cause) {
super(message, cause);
}
public UnexpectedResultsException(String message) {
super(message);
}
}

View File

@@ -14,6 +14,7 @@ import com.muwire.core.MuWireSettings
import com.muwire.core.hostcache.HostCache import com.muwire.core.hostcache.HostCache
import com.muwire.core.trust.TrustLevel import com.muwire.core.trust.TrustLevel
import com.muwire.core.trust.TrustService import com.muwire.core.trust.TrustService
import com.muwire.core.search.SearchManager
import groovy.json.JsonSlurper import groovy.json.JsonSlurper
import groovy.mock.interceptor.MockFor import groovy.mock.interceptor.MockFor
@@ -36,6 +37,9 @@ class ConnectionAcceptorTest {
def trustServiceMock def trustServiceMock
TrustService trustService TrustService trustService
def searchManagerMock
SearchManager searchManager
ConnectionAcceptor acceptor ConnectionAcceptor acceptor
List<ConnectionEvent> connectionEvents List<ConnectionEvent> connectionEvents
InputStream inputStream InputStream inputStream
@@ -47,6 +51,7 @@ class ConnectionAcceptorTest {
i2pAcceptorMock = new MockFor(I2PAcceptor.class) i2pAcceptorMock = new MockFor(I2PAcceptor.class)
hostCacheMock = new MockFor(HostCache.class) hostCacheMock = new MockFor(HostCache.class)
trustServiceMock = new MockFor(TrustService.class) trustServiceMock = new MockFor(TrustService.class)
searchManagerMock = new MockFor(SearchManager.class)
} }
@After @After
@@ -56,6 +61,7 @@ class ConnectionAcceptorTest {
i2pAcceptorMock.verify i2pAcceptor i2pAcceptorMock.verify i2pAcceptor
hostCacheMock.verify hostCache hostCacheMock.verify hostCache
trustServiceMock.verify trustService trustServiceMock.verify trustService
searchManagerMock.verify searchManager
Thread.sleep(100) Thread.sleep(100)
} }
@@ -73,8 +79,9 @@ class ConnectionAcceptorTest {
i2pAcceptor = i2pAcceptorMock.proxyInstance() i2pAcceptor = i2pAcceptorMock.proxyInstance()
hostCache = hostCacheMock.proxyInstance() hostCache = hostCacheMock.proxyInstance()
trustService = trustServiceMock.proxyInstance() trustService = trustServiceMock.proxyInstance()
searchManager = searchManagerMock.proxyInstance()
acceptor = new ConnectionAcceptor(eventBus, connectionManager, settings, i2pAcceptor, hostCache, trustService) acceptor = new ConnectionAcceptor(eventBus, connectionManager, settings, i2pAcceptor, hostCache, trustService, searchManager)
acceptor.start() acceptor.start()
Thread.sleep(100) Thread.sleep(100)
} }