close i2p socket when closing connections

This commit is contained in:
Zlatin Balevsky
2019-05-26 13:09:46 +01:00
parent d3538bde39
commit ef39ca3943
9 changed files with 27 additions and 20 deletions

View File

@@ -69,6 +69,8 @@ abstract class Connection implements Closeable {
log.log(Level.WARNING, "$name already closed", new Exception() ) log.log(Level.WARNING, "$name already closed", new Exception() )
return return
} }
log.info("closing $name")
endpoint.close()
reader.interrupt() reader.interrupt()
writer.interrupt() writer.interrupt()
eventBus.publish(new DisconnectionEvent(destination: endpoint.destination)) eventBus.publish(new DisconnectionEvent(destination: endpoint.destination))

View File

@@ -142,7 +142,7 @@ class ConnectionAcceptor {
log.info("accepting connection, leaf:$leaf") log.info("accepting connection, leaf:$leaf")
e.outputStream.write("OK".bytes) e.outputStream.write("OK".bytes)
e.outputStream.flush() e.outputStream.flush()
def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true), e.toClose)
eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: true, leaf: leaf, status: ConnectionAttemptStatus.SUCCESSFUL))
} else { } else {
log.info("rejecting connection, leaf:$leaf") log.info("rejecting connection, leaf:$leaf")

View File

@@ -110,7 +110,7 @@ class ConnectionEstablisher {
} }
} catch (Exception e) { } catch (Exception e) {
log.log(Level.WARNING, "Couldn't connect to ${toTry.toBase32()}", e) log.log(Level.WARNING, "Couldn't connect to ${toTry.toBase32()}", e)
def endpoint = new Endpoint(toTry, null, null) def endpoint = new Endpoint(toTry, null, null, null)
fail(endpoint) fail(endpoint)
} finally { } finally {
inProgress.remove(toTry) inProgress.remove(toTry)
@@ -133,7 +133,7 @@ class ConnectionEstablisher {
log.info("connection to ${e.destination.toBase32()} established") log.info("connection to ${e.destination.toBase32()} established")
// wrap into deflater / inflater streams and publish // wrap into deflater / inflater streams and publish
def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true)) def wrapped = new Endpoint(e.destination, new InflaterInputStream(e.inputStream), new DeflaterOutputStream(e.outputStream, true), e.toClose)
eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL)) eventBus.publish(new ConnectionEvent(endpoint: wrapped, incoming: false, leaf: false, status: ConnectionAttemptStatus.SUCCESSFUL))
} }

View File

@@ -10,13 +10,15 @@ class Endpoint implements Closeable {
final Destination destination final Destination destination
final InputStream inputStream final InputStream inputStream
final OutputStream outputStream final OutputStream outputStream
final def toClose
private final AtomicBoolean closed = new AtomicBoolean() private final AtomicBoolean closed = new AtomicBoolean()
Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream) { Endpoint(Destination destination, InputStream inputStream, OutputStream outputStream, def toClose) {
this.destination = destination this.destination = destination
this.inputStream = inputStream this.inputStream = inputStream
this.outputStream = outputStream this.outputStream = outputStream
this.toClose = toClose
} }
@Override @Override
@@ -31,6 +33,9 @@ class Endpoint implements Closeable {
if (outputStream != null) { if (outputStream != null) {
try {outputStream.close()} catch (Exception ignore) {} try {outputStream.close()} catch (Exception ignore) {}
} }
if (toClose != null) {
try {toClose.close()} catch (Exception ignore) {}
}
} }
@Override @Override

View File

@@ -17,6 +17,6 @@ class I2PAcceptor {
Endpoint accept() { Endpoint accept() {
def socket = serverSocket.accept() def socket = serverSocket.accept()
new Endpoint(socket.getPeerDestination(), socket.getInputStream(), socket.getOutputStream()) new Endpoint(socket.getPeerDestination(), socket.getInputStream(), socket.getOutputStream(), socket)
} }
} }

View File

@@ -15,7 +15,7 @@ class I2PConnector {
Endpoint connect(Destination dest) { Endpoint connect(Destination dest) {
def socket = socketManager.connect(dest) def socket = socketManager.connect(dest)
new Endpoint(dest, socket.getInputStream(), socket.getOutputStream()) new Endpoint(dest, socket.getInputStream(), socket.getOutputStream(), socket)
} }
} }

View File

