diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 0cbde00ce..c11c71549 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -39,6 +39,7 @@ import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; import net.i2p.util.I2PThread; +import net.i2p.util.InternalSocket; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; @@ -268,12 +269,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa long startConnect = _context.clock().now(); try { - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "connect begin to " + _hostname + ":" + _portNum); - _socket = new Socket(_hostname, _portNum); + // If we are in the router JVM, connect using the interal pseudo-socket + _socket = InternalSocket.getSocket(_hostname, _portNum); // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. _out = _socket.getOutputStream(); synchronized (_out) { _out.write(I2PClient.PROTOCOL_BYTE); + _out.flush(); } InputStream in = _socket.getInputStream(); _reader = new I2CPMessageReader(in, this); diff --git a/core/java/src/net/i2p/client/I2PSimpleClient.java b/core/java/src/net/i2p/client/I2PSimpleClient.java index 9ce4b8d6f..e91ae2f8b 100644 --- a/core/java/src/net/i2p/client/I2PSimpleClient.java +++ b/core/java/src/net/i2p/client/I2PSimpleClient.java @@ -20,12 +20,12 @@ import net.i2p.data.Destination; * just used to talk to the router. */ public class I2PSimpleClient implements I2PClient { - /** Don't do this */ + /** @deprecated Don't do this */ public Destination createDestination(OutputStream destKeyStream) throws I2PException, IOException { return null; } - /** or this */ + /** @deprecated or this */ public Destination createDestination(OutputStream destKeyStream, Certificate cert) throws I2PException, IOException { return null; } diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index bdc384b44..933a9578c 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -20,6 +20,7 @@ import net.i2p.data.i2cp.DestReplyMessage; import net.i2p.data.i2cp.GetBandwidthLimitsMessage; import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.util.I2PThread; +import net.i2p.util.InternalSocket; /** * Create a new session for doing naming and bandwidth queries only. Do not create a Destination. @@ -71,10 +72,12 @@ class I2PSimpleSession extends I2PSessionImpl2 { notifier.start(); try { - _socket = new Socket(_hostname, _portNum); + // If we are in the router JVM, connect using the interal pseudo-socket + _socket = InternalSocket.getSocket(_hostname, _portNum); _out = _socket.getOutputStream(); synchronized (_out) { _out.write(I2PClient.PROTOCOL_BYTE); + _out.flush(); } InputStream in = _socket.getInputStream(); _reader = new I2CPMessageReader(in, this); diff --git a/core/java/src/net/i2p/util/InternalServerSocket.java b/core/java/src/net/i2p/util/InternalServerSocket.java new file mode 100644 index 000000000..00d8352ee --- /dev/null +++ b/core/java/src/net/i2p/util/InternalServerSocket.java @@ -0,0 +1,199 @@ +package net.i2p.util; + +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ServerSocketChannel; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import net.i2p.I2PAppContext; + +/** + * A simple in-JVM ServerSocket using Piped Streams. + * We use port numbers just like regular sockets. + * Can only be connected by InternalSocket. + */ +public class InternalServerSocket extends ServerSocket { + private static final ConcurrentHashMap _sockets = new ConcurrentHashMap(4); + private BlockingQueue _acceptQueue; + private Integer _port; + private boolean _running; + private static Log _log = I2PAppContext.getGlobalContext().logManager().getLog(InternalServerSocket.class); + + public InternalServerSocket(int port) throws IOException { + if (port <= 0) + throw new IOException("Bad port: " + port); + _port = Integer.valueOf(port); + InternalServerSocket previous = _sockets.putIfAbsent(_port, this); + if (previous != null) + throw new IOException("Internal port in use: " + port); + _running = true; + _acceptQueue = new LinkedBlockingQueue(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Registered " + _port); + } + + @Override + public void close() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing " + _port); + _running = false; + _sockets.remove(_port); + _acceptQueue.clear(); + try { + // use null streams as a poison + _acceptQueue.put(new InternalSocket(null, null)); + } catch (InterruptedException ie) {} + } + + @Override + public Socket accept() throws IOException { + InternalSocket serverSock = null; + while (_running) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accepting " + _port); + try { + serverSock = _acceptQueue.take(); + } catch (InterruptedException ie) { + continue; + } + if (serverSock.getInputStream() == null) // poison + throw new IOException("closed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accepted " + _port); + break; + } + return serverSock; + } + + @Override + public String toString() { + return ("Internal server socket on port " + _port); + } + + /** + * This is how the client connects. + * + * Todo: Java 1.5 PipedInputStream buffers are only 1024 bytes; our I2CP messages are typically 1730 bytes, + * thus causing thread blockage before the whole message is transferred. + * We can specify buffer size in 1.6 but not in 1.5. + * Does wrapping the PipedOutputStreams in BufferedOutputStreams gain capacity? + * No? + */ + static void internalConnect(int port, InternalSocket clientSock) throws IOException { + InternalServerSocket iss = _sockets.get(Integer.valueOf(port)); + if (iss == null) + throw new IOException("No server for port: " + port); + PipedInputStream cis = new BigPipedInputStream(); + PipedInputStream sis = new BigPipedInputStream(); + PipedOutputStream cos = new PipedOutputStream(sis); + PipedOutputStream sos = new PipedOutputStream(cis); + clientSock.setInputStream(cis); + clientSock.setOutputStream(cos); + iss.queueConnection(new InternalSocket(sis, sos)); + } + + /** + * Until we switch to Java 1.6 + * http://javatechniques.com/blog/low-memory-deep-copy-technique-for-java-objects/ + */ + private static class BigPipedInputStream extends PipedInputStream { + protected static int PIPE_SIZE = 64*1024; + public BigPipedInputStream() { + super(); + buffer = new byte[PIPE_SIZE]; + } + } + + private void queueConnection(InternalSocket sock) throws IOException { + if (!_running) + throw new IOException("Server closed for port: " + _port); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Queueing " + _port); + try { + _acceptQueue.put(sock); + } catch (InterruptedException ie) {} + } + + @Override + public int getLocalPort() { + return _port.intValue(); + } + + // ignored stuff + + /** warning - unsupported */ + @Override + public void setSoTimeout(int timeout) {} + + @Override + public int getSoTimeout () { + return 0; + } + + // everything below here unsupported + /** @deprecated unsupported */ + @Override + public void bind(SocketAddress endpoint) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void bind(SocketAddress endpoint, int backlog) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public ServerSocketChannel getChannel() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public InetAddress getInetAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public SocketAddress getLocalSocketAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getReceiveBufferSize() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean getReuseAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isBound() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isClosed() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setReceiveBufferSize(int size) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setReuseAddress(boolean on) { + throw new IllegalArgumentException("unsupported"); + } +} diff --git a/core/java/src/net/i2p/util/InternalSocket.java b/core/java/src/net/i2p/util/InternalSocket.java new file mode 100644 index 000000000..b1feb847f --- /dev/null +++ b/core/java/src/net/i2p/util/InternalSocket.java @@ -0,0 +1,259 @@ +package net.i2p.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.SocketChannel; + +/** + * A simple in-JVM Socket using Piped Streams. + * We use port numbers just like regular sockets. + * Can only connect to InternalServerSocket. + */ +public class InternalSocket extends Socket { + private InputStream _is; + private OutputStream _os; + + /** server side */ + InternalSocket(InputStream is, OutputStream os) { + _is = is; + _os = os; + } + + /** client side */ + public InternalSocket(int port) throws IOException { + if (port <= 0) + throw new IOException("bad port number"); + InternalServerSocket.internalConnect(port, this); + } + + /** + * Convenience method to return either a Socket or an InternalSocket + */ + public static Socket getSocket(String host, int port) throws IOException { + if (System.getProperty("router.version") != null && + (host.equals("127.0.0.1") || host.equals("localhost"))) { + return new InternalSocket(port); + } else { + return new Socket(host, port); + } + } + + @Override + public InputStream getInputStream() { + return _is; + } + + @Override + public OutputStream getOutputStream() { + return _os; + } + + void setInputStream(InputStream is) { + _is = is; + } + + void setOutputStream(OutputStream os) { + _os = os; + } + + @Override + public void close() { + try { + if (_is != null) _is.close(); + } catch (IOException ie) {} + try { + if (_os != null) _os.close(); + } catch (IOException ie) {} + } + + @Override + public String toString() { + return ("Internal socket"); + } + + // ignored stuff + /** warning - unsupported */ + @Override + public void setSoTimeout(int timeout) {} + + @Override + public int getSoTimeout () { + return 0; + } + + // everything below here unsupported + /** @deprecated unsupported */ + @Override + public void bind(SocketAddress endpoint) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void connect(SocketAddress endpoint) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void connect(SocketAddress endpoint, int timeout) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public SocketChannel getChannel() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public InetAddress getInetAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean getKeepAlive() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public InetAddress getLocalAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getLocalPort() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public SocketAddress getLocalSocketAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean getOOBInline() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getPort() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getReceiveBufferSize() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public SocketAddress getRemoteSocketAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean getReuseAddress() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getSendBufferSize() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getSoLinger() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean getTcpNoDelay() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public int getTrafficClass() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isBound() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isClosed() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isConnected() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isInputShutdown() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public boolean isOutputShutdown() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void sendUrgentData(int data) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setKeepAlive(boolean on) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setOOBInline(boolean on) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setReceiveBufferSize(int size) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setReuseAddress(boolean on) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setSendBufferSize(int size) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setSoLinger(boolean on, int linger) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setTcpNoDelay(boolean on) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void setTrafficClass(int cize) { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void shutdownInput() { + throw new IllegalArgumentException("unsupported"); + } + /** @deprecated unsupported */ + @Override + public void shutdownOutput() { + throw new IllegalArgumentException("unsupported"); + } +} diff --git a/router/java/src/net/i2p/router/Router.java b/router/java/src/net/i2p/router/Router.java index 3e9a6cc66..7e231097b 100644 --- a/router/java/src/net/i2p/router/Router.java +++ b/router/java/src/net/i2p/router/Router.java @@ -200,6 +200,9 @@ public class Router { // overwrite an existing running router's jar files. Other than ours. installUpdates(); + // Apps may use this as an easy way to determine if they are in the router JVM + System.setProperty("router.version", RouterVersion.VERSION); + // NOW we start all the activity _context.initAll(); diff --git a/router/java/src/net/i2p/router/client/ClientListenerRunner.java b/router/java/src/net/i2p/router/client/ClientListenerRunner.java index 0f26b8d00..e5e6d8443 100644 --- a/router/java/src/net/i2p/router/client/ClientListenerRunner.java +++ b/router/java/src/net/i2p/router/client/ClientListenerRunner.java @@ -24,14 +24,14 @@ import net.i2p.util.Log; * @author jrandom */ public class ClientListenerRunner implements Runnable { - private Log _log; - private RouterContext _context; - private ClientManager _manager; - private ServerSocket _socket; - private int _port; + protected Log _log; + protected RouterContext _context; + protected ClientManager _manager; + protected ServerSocket _socket; + protected int _port; private boolean _bindAllInterfaces; - private boolean _running; - private boolean _listening; + protected boolean _running; + protected boolean _listening; public static final String BIND_ALL_INTERFACES = "i2cp.tcp.bindAllInterfaces"; @@ -91,7 +91,9 @@ public class ClientListenerRunner implements Runnable { } else { if (_log.shouldLog(Log.WARN)) _log.warn("Refused connection from " + socket.getInetAddress()); - socket.close(); + try { + socket.close(); + } catch (IOException ioe) {} } } catch (IOException ioe) { if (_context.router().isAlive()) @@ -132,7 +134,7 @@ public class ClientListenerRunner implements Runnable { /** give the i2cp client 5 seconds to show that they're really i2cp clients */ private final static int CONNECT_TIMEOUT = 5*1000; - private boolean validate(Socket socket) { + protected boolean validate(Socket socket) { try { socket.setSoTimeout(CONNECT_TIMEOUT); int read = socket.getInputStream().read(); @@ -150,7 +152,7 @@ public class ClientListenerRunner implements Runnable { * Handle the connection by passing it off to a {@link ClientConnectionRunner ClientConnectionRunner} * */ - protected void runConnection(Socket socket) throws IOException { + protected void runConnection(Socket socket) { ClientConnectionRunner runner = new ClientConnectionRunner(_context, _manager, socket); _manager.registerConnection(runner); } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index cbc5d778b..4e77bbf9f 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -42,6 +42,7 @@ import net.i2p.util.Log; public class ClientManager { private Log _log; private ClientListenerRunner _listener; + private ClientListenerRunner _internalListener; private final HashMap _runners; // Destination --> ClientConnectionRunner private final Set _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet private RouterContext _ctx; @@ -58,11 +59,21 @@ public class ClientManager { new long[] { 60*1000l, 60*60*1000l, 24*60*60*1000l }); _runners = new HashMap(); _pendingRunners = new HashSet(); + startListeners(port); + } + + /** Todo: Start a 3rd listener for IPV6? */ + private void startListeners(int port) { _listener = new ClientListenerRunner(_ctx, this, port); Thread t = new I2PThread(_listener); t.setName("ClientListener:" + port); t.setDaemon(true); t.start(); + _internalListener = new InternalClientListenerRunner(_ctx, this, port); + t = new I2PThread(_internalListener); + t.setName("ClientListener:" + port + "-i"); + t.setDaemon(true); + t.start(); } public void restart() { @@ -80,16 +91,13 @@ public class ClientManager { _log.error("Error setting the port: " + portStr + " is not valid", nfe); } } - _listener = new ClientListenerRunner(_ctx, this, port); - Thread t = new I2PThread(_listener); - t.setName("ClientListener:" + port); - t.setDaemon(true); - t.start(); + startListeners(port); } public void shutdown() { _log.info("Shutting down the ClientManager"); _listener.stopListening(); + _internalListener.stopListening(); Set runners = new HashSet(); synchronized (_runners) { for (Iterator iter = _runners.values().iterator(); iter.hasNext();) { diff --git a/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java b/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java new file mode 100644 index 000000000..2105f7691 --- /dev/null +++ b/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java @@ -0,0 +1,88 @@ +package net.i2p.router.client; +/* + * free (adj.): unencumbered; not under the control of others + * Written by jrandom in 2003 and released into the public domain + * with no warranty of any kind, either expressed or implied. + * It probably won't make your computer catch on fire, or eat + * your children, but it might. Use at your own risk. + * + */ + +import java.io.IOException; +import java.net.Socket; + +import net.i2p.router.RouterContext; +import net.i2p.util.Log; +import net.i2p.util.InternalServerSocket; + +/** + * Listen for in-JVM connections on the internal "socket" + * + * @author zzz + */ +public class InternalClientListenerRunner extends ClientListenerRunner { + + public InternalClientListenerRunner(RouterContext context, ClientManager manager, int port) { + super(context, manager, port); + _log = _context.logManager().getLog(InternalClientListenerRunner.class); + } + + /** + * Start up the socket listener, listens for connections, and + * fires those connections off via {@link #runConnection runConnection}. + * This only returns if the socket cannot be opened or there is a catastrophic + * failure. + * + */ + public void runServer() { + try { + if (_log.shouldLog(Log.INFO)) + _log.info("Listening on internal port " + _port); + _socket = new InternalServerSocket(_port); + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("InternalServerSocket created, before accept: " + _socket); + + _listening = true; + _running = true; + while (_running) { + try { + Socket socket = _socket.accept(); + if (validate(socket)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Internal connection received"); + runConnection(socket); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Refused connection from " + socket.getInetAddress()); + try { + socket.close(); + } catch (IOException ioe) {} + } + } catch (IOException ioe) { + if (_context.router().isAlive()) + _log.error("Server error accepting", ioe); + } catch (Throwable t) { + if (_context.router().isAlive()) + _log.error("Fatal error running client listener - killing the thread!", t); + _listening = false; + return; + } + } + } catch (IOException ioe) { + if (_context.router().isAlive()) + _log.error("Error listening on internal port " + _port, ioe); + } + + _listening = false; + if (_socket != null) { + try { _socket.close(); } catch (IOException ioe) {} + _socket = null; + } + + + if (_context.router().isAlive()) + _log.error("CANCELING I2CP LISTEN", new Exception("I2CP Listen cancelled!!!")); + _running = false; + } +}