I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 19c6760e authored by zzz's avatar zzz
Browse files

* I2CP: Change from the internal pseudo-socket that was

      implemented in 0.7.9 to an internal Queue that directly
      passes I2CPMessage objects. For in-JVM clients,
      this eliminates two writer threads per client and
      avoids the serialization/deserialization of I2CP messages.
parent 293eea9e
No related branches found
No related tags found
No related merge requests found
Showing
with 476 additions and 179 deletions
...@@ -22,6 +22,7 @@ import net.i2p.crypto.SHA256Generator; ...@@ -22,6 +22,7 @@ import net.i2p.crypto.SHA256Generator;
import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.SessionKeyManager;
import net.i2p.crypto.TransientSessionKeyManager; import net.i2p.crypto.TransientSessionKeyManager;
import net.i2p.data.RoutingKeyGenerator; import net.i2p.data.RoutingKeyGenerator;
import net.i2p.internal.InternalClientManager;
import net.i2p.stat.StatManager; import net.i2p.stat.StatManager;
import net.i2p.util.Clock; import net.i2p.util.Clock;
import net.i2p.util.ConcurrentHashSet; import net.i2p.util.ConcurrentHashSet;
...@@ -843,4 +844,13 @@ public class I2PAppContext { ...@@ -843,4 +844,13 @@ public class I2PAppContext {
public boolean isRouterContext() { public boolean isRouterContext() {
return false; return false;
} }
/**
* Use this to connect to the router in the same JVM.
* @return always null in I2PAppContext, the client manager if in RouterContext
* @since 0.8.3
*/
public InternalClientManager internalClientManager() {
return null;
}
} }
...@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -9,6 +9,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageImpl; import net.i2p.data.i2cp.I2CPMessageImpl;
import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.internal.PoisonI2CPMessage;
import net.i2p.util.I2PAppThread; import net.i2p.util.I2PAppThread;
/** /**
...@@ -50,7 +51,7 @@ class ClientWriterRunner implements Runnable { ...@@ -50,7 +51,7 @@ class ClientWriterRunner implements Runnable {
public void stopWriting() { public void stopWriting() {
_messagesToWrite.clear(); _messagesToWrite.clear();
try { try {
_messagesToWrite.put(new PoisonMessage()); _messagesToWrite.put(new PoisonI2CPMessage());
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
...@@ -62,7 +63,7 @@ class ClientWriterRunner implements Runnable { ...@@ -62,7 +63,7 @@ class ClientWriterRunner implements Runnable {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
continue; continue;
} }
if (msg.getType() == PoisonMessage.MESSAGE_TYPE) if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
break; break;
// only thread, we don't need synchronized // only thread, we don't need synchronized
try { try {
...@@ -80,18 +81,4 @@ class ClientWriterRunner implements Runnable { ...@@ -80,18 +81,4 @@ class ClientWriterRunner implements Runnable {
} }
_messagesToWrite.clear(); _messagesToWrite.clear();
} }
/**
* End-of-stream msg used to stop the concurrent queue
* See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
*
*/
private static class PoisonMessage extends I2CPMessageImpl {
public static final int MESSAGE_TYPE = 999999;
public int getType() {
return MESSAGE_TYPE;
}
public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
}
} }
...@@ -39,8 +39,10 @@ import net.i2p.data.i2cp.I2CPMessageException; ...@@ -39,8 +39,10 @@ import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessagePayloadMessage;
import net.i2p.data.i2cp.SessionId; import net.i2p.data.i2cp.SessionId;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.InternalClientManager;
import net.i2p.internal.QueuedI2CPMessageReader;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.InternalSocket;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleScheduler; import net.i2p.util.SimpleScheduler;
import net.i2p.util.SimpleTimer; import net.i2p.util.SimpleTimer;
...@@ -79,6 +81,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -79,6 +81,13 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
/** where we pipe our messages */ /** where we pipe our messages */
protected /* FIXME final FIXME */OutputStream _out; protected /* FIXME final FIXME */OutputStream _out;
/**
* Used for internal connections to the router.
* If this is set, _socket, _writer, and _out will be null.
* @since 0.8.3
*/
protected I2CPMessageQueue _queue;
/** who we send events to */ /** who we send events to */
protected I2PSessionListener _sessionListener; protected I2PSessionListener _sessionListener;
...@@ -285,17 +294,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -285,17 +294,27 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
long startConnect = _context.clock().now(); long startConnect = _context.clock().now();
try { try {
// If we are in the router JVM, connect using the interal pseudo-socket // If we are in the router JVM, connect using the interal queue
_socket = InternalSocket.getSocket(_hostname, _portNum); if (_context.isRouterContext()) {
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it. // _socket, _out, and _writer remain null
_out = _socket.getOutputStream(); InternalClientManager mgr = _context.internalClientManager();
synchronized (_out) { if (mgr == null)
_out.write(I2PClient.PROTOCOL_BYTE); throw new I2PSessionException("Router is not ready for connections");
_out.flush(); // the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
_socket = new Socket(_hostname, _portNum);
// _socket.setSoTimeout(1000000); // Uhmmm we could really-really use a real timeout, and handle it.
_out = _socket.getOutputStream();
synchronized (_out) {
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
} }
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading"); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "before startReading");
_reader.startReading(); _reader.startReading();
if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate"); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Before getDate");
...@@ -567,9 +586,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -567,9 +586,14 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
* @throws I2PSessionException if the message is malformed or there is an error writing it out * @throws I2PSessionException if the message is malformed or there is an error writing it out
*/ */
void sendMessage(I2CPMessage message) throws I2PSessionException { void sendMessage(I2CPMessage message) throws I2PSessionException {
if (isClosed() || _writer == null) if (isClosed())
throw new I2PSessionException("Already closed");
else if (_queue != null)
_queue.offer(message); // internal
else if (_writer == null)
throw new I2PSessionException("Already closed"); throw new I2PSessionException("Already closed");
_writer.addMessage(message); else
_writer.addMessage(message);
} }
/** /**
...@@ -581,8 +605,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -581,8 +605,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
// Only log as WARN if the router went away // Only log as WARN if the router went away
int level; int level;
String msgpfx; String msgpfx;
if ((error instanceof EOFException) || if (error instanceof EOFException) {
(error.getMessage() != null && error.getMessage().startsWith("Pipe closed"))) {
level = Log.WARN; level = Log.WARN;
msgpfx = "Router closed connection: "; msgpfx = "Router closed connection: ";
} else { } else {
...@@ -647,6 +670,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa ...@@ -647,6 +670,10 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa
_reader.stopReading(); _reader.stopReading();
_reader = null; _reader = null;
} }
if (_queue != null) {
// internal
_queue.close();
}
if (_writer != null) { if (_writer != null) {
_writer.stopWriting(); _writer.stopWriting();
_writer = null; _writer = null;
......
...@@ -19,6 +19,9 @@ import net.i2p.data.i2cp.DestLookupMessage; ...@@ -19,6 +19,9 @@ import net.i2p.data.i2cp.DestLookupMessage;
import net.i2p.data.i2cp.DestReplyMessage; import net.i2p.data.i2cp.DestReplyMessage;
import net.i2p.data.i2cp.GetBandwidthLimitsMessage; import net.i2p.data.i2cp.GetBandwidthLimitsMessage;
import net.i2p.data.i2cp.I2CPMessageReader; import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.InternalClientManager;
import net.i2p.internal.QueuedI2CPMessageReader;
import net.i2p.util.I2PThread; import net.i2p.util.I2PThread;
import net.i2p.util.InternalSocket; import net.i2p.util.InternalSocket;
...@@ -72,16 +75,26 @@ class I2PSimpleSession extends I2PSessionImpl2 { ...@@ -72,16 +75,26 @@ class I2PSimpleSession extends I2PSessionImpl2 {
notifier.start(); notifier.start();
try { try {
// If we are in the router JVM, connect using the interal pseudo-socket // If we are in the router JVM, connect using the interal queue
_socket = InternalSocket.getSocket(_hostname, _portNum); if (_context.isRouterContext()) {
_out = _socket.getOutputStream(); // _socket, _out, and _writer remain null
synchronized (_out) { InternalClientManager mgr = _context.internalClientManager();
_out.write(I2PClient.PROTOCOL_BYTE); if (mgr == null)
_out.flush(); throw new I2PSessionException("Router is not ready for connections");
// the following may throw an I2PSessionException
_queue = mgr.connect();
_reader = new QueuedI2CPMessageReader(_queue, this);
} else {
_socket = new Socket(_hostname, _portNum);
_out = _socket.getOutputStream();
synchronized (_out) {
_out.write(I2PClient.PROTOCOL_BYTE);
_out.flush();
}
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
} }
_writer = new ClientWriterRunner(_out, this);
InputStream in = _socket.getInputStream();
_reader = new I2CPMessageReader(in, this);
_reader.startReading(); _reader.startReading();
} catch (UnknownHostException uhe) { } catch (UnknownHostException uhe) {
......
...@@ -27,11 +27,11 @@ import net.i2p.util.Log; ...@@ -27,11 +27,11 @@ import net.i2p.util.Log;
public class I2CPMessageReader { public class I2CPMessageReader {
private final static Log _log = new Log(I2CPMessageReader.class); private final static Log _log = new Log(I2CPMessageReader.class);
private InputStream _stream; private InputStream _stream;
private I2CPMessageEventListener _listener; protected I2CPMessageEventListener _listener;
private I2CPMessageReaderRunner _reader; protected I2CPMessageReaderRunner _reader;
private Thread _readerThread; protected Thread _readerThread;
private static volatile long __readerId = 0; protected static volatile long __readerId = 0;
public I2CPMessageReader(InputStream stream, I2CPMessageEventListener lsnr) { public I2CPMessageReader(InputStream stream, I2CPMessageEventListener lsnr) {
_stream = stream; _stream = stream;
...@@ -42,6 +42,14 @@ public class I2CPMessageReader { ...@@ -42,6 +42,14 @@ public class I2CPMessageReader {
_readerThread.setName("I2CP Reader " + (++__readerId)); _readerThread.setName("I2CP Reader " + (++__readerId));
} }
/**
* For internal extension only. No stream.
* @since 0.8.3
*/
protected I2CPMessageReader(I2CPMessageEventListener lsnr) {
setListener(lsnr);
}
public void setListener(I2CPMessageEventListener lsnr) { public void setListener(I2CPMessageEventListener lsnr) {
_listener = lsnr; _listener = lsnr;
} }
...@@ -114,9 +122,9 @@ public class I2CPMessageReader { ...@@ -114,9 +122,9 @@ public class I2CPMessageReader {
public void disconnected(I2CPMessageReader reader); public void disconnected(I2CPMessageReader reader);
} }
private class I2CPMessageReaderRunner implements Runnable { protected class I2CPMessageReaderRunner implements Runnable {
private volatile boolean _doRun; protected volatile boolean _doRun;
private volatile boolean _stayAlive; protected volatile boolean _stayAlive;
public I2CPMessageReaderRunner() { public I2CPMessageReaderRunner() {
_doRun = true; _doRun = true;
......
package net.i2p.internal;
import net.i2p.data.i2cp.I2CPMessage;
/**
* Contains the methods to talk to a router or client via I2CP,
* when both are in the same JVM.
* This interface contains methods to access two queues,
* one for transmission and one for receiving.
* The methods are identical to those in java.util.concurrent.BlockingQueue.
*
* Reading may be done in a thread using the QueuedI2CPMessageReader class.
* Non-blocking writing may be done directly with offer().
*
* @author zzz
* @since 0.8.3
*/
public abstract class I2CPMessageQueue {
/**
* Send a message, nonblocking.
* @return success (false if no space available)
*/
public abstract boolean offer(I2CPMessage msg);
/**
* Receive a message, nonblocking.
* Unused for now.
* @return message or null if none available
*/
public abstract I2CPMessage poll();
/**
* Send a message, blocking until space is available.
* Unused for now.
*/
public abstract void put(I2CPMessage msg) throws InterruptedException;
/**
* Receive a message, blocking until one is available.
* @return message
*/
public abstract I2CPMessage take() throws InterruptedException;
/**
* == offer(new PoisonI2CPMessage());
*/
public void close() {
offer(new PoisonI2CPMessage());
}
}
package net.i2p.internal;
import net.i2p.client.I2PSessionException;
import net.i2p.data.i2cp.I2CPMessage;
/**
* A manager for the in-JVM I2CP message interface
*
* @author zzz
* @since 0.8.3
*/
public interface InternalClientManager {
/**
* Connect to the router, receiving a message queue to talk to the router with.
* @throws I2PSessionException if the router isn't ready
*/
public I2CPMessageQueue connect() throws I2PSessionException;
}
package net.i2p.internal;
import java.io.InputStream;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.data.i2cp.I2CPMessageImpl;
/**
* For marking end-of-queues in a standard manner.
* Don't actually send it.
*
* @author zzz
* @since 0.8.3
*/
public class PoisonI2CPMessage extends I2CPMessageImpl {
public final static int MESSAGE_TYPE = 999999;
public PoisonI2CPMessage() {
super();
}
/**
* @deprecated don't do this
* @throws I2CPMessageException always
*/
protected void doReadMessage(InputStream in, int size) throws I2CPMessageException {
throw new I2CPMessageException("Don't do this");
}
/**
* @deprecated don't do this
* @throws I2CPMessageException always
*/
protected byte[] doWriteMessage() throws I2CPMessageException {
throw new I2CPMessageException("Don't do this");
}
public int getType() {
return MESSAGE_TYPE;
}
/* FIXME missing hashCode() method FIXME */
@Override
public boolean equals(Object object) {
if ((object != null) && (object instanceof PoisonI2CPMessage)) {
return true;
}
return false;
}
@Override
public String toString() {
StringBuilder buf = new StringBuilder();
buf.append("[PoisonMessage]");
return buf.toString();
}
}
package net.i2p.internal;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageReader;
import net.i2p.util.I2PThread;
/**
* Get messages off an In-JVM queue, zero-copy
*
* @author zzz
* @since 0.8.3
*/
public class QueuedI2CPMessageReader extends I2CPMessageReader {
private final I2CPMessageQueue in;
public QueuedI2CPMessageReader(I2CPMessageQueue in, I2CPMessageEventListener lsnr) {
super(lsnr);
this.in = in;
_reader = new QueuedI2CPMessageReaderRunner();
_readerThread = new I2PThread(_reader, "I2CP Internal Reader " + (++__readerId), true);
}
protected class QueuedI2CPMessageReaderRunner extends I2CPMessageReaderRunner implements Runnable {
public QueuedI2CPMessageReaderRunner() {
super();
}
@Override
public void run() {
while (_stayAlive) {
while (_doRun) {
// do read
I2CPMessage msg = null;
try {
msg = in.take();
if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
cancelRunner();
else
_listener.messageReceived(QueuedI2CPMessageReader.this, msg);
} catch (InterruptedException ie) {}
}
if (!_doRun) {
// pause .5 secs when we're paused
try {
Thread.sleep(500);
} catch (InterruptedException ie) {
_listener.disconnected(QueuedI2CPMessageReader.this);
cancelRunner();
}
}
}
}
}
}
<html><body>
<p>
Interface and classes for a router and client
within the same JVM to directly pass I2CP messages using Queues
instead of serialized messages over socket streams.
</p>
</body></html>
...@@ -6,6 +6,7 @@ import java.util.Properties; ...@@ -6,6 +6,7 @@ import java.util.Properties;
import net.i2p.I2PAppContext; import net.i2p.I2PAppContext;
import net.i2p.data.Hash; import net.i2p.data.Hash;
import net.i2p.internal.InternalClientManager;
import net.i2p.router.client.ClientManagerFacadeImpl; import net.i2p.router.client.ClientManagerFacadeImpl;
import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade; import net.i2p.router.networkdb.kademlia.FloodfillNetworkDatabaseFacade;
import net.i2p.router.peermanager.Calculator; import net.i2p.router.peermanager.Calculator;
...@@ -34,7 +35,7 @@ import net.i2p.util.KeyRing; ...@@ -34,7 +35,7 @@ import net.i2p.util.KeyRing;
*/ */
public class RouterContext extends I2PAppContext { public class RouterContext extends I2PAppContext {
private Router _router; private Router _router;
private ClientManagerFacade _clientManagerFacade; private ClientManagerFacadeImpl _clientManagerFacade;
private ClientMessagePool _clientMessagePool; private ClientMessagePool _clientMessagePool;
private JobQueue _jobQueue; private JobQueue _jobQueue;
private InNetMessagePool _inNetMessagePool; private InNetMessagePool _inNetMessagePool;
...@@ -106,10 +107,12 @@ public class RouterContext extends I2PAppContext { ...@@ -106,10 +107,12 @@ public class RouterContext extends I2PAppContext {
} }
public void initAll() { public void initAll() {
if ("false".equals(getProperty("i2p.dummyClientFacade", "false"))) if (getBooleanProperty("i2p.dummyClientFacade"))
_clientManagerFacade = new ClientManagerFacadeImpl(this); System.err.println("i2p.dummpClientFacade currently unsupported");
else _clientManagerFacade = new ClientManagerFacadeImpl(this);
_clientManagerFacade = new DummyClientManagerFacade(this); // removed since it doesn't implement InternalClientManager for now
//else
// _clientManagerFacade = new DummyClientManagerFacade(this);
_clientMessagePool = new ClientMessagePool(this); _clientMessagePool = new ClientMessagePool(this);
_jobQueue = new JobQueue(this); _jobQueue = new JobQueue(this);
_inNetMessagePool = new InNetMessagePool(this); _inNetMessagePool = new InNetMessagePool(this);
...@@ -395,4 +398,13 @@ public class RouterContext extends I2PAppContext { ...@@ -395,4 +398,13 @@ public class RouterContext extends I2PAppContext {
public boolean isRouterContext() { public boolean isRouterContext() {
return true; return true;
} }
/**
* Use this to connect to the router in the same JVM.
* @return the client manager
* @since 0.8.3
*/
public InternalClientManager internalClientManager() {
return _clientManagerFacade;
}
} }
...@@ -52,7 +52,7 @@ import net.i2p.util.SimpleTimer; ...@@ -52,7 +52,7 @@ import net.i2p.util.SimpleTimer;
*/ */
public class ClientConnectionRunner { public class ClientConnectionRunner {
private Log _log; private Log _log;
private RouterContext _context; protected final RouterContext _context;
private ClientManager _manager; private ClientManager _manager;
/** socket for this particular peer connection */ /** socket for this particular peer connection */
private Socket _socket; private Socket _socket;
...@@ -71,7 +71,7 @@ public class ClientConnectionRunner { ...@@ -71,7 +71,7 @@ public class ClientConnectionRunner {
/** set of messageIds created but not yet ACCEPTED */ /** set of messageIds created but not yet ACCEPTED */
private Set<MessageId> _acceptedPending; private Set<MessageId> _acceptedPending;
/** thingy that does stuff */ /** thingy that does stuff */
private I2CPMessageReader _reader; protected I2CPMessageReader _reader;
/** just for this destination */ /** just for this destination */
private SessionKeyManager _sessionKeyManager; private SessionKeyManager _sessionKeyManager;
/** /**
...@@ -469,18 +469,8 @@ public class ClientConnectionRunner { ...@@ -469,18 +469,8 @@ public class ClientConnectionRunner {
_log.warn("Error sending I2CP message - client went away", eofe); _log.warn("Error sending I2CP message - client went away", eofe);
stopRunning(); stopRunning();
} catch (IOException ioe) { } catch (IOException ioe) {
// only warn if client went away if (_log.shouldLog(Log.ERROR))
int level; _log.error("IO Error sending I2CP message to client", ioe);
String emsg;
if (ioe.getMessage() != null && ioe.getMessage().startsWith("Pipe closed")) {
level = Log.WARN;
emsg = "Error sending I2CP message - client went away";
} else {
level = Log.ERROR;
emsg = "IO Error sending I2CP message to client";
}
if (_log.shouldLog(level))
_log.log(level, emsg, ioe);
stopRunning(); stopRunning();
} catch (Throwable t) { } catch (Throwable t) {
_log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t); _log.log(Log.CRIT, "Unhandled exception sending I2CP message to client", t);
......
...@@ -15,7 +15,9 @@ import java.util.HashSet; ...@@ -15,7 +15,9 @@ import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.client.I2PSessionException;
import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.SessionKeyManager;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
...@@ -23,8 +25,10 @@ import net.i2p.data.Hash; ...@@ -23,8 +25,10 @@ import net.i2p.data.Hash;
import net.i2p.data.LeaseSet; import net.i2p.data.LeaseSet;
import net.i2p.data.Payload; import net.i2p.data.Payload;
import net.i2p.data.TunnelId; import net.i2p.data.TunnelId;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionConfig;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.router.ClientManagerFacade; import net.i2p.router.ClientManagerFacade;
import net.i2p.router.ClientMessage; import net.i2p.router.ClientMessage;
import net.i2p.router.Job; import net.i2p.router.Job;
...@@ -42,7 +46,6 @@ import net.i2p.util.Log; ...@@ -42,7 +46,6 @@ import net.i2p.util.Log;
public class ClientManager { public class ClientManager {
private Log _log; private Log _log;
private ClientListenerRunner _listener; private ClientListenerRunner _listener;
private ClientListenerRunner _internalListener;
private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner private final HashMap<Destination, ClientConnectionRunner> _runners; // Destination --> ClientConnectionRunner
private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet private final Set<ClientConnectionRunner> _pendingRunners; // ClientConnectionRunner for clients w/out a Dest yet
private RouterContext _ctx; private RouterContext _ctx;
...@@ -69,11 +72,6 @@ public class ClientManager { ...@@ -69,11 +72,6 @@ public class ClientManager {
t.setName("ClientListener:" + port); t.setName("ClientListener:" + port);
t.setDaemon(true); t.setDaemon(true);
t.start(); t.start();
_internalListener = new InternalClientListenerRunner(_ctx, this, port);
t = new I2PThread(_internalListener);
t.setName("ClientListener:" + port + "-i");
t.setDaemon(true);
t.start();
} }
public void restart() { public void restart() {
...@@ -97,7 +95,6 @@ public class ClientManager { ...@@ -97,7 +95,6 @@ public class ClientManager {
public void shutdown() { public void shutdown() {
_log.info("Shutting down the ClientManager"); _log.info("Shutting down the ClientManager");
_listener.stopListening(); _listener.stopListening();
_internalListener.stopListening();
Set<ClientConnectionRunner> runners = new HashSet(); Set<ClientConnectionRunner> runners = new HashSet();
synchronized (_runners) { synchronized (_runners) {
for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext();) { for (Iterator<ClientConnectionRunner> iter = _runners.values().iterator(); iter.hasNext();) {
...@@ -117,6 +114,23 @@ public class ClientManager { ...@@ -117,6 +114,23 @@ public class ClientManager {
} }
} }
/**
* The InternalClientManager interface.
* Connects to the router, receiving a message queue to talk to the router with.
* Might throw I2PSessionException if the router isn't ready, someday.
* @since 0.8.3
*/
public I2CPMessageQueue internalConnect() {
// for now we make these unlimited size
LinkedBlockingQueue<I2CPMessage> in = new LinkedBlockingQueue();
LinkedBlockingQueue<I2CPMessage> out = new LinkedBlockingQueue();
I2CPMessageQueue myQueue = new I2CPMessageQueueImpl(in, out);
I2CPMessageQueue hisQueue = new I2CPMessageQueueImpl(out, in);
ClientConnectionRunner runner = new QueuedClientConnectionRunner(_ctx, this, myQueue);
registerConnection(runner);
return hisQueue;
}
public boolean isAlive() { return _listener.isListening(); } public boolean isAlive() { return _listener.isListening(); }
public void registerConnection(ClientConnectionRunner runner) { public void registerConnection(ClientConnectionRunner runner) {
......
...@@ -14,6 +14,7 @@ import java.util.Collections; ...@@ -14,6 +14,7 @@ import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import net.i2p.client.I2PSessionException;
import net.i2p.crypto.SessionKeyManager; import net.i2p.crypto.SessionKeyManager;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
...@@ -21,6 +22,8 @@ import net.i2p.data.Hash; ...@@ -21,6 +22,8 @@ import net.i2p.data.Hash;
import net.i2p.data.LeaseSet; import net.i2p.data.LeaseSet;
import net.i2p.data.i2cp.MessageId; import net.i2p.data.i2cp.MessageId;
import net.i2p.data.i2cp.SessionConfig; import net.i2p.data.i2cp.SessionConfig;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.InternalClientManager;
import net.i2p.router.ClientManagerFacade; import net.i2p.router.ClientManagerFacade;
import net.i2p.router.ClientMessage; import net.i2p.router.ClientMessage;
import net.i2p.router.Job; import net.i2p.router.Job;
...@@ -32,7 +35,7 @@ import net.i2p.util.Log; ...@@ -32,7 +35,7 @@ import net.i2p.util.Log;
* *
* @author jrandom * @author jrandom
*/ */
public class ClientManagerFacadeImpl extends ClientManagerFacade { public class ClientManagerFacadeImpl extends ClientManagerFacade implements InternalClientManager {
private final static Log _log = new Log(ClientManagerFacadeImpl.class); private final static Log _log = new Log(ClientManagerFacadeImpl.class);
private ClientManager _manager; private ClientManager _manager;
private RouterContext _context; private RouterContext _context;
...@@ -220,4 +223,16 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade { ...@@ -220,4 +223,16 @@ public class ClientManagerFacadeImpl extends ClientManagerFacade {
else else
return Collections.EMPTY_SET; return Collections.EMPTY_SET;
} }
/**
* The InternalClientManager interface.
* Connect to the router, receiving a message queue to talk to the router with.
* @throws I2PSessionException if the router isn't ready
* @since 0.8.3
*/
public I2CPMessageQueue connect() throws I2PSessionException {
if (_manager != null)
return _manager.internalConnect();
throw new I2PSessionException("No manager yet");
}
} }
...@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue; ...@@ -8,6 +8,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageImpl; import net.i2p.data.i2cp.I2CPMessageImpl;
import net.i2p.data.i2cp.I2CPMessageException; import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.internal.PoisonI2CPMessage;
import net.i2p.router.RouterContext; import net.i2p.router.RouterContext;
import net.i2p.util.Log; import net.i2p.util.Log;
...@@ -52,7 +53,7 @@ class ClientWriterRunner implements Runnable { ...@@ -52,7 +53,7 @@ class ClientWriterRunner implements Runnable {
public void stopWriting() { public void stopWriting() {
_messagesToWrite.clear(); _messagesToWrite.clear();
try { try {
_messagesToWrite.put(new PoisonMessage()); _messagesToWrite.put(new PoisonI2CPMessage());
} catch (InterruptedException ie) {} } catch (InterruptedException ie) {}
} }
...@@ -64,23 +65,9 @@ class ClientWriterRunner implements Runnable { ...@@ -64,23 +65,9 @@ class ClientWriterRunner implements Runnable {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
continue; continue;
} }
if (msg.getType() == PoisonMessage.MESSAGE_TYPE) if (msg.getType() == PoisonI2CPMessage.MESSAGE_TYPE)
break; break;
_runner.writeMessage(msg); _runner.writeMessage(msg);
} }
} }
/**
* End-of-stream msg used to stop the concurrent queue
* See http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html
*
*/
private static class PoisonMessage extends I2CPMessageImpl {
public static final int MESSAGE_TYPE = 999999;
public int getType() {
return MESSAGE_TYPE;
}
public void doReadMessage(InputStream buf, int size) throws I2CPMessageException, IOException {}
public byte[] doWriteMessage() throws I2CPMessageException, IOException { return null; }
}
} }
package net.i2p.router.client;
import java.util.concurrent.BlockingQueue;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.internal.I2CPMessageQueue;
/**
* Contains the methods to talk to a router or client via I2CP,
* when both are in the same JVM.
* This interface contains methods to access two queues,
* one for transmission and one for receiving.
* The methods are identical to those in java.util.concurrent.BlockingQueue
*
* @author zzz
* @since 0.8.3
*/
class I2CPMessageQueueImpl extends I2CPMessageQueue {
private final BlockingQueue<I2CPMessage> _in;
private final BlockingQueue<I2CPMessage> _out;
public I2CPMessageQueueImpl(BlockingQueue<I2CPMessage> in, BlockingQueue<I2CPMessage> out) {
_in = in;
_out = out;
}
/**
* Send a message, nonblocking
* @return success (false if no space available)
*/
public boolean offer(I2CPMessage msg) {
return _out.offer(msg);
}
/**
* Receive a message, nonblocking
* @return message or null if none available
*/
public I2CPMessage poll() {
return _in.poll();
}
/**
* Send a message, blocking until space is available
*/
public void put(I2CPMessage msg) throws InterruptedException {
_out.put(msg);
}
/**
* Receive a message, blocking until one is available
* @return message
*/
public I2CPMessage take() throws InterruptedException {
return _in.take();
}
}
package net.i2p.router.client;
/*
* free (adj.): unencumbered; not under the control of others
* Written by jrandom in 2003 and released into the public domain
* with no warranty of any kind, either expressed or implied.
* It probably won't make your computer catch on fire, or eat
* your children, but it might. Use at your own risk.
*
*/
import java.io.IOException;
import java.net.Socket;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
import net.i2p.util.InternalServerSocket;
/**
* Listen for in-JVM connections on the internal "socket"
*
* @author zzz
* @since 0.7.9
*/
public class InternalClientListenerRunner extends ClientListenerRunner {
public InternalClientListenerRunner(RouterContext context, ClientManager manager, int port) {
super(context, manager, port);
_log = _context.logManager().getLog(InternalClientListenerRunner.class);
}
/**
* Start up the socket listener, listens for connections, and
* fires those connections off via {@link #runConnection runConnection}.
* This only returns if the socket cannot be opened or there is a catastrophic
* failure.
*
*/
public void runServer() {
try {
if (_log.shouldLog(Log.INFO))
_log.info("Listening on internal port " + _port);
_socket = new InternalServerSocket(_port);
if (_log.shouldLog(Log.DEBUG))
_log.debug("InternalServerSocket created, before accept: " + _socket);
_listening = true;
_running = true;
while (_running) {
try {
Socket socket = _socket.accept();
if (validate(socket)) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Internal connection received");
runConnection(socket);
} else {
if (_log.shouldLog(Log.WARN))
_log.warn("Refused connection from " + socket.getInetAddress());
try {
socket.close();
} catch (IOException ioe) {}
}
} catch (IOException ioe) {
if (_context.router().isAlive())
_log.error("Server error accepting", ioe);
} catch (Throwable t) {
if (_context.router().isAlive())
_log.error("Fatal error running client listener - killing the thread!", t);
_listening = false;
return;
}
}
} catch (IOException ioe) {
if (_context.router().isAlive())
_log.error("Error listening on internal port " + _port, ioe);
}
_listening = false;
if (_socket != null) {
try { _socket.close(); } catch (IOException ioe) {}
_socket = null;
}
if (_context.router().isAlive())
_log.error("CANCELING I2CP LISTEN", new Exception("I2CP Listen cancelled!!!"));
_running = false;
}
}
package net.i2p.router.client;
import java.io.IOException;
import net.i2p.data.i2cp.I2CPMessage;
import net.i2p.data.i2cp.I2CPMessageException;
import net.i2p.internal.I2CPMessageQueue;
import net.i2p.internal.QueuedI2CPMessageReader;
import net.i2p.router.RouterContext;
import net.i2p.util.Log;
/**
* Zero-copy in-JVM.
* While super() starts both a reader and a writer thread, we only need a reader thread here.
*
* @author zzz
* @since 0.8.3
*/
public class QueuedClientConnectionRunner extends ClientConnectionRunner {
private final I2CPMessageQueue queue;
/**
* Create a new runner with the given queues
*
*/
public QueuedClientConnectionRunner(RouterContext context, ClientManager manager, I2CPMessageQueue queue) {
super(context, manager, null);
this.queue = queue;
}
/**
* Starts the reader thread. Does not call super().
*/
@Override
public void startRunning() {
_reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this));
_reader.startReading();
}
/**
* Calls super() to stop the reader, and sends a poison message to the client.
*/
@Override
void stopRunning() {
super.stopRunning();
queue.close();
}
/**
* In super(), doSend queues it to the writer thread and
* the writer thread calls writeMessage() to write to the output stream.
* Since we have no writer thread this shouldn't happen.
*/
@Override
void writeMessage(I2CPMessage msg) {
throw new RuntimeException("huh?");
}
/**
* Actually send the I2CPMessage to the client.
* Nonblocking.
*/
@Override
void doSend(I2CPMessage msg) throws I2CPMessageException {
// This will never fail, for now, as the router uses unbounded queues
// Perhaps in the future we may want to use bounded queues,
// with non-blocking writes for the router
// and blocking writes for the client?
boolean success = queue.offer(msg);
if (!success)
throw new I2CPMessageException("I2CP write to queue failed");
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment