Merge branch 'i2ptunnel-keepalive-client' into 'master'

Implement client-side HTTP persistence (keepalive) for the browser-to-client proxy socket

See merge request i2p-hackers/i2p.i2p!176
This commit is contained in:
zzz
2024-02-02 15:42:58 +00:00
5 changed files with 666 additions and 144 deletions

View File

@@ -16,7 +16,8 @@ import java.util.Locale;
import net.i2p.I2PAppContext;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.i2ptunnel.util.GunzipOutputStream;
import net.i2p.i2ptunnel.util.*;
import net.i2p.i2ptunnel.util.LimitOutputStream.DoneCallback;
import net.i2p.util.ByteCache;
import net.i2p.util.Log;
@@ -38,11 +39,13 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private boolean _headerWritten;
private final byte _buf1[];
protected boolean _gzip;
protected long _dataExpected;
protected long _dataExpected = -1;
protected boolean _keepAliveIn, _keepAliveOut;
/** lower-case, trimmed */
protected String _contentType;
/** lower-case, trimmed */
protected String _contentEncoding;
private final DoneCallback _callback;
private static final int CACHE_SIZE = 4*1024;
private static final ByteCache _cache = ByteCache.getInstance(8, CACHE_SIZE);
@@ -54,11 +57,66 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private static final byte[] CRLF = DataHelper.getASCII("\r\n");
public HTTPResponseOutputStream(OutputStream raw) {
this(raw, null);
}
/**
* Optionally call callback when we're done.
*
* @param cb may be null
* @since 0.9.62
*/
private HTTPResponseOutputStream(OutputStream raw, DoneCallback cb) {
super(raw);
I2PAppContext context = I2PAppContext.getGlobalContext();
_log = context.logManager().getLog(getClass());
_log = context.logManager().getLog(HTTPResponseOutputStream.class);
_headerBuffer = _cache.acquire();
_buf1 = new byte[1];
_callback = cb;
}
/**
* Optionally keep sockets alive and call callback when we're done.
*
* @param allowKeepAliveIn We may, but are not required to, keep the input socket alive.
* This is the server on the server side and I2P on the client side.
* @param allowKeepAliveOut We may, but are not required to, keep the output socket alive.
* This is I2P on the server side and the browser on the client side.
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
* @param cb non-null if allowKeepAlive is true
* @since 0.9.62
*/
public HTTPResponseOutputStream(OutputStream raw, boolean allowKeepAliveIn, boolean allowKeepAliveOut,
boolean isHead, DoneCallback cb) {
this(raw, cb);
_keepAliveIn = allowKeepAliveIn;
_keepAliveOut = allowKeepAliveOut;
if (isHead)
_dataExpected = 0;
if (_log.shouldInfo())
_log.info("Before headers: keepaliveIn? " + allowKeepAliveIn + " keepaliveOut? " + allowKeepAliveOut);
}
/**
* Should we keep the input stream alive when done?
*
* @return false before the headers are written
* @since 0.9.62
*/
public boolean getKeepAliveIn() {
return _keepAliveIn && _headerWritten;
}
/**
* Should we keep the output stream alive when done?
* Only supported for the browser socket side.
* I2P socket on server side not supported yet.
*
* @return false before the headers are written
* @since 0.9.62
*/
public boolean getKeepAliveOut() {
return _keepAliveOut && _headerWritten;
}
@Override
@@ -130,7 +188,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
byte second = data[valid - 2];
return second == NL; // \n\n
}
/**
* Possibly tweak that first HTTP response line (HTTP/1.0 200 OK, etc).
* Overridden on server side.
@@ -142,9 +200,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
/** ok, received, now munge & write it */
private void writeHeader() throws IOException {
String responseLine = null;
boolean connectionSent = false;
boolean chunked = false;
int lastEnd = -1;
byte[] data = _headerBuffer.getData();
@@ -152,11 +209,28 @@ class HTTPResponseOutputStream extends FilterOutputStream {
for (int i = 0; i < valid; i++) {
if (data[i] == NL) {
if (lastEnd == -1) {
responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
responseLine = filterResponseLine(responseLine);
String responseLine = DataHelper.getUTF8(data, 0, i+1); // includes NL
responseLine = (responseLine.trim() + "\r\n");
if (_log.shouldInfo())
_log.info("Response: " + responseLine.trim());
// Persistent conn requires HTTP/1.1
if (!responseLine.startsWith("HTTP/1.1 ")) {
_keepAliveIn = false;
_keepAliveOut = false;
}
// force zero datalen for 1xx, 204, 304 (RFC 2616 sec. 4.4)
// so that these don't prevent keepalive
int sp = responseLine.indexOf(" ");
if (sp > 0) {
String s = responseLine.substring(sp + 1);
if (s.startsWith("1") || s.startsWith("204") || s.startsWith("304"))
_dataExpected = 0;
} else {
// no status?
_keepAliveIn = false;
_keepAliveOut = false;
}
out.write(DataHelper.getUTF8(responseLine));
} else {
for (int j = lastEnd+1; j < i; j++) {
@@ -180,14 +254,26 @@ class HTTPResponseOutputStream extends FilterOutputStream {
if (val.toLowerCase(Locale.US).contains("upgrade")) {
// pass through for websocket
out.write(DataHelper.getASCII("Connection: " + val + "\r\n"));
// Disable persistence
_keepAliveOut = false;
} else {
out.write(CONNECTION_CLOSE);
// Strip to allow persistence, replace to disallow
if (!_keepAliveOut)
out.write(CONNECTION_CLOSE);
}
// We do not expect Connection: keep-alive here,
// as it's the default for HTTP/1.1, the server proxy doesn't support it,
// and we don't support keepalive for HTTP/1.0
_keepAliveIn = false;
connectionSent = true;
} else if ("proxy-connection".equals(lcKey)) {
// Nonstandard, strip
} else if ("content-encoding".equals(lcKey) && "x-i2p-gzip".equals(val.toLowerCase(Locale.US))) {
_gzip = true;
// client side only
// x-i2p-gzip is not chunked, which is nonstandard, but we track the
// end of data in GunzipOutputStream and call the callback,
// so we can support i2p-side keepalive here.
} else if ("proxy-authenticate".equals(lcKey)) {
// filter this hop-by-hop header; outproxy authentication must be configured in I2PTunnelHTTPClient
// see e.g. http://blog.c22.cc/2013/03/11/privoxy-proxy-authentication-credential-exposure-cve-2013-2503/
@@ -203,6 +289,9 @@ class HTTPResponseOutputStream extends FilterOutputStream {
} else if ("content-encoding".equals(lcKey)) {
// save for compress decision on server side
_contentEncoding = val.toLowerCase(Locale.US);
} else if ("transfer-encoding".equals(lcKey) && val.toLowerCase(Locale.US).contains("chunked")) {
// save for keepalive decision on client side
chunked = true;
} else if ("set-cookie".equals(lcKey)) {
String lcVal = val.toLowerCase(Locale.US);
if (lcVal.contains("domain=b32.i2p") ||
@@ -225,19 +314,55 @@ class HTTPResponseOutputStream extends FilterOutputStream {
lastEnd = i;
}
}
// Now make the final keepalive decisions
if (_keepAliveOut) {
// we need one but not both
if ((chunked && _dataExpected >= 0) ||
(!chunked && _dataExpected < 0))
_keepAliveOut = false;
}
if (_keepAliveIn) {
// we need one but not both
if ((chunked && _dataExpected >= 0) ||
(!chunked && _dataExpected < 0))
_keepAliveIn = false;
}
if (!connectionSent)
if (!connectionSent && !_keepAliveOut)
out.write(CONNECTION_CLOSE);
finishHeaders();
boolean shouldCompress = shouldCompress();
if (_log.shouldInfo())
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress);
_log.info("After headers: gzip? " + _gzip + " compress? " + shouldCompress + " keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut);
if (data.length == CACHE_SIZE)
_cache.release(_headerBuffer);
_headerBuffer = null;
// Setup the keepalive streams
// Until we have keepalive for the i2p socket, the client side
// does not need to do this, we just wait for the socket to close.
// Until we have keepalive for the server socket, the server side
// does not need to do this, we just wait for the socket to close.
if (_keepAliveIn && !shouldCompress) {
if (_dataExpected > 0) {
// content-length
// filter output stream to count the data
out = new ByteLimitOutputStream(out, _callback, _dataExpected);
} else if (_dataExpected == 0) {
if (_callback != null)
_callback.streamDone();
} else {
// -1, chunked
// filter output stream to look for the end
// do not strip the chunking; pass it through
out = new DechunkedOutputStream(out, _callback, false);
}
}
if (shouldCompress) {
beginProcessing();
}
@@ -252,7 +377,8 @@ class HTTPResponseOutputStream extends FilterOutputStream {
@Override
public void close() throws IOException {
if (_log.shouldInfo())
_log.info("Closing " + out + " compressed? " + shouldCompress(), new Exception("I did it"));
_log.info("Closing " + out + " headers written? " + _headerWritten + " compressed? " + shouldCompress() +
" keepaliveIn? " + _keepAliveIn + " keepaliveOut? " + _keepAliveOut, new Exception("I did it"));
synchronized(this) {
// synch with changing out field below
super.close();
@@ -260,7 +386,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
protected void beginProcessing() throws IOException {
OutputStream po = new GunzipOutputStream(out);
OutputStream po = new GunzipOutputStream(out, _callback);
synchronized(this) {
out = po;
}

View File

@@ -40,6 +40,7 @@ import net.i2p.i2ptunnel.localServer.LocalHTTPServer;
import net.i2p.util.ConvertToHash;
import net.i2p.util.DNSOverHTTPS;
import net.i2p.util.EventDispatcher;
import net.i2p.util.InternalSocket;
import net.i2p.util.Log;
import net.i2p.util.PortMapper;
@@ -98,6 +99,14 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
// overrides
private static final String PROP_UA_I2P = "httpclient.userAgent.i2p";
private static final String PROP_UA_CLEARNET = "httpclient.userAgent.outproxy";
public static final String OPT_KEEPALIVE_BROWSER = "keepalive.browser";
public static final String OPT_KEEPALIVE_I2P = "keepalive.i2p";
// how long to wait for another request on the same socket
// Firefox timeout appears to be about 114 seconds, so it will close before we do.
static final int BROWSER_KEEPALIVE_TIMEOUT = 2*60*1000;
private static final boolean DEFAULT_KEEPALIVE_BROWSER = true;
private static final boolean DEFAULT_KEEPALIVE_I2P = true;
/**
* These are backups if the xxx.ht error page is missing.
@@ -396,11 +405,39 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
String currentProxy = null;
long requestId = __requestId.incrementAndGet();
boolean shout = false;
boolean isConnect = false;
boolean isHead = false;
I2PSocket i2ps = null;
try {
s.setSoTimeout(INITIAL_SO_TIMEOUT);
out = s.getOutputStream();
InputReader reader = new InputReader(s.getInputStream());
int requestCount = 0;
// HTTP Persistent Connections (RFC 2616)
// for the local browser-to-client-proxy socket.
// Keep it very simple.
// Will be set to false for non-GET/HEAD, non-HTTP/1.1,
// Connection: close, InternalSocket,
// or after analysis of the response headers in HTTPResponseOutputStream,
// or on errors in I2PTunnelRunner.
boolean keepalive = getBooleanOption(OPT_KEEPALIVE_BROWSER, DEFAULT_KEEPALIVE_BROWSER) &&
!(s instanceof InternalSocket);
// indent
do { // while (keepalive)
// indent
if (requestCount > 0) {
try {
s.setSoTimeout(BROWSER_KEEPALIVE_TIMEOUT);
} catch (IOException ioe) {
if (_log.shouldInfo())
_log.info("Socket closed before request #" + requestCount);
return;
}
if (_log.shouldInfo())
_log.info("Keepalive, awaiting request #" + requestCount);
}
String line, method = null, protocol = null, host = null, destination = null;
String hostLowerCase = null;
StringBuilder newRequest = new StringBuilder();
@@ -422,10 +459,10 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
String lowercaseLine = line.toLowerCase(Locale.US);
if(method == null) { // first line (GET /base64/realaddr)
if(_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix(requestId) + "First line [" + line + "]");
}
if(method == null) {
// first line GET/POST/etc.
if (_log.shouldInfo())
_log.info(getPrefix(requestId) + "req #" + requestCount + " first line [" + line + "]");
String[] params = DataHelper.split(line, " ", 3);
if(params.length != 3) {
@@ -472,12 +509,19 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
****/
}
method = params[0];
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
method = params[0].toUpperCase(Locale.US);
if (method.equals("HEAD")) {
isHead = true;
} else if (method.equals("CONNECT")) {
// this makes things easier later, by spoofing a
// protocol so the URI parser find the host and port
// For in-net outproxy, will be fixed up below
request = "https://" + request + '/';
isConnect = true;
keepalive = false;
} else if (!method.equals("GET")) {
// POST, PUT, ...
keepalive = false;
}
// Now use the Java URI parser
@@ -559,6 +603,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
String protocolVersion = params[2];
if (!protocolVersion.equals("HTTP/1.1"))
keepalive = false;
protocol = requestURI.getScheme();
host = requestURI.getHost();
@@ -641,8 +687,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
break;
}
******/
} else if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT")) {
} else if ("https".equals(protocol) || isConnect) {
remotePort = 443;
} else {
remotePort = 80;
@@ -806,19 +851,21 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
host = getHostName(addressHelper);
}
// now strip everything but path and query from URI
targetRequest = requestURI.toASCIIString();
String newURI = requestURI.getRawPath();
if(query != null) {
newURI += '?' + query;
}
try {
requestURI = new URI(newURI);
} catch(URISyntaxException use) {
// shouldnt happen
_log.warn(request, use);
method = null;
break;
if (!isConnect) {
// now strip everything but path and query from URI
String newURI = requestURI.getRawPath();
if(query != null) {
newURI += '?' + query;
}
try {
requestURI = new URI(newURI);
} catch(URISyntaxException use) {
// shouldnt happen
_log.warn(request, use);
method = null;
break;
}
}
// end of (host endsWith(".i2p"))
@@ -844,8 +891,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
int rPort = requestURI.getPort();
if (rPort > 0)
remotePort = rPort;
else if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT"))
else if ("https".equals(protocol) || isConnect)
remotePort = 443;
else
remotePort = 80;
@@ -864,8 +910,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if(_log.shouldLog(Log.DEBUG)) {
_log.debug("Before selecting outproxy for " + host);
}
if ("https".equals(protocol) ||
method.toUpperCase(Locale.US).equals("CONNECT"))
if ("https".equals(protocol) || isConnect)
currentProxy = selectSSLProxy(hostLowerCase);
else
currentProxy = selectProxy(hostLowerCase);
@@ -921,16 +966,22 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
break;
}
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
if (isConnect) {
// fix up the change to requestURI above to get back to the original host:port
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
if (usingInternalOutproxy || usingWWWProxy)
line = method + ' ' + requestURI.getHost() + ':' + requestURI.getPort() + ' ' + protocolVersion;
else
line = method + ' ' + host + ':' + remotePort + ' ' + protocolVersion;
} else {
line = method + ' ' + requestURI.toASCIIString() + ' ' + protocolVersion;
}
if(_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix(requestId) + "REQ : \"" + request + "\"");
_log.debug(getPrefix(requestId) + "REQURI: \"" + requestURI + "\"");
_log.debug(getPrefix(requestId) + "NEWREQ: \"" + line + "\"");
_log.debug(getPrefix(requestId) + "HOST : \"" + host + "\"");
_log.debug(getPrefix(requestId) + "RPORT : \"" + remotePort + "\"");
_log.debug(getPrefix(requestId) + "DEST : \"" + destination + "\"");
}
@@ -941,11 +992,22 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if (lowercaseLine.contains("upgrade")) {
// pass through for websocket
preserveConnectionHeader = true;
keepalive = false;
} else if (lowercaseLine.contains("keep-alive")) {
// pass through
if (!keepalive)
continue;
// pass through
preserveConnectionHeader = true;
} else {
if (lowercaseLine.contains("close"))
keepalive = false;
continue;
}
} else if (lowercaseLine.startsWith("keep-alive: ") ||
lowercaseLine.startsWith("proxy-connection: ")) {
if (lowercaseLine.contains("close"))
keepalive = false;
continue;
} else if (lowercaseLine.startsWith("host: ") && !usingWWWProxy && !usingInternalOutproxy) {
// Note that we only pass the original Host: line through to the outproxy
@@ -1053,8 +1115,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if(ok != null) {
gzip = Boolean.parseBoolean(ok);
}
if(gzip && !usingInternalServer &&
!method.toUpperCase(Locale.US).equals("CONNECT")) {
if(gzip && !usingInternalServer && !isConnect) {
// according to rfc2616 s14.3, this *should* force identity, even if
// an explicit q=0 for gzip doesn't. tested against orion.i2p, and it
// seems to work.
@@ -1063,7 +1124,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
if (!usingInternalOutproxy)
newRequest.append("X-Accept-Encoding: x-i2p-gzip;q=1.0, identity;q=0.5, deflate;q=0, gzip;q=0, *;q=0\r\n");
}
if(!shout && !method.toUpperCase(Locale.US).equals("CONNECT")) {
if(!shout && !isConnect) {
if(!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_USER_AGENT))) {
// let's not advertise to external sites that we are from I2P
String ua;
@@ -1110,12 +1171,16 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
} // end header processing
if(_log.shouldLog(Log.DEBUG)) {
_log.debug(getPrefix(requestId) + "NewRequest header: [" + newRequest.toString() + "]");
}
if (newRequest.length() > 0 && _log.shouldDebug())
_log.debug(getPrefix(requestId) + "NewRequest header: [" + newRequest + ']');
if(method == null || (destination == null && !usingInternalOutproxy)) {
//l.log("No HTTP method found in the request.");
if (requestCount > 0) {
// SocketTimeout, normal to get here for persistent connections,
// because DataHelper.readLine() returns null on EOF
return;
}
_log.debug("No HTTP method found in the request.");
try {
if (protocol != null && "http".equals(protocol.toLowerCase(Locale.US))) {
out.write(getErrorPage("denied", ERR_REQUEST_DENIED).getBytes("UTF-8"));
@@ -1134,6 +1199,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
// Authorization
// Yes, this is sent and checked for every request on a persistent connection
AuthResult result = authorize(s, requestId, method, authorization);
if (result != AuthResult.AUTH_GOOD) {
if(_log.shouldLog(Log.WARN)) {
@@ -1175,7 +1241,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy, currentProxy, requestId);
byte[] data;
byte[] response;
if (method.toUpperCase(Locale.US).equals("CONNECT")) {
if (isConnect) {
data = null;
response = SUCCESS_RESPONSE.getBytes("UTF-8");
} else {
@@ -1320,7 +1386,7 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
}
// as of 0.9.35, allowInternalSSL defaults to true, and overridden to true unless PROP_SSL_SET is set
if (method.toUpperCase(Locale.US).equals("CONNECT") &&
if (isConnect &&
!usingWWWProxy &&
getTunnel().getClientOptions().getProperty(PROP_SSL_SET) != null &&
!Boolean.parseBoolean(getTunnel().getClientOptions().getProperty(PROP_INTERNAL_SSL, "true"))) {
@@ -1368,33 +1434,51 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
return;
}
Properties opts = new Properties();
//opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000);
// 1 == disconnect. see ConnectionOptions in the new streaming lib, which i
// dont want to hard link to here
//opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1);
I2PSocketOptions sktOpts;
try {
sktOpts = getDefaultOptions(opts);
} catch (RuntimeException re) {
// tunnel build failure
StringBuilder buf = new StringBuilder(128);
buf.append("HTTP/1.1 503 Service Unavailable");
if (re.getMessage() != null)
buf.append(" - ").append(re.getMessage());
buf.append("\r\n\r\n");
// Close persistent I2PSocket if destination or port changes
// and open a new one.
// We do not maintain a pool of open I2PSockets or look for
// an available one. Keep it very simple.
// As long as the traffic keeps going to the same place
// we will keep reusing it.
// While we should be able to reuse it if only the port changes,
// that should be extremely rare, so don't bother.
// For common use patterns including outproxy use,
// this should still be quite effective.
if (i2ps == null || i2ps.isClosed() ||
remotePort != i2ps.getPort() ||
!clientDest.equals(i2ps.getPeerDestination())) {
if (i2ps != null) {
if (_log.shouldInfo())
_log.info("Old socket closed or different dest/port, opening new one");
try { i2ps.close(); } catch (IOException ioe) {}
}
Properties opts = new Properties();
//opts.setProperty("i2p.streaming.inactivityTimeout", ""+120*1000);
// 1 == disconnect. see ConnectionOptions in the new streaming lib, which i
// dont want to hard link to here
//opts.setProperty("i2p.streaming.inactivityTimeoutAction", ""+1);
I2PSocketOptions sktOpts;
try {
out.write(buf.toString().getBytes("UTF-8"));
} catch (IOException ioe) {}
throw re;
sktOpts = getDefaultOptions(opts);
} catch (RuntimeException re) {
// tunnel build failure
StringBuilder buf = new StringBuilder(128);
buf.append("HTTP/1.1 503 Service Unavailable");
if (re.getMessage() != null)
buf.append(" - ").append(re.getMessage());
buf.append("\r\n\r\n");
try {
out.write(buf.toString().getBytes("UTF-8"));
} catch (IOException ioe) {}
throw re;
}
if (remotePort > 0)
sktOpts.setPort(remotePort);
i2ps = createI2PSocket(clientDest, sktOpts);
}
if (remotePort > 0)
sktOpts.setPort(remotePort);
i2ps = createI2PSocket(clientDest, sktOpts);
boolean isConnect = method.toUpperCase(Locale.US).equals("CONNECT");
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy,
currentProxy, requestId, hostLowerCase, isConnect);
I2PTunnelRunner t;
I2PTunnelHTTPClientRunner hrunner = null;
if (isConnect) {
byte[] data;
byte[] response;
@@ -1409,7 +1493,12 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
t = new I2PTunnelRunner(s, i2ps, sockLock, data, response, mySockets, (OnTimeout) null);
} else {
byte[] data = newRequest.toString().getBytes("ISO-8859-1");
t = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout);
OnTimeout onTimeout = new OnTimeout(s, s.getOutputStream(), targetRequest, usingWWWProxy,
currentProxy, requestId, hostLowerCase, isConnect);
boolean keepaliveI2P = keepalive && getBooleanOption(OPT_KEEPALIVE_I2P, DEFAULT_KEEPALIVE_I2P);
hrunner = new I2PTunnelHTTPClientRunner(s, i2ps, sockLock, data, mySockets, onTimeout,
keepaliveI2P, keepalive, isHead);
t = hrunner;
}
if (usingWWWProxy) {
t.setSuccessCallback(new OnProxySuccess(currentProxy, hostLowerCase, isConnect));
@@ -1417,7 +1506,26 @@ public class I2PTunnelHTTPClient extends I2PTunnelHTTPClientBase implements Runn
// we are called from an unlimited thread pool, so run inline
//t.start();
t.run();
// I2PTunnelHTTPClientRunner spins off the browser-to-i2p thread and keeps
// the i2p-to-socket copier in-line. So we won't get here until the i2p socket is closed.
// check if whatever was in the response does not allow keepalive
if (keepalive && hrunner != null && !hrunner.getKeepAliveSocket())
break;
// The old I2P socket was closed, null it out so we'll get a new one
// next time around
if (hrunner != null && !hrunner.getKeepAliveI2P())
i2ps = null;
// go around again
requestCount++;
// indent
} while (keepalive);
// indent
} catch(IOException ex) {
// This is normal for keepalive when the browser closed the socket,
// or a SocketTimeoutException if we gave up first
if(_log.shouldLog(Log.INFO)) {
_log.info(getPrefix(requestId) + "Error trying to connect", ex);
}

View File

@@ -8,8 +8,10 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.util.I2PAppThread;
/**
* Override the response with a stream filtering the HTTP headers
@@ -23,53 +25,186 @@ import net.i2p.client.streaming.I2PSocket;
* Warning - not maintained as a stable API for external use.
*/
public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
private HTTPResponseOutputStream _hout;
private final boolean _isHead;
/**
* Does NOT start itself. Caller must call start().
*
* @deprecated use other constructor
*/
@Deprecated
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
List<I2PSocket> sockList, FailCallback onFail) {
super(s, i2ps, slock, initialI2PData, null, sockList, onFail);
_isHead = false;
}
/**
* Does NOT start itself. Caller must call start().
*
* @param allowKeepAliveI2P we may, but are not required to, keep the I2P socket alive
* - Requires allowKeepAliveSocket
* @param allowKeepAliveSocket we may, but are not required to, keep the browser-side socket alive
* NO data will be forwarded from the socket to the i2psocket other than
* initialI2PData if this is true.
* @param isHead is this a response to a HEAD, and thus no data is expected (RFC 2616 sec. 4.4)
* @since 0.9.62
*/
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
List<I2PSocket> sockList, FailCallback onFail,
boolean allowKeepAliveI2P,
boolean allowKeepAliveSocket, boolean isHead) {
super(s, i2ps, slock, initialI2PData, null, sockList, onFail, allowKeepAliveI2P, allowKeepAliveSocket);
if (allowKeepAliveI2P && !allowKeepAliveSocket)
throw new IllegalArgumentException();
_isHead = isHead;
}
/**
* Only call once!
*
* @return an HTTPResponseOutputStream
* @throws IllegalStateException if called again
*/
@Override
protected OutputStream getSocketOut() throws IOException {
if (_hout != null)
throw new IllegalStateException("already called");
OutputStream raw = super.getSocketOut();
return new HTTPResponseOutputStream(raw);
_hout = new HTTPResponseOutputStream(raw, super.getKeepAliveI2P(), super.getKeepAliveSocket(), _isHead, this);
return _hout;
}
/**
* Should we keep the local browser socket open when done?
* @since 0.9.62
*/
@Override
boolean getKeepAliveSocket() {
return _hout != null && _hout.getKeepAliveOut() && super.getKeepAliveSocket();
}
/**
* Why is this overridden?
* Why flush in super but not here?
* Why do things in different order than in super?
* Should we keep the I2P socket open when done?
* @since 0.9.62
*/
@Override
boolean getKeepAliveI2P() {
return _hout != null && _hout.getKeepAliveIn() && super.getKeepAliveI2P();
}
/**
* May not actually close either socket, depending on keepalive settings.
*
* @param out may be null
* @param in may be null
* @param i2pout may be null
* @param i2pin may be null
* @param s non-null
* @param i2ps non-null
* @param t1 may be null
* @param t2 may be null, ignored, we only join t1
*/
@Override
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
if (i2pin != null) { try {
i2pin.close();
} catch (IOException ioe) {} }
if (i2pout != null) { try {
i2pout.close();
} catch (IOException ioe) {} }
if (in != null) { try {
in.close();
} catch (IOException ioe) {} }
boolean keepaliveSocket = getKeepAliveSocket();
boolean keepaliveI2P = getKeepAliveI2P();
boolean threadI2PClose = keepaliveSocket && !keepaliveI2P && i2pout != null && !i2ps.isClosed();
if (_log.shouldInfo())
_log.info("Closing HTTPClientRunner keepaliveI2P? " + keepaliveI2P + " keepaliveSocket? " + keepaliveSocket +
" threadedClose? " + threadI2PClose, new Exception("I did it"));
if (threadI2PClose) {
// Thread the I2P stream/socket closing, because it is blocking, may take several seconds,
// and we don't want to delay the next request
Thread t = new I2PSocketCloser(i2pin, i2pout, i2ps);
TunnelControllerGroup tcg = TunnelControllerGroup.getInstance();
if (tcg != null) {
try {
tcg.getClientExecutor().execute(t);
} catch (RejectedExecutionException ree) {}
} else {
t.start();
}
} else {
if (!keepaliveI2P) {
if (i2pin != null) { try {
i2pin.close();
} catch (IOException ioe) {} }
}
if (i2pout != null) { try {
if (keepaliveI2P)
i2pout.flush();
else
i2pout.close();
} catch (IOException ioe) {} }
}
if (!keepaliveSocket) {
if (in != null) { try {
in.close();
} catch (IOException ioe) {} }
}
if (out != null) { try {
out.close();
if (keepaliveSocket)
out.flush();
else
out.close();
} catch (IOException ioe) {} }
try {
i2ps.close();
} catch (IOException ioe) {}
try {
s.close();
} catch (IOException ioe) {}
if (!threadI2PClose && !keepaliveI2P) {
try {
i2ps.close();
} catch (IOException ioe) {}
}
if (!keepaliveSocket) {
try {
s.close();
} catch (IOException ioe) {}
}
if (t1 != null)
t1.join(30*1000);
// t2 = fromI2P now run inline
//t2.join(30*1000);
}
/**
* Thread the I2P socket close, so we don't hold up
* the next request if the browser socket is keepalive.
*
* @since 0.9.xx
*/
private class I2PSocketCloser extends I2PAppThread {
private final InputStream in;
private final OutputStream out;
private final I2PSocket s;
/**
* @param in may be null
* @param out non-null
* @param i2ps non-null
*/
public I2PSocketCloser(InputStream i2pin, OutputStream i2pout, I2PSocket i2ps) {
in = i2pin;
out = i2pout;
s = i2ps;
}
@Override
public void run() {
if (in != null) {
try {
in.close();
} catch (IOException ioe) {}
}
try {
out.close();
} catch (IOException ioe) {}
try {
s.close();
} catch (IOException ioe) {}
//_log.info("(threaded) i2p socket closed");
}
}
}

View File

@@ -20,6 +20,7 @@ import net.i2p.client.streaming.I2PSocket;
import net.i2p.client.streaming.I2PSocketException;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
import net.i2p.i2ptunnel.util.LimitOutputStream.DoneCallback;
import net.i2p.util.ByteCache;
import net.i2p.util.Clock;
import net.i2p.util.I2PAppThread;
@@ -27,11 +28,14 @@ import net.i2p.util.InternalSocket;
import net.i2p.util.Log;
/**
* A thread that starts two more threads, one to forward traffic in each direction.
* A thread that starts one more thread if keepAliveSocket is false,
* to forward traffic in each direction.
* When keepAliveSocket is true, we do not expect additional data and do not
* need a forwarding thread from the socket to I2P.
*
* Warning - not maintained as a stable API for external use.
*/
public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener {
public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErrorListener, DoneCallback {
protected final Log _log;
private static final AtomicLong __runnerId = new AtomicLong();
@@ -52,8 +56,6 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private volatile boolean finished;
private final byte[] initialI2PData;
private final byte[] initialSocketData;
/** when the last data was sent/received (or -1 if never) */
private long lastActivityOn;
/** when the runner started up */
private final long startedOn;
private final List<I2PSocket> sockList;
@@ -65,6 +67,10 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private long totalSent;
// does not include initialSocketData
private long totalReceived;
// not final, may be changed by extending classes
protected volatile boolean _keepAliveI2P, _keepAliveSocket;
private StreamForwarder toI2P;
private StreamForwarder fromI2P;
/**
* For use in new constructor
@@ -166,6 +172,29 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false);
}
/**
* With keepAlive args. Does NOT start itself. Caller must call start().
*
* @param slock the socket lock, non-null
* @param initialI2PData may be null
* @param initialSocketData may be null
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
* Will synchronize on slock when removing.
* @param onFail May be null. If non-null and no data (except initial data) was received,
* it will be run before closing s.
* @param keepAliveI2P Do not close the I2P socket when done.
* @param keepAliveSocket Do not close the local socket when done.
* For client side only; must be false for server side.
* NO data will be forwarded from the socket to the i2psocket other than
* initialI2PData if this is true.
* @since 0.9.62
*/
public I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, FailCallback onFail,
boolean keepAliveI2P, boolean keepAliveSocket) {
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, keepAliveI2P, keepAliveSocket, false);
}
/**
* Base constructor
*
@@ -182,6 +211,33 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
FailCallback onFail, boolean shouldStart) {
this(s, i2ps, slock, initialI2PData, initialSocketData, sockList, null, onFail, false, false, shouldStart);
}
/**
* Base constructor with keepAlive args
*
* @param slock the socket lock, non-null
* @param initialI2PData may be null
* @param initialSocketData may be null
* @param sockList may be null. Caller must add i2ps to the list! It will be removed here on completion.
* Will synchronize on slock when removing.
* @param onTimeout May be null. If non-null and no data (except initial data) was received,
* it will be run before closing s.
* @param onFail Trumps onTimeout
* @param shouldStart should thread be started in constructor (bad, false recommended)
* @param keepAliveI2P Do not close the I2P socket when done.
* @param keepAliveSocket Do not close the local socket when done.
* For client side only; must be false for server side.
* NO data will be forwarded from the socket to the i2psocket other than
* initialI2PData if this is true.
* @since 0.9.62
*/
private I2PTunnelRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData,
byte[] initialSocketData, List<I2PSocket> sockList, Runnable onTimeout,
FailCallback onFail,
boolean keepAliveI2P, boolean keepAliveSocket,
boolean shouldStart) {
this.sockList = sockList;
this.s = s;
this.i2ps = i2ps;
@@ -190,15 +246,17 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
this.initialSocketData = initialSocketData;
this.onTimeout = onTimeout;
_onFail = onFail;
lastActivityOn = -1;
startedOn = Clock.getInstance().now();
_log = I2PAppContext.getGlobalContext().logManager().getLog(getClass());
_keepAliveI2P = keepAliveI2P;
_keepAliveSocket = keepAliveSocket;
if (_log.shouldLog(Log.INFO))
_log.info("I2PTunnelRunner started");
_runnerId = __runnerId.incrementAndGet();
setName("I2PTunnelRunner " + _runnerId);
if (shouldStart)
if (shouldStart) {
setName("I2PTunnelRunner " + _runnerId);
start();
}
}
/**
@@ -221,15 +279,9 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
*/
@Deprecated
public long getLastActivityOn() {
return lastActivityOn;
return -1L;
}
/****
private void updateActivity() {
lastActivityOn = Clock.getInstance().now();
}
****/
/**
* When this runner started up transferring data
*
@@ -251,6 +303,50 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
protected InputStream getSocketIn() throws IOException { return s.getInputStream(); }
protected OutputStream getSocketOut() throws IOException { return s.getOutputStream(); }
/**
* Should we keep the I2P socket open when done?
* On the client side, only true if the browser and the server side support it.
* On the server side, only true if the client supports it.
* @since 0.9.62
*/
boolean getKeepAliveI2P() {
return _keepAliveI2P;
}
/**
* Should we keep the local browser/server socket open when done?
* Usually true for client side.
* Always false for server side.
* @since 0.9.62
*/
boolean getKeepAliveSocket() {
return _keepAliveSocket;
}
/**
* The DoneCallback for the I2P socket.
*
* @since 0.9.62
*/
public void streamDone() {
if (_keepAliveSocket && fromI2P != null) {
// we are client-side
// tell the from-I2P runner
if (_log.shouldInfo())
_log.info("Stream done from I2P", new Exception("I did it"));
fromI2P.done = true;
} else if (_keepAliveI2P && toI2P != null) {
// we are server-side
// tell the to-I2P runner
if (_log.shouldInfo())
_log.info("Stream done from Server", new Exception("I did it"));
toI2P.done = true;
} else {
if (_log.shouldWarn())
_log.info("Unexpected stream done", new Exception("I did it"));
}
}
private static final byte[] POST = { 'P', 'O', 'S', 'T', ' ' };
private static final byte[] PUT = { 'P', 'U', 'T', ' ' };
@@ -263,10 +359,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
OutputStream out = null;
InputStream i2pin = null;
OutputStream i2pout = null;
StreamForwarder toI2P = null;
StreamForwarder fromI2P = null;
try {
in = getSocketIn();
out = getSocketOut(); // = new BufferedOutputStream(s.getOutputStream(), NETWORK_BUFFER_SIZE);
// unimplemented in streaming
//i2ps.setSocketErrorListener(this);
@@ -299,15 +392,25 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// this does not increment totalReceived
out.write(initialSocketData);
}
if (_log.shouldLog(Log.DEBUG))
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("Initial data " + (initialI2PData != null ? initialI2PData.length : 0)
+ " written to I2P, " + (initialSocketData != null ? initialSocketData.length : 0)
+ " written to the socket, starting forwarders");
if (!(s instanceof InternalSocket))
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
toI2P = new StreamForwarder(in, i2pout, true, null);
}
if (_keepAliveSocket) {
// standard GET or HEAD, no data, do not thread a forwarder
// because we don't need it and
// we don't want it to swallow the next request
} else {
in = getSocketIn();
// InternalSocket already has buffering
if (!(s instanceof InternalSocket))
in = new BufferedInputStream(in, 2*NETWORK_BUFFER_SIZE);
toI2P = new StreamForwarder(in, i2pout, true, null);
toI2P.start();
}
fromI2P = new StreamForwarder(i2pin, out, false, _onSuccess);
toI2P.start();
// We are already a thread, so run the second one inline
//fromI2P.start();
fromI2P.run();
@@ -330,7 +433,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// HTTPClient never sets initialSocketData.
if (_onFail != null) {
Exception e = fromI2P.getFailure();
if (e == null)
if (e == null && toI2P != null)
e = toI2P.getFailure();
_onFail.onFail(e);
} else {
@@ -339,7 +442,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} else {
// Detect a reset on one side, and propagate to the other
Exception e1 = fromI2P.getFailure();
Exception e2 = toI2P.getFailure();
Exception e2 = toI2P != null ? toI2P.getFailure() : null;
Throwable c1 = e1 != null ? e1.getCause() : null;
Throwable c2 = e2 != null ? e2.getCause() : null;
if (c1 != null && c1 instanceof I2PSocketException) {
@@ -365,11 +468,17 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} catch (InterruptedException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Interrupted", ex);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (SSLException she) {
_log.error("SSL error", she);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (IOException ex) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Error forwarding", ex);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (IllegalStateException ise) {
// JamVM (Gentoo: jamvm-1.5.4, gnu-classpath-0.98+gmp)
//java.nio.channels.NotYetConnectedException
@@ -384,9 +493,13 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// at net.i2p.i2ptunnel.I2PTunnelRunner.run(I2PTunnelRunner.java:167)
if (_log.shouldLog(Log.WARN))
_log.warn("gnu?", ise);
_keepAliveI2P = false;
_keepAliveSocket = false;
} catch (RuntimeException e) {
if (_log.shouldLog(Log.ERROR))
_log.error("Internal error", e);
_keepAliveI2P = false;
_keepAliveSocket = false;
} finally {
removeRef();
if (i2pReset) {
@@ -401,6 +514,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
try {
i2ps.close();
} catch (IOException ioe) {}
_keepAliveI2P = false;
_keepAliveSocket = false;
} else if (sockReset) {
if (_log.shouldWarn())
_log.warn("Got socket reset, resetting I2P socket");
@@ -410,6 +525,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
try {
s.close();
} catch (IOException ioe) {}
_keepAliveI2P = false;
_keepAliveSocket = false;
} else {
// now one connection is dead - kill the other as well, after making sure we flush
try {
@@ -418,14 +535,18 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
}
}
}
/**
* Warning - overridden in I2PTunnelHTTPClientRunner.
* Here we ignore keepalive and always close both sides.
* The HTTP flavor handles keepalive.
*
* @param out may be null
* @param in may be null
* @param i2pout may be null
* @param i2pin may be null
* @param t1 may be null
* @param t2 may be null
* @param t2 may be null, ignored, we only join t1
*/
protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin,
Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException {
@@ -451,20 +572,20 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
} catch (IOException ioe) {}
if (t1 != null)
t1.join(30*1000);
// t2 = fromI2P now run inline
//t2.join(30*1000);
}
/**
* Deprecated, unimplemented in streaming, never called.
* @deprecated unused
*/
@Deprecated
public void errorOccurred() {
synchronized (finishLock) {
finished = true;
finishLock.notifyAll();
}
}
private void removeRef() {
if (sockList != null) {
synchronized (slock) {
@@ -472,7 +593,7 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
}
}
}
/**
* Forward data in one direction
*/
@@ -485,6 +606,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
private final ByteCache _cache;
private final SuccessCallback _callback;
private volatile Exception _failure;
// does not need to be volatile, will be set from same thread
public boolean done;
/**
* Does not start itself. Caller must start()
@@ -497,7 +620,8 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
_callback = cb;
direction = (toI2P ? "toI2P" : "fromI2P");
_cache = ByteCache.getInstance(32, NETWORK_BUFFER_SIZE);
setName("StreamForwarder " + _runnerId + '.' + direction);
if (toI2P)
setName("StreamForwarder " + _runnerId + '.' + direction);
}
@Override
@@ -510,15 +634,11 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
+ from + " and " + to);
}
// boo, hiss! shouldn't need this - the streaming lib should be configurable, but
// somehow the inactivity timer is sometimes failing to get triggered properly
//i2ps.setReadTimeout(2*60*1000);
ByteArray ba = _cache.acquire();
byte[] buffer = ba.getData(); // new byte[NETWORK_BUFFER_SIZE];
byte[] buffer = ba.getData();
try {
int len;
while ((len = in.read(buffer)) != -1) {
while (!done && (len = in.read(buffer)) != -1) {
if (len > 0) {
out.write(buffer, 0, len);
if (_toI2P) {
@@ -583,15 +703,28 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
_failure = ex;
} finally {
_cache.release(ba);
if (_log.shouldLog(Log.INFO)) {
_log.info(direction + ": done forwarding between "
+ from + " and " + to);
boolean keepAliveFrom, keepAliveTo;
if (_toI2P) {
keepAliveFrom = _keepAliveSocket;
keepAliveTo = _keepAliveI2P;
} else {
keepAliveFrom = _keepAliveI2P;
keepAliveTo = _keepAliveSocket;
}
try {
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing input stream", ex);
if (_log.shouldLog(Log.INFO)) {
_log.info(direction + ": done forwarding from "
+ from + " to " + to
+ " keepalive from? " + keepAliveFrom
+ " keepalive to? " + keepAliveTo
+ " bytes: " + (_toI2P ? totalSent : totalReceived));
}
if (!keepAliveFrom) {
try {
in.close();
} catch (IOException ex) {
if (_log.shouldLog(Log.WARN))
_log.warn(direction + ": Error closing input stream", ex);
}
}
try {
// Thread must close() before exiting for a PipedOutputStream,
@@ -601,10 +734,17 @@ public class I2PTunnelRunner extends I2PAppThread implements I2PSocket.SocketErr
// DON'T close if we have a timeout job and we haven't received anything,
// or else the timeout job can't write the error message to the stream.
// close() above will close it after the timeout job is run.
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0))
out.close();
else if (_log.shouldLog(Log.INFO))
_log.info(direction + ": not closing so we can write the error message");
if (!((onTimeout != null || _onFail != null) && (!_toI2P) && totalReceived <= 0)) {
if (keepAliveTo)
out.flush();
else
out.close();
} else {
if (_log.shouldInfo())
_log.info(direction + ": not closing so we can write the error message");
if (keepAliveTo)
out.flush();
}
} catch (IOException ioe) {
if (_log.shouldLog(Log.DEBUG))
_log.debug(direction + ": Error flushing to close", ioe);

View File

@@ -3,6 +3,8 @@
*/
package net.i2p.i2ptunnel;
import java.util.Properties;
import net.i2p.client.I2PSession;
import net.i2p.util.EventDispatcher;
import net.i2p.util.EventDispatcherImpl;
@@ -111,6 +113,17 @@ public abstract class I2PTunnelTask extends EventDispatcherImpl {
getTunnel().removeSession(session);
}
/**
* @since 0.9.62
*/
protected boolean getBooleanOption(String opt, boolean dflt) {
Properties opts = getTunnel().getClientOptions();
String o = opts.getProperty(opt);
if (o != null)
return Boolean.parseBoolean(o);
return dflt;
}
/**
* Does nothing here. Extending classes may override.
*/