From dd7d993631bd40c5ac16fcf3a7488d2fbc8e6f0b Mon Sep 17 00:00:00 2001 From: sponge <sponge@mail.i2p> Date: Thu, 25 Sep 2008 23:59:01 +0000 Subject: [PATCH] Added Simple true/false storage class to the utilities Added socketSoTimeout CHANGED RetransmissionTimer is now public FIXED SimpleTimer has a way to be stopped, and reap it's children FIXED Lots of javadoc additions, where I could CLEANUP all code that needed to catch the timeout exception for socketSoTimeout --- .../net/i2p/i2ptunnel/I2PTunnelServer.java | 3 + .../i2p/client/streaming/I2PServerSocket.java | 17 +- .../client/streaming/I2PServerSocketImpl.java | 55 +++++-- .../client/streaming/StreamSinkServer.java | 55 +++++-- .../client/streaming/ConnectionHandler.java | 1 + .../client/streaming/ConnectionManager.java | 148 +++++++++++++----- .../client/streaming/I2PServerSocketFull.java | 38 ++++- .../i2p/client/streaming/I2PSocketFull.java | 62 +++++--- .../streaming/I2PSocketManagerFull.java | 146 ++++++++++++++--- .../client/streaming/RetransmissionTimer.java | 2 +- core/java/src/net/i2p/util/Executor.java | 31 +++- core/java/src/net/i2p/util/SimpleStore.java | 35 +++++ core/java/src/net/i2p/util/SimpleTimer.java | 93 ++++++++--- 13 files changed, 540 insertions(+), 146 deletions(-) create mode 100644 core/java/src/net/i2p/util/SimpleStore.java diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 014d91e9bf..ce7b230f2c 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -12,6 +12,7 @@ import java.net.ConnectException; import java.net.InetAddress; import java.net.Socket; import java.net.SocketException; +import java.net.SocketTimeoutException; import java.util.Iterator; import java.util.Properties; @@ -219,6 +220,8 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { if (_log.shouldLog(Log.ERROR)) _log.error("Error accepting", ce); // not killing the server.. + } catch(SocketTimeoutException ste) { + // ignored, we never set the timeout } } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 726d462ceb..a925b53546 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import java.net.ConnectException; +import java.net.SocketTimeoutException; import net.i2p.I2PException; /** @@ -9,6 +10,7 @@ import net.i2p.I2PException; * */ public interface I2PServerSocket { + /** * Closes the socket. */ @@ -24,8 +26,21 @@ public interface I2PServerSocket { * @throws I2PException if there is a problem with reading a new socket * from the data available (aka the I2PSession closed, etc) * @throws ConnectException if the I2PServerSocket is closed + * @throws SocketTimeoutException + */ + public I2PSocket accept() throws I2PException, ConnectException, SocketTimeoutException; + + /** + * Set Sock Option accept timeout + * @param x + */ + public void setSoTimeout(long x); + + /** + * Get Sock Option accept timeout + * @return timeout */ - public I2PSocket accept() throws I2PException, ConnectException; + public long getSoTimeout(); /** * Access the manager which is coordinating the server socket diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 965ba31bff..c9b16b1b28 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -17,19 +17,33 @@ import net.i2p.util.Log; * */ class I2PServerSocketImpl implements I2PServerSocket { + private final static Log _log = new Log(I2PServerSocketImpl.class); private I2PSocketManager mgr; /** list of sockets waiting for the client to accept them */ private List pendingSockets = Collections.synchronizedList(new ArrayList(4)); - /** have we been closed */ private volatile boolean closing = false; - /** lock on this when accepting a pending socket, and wait on it for notification of acceptance */ private Object socketAcceptedLock = new Object(); /** lock on this when adding a new socket to the pending list, and wait on it accordingly */ private Object socketAddedLock = new Object(); + /** + * Set Sock Option accept timeout stub, does nothing + * @param x + */ + public void setSoTimeout(long x) { + } + + /** + * Get Sock Option accept timeout stub, does nothing + * @return timeout + */ + public long getSoTimeout() { + return -1; + } + public I2PServerSocketImpl(I2PSocketManager mgr) { this.mgr = mgr; } @@ -47,19 +61,22 @@ class I2PServerSocketImpl implements I2PServerSocket { * @throws ConnectException if the I2PServerSocket is closed */ public I2PSocket accept() throws I2PException, ConnectException { - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("accept() called, pending: " + pendingSockets.size()); - + } I2PSocket ret = null; while ( (ret == null) && (!closing) ){ while (pendingSockets.size() <= 0) { - if (closing) throw new ConnectException("I2PServerSocket closed"); + if(closing) { + throw new ConnectException("I2PServerSocket closed"); + } try { synchronized(socketAddedLock) { socketAddedLock.wait(); } - } catch (InterruptedException ie) {} + } catch(InterruptedException ie) { + } } synchronized (pendingSockets) { if (pendingSockets.size() > 0) { @@ -73,8 +90,9 @@ class I2PServerSocketImpl implements I2PServerSocket { } } - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("TIMING: handed out accept result " + ret.hashCode()); + } return ret; } @@ -88,12 +106,13 @@ class I2PServerSocketImpl implements I2PServerSocket { * or the socket was closed */ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); - + } if (closing) { - if (_log.shouldLog(Log.WARN)) + if(_log.shouldLog(Log.WARN)) { _log.warn("Already closing the socket"); + } return false; } @@ -110,14 +129,16 @@ class I2PServerSocketImpl implements I2PServerSocket { while (pendingSockets.contains(s)) { long now = clock.now(); if (now >= end) { - if (_log.shouldLog(Log.INFO)) + if(_log.shouldLog(Log.INFO)) { _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); + } pendingSockets.remove(s); return false; } if (closing) { - if (_log.shouldLog(Log.WARN)) + if(_log.shouldLog(Log.WARN)) { _log.warn("Server socket closed while waiting for accept"); + } pendingSockets.remove(s); return false; } @@ -126,11 +147,13 @@ class I2PServerSocketImpl implements I2PServerSocket { synchronized (socketAcceptedLock) { socketAcceptedLock.wait(remaining); } - } catch (InterruptedException ie) {} + } catch(InterruptedException ie) { + } } long now = clock.now(); - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); + } return true; } @@ -146,5 +169,7 @@ class I2PServerSocketImpl implements I2PServerSocket { } } - public I2PSocketManager getManager() { return mgr; } + public I2PSocketManager getManager() { + return mgr; + } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java index c8b566190c..ad8d07989e 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/StreamSinkServer.java @@ -5,6 +5,7 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.util.Properties; import net.i2p.I2PAppContext; @@ -20,6 +21,7 @@ import net.i2p.util.Log; * */ public class StreamSinkServer { + private Log _log; private String _sinkDir; private String _destFile; @@ -36,6 +38,7 @@ public class StreamSinkServer { public StreamSinkServer(String sinkDir, String ourDestFile) { this(sinkDir, ourDestFile, null, -1, 3); } + public StreamSinkServer(String sinkDir, String ourDestFile, String i2cpHost, int i2cpPort, int handlers) { _sinkDir = sinkDir; _destFile = ourDestFile; @@ -52,13 +55,15 @@ public class StreamSinkServer { */ public void runServer() { I2PSocketManager mgr = null; - if (_i2cpHost != null) + if(_i2cpHost != null) { mgr = I2PSocketManagerFactory.createManager(_i2cpHost, _i2cpPort, new Properties()); - else + } else { mgr = I2PSocketManagerFactory.createManager(); + } Destination dest = mgr.getSession().getMyDestination(); - if (_log.shouldLog(Log.INFO)) + if(_log.shouldLog(Log.INFO)) { _log.info("Listening for connections on: " + dest.calculateHash().toBase64()); + } FileOutputStream fos = null; try { fos = new FileOutputStream(_destFile); @@ -70,7 +75,12 @@ public class StreamSinkServer { _log.error("Error formatting the destination", dfe); return; } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} + if(fos != null) { + try { + fos.close(); + } catch(IOException ioe) { + } + } } I2PServerSocket sock = mgr.getServerSocket(); @@ -91,22 +101,28 @@ public class StreamSinkServer { * */ private class ClientRunner implements Runnable { + private I2PServerSocket _socket; + public ClientRunner(I2PServerSocket socket) { _socket = socket; } + public void run() { while (true) { try { I2PSocket socket = _socket.accept(); - if (socket != null) + if(socket != null) { handle(socket); + } } catch (I2PException ie) { _log.error("Error accepting connection", ie); return; } catch (ConnectException ce) { _log.error("Connection already dropped", ce); return; + } catch(SocketTimeoutException ste) { + // ignored } } } @@ -115,12 +131,14 @@ public class StreamSinkServer { FileOutputStream fos = null; try { File sink = new File(_sinkDir); - if (!sink.exists()) + if(!sink.exists()) { sink.mkdirs(); + } File cur = File.createTempFile("clientSink", ".dat", sink); fos = new FileOutputStream(cur); - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("Writing to " + cur.getAbsolutePath()); + } } catch (IOException ioe) { _log.error("Error creating sink", ioe); return; @@ -135,17 +153,28 @@ public class StreamSinkServer { while ( (read = in.read(buf)) != -1) { //_fos.write(buf, 0, read); written += read; - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("read and wrote " + read + " (" + written + ")"); } + } fos.write(("written: [" + written + "]\n").getBytes()); long lifetime = System.currentTimeMillis() - start; _log.info("Got EOF from client socket [written=" + written + " lifetime=" + lifetime + "]"); } catch (IOException ioe) { _log.error("Error writing the sink", ioe); } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} - if (sock != null) try { sock.close(); } catch (IOException ioe) {} + if(fos != null) { + try { + fos.close(); + } catch(IOException ioe) { + } + } + if(sock != null) { + try { + sock.close(); + } catch(IOException ioe) { + } + } _log.debug("Client socket closed"); } } @@ -174,7 +203,8 @@ public class StreamSinkServer { if (args.length == 5) { try { handlers = Integer.parseInt(args[4]); - } catch (NumberFormatException nfe) {} + } catch(NumberFormatException nfe) { + } } try { int port = Integer.parseInt(args[1]); @@ -186,7 +216,8 @@ public class StreamSinkServer { default: System.out.println("Usage: StreamSinkServer [i2cpHost i2cpPort] sinkDir ourDestFile [handlers]"); } - if (server != null) + if(server != null) { server.runServer(); } } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 4960f1a228..f05ae1c8cf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -1,5 +1,6 @@ package net.i2p.client.streaming; +import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.List; diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index dcc93c5ec0..d2223c1812 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -21,6 +21,7 @@ import net.i2p.util.SimpleTimer; * */ public class ConnectionManager { + private I2PAppContext _context; private Log _log; private I2PSession _session; @@ -39,6 +40,7 @@ public class ConnectionManager { private ConnectionOptions _defaultOptions; private volatile int _numWaiting; private Object _connectionLock; + private long SoTimeout; public ConnectionManager(I2PAppContext context, I2PSession session, int maxConcurrent, ConnectionOptions defaultOptions) { _context = context; @@ -58,6 +60,9 @@ public class ConnectionManager { _maxConcurrentStreams = maxConcurrent; _defaultOptions = defaultOptions; _numWaiting = 0; + /** Socket timeout for accept() */ + SoTimeout = -1; + _context.statManager().createRateStat("stream.con.lifetimeMessagesSent", "How many messages do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeMessagesReceived", "How many messages do we receive on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("stream.con.lifetimeBytesSent", "How many bytes do we send on a stream?", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); @@ -75,6 +80,7 @@ public class ConnectionManager { return (Connection)_connectionByInboundId.get(new Long(id)); } } + /** * not guaranteed to be unique, but in case we receive more than one packet * on an inbound connection that we havent ack'ed yet... @@ -83,16 +89,34 @@ public class ConnectionManager { synchronized (_connectionLock) { for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if (DataHelper.eq(con.getSendStreamId(), id)) + if(DataHelper.eq(con.getSendStreamId(), id)) { return con; } } + } return null; } + /** + * Set the socket accept() timeout. + * @param x + */ + public void MsetSoTimeout(long x) { + SoTimeout = x; + } + + /** + * Get the socket accept() timeout. + * @return + */ + public long MgetSoTimeout() { + return SoTimeout; + } + public void setAllowIncomingConnections(boolean allow) { _connectionHandler.setActive(allow); } + /** should we acceot connections, or just reject everyone? */ public boolean getAllowIncomingConnections() { return _connectionHandler.getActive(); @@ -113,9 +137,10 @@ public class ConnectionManager { synchronized (_connectionLock) { total = _connectionByInboundId.size(); for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { - if ( ((Connection)iter.next()).getIsConnected() ) + if(((Connection)iter.next()).getIsConnected()) { active++; } + } if (locked_tooManyStreams()) { reject = true; } else { @@ -135,9 +160,9 @@ public class ConnectionManager { _context.statManager().addRateData("stream.receiveActive", active, total); if (reject) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " connections"); + } PacketLocal reply = new PacketLocal(_context, synPacket.getOptionalFrom()); reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); @@ -163,7 +188,6 @@ public class ConnectionManager { _context.statManager().addRateData("stream.connectionReceived", 1, 0); return con; } - private static final long DEFAULT_STREAM_DELAY_MAX = 10*1000; /** @@ -176,15 +200,16 @@ public class ConnectionManager { Connection con = null; long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long expiration = _context.clock().now() + opts.getConnectTimeout(); - if (opts.getConnectTimeout() <= 0) + if(opts.getConnectTimeout() <= 0) { expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; + } _numWaiting++; while (true) { long remaining = expiration - _context.clock().now(); if (remaining <= 0) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing to connect since we have exceeded our max of " - + _maxConcurrentStreams + " connections"); + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections"); + } _numWaiting--; return null; } @@ -193,16 +218,18 @@ public class ConnectionManager { if (locked_tooManyStreams()) { // allow a full buffer of pending/waiting streams if (_numWaiting > _maxConcurrentStreams) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refusing connection since we have exceeded our max of " - + _maxConcurrentStreams + " and there are " + _numWaiting - + " waiting already"); + if(_log.shouldLog(Log.WARN)) { + _log.warn("Refusing connection since we have exceeded our max of " + _maxConcurrentStreams + " and there are " + _numWaiting + " waiting already"); + } _numWaiting--; return null; } // no remaining streams, lets wait a bit - try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} + try { + _connectionLock.wait(remaining); + } catch(InterruptedException ie) { + } } else { con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con.setRemotePeer(peer); @@ -224,35 +251,53 @@ public class ConnectionManager { if (opts.getConnectDelay() <= 0) { con.waitForConnect(); } - if (_numWaiting > 0) + if(_numWaiting > 0) { _numWaiting--; - + } _context.statManager().addRateData("stream.connectionCreated", 1, 0); return con; } private boolean locked_tooManyStreams() { - if (_maxConcurrentStreams <= 0) return false; - if (_connectionByInboundId.size() < _maxConcurrentStreams) return false; + if(_maxConcurrentStreams <= 0) { + return false; + } + if(_connectionByInboundId.size() < _maxConcurrentStreams) { + return false; + } int active = 0; for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if (con.getIsConnected()) + if(con.getIsConnected()) { active++; } + } - if ( (_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO)) ) - _log.info("More than 100 connections! " + active - + " total: " + _connectionByInboundId.size()); - + if((_connectionByInboundId.size() > 100) && (_log.shouldLog(Log.INFO))) { + _log.info("More than 100 connections! " + active + " total: " + _connectionByInboundId.size()); + } return (active >= _maxConcurrentStreams); } - public MessageHandler getMessageHandler() { return _messageHandler; } - public PacketHandler getPacketHandler() { return _packetHandler; } - public ConnectionHandler getConnectionHandler() { return _connectionHandler; } - public I2PSession getSession() { return _session; } - public PacketQueue getPacketQueue() { return _outboundQueue; } + public MessageHandler getMessageHandler() { + return _messageHandler; + } + + public PacketHandler getPacketHandler() { + return _packetHandler; + } + + public ConnectionHandler getConnectionHandler() { + return _connectionHandler; + } + + public I2PSession getSession() { + return _session; + } + + public PacketQueue getPacketQueue() { + return _outboundQueue; + } /** * Something b0rked hard, so kill all of our connections without mercy. @@ -279,11 +324,12 @@ public class ConnectionManager { synchronized (_connectionLock) { Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); removed = (o == con); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Connection removed? " + removed + " remaining: " - + _connectionByInboundId.size() + ": " + con); - if (!removed && _log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { + _log.debug("Connection removed? " + removed + " remaining: " + _connectionByInboundId.size() + ": " + con); + } + if(!removed && _log.shouldLog(Log.DEBUG)) { _log.debug("Failed to remove " + con +"\n" + _connectionByInboundId.values()); + } _connectionLock.notifyAll(); } if (removed) { @@ -309,9 +355,11 @@ public class ConnectionManager { public boolean ping(Destination peer, long timeoutMs) { return ping(peer, timeoutMs, true); } + public boolean ping(Destination peer, long timeoutMs, boolean blocking) { return ping(peer, timeoutMs, blocking, null, null, null); } + public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); PacketLocal packet = new PacketLocal(_context, peer); @@ -335,8 +383,12 @@ public class ConnectionManager { if (blocking) { synchronized (req) { - if (!req.pongReceived()) - try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + if(!req.pongReceived()) { + try { + req.wait(timeoutMs); + } catch(InterruptedException ie) { + } + } } synchronized (_pendingPings) { @@ -351,12 +403,15 @@ public class ConnectionManager { } interface PingNotifier { + public void pingComplete(boolean ok); } private class PingFailed implements SimpleTimer.TimedEvent { + private Long _id; private PingNotifier _notifier; + public PingFailed(Long id, PingNotifier notifier) { _id = id; _notifier = notifier; @@ -366,29 +421,35 @@ public class ConnectionManager { boolean removed = false; synchronized (_pendingPings) { Object o = _pendingPings.remove(_id); - if (o != null) + if(o != null) { removed = true; } + } if (removed) { - if (_notifier != null) + if(_notifier != null) { _notifier.pingComplete(false); - if (_log.shouldLog(Log.INFO)) + } + if(_log.shouldLog(Log.INFO)) { _log.info("Ping failed"); } } } + } private class PingRequest { + private boolean _ponged; private Destination _peer; private PacketLocal _packet; private PingNotifier _notifier; + public PingRequest(Destination peer, PacketLocal packet, PingNotifier notifier) { _ponged = false; _peer = peer; _packet = packet; _notifier = notifier; } + public void pong() { _log.debug("Ping successful"); _context.sessionKeyManager().tagsDelivered(_peer.getPublicKey(), _packet.getKeyUsed(), _packet.getTagsSent()); @@ -396,10 +457,14 @@ public class ConnectionManager { _ponged = true; ConnectionManager.PingRequest.this.notifyAll(); } - if (_notifier != null) + if(_notifier != null) { _notifier.pingComplete(true); } - public boolean pongReceived() { return _ponged; } + } + + public boolean pongReceived() { + return _ponged; + } } void receivePong(long pingId) { @@ -407,7 +472,8 @@ public class ConnectionManager { synchronized (_pendingPings) { req = (PingRequest)_pendingPings.remove(new Long(pingId)); } - if (req != null) + if(req != null) { req.pong(); } } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java index b1a4175f24..68f8c0045e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -1,5 +1,8 @@ package net.i2p.client.streaming; +import java.net.SocketTimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; import net.i2p.I2PException; /** @@ -7,17 +10,46 @@ import net.i2p.I2PException; * */ public class I2PServerSocketFull implements I2PServerSocket { + private I2PSocketManagerFull _socketManager; + /** + * + * @param mgr + */ public I2PServerSocketFull(I2PSocketManagerFull mgr) { _socketManager = mgr; } - public I2PSocket accept() throws I2PException { + /** + * + * @return + * @throws net.i2p.I2PException + * @throws SocketTimeoutException + */ + public I2PSocket accept() throws I2PException, SocketTimeoutException { return _socketManager.receiveSocket(); } - public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } + public long getSoTimeout() { + return _socketManager.getConnectionManager().MgetSoTimeout(); + } - public I2PSocketManager getManager() { return _socketManager; } + public void setSoTimeout(long x) { + _socketManager.getConnectionManager().MsetSoTimeout(x); + } + /** + * Close the connection. + */ + public void close() { + _socketManager.getConnectionManager().setAllowIncomingConnections(false); + } + + /** + * + * @return _socketManager + */ + public I2PSocketManager getManager() { + return _socketManager; + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 61dd487578..2ea85270f9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -11,6 +11,7 @@ import net.i2p.data.Destination; * */ public class I2PSocketFull implements I2PSocket { + private Connection _connection; private I2PSocket.SocketErrorListener _listener; private Destination _remotePeer; @@ -24,9 +25,12 @@ public class I2PSocketFull implements I2PSocket { } } + public void close() throws IOException { Connection c = _connection; - if (c == null) return; + if(c == null) { + return; + } if (c.getIsConnected()) { OutputStream out = c.getOutputStream(); if (out != null) { @@ -44,58 +48,71 @@ public class I2PSocketFull implements I2PSocket { destroy(); } - Connection getConnection() { return _connection; } + Connection getConnection() { + return _connection; + } public InputStream getInputStream() { Connection c = _connection; - if (c != null) + if(c != null) { return c.getInputStream(); - else + } else { return null; } + } public I2PSocketOptions getOptions() { Connection c = _connection; - if (c != null) + if(c != null) { return c.getOptions(); - else + } else { return null; } + } public OutputStream getOutputStream() throws IOException { Connection c = _connection; - if (c != null) + if(c != null) { return c.getOutputStream(); - else + } else { return null; } + } - public Destination getPeerDestination() { return _remotePeer; } + public Destination getPeerDestination() { + return _remotePeer; + } public long getReadTimeout() { I2PSocketOptions opts = getOptions(); - if (opts != null) + if(opts != null) { return opts.getReadTimeout(); - else + } else { return -1; } + } - public Destination getThisDestination() { return _localPeer; } + public Destination getThisDestination() { + return _localPeer; + } public void setOptions(I2PSocketOptions options) { Connection c = _connection; - if (c == null) return; - - if (options instanceof ConnectionOptions) + if(c == null) { + return; + } + if(options instanceof ConnectionOptions) { c.setOptions((ConnectionOptions)options); - else + } else { c.setOptions(new ConnectionOptions(options)); } + } public void setReadTimeout(long ms) { Connection c = _connection; - if (c == null) return; - + if(c == null) { + return; + } c.getInputStream().setReadTimeout((int)ms); c.getOptions().setReadTimeout(ms); } @@ -116,14 +133,17 @@ public class I2PSocketFull implements I2PSocket { Connection c = _connection; _connection = null; _listener = null; - if (c != null) + if(c != null) { c.disconnectComplete(); } + } + public String toString() { Connection c = _connection; - if (c == null) + if(c == null) { return super.toString(); - else + } else { return c.toString(); } } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java index 7384a4972f..5e79db0ede 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -1,6 +1,7 @@ package net.i2p.client.streaming; import java.net.NoRouteToHostException; +import java.net.SocketTimeoutException; import java.util.HashSet; import java.util.Iterator; import java.util.Properties; @@ -13,7 +14,6 @@ import net.i2p.client.I2PSessionException; import net.i2p.data.Destination; import net.i2p.util.Log; - /** * Centralize the coordination and multiplexing of the local client's streaming. * There should be one I2PSocketManager for each I2PSession, and if an application @@ -23,6 +23,7 @@ import net.i2p.util.Log; * */ public class I2PSocketManagerFull implements I2PSocketManager { + private I2PAppContext _context; private Log _log; private I2PSession _session; @@ -33,27 +34,41 @@ public class I2PSocketManagerFull implements I2PSocketManager { private int _maxStreams; private static int __managerId = 0; private ConnectionManager _connectionManager; - /** * How long to wait for the client app to accept() before sending back CLOSE? * This includes the time waiting in the queue. Currently set to 5 seconds. */ private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; + /** + * + */ public I2PSocketManagerFull() { _context = null; _session = null; } + + /** + * + * @param context + * @param session + * @param opts + * @param name + */ public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { this(); init(context, session, opts, name); } - /** how many streams will we allow at once? */ public static final String PROP_MAX_STREAMS = "i2p.streaming.maxConcurrentStreams"; /** * + * + * @param context + * @param session + * @param opts + * @param name */ public void init(I2PAppContext context, I2PSession session, Properties opts, String name) { _context = context; @@ -65,8 +80,9 @@ public class I2PSocketManagerFull implements I2PSocketManager { String num = (opts != null ? opts.getProperty(PROP_MAX_STREAMS, "-1") : "-1"); _maxStreams = Integer.parseInt(num); } catch (NumberFormatException nfe) { - if (_log.shouldLog(Log.WARN)) + if(_log.shouldLog(Log.WARN)) { _log.warn("Invalid max # of concurrent streams, defaulting to unlimited", nfe); + } _maxStreams = -1; } _name = name + " " + (++__managerId); @@ -76,44 +92,77 @@ public class I2PSocketManagerFull implements I2PSocketManager { _serverSocket = new I2PServerSocketFull(this); if (_log.shouldLog(Log.INFO)) { - _log.info("Socket manager created. \ndefault options: " + _defaultOptions - + "\noriginal properties: " + opts); + _log.info("Socket manager created. \ndefault options: " + _defaultOptions + "\noriginal properties: " + opts); } } - public I2PSocketOptions buildOptions() { return buildOptions(null); } + /** + * + * @return + */ + public I2PSocketOptions buildOptions() { + return buildOptions(null); + } + + /** + * + * @param opts + * @return + */ public I2PSocketOptions buildOptions(Properties opts) { ConnectionOptions curOpts = new ConnectionOptions(_defaultOptions); curOpts.setProperties(opts); return curOpts; } + /** + * + * @return + */ public I2PSession getSession() { return _session; } + /** + * + * @return + */ public ConnectionManager getConnectionManager() { return _connectionManager; } - public I2PSocket receiveSocket() throws I2PException { + /** + * + * @return + * @throws net.i2p.I2PException + * @throws java.net.SocketTimeoutException + */ + public I2PSocket receiveSocket() throws I2PException, SocketTimeoutException { verifySession(); - Connection con = _connectionManager.getConnectionHandler().accept(-1); - if (_log.shouldLog(Log.DEBUG)) + Connection con = _connectionManager.getConnectionHandler().accept(_connectionManager.MgetSoTimeout()); + if(_log.shouldLog(Log.DEBUG)) { _log.debug("receiveSocket() called: " + con); + } if (con != null) { I2PSocketFull sock = new I2PSocketFull(con); con.setSocket(sock); return sock; } else { + if(_connectionManager.MgetSoTimeout() == -1) { return null; } + throw new SocketTimeoutException("I2PSocket timed out"); + } } /** * Ping the specified peer, returning true if they replied to the ping within * the timeout specified, false otherwise. This call blocks. * + * + * @param peer + * @param timeoutMs + * @return */ public boolean ping(Destination peer, long timeoutMs) { return _connectionManager.ping(peer, timeoutMs); @@ -125,25 +174,47 @@ public class I2PSocketManagerFull implements I2PSocketManager { * * @param ms milliseconds to wait, maximum */ - public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } - public long getAcceptTimeout() { return _acceptTimeout; } + public void setAcceptTimeout(long ms) { + _acceptTimeout = ms; + } + + /** + * + * @return + */ + public long getAcceptTimeout() { + return _acceptTimeout; + } + /** + * + * @param options + */ public void setDefaultOptions(I2PSocketOptions options) { _defaultOptions = new ConnectionOptions((ConnectionOptions) options); } + /** + * + * @return + */ public I2PSocketOptions getDefaultOptions() { return _defaultOptions; } + /** + * + * @return + */ public I2PServerSocket getServerSocket() { _connectionManager.setAllowIncomingConnections(true); return _serverSocket; } private void verifySession() throws I2PException { - if (!_connectionManager.getSession().isClosed()) + if(!_connectionManager.getSession().isClosed()) { return; + } _connectionManager.getSession().connect(); } @@ -159,20 +230,22 @@ public class I2PSocketManagerFull implements I2PSocketManager { public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException, NoRouteToHostException { verifySession(); - if (options == null) + if(options == null) { options = _defaultOptions; + } ConnectionOptions opts = null; - if (options instanceof ConnectionOptions) + if(options instanceof ConnectionOptions) { opts = new ConnectionOptions((ConnectionOptions)options); - else + } else { opts = new ConnectionOptions(options); - - if (_log.shouldLog(Log.INFO)) - _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0,6) - + " with options: " + opts); + } + if(_log.shouldLog(Log.INFO)) { + _log.info("Connecting to " + peer.calculateHash().toBase64().substring(0, 6) + " with options: " + opts); + } Connection con = _connectionManager.connect(peer, opts); - if (con == null) + if(con == null) { throw new TooManyStreamsException("Too many streams (max " + _maxStreams + ")"); + } I2PSocketFull socket = new I2PSocketFull(con); con.setSocket(socket); if (con.getConnectionError() != null) { @@ -187,6 +260,7 @@ public class I2PSocketManagerFull implements I2PSocketManager { * * @param peer Destination to connect to * + * @return * @throws NoRouteToHostException if the peer is not found or not reachable * @throws I2PException if there is some other I2P-related problem */ @@ -216,25 +290,49 @@ public class I2PSocketManagerFull implements I2PSocketManager { /** * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. * + * + * @return */ public Set listSockets() { Set connections = _connectionManager.listConnections(); Set rv = new HashSet(connections.size()); for (Iterator iter = connections.iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); - if (con.getSocket() != null) + if(con.getSocket() != null) { rv.add(con.getSocket()); } + } return rv; } - public String getName() { return _name; } - public void setName(String name) { _name = name; } + /** + * + * @return + */ + public String getName() { + return _name; + } + /** + * + * @param name + */ + public void setName(String name) { + _name = name; + } + /** + * + * @param lsnr + */ public void addDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { _connectionManager.getMessageHandler().addDisconnectListener(lsnr); } + + /** + * + * @param lsnr + */ public void removeDisconnectListener(I2PSocketManager.DisconnectListener lsnr) { _connectionManager.getMessageHandler().removeDisconnectListener(lsnr); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java index 0ea0c83d70..c52c373b17 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/RetransmissionTimer.java @@ -5,7 +5,7 @@ import net.i2p.util.SimpleTimer; /** * */ -class RetransmissionTimer extends SimpleTimer { +public class RetransmissionTimer extends SimpleTimer { private static final RetransmissionTimer _instance = new RetransmissionTimer(); public static final SimpleTimer getInstance() { return _instance; } protected RetransmissionTimer() { super("StreamingTimer"); } diff --git a/core/java/src/net/i2p/util/Executor.java b/core/java/src/net/i2p/util/Executor.java index e3c1b6fbfe..bdb728f97c 100644 --- a/core/java/src/net/i2p/util/Executor.java +++ b/core/java/src/net/i2p/util/Executor.java @@ -5,22 +5,32 @@ import java.util.List; import net.i2p.I2PAppContext; class Executor implements Runnable { + private I2PAppContext _context; private Log _log; private List _readyEvents; - public Executor(I2PAppContext ctx, Log log, List events) { + private SimpleStore runn; + + public Executor(I2PAppContext ctx, Log log, List events, SimpleStore x) { _context = ctx; _readyEvents = events; + runn = x; } + public void run() { - while (true) { + while(runn.getAnswer()) { SimpleTimer.TimedEvent evt = null; synchronized (_readyEvents) { - if (_readyEvents.size() <= 0) - try { _readyEvents.wait(); } catch (InterruptedException ie) {} - if (_readyEvents.size() > 0) + if(_readyEvents.size() <= 0) { + try { + _readyEvents.wait(); + } catch(InterruptedException ie) { + } + } + if(_readyEvents.size() > 0) { evt = (SimpleTimer.TimedEvent)_readyEvents.remove(0); } + } if (evt != null) { long before = _context.clock().now(); @@ -30,17 +40,24 @@ class Executor implements Runnable { log("wtf, event borked: " + evt, t); } long time = _context.clock().now() - before; - if ( (time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN)) ) + if((time > 1000) && (_log != null) && (_log.shouldLog(Log.WARN))) { _log.warn("wtf, event execution took " + time + ": " + evt); } } } + } + /** + * + * @param msg + * @param t + */ private void log(String msg, Throwable t) { synchronized (this) { - if (_log == null) + if(_log == null) { _log = I2PAppContext.getGlobalContext().logManager().getLog(SimpleTimer.class); } + } _log.log(Log.CRIT, msg, t); } } diff --git a/core/java/src/net/i2p/util/SimpleStore.java b/core/java/src/net/i2p/util/SimpleStore.java new file mode 100644 index 0000000000..b73a8e7ebb --- /dev/null +++ b/core/java/src/net/i2p/util/SimpleStore.java @@ -0,0 +1,35 @@ +/* + * This is free software, do as you please. + */ + +package net.i2p.util; + +/** + * + * @author sponge + */ +public class SimpleStore { + + private boolean answer; + + SimpleStore(boolean x) { + answer=x; + } + + /** + * set the answer + * + * @param x + */ + public void setAnswer(boolean x) { + answer = x; + } + /** + * + * @return boolean + */ + public boolean getAnswer() { + return answer; + } + +} diff --git a/core/java/src/net/i2p/util/SimpleTimer.java b/core/java/src/net/i2p/util/SimpleTimer.java index 9543f72c55..e5725a9212 100644 --- a/core/java/src/net/i2p/util/SimpleTimer.java +++ b/core/java/src/net/i2p/util/SimpleTimer.java @@ -16,8 +16,12 @@ import net.i2p.I2PAppContext; * */ public class SimpleTimer { + private static final SimpleTimer _instance = new SimpleTimer(); - public static SimpleTimer getInstance() { return _instance; } + + public static SimpleTimer getInstance() { + return _instance; + } private I2PAppContext _context; private Log _log; /** event time (Long) to event (TimedEvent) mapping */ @@ -25,9 +29,21 @@ public class SimpleTimer { /** event (TimedEvent) to event time (Long) mapping */ private Map _eventTimes; private List _readyEvents; + private SimpleStore runn; + + /** + * + */ + protected SimpleTimer() { + this("SimpleTimer"); + } - protected SimpleTimer() { this("SimpleTimer"); } + /** + * + * @param name + */ protected SimpleTimer(String name) { + runn = new SimpleStore(true); _context = I2PAppContext.getGlobalContext(); _log = _context.logManager().getLog(SimpleTimer.class); _events = new TreeMap(); @@ -38,13 +54,28 @@ public class SimpleTimer { runner.setDaemon(true); runner.start(); for (int i = 0; i < 3; i++) { - I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents)); + I2PThread executor = new I2PThread(new Executor(_context, _log, _readyEvents, runn)); executor.setName(name + "Executor " + i); executor.setDaemon(true); executor.start(); } } + /** + * Removes the SimpleTimer. + */ + public void removeSimpleTimer() { + synchronized(_events) { + runn.setAnswer(false); + _events.notifyAll(); + } + } + + /** + * + * @param event + * @param timeoutMs + */ public void reschedule(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, false); } @@ -55,9 +86,16 @@ public class SimpleTimer { * for the earlier of the two timeouts, which may be before this stated * timeout. If this is not the desired behavior, call removeEvent first. * + * @param event + * @param timeoutMs */ - public void addEvent(TimedEvent event, long timeoutMs) { addEvent(event, timeoutMs, true); } + public void addEvent(TimedEvent event, long timeoutMs) { + addEvent(event, timeoutMs, true); + } + /** + * @param event + * @param timeoutMs * @param useEarliestTime if its already scheduled, use the earlier of the * two timeouts, else use the later */ @@ -86,8 +124,9 @@ public class SimpleTimer { } } } - while (_events.containsKey(time)) + while(_events.containsKey(time)) { time = new Long(time.longValue() + 1); + } _events.put(time, event); _eventTimes.put(event, time); @@ -107,24 +146,33 @@ public class SimpleTimer { _events.notifyAll(); } if (time.longValue() > eventTime + 100) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Lots of timer congestion, had to push " + event + " back " - + (time.longValue()-eventTime) + "ms (# events: " + totalEvents + ")"); + if(_log.shouldLog(Log.WARN)) { + _log.warn("Lots of timer congestion, had to push " + event + " back " + (time.longValue() - eventTime) + "ms (# events: " + totalEvents + ")"); + } } long timeToAdd = System.currentTimeMillis() - now; if (timeToAdd > 50) { - if (_log.shouldLog(Log.WARN)) + if(_log.shouldLog(Log.WARN)) { _log.warn("timer contention: took " + timeToAdd + "ms to add a job with " + totalEvents + " queued"); } + } } + /** + * + * @param evt + * @return + */ public boolean removeEvent(TimedEvent evt) { - if (evt == null) return false; + if(evt == null) { + return false; + } synchronized (_events) { Long when = (Long)_eventTimes.remove(evt); - if (when != null) + if(when != null) { _events.remove(when); + } return null != when; } } @@ -133,6 +181,7 @@ public class SimpleTimer { * Simple interface for events to be queued up and notified on expiration */ public interface TimedEvent { + /** * the time requested has been reached (this call should NOT block, * otherwise the whole SimpleTimer gets backed up) @@ -140,15 +189,15 @@ public class SimpleTimer { */ public void timeReached(); } - private long _occurredTime; private long _occurredEventCount; - private TimedEvent _recentEvents[] = new TimedEvent[5]; - + // not used + // private TimedEvent _recentEvents[] = new TimedEvent[5]; private class SimpleTimerRunner implements Runnable { + public void run() { List eventsToFire = new ArrayList(1); - while (true) { + while(runn.getAnswer()) { try { synchronized (_events) { //if (_events.size() <= 0) @@ -158,8 +207,10 @@ public class SimpleTimer { long now = System.currentTimeMillis(); long nextEventDelay = -1; Object nextEvent = null; - while (true) { - if (_events.size() <= 0) break; + while(runn.getAnswer()) { + if(_events.size() <= 0) { + break; + } Long when = (Long)_events.firstKey(); if (when.longValue() <= now) { TimedEvent evt = (TimedEvent)_events.remove(when); @@ -175,16 +226,15 @@ public class SimpleTimer { } if (eventsToFire.size() <= 0) { if (nextEventDelay != -1) { - if (_log.shouldLog(Log.DEBUG)) + if(_log.shouldLog(Log.DEBUG)) { _log.debug("Next event in " + nextEventDelay + ": " + nextEvent); + } _events.wait(nextEventDelay); } else { _events.wait(); } } } - } catch (ThreadDeath td) { - return; // die } catch (InterruptedException ie) { // ignore } catch (Throwable t) { @@ -200,8 +250,9 @@ public class SimpleTimer { now = now - (now % 1000); synchronized (_readyEvents) { - for (int i = 0; i < eventsToFire.size(); i++) + for(int i = 0; i < eventsToFire.size(); i++) { _readyEvents.add(eventsToFire.get(i)); + } _readyEvents.notifyAll(); } -- GitLab