diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index 3bd98e71c774101f7316ba5e9661436d8643b4f9..fc3fd999666d9c19bb67d5cf955b15ce27330fbb 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -14,8 +14,14 @@ import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.NoRouteToHostException; import java.net.SocketTimeoutException; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; import java.security.GeneralSecurityException; import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; import javax.net.ssl.SSLSocket; @@ -30,11 +36,6 @@ import net.i2p.data.Destination; import net.i2p.util.I2PAppThread; import net.i2p.util.I2PSSLSocketFactory; import net.i2p.util.Log; -import java.nio.channels.Channels; -import java.nio.channels.ReadableByteChannel; -import java.nio.channels.WritableByteChannel; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; /** * SAMv3 STREAM session class. @@ -48,7 +49,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi private static final int BUFFER_SIZE = 1024 ; private final Object socketServerLock = new Object(); + /** this is ONLY set for FORWARD, not for ACCEPT */ private I2PServerSocket socketServer; + /** this is the count of active ACCEPT sockets */ + private final AtomicInteger _acceptors = new AtomicInteger(); + private static I2PSSLSocketFactory _sslSocketFactory; private final String nick ; @@ -154,6 +159,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi /** * Accept a single incoming STREAM on the socket stolen from the handler. + * As of version 3.2 (0.9.24), multiple simultaneous accepts are allowed. + * Accepts and forwarding may not be done at the same time. * * @param handler The handler that communicates with the requesting client * @param verbose If true, SAM will send the Base64-encoded peer Destination of an @@ -170,23 +177,22 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi public void accept(SAMv3Handler handler, boolean verbose) throws I2PException, InterruptedIOException, IOException, SAMException { - synchronized( this.socketServerLock ) - { - if (this.socketServer!=null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("a socket server is already defined for this destination"); - throw new SAMException("a socket server is already defined for this destination"); - } - this.socketServer = this.socketMgr.getServerSocket(); - } - - I2PSocket i2ps = this.socketServer.accept(); + synchronized(this.socketServerLock) { + if (this.socketServer != null) { + if (_log.shouldWarn()) + _log.warn("a forwarding server is already defined for this destination"); + throw new SAMException("a forwarding server is already defined for this destination"); + } + } + + I2PSocket i2ps; + _acceptors.incrementAndGet(); + try { + i2ps = socketMgr.getServerSocket().accept(); + } finally { + _acceptors.decrementAndGet(); + } - synchronized( this.socketServerLock ) - { - this.socketServer = null ; - } - SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if ( rec==null || i2ps==null ) throw new InterruptedIOException() ; @@ -212,7 +218,8 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi /** - * Forward sockets from I2P to the host/port provided + * Forward sockets from I2P to the host/port provided. + * Accepts and forwarding may not be done at the same time. */ public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException { @@ -236,14 +243,17 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi _log.debug("no host specified. Taken from the client socket : " + host +':'+port); } boolean isSSL = Boolean.parseBoolean(props.getProperty("SSL")); - - synchronized( this.socketServerLock ) - { - if (this.socketServer!=null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("a socket server is already defined for this destination"); - throw new SAMException("a socket server is already defined for this destination"); - } + if (_acceptors.get() > 0) { + if (_log.shouldWarn()) + _log.warn("an accepting server is already defined for this destination"); + throw new SAMException("an accepting server is already defined for this destination"); + } + synchronized(this.socketServerLock) { + if (this.socketServer!=null) { + if (_log.shouldWarn()) + _log.warn("a forwarding server is already defined for this destination"); + throw new SAMException("a forwarding server is already defined for this destination"); + } this.socketServer = this.socketMgr.getServerSocket(); } @@ -427,7 +437,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } } - public I2PServerSocket getSocketServer() + private I2PServerSocket getSocketServer() { synchronized ( this.socketServerLock ) { return this.socketServer ; diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 05b79e1eeb27e8a67384792cc0b2c552a89fc561..b7a94bfb8049e3fc44e35ca05e4a46063182a82e 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -168,18 +168,22 @@ public class SAMStreamSink { t.start(); } if (_isV3 && mode == STREAM) { - Socket sock2 = connect(isSSL); - out = sock2.getOutputStream(); - eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); - _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); - _reader2.startReading(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Reader2 created"); - String ok = handshake(out, version, false, eventHandler, mode, user, password, ""); - if (ok == null) - throw new IOException("2nd handshake failed"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Handshake2 complete."); + // test multiple acceptors, only works in 3.2 + int acceptors = isV32 ? 4 : 1; + for (int i = 0; i < acceptors; i++) { + Socket sock2 = connect(isSSL); + out = sock2.getOutputStream(); + eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); + _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); + _reader2.startReading(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reader " + (2 + i) + " created"); + String ok = handshake(out, version, false, eventHandler, mode, user, password, ""); + if (ok == null) + throw new IOException("handshake " + (2 + i) + " failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handshake " + (2 + i) + " complete."); + } } else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) { // set up a listening DatagramSocket (new DGRcvr(mode)).start(); @@ -622,6 +626,8 @@ public class SAMStreamSink { sinkDir.mkdirs(); File out = File.createTempFile("sink", ".dat", sinkDir); + if (_log.shouldWarn()) + _log.warn("outputting to " + out); _out = new FileOutputStream(out); _started = _context.clock().now(); }