@@ -98,7 +98,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
connectionManagerMock.demand.hasLeafSlots() { true } connectionManagerMock.demand.hasLeafSlots() { true }
@@ -136,7 +136,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
connectionManagerMock.demand.hasPeerSlots() { true } connectionManagerMock.demand.hasPeerSlots() { true }
@@ -174,7 +174,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
trustServiceMock.demand.getLevel { dest -> trustServiceMock.demand.getLevel { dest ->
@@ -210,7 +210,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
trustServiceMock.demand.getLevel { dest -> trustServiceMock.demand.getLevel { dest ->
@@ -246,7 +246,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
connectionManagerMock.demand.hasPeerSlots() { false } connectionManagerMock.demand.hasPeerSlots() { false }
@@ -288,7 +288,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
connectionManagerMock.demand.hasLeafSlots() { false } connectionManagerMock.demand.hasLeafSlots() { false }
@@ -330,7 +330,7 @@ class ConnectionAcceptorTest {
outputStream = new PipedOutputStream(is) outputStream = new PipedOutputStream(is)
def os = new PipedOutputStream() def os = new PipedOutputStream()
inputStream = new PipedInputStream(os) inputStream = new PipedInputStream(os)
new Endpoint(destinations.dest1, is, os) new Endpoint(destinations.dest1, is, os, null)
} }
i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) } i2pAcceptorMock.demand.accept { Thread.sleep(Integer.MAX_VALUE) }
connectionManagerMock.demand.hasPeerSlots() { false } connectionManagerMock.demand.hasPeerSlots() { false }

View File

@@ -126,7 +126,7 @@ class ConnectionEstablisherTest {
inputStream = new DataInputStream(new PipedInputStream(os)) inputStream = new DataInputStream(new PipedInputStream(os))
PipedInputStream is = new PipedInputStream() PipedInputStream is = new PipedInputStream()
outputStream = new DataOutputStream(new PipedOutputStream(is)) outputStream = new DataOutputStream(new PipedOutputStream(is))
new Endpoint(dest, is, os) new Endpoint(dest, is, os, null)
} }
initMocks() initMocks()
@@ -169,7 +169,7 @@ class ConnectionEstablisherTest {
inputStream = new DataInputStream(new PipedInputStream(os)) inputStream = new DataInputStream(new PipedInputStream(os))
PipedInputStream is = new PipedInputStream() PipedInputStream is = new PipedInputStream()
outputStream = new DataOutputStream(new PipedOutputStream(is)) outputStream = new DataOutputStream(new PipedOutputStream(is))
new Endpoint(dest, is, os) new Endpoint(dest, is, os, null)
} }
initMocks() initMocks()
@@ -211,7 +211,7 @@ class ConnectionEstablisherTest {
inputStream = new DataInputStream(new PipedInputStream(os)) inputStream = new DataInputStream(new PipedInputStream(os))
PipedInputStream is = new PipedInputStream() PipedInputStream is = new PipedInputStream()
outputStream = new DataOutputStream(new PipedOutputStream(is)) outputStream = new DataOutputStream(new PipedOutputStream(is))
new Endpoint(dest, is, os) new Endpoint(dest, is, os, null)
} }
initMocks() initMocks()
@@ -254,7 +254,7 @@ class ConnectionEstablisherTest {
inputStream = new DataInputStream(new PipedInputStream(os)) inputStream = new DataInputStream(new PipedInputStream(os))
PipedInputStream is = new PipedInputStream() PipedInputStream is = new PipedInputStream()
outputStream = new DataOutputStream(new PipedOutputStream(is)) outputStream = new DataOutputStream(new PipedOutputStream(is))
new Endpoint(dest, is, os) new Endpoint(dest, is, os, null)
} }
initMocks() initMocks()

View File

@@ -143,7 +143,7 @@ class HostCacheTest {
initMocks() initMocks()
cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1))
def endpoint = new Endpoint(destinations.dest1, null, null) def endpoint = new Endpoint(destinations.dest1, null, null, null)
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
@@ -161,7 +161,7 @@ class HostCacheTest {
initMocks() initMocks()
cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1))
def endpoint = new Endpoint(destinations.dest1, null, null) def endpoint = new Endpoint(destinations.dest1, null, null, null)
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.FAILED))
@@ -186,7 +186,7 @@ class HostCacheTest {
initMocks() initMocks()
cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1)) cache.onHostDiscoveredEvent(new HostDiscoveredEvent(destination: destinations.dest1))
def endpoint = new Endpoint(destinations.dest1, null, null) def endpoint = new Endpoint(destinations.dest1, null, null, null)
cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.SUCCESSFUL)) cache.onConnectionEvent( new ConnectionEvent(endpoint: endpoint, status: ConnectionAttemptStatus.SUCCESSFUL))
def rv = cache.getHosts(5) def rv = cache.getHosts(5)