diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 3397c38ca3a0e34b017d584a049e9b534e66b367..82373ed06baf43d5423685d421987df4a0365876 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -341,7 +341,7 @@ public class SAMStreamSession { } /** - * Remove and close a SAM STREAM session socket handler. + * Remove and gracefully close a SAM STREAM session socket handler. * * @param id Handler id to be removed */ @@ -357,12 +357,12 @@ public class SAMStreamSession { if (reader != null) reader.stopRunning(); if (sender != null) - sender.stopRunning(); - _log.debug("Removed SAM STREAM session socket handler " + id); + sender.shutDownGracefully(); + _log.debug("Removed SAM STREAM session socket handler (gracefully) " + id); } /** - * Remove and close all the socket handlers managed by this SAM + * Remove and hard close all the socket handlers managed by this SAM * STREAM session. * */ @@ -378,7 +378,7 @@ public class SAMStreamSession { while (iter.hasNext()) { id = (Integer)iter.next(); ((SAMStreamSessionSocketReader)handlersMap.get(id)).stopRunning(); - ((StreamSender)sendersMap.get(id)).stopRunning(); + ((StreamSender)sendersMap.get(id)).shutDownGracefully(); } handlersMap.clear(); sendersMap.clear(); @@ -498,25 +498,20 @@ public class SAMStreamSession { } /** - * Stop a SAM STREAM session socket reader + * Stop a SAM STREAM session socket reader thead immediately. * */ public void stopRunning() { - _log.debug("stopRunning() invoked on socket handler " + id); + _log.debug("stopRunning() invoked on socket reader " + id); synchronized (runningLock) { if (stillRunning) { stillRunning = false; - try { - i2pSocket.close(); - } catch (IOException e) { - _log.debug("Caught IOException", e); - } } } } public void run() { - _log.debug("SAM STREAM session socket handler running"); + _log.debug("run() called for socket reader " + id); int read = -1; byte[] data = new byte[SOCKET_HANDLER_BUF_SIZE]; @@ -568,7 +563,9 @@ public class SAMStreamSession { private int _id; private ByteCache _cache; private OutputStream _out = null; - private boolean _stillRunning; + private boolean _stillRunning, _shuttingDownGracefully; + private Object runningLock = new Object(); + private I2PSocket i2pSocket = null; public StreamSender(I2PSocket s, int id) throws IOException { _data = new ArrayList(1); @@ -576,6 +573,8 @@ public class SAMStreamSession { _cache = ByteCache.getInstance(4, 32*1024); _out = s.getOutputStream(); _stillRunning = true; + _shuttingDownGracefully = false; + i2pSocket = s; } /** @@ -602,28 +601,54 @@ public class SAMStreamSession { } /** - * Stop a SAM STREAM session socket sender + * Stop a SAM STREAM session socket sender thread immediately * */ public void stopRunning() { _log.debug("stopRunning() invoked on socket sender " + _id); - _stillRunning = false; - synchronized (_data) { - _data.clear(); - _data.notifyAll(); + synchronized (runningLock) { + if (_stillRunning) { + _stillRunning = false; + try { + i2pSocket.close(); + } catch (IOException e) { + _log.debug("Caught IOException", e); + } + synchronized (_data) { + _data.clear(); + _data.notifyAll(); + } + } } } + /** + * Stop a SAM STREAM session socket sender gracefully: stop the + * sender thread once all pending data has been sent. + */ + public void shutDownGracefully() { + _log.debug("shutDownGracefully() invoked on socket sender " + _id); + _shuttingDownGracefully = true; + } + public void run() { + _log.debug("run() called for socket sender " + _id); ByteArray data = null; while (_stillRunning) { data = null; try { synchronized (_data) { - if (_data.size() > 0) + if (_data.size() > 0) { data = (ByteArray)_data.remove(0); - else + } else if (_shuttingDownGracefully) { + /* No data left and shutting down gracefully? + If so, stop the sender. */ + stopRunning(); + break; + } else { + /* Wait for data. */ _data.wait(5000); + } } if (data != null) {