diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java index 3e02a04be9405b06a9e00b18d1609e5877df6ec2..ea20556f33eb6faceb416faaa81df4ebb3536aa3 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMClientEventListenerImpl.java @@ -10,9 +10,9 @@ public class SAMClientEventListenerImpl implements SAMReader.SAMClientEventListe public void helloReplyReceived(boolean ok, String version) {} public void namingReplyReceived(String name, String result, String value, String message) {} public void sessionStatusReceived(String result, String destination, String message) {} - public void streamClosedReceived(String result, int id, String message) {} - public void streamConnectedReceived(String remoteDestination, int id) {} - public void streamDataReceived(int id, byte[] data, int offset, int length) {} - public void streamStatusReceived(String result, int id, String message) {} + public void streamClosedReceived(String result, String id, String message) {} + public void streamConnectedReceived(String remoteDestination, String id) {} + public void streamDataReceived(String id, byte[] data, int offset, int length) {} + public void streamStatusReceived(String result, String id, String message) {} public void unknownMessageReceived(String major, String minor, Properties params) {} } diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java index 40916c7eb740f5c48668a041ae9bf79a927a8e3a..ae17b893d7412d24a256725bcdfb04fbd0a2ac0f 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMEventHandler.java @@ -18,8 +18,10 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { private String _version; private final Object _helloLock = new Object(); private Boolean _sessionCreateOk; + private Boolean _streamStatusOk; private final Object _sessionCreateLock = new Object(); private final Object _namingReplyLock = new Object(); + private final Object _streamStatusLock = new Object(); private final Map<String,String> _namingReplies = new HashMap<String,String>(); public SAMEventHandler(I2PAppContext ctx) { @@ -27,7 +29,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { _log = ctx.logManager().getLog(getClass()); } - @Override + @Override public void helloReplyReceived(boolean ok, String version) { synchronized (_helloLock) { if (ok) @@ -39,7 +41,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } - @Override + @Override public void sessionStatusReceived(String result, String destination, String msg) { synchronized (_sessionCreateLock) { if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result)) @@ -50,7 +52,7 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } - @Override + @Override public void namingReplyReceived(String name, String result, String value, String msg) { synchronized (_namingReplyLock) { if (SAMReader.SAMClientEventListener.NAMING_REPLY_OK.equals(result)) @@ -61,7 +63,18 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } } - @Override + @Override + public void streamStatusReceived(String result, String id, String message) { + synchronized (_streamStatusLock) { + if (SAMReader.SAMClientEventListener.SESSION_STATUS_OK.equals(result)) + _streamStatusOk = Boolean.TRUE; + else + _streamStatusOk = Boolean.FALSE; + _streamStatusLock.notifyAll(); + } + } + + @Override public void unknownMessageReceived(String major, String minor, Properties params) { _log.error("Unhandled message: [" + major + "] [" + minor + "] [" + params + "]"); } @@ -106,6 +119,24 @@ public class SAMEventHandler extends SAMClientEventListenerImpl { } catch (InterruptedException ie) {} } } + + /** + * Wait for the stream to be created, returning true if everything went ok + * + * @return true if everything ok + */ + public boolean waitForStreamStatusReply() { + while (true) { + try { + synchronized (_streamStatusLock) { + if (_streamStatusOk == null) + _streamStatusLock.wait(); + else + return _streamStatusOk.booleanValue(); + } + } catch (InterruptedException ie) {} + } + } /** * Return the destination found matching the name, or null if the key was diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java index 81010f21f331e28046d02e49a60bc92f26993ddc..966d977cb7b95d14c1bd57dc62f9241201d237b4 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMReader.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMReader.java @@ -73,10 +73,10 @@ public class SAMReader { public void helloReplyReceived(boolean ok, String version); public void sessionStatusReceived(String result, String destination, String message); - public void streamStatusReceived(String result, int id, String message); - public void streamConnectedReceived(String remoteDestination, int id); - public void streamClosedReceived(String result, int id, String message); - public void streamDataReceived(int id, byte data[], int offset, int length); + public void streamStatusReceived(String result, String id, String message); + public void streamConnectedReceived(String remoteDestination, String id); + public void streamClosedReceived(String result, String id, String message); + public void streamDataReceived(String id, byte data[], int offset, int length); public void namingReplyReceived(String name, String result, String value, String message); public void destReplyReceived(String publicKey, String privateKey); @@ -181,24 +181,17 @@ public class SAMReader { String result = params.getProperty("RESULT"); String id = params.getProperty("ID"); String msg = params.getProperty("MESSAGE"); - if (id != null) { - try { - _listener.streamStatusReceived(result, Integer.parseInt(id), msg); - } catch (NumberFormatException nfe) { - _listener.unknownMessageReceived(major, minor, params); - } - } else { - _listener.unknownMessageReceived(major, minor, params); - } + // id is null in v3, so pass it through regardless + //if (id != null) { + _listener.streamStatusReceived(result, id, msg); + //} else { + // _listener.unknownMessageReceived(major, minor, params); + //} } else if ("CONNECTED".equals(minor)) { String dest = params.getProperty("DESTINATION"); String id = params.getProperty("ID"); if (id != null) { - try { - _listener.streamConnectedReceived(dest, Integer.parseInt(id)); - } catch (NumberFormatException nfe) { - _listener.unknownMessageReceived(major, minor, params); - } + _listener.streamConnectedReceived(dest, id); } else { _listener.unknownMessageReceived(major, minor, params); } @@ -207,11 +200,7 @@ public class SAMReader { String id = params.getProperty("ID"); String msg = params.getProperty("MESSAGE"); if (id != null) { - try { - _listener.streamClosedReceived(result, Integer.parseInt(id), msg); - } catch (NumberFormatException nfe) { - _listener.unknownMessageReceived(major, minor, params); - } + _listener.streamClosedReceived(result, id, msg); } else { _listener.unknownMessageReceived(major, minor, params); } @@ -220,7 +209,6 @@ public class SAMReader { String size = params.getProperty("SIZE"); if (id != null) { try { - int idVal = Integer.parseInt(id); int sizeVal = Integer.parseInt(size); byte data[] = new byte[sizeVal]; @@ -228,7 +216,7 @@ public class SAMReader { if (read != sizeVal) { _listener.unknownMessageReceived(major, minor, params); } else { - _listener.streamDataReceived(idVal, data, 0, sizeVal); + _listener.streamDataReceived(id, data, 0, sizeVal); } } catch (NumberFormatException nfe) { _listener.unknownMessageReceived(major, minor, params); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java index c93c70a4d597172d951cc36a9474fbe7b89db0c7..8b219124f748974c472aa152d79310fd2d95bb4b 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSend.java @@ -30,14 +30,12 @@ public class SAMStreamSend { private final String _destFile; private final String _dataFile; private String _conOptions; - private Socket _samSocket; - private OutputStream _samOut; - private InputStream _samIn; - private SAMReader _reader; + private SAMReader _reader, _reader2; + private boolean _isV3; + private String _v3ID; //private boolean _dead; - private final SAMEventHandler _eventHandler; /** Connection id (Integer) to peer (Flooder) */ - private final Map<Integer, Sender> _remotePeers; + private final Map<String, Sender> _remotePeers; public static void main(String args[]) { if (args.length < 4) { @@ -60,27 +58,48 @@ public class SAMStreamSend { _destFile = destFile; _dataFile = dataFile; _conOptions = ""; - _eventHandler = new SendEventHandler(_context); - _remotePeers = new HashMap<Integer,Sender>(); + _remotePeers = new HashMap<String, Sender>(); } public void startup(String version) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); - boolean ok = connect(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connected: " + ok); - if (ok) { - _reader = new SAMReader(_context, _samIn, _eventHandler); + try { + Socket sock = connect(); + SAMEventHandler eventHandler = new SendEventHandler(_context); + _reader = new SAMReader(_context, sock.getInputStream(), eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - String ourDest = handshake(version); + OutputStream out = sock.getOutputStream(); + String ourDest = handshake(out, version, true, eventHandler); + if (ourDest == null) + throw new IOException("handshake failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); + if (_isV3) { + Socket sock2 = connect(); + eventHandler = new SendEventHandler(_context); + _reader2 = new SAMReader(_context, sock2.getInputStream(), eventHandler); + _reader2.startReading(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reader2 created"); + out = sock2.getOutputStream(); + String ok = handshake(out, version, false, eventHandler); + if (ok == null) + throw new IOException("2nd handshake failed"); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Handshake2 complete."); + } if (ourDest != null) { - send(); + send(out, eventHandler); } + } catch (IOException e) { + _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); + if (_reader != null) + _reader.stopReading(); + if (_reader2 != null) + _reader2.stopReading(); } } @@ -88,10 +107,10 @@ public class SAMStreamSend { public SendEventHandler(I2PAppContext ctx) { super(ctx); } @Override - public void streamClosedReceived(String result, int id, String message) { + public void streamClosedReceived(String result, String id, String message) { Sender sender = null; synchronized (_remotePeers) { - sender = _remotePeers.remove(Integer.valueOf(id)); + sender = _remotePeers.remove(id); } if (sender != null) { sender.closed(); @@ -103,53 +122,49 @@ public class SAMStreamSend { } } - private boolean connect() { - try { - _samSocket = new Socket(_samHost, Integer.parseInt(_samPort)); - _samOut = _samSocket.getOutputStream(); - _samIn = _samSocket.getInputStream(); - return true; - } catch (Exception e) { - _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); - return false; - } + private Socket connect() throws IOException { + return new Socket(_samHost, Integer.parseInt(_samPort)); } - private String handshake(String version) { - synchronized (_samOut) { + /** @return our b64 dest or null */ + private String handshake(OutputStream samOut, String version, boolean isMaster, SAMEventHandler eventHandler) { + synchronized (samOut) { try { - _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); - _samOut.flush(); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); - String hisVersion = _eventHandler.waitForHelloReply(); + String hisVersion = eventHandler.waitForHelloReply(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello reply found: " + hisVersion); if (hisVersion == null) throw new IOException("Hello failed"); - boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; - if (isV3) { + if (!isMaster) + return "OK"; + _isV3 = VersionComparator.comp(hisVersion, "3") >= 0; + if (_isV3) { byte[] id = new byte[5]; _context.random().nextBytes(id); - _conOptions = "ID=" + Base32.encode(id); + _v3ID = Base32.encode(id); + _conOptions = "ID=" + _v3ID; } String req = "SESSION CREATE STYLE=STREAM DESTINATION=TRANSIENT " + _conOptions + "\n"; - _samOut.write(req.getBytes()); - _samOut.flush(); + samOut.write(req.getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); - boolean ok = _eventHandler.waitForSessionCreateReply(); + boolean ok = eventHandler.waitForSessionCreateReply(); if (!ok) throw new IOException("Session create failed"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create reply found: " + ok); req = "NAMING LOOKUP NAME=ME\n"; - _samOut.write(req.getBytes()); - _samOut.flush(); + samOut.write(req.getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup sent"); - String destination = _eventHandler.waitForNamingReply("ME"); + String destination = eventHandler.waitForNamingReply("ME"); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup reply found: " + destination); if (destination == null) { @@ -166,8 +181,8 @@ public class SAMStreamSend { } } - private void send() { - Sender sender = new Sender(); + private void send(OutputStream samOut, SAMEventHandler eventHandler) { + Sender sender = new Sender(samOut, eventHandler); boolean ok = sender.openConnection(); if (ok) { I2PAppThread t = new I2PAppThread(sender, "Sender"); @@ -176,14 +191,26 @@ public class SAMStreamSend { } private class Sender implements Runnable { - private int _connectionId; + private final String _connectionId; private String _remoteDestination; private InputStream _in; private volatile boolean _closed; private long _started; private long _totalSent; + private final OutputStream _samOut; + private final SAMEventHandler _eventHandler; - public Sender() {} + public Sender(OutputStream samOut, SAMEventHandler eventHandler) { + _samOut = samOut; + _eventHandler = eventHandler; + synchronized (_remotePeers) { + if (_v3ID != null) + _connectionId = _v3ID; + else + _connectionId = Integer.toString(_remotePeers.size() + 1); + _remotePeers.put(_connectionId, Sender.this); + } + } public boolean openConnection() { FileInputStream fin = null; @@ -193,10 +220,6 @@ public class SAMStreamSend { int read = DataHelper.read(fin, dest); _remoteDestination = new String(dest, 0, read); - synchronized (_remotePeers) { - _connectionId = _remotePeers.size() + 1; - _remotePeers.put(Integer.valueOf(_connectionId), Sender.this); - } _context.statManager().createRateStat("send." + _connectionId + ".totalSent", "Data size sent", "swarm", new long[] { 30*1000, 60*1000, 5*60*1000 }); _context.statManager().createRateStat("send." + _connectionId + ".started", "When we start", "swarm", new long[] { 5*60*1000 }); @@ -207,6 +230,10 @@ public class SAMStreamSend { _samOut.write(msg); _samOut.flush(); } + _log.debug("STREAM CONNECT sent, waiting for STREAM STATUS..."); + boolean ok = _eventHandler.waitForStreamStatusReply(); + if (!ok) + throw new IOException("STREAM CONNECT failed"); _in = new FileInputStream(_dataFile); return true; @@ -222,7 +249,7 @@ public class SAMStreamSend { } } - public int getConnectionId() { return _connectionId; } + public String getConnectionId() { return _connectionId; } public String getDestination() { return _remoteDestination; } public void closed() { @@ -252,9 +279,11 @@ public class SAMStreamSend { _log.debug("Sending " + read + " on " + _connectionId + " after " + (now-lastSend)); lastSend = now; - byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes(); synchronized (_samOut) { - _samOut.write(msg); + if (!_isV3) { + byte msg[] = ("STREAM SEND ID=" + _connectionId + " SIZE=" + read + "\n").getBytes(); + _samOut.write(msg); + } _samOut.write(data, 0, read); _samOut.flush(); } @@ -268,14 +297,23 @@ public class SAMStreamSend { } } - byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes(); - try { - synchronized (_samOut) { - _samOut.write(msg); - _samOut.flush(); + if (_isV3) { + try { + _samOut.close(); + } catch (IOException ioe) { + _log.info("Error closing", ioe); + } + } else { + byte msg[] = ("STREAM CLOSE ID=" + _connectionId + "\n").getBytes(); + try { + synchronized (_samOut) { + _samOut.write(msg); + _samOut.flush(); + _samOut.close(); + } + } catch (IOException ioe) { + _log.info("Error closing", ioe); } - } catch (IOException ioe) { - _log.info("Error closing", ioe); } closed(); @@ -283,6 +321,8 @@ public class SAMStreamSend { _log.debug("Runner exiting"); if (toSend != _totalSent) _log.error("Only sent " + _totalSent + " of " + toSend + " bytes"); + if (_reader2 != null) + _reader2.stopReading(); // stop the reader, since we're only doing this once for testing // you wouldn't do this in a real application _reader.stopReading(); diff --git a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java index 786f608013090cecc5321db6a618997cac82330f..37c8cb4987a8ea470d378a035aacee2a978c7ed8 100644 --- a/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java +++ b/apps/sam/java/src/net/i2p/sam/client/SAMStreamSink.java @@ -31,14 +31,12 @@ public class SAMStreamSink { private final String _destFile; private final String _sinkDir; private String _conOptions; - private Socket _samSocket; - private OutputStream _samOut; - private InputStream _samIn; private SAMReader _reader; + private boolean _isV3; //private boolean _dead; private final SAMEventHandler _eventHandler; /** Connection id (Integer) to peer (Flooder) */ - private final Map<Integer, Sink> _remotePeers; + private final Map<String, Sink> _remotePeers; public static void main(String args[]) { if (args.length < 4) { @@ -61,21 +59,20 @@ public class SAMStreamSink { _sinkDir = sinkDir; _conOptions = ""; _eventHandler = new SinkEventHandler(_context); - _remotePeers = new HashMap<Integer,Sink>(); + _remotePeers = new HashMap<String, Sink>(); } public void startup(String version) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Starting up"); - boolean ok = connect(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connected: " + ok); - if (ok) { - _reader = new SAMReader(_context, _samIn, _eventHandler); + try { + Socket sock = connect(); + _reader = new SAMReader(_context, sock.getInputStream(), _eventHandler); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Reader created"); - String ourDest = handshake(version); + OutputStream out = sock.getOutputStream(); + String ourDest = handshake(out, version, true); if (_log.shouldLog(Log.DEBUG)) _log.debug("Handshake complete. we are " + ourDest); if (ourDest != null) { @@ -84,6 +81,8 @@ public class SAMStreamSink { } else { _reader.stopReading(); } + } catch (IOException e) { + _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); } } @@ -92,10 +91,10 @@ public class SAMStreamSink { public SinkEventHandler(I2PAppContext ctx) { super(ctx); } @Override - public void streamClosedReceived(String result, int id, String message) { + public void streamClosedReceived(String result, String id, String message) { Sink sink = null; synchronized (_remotePeers) { - sink = _remotePeers.remove(Integer.valueOf(id)); + sink = _remotePeers.remove(id); } if (sink != null) { sink.closed(); @@ -107,10 +106,10 @@ public class SAMStreamSink { } @Override - public void streamDataReceived(int id, byte data[], int offset, int length) { + public void streamDataReceived(String id, byte data[], int offset, int length) { Sink sink = null; synchronized (_remotePeers) { - sink = _remotePeers.get(Integer.valueOf(id)); + sink = _remotePeers.get(id); } if (sink != null) { sink.received(data, offset, length); @@ -120,14 +119,14 @@ public class SAMStreamSink { } @Override - public void streamConnectedReceived(String dest, int id) { + public void streamConnectedReceived(String dest, String id) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection " + id + " received from " + dest); try { Sink sink = new Sink(id, dest); synchronized (_remotePeers) { - _remotePeers.put(Integer.valueOf(id), sink); + _remotePeers.put(id, sink); } } catch (IOException ioe) { _log.error("Error creating a new sink", ioe); @@ -135,23 +134,16 @@ public class SAMStreamSink { } } - private boolean connect() { - try { - _samSocket = new Socket(_samHost, Integer.parseInt(_samPort)); - _samOut = _samSocket.getOutputStream(); - _samIn = _samSocket.getInputStream(); - return true; - } catch (Exception e) { - _log.error("Unable to connect to SAM at " + _samHost + ":" + _samPort, e); - return false; - } + private Socket connect() throws IOException { + return new Socket(_samHost, Integer.parseInt(_samPort)); } - private String handshake(String version) { - synchronized (_samOut) { + /** @return our b64 dest or null */ + private String handshake(OutputStream samOut, String version, boolean isMaster) { + synchronized (samOut) { try { - _samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); - _samOut.flush(); + samOut.write(("HELLO VERSION MIN=1.0 MAX=" + version + '\n').getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Hello sent"); String hisVersion = _eventHandler.waitForHelloReply(); @@ -159,9 +151,9 @@ public class SAMStreamSink { _log.debug("Hello reply found: " + hisVersion); if (hisVersion == null) throw new IOException("Hello failed"); - boolean isV3 = VersionComparator.comp(hisVersion, "3") >= 0; + _isV3 = VersionComparator.comp(hisVersion, "3") >= 0; String dest; - if (isV3) { + if (_isV3) { // we use the filename as the name in sam.keys // and read it in ourselves File keys = new File("sam.keys"); @@ -183,17 +175,19 @@ public class SAMStreamSink { if (_log.shouldLog(Log.DEBUG)) _log.debug("Requesting new transient destination"); } - byte[] id = new byte[5]; - _context.random().nextBytes(id); - _conOptions = "ID=" + Base32.encode(id); + if (isMaster) { + byte[] id = new byte[5]; + _context.random().nextBytes(id); + _conOptions = "ID=" + Base32.encode(id); + } } else { // we use the filename as the name in sam.keys // and give it to the SAM server dest = _destFile; } String req = "SESSION CREATE STYLE=STREAM DESTINATION=" + dest + " " + _conOptions + "\n"; - _samOut.write(req.getBytes()); - _samOut.flush(); + samOut.write(req.getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Session create sent"); boolean ok = _eventHandler.waitForSessionCreateReply(); @@ -203,8 +197,8 @@ public class SAMStreamSink { _log.debug("Session create reply found: " + ok); req = "NAMING LOOKUP NAME=ME\n"; - _samOut.write(req.getBytes()); - _samOut.flush(); + samOut.write(req.getBytes()); + samOut.flush(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Naming lookup sent"); String destination = _eventHandler.waitForNamingReply("ME"); @@ -227,11 +221,13 @@ public class SAMStreamSink { private boolean writeDest(String dest) { File f = new File(_destFile); +/* if (f.exists()) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Destination file exists, not overwriting:" + _destFile); + _log.debug("Destination file exists, not overwriting: " + _destFile); return false; } +*/ FileOutputStream fos = null; try { fos = new FileOutputStream(f); @@ -248,14 +244,14 @@ public class SAMStreamSink { } private class Sink { - private final int _connectionId; + private final String _connectionId; private final String _remoteDestination; private volatile boolean _closed; private final long _started; private long _lastReceivedOn; private final OutputStream _out; - public Sink(int conId, String remDest) throws IOException { + public Sink(String conId, String remDest) throws IOException { _connectionId = conId; _remoteDestination = remDest; _closed = false; @@ -273,7 +269,7 @@ public class SAMStreamSink { _started = _context.clock().now(); } - public int getConnectionId() { return _connectionId; } + public String getConnectionId() { return _connectionId; } public String getDestination() { return _remoteDestination; } public void closed() {