diff --git a/apps/sam/java/src/net/i2p/sam/Handler.java b/apps/sam/java/src/net/i2p/sam/Handler.java new file mode 100644 index 0000000000000000000000000000000000000000..16cc27334e2863d207314fbb1c103e4850018ba8 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/Handler.java @@ -0,0 +1,14 @@ +package net.i2p.sam; + +/** + * Something that can be stopped by the SAMBridge. + * + * @since 0.9.20 + */ +public interface Handler { + + /** + * Stop the handler + */ + public void stopHandling(); +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMBridge.java b/apps/sam/java/src/net/i2p/sam/SAMBridge.java index 4679f68b686a26e0ca52715fedea2dbd1b02571f..383f44fb574b50a8ce6803eea3143a778143e67d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMBridge.java +++ b/apps/sam/java/src/net/i2p/sam/SAMBridge.java @@ -19,9 +19,13 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import net.i2p.I2PAppContext; import net.i2p.app.*; @@ -30,6 +34,7 @@ import net.i2p.data.DataFormatException; import net.i2p.data.Destination; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; +import net.i2p.util.PortMapper; /** * SAM bridge implementation. @@ -55,6 +60,7 @@ public class SAMBridge implements Runnable, ClientApp { * destination keys (Destination+PrivateKey+SigningPrivateKey) */ private final Map<String,String> nameToPrivKeys; + private final Set<Handler> _handlers; private volatile boolean acceptConnections = true; @@ -95,6 +101,7 @@ public class SAMBridge implements Runnable, ClientApp { _listenPort = options.port; persistFilename = options.keyFile; nameToPrivKeys = new HashMap<String,String>(8); + _handlers = new HashSet<Handler>(8); this.i2cpProps = options.opts; _state = INITIALIZED; } @@ -124,6 +131,7 @@ public class SAMBridge implements Runnable, ClientApp { _listenPort = listenPort; persistFilename = persistFile; nameToPrivKeys = new HashMap<String,String>(8); + _handlers = new HashSet<Handler>(8); loadKeys(); try { openSocket(); @@ -209,8 +217,9 @@ public class SAMBridge implements Runnable, ClientApp { } /** - * Load up the keys from the persistFilename - * + * Load up the keys from the persistFilename. + * TODO use DataHelper + * TODO store in config dir, not base dir */ private void loadKeys() { synchronized (nameToPrivKeys) { @@ -218,7 +227,7 @@ public class SAMBridge implements Runnable, ClientApp { BufferedReader br = null; try { br = new BufferedReader(new InputStreamReader( - new FileInputStream(persistFilename))); + new FileInputStream(persistFilename), "UTF-8")); String line = null; while ( (line = br.readLine()) != null) { int eq = line.indexOf('='); @@ -226,6 +235,8 @@ public class SAMBridge implements Runnable, ClientApp { String privKeys = line.substring(eq+1); nameToPrivKeys.put(name, privKeys); } + if (_log.shouldInfo()) + _log.info("Loaded " + nameToPrivKeys.size() + " private keys from " + persistFilename); } catch (FileNotFoundException fnfe) { _log.warn("Key file does not exist at " + persistFilename); } catch (IOException ioe) { @@ -237,8 +248,9 @@ public class SAMBridge implements Runnable, ClientApp { } /** - * Store the current keys to disk in the location specified on creation - * + * Store the current keys to disk in the location specified on creation. + * TODO use DataHelper + * TODO store in config dir, not base dir */ private void storeKeys() { synchronized (nameToPrivKeys) { @@ -248,11 +260,13 @@ public class SAMBridge implements Runnable, ClientApp { for (Map.Entry<String, String> entry : nameToPrivKeys.entrySet()) { String name = entry.getKey(); String privKeys = entry.getValue(); - out.write(name.getBytes()); + out.write(name.getBytes("UTF-8")); out.write('='); - out.write(privKeys.getBytes()); + out.write(privKeys.getBytes("UTF-8")); out.write('\n'); } + if (_log.shouldInfo()) + _log.info("Saved " + nameToPrivKeys.size() + " private keys to " + persistFilename); } catch (IOException ioe) { _log.error("Error writing out the SAM keys to " + persistFilename, ioe); } finally { @@ -261,6 +275,51 @@ public class SAMBridge implements Runnable, ClientApp { } } + /** + * Handlers must call on startup + * @since 0.9.20 + */ + public void register(Handler handler) { + if (_log.shouldInfo()) + _log.info("Register " + handler); + synchronized (_handlers) { + _handlers.add(handler); + } + } + + /** + * Handlers must call on stop + * @since 0.9.20 + */ + public void unregister(Handler handler) { + if (_log.shouldInfo()) + _log.info("Unregister " + handler); + synchronized (_handlers) { + _handlers.remove(handler); + } + } + + /** + * Stop all the handlers. + * @since 0.9.20 + */ + private void stopHandlers() { + List<Handler> handlers = null; + synchronized (_handlers) { + if (!_handlers.isEmpty()) { + handlers = new ArrayList<Handler>(_handlers); + _handlers.clear(); + } + } + if (handlers != null) { + for (Handler handler : handlers) { + if (_log.shouldInfo()) + _log.info("Stopping " + handler); + handler.stopHandling(); + } + } + } + ////// begin ClientApp interface, use only if using correct construtor /** @@ -270,6 +329,9 @@ public class SAMBridge implements Runnable, ClientApp { if (_state != INITIALIZED) return; changeState(STARTING); + synchronized (_handlers) { + _handlers.clear(); + } loadKeys(); try { openSocket(); @@ -285,7 +347,8 @@ public class SAMBridge implements Runnable, ClientApp { } /** - * Does NOT stop existing sessions. + * As of 0.9.20, stops running handlers and sessions. + * * @since 0.9.6 */ public synchronized void shutdown(String[] args) { @@ -293,11 +356,11 @@ public class SAMBridge implements Runnable, ClientApp { return; changeState(STOPPING); acceptConnections = false; + stopHandlers(); if (_runner != null) _runner.interrupt(); else changeState(STOPPED); - // TODO does not stop active connections / sessions } /** @@ -375,7 +438,7 @@ public class SAMBridge implements Runnable, ClientApp { * @since 0.9.6 */ private void startThread() { - I2PAppThread t = new I2PAppThread(this, "SAMListener"); + I2PAppThread t = new I2PAppThread(this, "SAMListener " + _listenPort); if (Boolean.parseBoolean(System.getProperty("sam.shutdownOnOOM"))) { t.addOOMEventThreadListener(new I2PAppThread.OOMEventListener() { public void outOfMemory(OutOfMemoryError err) { @@ -487,6 +550,7 @@ public class SAMBridge implements Runnable, ClientApp { changeState(RUNNING); if (_mgr != null) _mgr.register(this); + I2PAppContext.getGlobalContext().portMapper().register(PortMapper.SVC_SAM, _listenPort); try { while (acceptConnections) { SocketChannel s = serverSocket.accept(); @@ -495,18 +559,19 @@ public class SAMBridge implements Runnable, ClientApp { + s.socket().getInetAddress().toString() + ":" + s.socket().getPort()); - class HelloHandler implements Runnable { - private final SocketChannel s; - private final SAMBridge parent; + class HelloHandler implements Runnable, Handler { + private final SocketChannel s; + private final SAMBridge parent; - HelloHandler(SocketChannel s, SAMBridge parent) { + HelloHandler(SocketChannel s, SAMBridge parent) { this.s = s ; this.parent = parent ; - } + } - public void run() { + public void run() { + parent.register(this); try { - SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps); + SAMHandler handler = SAMHandlerFactory.createSAMHandler(s, i2cpProps, parent); if (handler == null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("SAM handler has not been instantiated"); @@ -515,7 +580,6 @@ public class SAMBridge implements Runnable, ClientApp { } catch (IOException e) {} return; } - handler.setBridge(parent); handler.startHandling(); } catch (SAMException e) { if (_log.shouldLog(Log.ERROR)) @@ -526,11 +590,17 @@ public class SAMBridge implements Runnable, ClientApp { } catch (Exception ee) { try { s.close(); } catch (IOException ioe) {} _log.log(Log.CRIT, "Unexpected error handling SAM connection", ee); - } - } + } finally { + parent.unregister(this); + } + } + + /** @since 0.9.20 */ + public void stopHandling() { + try { s.close(); } catch (IOException ioe) {} + } } - // TODO: Handler threads are not saved or tracked and cannot be stopped - new I2PAppThread(new HelloHandler(s,this), "HelloHandler").start(); + new I2PAppThread(new HelloHandler(s,this), "SAM HelloHandler").start(); } changeState(STOPPING); } catch (Exception e) { @@ -546,6 +616,8 @@ public class SAMBridge implements Runnable, ClientApp { if (serverSocket != null) serverSocket.close(); } catch (IOException e) {} + I2PAppContext.getGlobalContext().portMapper().unregister(PortMapper.SVC_SAM); + stopHandlers(); changeState(STOPPED); } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandler.java b/apps/sam/java/src/net/i2p/sam/SAMHandler.java index 30213ca6d7f1df2c07c45b1e19e01cd6dfd96dde..56d68878c612ec998fdc6a80f56e5b298f03f7e0 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandler.java @@ -25,12 +25,12 @@ import net.i2p.util.Log; * * @author human */ -abstract class SAMHandler implements Runnable { +abstract class SAMHandler implements Runnable, Handler { protected final Log _log; - protected I2PAppThread thread = null; - protected SAMBridge bridge = null; + protected I2PAppThread thread; + protected final SAMBridge bridge; private final Object socketWLock = new Object(); // Guards writings on socket protected final SocketChannel socket; @@ -41,8 +41,8 @@ abstract class SAMHandler implements Runnable { /** I2CP options configuring the I2CP connection (port, host, numHops, etc) */ protected final Properties i2cpProps; - private final Object stopLock = new Object(); - private volatile boolean stopHandler; + protected final Object stopLock = new Object(); + protected boolean stopHandler; /** * SAMHandler constructor (to be called by subclasses) @@ -53,14 +53,15 @@ abstract class SAMHandler implements Runnable { * @param i2cpProps properties to configure the I2CP connection (host, port, etc) * @throws IOException */ - protected SAMHandler(SocketChannel s, - int verMajor, int verMinor, Properties i2cpProps) throws IOException { + protected SAMHandler(SocketChannel s, int verMajor, int verMinor, + Properties i2cpProps, SAMBridge parent) throws IOException { _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); socket = s; this.verMajor = verMajor; this.verMinor = verMinor; this.i2cpProps = i2cpProps; + bridge = parent; } /** @@ -68,12 +69,10 @@ abstract class SAMHandler implements Runnable { * */ public final void startHandling() { - thread = new I2PAppThread(this, "SAMHandler"); + thread = new I2PAppThread(this, getClass().getSimpleName()); thread.start(); } - - public void setBridge(SAMBridge bridge) { this.bridge = bridge; } - + /** * Actually handle the SAM protocol. * @@ -81,10 +80,9 @@ abstract class SAMHandler implements Runnable { protected abstract void handle(); /** - * Get the input stream of the socket connected to the SAM client + * Get the channel of the socket connected to the SAM client * - * @return input stream - * @throws IOException + * @return channel */ protected final SocketChannel getClientSocket() { return socket ; @@ -156,13 +154,17 @@ abstract class SAMHandler implements Runnable { } /** - * Stop the SAM handler - * + * Stop the SAM handler, close the client socket, + * unregister with the bridge. */ - public final void stopHandling() { + public void stopHandling() { synchronized (stopLock) { stopHandler = true; } + try { + closeClientSocket(); + } catch (IOException e) {} + bridge.unregister(this); } /** @@ -183,14 +185,23 @@ abstract class SAMHandler implements Runnable { */ @Override public final String toString() { - return ("SAM handler (class: " + this.getClass().getName() + return (this.getClass().getSimpleName() + "; SAM version: " + verMajor + "." + verMinor + "; client: " + this.socket.socket().getInetAddress().toString() + ":" + this.socket.socket().getPort() + ")"); } + /** + * Register with the bridge, call handle(), + * unregister with the bridge. + */ public final void run() { - handle(); + bridge.register(this); + try { + handle(); + } finally { + bridge.unregister(this); + } } } diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index d605d57f9079e0d23b9032f5b9f891904f198fe0..582854d870617970e18ea5efbeb4dba2c11c7bb6 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -38,13 +38,15 @@ class SAMHandlerFactory { * @throws SAMException if the connection handshake (HELLO message) was malformed * @return A SAM protocol handler, or null if the client closed before the handshake */ - public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps) throws SAMException { + public static SAMHandler createSAMHandler(SocketChannel s, Properties i2cpProps, + SAMBridge parent) throws SAMException { StringTokenizer tok; Log log = I2PAppContext.getGlobalContext().logManager().getLog(SAMHandlerFactory.class); try { Socket sock = s.socket(); sock.setSoTimeout(HELLO_TIMEOUT); + sock.setKeepAlive(true); String line = DataHelper.readLine(sock.getInputStream()); sock.setSoTimeout(0); if (line == null) { @@ -103,13 +105,13 @@ class SAMHandlerFactory { try { switch (verMajor) { case 1: - handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps); + handler = new SAMv1Handler(s, verMajor, verMinor, i2cpProps, parent); break; case 2: - handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps); + handler = new SAMv2Handler(s, verMajor, verMinor, i2cpProps, parent); break; case 3: - handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps); + handler = new SAMv3Handler(s, verMajor, verMinor, i2cpProps, parent); break; default: log.error("BUG! Trying to initialize the wrong SAM version!"); diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java index dbeec9eda7b90a1c60f0b1b2b3339809b6720cef..db2450b0113dca84f2117c4a4faf05e0378adc2a 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java @@ -72,6 +72,7 @@ abstract class SAMMessageSession { handler = new SAMMessageSessionHandler(destStream, props); + // FIXME don't start threads in constructors Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); t.start(); } @@ -125,7 +126,6 @@ abstract class SAMMessageSession { /** * Close a SAM message-based session. - * */ public void close() { handler.stopRunning(); diff --git a/apps/sam/java/src/net/i2p/sam/SAMUtils.java b/apps/sam/java/src/net/i2p/sam/SAMUtils.java index 7adaadf7b5a087c5808478dd90c3e7d35522c539..e1e52f4fc4c1dead45aff0f3231fe043b2ce725d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMUtils.java +++ b/apps/sam/java/src/net/i2p/sam/SAMUtils.java @@ -212,6 +212,7 @@ class SAMUtils { } /* Dump a Properties object in an human-readable form */ +/**** private static String dumpProperties(Properties props) { StringBuilder builder = new StringBuilder(); String key, val; @@ -231,6 +232,7 @@ class SAMUtils { return builder.toString(); } +****/ /**** public static void main(String args[]) { diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index 3189f87e36722be2b675b6b9ce18168ec1c6af8b..cf4d46b62e63cd35bf5bf545bfcecd18d24b0760 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -60,8 +60,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece * @throws SAMException * @throws IOException */ - public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException { - this(s, verMajor, verMinor, new Properties()); + public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, + SAMBridge parent) throws SAMException, IOException { + this(s, verMajor, verMinor, new Properties(), parent); } /** * Create a new SAM version 1 handler. This constructor expects @@ -75,13 +76,14 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece * @throws SAMException * @throws IOException */ - public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException { - super(s, verMajor, verMinor, i2cpProps); + public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, + Properties i2cpProps, SAMBridge parent) throws SAMException, IOException { + super(s, verMajor, verMinor, i2cpProps, parent); _id = __id.incrementAndGet(); if (_log.shouldLog(Log.DEBUG)) _log.debug("SAM version 1 handler instantiated"); - if ( ! verifVersion() ) { + if ( ! verifVersion() ) { throw new SAMException("BUG! Wrong protocol version!"); } } @@ -183,8 +185,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Caught IOException (" - + e.getMessage() + ") for message [" + msg + "]", e); + _log.debug("Caught IOException for message [" + msg + "]", e); } catch (Exception e) { _log.error("Unexpected exception for message [" + msg + "]", e); } finally { @@ -193,7 +194,8 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece try { closeClientSocket(); } catch (IOException e) { - _log.error("Error closing socket: " + e.getMessage()); + if (_log.shouldWarn()) + _log.warn("Error closing socket", e); } if (getRawSession() != null) { getRawSession().close(); @@ -797,7 +799,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void receiveRawBytes(byte data[]) throws IOException { if (getRawSession() == null) { _log.error("BUG! Received raw bytes, but session is null!"); - throw new NullPointerException("BUG! RAW session is null!"); + return; } ByteArrayOutputStream msg = new ByteArrayOutputStream(); @@ -818,7 +820,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (getRawSession() == null) { _log.error("BUG! Got raw receiving stop, but session is null!"); - throw new NullPointerException("BUG! RAW session is null!"); + return; } try { @@ -833,7 +835,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException { if (getDatagramSession() == null) { _log.error("BUG! Received datagram bytes, but session is null!"); - throw new NullPointerException("BUG! DATAGRAM session is null!"); + return; } ByteArrayOutputStream msg = new ByteArrayOutputStream(); @@ -855,7 +857,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (getDatagramSession() == null) { _log.error("BUG! Got datagram receiving stop, but session is null!"); - throw new NullPointerException("BUG! DATAGRAM session is null!"); + return; } try { @@ -873,7 +875,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if ( getStreamSession() == null ) { _log.error ( "BUG! Want to answer to stream SEND, but session is null!" ); - throw new NullPointerException ( "BUG! STREAM session is null!" ); + return; } if ( !writeString ( "STREAM SEND ID=" + id @@ -891,7 +893,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if ( getStreamSession() == null ) { _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" ); - throw new NullPointerException ( "BUG! STREAM session is null!" ); + return; } if ( !writeString ( "STREAM READY_TO_SEND ID=" + id + "\n" ) ) @@ -904,7 +906,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void notifyStreamIncomingConnection(int id, Destination d) throws IOException { if (getStreamSession() == null) { _log.error("BUG! Received stream connection, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); + return; } if (!writeString("STREAM CONNECTED DESTINATION=" @@ -914,18 +916,16 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } + /** @param msg may be null */ public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException { if ( getStreamSession() == null ) { _log.error ( "BUG! Received stream connection, but session is null!" ); - throw new NullPointerException ( "BUG! STREAM session is null!" ); + return; } - String msgString = "" ; - - if ( msg != null ) msgString = " MESSAGE=\"" + msg + "\""; - + String msgString = createMessageString(msg); if ( !writeString ( "STREAM STATUS RESULT=" + result + " ID=" + id @@ -935,11 +935,36 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece throw new IOException ( "Error notifying connection to SAM client" ); } } + + /** + * Create a string to be appended to a status. + * + * @param msg may be null + * @return non-null, "" if msg is null, MESSAGE=msg or MESSAGE="msg a b c" + * with leading space if msg is non-null + * @since 0.9.20 + */ + protected static String createMessageString(String msg) { + String rv; + if ( msg != null ) { + msg = msg.replace("\n", " "); + msg = msg.replace("\r", " "); + if (!msg.startsWith("\"")) { + msg = msg.replace("\"", ""); + if (msg.contains("\"") || msg.contains("\t")) + msg = '"' + msg + '"'; + } + rv = " MESSAGE=\"" + msg + "\""; + } else { + rv = ""; + } + return rv; + } public void receiveStreamBytes(int id, ByteBuffer data) throws IOException { if (getStreamSession() == null) { _log.error("Received stream bytes, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); + return; } String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n"; @@ -956,16 +981,15 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } + /** @param msg may be null */ public void notifyStreamDisconnection(int id, String result, String msg) throws IOException { if (getStreamSession() == null) { _log.error("BUG! Received stream disconnection, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); + return; } - // FIXME: msg should be escaped! - if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result - + (msg == null ? "" : (" MESSAGE=" + msg)) - + "\n")) { + String msgString = createMessageString(msg); + if (!writeString("STREAM CLOSED ID=" + id + " RESULT=" + result + msgString + '\n')) { throw new IOException("Error notifying disconnection to SAM client"); } } @@ -976,7 +1000,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (getStreamSession() == null) { _log.error("BUG! Got stream receiving stop, but session is null!"); - throw new NullPointerException("BUG! STREAM session is null!"); + return; } try { diff --git a/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java index d96d82cb13841c23606ccaace397cbdec6957e9e..5eec29e1ea32a41b582cd6e04abcddacefe4ed5d 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv2Handler.java @@ -34,9 +34,10 @@ class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramRe * @param verMajor SAM major version to manage (should be 2) * @param verMinor SAM minor version to manage */ - public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException + public SAMv2Handler(SocketChannel s, int verMajor, int verMinor, + SAMBridge parent) throws SAMException, IOException { - this ( s, verMajor, verMinor, new Properties() ); + this(s, verMajor, verMinor, new Properties(), parent); } /** @@ -50,9 +51,10 @@ class SAMv2Handler extends SAMv1Handler implements SAMRawReceiver, SAMDatagramRe * @param i2cpProps properties to configure the I2CP connection (host, port, etc) */ - public SAMv2Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException + public SAMv2Handler(SocketChannel s, int verMajor, int verMinor, + Properties i2cpProps, SAMBridge parent) throws SAMException, IOException { - super ( s, verMajor, verMinor, i2cpProps ); + super(s, verMajor, verMinor, i2cpProps, parent); } @Override diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index fd3b44a2055e3a860f4395f42296e1108c1f120d..5e75dcf661d485c1fa1b94e8a54683b122dfa849 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -48,8 +48,8 @@ class SAMv3Handler extends SAMv1Handler private Session session; public static final SessionsDB sSessionsHash = new SessionsDB(); - private boolean stolenSocket; - private boolean streamForwardingSocket; + private volatile boolean stolenSocket; + private volatile boolean streamForwardingSocket; interface Session { @@ -67,9 +67,10 @@ class SAMv3Handler extends SAMv1Handler * @param verMajor SAM major version to manage (should be 3) * @param verMinor SAM minor version to manage */ - public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor ) throws SAMException, IOException + public SAMv3Handler(SocketChannel s, int verMajor, int verMinor, + SAMBridge parent) throws SAMException, IOException { - this ( s, verMajor, verMinor, new Properties() ); + this(s, verMajor, verMinor, new Properties(), parent); } /** @@ -83,9 +84,10 @@ class SAMv3Handler extends SAMv1Handler * @param i2cpProps properties to configure the I2CP connection (host, port, etc) */ - public SAMv3Handler ( SocketChannel s, int verMajor, int verMinor, Properties i2cpProps ) throws SAMException, IOException + public SAMv3Handler(SocketChannel s, int verMajor, int verMinor, + Properties i2cpProps, SAMBridge parent) throws SAMException, IOException { - super ( s, verMajor, verMinor, i2cpProps ); + super(s, verMajor, verMinor, i2cpProps, parent); if (_log.shouldLog(Log.DEBUG)) _log.debug("SAM version 3 handler instantiated"); } @@ -214,6 +216,9 @@ class SAMv3Handler extends SAMv1Handler } } + /** + * The values in the SessionsDB + */ public static class SessionRecord { private final String m_dest ; @@ -266,11 +271,14 @@ class SAMv3Handler extends SAMv1Handler } } + /** + * basically a HashMap from String to SessionRecord + */ public static class SessionsDB { private static final long serialVersionUID = 0x1; - static class ExistingIdException extends Exception { + static class ExistingIdException extends Exception { private static final long serialVersionUID = 0x1; } @@ -284,6 +292,7 @@ class SAMv3Handler extends SAMv1Handler map = new HashMap<String, SessionRecord>() ; } + /** @return success */ synchronized public boolean put( String nick, SessionRecord session ) throws ExistingIdException, ExistingDestException { @@ -305,17 +314,12 @@ class SAMv3Handler extends SAMv1Handler return false ; } + /** @return true if removed */ synchronized public boolean del( String nick ) { - SessionRecord rec = map.get(nick); - - if ( rec!=null ) { - map.remove(nick); - return true ; - } - else - return false ; + return map.remove(nick) != null; } + synchronized public SessionRecord get(String nick) { return map.get(nick); @@ -332,12 +336,23 @@ class SAMv3Handler extends SAMv1Handler return this.socket.socket().getInetAddress().getHostAddress(); } + /** + * For SAMv3StreamSession connect and accept + */ public void stealSocket() { stolenSocket = true ; this.stopHandling(); } + /** + * For SAMv3StreamSession + * @since 0.9.20 + */ + SAMBridge getBridge() { + return bridge; + } + public void handle() { String msg = null; String domain = null; @@ -348,7 +363,7 @@ class SAMv3Handler extends SAMv1Handler this.thread.setName("SAMv3Handler " + _id); if (_log.shouldLog(Log.DEBUG)) - _log.debug("SAM handling started"); + _log.debug("SAMv3 handling started"); try { InputStream in = getClientSocket().socket().getInputStream(); @@ -422,8 +437,7 @@ class SAMv3Handler extends SAMv1Handler } } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Caught IOException (" - + e.getMessage() + ") for message [" + msg + "]", e); + _log.debug("Caught IOException for message [" + msg + "]", e); } catch (Exception e) { _log.error("Unexpected exception for message [" + msg + "]", e); } finally { @@ -435,7 +449,8 @@ class SAMv3Handler extends SAMv1Handler try { closeClientSocket(); } catch (IOException e) { - _log.error("Error closing socket: " + e.getMessage()); + if (_log.shouldWarn()) + _log.warn("Error closing socket", e); } } if (streamForwardingSocket) @@ -444,20 +459,40 @@ class SAMv3Handler extends SAMv1Handler try { ((SAMv3StreamSession)streamSession).stopForwardingIncoming(); } catch (SAMException e) { - _log.error("Error while stopping forwarding connections: " + e.getMessage()); + if (_log.shouldWarn()) + _log.warn("Error while stopping forwarding connections", e); } catch (InterruptedIOException e) { - _log.error("Interrupted while stopping forwarding connections: " + e.getMessage()); + if (_log.shouldWarn()) + _log.warn("Interrupted while stopping forwarding connections", e); } } } - - - die(); } } - protected void die() { + /** + * Stop the SAM handler, close the socket, + * unregister with the bridge. + * + * Overridden to not close the client socket if stolen. + * + * @since 0.9.20 + */ + @Override + public void stopHandling() { + synchronized (stopLock) { + stopHandler = true; + } + if (!stolenSocket) { + try { + closeClientSocket(); + } catch (IOException e) {} + } + bridge.unregister(this); + } + + private void die() { SessionRecord rec = null ; if (session!=null) { @@ -813,20 +848,15 @@ class SAMv3Handler extends SAMv1Handler } - public void notifyStreamResult(boolean verbose, String result, String message) throws IOException - { + public void notifyStreamResult(boolean verbose, String result, String message) throws IOException { if (!verbose) return ; - - String out = "STREAM STATUS RESULT="+result; - if (message!=null) - out = out + " MESSAGE=\"" + message + "\""; - out = out + '\n'; + String msgString = createMessageString(message); + String out = "STREAM STATUS RESULT=" + result + msgString + '\n'; - if ( !writeString ( out ) ) - { - throw new IOException ( "Error notifying connection to SAM client" ); - } - } + if (!writeString(out)) { + throw new IOException ( "Error notifying connection to SAM client" ); + } + } public void notifyStreamIncomingConnection(Destination d) throws IOException { if (getStreamSession() == null) { diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index 99854637e78bcee448c1ae9690ad880f4fdec854..fe9561cedbc4fca9edcb31dc4d6ec77753705441 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -74,6 +74,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi /** * Connect the SAM STREAM session to the specified Destination + * for a single connection, using the socket stolen from the handler. * * @param handler The handler that communicates with the requesting client * @param dest Base64-encoded Destination to connect to @@ -87,7 +88,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi * @throws IOException */ public void connect ( SAMv3Handler handler, String dest, Properties props ) - throws I2PException, ConnectException, NoRouteToHostException, + throws I2PException, ConnectException, NoRouteToHostException, DataFormatException, InterruptedIOException, IOException { boolean verbose = (props.getProperty("SILENT", "false").equals("false")); @@ -117,13 +118,17 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi WritableByteChannel toClient = handler.getClientSocket(); WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); - (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); - (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); - + SAMBridge bridge = handler.getBridge(); + (new Thread(rec.getThreadGroup(), + new Pipe(fromClient, toI2P, bridge), + "ConnectV3 SAMPipeClientToI2P")).start(); + (new Thread(rec.getThreadGroup(), + new Pipe(fromI2P, toClient, bridge), + "ConnectV3 SAMPipeI2PToClient")).start(); } /** - * Accept an incoming STREAM + * Accept a single incoming STREAM on the socket stolen from the handler. * * @param handler The handler that communicates with the requesting client * @param verbose If true, SAM will send the Base64-encoded peer Destination of an @@ -150,8 +155,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi this.socketServer = this.socketMgr.getServerSocket(); } - I2PSocket i2ps; - i2ps = this.socketServer.accept(); + I2PSocket i2ps = this.socketServer.accept(); synchronized( this.socketServerLock ) { @@ -159,11 +163,11 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); - - if ( rec==null || i2ps==null ) throw new InterruptedIOException() ; - - if (verbose) - handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ; + + if ( rec==null || i2ps==null ) throw new InterruptedIOException() ; + + if (verbose) + handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ; handler.stealSocket() ; ReadableByteChannel fromClient = handler.getClientSocket(); @@ -171,8 +175,13 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi WritableByteChannel toClient = handler.getClientSocket(); WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); - (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); - (new Thread(rec.getThreadGroup(), new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); + SAMBridge bridge = handler.getBridge(); + (new Thread(rec.getThreadGroup(), + new Pipe(fromClient, toI2P, bridge), + "AcceptV3 SAMPipeClientToI2P")).start(); + (new Thread(rec.getThreadGroup(), + new Pipe(fromI2P, toClient, bridge), + "AcceptV3 SAMPipeI2PToClient")).start(); } @@ -210,10 +219,10 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose); - (new Thread(rec.getThreadGroup(), new I2PAppThread(forwarder, "SAMStreamForwarder"), "SAMStreamForwarder")).start(); + (new Thread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start(); } - private static class SocketForwarder extends Thread + private static class SocketForwarder implements Runnable { private final String host; private final int port; @@ -254,6 +263,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi // build pipes between both sockets try { + clientServerSock.socket().setKeepAlive(true); if (this.verbose) SAMv3Handler.notifyStreamIncomingConnection( clientServerSock, i2ps.getPeerDestination()); @@ -261,8 +271,10 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream()); WritableByteChannel toClient = clientServerSock ; WritableByteChannel toI2P = Channels.newChannel(i2ps.getOutputStream()); - (new I2PAppThread(new Pipe(fromClient,toI2P, "SAMPipeClientToI2P"), "SAMPipeClientToI2P")).start(); - (new I2PAppThread(new Pipe(fromI2P,toClient, "SAMPipeI2PToClient"), "SAMPipeI2PToClient")).start(); + (new I2PAppThread(new Pipe(fromClient, toI2P, null), + "ForwardV3 SAMPipeClientToI2P")).start(); + (new I2PAppThread(new Pipe(fromI2P,toClient, null), + "ForwardV3 SAMPipeI2PToClient")).start(); } catch (IOException e) { try { @@ -277,48 +289,62 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi } } - private static class Pipe extends Thread + private static class Pipe implements Runnable, Handler { private final ReadableByteChannel in ; private final WritableByteChannel out ; private final ByteBuffer buf ; + private final SAMBridge bridge; - public Pipe(ReadableByteChannel in, WritableByteChannel out, String name) + /** + * @param bridge may be null + */ + public Pipe(ReadableByteChannel in, WritableByteChannel out, SAMBridge bridge) { - super(name); this.in = in ; this.out = out ; this.buf = ByteBuffer.allocate(BUFFER_SIZE) ; + this.bridge = bridge; } - public void run() - { - try { - while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) { - buf.flip(); - out.write(buf); - buf.compact(); - } - } - catch (IOException e) - { - this.interrupt(); + public void run() { + if (bridge != null) + bridge.register(this); + try { + while (!Thread.interrupted() && (in.read(buf)>=0 || buf.position() != 0)) { + buf.flip(); + out.write(buf); + buf.compact(); } - try { - in.close(); - } - catch (IOException e) {} - try { - buf.flip(); - while (buf.hasRemaining()) - out.write(buf); - } - catch (IOException e) {} - try { - out.close(); - } - catch (IOException e) {} - } + } catch (IOException ioe) { + // ignore + } finally { + try { + in.close(); + } catch (IOException e) {} + try { + buf.flip(); + while (buf.hasRemaining()) { + out.write(buf); + } + } catch (IOException e) {} + try { + out.close(); + } catch (IOException e) {} + if (bridge != null) + bridge.unregister(this); + } + } + + /** + * Handler interface + * @since 0.9.20 + */ + public void stopHandling() { + try { + in.close(); + } catch (IOException e) {} + } } public I2PServerSocket getSocketServer() diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 540695a4a369fb1f993368ed34bcac5a50ce299e..d3a11d74eb8871810970380a33e47d6a8aaec1a1 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -493,6 +493,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 try { I2PSSLSocketFactory fact = new I2PSSLSocketFactory(_context, false, "certificates/i2cp"); _socket = fact.createSocket(_hostname, _portNum); + _socket.setKeepAlive(true); } catch (GeneralSecurityException gse) { IOException ioe = new IOException("SSL Fail"); ioe.initCause(gse); @@ -500,6 +501,7 @@ public abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2 } } else { _socket = new Socket(_hostname, _portNum); + _socket.setKeepAlive(true); } // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. OutputStream out = _socket.getOutputStream(); diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index b6f8bb7eb40998cbc2a383063d304c71de63bd5d..ce0ef254f3ce60810c35e3c730f279a7ec687228 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -89,6 +89,7 @@ class I2PSimpleSession extends I2PSessionImpl2 { } else { _socket = new Socket(_hostname, _portNum); } + _socket.setKeepAlive(true); OutputStream out = _socket.getOutputStream(); out.write(I2PClient.PROTOCOL_BYTE); out.flush(); diff --git a/router/java/src/net/i2p/router/client/ClientListenerRunner.java b/router/java/src/net/i2p/router/client/ClientListenerRunner.java index 8510fa0719f5de7bb6344243c4db5987c31e87eb..e88339b0e116c95c7da276ec1df6a2a1ea83cbc6 100644 --- a/router/java/src/net/i2p/router/client/ClientListenerRunner.java +++ b/router/java/src/net/i2p/router/client/ClientListenerRunner.java @@ -93,6 +93,7 @@ class ClientListenerRunner implements Runnable { if (validate(socket)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection received"); + socket.setKeepAlive(true); runConnection(socket); } else { if (_log.shouldLog(Log.WARN))