SAM v3.3: Fixes after testing

- Fix master acceptor
- Clean up error message generation
- Add basic master session test for SAMStreamSink
This commit is contained in:
zzz
2016-02-06 13:44:08 +00:00
parent 62ad7996f1
commit 68c617950c
5 changed files with 113 additions and 51 deletions

View File

@@ -18,6 +18,7 @@ import net.i2p.I2PException;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.client.streaming.I2PServerSocket;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.DataFormatException;
import net.i2p.data.DataHelper;
@@ -327,13 +328,15 @@ class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, S
}
public void run() {
while (!stop && getSocketServer() != null) {
if (_log.shouldWarn())
_log.warn("Stream acceptor started");
final I2PServerSocket i2pss = socketMgr.getServerSocket();
while (!stop) {
// wait and accept a connection from I2P side
I2PSocket i2ps;
try {
i2ps = getSocketServer().accept();
if (i2ps == null)
i2ps = i2pss.accept();
if (i2ps == null) // never null as of 0.9.17
continue;
} catch (SocketTimeoutException ste) {
continue;

View File

@@ -51,6 +51,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
protected final long _id;
private static final AtomicLong __id = new AtomicLong();
private static final int FIRST_READ_TIMEOUT = 60*1000;
protected static final String SESSION_ERROR = "SESSION STATUS RESULT=I2P_ERROR";
/**
* Create a new SAM version 1 handler. This constructor expects
@@ -132,7 +133,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
ReadLine.readLine(sock, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT);
sock.setSoTimeout(0);
} catch (SocketTimeoutException ste) {
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
}
msg = buf.toString();
@@ -222,19 +223,19 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
|| (streamSession != null)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
return writeString(SESSION_ERROR, "Session already exists");
}
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in SESSION CREATE message");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
}
dest = (String) props.remove("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION DESTINATION parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
return writeString(SESSION_ERROR, "DESTINATION not specified");
}
String destKeystream = null;
@@ -264,7 +265,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (style == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION STYLE parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
return writeString(SESSION_ERROR, "No SESSION STYLE specified");
}
// Unconditionally override what the client may have set
@@ -288,7 +289,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
&& !dir.equals("BOTH")) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unknown DIRECTION parameter value: [" + dir + "]");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown DIRECTION parameter\"\n");
return writeString(SESSION_ERROR, "Unknown DIRECTION parameter");
}
streamSession = newSAMStreamSession(destKeystream, dir,props);
@@ -296,7 +297,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
}
return writeString("SESSION STATUS RESULT=OK DESTINATION="
+ dest + "\n");
@@ -304,22 +305,22 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
return writeString(SESSION_ERROR, "Unrecognized opcode");
}
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
} catch (I2PSessionException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) {
_log.error("Unexpected SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
}
}
@@ -1012,6 +1013,18 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
return rv;
}
/**
* Write a string and message, escaping the message.
* Writes s + createMessageString(msg) + \n
*
* @param s The string, non-null
* @param s The message may be null
* @since 0.9.25
*/
protected boolean writeString(String s, String msg) {
return writeString(s + createMessageString(msg) + '\n');
}
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
if (streamSession == null) {

View File

@@ -56,6 +56,7 @@ class SAMv3Handler extends SAMv1Handler
private long _lastPing;
private static final int FIRST_READ_TIMEOUT = 60*1000;
private static final int READ_TIMEOUT = 3*60*1000;
private static final String AUTH_ERROR = "AUTH STATUS RESULT=I2P_ERROR";
/**
* Create a new SAM version 3 handler. This constructor expects
@@ -196,7 +197,7 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
writeString(SESSION_ERROR, "PONG timeout");
break;
}
} else {
@@ -211,13 +212,13 @@ class SAMv3Handler extends SAMv1Handler
if (now - _lastPing >= 2*READ_TIMEOUT) {
if (_log.shouldWarn())
_log.warn("Failed to respond to PING");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n");
writeString(SESSION_ERROR, "PONG timeout");
break;
}
} else if (_lastPing < 0) {
if (_log.shouldWarn())
_log.warn("2nd timeout");
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
} else {
// don't clear buffer, don't send ping,
@@ -238,7 +239,7 @@ class SAMv3Handler extends SAMv1Handler
ReadLine.readLine(socket, buf, gotFirstLine ? 0 : FIRST_READ_TIMEOUT);
socket.setSoTimeout(0);
} catch (SocketTimeoutException ste) {
writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"command timeout, bye\"\n");
writeString(SESSION_ERROR, "command timeout, bye");
break;
}
line = buf.toString();
@@ -275,7 +276,7 @@ class SAMv3Handler extends SAMv1Handler
if (opcode == null) {
// This is not a correct message, for sure
if (writeString(domain + " STATUS RESULT=I2P_ERROR MESSAGE=\"command not specified\"\n"))
if (writeString(domain + " STATUS RESULT=I2P_ERROR", "command not specified"))
continue;
else
break;
@@ -398,11 +399,11 @@ class SAMv3Handler extends SAMv1Handler
String nick = (String) props.remove("ID");
if (nick == null)
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n");
return writeString(SESSION_ERROR, "ID not specified");
String style = (String) props.remove("STYLE");
if (style == null && !opcode.equals("REMOVE"))
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n");
return writeString(SESSION_ERROR, "No SESSION STYLE specified");
try{
if (opcode.equals("CREATE")) {
@@ -410,19 +411,19 @@ class SAMv3Handler extends SAMv1Handler
|| (this.getStreamSession() != null)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
return writeString(SESSION_ERROR, "Session already exists");
}
if (props.isEmpty()) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("No parameters specified in SESSION CREATE message");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No parameters for SESSION CREATE\"\n");
return writeString(SESSION_ERROR, "No parameters for SESSION CREATE");
}
dest = (String) props.remove("DESTINATION");
if (dest == null) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION DESTINATION parameter not specified");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"DESTINATION not specified\"\n");
return writeString(SESSION_ERROR, "DESTINATION not specified");
}
if (dest.equals("TRANSIENT")) {
@@ -433,8 +434,8 @@ class SAMv3Handler extends SAMv1Handler
if (sigTypeStr != null) {
sigType = SigType.parseSigType(sigTypeStr);
if (sigType == null) {
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"SIGNATURE_TYPE "
+ sigTypeStr + " unsupported\"\n");
return writeString(SESSION_ERROR, "SIGNATURE_TYPE "
+ sigTypeStr + " unsupported");
}
} else {
sigType = SigType.DSA_SHA1;
@@ -511,7 +512,7 @@ class SAMv3Handler extends SAMv1Handler
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION STYLE: \"" + style +"\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized SESSION STYLE\"\n");
return writeString(SESSION_ERROR, "Unrecognized SESSION STYLE");
}
ok = true ;
return writeString("SESSION STATUS RESULT=OK DESTINATION="
@@ -520,7 +521,7 @@ class SAMv3Handler extends SAMv1Handler
// prevent trouble in finally block
ok = true;
if (streamSession == null || datagramSession == null || rawSession == null)
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Not a MASTER session\"\n");
return writeString(SESSION_ERROR, "Not a MASTER session");
MasterSession msess = (MasterSession) session;
String msg;
if (opcode.equals("ADD")) {
@@ -529,30 +530,30 @@ class SAMv3Handler extends SAMv1Handler
msg = msess.remove(nick, props);
}
if (msg == null)
return writeString("SESSION STATUS RESULT=OK MESSAGE=\"" + opcode + ' ' + nick + "\"\n");
return writeString("SESSION STATUS RESULT=OK", opcode + ' ' + nick);
else
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + msg + "\"\n");
return writeString(SESSION_ERROR, msg);
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Unrecognized SESSION message opcode: \""
+ opcode + "\"");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Unrecognized opcode\"\n");
return writeString(SESSION_ERROR, "Unrecognized opcode");
}
} catch (DataFormatException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Invalid destination specified");
return writeString("SESSION STATUS RESULT=INVALID_KEY DESTINATION=" + dest + " MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString("SESSION STATUS RESULT=INVALID_KEY", e.getMessage());
} catch (I2PSessionException e) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P error when instantiating session", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (SAMException e) {
if (_log.shouldLog(Log.INFO))
_log.info("Funny SAM error", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} catch (IOException e) {
_log.error("Unexpected IOException", e);
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + e.getMessage() + "\"\n");
return writeString(SESSION_ERROR, e.getMessage());
} finally {
// unregister the session if it has not been created
if ( !ok && nick!=null ) {
@@ -796,29 +797,29 @@ class SAMv3Handler extends SAMv1Handler
String user = props.getProperty("USER");
String pw = props.getProperty("PASSWORD");
if (user == null || pw == null)
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER and PASSWORD required\"\n");
return writeString(AUTH_ERROR, "USER and PASSWORD required");
String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
if (i2cpProps.containsKey(prop))
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " already exists\"\n");
return writeString(AUTH_ERROR, "user " + user + " already exists");
PasswordManager pm = new PasswordManager(I2PAppContext.getGlobalContext());
String shash = pm.createHash(pw);
i2cpProps.setProperty(prop, shash);
} else if (opcode.equals("REMOVE")) {
String user = props.getProperty("USER");
if (user == null)
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"USER required\"\n");
return writeString(AUTH_ERROR, "USER required");
String prop = SAMBridge.PROP_PW_PREFIX + user + SAMBridge.PROP_PW_SUFFIX;
if (!i2cpProps.containsKey(prop))
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"user " + user + " not found\"\n");
return writeString(AUTH_ERROR, "user " + user + " not found");
i2cpProps.remove(prop);
} else {
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Unknown AUTH command\"\n");
return writeString(AUTH_ERROR, "Unknown AUTH command");
}
try {
bridge.saveConfig();
return writeString("AUTH STATUS RESULT=OK\n");
} catch (IOException ioe) {
return writeString("AUTH STATUS RESULT=I2P_ERROR MESSAGE=\"Config save failed: " + ioe + "\"\n");
return writeString(AUTH_ERROR, "Config save failed: " + ioe);
}
}

View File

@@ -296,7 +296,7 @@ public class SAMStreamSend {
style = "RAW";
if (masterMode) {
String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=master " + opts + '\n';
String req = "SESSION CREATE DESTINATION=TRANSIENT STYLE=MASTER ID=masterSend " + opts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))

View File

@@ -58,19 +58,22 @@ public class SAMStreamSink {
private static I2PSSLSocketFactory _sslSocketFactory;
private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6, FORWARDSSL=7;
private static final int MASTER=8;
private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort]\n" +
" [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" +
" modes: stream: 0; datagram: 1; v1datagram: 2;\n" +
" raw: 3; v1raw: 4; raw-with-headers: 5;\n" +
" stream-forward: 6; stream-forward-ssl: 7\n" +
" -s: use SSL to connect to bridge\n" +
" -x: use master session (forces -v 3.3)\n" +
" multiple -o session options are allowed";
private static final int V3FORWARDPORT=9998;
private static final int V3DGPORT=9999;
public static void main(String args[]) {
Getopt g = new Getopt("SAM", args, "sb:m:p:u:v:w:");
Getopt g = new Getopt("SAM", args, "sxb:m:p:u:v:w:");
boolean isSSL = false;
boolean isMaster = false;
int mode = STREAM;
String version = "1.0";
String host = "127.0.0.1";
@@ -85,6 +88,10 @@ public class SAMStreamSink {
isSSL = true;
break;
case 'x':
isMaster = true;
break;
case 'm':
mode = Integer.parseInt(g.getOptarg());
if (mode < 0 || mode > FORWARDSSL) {
@@ -131,6 +138,10 @@ public class SAMStreamSink {
System.err.println(USAGE);
return;
}
if (isMaster) {
mode += MASTER;
version = "3.3";
}
if ((user == null && password != null) ||
(user != null && password == null)) {
System.err.println("both user and password or neither");
@@ -169,6 +180,8 @@ public class SAMStreamSink {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Reader created");
String ourDest = handshake(out, version, true, eventHandler, mode, user, password, sessionOpts);
if (mode >= MASTER)
mode -= MASTER;
if (ourDest == null)
throw new IOException("handshake failed");
if (_log.shouldLog(Log.DEBUG))
@@ -560,7 +573,10 @@ public class SAMStreamSink {
return sock;
}
/** @return our b64 dest or null */
/**
* @param isMaster is this the control socket
* @return our b64 dest or null
*/
private String handshake(OutputStream samOut, String version, boolean isMaster,
SAMEventHandler eventHandler, int mode, String user, String password,
String sopts) {
@@ -641,6 +657,16 @@ public class SAMStreamSink {
// and give it to the SAM server
dest = _destFile;
}
boolean masterMode; // are we using v3.3 master session
String command;
if (mode >= MASTER) {
masterMode = true;
command = "ADD";
mode -= MASTER;
} else {
masterMode = false;
command = "CREATE DESTINATION=" + dest;
}
String style;
if (mode == STREAM || mode == FORWARD || mode == FORWARDSSL)
style = "STREAM";
@@ -654,17 +680,36 @@ public class SAMStreamSink {
style = "RAW PORT=" + V3DGPORT;
else
style = "RAW HEADER=true PORT=" + V3DGPORT;
String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + ' ' + _conOptions + ' ' + sopts + '\n';
if (masterMode) {
String req = "SESSION CREATE DESTINATION=" + dest + " STYLE=MASTER ID=masterSink " + sopts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER sent");
boolean ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("SESSION CREATE STYLE=MASTER failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("SESSION CREATE STYLE=MASTER reply found: " + ok);
}
String req = "SESSION " + command + " STYLE=" + style + ' ' + _conOptions + ' ' + sopts + '\n';
samOut.write(req.getBytes("UTF-8"));
samOut.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create sent");
_log.debug("SESSION " + command + " sent");
if (mode == STREAM) {
boolean ok = eventHandler.waitForSessionCreateReply();
// why only waiting in stream mode?
boolean ok;
if (masterMode)
ok = eventHandler.waitForSessionAddReply();
else
ok = eventHandler.waitForSessionCreateReply();
if (!ok)
throw new IOException("Session create failed");
throw new IOException("SESSION " + command + " failed");
if (_log.shouldLog(Log.DEBUG))
_log.debug("Session create reply found: " + ok);
_log.debug("SESSION " + command + " reply found: " + ok);
}
req = "NAMING LOOKUP NAME=ME\n";
samOut.write(req.getBytes("UTF-8"));