From 19c6760ea7f6f8a1a6fe6c80c42d5ddbf511927e Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Wed, 1 Dec 2010 18:57:05 +0000 Subject: [PATCH] * I2CP: Change from the internal pseudo-socket that was implemented in 0.7.9 to an internal Queue that directly passes I2CPMessage objects. For in-JVM clients, this eliminates two writer threads per client and avoids the serialization/deserialization of I2CP messages. --- core/java/src/net/i2p/I2PAppContext.java | 10 +++ .../net/i2p/client/ClientWriterRunner.java | 19 +--- .../src/net/i2p/client/I2PSessionImpl.java | 57 ++++++++---- .../src/net/i2p/client/I2PSimpleSession.java | 31 +++++-- .../net/i2p/data/i2cp/I2CPMessageReader.java | 22 +++-- .../net/i2p/internal/I2CPMessageQueue.java | 51 +++++++++++ .../i2p/internal/InternalClientManager.java | 19 ++++ .../net/i2p/internal/PoisonI2CPMessage.java | 58 ++++++++++++ .../i2p/internal/QueuedI2CPMessageReader.java | 55 ++++++++++++ core/java/src/net/i2p/internal/package.html | 7 ++ .../src/net/i2p/router/RouterContext.java | 22 +++-- .../router/client/ClientConnectionRunner.java | 18 +--- .../net/i2p/router/client/ClientManager.java | 28 ++++-- .../client/ClientManagerFacadeImpl.java | 17 +++- .../i2p/router/client/ClientWriterRunner.java | 19 +--- .../router/client/I2CPMessageQueueImpl.java | 57 ++++++++++++ .../client/InternalClientListenerRunner.java | 89 ------------------- .../client/QueuedClientConnectionRunner.java | 76 ++++++++++++++++ 18 files changed, 476 insertions(+), 179 deletions(-) create mode 100644 core/java/src/net/i2p/internal/I2CPMessageQueue.java create mode 100644 core/java/src/net/i2p/internal/InternalClientManager.java create mode 100644 core/java/src/net/i2p/internal/PoisonI2CPMessage.java create mode 100644 core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java create mode 100644 core/java/src/net/i2p/internal/package.html create mode 100644 router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java delete mode 100644 router/java/src/net/i2p/router/client/InternalClientListenerRunner.java create mode 100644 router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java diff --git a/core/java/src/net/i2p/I2PAppContext.java b/core/java/src/net/i2p/I2PAppContext.java index b497796408..458aee31e5 100644 --- a/core/java/src/net/i2p/I2PAppContext.java +++ b/core/java/src/net/i2p/I2PAppContext.java @@ -22,6 +22,7 @@ import net.i2p.crypto.SHA256Generator; import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.TransientSessionKeyManager; import net.i2p.data.RoutingKeyGenerator; +import net.i2p.internal.InternalClientManager; import net.i2p.stat.StatManager; import net.i2p.util.Clock; import net.i2p.util.ConcurrentHashSet; @@ -843,4 +844,13 @@ public class I2PAppContext { public boolean isRouterContext() { return false; } + + /** + * Use this to connect to the router in the same JVM. + * @return always null in I2PAppContext, the client manager if in RouterContext + * @since 0.8.3 + */ + public InternalClientManager internalClientManager() { + return null; + } } diff --git a/core/java/src/net/i2p/client/ClientWriterRunner.java b/core/java/src/net/i2p/client/ClientWriterRunner.java index 056208fb5a..f69148da3e 100644 --- a/core/java/src/net/i2p/client/ClientWriterRunner.java +++ b/core/java/src/net/i2p/client/ClientWriterRunner.java @@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageImpl; import net.i2p.data.i2cp.I2CPMessageException; +import net.i2p.internal.PoisonI2CPMessage; import net.i2p.util.I2PAppThread; /** @@ -50,7 +51,7 @@ class ClientWriterRunner implements Runnable { public void stopWriting() { _messagesToWrite.clear(); try { - _messagesToWrite.put(new PoisonMessage()); + _messagesToWrite.put(new PoisonI2CPMessage()); } catch (InterruptedException ie) {} } @@ -62,7 +63,7 @@ class ClientWriterRunner implements Runnable { } catch (InterruptedException ie) { continue; } - if (msg.getType() == PoisonMessage.MESSAGE_TYPE) + if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE) break; // only thread, we don't need synchronized try { @@ -80,18 +81,4 @@ class ClientWriterRunner implements Runnable { } _messagesToWrite.clear(); } - - /** - * End-of-stream msg used to stop the concurrent queue - * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html - * - */ - private static class PoisonMessage extends I2CPMessageImpl { - public static final int MESSAGE_TYPE = 999999; - public int getType() { - return MESSAGE_TYPE; - } - public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {} - public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; } - } } diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 72cc8b406b..639ac5e9c6 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -39,8 +39,10 @@ import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.SessionId; +import net.i2p.internal.I2CPMessageQueue; +import net.i2p.internal.InternalClientManager; +import net.i2p.internal.QueuedI2CPMessageReader; import net.i2p.util.I2PThread; -import net.i2p.util.InternalSocket; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleTimer; @@ -79,6 +81,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** where we pipe our messages */ protected /* FIXME final FIXME */OutputStream _out; + /** + * Used for internal connections to the router. + * If this is set, _socket, _writer, and _out will be null. + * @since 0.8.3 + */ + protected I2CPMessageQueue _queue; + /** who we send events to */ protected I2PSessionListener _sessionListener; @@ -285,17 +294,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa long startConnect = _context.clock().now(); try { - // If we are in the router JVM, connect using the interal pseudo-socket - _socket = InternalSocket.getSocket(_hostname, _portNum); - // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. - _out = _socket.getOutputStream(); - synchronized (_out) { - _out.write(I2PClient.PROTOCOL_BYTE); - _out.flush(); + // If we are in the router JVM, connect using the interal queue + if (_context.isRouterContext()) { + // _socket, _out, and _writer remain null + InternalClientManager mgr = _context.internalClientManager(); + if (mgr == null) + throw new I2PSessionException("Router is not ready for connections"); + // the following may throw an I2PSessionException + _queue = mgr.connect(); + _reader = new QueuedI2CPMessageReader(_queue, this); + } else { + _socket = new Socket(_hostname, _portNum); + // _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. + _out = _socket.getOutputStream(); + synchronized (_out) { + _out.write(I2PClient.PROTOCOL_BYTE); + _out.flush(); + } + _writer = new ClientWriterRunner(_out, this); + InputStream in = _socket.getInputStream(); + _reader = new I2CPMessageReader(in, this); } - _writer = new ClientWriterRunner(_out, this); - InputStream in = _socket.getInputStream(); - _reader = new I2CPMessageReader(in, this); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading"); _reader.startReading(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate"); @@ -567,9 +586,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * @throws I2PSessionException if the message is malformed or there is an error writing it out */ void sendMessage(I2CPMessage message) throws I2PSessionException { - if (isClosed() || _writer == null) + if (isClosed()) + throw new I2PSessionException("Already closed"); + else if (_queue != null) + _queue.offer(message); // internal + else if (_writer == null) throw new I2PSessionException("Already closed"); - _writer.addMessage(message); + else + _writer.addMessage(message); } /** @@ -581,8 +605,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa // Only log as WARN if the router went away int level; String msgpfx; - if ((error instanceof EOFException) || - (error.getMessage() != null && error.getMessage().startsWith("Pipe closed"))) { + if (error instanceof EOFException) { level = Log.WARN; msgpfx = "Router closed connection: "; } else { @@ -647,6 +670,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa _reader.stopReading(); _reader = null; } + if (_queue != null) { + // internal + _queue.close(); + } if (_writer != null) { _writer.stopWriting(); _writer = null; diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index f4bc3e812c..c2da90217c 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -19,6 +19,9 @@ import net.i2p.data.i2cp.DestLookupMessage; import net.i2p.data.i2cp.DestReplyMessage; import net.i2p.data.i2cp.GetBandwidthLimitsMessage; import net.i2p.data.i2cp.I2CPMessageReader; +import net.i2p.internal.I2CPMessageQueue; +import net.i2p.internal.InternalClientManager; +import net.i2p.internal.QueuedI2CPMessageReader; import net.i2p.util.I2PThread; import net.i2p.util.InternalSocket; @@ -72,16 +75,26 @@ class I2PSimpleSession extends I2PSessionImpl2 { notifier.start(); try { - // If we are in the router JVM, connect using the interal pseudo-socket - _socket = InternalSocket.getSocket(_hostname, _portNum); - _out = _socket.getOutputStream(); - synchronized (_out) { - _out.write(I2PClient.PROTOCOL_BYTE); - _out.flush(); + // If we are in the router JVM, connect using the interal queue + if (_context.isRouterContext()) { + // _socket, _out, and _writer remain null + InternalClientManager mgr = _context.internalClientManager(); + if (mgr == null) + throw new I2PSessionException("Router is not ready for connections"); + // the following may throw an I2PSessionException + _queue = mgr.connect(); + _reader = new QueuedI2CPMessageReader(_queue, this); + } else { + _socket = new Socket(_hostname, _portNum); + _out = _socket.getOutputStream(); + synchronized (_out) { + _out.write(I2PClient.PROTOCOL_BYTE); + _out.flush(); + } + _writer = new ClientWriterRunner(_out, this); + InputStream in = _socket.getInputStream(); + _reader = new I2CPMessageReader(in, this); } - _writer = new ClientWriterRunner(_out, this); - InputStream in = _socket.getInputStream(); - _reader = new I2CPMessageReader(in, this); _reader.startReading(); } catch (UnknownHostException uhe) { diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java index 461c4b08a4..4ac902a487 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageReader.java @@ -27,11 +27,11 @@ import net.i2p.util.Log; public class I2CPMessageReader { private final static Log _log = new Log(I2CPMessageReader.class); private InputStream _stream; - private I2CPMessageEventListener _listener; - private I2CPMessageReaderRunner _reader; - private Thread _readerThread; + protected I2CPMessageEventListener _listener; + protected I2CPMessageReaderRunner _reader; + protected Thread _readerThread; - private static volatile long __readerId = 0; + protected static volatile long __readerId = 0; public I2CPMessageReader(InputStream stream, I2CPMessageEventListener lsnr) { _stream = stream; @@ -42,6 +42,14 @@ public class I2CPMessageReader { _readerThread.setName("I2CP Reader " + (++__readerId)); } + /** + * For internal extension only. No stream. + * @since 0.8.3 + */ + protected I2CPMessageReader(I2CPMessageEventListener lsnr) { + setListener(lsnr); + } + public void setListener(I2CPMessageEventListener lsnr) { _listener = lsnr; } @@ -114,9 +122,9 @@ public class I2CPMessageReader { public void disconnected(I2CPMessageReader reader); } - private class I2CPMessageReaderRunner implements Runnable { - private volatile boolean _doRun; - private volatile boolean _stayAlive; + protected class I2CPMessageReaderRunner implements Runnable { + protected volatile boolean _doRun; + protected volatile boolean _stayAlive; public I2CPMessageReaderRunner() { _doRun = true; diff --git a/core/java/src/net/i2p/internal/I2CPMessageQueue.java b/core/java/src/net/i2p/internal/I2CPMessageQueue.java new file mode 100644 index 0000000000..93bea3a3f2 --- /dev/null +++ b/core/java/src/net/i2p/internal/I2CPMessageQueue.java @@ -0,0 +1,51 @@ +package net.i2p.internal; + +import net.i2p.data.i2cp.I2CPMessage; + +/** + * Contains the methods to talk to a router or client via I2CP, + * when both are in the same JVM. + * This interface contains methods to access two queues, + * one for transmission and one for receiving. + * The methods are identical to those in java.util.concurrent.BlockingQueue. + * + * Reading may be done in a thread using the QueuedI2CPMessageReader class. + * Non-blocking writing may be done directly with offer(). + * + * @author zzz + * @since 0.8.3 + */ +public abstract class I2CPMessageQueue { + + /** + * Send a message, nonblocking. + * @return success (false if no space available) + */ + public abstract boolean offer(I2CPMessage msg); + + /** + * Receive a message, nonblocking. + * Unused for now. + * @return message or null if none available + */ + public abstract I2CPMessage poll(); + + /** + * Send a message, blocking until space is available. + * Unused for now. + */ + public abstract void put(I2CPMessage msg) throws InterruptedException; + + /** + * Receive a message, blocking until one is available. + * @return message + */ + public abstract I2CPMessage take() throws InterruptedException; + + /** + * == offer(new PoisonI2CPMessage()); + */ + public void close() { + offer(new PoisonI2CPMessage()); + } +} diff --git a/core/java/src/net/i2p/internal/InternalClientManager.java b/core/java/src/net/i2p/internal/InternalClientManager.java new file mode 100644 index 0000000000..a923fb9f70 --- /dev/null +++ b/core/java/src/net/i2p/internal/InternalClientManager.java @@ -0,0 +1,19 @@ +package net.i2p.internal; + +import net.i2p.client.I2PSessionException; +import net.i2p.data.i2cp.I2CPMessage; + +/** + * A manager for the in-JVM I2CP message interface + * + * @author zzz + * @since 0.8.3 + */ +public interface InternalClientManager { + + /** + * Connect to the router, receiving a message queue to talk to the router with. + * @throws I2PSessionException if the router isn't ready + */ + public I2CPMessageQueue connect() throws I2PSessionException; +} diff --git a/core/java/src/net/i2p/internal/PoisonI2CPMessage.java b/core/java/src/net/i2p/internal/PoisonI2CPMessage.java new file mode 100644 index 0000000000..23b99db9ac --- /dev/null +++ b/core/java/src/net/i2p/internal/PoisonI2CPMessage.java @@ -0,0 +1,58 @@ +package net.i2p.internal; + +import java.io.InputStream; + +import net.i2p.data.i2cp.I2CPMessageException; +import net.i2p.data.i2cp.I2CPMessageImpl; + +/** + * For marking end-of-queues in a standard manner. + * Don't actually send it. + * + * @author zzz + * @since 0.8.3 + */ +public class PoisonI2CPMessage extends I2CPMessageImpl { + public final static int MESSAGE_TYPE = 999999; + + public PoisonI2CPMessage() { + super(); + } + + /** + * @deprecated don't do this + * @throws I2CPMessageException always + */ + protected void doReadMessage(InputStream in, int size) throws I2CPMessageException { + throw new I2CPMessageException("Don't do this"); + } + + /** + * @deprecated don't do this + * @throws I2CPMessageException always + */ + protected byte[] doWriteMessage() throws I2CPMessageException { + throw new I2CPMessageException("Don't do this"); + } + + public int getType() { + return MESSAGE_TYPE; + } + + /* FIXME missing hashCode() method FIXME */ + @Override + public boolean equals(Object object) { + if ((object != null) && (object instanceof PoisonI2CPMessage)) { + return true; + } + + return false; + } + + @Override + public String toString() { + StringBuilder buf = new StringBuilder(); + buf.append("[PoisonMessage]"); + return buf.toString(); + } +} diff --git a/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java new file mode 100644 index 0000000000..da128ceaa2 --- /dev/null +++ b/core/java/src/net/i2p/internal/QueuedI2CPMessageReader.java @@ -0,0 +1,55 @@ +package net.i2p.internal; + +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.data.i2cp.I2CPMessageReader; +import net.i2p.util.I2PThread; + +/** + * Get messages off an In-JVM queue, zero-copy + * + * @author zzz + * @since 0.8.3 + */ +public class QueuedI2CPMessageReader extends I2CPMessageReader { + private final I2CPMessageQueue in; + + public QueuedI2CPMessageReader(I2CPMessageQueue in, I2CPMessageEventListener lsnr) { + super(lsnr); + this.in = in; + _reader = new QueuedI2CPMessageReaderRunner(); + _readerThread = new I2PThread(_reader, "I2CP Internal Reader " + (++__readerId), true); + } + + protected class QueuedI2CPMessageReaderRunner extends I2CPMessageReaderRunner implements Runnable { + + public QueuedI2CPMessageReaderRunner() { + super(); + } + + @Override + public void run() { + while (_stayAlive) { + while (_doRun) { + // do read + I2CPMessage msg = null; + try { + msg = in.take(); + if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE) + cancelRunner(); + else + _listener.messageReceived(QueuedI2CPMessageReader.this, msg); + } catch (InterruptedException ie) {} + } + if (!_doRun) { + // pause .5 secs when we're paused + try { + Thread.sleep(500); + } catch (InterruptedException ie) { + _listener.disconnected(QueuedI2CPMessageReader.this); + cancelRunner(); + } + } + } + } + } +} diff --git a/core/java/src/net/i2p/internal/package.html b/core/java/src/net/i2p/internal/package.html new file mode 100644 index 0000000000..edac509f01 --- /dev/null +++ b/core/java/src/net/i2p/internal/package.html @@ -0,0 +1,7 @@ +<html><body> +<p> +Interface and classes for a router and client +within the same JVM to directly pass I2CP messages using Queues +instead of serialized messages over socket streams. +</p> +</body></html> diff --git a/router/java/src/net/i2p/router/RouterContext.java b/router/java/src/net/i2p/router/RouterContext.java index 3d5ed609ed..cb3c636625 100644 --- a/router/java/src/net/i2p/router/RouterContext.java +++ b/router/java/src/net/i2p/router/RouterContext.java @@ -6,6 +6,7 @@ import java.util.Properties; import net.i2p.I2PAppContext; import net.i2p.data.Hash; +import net.i2p.internal.InternalClientManager; import net.i2p.router.client.ClientManagerFacadeImpl; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.router.peermanager.Calculator; @@ -34,7 +35,7 @@ import net.i2p.util.KeyRing; */ public class RouterContext extends I2PAppContext { private Router _router; - private ClientManagerFacade _clientManagerFacade; + private ClientManagerFacadeImpl _clientManagerFacade; private ClientMessagePool _clientMessagePool; private JobQueue _jobQueue; private InNetMessagePool _inNetMessagePool; @@ -106,10 +107,12 @@ public class RouterContext extends I2PAppContext { } public void initAll() { - if ("false".equals(getProperty("i2p.dummyClientFacade", "false"))) - _clientManagerFacade = new ClientManagerFacadeImpl(this); - else - _clientManagerFacade = new DummyClientManagerFacade(this); + if (getBooleanProperty("i2p.dummyClientFacade")) + System.err.println("i2p.dummpClientFacade currently unsupported"); + _clientManagerFacade = new ClientManagerFacadeImpl(this); + // removed since it doesn't implement InternalClientManager for now + //else + // _clientManagerFacade = new DummyClientManagerFacade(this); _clientMessagePool = new ClientMessagePool(this); _jobQueue = new JobQueue(this); _inNetMessagePool = new InNetMessagePool(this); @@ -395,4 +398,13 @@ public class RouterContext extends I2PAppContext { public boolean isRouterContext() { return true; } + + /** + * Use this to connect to the router in the same JVM. + * @return the client manager + * @since 0.8.3 + */ + public InternalClientManager internalClientManager() { + return _clientManagerFacade; + } } diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index b3468e4e0b..5807aa0983 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -52,7 +52,7 @@ import net.i2p.util.SimpleTimer; */ public class ClientConnectionRunner { private Log _log; - private RouterContext _context; + protected final RouterContext _context; private ClientManager _manager; /** socket for this particular peer connection */ private Socket _socket; @@ -71,7 +71,7 @@ public class ClientConnectionRunner { /** set of messageIds created but not yet ACCEPTED */ private Set<MessageId> _acceptedPending; /** thingy that does stuff */ - private I2CPMessageReader _reader; + protected I2CPMessageReader _reader; /** just for this destination */ private SessionKeyManager _sessionKeyManager; /** @@ -469,18 +469,8 @@ public class ClientConnectionRunner { _log.warn("Error sending I2CP message - client went away", eofe); stopRunning(); } catch (IOException ioe) { - // only warn if client went away - int level; - String emsg; - if (ioe.getMessage() != null && ioe.getMessage().startsWith("Pipe closed")) { - level = Log.WARN; - emsg = "Error sending I2CP message - client went away"; - } else { - level = Log.ERROR; - emsg = "IO Error sending I2CP message to client"; - } - if (_log.shouldLog(level)) - _log.log(level, emsg, ioe); + if (_log.shouldLog(Log.ERROR)) + _log.error("IO Error sending I2CP message to client", ioe); stopRunning(); } catch (Throwable t) { _log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t); diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index 7d866ab0b6..948f895bbe 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -15,7 +15,9 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import net.i2p.client.I2PSessionException; import net.i2p.crypto.SessionKeyManager; import net.i2p.data.DataHelper; import net.i2p.data.Destination; @@ -23,8 +25,10 @@ import net.i2p.data.Hash; import net.i2p.data.LeaseSet; import net.i2p.data.Payload; import net.i2p.data.TunnelId; +import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.SessionConfig; +import net.i2p.internal.I2CPMessageQueue; import net.i2p.router.ClientManagerFacade; import net.i2p.router.ClientMessage; import net.i2p.router.Job; @@ -42,7 +46,6 @@ import net.i2p.util.Log; public class ClientManager { private Log _log; private ClientListenerRunner _listener; - private ClientListenerRunner _internalListener; private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet private RouterContext _ctx; @@ -69,11 +72,6 @@ public class ClientManager { t.setName("ClientListener:" + port); t.setDaemon(true); t.start(); - _internalListener = new InternalClientListenerRunner(_ctx, this, port); - t = new I2PThread(_internalListener); - t.setName("ClientListener:" + port + "-i"); - t.setDaemon(true); - t.start(); } public void restart() { @@ -97,7 +95,6 @@ public class ClientManager { public void shutdown() { _log.info("Shutting down the ClientManager"); _listener.stopListening(); - _internalListener.stopListening(); Set<ClientConnectionRunner> runners = new HashSet(); synchronized (_runners) { for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext();) { @@ -117,6 +114,23 @@ public class ClientManager { } } + /** + * The InternalClientManager interface. + * Connects to the router, receiving a message queue to talk to the router with. + * Might throw I2PSessionException if the router isn't ready, someday. + * @since 0.8.3 + */ + public I2CPMessageQueue internalConnect() { + // for now we make these unlimited size + LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue(); + LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue(); + I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out); + I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in); + ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue); + registerConnection(runner); + return hisQueue; + } + public boolean isAlive() { return _listener.isListening(); } public void registerConnection(ClientConnectionRunner runner) { diff --git a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java index 066d6cc354..5fd0bbc28b 100644 --- a/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java +++ b/router/java/src/net/i2p/router/client/ClientManagerFacadeImpl.java @@ -14,6 +14,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Set; +import net.i2p.client.I2PSessionException; import net.i2p.crypto.SessionKeyManager; import net.i2p.data.DataHelper; import net.i2p.data.Destination; @@ -21,6 +22,8 @@ import net.i2p.data.Hash; import net.i2p.data.LeaseSet; import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.SessionConfig; +import net.i2p.internal.I2CPMessageQueue; +import net.i2p.internal.InternalClientManager; import net.i2p.router.ClientManagerFacade; import net.i2p.router.ClientMessage; import net.i2p.router.Job; @@ -32,7 +35,7 @@ import net.i2p.util.Log; * * @author jrandom */ -public class ClientManagerFacadeImpl extends ClientManagerFacade { +public class ClientManagerFacadeImpl extends ClientManagerFacade implements InternalClientManager { private final static Log _log = new Log(ClientManagerFacadeImpl.class); private ClientManager _manager; private RouterContext _context; @@ -220,4 +223,16 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade { else return Collections.EMPTY_SET; } + + /** + * The InternalClientManager interface. + * Connect to the router, receiving a message queue to talk to the router with. + * @throws I2PSessionException if the router isn't ready + * @since 0.8.3 + */ + public I2CPMessageQueue connect() throws I2PSessionException { + if (_manager != null) + return _manager.internalConnect(); + throw new I2PSessionException("No manager yet"); + } } diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java index 49fcddcc20..b93a4e5f44 100644 --- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java +++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java @@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageImpl; import net.i2p.data.i2cp.I2CPMessageException; +import net.i2p.internal.PoisonI2CPMessage; import net.i2p.router.RouterContext; import net.i2p.util.Log; @@ -52,7 +53,7 @@ class ClientWriterRunner implements Runnable { public void stopWriting() { _messagesToWrite.clear(); try { - _messagesToWrite.put(new PoisonMessage()); + _messagesToWrite.put(new PoisonI2CPMessage()); } catch (InterruptedException ie) {} } @@ -64,23 +65,9 @@ class ClientWriterRunner implements Runnable { } catch (InterruptedException ie) { continue; } - if (msg.getType() == PoisonMessage.MESSAGE_TYPE) + if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE) break; _runner.writeMessage(msg); } } - - /** - * End-of-stream msg used to stop the concurrent queue - * See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html - * - */ - private static class PoisonMessage extends I2CPMessageImpl { - public static final int MESSAGE_TYPE = 999999; - public int getType() { - return MESSAGE_TYPE; - } - public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {} - public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; } - } } diff --git a/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java new file mode 100644 index 0000000000..f65b061766 --- /dev/null +++ b/router/java/src/net/i2p/router/client/I2CPMessageQueueImpl.java @@ -0,0 +1,57 @@ +package net.i2p.router.client; + +import java.util.concurrent.BlockingQueue; + +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.internal.I2CPMessageQueue; + +/** + * Contains the methods to talk to a router or client via I2CP, + * when both are in the same JVM. + * This interface contains methods to access two queues, + * one for transmission and one for receiving. + * The methods are identical to those in java.util.concurrent.BlockingQueue + * + * @author zzz + * @since 0.8.3 + */ +class I2CPMessageQueueImpl extends I2CPMessageQueue { + private final BlockingQueue<I2CPMessage> _in; + private final BlockingQueue<I2CPMessage> _out; + + public I2CPMessageQueueImpl(BlockingQueue<I2CPMessage> in, BlockingQueue<I2CPMessage> out) { + _in = in; + _out = out; + } + + /** + * Send a message, nonblocking + * @return success (false if no space available) + */ + public boolean offer(I2CPMessage msg) { + return _out.offer(msg); + } + + /** + * Receive a message, nonblocking + * @return message or null if none available + */ + public I2CPMessage poll() { + return _in.poll(); + } + + /** + * Send a message, blocking until space is available + */ + public void put(I2CPMessage msg) throws InterruptedException { + _out.put(msg); + } + + /** + * Receive a message, blocking until one is available + * @return message + */ + public I2CPMessage take() throws InterruptedException { + return _in.take(); + } +} diff --git a/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java b/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java deleted file mode 100644 index 995c69400f..0000000000 --- a/router/java/src/net/i2p/router/client/InternalClientListenerRunner.java +++ /dev/null @@ -1,89 +0,0 @@ -package net.i2p.router.client; -/* - * free (adj.): unencumbered; not under the control of others - * Written by jrandom in 2003 and released into the public domain - * with no warranty of any kind, either expressed or implied. - * It probably won't make your computer catch on fire, or eat - * your children, but it might. Use at your own risk. - * - */ - -import java.io.IOException; -import java.net.Socket; - -import net.i2p.router.RouterContext; -import net.i2p.util.Log; -import net.i2p.util.InternalServerSocket; - -/** - * Listen for in-JVM connections on the internal "socket" - * - * @author zzz - * @since 0.7.9 - */ -public class InternalClientListenerRunner extends ClientListenerRunner { - - public InternalClientListenerRunner(RouterContext context, ClientManager manager, int port) { - super(context, manager, port); - _log = _context.logManager().getLog(InternalClientListenerRunner.class); - } - - /** - * Start up the socket listener, listens for connections, and - * fires those connections off via {@link #runConnection runConnection}. - * This only returns if the socket cannot be opened or there is a catastrophic - * failure. - * - */ - public void runServer() { - try { - if (_log.shouldLog(Log.INFO)) - _log.info("Listening on internal port " + _port); - _socket = new InternalServerSocket(_port); - - if (_log.shouldLog(Log.DEBUG)) - _log.debug("InternalServerSocket created, before accept: " + _socket); - - _listening = true; - _running = true; - while (_running) { - try { - Socket socket = _socket.accept(); - if (validate(socket)) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Internal connection received"); - runConnection(socket); - } else { - if (_log.shouldLog(Log.WARN)) - _log.warn("Refused connection from " + socket.getInetAddress()); - try { - socket.close(); - } catch (IOException ioe) {} - } - } catch (IOException ioe) { - if (_context.router().isAlive()) - _log.error("Server error accepting", ioe); - } catch (Throwable t) { - if (_context.router().isAlive()) - _log.error("Fatal error running client listener - killing the thread!", t); - _listening = false; - return; - } - } - } catch (IOException ioe) { - if (_context.router().isAlive()) - _log.error("Error listening on internal port " + _port, ioe); - } - - _listening = false; - if (_socket != null) { - try { _socket.close(); } catch (IOException ioe) {} - _socket = null; - } - - - if (_context.router().isAlive()) - _log.error("CANCELING I2CP LISTEN", new Exception("I2CP Listen cancelled!!!")); - _running = false; - } -} diff --git a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java new file mode 100644 index 0000000000..ea2cc3d30b --- /dev/null +++ b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java @@ -0,0 +1,76 @@ +package net.i2p.router.client; + +import java.io.IOException; + +import net.i2p.data.i2cp.I2CPMessage; +import net.i2p.data.i2cp.I2CPMessageException; +import net.i2p.internal.I2CPMessageQueue; +import net.i2p.internal.QueuedI2CPMessageReader; +import net.i2p.router.RouterContext; +import net.i2p.util.Log; + +/** + * Zero-copy in-JVM. + * While super() starts both a reader and a writer thread, we only need a reader thread here. + * + * @author zzz + * @since 0.8.3 + */ +public class QueuedClientConnectionRunner extends ClientConnectionRunner { + private final I2CPMessageQueue queue; + + /** + * Create a new runner with the given queues + * + */ + public QueuedClientConnectionRunner(RouterContext context, ClientManager manager, I2CPMessageQueue queue) { + super(context, manager, null); + this.queue = queue; + } + + + + /** + * Starts the reader thread. Does not call super(). + */ + @Override + public void startRunning() { + _reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this)); + _reader.startReading(); + } + + /** + * Calls super() to stop the reader, and sends a poison message to the client. + */ + @Override + void stopRunning() { + super.stopRunning(); + queue.close(); + } + + /** + * In super(), doSend queues it to the writer thread and + * the writer thread calls writeMessage() to write to the output stream. + * Since we have no writer thread this shouldn't happen. + */ + @Override + void writeMessage(I2CPMessage msg) { + throw new RuntimeException("huh?"); + } + + /** + * Actually send the I2CPMessage to the client. + * Nonblocking. + */ + @Override + void doSend(I2CPMessage msg) throws I2CPMessageException { + // This will never fail, for now, as the router uses unbounded queues + // Perhaps in the future we may want to use bounded queues, + // with non-blocking writes for the router + // and blocking writes for the client? + boolean success = queue.offer(msg); + if (!success) + throw new I2CPMessageException("I2CP write to queue failed"); + } + +} -- GitLab