diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java index 5388f5dc2336f831aca586628ca1a6105553bcd2..3e02a04be9405b06a9e00b18d1609e5877df6ec2 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java @@ -7,7 +7,7 @@ import java.util.Properties; */ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListener { public void destReplyReceived(String publicKey, String privateKey) {} - public void helloReplyReceived(boolean ok) {} + public void helloReplyReceived(boolean ok, String version) {} public void namingReplyReceived(String name, String result, String value, String message) {} public void sessionStatusReceived(String result, String destination, String message) {} public void streamClosedReceived(String result, int id, String message) {} diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java index 302dbd629943cdaa2dbd5691a982e6758e38f5d5..40916c7eb740f5c48668a041ae9bf79a927a8e3a 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java @@ -15,6 +15,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { //private I2PAppContext _context; private final Log _log; private Boolean _helloOk; + private String _version; private final Object _helloLock = new Object(); private Boolean _sessionCreateOk; private final Object _sessionCreateLock = new Object(); @@ -27,12 +28,13 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } @Override - public void helloReplyReceived(boolean ok) { + public void helloReplyReceived(boolean ok, String version) { synchronized (_helloLock) { if (ok) _helloOk = Boolean.TRUE; else _helloOk = Boolean.FALSE; + _version = version; _helloLock.notifyAll(); } } @@ -61,7 +63,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { @Override public void unknownMessageReceived(String major, String minor, Properties params) { - _log.error("wrt, [" + major + "] [" + minor + "] [" + params + "]"); + _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]"); } @@ -70,18 +72,18 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { // /** - * Wait for the connection to be established, returning true if everything + * Wait for the connection to be established, returning the server version if everything * went ok - * @return true if everything ok + * @return SAM server version if everything ok, or null on failure */ - public boolean waitForHelloReply() { + public String waitForHelloReply() { while (true) { try { synchronized (_helloLock) { if (_helloOk == null) _helloLock.wait(); else - return _helloOk.booleanValue(); + return _helloOk.booleanValue() ? _version : null; } } catch (InterruptedException ie) {} } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java index bc8d989581571d2cb050e445c473cb5e4d2e3db1..81010f21f331e28046d02e49a60bc92f26993ddc 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java @@ -71,7 +71,7 @@ public class SAMReader { public static final String NAMING_REPLY_INVALID_KEY = "INVALID_KEY"; public static final String NAMING_REPLY_KEY_NOT_FOUND = "KEY_NOT_FOUND"; - public void helloReplyReceived(boolean ok); + public void helloReplyReceived(boolean ok, String version); public void sessionStatusReceived(String result, String destination, String message); public void streamStatusReceived(String result, int id, String message); public void streamConnectedReceived(String remoteDestination, int id); @@ -159,10 +159,11 @@ public class SAMReader { if ("HELLO".equals(major)) { if ("REPLY".equals(minor)) { String result = params.getProperty("RESULT"); - if ("OK".equals(result)) - _listener.helloReplyReceived(true); + String version= params.getProperty("VERSION"); + if ("OK".equals(result) && version != null) + _listener.helloReplyReceived(true, version); else - _listener.helloReplyReceived(false); + _listener.helloReplyReceived(false, version); } else { _listener.unknownMessageReceived(major, minor, params); } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 4e034dfa578784a41bdccd16ec90af3045d77791..c93c70a4d597172d951cc36a9474fbe7b89db0c7 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -1,5 +1,6 @@ package net.i2p.sam.client; +import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -9,9 +10,11 @@ import java.util.HashMap; import java.util.Map; import net.i2p.I2PAppContext; +import net.i2p.data.Base32; import net.i2p.data.DataHelper; import net.i2p.util.I2PAppThread; import net.i2p.util.Log; +import net.i2p.util.VersionComparator; /** * Send a file to a peer @@ -26,7 +29,7 @@ public class SAMStreamSend { private final String _samPort; private final String _destFile; private final String _dataFile; - private final String _conOptions; + private String _conOptions; private Socket _samSocket; private OutputStream _samOut; private InputStream _samIn; @@ -38,13 +41,14 @@ public class SAMStreamSend { public static void main(String args[]) { if (args.length < 4) { - System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile"); + System.err.println("Usage: SAMStreamSend samHost samPort peerDestFile dataFile [version]"); return; } - I2PAppContext ctx = new I2PAppContext(); + I2PAppContext ctx = I2PAppContext.getGlobalContext(); //String files[] = new String[args.length - 3]; SAMStreamSend sender = new SAMStreamSend(ctx, args[0], args[1], args[2], args[3]); - sender.startup(); + String version = (args.length >= 5) ? args[4] : "1.0"; + sender.startup(version); } public SAMStreamSend(I2PAppContext ctx, String samHost, String samPort, String destFile, String dataFile) { @@ -60,7 +64,7 @@ public class SAMStreamSend { _remotePeers = new HashMap<Integer,Sender>(); } - public void startup() { + public void startup(String version) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); boolean ok = connect(); @@ -71,7 +75,7 @@ public class SAMStreamSend { _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - String ourDest = handshake(); + String ourDest = handshake(version); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); if (ourDest != null) { @@ -82,6 +86,8 @@ public class SAMStreamSend { private class SendEventHandler extends SAMEventHandler { public SendEventHandler(I2PAppContext ctx) { super(ctx); } + + @Override public void streamClosedReceived(String result, int id, String message) { Sender sender = null; synchronized (_remotePeers) { @@ -92,7 +98,7 @@ public class SAMStreamSend { if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection " + sender.getConnectionId() + " closed to " + sender.getDestination()); } else { - _log.error("wtf, not connected to " + id + " but we were just closed?"); + _log.error("not connected to " + id + " but we were just closed?"); } } } @@ -109,24 +115,32 @@ public class SAMStreamSend { } } - private String handshake() { + private String handshake(String version) { synchronized (_samOut) { try { - _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); + _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); _samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); - boolean ok = _eventHandler.waitForHelloReply(); + String hisVersion = _eventHandler.waitForHelloReply(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Hello reply found: " + ok); - if (!ok) - throw new IOException("wtf, hello failed?"); + _log.debug("Hello reply found: " + hisVersion); + if (hisVersion == null) + throw new IOException("Hello failed"); + boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; + if (isV3) { + byte[] id = new byte[5]; + _context.random().nextBytes(id); + _conOptions = "ID=" + Base32.encode(id); + } String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; _samOut.write(req.getBytes()); _samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); - ok = _eventHandler.waitForSessionCreateReply(); + boolean ok = _eventHandler.waitForSessionCreateReply(); + if (!ok) + throw new IOException("Session create failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create reply found: " + ok); @@ -222,6 +236,7 @@ public class SAMStreamSend { public void run() { _started = _context.clock().now(); _context.statManager().addRateData("send." + _connectionId + ".started", 1, 0); + final long toSend = (new File(_dataFile)).length(); byte data[] = new byte[1024]; long lastSend = _context.clock().now(); while (!_closed) { @@ -249,6 +264,7 @@ public class SAMStreamSend { } } catch (IOException ioe) { _log.error("Error sending", ioe); + break; } } @@ -259,12 +275,14 @@ public class SAMStreamSend { _samOut.flush(); } } catch (IOException ioe) { - _log.error("Error closing", ioe); + _log.info("Error closing", ioe); } closed(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Runner exiting"); + if (toSend != _totalSent) + _log.error("Only sent " + _totalSent + " of " + toSend + " bytes"); // stop the reader, since we're only doing this once for testing // you wouldn't do this in a real application _reader.stopReading(); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 4117367fcd9b9f625576c4293cae727b692bdb12..786f608013090cecc5321db6a618997cac82330f 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -8,9 +8,13 @@ import java.io.OutputStream; import java.net.Socket; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import net.i2p.I2PAppContext; +import net.i2p.data.Base32; +import net.i2p.data.DataHelper; import net.i2p.util.Log; +import net.i2p.util.VersionComparator; /** * Sit around on a SAM destination, receiving lots of data and @@ -26,7 +30,7 @@ public class SAMStreamSink { private final String _samPort; private final String _destFile; private final String _sinkDir; - private final String _conOptions; + private String _conOptions; private Socket _samSocket; private OutputStream _samOut; private InputStream _samIn; @@ -38,12 +42,13 @@ public class SAMStreamSink { public static void main(String args[]) { if (args.length < 4) { - System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir"); + System.err.println("Usage: SAMStreamSink samHost samPort myDestFile sinkDir [version]"); return; } - I2PAppContext ctx = new I2PAppContext(); + I2PAppContext ctx = I2PAppContext.getGlobalContext(); SAMStreamSink sink = new SAMStreamSink(ctx, args[0], args[1], args[2], args[3]); - sink.startup(); + String version = (args.length >= 5) ? args[4] : "1.0"; + sink.startup(version); } public SAMStreamSink(I2PAppContext ctx, String samHost, String samPort, String destFile, String sinkDir) { @@ -59,7 +64,7 @@ public class SAMStreamSink { _remotePeers = new HashMap<Integer,Sink>(); } - public void startup() { + public void startup(String version) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); boolean ok = connect(); @@ -70,14 +75,14 @@ public class SAMStreamSink { _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - String ourDest = handshake(); + String ourDest = handshake(version); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); if (ourDest != null) { //boolean written = writeDest(ourDest); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("My destination written to " + _destFile); + } else { + _reader.stopReading(); } } } @@ -97,7 +102,7 @@ public class SAMStreamSink { if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection " + sink.getConnectionId() + " closed to " + sink.getDestination()); } else { - _log.error("wtf, not connected to " + id + " but we were just closed?"); + _log.error("not connected to " + id + " but we were just closed?"); } } @@ -110,7 +115,7 @@ public class SAMStreamSink { if (sink != null) { sink.received(data, offset, length); } else { - _log.error("wtf, not connected to " + id + " but we received " + length + "?"); + _log.error("not connected to " + id + " but we received " + length + "?"); } } @@ -142,24 +147,58 @@ public class SAMStreamSink { } } - private String handshake() { + private String handshake(String version) { synchronized (_samOut) { try { - _samOut.write("HELLO VERSION MIN=1.0 MAX=1.0\n".getBytes()); + _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); _samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); - boolean ok = _eventHandler.waitForHelloReply(); + String hisVersion = _eventHandler.waitForHelloReply(); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Hello reply found: " + ok); - if (!ok) - throw new IOException("wtf, hello failed?"); - String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + _destFile + " " + _conOptions + "\n"; + _log.debug("Hello reply found: " + hisVersion); + if (hisVersion == null) + throw new IOException("Hello failed"); + boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; + String dest; + if (isV3) { + // we use the filename as the name in sam.keys + // and read it in ourselves + File keys = new File("sam.keys"); + if (keys.exists()) { + Properties opts = new Properties(); + DataHelper.loadProps(opts, keys); + String s = opts.getProperty(_destFile); + if (s != null) { + dest = s; + } else { + dest = "TRANSIENT"; + (new File(_destFile)).delete(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Requesting new transient destination"); + } + } else { + dest = "TRANSIENT"; + (new File(_destFile)).delete(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Requesting new transient destination"); + } + byte[] id = new byte[5]; + _context.random().nextBytes(id); + _conOptions = "ID=" + Base32.encode(id); + } else { + // we use the filename as the name in sam.keys + // and give it to the SAM server + dest = _destFile; + } + String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n"; _samOut.write(req.getBytes()); _samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); - ok = _eventHandler.waitForSessionCreateReply(); + boolean ok = _eventHandler.waitForSessionCreateReply(); + if (!ok) + throw new IOException("Session create failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create reply found: " + ok); @@ -175,7 +214,8 @@ public class SAMStreamSink { _log.error("No naming lookup reply found!"); return null; } else { - _log.info(_destFile + " is located at " + destination); + if (_log.shouldInfo()) + _log.info(_destFile + " is located at " + destination); } return destination; } catch (Exception e) { @@ -186,10 +226,18 @@ public class SAMStreamSink { } private boolean writeDest(String dest) { + File f = new File(_destFile); + if (f.exists()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Destination file exists, not overwriting:" + _destFile); + return false; + } FileOutputStream fos = null; try { - fos = new FileOutputStream(_destFile); + fos = new FileOutputStream(f); fos.write(dest.getBytes()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("My destination written to " + _destFile); } catch (Exception e) { _log.error("Error writing to " + _destFile, e); return false; @@ -236,7 +284,7 @@ public class SAMStreamSink { try { _out.close(); } catch (IOException ioe) { - _log.error("Error closing", ioe); + _log.info("Error closing", ioe); } } public void received(byte data[], int offset, int len) {