SAM version 3 :

- Raw and Datagram sessions implemented
      - option "SILENT=true" added to the stream protocol
      - java 6 warnings removed
   ministreaming :
      - java 6 warnings removed
   ministreaming and streaming :
      -  added functions : 
      	I2PServerSocket.waitIncoming(long timeout)
      	I2PServerSocket.accept(boolean block)
This commit is contained in:
mkvore-commit
2009-04-02 08:22:31 +00:00
parent 9aa8707647
commit a4d16af95d
44 changed files with 2278 additions and 183 deletions

View File

@@ -12,12 +12,11 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.nio.ByteBuffer;
import java.util.Properties;
import java.util.StringTokenizer;
@@ -40,14 +39,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
private final static Log _log = new Log(SAMv1Handler.class);
private final static int IN_BUFSIZE = 2048;
protected SAMRawSession rawSession = null;
protected SAMDatagramSession datagramSession = null;
protected SAMStreamSession streamSession = null;
protected SAMDatagramSession getDatagramSession() {return datagramSession ;}
protected SAMRawSession getRawSession() {return rawSession ;}
private SAMRawSession rawSession = null;
private SAMDatagramSession datagramSession = null;
protected SAMStreamSession streamSession = null;
private long _id;
private static volatile long __id = 0;
protected long _id;
protected static volatile long __id = 0;
/**
* Create a new SAM version 1 handler. This constructor expects
@@ -60,7 +59,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor) throws SAMException, IOException {
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor) throws SAMException, IOException {
this(s, verMajor, verMinor, new Properties());
}
/**
@@ -75,7 +74,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
* @throws SAMException
* @throws IOException
*/
public SAMv1Handler(Socket s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
public SAMv1Handler(SocketChannel s, int verMajor, int verMinor, Properties i2cpProps) throws SAMException, IOException {
super(s, verMajor, verMinor, i2cpProps);
_id = ++__id;
_log.debug("SAM version 1 handler instantiated");
@@ -101,16 +100,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
_log.debug("SAM handling started");
try {
InputStream in = getClientSocketInputStream();
int b = -1;
while (true) {
if (shouldStop()) {
_log.debug("Stop request found");
break;
}
msg = DataHelper.readLine(in);
msg = DataHelper.readLine(getClientSocket().socket().getInputStream()).trim();
if (msg == null) {
_log.debug("Connection closed by client");
break;
@@ -175,11 +171,11 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
} catch (IOException e) {
_log.error("Error closing socket: " + e.getMessage());
}
if (rawSession != null) {
rawSession.close();
if (getRawSession() != null) {
getRawSession().close();
}
if (datagramSession != null) {
datagramSession.close();
if (getDatagramSession() != null) {
getDatagramSession().close();
}
if (streamSession != null) {
streamSession.close();
@@ -188,13 +184,13 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a SESSION message */
private boolean execSessionMessage(String opcode, Properties props) {
protected boolean execSessionMessage(String opcode, Properties props) {
String dest = "BUG!";
try{
if (opcode.equals("CREATE")) {
if ((rawSession != null) || (datagramSession != null)
if ((getRawSession() != null) || (getDatagramSession() != null)
|| (streamSession != null)) {
_log.debug("Trying to create a session, but one still exists");
return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n");
@@ -293,7 +289,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a DEST message*/
private boolean execDestMessage(String opcode, Properties props) {
protected boolean execDestMessage(String opcode, Properties props) {
if (opcode.equals("GENERATE")) {
if (props.size() > 0) {
@@ -318,7 +314,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a NAMING message */
private boolean execNamingMessage(String opcode, Properties props) {
protected boolean execNamingMessage(String opcode, Properties props) {
if (opcode.equals("LOOKUP")) {
if (props == null) {
_log.debug("No parameters specified in NAMING LOOKUP message");
@@ -333,18 +329,18 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
Destination dest;
if (name.equals("ME")) {
if (rawSession != null) {
dest = rawSession.getDestination();
if (getRawSession() != null) {
dest = getRawSession().getDestination();
} else if (streamSession != null) {
dest = streamSession.getDestination();
} else if (datagramSession != null) {
dest = datagramSession.getDestination();
} else if (getDatagramSession() != null) {
dest = getDatagramSession().getDestination();
} else {
_log.debug("Lookup for SESSION destination, but session is null");
return false;
}
} else {
dest = SAMUtils.lookupHost(name, null);
dest = SAMUtils.getDest(name);
}
if (dest == null) {
@@ -364,8 +360,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
/* Parse and execute a DATAGRAM message */
private boolean execDatagramMessage(String opcode, Properties props) {
if (datagramSession == null) {
protected boolean execDatagramMessage(String opcode, Properties props) {
if (getDatagramSession() == null) {
_log.error("DATAGRAM message received, but no DATAGRAM session exists");
return false;
}
@@ -403,7 +399,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
DataInputStream in = new DataInputStream(getClientSocketInputStream());
DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
byte[] data = new byte[size];
in.readFully(data);
@@ -435,8 +431,8 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
/* Parse and execute a RAW message */
private boolean execRawMessage(String opcode, Properties props) {
if (rawSession == null) {
protected boolean execRawMessage(String opcode, Properties props) {
if (getRawSession() == null) {
_log.error("RAW message received, but no RAW session exists");
return false;
}
@@ -474,12 +470,12 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
DataInputStream in = new DataInputStream(getClientSocketInputStream());
DataInputStream in = new DataInputStream(getClientSocket().socket().getInputStream());
byte[] data = new byte[size];
in.readFully(data);
if (!rawSession.sendBytes(dest, data)) {
if (!getRawSession().sendBytes(dest, data)) {
_log.error("RAW SEND failed");
return true;
}
@@ -567,7 +563,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
try {
if (!streamSession.sendBytes(id, getClientSocketInputStream(), size)) { // data)) {
if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) {
if (_log.shouldLog(Log.WARN))
_log.warn("STREAM SEND [" + size + "] failed");
boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n");
@@ -691,7 +687,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[]) throws IOException {
if (rawSession == null) {
if (getRawSession() == null) {
_log.error("BUG! Received raw bytes, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
@@ -701,17 +697,18 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
String msgText = "RAW RECEIVED SIZE=" + data.length + "\n";
msg.write(msgText.getBytes("ISO-8859-1"));
msg.write(data);
msg.flush();
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
writeBytes(msg.toByteArray());
writeBytes(ByteBuffer.wrap(msg.toByteArray()));
}
public void stopRawReceiving() {
_log.debug("stopRawReceiving() invoked");
if (rawSession == null) {
if (getRawSession() == null) {
_log.error("BUG! Got raw receiving stop, but session is null!");
throw new NullPointerException("BUG! RAW session is null!");
}
@@ -726,7 +723,7 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
// SAMDatagramReceiver implementation
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
if (datagramSession == null) {
if (getDatagramSession() == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
@@ -740,14 +737,14 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
msg.write(data);
writeBytes(msg.toByteArray());
msg.flush();
writeBytes(ByteBuffer.wrap(msg.toByteArray()));
}
public void stopDatagramReceiving() {
_log.debug("stopDatagramReceiving() invoked");
if (datagramSession == null) {
if (getDatagramSession() == null) {
_log.error("BUG! Got datagram receiving stop, but session is null!");
throw new NullPointerException("BUG! DATAGRAM session is null!");
}
@@ -830,29 +827,23 @@ public class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatag
}
}
public void receiveStreamBytes(int id, byte data[], int len) throws IOException {
public void receiveStreamBytes(int id, ByteBuffer data) throws IOException {
if (streamSession == null) {
_log.error("Received stream bytes, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + len + "\n";
String msgText = "STREAM RECEIVED ID=" + id +" SIZE=" + data.remaining() + "\n";
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);
byte prefix[] = msgText.getBytes("ISO-8859-1");
ByteBuffer prefix = ByteBuffer.wrap(msgText.getBytes("ISO-8859-1"));
// dont waste so much memory
//ByteArrayOutputStream msg = new ByteArrayOutputStream();
//msg.write(msgText.getBytes("ISO-8859-1"));
//msg.write(data, 0, len);
// writeBytes(msg.toByteArray());
Object writeLock = getWriteLock();
OutputStream out = getOut();
synchronized (writeLock) {
out.write(prefix);
out.write(data, 0, len);
out.flush();
while (prefix.hasRemaining()) socket.write(prefix);
while (data.hasRemaining()) socket.write(data);
socket.socket().getOutputStream().flush();
}
}