From bd048b04cc4c35fd33d31a453565bdeb167adbe9 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Thu, 26 Nov 2015 20:55:10 +0000 Subject: [PATCH] Fix ReadLine bug that buffered and lost input; can't handle UTF-8 for now. Start support of datagrams and raw in the client --- apps/sam/java/src/net/i2p/sam/ReadLine.java | 5 +- .../src/net/i2p/sam/client/SAMStreamSend.java | 103 +++++++++++------- .../src/net/i2p/sam/client/SAMStreamSink.java | 46 +++++--- 3 files changed, 95 insertions(+), 59 deletions(-) diff --git a/apps/sam/java/src/net/i2p/sam/ReadLine.java b/apps/sam/java/src/net/i2p/sam/ReadLine.java index 94cf708df3..2af5f19d31 100644 --- a/apps/sam/java/src/net/i2p/sam/ReadLine.java +++ b/apps/sam/java/src/net/i2p/sam/ReadLine.java @@ -34,7 +34,10 @@ class ReadLine { if (timeout <= 0) throw new SocketTimeoutException(); long expires = System.currentTimeMillis() + timeout; - InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8"); + // this reads and buffers extra bytes, so we can't use it + // unless we're going to decode UTF-8 on-the-fly, we're stuck with ASCII + //InputStreamReader in = new InputStreamReader(socket.getInputStream(), "UTF-8"); + InputStream in = socket.getInputStream(); int c; int i = 0; socket.setSoTimeout(timeout); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 881073d09f..08a7c05752 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -39,12 +39,15 @@ public class SAMStreamSend { /** Connection id (Integer) to peer (Flooder) */ private final Map<String, Sender> _remotePeers; - private static final String USAGE = "Usage: SAMStreamSend [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir"; + private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4; + private static final String USAGE = "Usage: SAMStreamSend [-s] [-m mode] [-v version] [-b samHost] [-p samPort] peerDestFile dataDir\n" + + " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" + + " -s: use SSL"; public static void main(String args[]) { - Getopt g = new Getopt("SAM", args, "drsb:p:v:"); + Getopt g = new Getopt("SAM", args, "sb:m:p:v:"); boolean isSSL = false; - int mode = 0; // stream + int mode = STREAM; String version = "1.0"; String host = "127.0.0.1"; String port = "7656"; @@ -55,12 +58,12 @@ public class SAMStreamSend { isSSL = true; break; - case 'd': - mode = 1; // datagram - break; - - case 'r': - mode = 2; // raw + case 'm': + mode = Integer.parseInt(g.getOptarg()); + if (mode < 0 || mode > V1RAW) { + System.err.println(USAGE); + return; + } break; case 'v': @@ -92,7 +95,7 @@ public class SAMStreamSend { I2PAppContext ctx = I2PAppContext.getGlobalContext(); SAMStreamSend sender = new SAMStreamSend(ctx, host, port, args[startArgs], args[startArgs + 1]); - sender.startup(version); + sender.startup(version, isSSL, mode); } public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { @@ -107,38 +110,38 @@ public class SAMStreamSend { _remotePeers = new HashMap<String, Sender>(); } - public void startup(String version) { + public void startup(String version, boolean isSSL, int mode) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); try { - Socket sock = connect(); + Socket sock = connect(isSSL); SAMEventHandler eventHandler = new SendEventHandler(_context); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); OutputStream out = sock.getOutputStream(); - String ourDest = handshake(out, version, true, eventHandler); + String ourDest = handshake(out, version, true, eventHandler, mode); if (ourDest == null) throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); - if (_isV3) { - Socket sock2 = connect(); + if (_isV3 && mode != V1DG && mode != V1RAW) { + Socket sock2 = connect(isSSL); eventHandler = new SendEventHandler(_context); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader2 created"); out = sock2.getOutputStream(); - String ok = handshake(out, version, false, eventHandler); + String ok = handshake(out, version, false, eventHandler, mode); if (ok == null) throw new IOException("2nd handshake failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake2 complete."); } if (ourDest != null) { - send(out, eventHandler); + send(out, eventHandler, mode); } } catch (IOException e) { _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); @@ -168,12 +171,12 @@ public class SAMStreamSend { } } - private Socket connect() throws IOException { + private Socket connect(boolean isSSL) throws IOException { return new Socket(_samHost, Integer.parseInt(_samPort)); } /** @return our b64 dest or null */ - private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { + private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) { synchronized (samOut) { try { samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); @@ -194,7 +197,14 @@ public class SAMStreamSend { _v3ID = Base32.encode(id); _conOptions = "ID=" + _v3ID; } - String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; + String style; + if (mode == STREAM) + style = "STREAM"; + else if (mode == DG || mode == V1DG) + style = "DATAGRAM"; + else + style = "RAW"; + String req = "SESSION CREATE STYLE=" + style + " DESTINATION=TRANSIENT " + _conOptions + "\n"; samOut.write(req.getBytes()); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) @@ -227,8 +237,8 @@ public class SAMStreamSend { } } - private void send(OutputStream samOut, SAMEventHandler eventHandler) throws IOException { - Sender sender = new Sender(samOut, eventHandler); + private void send(OutputStream samOut, SAMEventHandler eventHandler, int mode) throws IOException { + Sender sender = new Sender(samOut, eventHandler, mode); boolean ok = sender.openConnection(); if (ok) { I2PAppThread t = new I2PAppThread(sender, "Sender"); @@ -247,10 +257,12 @@ public class SAMStreamSend { private long _totalSent; private final OutputStream _samOut; private final SAMEventHandler _eventHandler; + private final int _mode; - public Sender(OutputStream samOut, SAMEventHandler eventHandler) { + public Sender(OutputStream samOut, SAMEventHandler eventHandler, int mode) { _samOut = samOut; _eventHandler = eventHandler; + _mode = mode; synchronized (_remotePeers) { if (_v3ID != null) _connectionId = _v3ID; @@ -273,21 +285,23 @@ public class SAMStreamSend { _context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".lifetime", "How long we talk to a peer", "swarm", new long[] { 5*60*1000 }); - StringBuilder buf = new StringBuilder(1024); - buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination); - // not supported until 3.2 but 3.0-3.1 will ignore - if (_isV3) - buf.append(" FROM_PORT=1234 TO_PORT=5678"); - buf.append('\n'); - byte[] msg = DataHelper.getASCII(buf.toString()); - synchronized (_samOut) { - _samOut.write(msg); - _samOut.flush(); + if (_mode == STREAM) { + StringBuilder buf = new StringBuilder(1024); + buf.append("STREAM CONNECT ID=").append(_connectionId).append(" DESTINATION=").append(_remoteDestination); + // not supported until 3.2 but 3.0-3.1 will ignore + if (_isV3) + buf.append(" FROM_PORT=1234 TO_PORT=5678"); + buf.append('\n'); + byte[] msg = DataHelper.getASCII(buf.toString()); + synchronized (_samOut) { + _samOut.write(msg); + _samOut.flush(); + } + _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS..."); + boolean ok = _eventHandler.waitForStreamStatusReply(); + if (!ok) + throw new IOException("STREAM CONNECT failed"); } - _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS..."); - boolean ok = _eventHandler.waitForStreamStatusReply(); - if (!ok) - throw new IOException("STREAM CONNECT failed"); _in = new FileInputStream(_dataFile); return true; @@ -318,7 +332,7 @@ public class SAMStreamSend { _started = _context.clock().now(); _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); final long toSend = (new File(_dataFile)).length(); - byte data[] = new byte[1024]; + byte data[] = new byte[8192]; long lastSend = _context.clock().now(); while (!_closed) { try { @@ -334,8 +348,17 @@ public class SAMStreamSend { lastSend = now; synchronized (_samOut) { - if (!_isV3) { - byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes(); + if (!_isV3 || _mode == V1DG || _mode == V1RAW) { + String m; + if (_mode == STREAM) + m = "STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n"; + else if (_mode == V1DG) + m = "DATAGRAM SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n"; + else if (_mode == V1RAW) + m = "RAW SEND DESTINATION=" + _remoteDestination + " SIZE=" + read + "\n"; + else + throw new IOException("unsupported mode " + _mode); + byte msg[] = DataHelper.getASCII(m); _samOut.write(msg); } _samOut.write(data, 0, read); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 91c1f38935..38e7a57bc9 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -40,12 +40,15 @@ public class SAMStreamSink { /** Connection id (Integer) to peer (Flooder) */ private final Map<String, Sink> _remotePeers; - private static final String USAGE = "Usage: SAMStreamSink [-s] [-d] [-r] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir"; + private static final int STREAM=0, DG=1, V1DG=2, RAW=3, V1RAW=4; + private static final String USAGE = "Usage: SAMStreamSink [-s] [-m mode] [-v version] [-b samHost] [-p samPort] myDestFile sinkDir\n" + + " modes: stream: 0; datagram: 1; v1datagram: 2; raw: 3; v1raw: 4\n" + + " -s: use SSL"; public static void main(String args[]) { - Getopt g = new Getopt("SAM", args, "drsb:p:v:"); + Getopt g = new Getopt("SAM", args, "sb:m:p:v:"); boolean isSSL = false; - int mode = 0; // stream + int mode = STREAM; String version = "1.0"; String host = "127.0.0.1"; String port = "7656"; @@ -56,12 +59,12 @@ public class SAMStreamSink { isSSL = true; break; - case 'd': - mode = 1; // datagram - break; - - case 'r': - mode = 2; // raw + case 'm': + mode = Integer.parseInt(g.getOptarg()); + if (mode < 0 || mode > V1RAW) { + System.err.println(USAGE); + return; + } break; case 'v': @@ -93,7 +96,7 @@ public class SAMStreamSink { I2PAppContext ctx = I2PAppContext.getGlobalContext(); SAMStreamSink sink = new SAMStreamSink(ctx, host, port, args[startArgs], args[startArgs + 1]); - sink.startup(version); + sink.startup(version, isSSL, mode); } public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { @@ -108,31 +111,31 @@ public class SAMStreamSink { _remotePeers = new HashMap<String, Sink>(); } - public void startup(String version) { + public void startup(String version, boolean isSSL, int mode) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); try { - Socket sock = connect(); + Socket sock = connect(isSSL); OutputStream out = sock.getOutputStream(); SAMEventHandler eventHandler = new SinkEventHandler(_context, out); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - String ourDest = handshake(out, version, true, eventHandler); + String ourDest = handshake(out, version, true, eventHandler, mode); if (ourDest == null) throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); if (_isV3) { - Socket sock2 = connect(); + Socket sock2 = connect(isSSL); out = sock2.getOutputStream(); eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader2 created"); - String ok = handshake(out, version, false, eventHandler); + String ok = handshake(out, version, false, eventHandler, mode); if (ok == null) throw new IOException("2nd handshake failed"); if (_log.shouldLog(Log.DEBUG)) @@ -285,12 +288,12 @@ public class SAMStreamSink { } } - private Socket connect() throws IOException { + private Socket connect(boolean isSSL) throws IOException { return new Socket(_samHost, Integer.parseInt(_samPort)); } /** @return our b64 dest or null */ - private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { + private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler, int mode) { synchronized (samOut) { try { samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); @@ -354,7 +357,14 @@ public class SAMStreamSink { // and give it to the SAM server dest = _destFile; } - String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n"; + String style; + if (mode == STREAM) + style = "STREAM"; + else if (mode == DG || mode == V1DG) + style = "DATAGRAM"; + else + style = "RAW"; + String req = "SESSION CREATE STYLE=" + style + " DESTINATION=" + dest + " " + _conOptions + "\n"; samOut.write(req.getBytes()); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) -- GitLab