Add protocol and port notification

This commit is contained in:
zzz
2015-06-26 18:51:03 +00:00
parent b6cb074c04
commit 876729c24e
11 changed files with 95 additions and 39 deletions

View File

@@ -22,9 +22,12 @@ interface SAMDatagramReceiver {
*
* @param sender Destination
* @param data Byte array to be received
* @param proto I2CP protocol
* @param fromPort I2CP from port
* @param toPort I2CP to port
* @throws IOException
*/
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException;
public void receiveDatagramBytes(Destination sender, byte data[], int proto, int fromPort, int toPort) throws IOException;
/**
* Stop receiving data.

View File

@@ -95,7 +95,7 @@ class SAMDatagramSession extends SAMMessageSession {
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
}
protected void messageReceived(byte[] msg) {
protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
byte[] payload;
Destination sender;
try {
@@ -117,7 +117,7 @@ class SAMDatagramSession extends SAMMessageSession {
}
try {
recv.receiveDatagramBytes(sender, payload);
recv.receiveDatagramBytes(sender, payload, proto, fromPort, toPort);
} catch (IOException e) {
_log.error("Error forwarding message to receiver", e);
close();

View File

@@ -35,8 +35,8 @@ abstract class SAMHandler implements Runnable, Handler {
private final Object socketWLock = new Object(); // Guards writings on socket
protected final SocketChannel socket;
protected final int verMajor;
protected final int verMinor;
public final int verMajor;
public final int verMinor;
/** I2CP options configuring the I2CP connection (port, host, numHops, etc) */
protected final Properties i2cpProps;

View File

@@ -18,7 +18,7 @@ import net.i2p.client.I2PClient;
import net.i2p.client.I2PClientFactory;
import net.i2p.client.I2PSession;
import net.i2p.client.I2PSessionException;
import net.i2p.client.I2PSessionListener;
import net.i2p.client.I2PSessionMuxedListener;
import net.i2p.data.Base64;
import net.i2p.data.DataFormatException;
import net.i2p.data.Destination;
@@ -135,7 +135,7 @@ abstract class SAMMessageSession {
* Handle a new received message
* @param msg Message payload
*/
protected abstract void messageReceived(byte[] msg);
protected abstract void messageReceived(byte[] msg, int proto, int fromPort, int toPort);
/**
* Do whatever is needed to shutdown the SAM session
@@ -157,7 +157,7 @@ abstract class SAMMessageSession {
*
* @author human
*/
class SAMMessageSessionHandler implements Runnable, I2PSessionListener {
class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener {
private final Object runningLock = new Object();
private volatile boolean stillRunning = true;
@@ -186,7 +186,7 @@ abstract class SAMMessageSession {
if (_log.shouldLog(Log.DEBUG))
_log.debug("I2P session connected");
session.setSessionListener(this);
session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
}
/**
@@ -217,6 +217,7 @@ abstract class SAMMessageSession {
_log.debug("Shutting down SAM message-based session handler");
shutDown();
session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY);
try {
if (_log.shouldLog(Log.DEBUG))
@@ -242,7 +243,15 @@ abstract class SAMMessageSession {
stopRunning();
}
public void messageAvailable(I2PSession session, int msgId, long size){
public void messageAvailable(I2PSession session, int msgId, long size) {
messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED,
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
}
/** @since 0.9.22 */
public void messageAvailable(I2PSession session, int msgId, long size,
int proto, int fromPort, int toPort) {
if (_log.shouldLog(Log.DEBUG)) {
_log.debug("I2P message available (id: " + msgId
+ "; size: " + size + ")");
@@ -256,7 +265,7 @@ abstract class SAMMessageSession {
+ HexDump.dump(msg));
}
messageReceived(msg);
messageReceived(msg, proto, fromPort, toPort);
} catch (I2PSessionException e) {
_log.error("Error fetching I2P message", e);
stopRunning();

View File

@@ -20,9 +20,12 @@ interface SAMRawReceiver {
* regarding the sender.
*
* @param data Byte array to be received
* @param proto I2CP protocol
* @param fromPort I2CP from port
* @param toPort I2CP to port
* @throws IOException
*/
public void receiveRawBytes(byte data[]) throws IOException;
public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException;
/**
* Stop receiving data.

View File

@@ -80,9 +80,9 @@ class SAMRawSession extends SAMMessageSession {
I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED);
}
protected void messageReceived(byte[] msg) {
protected void messageReceived(byte[] msg, int proto, int fromPort, int toPort) {
try {
recv.receiveRawBytes(msg);
recv.receiveRawBytes(msg, proto, fromPort, toPort);
} catch (IOException e) {
_log.error("Error forwarding message to receiver", e);
close();

View File

@@ -796,16 +796,21 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
// SAMRawReceiver implementation
public void receiveRawBytes(byte data[]) throws IOException {
public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException {
if (getRawSession() == null) {
_log.error("BUG! Received raw bytes, but session is null!");
return;
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
ByteArrayOutputStream msg = new ByteArrayOutputStream(64 + data.length);
String msgText = "RAW RECEIVED SIZE=" + data.length + "\n";
String msgText = "RAW RECEIVED SIZE=" + data.length;
msg.write(DataHelper.getASCII(msgText));
if ((verMajor == 3 && verMinor >= 2) || verMajor > 3) {
msgText = " PROTOCOL=" + proto + " FROM_PORT=" + fromPort + " TO_PORT=" + toPort;
msg.write(DataHelper.getASCII(msgText));
}
msg.write((byte) '\n');
msg.write(data);
if (_log.shouldLog(Log.DEBUG))
@@ -832,17 +837,23 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece
}
// SAMDatagramReceiver implementation
public void receiveDatagramBytes(Destination sender, byte data[]) throws IOException {
public void receiveDatagramBytes(Destination sender, byte data[], int proto,
int fromPort, int toPort) throws IOException {
if (getDatagramSession() == null) {
_log.error("BUG! Received datagram bytes, but session is null!");
return;
}
ByteArrayOutputStream msg = new ByteArrayOutputStream();
ByteArrayOutputStream msg = new ByteArrayOutputStream(100 + data.length);
String msgText = "DATAGRAM RECEIVED DESTINATION=" + sender.toBase64()
+ " SIZE=" + data.length + "\n";
+ " SIZE=" + data.length;
msg.write(DataHelper.getASCII(msgText));
if ((verMajor == 3 && verMinor >= 2) || verMajor > 3) {
msgText = " FROM_PORT=" + fromPort + " TO_PORT=" + toPort;
msg.write(DataHelper.getASCII(msgText));
}
msg.write((byte) '\n');
if (_log.shouldLog(Log.DEBUG))
_log.debug("sending to client: " + msgText);

View File

@@ -71,11 +71,18 @@ class SAMv3DatagramSession extends SAMDatagramSession implements SAMv3Handler.Se
}
}
public void receiveDatagramBytes(Destination sender, byte[] data) throws IOException {
public void receiveDatagramBytes(Destination sender, byte[] data, int proto,
int fromPort, int toPort) throws IOException {
if (this.clientAddress==null) {
this.handler.receiveDatagramBytes(sender, data);
this.handler.receiveDatagramBytes(sender, data, proto, fromPort, toPort);
} else {
String msg = sender.toBase64()+"\n";
StringBuilder buf = new StringBuilder(600);
buf.append(sender.toBase64());
if ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) {
buf.append(" FROM_PORT=").append(fromPort).append(" TO_PORT=").append(toPort);
}
buf.append('\n');
String msg = buf.toString();
ByteBuffer msgBuf = ByteBuffer.allocate(msg.length()+data.length);
msgBuf.put(DataHelper.getASCII(msg));
msgBuf.put(data);

View File

@@ -50,7 +50,7 @@ class SAMv3Handler extends SAMv1Handler
public static final SessionsDB sSessionsHash = new SessionsDB();
private volatile boolean stolenSocket;
private volatile boolean streamForwardingSocket;
private final boolean sendPorts;
interface Session {
String getNick();
@@ -88,6 +88,7 @@ class SAMv3Handler extends SAMv1Handler
Properties i2cpProps, SAMBridge parent) throws SAMException, IOException
{
super(s, verMajor, verMinor, i2cpProps, parent);
sendPorts = (verMajor == 3 && verMinor >= 2) || verMajor > 3;
if (_log.shouldLog(Log.DEBUG))
_log.debug("SAM version 3 handler instantiated");
}
@@ -808,7 +809,7 @@ class SAMv3Handler extends SAMv1Handler
try {
try {
streamForwardingSocket = true ;
((SAMv3StreamSession)streamSession).startForwardingIncoming(props);
((SAMv3StreamSession)streamSession).startForwardingIncoming(props, sendPorts);
notifyStreamResult( true, "OK", null );
return true ;
} catch (SAMException e) {
@@ -858,13 +859,18 @@ class SAMv3Handler extends SAMv1Handler
}
}
public void notifyStreamIncomingConnection(Destination d) throws IOException {
public void notifyStreamIncomingConnection(Destination d, int fromPort, int toPort) throws IOException {
if (getStreamSession() == null) {
_log.error("BUG! Received stream connection, but session is null!");
throw new NullPointerException("BUG! STREAM session is null!");
}
if (!writeString(d.toBase64() + "\n")) {
StringBuilder buf = new StringBuilder(600);
buf.append(d.toBase64());
if (sendPorts) {
buf.append(" FROM_PORT=").append(fromPort).append(" TO_PORT=").append(toPort);
}
buf.append('\n');
if (!writeString(buf.toString())) {
throw new IOException("Error notifying connection to SAM client");
}
}
@@ -874,6 +880,14 @@ class SAMv3Handler extends SAMv1Handler
throw new IOException("Error notifying connection to SAM client");
}
}
/** @since 0.9.22 */
public static void notifyStreamIncomingConnection(SocketChannel client, Destination d,
int fromPort, int toPort) throws IOException {
if (!writeString(d.toBase64() + " FROM_PORT=" + fromPort + " TO_PORT=" + toPort + '\n', client)) {
throw new IOException("Error notifying connection to SAM client");
}
}
}

View File

@@ -77,9 +77,9 @@ class SAMv3RawSession extends SAMRawSession implements SAMv3Handler.Session, SA
}
}
public void receiveRawBytes(byte[] data) throws IOException {
public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException {
if (this.clientAddress==null) {
this.handler.receiveRawBytes(data);
this.handler.receiveRawBytes(data, proto, fromPort, toPort);
} else {
ByteBuffer msgBuf = ByteBuffer.allocate(data.length);
msgBuf.put(data);

View File

@@ -166,9 +166,10 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
if ( rec==null || i2ps==null ) throw new InterruptedIOException() ;
if (verbose)
handler.notifyStreamIncomingConnection(i2ps.getPeerDestination()) ;
if (verbose) {
handler.notifyStreamIncomingConnection(i2ps.getPeerDestination(),
i2ps.getPort(), i2ps.getLocalPort());
}
handler.stealSocket() ;
ReadableByteChannel fromClient = handler.getClientSocket();
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
@@ -185,7 +186,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
}
public void startForwardingIncoming( Properties props ) throws SAMException, InterruptedIOException
public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException, InterruptedIOException
{
SAMv3Handler.SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick);
boolean verbose = props.getProperty("SILENT", "false").equals("false");
@@ -218,7 +219,7 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
this.socketServer = this.socketMgr.getServerSocket();
}
SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose);
SocketForwarder forwarder = new SocketForwarder(host, port, this, verbose, sendPorts);
(new Thread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start();
}
@@ -227,13 +228,14 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
private final String host;
private final int port;
private final SAMv3StreamSession session;
private final boolean verbose;
private final boolean verbose, sendPorts;
SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose) {
SocketForwarder(String host, int port, SAMv3StreamSession session, boolean verbose, boolean sendPorts) {
this.host = host ;
this.port = port ;
this.session = session ;
this.verbose = verbose ;
this.sendPorts = sendPorts;
}
public void run()
@@ -264,9 +266,16 @@ class SAMv3StreamSession extends SAMStreamSession implements SAMv3Handler.Sessi
// build pipes between both sockets
try {
clientServerSock.socket().setKeepAlive(true);
if (this.verbose)
SAMv3Handler.notifyStreamIncomingConnection(
if (this.verbose) {
if (sendPorts) {
SAMv3Handler.notifyStreamIncomingConnection(
clientServerSock, i2ps.getPeerDestination(),
i2ps.getPort(), i2ps.getLocalPort());
} else {
SAMv3Handler.notifyStreamIncomingConnection(
clientServerSock, i2ps.getPeerDestination());
}
}
ReadableByteChannel fromClient = clientServerSock ;
ReadableByteChannel fromI2P = Channels.newChannel(i2ps.getInputStream());
WritableByteChannel toClient = clientServerSock ;