diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandler.java b/apps/sam/java/src/net/i2p/sam/SAMHandler.java index c7ec10388be2937cbda5a04c2944c76f8162c1fd..3a1f4aaa7c18d060610913dfeb9d3ff8052ef9f0 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandler.java @@ -158,6 +158,8 @@ abstract class SAMHandler implements Runnable, Handler { * unregister with the bridge. */ public void stopHandling() { + if (_log.shouldInfo()) + _log.info("Stopping: " + this, new Exception("I did it")); synchronized (stopLock) { stopHandler = true; } diff --git a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java index 4bb374f8ddb7dc4fd4551336c8b5a48a9b50303b..71a617e25fc8ad436f19e304277cae3487c12577 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java +++ b/apps/sam/java/src/net/i2p/sam/SAMHandlerFactory.java @@ -56,7 +56,7 @@ class SAMHandlerFactory { throw new SAMException("Timeout waiting for HELLO VERSION", e); } catch (IOException e) { throw new SAMException("Error reading from socket", e); - } catch (Exception e) { + } catch (RuntimeException e) { throw new SAMException("Unexpected error", e); } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index a0b5d53097f7c04c863ba23ee11c3392796fb4bd..e72735ecdc46708a3f4727bc7f173cb72af0830e 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -187,7 +187,9 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Caught IOException for message [" + msg + "]", e); - } catch (Exception e) { + } catch (SAMException e) { + _log.error("Unexpected exception for message [" + msg + "]", e); + } catch (RuntimeException e) { _log.error("Unexpected exception for message [" + msg + "]", e); } finally { if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index 246cd5fb986c2aa09430a2ca5f79a8e4698c9d2e..ff17f07b801bf831ccb894d63eed09f5cc4d1e39 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -294,7 +294,7 @@ class SAMv3Handler extends SAMv1Handler if (now - _lastPing >= READ_TIMEOUT) { if (_log.shouldWarn()) _log.warn("Failed to respond to PING"); - writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); break; } } else { @@ -309,7 +309,7 @@ class SAMv3Handler extends SAMv1Handler if (now - _lastPing >= 2*READ_TIMEOUT) { if (_log.shouldWarn()) _log.warn("Failed to respond to PING"); - writeString("PING STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); + writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"PONG timeout\"\n"); break; } } else if (_lastPing < 0) { @@ -420,9 +420,11 @@ class SAMv3Handler extends SAMv1Handler } // while } catch (IOException e) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Caught IOException for message [" + msg + "]", e); - } catch (Exception e) { - _log.error("Unexpected exception for message [" + msg + "]", e); + _log.debug("Caught IOException in handler", e); + } catch (SAMException e) { + _log.error("Unexpected exception for message [" + msg + ']', e); + } catch (RuntimeException e) { + _log.error("Unexpected exception for message [" + msg + ']', e); } finally { if (_log.shouldLog(Log.DEBUG)) _log.debug("Stopping handler"); @@ -464,6 +466,8 @@ class SAMv3Handler extends SAMv1Handler */ @Override public void stopHandling() { + if (_log.shouldInfo()) + _log.info("Stopping (stolen? " + stolenSocket + "): " + this, new Exception("I did it")); synchronized (stopLock) { stopHandler = true; } @@ -728,14 +732,16 @@ class SAMv3Handler extends SAMv1Handler @Override protected boolean execStreamConnect( Properties props) { + // Messages are NOT sent if SILENT=true, + // The specs said that they were. + boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT")); try { if (props.isEmpty()) { - notifyStreamResult(true,"I2P_ERROR","No parameters specified in STREAM CONNECT message"); + notifyStreamResult(verbose, "I2P_ERROR","No parameters specified in STREAM CONNECT message"); if (_log.shouldLog(Log.DEBUG)) _log.debug("No parameters specified in STREAM CONNECT message"); return false; } - boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT")); String dest = props.getProperty("DESTINATION"); if (dest == null) { @@ -776,6 +782,9 @@ class SAMv3Handler extends SAMv1Handler } private boolean execStreamForwardIncoming( Properties props ) { + // Messages ARE sent if SILENT=true, + // which is different from CONNECT and ACCEPT. + // But this matched the specs. try { try { streamForwardingSocket = true ; @@ -794,6 +803,8 @@ class SAMv3Handler extends SAMv1Handler private boolean execStreamAccept( Properties props ) { + // Messages are NOT sent if SILENT=true, + // The specs said that they were. boolean verbose = !Boolean.parseBoolean(props.getProperty("SILENT")); try { try { diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java index ea20556f33eb6faceb416faaa81df4ebb3536aa3..2f0e8fb85cf567f669e85f0f1b0bc87603ee7b23 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java @@ -14,5 +14,7 @@ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListe public void streamConnectedReceived(String remoteDestination, String id) {} public void streamDataReceived(String id, byte[] data, int offset, int length) {} public void streamStatusReceived(String result, String id, String message) {} + public void pingReceived(String data) {} + public void pongReceived(String data) {} public void unknownMessageReceived(String major, String minor, Properties params) {} } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java index f8a00b561b80dc8a7131673a9a4e59a69f9db23f..4b763019baa1dde460c1ff7779c03fff2471e316 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java @@ -80,6 +80,8 @@ public class SAMReader { public void streamDataReceived(String id, byte data[], int offset, int length); public void namingReplyReceived(String name, String result, String value, String message); public void destReplyReceived(String publicKey, String privateKey); + public void pingReceived(String data); + public void pongReceived(String data); public void unknownMessageReceived(String major, String minor, Properties params); } @@ -118,13 +120,13 @@ public class SAMReader { StringTokenizer tok = new StringTokenizer(line); - if (tok.countTokens() < 2) { + if (tok.countTokens() <= 0) { _log.error("Invalid SAM line: [" + line + "]"); break; } String major = tok.nextToken(); - String minor = tok.nextToken(); + String minor = tok.hasMoreTokens() ? tok.nextToken() : ""; params.clear(); while (tok.hasMoreTokens()) { @@ -247,6 +249,12 @@ public class SAMReader { } else { _listener.unknownMessageReceived(major, minor, params); } + } else if ("PING".equals(major)) { + // this omits anything after a space + _listener.pingReceived(minor); + } else if ("PONG".equals(major)) { + // this omits anything after a space + _listener.pongReceived(minor); } else { _listener.unknownMessageReceived(major, minor, params); } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index 2681122cea9b85403359a3fdf925171236447624..9c36091d02619e75c37c4d008b67e9f0a2b583aa 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -174,7 +174,7 @@ public class SAMStreamSend { _log.info("We are " + destination); } return destination; - } catch (Exception e) { + } catch (IOException e) { _log.error("Error handshaking", e); return null; } @@ -327,6 +327,12 @@ public class SAMStreamSend { _reader2.stopReading(); // stop the reader, since we're only doing this once for testing // you wouldn't do this in a real application + if (_isV3) { + // closing the master socket too fast will kill the data socket flushing through + try { + Thread.sleep(10000); + } catch (InterruptedException ie) {} + } _reader.stopReading(); } } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index c0cfa48ba55205f1ada5ba0276ba2fc529bc4c28..1289691ea6384bf004ac8110b85e76e8b743c257 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -66,12 +66,12 @@ public class SAMStreamSink { _log.debug("Starting up"); try { Socket sock = connect(); - SAMEventHandler eventHandler = new SinkEventHandler(_context); + OutputStream out = sock.getOutputStream(); + SAMEventHandler eventHandler = new SinkEventHandler(_context, out); _reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - OutputStream out = sock.getOutputStream(); String ourDest = handshake(out, version, true, eventHandler); if (ourDest == null) throw new IOException("handshake failed"); @@ -79,12 +79,12 @@ public class SAMStreamSink { _log.debug("Handshake complete. we are " + ourDest); if (_isV3) { Socket sock2 = connect(); - eventHandler = new SinkEventHandler2(_context, sock2.getInputStream()); + out = sock2.getOutputStream(); + eventHandler = new SinkEventHandler2(_context, sock2.getInputStream(), out); _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); _reader2.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader2 created"); - out = sock2.getOutputStream(); String ok = handshake(out, version, false, eventHandler); if (ok == null) throw new IOException("2nd handshake failed"); @@ -99,7 +99,12 @@ public class SAMStreamSink { private class SinkEventHandler extends SAMEventHandler { - public SinkEventHandler(I2PAppContext ctx) { super(ctx); } + protected final OutputStream _out; + + public SinkEventHandler(I2PAppContext ctx, OutputStream out) { + super(ctx); + _out = out; + } @Override public void streamClosedReceived(String result, String id, String message) { @@ -143,14 +148,28 @@ public class SAMStreamSink { _log.error("Error creating a new sink", ioe); } } + + @Override + public void pingReceived(String data) { + if (_log.shouldInfo()) + _log.info("Got PING " + data + ", sending PONG " + data); + synchronized (_out) { + try { + _out.write(("PONG " + data + '\n').getBytes()); + _out.flush(); + } catch (IOException ioe) { + _log.error("PONG fail", ioe); + } + } + } } private class SinkEventHandler2 extends SinkEventHandler { private final InputStream _in; - public SinkEventHandler2(I2PAppContext ctx, InputStream in) { - super(ctx); + public SinkEventHandler2(I2PAppContext ctx, InputStream in, OutputStream out) { + super(ctx, out); _in = in; } @@ -159,10 +178,9 @@ public class SAMStreamSink { if (_log.shouldLog(Log.DEBUG)) _log.debug("got STREAM STATUS, result=" + result); super.streamStatusReceived(result, id, message); - // with SILENT=true, there's nothing else coming, so fire up the Sink Sink sink = null; try { - String dest = "TODO if not silent"; + String dest = "TODO_if_not_silent"; sink = new Sink(_v3ID, dest); synchronized (_remotePeers) { _remotePeers.put(_v3ID, sink); @@ -315,7 +333,7 @@ public class SAMStreamSink { _log.info(_destFile + " is located at " + destination); } return destination; - } catch (Exception e) { + } catch (IOException e) { _log.error("Error handshaking", e); return null; } @@ -337,7 +355,7 @@ public class SAMStreamSink { fos.write(dest.getBytes()); if (_log.shouldLog(Log.DEBUG)) _log.debug("My destination written to " + _destFile); - } catch (Exception e) { + } catch (IOException e) { _log.error("Error writing to " + _destFile, e); return false; } finally {