diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index cb5f7d1c9cb3ac39116e179f8f458947b5bec9de..03958f37858f3462bd89f3c5c2c2e1e36fed99d3 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -119,13 +119,14 @@ class ClientConnectionRunner { } private static volatile int __id = 0; + /** * Actually run the connection - listen for I2CP messages and respond. This * is the main driver for this class, though it gets all its meat from the * {@link net.i2p.data.i2cp.I2CPMessageReader I2CPMessageReader} * */ - public void startRunning() { + public synchronized void startRunning() { try { _reader = new I2CPMessageReader(new BufferedInputStream(_socket.getInputStream(), BUF_SIZE), new ClientMessageEventListener(_context, this, true)); @@ -137,13 +138,14 @@ class ClientConnectionRunner { t.start(); _out = _socket.getOutputStream(); // FIXME OWCH! needs a better way so it can be final. FIXME _reader.startReading(); + // TODO need a cleaner for unclaimed items in _messages, but we have no timestamps... } catch (IOException ioe) { _log.error("Error starting up the runner", ioe); } } /** die a horrible death */ - void stopRunning() { + public synchronized void stopRunning() { if (_dead) return; if (_context.router().isAlive() && _log.shouldLog(Log.WARN)) _log.warn("Stop the I2CP connection! current leaseSet: " @@ -171,16 +173,20 @@ class ClientConnectionRunner { public SessionConfig getConfig() { return _config; } /** current client's sessionkeymanager */ public SessionKeyManager getSessionKeyManager() { return _sessionKeyManager; } + /** currently allocated leaseSet */ public LeaseSet getLeaseSet() { return _currentLeaseSet; } void setLeaseSet(LeaseSet ls) { _currentLeaseSet = ls; } + public Hash getDestHash() { return _destHashCache; } /** current client's sessionId */ SessionId getSessionId() { return _sessionId; } void setSessionId(SessionId id) { if (id != null) _sessionId = id; } + /** data for the current leaseRequest, or null if there is no active leaseSet request */ LeaseRequestState getLeaseRequest() { return _leaseRequest; } + void setLeaseRequest(LeaseRequestState req) { synchronized (this) { if ( (_leaseRequest != null) && (req != _leaseRequest) ) @@ -188,6 +194,7 @@ class ClientConnectionRunner { _leaseRequest = req; } } + /** already closed? */ boolean isDead() { return _dead; } @@ -469,12 +476,14 @@ class ClientConnectionRunner { private final long _expirationTime; private final Job _onCreate; private final Job _onFailed; + public Rerequest(LeaseSet ls, long expirationTime, Job onCreate, Job onFailed) { _ls = ls; _expirationTime = expirationTime; _onCreate = onCreate; _onFailed = onFailed; } + public void timeReached() { requestLeaseSet(_ls, _expirationTime, _onCreate, _onFailed); } @@ -579,14 +588,14 @@ class ClientConnectionRunner { private final static long REQUEUE_DELAY = 500; private class MessageDeliveryStatusUpdate extends JobImpl { - private MessageId _messageId; - private boolean _success; + private final MessageId _messageId; + private final boolean _success; private long _lastTried; + public MessageDeliveryStatusUpdate(MessageId id, boolean success) { super(ClientConnectionRunner.this._context); _messageId = id; _success = success; - _lastTried = 0; } public String getName() { return "Update Delivery Status"; } diff --git a/router/java/src/net/i2p/router/client/ClientManager.java b/router/java/src/net/i2p/router/client/ClientManager.java index e4a5bc64dbd3797a0f83a76273c7dd3087f714b9..0b868aea6d2b65f882a26928d99f9e14b289a84a 100644 --- a/router/java/src/net/i2p/router/client/ClientManager.java +++ b/router/java/src/net/i2p/router/client/ClientManager.java @@ -238,7 +238,9 @@ class ClientManager { _payload = payload; _msgId = id; } + public String getName() { return "Distribute local message"; } + public void runJob() { _to.receiveMessage(_toDest, _fromDest, _payload); if (_from != null) { @@ -274,6 +276,7 @@ class ClientManager { } private static final int REQUEST_LEASESET_TIMEOUT = 120*1000; + public void requestLeaseSet(Hash dest, LeaseSet ls) { ClientConnectionRunner runner = getRunner(dest); if (runner != null) { @@ -298,6 +301,7 @@ class ClientManager { } return rv; } + public boolean isLocal(Hash destHash) { if (destHash == null) return false; synchronized (_runners) { @@ -480,18 +484,23 @@ class ClientManager { } public void messageReceived(ClientMessage msg) { - _ctx.jobQueue().addJob(new HandleJob(msg)); + // This is fast and non-blocking, run in-line + //_ctx.jobQueue().addJob(new HandleJob(msg)); + (new HandleJob(msg)).runJob(); } private class HandleJob extends JobImpl { - private ClientMessage _msg; + private final ClientMessage _msg; + public HandleJob(ClientMessage msg) { super(_ctx); _msg = msg; } + public String getName() { return "Handle Inbound Client Messages"; } + public void runJob() { - ClientConnectionRunner runner = null; + ClientConnectionRunner runner; if (_msg.getDestination() != null) runner = getRunner(_msg.getDestination()); else diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index b8def25a97b33406d80aebffc409ab5432e77894..8b16153de0af19a16f63f9ae741d760cc2057398 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -40,16 +40,22 @@ class MessageReceivedJob extends JobImpl { MessageId id = new MessageId(); id.setMessageId(_runner.getNextMessageId()); _runner.setPayload(id, _payload); - messageAvailable(id, _payload.getSize()); + try { + messageAvailable(id, _payload.getSize()); + } catch (I2CPMessageException ime) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error writing out the message status message", ime); + _runner.removePayload(id); + } } /** * Deliver notification to the client that the given message is available. */ - private void messageAvailable(MessageId id, long size) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId() - + " (with nonce=1)", new Exception("available")); + private void messageAvailable(MessageId id, long size) throws I2CPMessageException { + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId() + // + " (with nonce=1)", new Exception("available")); MessageStatusMessage msg = new MessageStatusMessage(); msg.setMessageId(id.getMessageId()); msg.setSessionId(_runner.getSessionId().getSessionId()); @@ -57,11 +63,6 @@ class MessageReceivedJob extends JobImpl { // has to be >= 0, it is initialized to -1 msg.setNonce(1); msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE); - try { - _runner.doSend(msg); - } catch (I2CPMessageException ime) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error writing out the message status message", ime); - } + _runner.doSend(msg); } } diff --git a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java index 7b16bae02051262189568ce0148f1165ceb56d93..7f23f051bc4dfb81d9a496f7136e5fabf12ba639 100644 --- a/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/QueuedClientConnectionRunner.java @@ -31,7 +31,7 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner { * Starts the reader thread. Does not call super(). */ @Override - public void startRunning() { + public synchronized void startRunning() { _reader = new QueuedI2CPMessageReader(this.queue, new ClientMessageEventListener(_context, this, false)); _reader.startReading(); } @@ -40,7 +40,7 @@ class QueuedClientConnectionRunner extends ClientConnectionRunner { * Calls super() to stop the reader, and sends a poison message to the client. */ @Override - void stopRunning() { + public synchronized void stopRunning() { super.stopRunning(); queue.close(); // queue = null;