Make Endpoint implement Closeable, more work on outgoing handshakes
This commit is contained in:
@@ -3,14 +3,19 @@ package com.muwire.core.connection
|
|||||||
import java.util.concurrent.ExecutorService
|
import java.util.concurrent.ExecutorService
|
||||||
import java.util.concurrent.Executors
|
import java.util.concurrent.Executors
|
||||||
import java.util.concurrent.ThreadFactory
|
import java.util.concurrent.ThreadFactory
|
||||||
|
import java.util.zip.DeflaterInputStream
|
||||||
|
import java.util.zip.DeflaterOutputStream
|
||||||
|
import java.util.zip.InflaterInputStream
|
||||||
|
|
||||||
import com.muwire.core.EventBus
|
import com.muwire.core.EventBus
|
||||||
import com.muwire.core.MuWireSettings
|
import com.muwire.core.MuWireSettings
|
||||||
import com.muwire.core.hostcache.HostCache
|
import com.muwire.core.hostcache.HostCache
|
||||||
|
|
||||||
|
import groovy.util.logging.Log
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
import net.i2p.util.ConcurrentHashSet
|
import net.i2p.util.ConcurrentHashSet
|
||||||
|
|
||||||
|
@Log
|
||||||
class ConnectionEstablisher {
|
class ConnectionEstablisher {
|
||||||
|
|
||||||
private static final int CONCURRENT = 4
|
private static final int CONCURRENT = 4
|
||||||
@@ -74,6 +79,71 @@ class ConnectionEstablisher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void connect(Destination toTry) {
|
private void connect(Destination toTry) {
|
||||||
// TODO: implement
|
log.info("starting connect to ${toTry.toBase32()}")
|
||||||
|
try {
|
||||||
|
def endpoint = i2pConnector.connect(toTry)
|
||||||
|
log.info("successful transport connect to ${toTry.toBase32()}")
|
||||||
|
|
||||||
|
// outgoing handshake
|
||||||
|
endpoint.outputStream.write("MuWire ".bytes)
|
||||||
|
def type = settings.isLeaf() ? "leaf" : "peer"
|
||||||
|
endpoint.outputStream.write(type.bytes)
|
||||||
|
endpoint.outputStream.flush()
|
||||||
|
|
||||||
|
InputStream is = endpoint.inputStream
|
||||||
|
int read = is.read()
|
||||||
|
if (read == -1) {
|
||||||
|
fail endpoint
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch(read) {
|
||||||
|
case 'O': readK(endpoint); break
|
||||||
|
case 'R': readEJECT(endpoint); break
|
||||||
|
default :
|
||||||
|
log.warning("unknown response $read")
|
||||||
|
fail endpoint
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.warning("Couldn't connect to ${toTry.toBase32()}", e)
|
||||||
|
def endpoint = new Endpoint(toTry, null, null)
|
||||||
|
fail(endpoint)
|
||||||
|
} finally {
|
||||||
|
inProgress.remove(toTry)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fail(Endpoint endpoint) {
|
||||||
|
endpoint.close()
|
||||||
|
eventBus.publish(new ConnectionEvent(endpoint: endpoint, incoming: false, status: ConnectionAttemptStatus.FAILED))
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readK(Endpoint e) {
|
||||||
|
int read = e.inputStream.read()
|
||||||
|
if (read != 'K') {
|
||||||
|
log.warning("unknown response after O: $read")
|
||||||
|
fail e
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// wrap into deflater / inflater streams and publish
|
||||||
|
def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream))
|
||||||
|
eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, status: ConnectionAttemptStatus.SUCCESSFUL))
|
||||||
|
}
|
||||||
|
|
||||||
|
private void readEJECT(Endpoint e) {
|
||||||
|
byte[] eject = "EJECT".bytes
|
||||||
|
for (int i = 0; i < eject.length; i++) {
|
||||||
|
int read = e.inputStream.read()
|
||||||
|
if (read != eject[i]) {
|
||||||
|
log.warning("Unknown response after R at position $i")
|
||||||
|
fail e
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// can publish the rejected event now
|
||||||
|
eventBus.publish(new ConnectionEvent(endpoint: e, incoming: false, status: ConnectionAttemptStatus.REJECTED))
|
||||||
|
|
||||||
|
// TODO: read the json for suggested peers if present
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,15 +1,35 @@
|
|||||||
package com.muwire.core.connection
|
package com.muwire.core.connection
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
|
||||||
|
import groovy.util.logging.Log
|
||||||
import net.i2p.data.Destination
|
import net.i2p.data.Destination
|
||||||
|
|
||||||
class Endpoint {
|
@Log
|
||||||
|
class Endpoint implements Closeable {
|
||||||
final Destination destination
|
final Destination destination
|
||||||
final InputStream inputStream
|
final InputStream inputStream
|
||||||
final OutputStream outputStream
|
final OutputStream outputStream
|
||||||
|
|
||||||
|
private final AtomicBoolean closed = new AtomicBoolean()
|
||||||
|
|
||||||
Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream) {
|
Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream) {
|
||||||
this.destination = destination
|
this.destination = destination
|
||||||
this.inputStream = inputStream
|
this.inputStream = inputStream
|
||||||
this.outputStream = outputStream
|
this.outputStream = outputStream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
if (!closed.compareAndSet(false, true)) {
|
||||||
|
log.warning("Close loop detected for ${destination.toBase32()}", new Exception())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (inputStream != null) {
|
||||||
|
try {inputStream.close()} catch (Exception ignore) {}
|
||||||
|
}
|
||||||
|
if (outputStream != null) {
|
||||||
|
try {outputStream.close()} catch (Exception ignore) {}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user