I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 072e4dc2 authored by zzz's avatar zzz
Browse files

Add ReadLine with timeouts

Implement PING
Handle QUIT, STOP, EXIT
synch DatagramServer start/stop
parent f56ac66d
No related branches found
No related tags found
No related merge requests found
package net.i2p.sam;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
/**
* Modified from I2PTunnelHTTPServer
*
* @since 0.9.22
*/
class ReadLine {
private static final int MAX_LINE_LENGTH = 8*1024;
/**
* Read a line teriminated by newline, with a total read timeout.
*
* Warning - strips \n but not \r
* Warning - 8KB line length limit as of 0.7.13, @throws IOException if exceeded
*
* @param buf output
* @param timeout throws SocketTimeoutException immediately if zero or negative
* @throws SocketTimeoutException if timeout is reached before newline
* @throws EOFException if EOF is reached before newline
* @throws LineTooLongException if too long
* @throws IOException on other errors in the underlying stream
*/
public static void readLine(Socket socket, StringBuilder buf, int timeout) throws IOException {
if (timeout <= 0)
throw new SocketTimeoutException();
long expires = System.currentTimeMillis() + timeout;
InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8");
int c;
int i = 0;
socket.setSoTimeout(timeout);
while ( (c = in.read()) != -1) {
if (++i > MAX_LINE_LENGTH)
throw new LineTooLongException("Line too long - max " + MAX_LINE_LENGTH);
if (c == '\n')
break;
int newTimeout = (int) (expires - System.currentTimeMillis());
if (newTimeout <= 0)
throw new SocketTimeoutException();
buf.append((char)c);
if (newTimeout != timeout) {
timeout = newTimeout;
socket.setSoTimeout(timeout);
}
}
if (c == -1) {
if (System.currentTimeMillis() >= expires)
throw new SocketTimeoutException();
else
throw new EOFException();
}
}
private static class LineTooLongException extends IOException {
public LineTooLongException(String s) {
super(s);
}
}
}
......@@ -97,7 +97,7 @@ class SAMv3DatagramServer implements Handler {
* Only call once.
* @since 0.9.22
*/
public void start() {
private synchronized void start() {
_listener.start();
if (_parent != null)
_parent.register(this);
......@@ -107,7 +107,7 @@ class SAMv3DatagramServer implements Handler {
* Cannot be restarted.
* @since 0.9.22
*/
public void stopHandling() {
public synchronized void stopHandling() {
synchronized(SAMv3DatagramServer.class) {
if (server != null) {
try {
......
......@@ -15,7 +15,10 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.NoRouteToHostException;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
......@@ -51,6 +54,8 @@ class SAMv3Handler extends SAMv1Handler
private volatile boolean stolenSocket;
private volatile boolean streamForwardingSocket;
private final boolean sendPorts;
private long _lastPing;
private static final int READ_TIMEOUT = 3*60*1000;
interface Session {
String getNick();
......@@ -226,6 +231,11 @@ class SAMv3Handler extends SAMv1Handler
public void stealSocket()
{
stolenSocket = true ;
if (sendPorts) {
try {
socket.socket().setSoTimeout(0);
} catch (SocketException se) {}
}
this.stopHandling();
}
......@@ -246,6 +256,7 @@ class SAMv3Handler extends SAMv1Handler
return session;
}
@Override
public void handle() {
String msg = null;
String domain = null;
......@@ -259,15 +270,72 @@ class SAMv3Handler extends SAMv1Handler
_log.debug("SAMv3 handling started");
try {
InputStream in = getClientSocket().socket().getInputStream();
Socket socket = getClientSocket().socket();
InputStream in = socket.getInputStream();
StringBuilder buf = new StringBuilder(1024);
while (true) {
if (shouldStop()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Stop request found");
break;
}
String line = DataHelper.readLine(in) ;
String line;
if (sendPorts) {
// client supports PING
try {
ReadLine.readLine(socket, buf, READ_TIMEOUT);
line = buf.toString();
buf.setLength(0);
} catch (SocketTimeoutException ste) {
long now = System.currentTimeMillis();
if (buf.length() <= 0) {
if (_lastPing > 0) {
if (now - _lastPing >= READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
break;
}
} else {
if (_log.shouldDebug())
_log.debug("Sendng PING " + now);
_lastPing = now;
if (!writeString("PING " + now + '\n'))
break;
}
} else {
if (_lastPing > 0) {
if (now - _lastPing >= 2*READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
break;
}
} else if (_lastPing < 0) {
if (_log.shouldWarn())
_log.warn("2nd timeout");
writeString("XXX STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
break;
} else {
// don't clear buffer, don't send ping,
// go around again
_lastPing = -1;
if (_log.shouldWarn())
_log.warn("timeout after partial: " + buf);
}
}
if (_log.shouldDebug())
_log.debug("loop after timeout");
continue;
}
} else {
buf.setLength(0);
if (DataHelper.readLine(in, buf))
line = buf.toString();
else
line = null;
}
if (line==null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection closed by client (line read : null)");
......@@ -302,6 +370,10 @@ class SAMv3Handler extends SAMv1Handler
} else if (domain.equals("PONG")) {
execPongMessage(tok);
continue;
} else if (domain.equals("QUIT") || domain.equals("STOP") ||
domain.equals("EXIT")) {
writeString(domain + " STATUS RESULT=OK MESSAGE=bye\n");
break;
}
if (count <= 1) {
// This is not a correct message, for sure
......@@ -345,7 +417,7 @@ class SAMv3Handler extends SAMv1Handler
if (!canContinue) {
break;
}
}
} // while
} catch (IOException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Caught IOException for message [" + msg + "]", e);
......@@ -855,7 +927,26 @@ class SAMv3Handler extends SAMv1Handler
* @since 0.9.22
*/
private void execPongMessage(StringTokenizer tok) {
// TODO. We don't send PINGs yet.
String s;
if (tok.hasMoreTokens()) {
s = tok.nextToken();
} else {
s = "";
}
if (_lastPing > 0) {
String expected = Long.toString(_lastPing);
if (expected.equals(s)) {
_lastPing = 0;
if (_log.shouldInfo())
_log.warn("Got expected pong: " + s);
} else {
if (_log.shouldInfo())
_log.warn("Got unexpected pong: " + s);
}
} else {
if (_log.shouldWarn())
_log.warn("Pong received without a ping: " + s);
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment