diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java index c333d384dc5b576274e406515c724eff78df3b62..30c3a0d678af853b4e25866d3567dc8c464dc962 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClient.java @@ -226,6 +226,8 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable Destination dest = I2PTunnel.destFromName(destination); if (dest == null) { l.log("Could not resolve " + destination + "."); + if (_log.shouldLog(Log.WARN)) + _log.warn("Unable to resolve " + destination + " (proxy? " + usingWWWProxy + ", request: " + targetRequest); writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, destination); s.close(); return; @@ -365,6 +367,9 @@ public class I2PTunnelHTTPClient extends I2PTunnelClientBase implements Runnable private void handleHTTPClientException(Exception ex, OutputStream out, String targetRequest, boolean usingWWWProxy, String wwwProxy) { + + if (_log.shouldLog(Log.WARN)) + _log.warn("Error sending to " + wwwProxy + " (proxy? " + usingWWWProxy + ", request: " + targetRequest, ex); if (out != null) { try { writeErrorMessage(ERR_DESTINATION_UNKNOWN, out, targetRequest, usingWWWProxy, wwwProxy); diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index b06f29edb365dde4f1ca64fefff11c17cf9a99e7..3ab620d5275f1466bafd6474f2c66d3ebcdfa370 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -131,8 +131,11 @@ class I2PSocketImpl implements I2PSocket { } catch (InterruptedException ex) { } - if ((maxWait >= 0) && (System.currentTimeMillis() >= dieAfter)) - throw new InterruptedIOException("Timed out waiting for remote ID"); + long now = System.currentTimeMillis(); + if ((maxWait >= 0) && (now >= dieAfter)) { + long waitedExcess = now - dieAfter; + throw new InterruptedIOException("Timed out waiting for remote ID (waited " + waitedExcess + "ms too long [" + maxWait + "ms])"); + } if (_log.shouldLog(Log.DEBUG)) _log.debug("TIMING: RemoteID set to " diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index b2593512380d022a58eb1a777969f4b6387941cd..9c648b4c5254d499c64a16db9f8bc6ce2a754a48 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -419,17 +419,25 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa void sendMessage(I2CPMessage message) throws I2PSessionException { if (isClosed()) throw new I2PSessionException("Already closed"); + long beforeSync = _context.clock().now(); + long inSync = 0; + try { synchronized (_out) { + inSync = _context.clock().now(); message.writeMessage(_out); _out.flush(); } - if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Message written out and flushed"); } catch (I2CPMessageException ime) { throw new I2PSessionException(getPrefix() + "Error writing out the message", ime); } catch (IOException ioe) { throw new I2PSessionException(getPrefix() + "Error writing out the message", ioe); } + long afterSync = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Message written out and flushed w/ " + + (inSync-beforeSync) + "ms to sync and " + + (afterSync-inSync) + "ms to send");; } /** diff --git a/core/java/src/net/i2p/client/I2PSessionImpl2.java b/core/java/src/net/i2p/client/I2PSessionImpl2.java index 381fd87474bef0f0832cb86d0bbb8eca9f3a2701..85af20561e14251aa93953487031a9b173df6505 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl2.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl2.java @@ -62,7 +62,6 @@ class I2PSessionImpl2 extends I2PSessionImpl { } public boolean sendMessage(Destination dest, byte[] payload) throws I2PSessionException { - return sendMessage(dest, payload, new SessionKey(), new HashSet(64)); } @@ -137,18 +136,29 @@ class I2PSessionImpl2 extends I2PSessionImpl { } } + long beforeSendingSync = _context.clock().now(); + long inSendingSync = 0; synchronized (_sendingStates) { + inSendingSync = _context.clock().now(); _sendingStates.add(state); } + long afterSendingSync = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce()); + + state.getNonce() + + " sync took " + (inSendingSync-beforeSendingSync) + + " add took " + (afterSendingSync-inSendingSync)); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); + long beforeWaitFor = _context.clock().now(); state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, _context.clock().now() + getTimeout()); + long afterWaitFor = _context.clock().now(); + long inRemovingSync = 0; synchronized (_sendingStates) { + inRemovingSync = _context.clock().now(); _sendingStates.remove(state); } + long afterRemovingSync = _context.clock().now(); boolean found = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "After waitFor sending state " + state.getMessageId().getMessageId() @@ -206,22 +216,34 @@ class I2PSessionImpl2 extends I2PSessionImpl { } } + long beforeSendingSync = _context.clock().now(); + long inSendingSync = 0; synchronized (_sendingStates) { + inSendingSync = _context.clock().now(); _sendingStates.add(state); } + long afterSendingSync = _context.clock().now(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Adding sending state " + state.getMessageId() + " / " - + state.getNonce()); + + state.getNonce() + + " sync took " + (inSendingSync-beforeSendingSync) + + " add took " + (afterSendingSync-inSendingSync)); _producer.sendMessage(this, dest, nonce, payload, tag, key, sentTags, newKey); + long beforeWaitFor = _context.clock().now(); if (isGuaranteed()) state.waitFor(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS, _context.clock().now() + SEND_TIMEOUT); else state.waitFor(MessageStatusMessage.STATUS_SEND_ACCEPTED, _context.clock().now() + SEND_TIMEOUT); + + long afterWaitFor = _context.clock().now(); + long inRemovingSync = 0; synchronized (_sendingStates) { + inRemovingSync = _context.clock().now(); _sendingStates.remove(state); } + long afterRemovingSync = _context.clock().now(); boolean found = state.received(MessageStatusMessage.STATUS_SEND_GUARANTEED_SUCCESS); boolean accepted = state.received(MessageStatusMessage.STATUS_SEND_ACCEPTED); @@ -229,7 +251,12 @@ class I2PSessionImpl2 extends I2PSessionImpl { if (_log.shouldLog(Log.CRIT)) _log.log(Log.CRIT, getPrefix() + "State with nonce " + state.getNonce() + " was not accepted? (no messageId!! found=" + found - + " msgId=" + state.getMessageId() + ")"); + + " msgId=" + state.getMessageId() + + ", sendingSync=" + (afterSendingSync-beforeSendingSync) + + ", sendMessage=" + (beforeWaitFor-afterSendingSync) + + ", waitFor=" + (afterWaitFor-beforeWaitFor) + + ", removingSync=" + (afterRemovingSync-afterWaitFor) + + ")"); //if (true) // throw new OutOfMemoryError("not really an OOM, but more of jr fucking shit up"); nackTags(state); @@ -280,7 +307,10 @@ class I2PSessionImpl2 extends I2PSessionImpl { public void receiveStatus(int msgId, long nonce, int status) { if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "Received status " + status + " for msgId " + msgId + " / " + nonce); MessageState state = null; + long beforeSync = _context.clock().now(); + long inSync = 0; synchronized (_sendingStates) { + inSync = _context.clock().now(); for (Iterator iter = _sendingStates.iterator(); iter.hasNext();) { state = (MessageState) iter.next(); if (_log.shouldLog(Log.DEBUG)) _log.debug(getPrefix() + "State " + state.getMessageId() + " / " + state.getNonce()); @@ -296,7 +326,12 @@ class I2PSessionImpl2 extends I2PSessionImpl { } } } + long afterSync = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("receiveStatus(" + msgId + ", " + nonce + ", " + status+ "): sync: " + + (inSync-beforeSync) + "ms, check: " + (afterSync-inSync)); + if (state != null) { if (state.getMessageId() == null) { MessageId id = new MessageId(); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index 5cdaac9c17cd4c3e015e1beab23d917a91307fea..3866d16caf6d6b53e49cf5189c14ee2e983152e1 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -363,8 +363,9 @@ public class ClientConnectionRunner { */ private class ClientWriterRunner implements Runnable { private List _messagesToWrite; + private long _lastAdded; public ClientWriterRunner() { - _messagesToWrite = new ArrayList(2); + _messagesToWrite = new ArrayList(4); } /** @@ -374,7 +375,8 @@ public class ClientConnectionRunner { public void addMessage(I2CPMessage msg) { synchronized (_messagesToWrite) { _messagesToWrite.add(msg); - _messagesToWrite.notify(); + _lastAdded = _context.clock().now(); + _messagesToWrite.notifyAll(); } } @@ -384,28 +386,40 @@ public class ClientConnectionRunner { */ public void stopWriting() { synchronized (_messagesToWrite) { - _messagesToWrite.notify(); + _messagesToWrite.notifyAll(); } } public void run() { while (!_dead) { I2CPMessage msg = null; + long beforeCheckSync = _context.clock().now(); + long inCheckSync = 0; synchronized (_messagesToWrite) { + inCheckSync = _context.clock().now(); if (_messagesToWrite.size() > 0) { - // we do this test before and after wait, in case more than - // one message gets enqueued msg = (I2CPMessage)_messagesToWrite.remove(0); } else { try { _messagesToWrite.wait(); - } catch (InterruptedException ie) {} - - if (_messagesToWrite.size() > 0) - msg = (I2CPMessage)_messagesToWrite.remove(0); + } catch (InterruptedException ie) { + if (_messagesToWrite.size() > 0) + msg = (I2CPMessage)_messagesToWrite.remove(0); + } } } - if (msg != null) + + long afterCheckSync = _context.clock().now(); + + if (msg != null) { writeMessage(msg); + long afterWriteMessage = _context.clock().now(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("writeMessage: check sync took " + + (inCheckSync-beforeCheckSync) + "ms, writemessage took " + + (afterWriteMessage-afterCheckSync) + + "ms, time since addMessage(): " + + + (afterCheckSync-_lastAdded)); + } } } @@ -417,7 +431,8 @@ public class ClientConnectionRunner { _out.flush(); } if (_log.shouldLog(Log.DEBUG)) - _log.debug("after doSend of a "+ msg.getClass().getName() + " message"); + _log.debug("after writeMessage("+ msg.getClass().getName() + "): " + + (_context.clock().now()-before) + "ms");; } catch (I2CPMessageException ime) { _log.error("Message exception sending I2CP message", ime); stopRunning(); @@ -456,7 +471,15 @@ public class ClientConnectionRunner { + _config.getDestination().calculateHash().toBase64()); } _writer.addMessage(msg); - + if (_log.shouldLog(Log.DEBUG)) { + if ( (_config == null) || (_config.getDestination() == null) ) + _log.debug("after doSend of a "+ msg.getClass().getName() + + " message on for establishing i2cp con"); + else + _log.debug("after doSend of a "+ msg.getClass().getName() + + " message on for " + + _config.getDestination().calculateHash().toBase64()); + } } // this *should* be mod 65536, but UnsignedInteger is still b0rked. FIXME