diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 37c8cb498..c0cfa48ba 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -31,10 +31,10 @@ public class SAMStreamSink { private final String _destFile; private final String _sinkDir; private String _conOptions; - private SAMReader _reader; + private SAMReader _reader, _reader2; private boolean _isV3; + private String _v3ID; //private boolean _dead; - private final SAMEventHandler _eventHandler; /** Connection id (Integer) to peer (Flooder) */ private final Map _remotePeers; @@ -58,7 +58,6 @@ public class SAMStreamSink { _destFile = destFile; _sinkDir = sinkDir; _conOptions = ""; - _eventHandler = new SinkEventHandler(_context); _remotePeers = new HashMap(); } @@ -67,20 +66,32 @@ public class SAMStreamSink { _log.debug("Starting up"); try { Socket sock = connect(); - _reader = new SAMReader(_context, sock.getInputStream(), _eventHandler); + SAMEventHandler eventHandler = new SinkEventHandler(_context); + _reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); OutputStream out = sock.getOutputStream(); - String ourDest = handshake(out, version, true); + String ourDest = handshake(out, version, true, eventHandler); + if (ourDest == null) + throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); - if (ourDest != null) { - //boolean written = - writeDest(ourDest); - } else { - _reader.stopReading(); + if (_isV3) { + Socket sock2 = connect(); + eventHandler = new SinkEventHandler2(_context, sock2.getInputStream()); + _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); + _reader2.startReading(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reader2 created"); + out = sock2.getOutputStream(); + String ok = handshake(out, version, false, eventHandler); + if (ok == null) + throw new IOException("2nd handshake failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handshake2 complete."); } + writeDest(ourDest); } catch (IOException e) { _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); } @@ -133,24 +144,115 @@ public class SAMStreamSink { } } } + + private class SinkEventHandler2 extends SinkEventHandler { + + private final InputStream _in; + + public SinkEventHandler2(I2PAppContext ctx, InputStream in) { + super(ctx); + _in = in; + } + + @Override + public void streamStatusReceived(String result, String id, String message) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("got STREAM STATUS, result=" + result); + super.streamStatusReceived(result, id, message); + // with SILENT=true, there's nothing else coming, so fire up the Sink + Sink sink = null; + try { + String dest = "TODO if not silent"; + sink = new Sink(_v3ID, dest); + synchronized (_remotePeers) { + _remotePeers.put(_v3ID, sink); + } + } catch (IOException ioe) { + _log.error("Error creating a new sink", ioe); + try { _in.close(); } catch (IOException ioe2) {} + if (sink != null) + sink.closed(); + return; + } + // inline so the reader doesn't grab the data + try { + boolean gotDest = false; + byte[] dest = new byte[1024]; + int dlen = 0; + byte buf[] = new byte[4096]; + int len; + while((len = _in.read(buf)) >= 0) { + if (!gotDest) { + // eat the dest line + for (int i = 0; i < len; i++) { + byte b = buf[i]; + if (b == (byte) '\n') { + gotDest = true; + if (_log.shouldInfo()) { + try { + _log.info("Got incoming accept from: \"" + new String(dest, 0, dlen, "ISO-8859-1") + '"'); + } catch (IOException uee) {} + } + // feed any remaining to the sink + i++; + if (i < len) + sink.received(buf, i, len - i); + break; + } else { + if (dlen < dest.length) { + dest[dlen++] = b; + } else if (dlen == dest.length) { + dlen++; + _log.error("first line overflow on accept"); + } + } + } + } else { + sink.received(buf, 0, len); + } + } + sink.closed(); + } catch (IOException ioe) { + _log.error("Error reading", ioe); + } finally { + try { _in.close(); } catch (IOException ioe) {} + } + } + } private Socket connect() throws IOException { return new Socket(_samHost, Integer.parseInt(_samPort)); } /** @return our b64 dest or null */ - private String handshake(OutputStream samOut, String version, boolean isMaster) { + private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { synchronized (samOut) { try { samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); - String hisVersion = _eventHandler.waitForHelloReply(); + String hisVersion = eventHandler.waitForHelloReply(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello reply found: " + hisVersion); if (hisVersion == null) throw new IOException("Hello failed"); + if (!isMaster) { + // only for v3 + //String req = "STREAM ACCEPT SILENT=true ID=" + _v3ID + "\n"; + String req = "STREAM ACCEPT SILENT=false ID=" + _v3ID + "\n"; + samOut.write(req.getBytes()); + samOut.flush(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("STREAM ACCEPT sent"); + // docs were wrong, we do not get a STREAM STATUS if SILENT=true + //boolean ok = eventHandler.waitForStreamStatusReply(); + //if (!ok) + // throw new IOException("Stream status failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("got STREAM STATUS, awaiting connetion"); + return "OK"; + } _isV3 = VersionComparator.comp(hisVersion, "3") >= 0; String dest; if (_isV3) { @@ -178,7 +280,8 @@ public class SAMStreamSink { if (isMaster) { byte[] id = new byte[5]; _context.random().nextBytes(id); - _conOptions = "ID=" + Base32.encode(id); + _v3ID = Base32.encode(id); + _conOptions = "ID=" + _v3ID; } } else { // we use the filename as the name in sam.keys @@ -190,7 +293,7 @@ public class SAMStreamSink { samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); - boolean ok = _eventHandler.waitForSessionCreateReply(); + boolean ok = eventHandler.waitForSessionCreateReply(); if (!ok) throw new IOException("Session create failed"); if (_log.shouldLog(Log.DEBUG)) @@ -201,7 +304,7 @@ public class SAMStreamSink { samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup sent"); - String destination = _eventHandler.waitForNamingReply("ME"); + String destination = eventHandler.waitForNamingReply("ME"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup reply found: " + destination); if (destination == null) {