diff --git a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy index d48cf891..648a5fa7 100644 --- a/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/ConnectionEstablisher.groovy @@ -3,14 +3,19 @@ package com.muwire.core.connection import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory +import java.util.zip.DeflaterInputStream +import java.util.zip.DeflaterOutputStream +import java.util.zip.InflaterInputStream import com.muwire.core.EventBus import com.muwire.core.MuWireSettings import com.muwire.core.hostcache.HostCache +import groovy.util.logging.Log import net.i2p.data.Destination import net.i2p.util.ConcurrentHashSet +@Log class ConnectionEstablisher { private static final int CONCURRENT = 4 @@ -74,6 +79,71 @@ class ConnectionEstablisher { } private void connect(Destination toTry) { - // TODO: implement + log.info("starting connect to ${toTry.toBase32()}") + try { + def endpoint = i2pConnector.connect(toTry) + log.info("successful transport connect to ${toTry.toBase32()}") + + // outgoing handshake + endpoint.outputStream.write("MuWire ".bytes) + def type = settings.isLeaf() ? "leaf" : "peer" + endpoint.outputStream.write(type.bytes) + endpoint.outputStream.flush() + + InputStream is = endpoint.inputStream + int read = is.read() + if (read == -1) { + fail endpoint + return + } + switch(read) { + case 'O': readK(endpoint); break + case 'R': readEJECT(endpoint); break + default : + log.warning("unknown response $read") + fail endpoint + } + } catch (Exception e) { + log.warning("Couldn't connect to ${toTry.toBase32()}", e) + def endpoint = new Endpoint(toTry, null, null) + fail(endpoint) + } finally { + inProgress.remove(toTry) + } + } + + private void fail(Endpoint endpoint) { + endpoint.close() + eventBus.publish(new ConnectionEvent(endpoint: endpoint, incoming: false, status: ConnectionAttemptStatus.FAILED)) + } + + private void readK(Endpoint e) { + int read = e.inputStream.read() + if (read != 'K') { + log.warning("unknown response after O: $read") + fail e + return + } + + // wrap into deflater / inflater streams and publish + def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream)) + eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, status: ConnectionAttemptStatus.SUCCESSFUL)) + } + + private void readEJECT(Endpoint e) { + byte[] eject = "EJECT".bytes + for (int i = 0; i < eject.length; i++) { + int read = e.inputStream.read() + if (read != eject[i]) { + log.warning("Unknown response after R at position $i") + fail e + return + } + } + + // can publish the rejected event now + eventBus.publish(new ConnectionEvent(endpoint: e, incoming: false, status: ConnectionAttemptStatus.REJECTED)) + + // TODO: read the json for suggested peers if present } } diff --git a/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy b/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy index 24eaa8c5..9b841b1b 100644 --- a/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy +++ b/core/src/main/groovy/com/muwire/core/connection/Endpoint.groovy @@ -1,15 +1,35 @@ package com.muwire.core.connection +import java.util.concurrent.atomic.AtomicBoolean + +import groovy.util.logging.Log import net.i2p.data.Destination -class Endpoint { +@Log +class Endpoint implements Closeable { final Destination destination final InputStream inputStream final OutputStream outputStream + private final AtomicBoolean closed = new AtomicBoolean() + Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream) { this.destination = destination this.inputStream = inputStream this.outputStream = outputStream } + + @Override + public void close() { + if (!closed.compareAndSet(false, true)) { + log.warning("Close loop detected for ${destination.toBase32()}", new Exception()) + return + } + if (inputStream != null) { + try {inputStream.close()} catch (Exception ignore) {} + } + if (outputStream != null) { + try {outputStream.close()} catch (Exception ignore) {} + } + } }