From 072e4dc2bfd8fe8eb9f859abc90678abc9a634a4 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 27 Jun 2015 19:46:45 +0000 Subject: [PATCH] Add ReadLine with timeouts Implement PING Handle QUIT, STOP, EXIT synch DatagramServer start/stop --- apps/sam/java/src/net/i2p/sam/ReadLine.java | 70 +++++++++++++ .../src/net/i2p/sam/SAMv3DatagramServer.java | 4 +- .../java/src/net/i2p/sam/SAMv3Handler.java | 99 ++++++++++++++++++- 3 files changed, 167 insertions(+), 6 deletions(-) create mode 100644 apps/sam/java/src/net/i2p/sam/ReadLine.java diff --git a/apps/sam/java/src/net/i2p/sam/ReadLine.java b/apps/sam/java/src/net/i2p/sam/ReadLine.java new file mode 100644 index 0000000000..889fc6c315 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/ReadLine.java @@ -0,0 +1,70 @@ +package net.i2p.sam; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.Socket; +import java.net.SocketException; +import java.net.SocketTimeoutException; + +/** + * Modified from I2PTunnelHTTPServer + * + * @since 0.9.22 + */ +class ReadLine { + + private static final int MAX_LINE_LENGTH = 8*1024; + + /** + * Read a line teriminated by newline, with a total read timeout. + * + * Warning - strips \n but not \r + * Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded + * + * @param buf output + * @param timeout throws SocketTimeoutException immediately if zero or negative + * @throws SocketTimeoutException if timeout is reached before newline + * @throws EOFException if EOF is reached before newline + * @throws LineTooLongException if too long + * @throws IOException on other errors in the underlying stream + */ + public static void readLine(Socket socket, StringBuilder buf, int timeout) throws IOException { + if (timeout <= 0) + throw new SocketTimeoutException(); + long expires = System.currentTimeMillis() + timeout; + InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8"); + int c; + int i = 0; + socket.setSoTimeout(timeout); + while ( (c = in.read()) != -1) { + if (++i > MAX_LINE_LENGTH) + throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH); + if (c == '\n') + break; + int newTimeout = (int) (expires - System.currentTimeMillis()); + if (newTimeout <= 0) + throw new SocketTimeoutException(); + buf.append((char)c); + if (newTimeout != timeout) { + timeout = newTimeout; + socket.setSoTimeout(timeout); + } + } + if (c == -1) { + if (System.currentTimeMillis() >= expires) + throw new SocketTimeoutException(); + else + throw new EOFException(); + } + } + + private static class LineTooLongException extends IOException { + public LineTooLongException(String s) { + super(s); + } + } +} + + diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java index f5d76c96d4..94268824c5 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramServer.java @@ -97,7 +97,7 @@ class SAMv3DatagramServer implements Handler { * Only call once. * @since 0.9.22 */ - public void start() { + private synchronized void start() { _listener.start(); if (_parent != null) _parent.register(this); @@ -107,7 +107,7 @@ class SAMv3DatagramServer implements Handler { * Cannot be restarted. * @since 0.9.22 */ - public void stopHandling() { + public synchronized void stopHandling() { synchronized(SAMv3DatagramServer.class) { if (server != null) { try { diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index 5505eb2863..7b073d2e29 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -15,7 +15,10 @@ import java.io.InputStream; import java.io.InterruptedIOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.net.Socket; import java.net.SocketAddress; +import java.net.SocketException; +import java.net.SocketTimeoutException; import java.net.NoRouteToHostException; import java.nio.channels.SocketChannel; import java.nio.ByteBuffer; @@ -51,6 +54,8 @@ class SAMv3Handler extends SAMv1Handler private volatile boolean stolenSocket; private volatile boolean streamForwardingSocket; private final boolean sendPorts; + private long _lastPing; + private static final int READ_TIMEOUT = 3*60*1000; interface Session { String getNick(); @@ -226,6 +231,11 @@ class SAMv3Handler extends SAMv1Handler public void stealSocket() { stolenSocket = true ; + if (sendPorts) { + try { + socket.socket().setSoTimeout(0); + } catch (SocketException se) {} + } this.stopHandling(); } @@ -246,6 +256,7 @@ class SAMv3Handler extends SAMv1Handler return session; } + @Override public void handle() { String msg = null; String domain = null; @@ -259,15 +270,72 @@ class SAMv3Handler extends SAMv1Handler _log.debug("SAMv3 handling started"); try { - InputStream in = getClientSocket().socket().getInputStream(); + Socket socket = getClientSocket().socket(); + InputStream in = socket.getInputStream(); + StringBuilder buf = new StringBuilder(1024); while (true) { if (shouldStop()) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Stop request found"); break; } - String line = DataHelper.readLine(in) ; + String line; + if (sendPorts) { + // client supports PING + try { + ReadLine.readLine(socket, buf, READ_TIMEOUT); + line = buf.toString(); + buf.setLength(0); + } catch (SocketTimeoutException ste) { + long now = System.currentTimeMillis(); + if (buf.length() <= 0) { + if (_lastPing > 0) { + if (now - _lastPing >= READ_TIMEOUT) { + if (_log.shouldWarn()) + _log.warn("Failed to respond to PING"); + writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + break; + } + } else { + if (_log.shouldDebug()) + _log.debug("Sendng PING " + now); + _lastPing = now; + if (!writeString("PING " + now + '\n')) + break; + } + } else { + if (_lastPing > 0) { + if (now - _lastPing >= 2*READ_TIMEOUT) { + if (_log.shouldWarn()) + _log.warn("Failed to respond to PING"); + writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + break; + } + } else if (_lastPing < 0) { + if (_log.shouldWarn()) + _log.warn("2nd timeout"); + writeString("XXX STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n"); + break; + } else { + // don't clear buffer, don't send ping, + // go around again + _lastPing = -1; + if (_log.shouldWarn()) + _log.warn("timeout after partial: " + buf); + } + } + if (_log.shouldDebug()) + _log.debug("loop after timeout"); + continue; + } + } else { + buf.setLength(0); + if (DataHelper.readLine(in, buf)) + line = buf.toString(); + else + line = null; + } if (line==null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection closed by client (line read : null)"); @@ -302,6 +370,10 @@ class SAMv3Handler extends SAMv1Handler } else if (domain.equals("PONG")) { execPongMessage(tok); continue; + } else if (domain.equals("QUIT") || domain.equals("STOP") || + domain.equals("EXIT")) { + writeString(domain + " STATUS RESULT=OK MESSAGE=bye\n"); + break; } if (count <= 1) { // This is not a correct message, for sure @@ -345,7 +417,7 @@ class SAMv3Handler extends SAMv1Handler if (!canContinue) { break; } - } + } // while } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Caught IOException for message [" + msg + "]", e); @@ -855,7 +927,26 @@ class SAMv3Handler extends SAMv1Handler * @since 0.9.22 */ private void execPongMessage(StringTokenizer tok) { - // TODO. We don't send PINGs yet. + String s; + if (tok.hasMoreTokens()) { + s = tok.nextToken(); + } else { + s = ""; + } + if (_lastPing > 0) { + String expected = Long.toString(_lastPing); + if (expected.equals(s)) { + _lastPing = 0; + if (_log.shouldInfo()) + _log.warn("Got expected pong: " + s); + } else { + if (_log.shouldInfo()) + _log.warn("Got unexpected pong: " + s); + } + } else { + if (_log.shouldWarn()) + _log.warn("Pong received without a ping: " + s); + } } } -- GitLab