diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index b7a94bfb8..d5f518b8a 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -8,12 +8,14 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.DatagramPacket; import java.net.DatagramSocket; +import java.net.ServerSocket; import java.net.Socket; import java.security.GeneralSecurityException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import javax.net.ssl.SSLSocket; +import javax.net.ssl.SSLServerSocket; import gnu.getopt.Getopt; @@ -49,11 +51,12 @@ public class SAMStreamSink { private final Map _remotePeers; private static I2PSSLSocketFactory _sslSocketFactory; - private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5; + private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4, RAWHDR = 5, FORWARD = 6; private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] [-o opt=val] [-u user] [-w password] myDestFile sinkDir\n" + - " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4 raw-with-headers: 5\n" + + " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4; raw-with-headers: 5; stream-forward: 6\n" + " -s: use SSL\n" + " multiple -o session options are allowed"; + private static final int V3FORWARDPORT=9998; private static final int V3DGPORT=9999; public static void main(String args[]) { @@ -75,7 +78,7 @@ public class SAMStreamSink { case 'm': mode = Integer.parseInt(g.getOptarg()); - if (mode < 0 || mode > RAWHDR) { + if (mode < 0 || mode > FORWARD) { System.err.println(USAGE); return; } @@ -167,9 +170,9 @@ public class SAMStreamSink { Thread t = new Pinger(out); t.start(); } - if (_isV3 && mode == STREAM) { + if (_isV3 && (mode == STREAM || mode == FORWARD)) { // test multiple acceptors, only works in 3.2 - int acceptors = isV32 ? 4 : 1; + int acceptors = (_isV32 && mode == STREAM) ? 4 : 1; for (int i = 0; i < acceptors; i++) { Socket sock2 = connect(isSSL); out = sock2.getOutputStream(); @@ -184,6 +187,10 @@ public class SAMStreamSink { if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake " + (2 + i) + " complete."); } + if (mode == FORWARD) { + // set up a listening ServerSocket + (new FwdRcvr(isSSL)).start(); + } } else if (_isV3 && (mode == DG || mode == RAW || mode == RAWHDR)) { // set up a listening DatagramSocket (new DGRcvr(mode)).start(); @@ -244,6 +251,44 @@ public class SAMStreamSink { } } + private class FwdRcvr extends I2PAppThread { + private final boolean _isSSL; + + public FwdRcvr(boolean isSSL) { + if (isSSL) + throw new UnsupportedOperationException("TODO"); + _isSSL = isSSL; + } + + public void run() { + try { + ServerSocket ss; + if (_isSSL) { + throw new UnsupportedOperationException("TODO"); + } else { + ss = new ServerSocket(V3FORWARDPORT); + } + while (true) { + Socket s = ss.accept(); + Sink sink = new Sink("FAKE", "FAKEFROM"); + try { + InputStream in = s.getInputStream(); + byte[] buf = new byte[32768]; + int len; + while((len = in.read(buf)) >= 0) { + sink.received(buf, 0, len); + } + sink.closed(); + } catch (IOException ioe) { + _log.error("Fwdcvr", ioe); + } + } + } catch (IOException ioe) { + _log.error("Fwdcvr", ioe); + } + } + } + private static class Pinger extends I2PAppThread { private final OutputStream _out; @@ -480,17 +525,25 @@ public class SAMStreamSink { // only for v3 //String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n"; // TO_PORT not supported until 3.2 but 3.0-3.1 will ignore - String req = "STREAM ACCEPT SILENT=false TO_PORT=5678 ID=" + _v3ID + "\n"; + String req; + if (mode == STREAM) + req = "STREAM ACCEPT SILENT=false TO_PORT=5678 ID=" + _v3ID + "\n"; + else if (mode == FORWARD) + req = "STREAM FORWARD ID=" + _v3ID + " PORT=" + V3FORWARDPORT + '\n'; + else + throw new IllegalStateException("mode " + mode); samOut.write(req.getBytes()); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("STREAM ACCEPT sent"); - // docs were wrong, we do not get a STREAM STATUS if SILENT=true - //boolean ok = eventHandler.waitForStreamStatusReply(); - //if (!ok) - // throw new IOException("Stream status failed"); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("got STREAM STATUS, awaiting connection"); + _log.debug("STREAM ACCEPT/FORWARD sent"); + if (mode == FORWARD) { + // docs were wrong, we do not get a STREAM STATUS if SILENT=true for ACCEPT + boolean ok = eventHandler.waitForStreamStatusReply(); + if (!ok) + throw new IOException("Stream status failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("got STREAM STATUS, awaiting connection"); + } return "OK"; } _isV3 = VersionComparator.comp(hisVersion, "3") >= 0; @@ -530,7 +583,7 @@ public class SAMStreamSink { dest = _destFile; } String style; - if (mode == STREAM) + if (mode == STREAM || mode == FORWARD) style = "STREAM"; else if (mode == V1DG) style = "DATAGRAM";