diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java index 0585c240f83be6680d1b2a634215cc78184cb575..437e679e4885c029f52a6dd680986fa40054046d 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnel.java @@ -1146,6 +1146,8 @@ public class I2PTunnel implements Logging, EventDispatcher { } private String getPrefix() { return '[' + _tunnelId + "]: "; } + + public I2PAppContext getContext() { return _context; } /** * Call this whenever we lose touch with the router involuntarily (aka the router diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index e8eb3eed4b991c5d10691ae96e30eef982b88b40..a5c219022163d39bb462430af1d9b1e07c72d879 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -35,85 +35,66 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { public I2PTunnelHTTPServer(InetAddress host, int port, String privData, String spoofHost, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host, port, privData, l, notifyThis, tunnel); _spoofHost = spoofHost; + getTunnel().getContext().statManager().createRateStat("i2ptunnel.httpserver.blockingHandleTime", "how long the blocking handle takes to complete", "I2PTunnel.HTTPServer", new long[] { 60*1000, 10*60*1000, 3*60*60*1000 }); } public I2PTunnelHTTPServer(InetAddress host, int port, File privkey, String privkeyname, String spoofHost, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host, port, privkey, privkeyname, l, notifyThis, tunnel); _spoofHost = spoofHost; + getTunnel().getContext().statManager().createRateStat("i2ptunnel.httpserver.blockingHandleTime", "how long the blocking handle takes to complete", "I2PTunnel.HTTPServer", new long[] { 60*1000, 10*60*1000, 3*60*60*1000 }); } public I2PTunnelHTTPServer(InetAddress host, int port, InputStream privData, String privkeyname, String spoofHost, Logging l, EventDispatcher notifyThis, I2PTunnel tunnel) { super(host, port, privData, privkeyname, l, notifyThis, tunnel); - _spoofHost = spoofHost; + _spoofHost = spoofHost; + getTunnel().getContext().statManager().createRateStat("i2ptunnel.httpserver.blockingHandleTime", "how long the blocking handle takes to complete", "I2PTunnel.HTTPServer", new long[] { 60*1000, 10*60*1000, 3*60*60*1000 }); } - public void run() { - try { - I2PServerSocket i2pss = sockMgr.getServerSocket(); - while (true) { - I2PSocket i2ps = i2pss.accept(); - if (i2ps == null) throw new I2PException("I2PServerSocket closed"); - I2PThread t = new I2PThread(new Handler(i2ps)); - t.start(); - } - } catch (I2PException ex) { - _log.error("Error while waiting for I2PConnections", ex); - } catch (IOException ex) { - _log.error("Error while waiting for I2PConnections", ex); - } - } - /** - * Async handler to keep .accept() from blocking too long. - * todo: replace with a thread pool so we dont get overrun by threads if/when - * receiving a lot of connection requests concurrently. + * Called by the thread pool of I2PSocket handlers * */ - private class Handler implements Runnable { - private I2PSocket _handleSocket; - public Handler(I2PSocket socket) { - _handleSocket = socket; - } - public void run() { - long afterAccept = I2PAppContext.getGlobalContext().clock().now(); - long afterSocket = -1; - - //local is fast, so synchronously. Does not need that many - //threads. + protected void blockingHandle(I2PSocket socket) { + long afterAccept = getTunnel().getContext().clock().now(); + long afterSocket = -1; + //local is fast, so synchronously. Does not need that many + //threads. + try { + socket.setReadTimeout(readTimeout); + String modifiedHeader = getModifiedHeader(socket); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Modified header: [" + modifiedHeader + "]"); + + Socket s = new Socket(remoteHost, remotePort); + afterSocket = getTunnel().getContext().clock().now(); + new I2PTunnelRunner(s, socket, slock, null, modifiedHeader.getBytes(), null); + } catch (SocketException ex) { try { - _handleSocket.setReadTimeout(readTimeout); - String modifiedHeader = getModifiedHeader(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Modified header: [" + modifiedHeader + "]"); - - Socket s = new Socket(remoteHost, remotePort); - afterSocket = I2PAppContext.getGlobalContext().clock().now(); - new I2PTunnelRunner(s, _handleSocket, slock, null, modifiedHeader.getBytes(), null); - } catch (SocketException ex) { - try { - _handleSocket.close(); - } catch (IOException ioe) { + socket.close(); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) _log.error("Error while closing the received i2p con", ex); - } - } catch (IOException ex) { - _log.error("Error while handling for I2PConnections", ex); } - - long afterHandle = I2PAppContext.getGlobalContext().clock().now(); - long timeToHandle = afterHandle - afterAccept; - if (timeToHandle > 1000) - _log.warn("Took a while to handle the request [" + timeToHandle + ", socket create: " - + (afterSocket-afterAccept) + "]"); - } - private String getModifiedHeader() throws IOException { - InputStream in = _handleSocket.getInputStream(); - - StringBuffer command = new StringBuffer(128); - Properties headers = readHeaders(in, command); - headers.setProperty("Host", _spoofHost); - headers.setProperty("Connection", "close"); - return formatHeaders(headers, command); + } catch (IOException ex) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error while receiving the new HTTP request", ex); } + + long afterHandle = getTunnel().getContext().clock().now(); + long timeToHandle = afterHandle - afterAccept; + getTunnel().getContext().statManager().addRateData("i2ptunnel.httpserver.blockingHandleTime", timeToHandle, 0); + if ( (timeToHandle > 1000) && (_log.shouldLog(Log.WARN)) ) + _log.warn("Took a while to handle the request [" + timeToHandle + ", socket create: " + (afterSocket-afterAccept) + "]"); + } + + private String getModifiedHeader(I2PSocket handleSocket) throws IOException { + InputStream in = handleSocket.getInputStream(); + + StringBuffer command = new StringBuffer(128); + Properties headers = readHeaders(in, command); + headers.setProperty("Host", _spoofHost); + headers.setProperty("Connection", "close"); + return formatHeaders(headers, command); } private String formatHeaders(Properties headers, StringBuffer command) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java index 64a4708e2313f580e85f36fdf1f7819418480401..608d36ba89b9c5b8abd7e4d3fc121ea27cd10f08 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelServer.java @@ -189,7 +189,7 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { public void run() { while (open) { try { - handle(_serverSocket.accept()); + blockingHandle(_serverSocket.accept()); } catch (I2PException ex) { _log.error("Error while waiting for I2PConnections", ex); return; @@ -199,32 +199,32 @@ public class I2PTunnelServer extends I2PTunnelTask implements Runnable { } } } + } - private void handle(I2PSocket socket) { - long afterAccept = I2PAppContext.getGlobalContext().clock().now(); - long afterSocket = -1; - //local is fast, so synchronously. Does not need that many - //threads. + protected void blockingHandle(I2PSocket socket) { + long afterAccept = I2PAppContext.getGlobalContext().clock().now(); + long afterSocket = -1; + //local is fast, so synchronously. Does not need that many + //threads. + try { + socket.setReadTimeout(readTimeout); + Socket s = new Socket(remoteHost, remotePort); + afterSocket = I2PAppContext.getGlobalContext().clock().now(); + new I2PTunnelRunner(s, socket, slock, null, null); + } catch (SocketException ex) { try { - socket.setReadTimeout(readTimeout); - Socket s = new Socket(remoteHost, remotePort); - afterSocket = I2PAppContext.getGlobalContext().clock().now(); - new I2PTunnelRunner(s, socket, slock, null, null); - } catch (SocketException ex) { - try { - socket.close(); - } catch (IOException ioe) { - _log.error("Error while closing the received i2p con", ex); - } - } catch (IOException ex) { - _log.error("Error while waiting for I2PConnections", ex); + socket.close(); + } catch (IOException ioe) { + _log.error("Error while closing the received i2p con", ex); } - - long afterHandle = I2PAppContext.getGlobalContext().clock().now(); - long timeToHandle = afterHandle - afterAccept; - if (timeToHandle > 1000) - _log.warn("Took a while to handle the request [" + timeToHandle + ", socket create: " + (afterSocket-afterAccept) + "]"); + } catch (IOException ex) { + _log.error("Error while waiting for I2PConnections", ex); } + + long afterHandle = I2PAppContext.getGlobalContext().clock().now(); + long timeToHandle = afterHandle - afterAccept; + if (timeToHandle > 1000) + _log.warn("Took a while to handle the request [" + timeToHandle + ", socket create: " + (afterSocket-afterAccept) + "]"); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 5d91d021fa81da86958cd6983710c3b87a0f71d0..da55bc3d357d316fc8a5117d350a0d0d0ac66061 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -91,8 +91,8 @@ public class MessageHandler implements I2PSessionListener { * */ public void errorOccurred(I2PSession session, String message, Throwable error) { - if (_log.shouldLog(Log.ERROR)) - _log.error("error occurred: " + message + "- " + error.getMessage()); + if (_log.shouldLog(Log.WARN)) + _log.warn("error occurred: " + message + "- " + error.getMessage()); if (_log.shouldLog(Log.WARN)) _log.warn("cause", error); //_manager.disconnectAllHard(); diff --git a/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java b/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java index 9551698fc0013c10edac9b92bc104420b032ad8a..90c111c15119a9dbde4b36024367a40bc8b350ad 100644 --- a/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java +++ b/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java @@ -29,20 +29,27 @@ import net.i2p.util.Log; class I2PClientMessageHandlerMap { private final static Log _log = new Log(I2PClientMessageHandlerMap.class); /** map of message type id --> I2CPMessageHandler */ - private Map _handlers; + private I2CPMessageHandler _handlers[]; public I2PClientMessageHandlerMap(I2PAppContext context) { - _handlers = new HashMap(); - _handlers.put(new Integer(DisconnectMessage.MESSAGE_TYPE), new DisconnectMessageHandler(context)); - _handlers.put(new Integer(SessionStatusMessage.MESSAGE_TYPE), new SessionStatusMessageHandler(context)); - _handlers.put(new Integer(RequestLeaseSetMessage.MESSAGE_TYPE), new RequestLeaseSetMessageHandler(context)); - _handlers.put(new Integer(MessagePayloadMessage.MESSAGE_TYPE), new MessagePayloadMessageHandler(context)); - _handlers.put(new Integer(MessageStatusMessage.MESSAGE_TYPE), new MessageStatusMessageHandler(context)); - _handlers.put(new Integer(SetDateMessage.MESSAGE_TYPE), new SetDateMessageHandler(context)); + int highest = DisconnectMessage.MESSAGE_TYPE; + highest = Math.max(highest, SessionStatusMessage.MESSAGE_TYPE); + highest = Math.max(highest, RequestLeaseSetMessage.MESSAGE_TYPE); + highest = Math.max(highest, MessagePayloadMessage.MESSAGE_TYPE); + highest = Math.max(highest, MessageStatusMessage.MESSAGE_TYPE); + highest = Math.max(highest, SetDateMessage.MESSAGE_TYPE); + + _handlers = new I2CPMessageHandler[highest+1]; + _handlers[DisconnectMessage.MESSAGE_TYPE] = new DisconnectMessageHandler(context); + _handlers[SessionStatusMessage.MESSAGE_TYPE] = new SessionStatusMessageHandler(context); + _handlers[RequestLeaseSetMessage.MESSAGE_TYPE] = new RequestLeaseSetMessageHandler(context); + _handlers[MessagePayloadMessage.MESSAGE_TYPE] = new MessagePayloadMessageHandler(context); + _handlers[MessageStatusMessage.MESSAGE_TYPE] = new MessageStatusMessageHandler(context); + _handlers[SetDateMessage.MESSAGE_TYPE] = new SetDateMessageHandler(context); } public I2CPMessageHandler getHandler(int messageTypeId) { - I2CPMessageHandler handler = (I2CPMessageHandler) _handlers.get(new Integer(messageTypeId)); - return handler; + if ( (messageTypeId < 0) || (messageTypeId >= _handlers.length) ) return null; + return _handlers[messageTypeId]; } } \ No newline at end of file diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index c4fc19f226360ec17714fcfabf55d0b66c9f94e8..cbc0473414170138baeb3c46a4dd18744d8d0f67 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -78,7 +78,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa /** class that generates new messages */ protected I2CPMessageProducer _producer; - /** map of integer --> MessagePayloadMessage */ + /** map of Long --> MessagePayloadMessage */ private Map _availableMessages; protected I2PClientMessageHandlerMap _handlerMap; @@ -295,7 +295,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public byte[] receiveMessage(int msgId) throws I2PSessionException { MessagePayloadMessage msg = null; synchronized (_availableMessages) { - msg = (MessagePayloadMessage) _availableMessages.remove(new Integer(msgId)); + msg = (MessagePayloadMessage) _availableMessages.remove(new Long(msgId)); } if (msg == null) return null; return msg.getPayload().getUnencryptedData(); @@ -346,9 +346,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa */ public void addNewMessage(MessagePayloadMessage msg) { synchronized (_availableMessages) { - _availableMessages.put(new Integer(msg.getMessageId().getMessageId()), msg); + _availableMessages.put(new Long(msg.getMessageId()), msg); } - int id = msg.getMessageId().getMessageId(); + long id = msg.getMessageId(); byte data[] = msg.getPayload().getUnencryptedData(); if ((data == null) || (data.length <= 0)) { if (_log.shouldLog(Log.CRIT)) @@ -363,12 +363,12 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa SimpleTimer.getInstance().addEvent(new VerifyUsage(id), 30*1000); } private class VerifyUsage implements SimpleTimer.TimedEvent { - private int _msgId; - public VerifyUsage(int id) { _msgId = id; } + private long _msgId; + public VerifyUsage(long id) { _msgId = id; } public void timeReached() { MessagePayloadMessage removed = null; synchronized (_availableMessages) { - removed = (MessagePayloadMessage)_availableMessages.remove(new Integer(_msgId)); + removed = (MessagePayloadMessage)_availableMessages.remove(new Long(_msgId)); } if (removed != null) _log.log(Log.CRIT, "Message NOT removed! id=" + _msgId + ": " + removed); @@ -393,9 +393,9 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } } - public void available(int msgId, int size) { + public void available(long msgId, int size) { synchronized (AvailabilityNotifier.this) { - _pendingIds.add(new Integer(msgId)); + _pendingIds.add(new Long(msgId)); _pendingSizes.add(new Integer(size)); AvailabilityNotifier.this.notifyAll(); } @@ -403,7 +403,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa public void run() { _alive = true; while (_alive) { - Integer msgId = null; + Long msgId = null; Integer size = null; synchronized (AvailabilityNotifier.this) { if (_pendingIds.size() <= 0) { @@ -413,7 +413,7 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa } } if (_pendingIds.size() > 0) { - msgId = (Integer)_pendingIds.remove(0); + msgId = (Long)_pendingIds.remove(0); size = (Integer)_pendingSizes.remove(0); } } @@ -532,8 +532,8 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa * Pass off the error to the listener */ void propogateError(String msg, Throwable error) { - if (_log.shouldLog(Log.ERROR)) - _log.error(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage()); + if (_log.shouldLog(Log.WARN)) + _log.warn(getPrefix() + "Error occurred: " + msg + " - " + error.getMessage()); if (_log.shouldLog(Log.WARN)) _log.warn(getPrefix() + " cause", error); diff --git a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java index ee51a1eef4318b5bfe2bd12d17fb3260affd03ca..eec16feea86ee22329d685987762eb9534cd5f18 100644 --- a/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java +++ b/core/java/src/net/i2p/client/MessagePayloadMessageHandler.java @@ -35,7 +35,7 @@ class MessagePayloadMessageHandler extends HandlerImpl { _log.debug("Handle message " + message); try { MessagePayloadMessage msg = (MessagePayloadMessage) message; - MessageId id = msg.getMessageId(); + long id = msg.getMessageId(); Payload payload = decryptPayload(msg, session); session.addNewMessage(msg); diff --git a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java index e32f14e7245e7f10cc6cc58f79973836ff2c29e8..41d59304b43b40cac41d88bc3ab1d8f11442b9e5 100644 --- a/core/java/src/net/i2p/client/MessageStatusMessageHandler.java +++ b/core/java/src/net/i2p/client/MessageStatusMessageHandler.java @@ -46,7 +46,7 @@ class MessageStatusMessageHandler extends HandlerImpl { } return; case MessageStatusMessage.STATUS_SEND_ACCEPTED: - session.receiveStatus(msg.getMessageId().getMessageId(), msg.getNonce(), msg.getStatus()); + session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); // noop return; case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_SUCCESS: @@ -54,14 +54,14 @@ class MessageStatusMessageHandler extends HandlerImpl { if (_log.shouldLog(Log.INFO)) _log.info("Message delivery succeeded for message " + msg.getMessageId()); //if (!skipStatus) - session.receiveStatus(msg.getMessageId().getMessageId(), msg.getNonce(), msg.getStatus()); + session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); return; case MessageStatusMessage.STATUS_SEND_BEST_EFFORT_FAILURE: case MessageStatusMessage.STATUS_SEND_GUARANTEED_FAILURE: if (_log.shouldLog(Log.INFO)) _log.info("Message delivery FAILED for message " + msg.getMessageId()); //if (!skipStatus) - session.receiveStatus(msg.getMessageId().getMessageId(), msg.getNonce(), msg.getStatus()); + session.receiveStatus((int)msg.getMessageId(), msg.getNonce(), msg.getStatus()); return; default: if (_log.shouldLog(Log.ERROR)) diff --git a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java index ae3478902d13c228e0ca2125f068ab041ee12222..37883743930c6c390a4b78471162790612423c8d 100644 --- a/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java +++ b/core/java/src/net/i2p/data/i2cp/I2CPMessageImpl.java @@ -61,6 +61,7 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM + " class: " + getClass().getName() + ")"); if (length < 0) throw new IOException("Negative payload size"); + /* byte buf[] = new byte[length]; int read = DataHelper.read(in, buf); if (read != length) @@ -69,6 +70,8 @@ public abstract class I2CPMessageImpl extends DataStructureImpl implements I2CPM ByteArrayInputStream bis = new ByteArrayInputStream(buf); doReadMessage(bis, length); + */ + doReadMessage(in, length); } /** diff --git a/core/java/src/net/i2p/data/i2cp/MessageId.java b/core/java/src/net/i2p/data/i2cp/MessageId.java index 4d91e814824ac8fef8e69bdf8a323bc4900100b2..350809d02e69587bec75cd4838da5eec74787d5d 100644 --- a/core/java/src/net/i2p/data/i2cp/MessageId.java +++ b/core/java/src/net/i2p/data/i2cp/MessageId.java @@ -26,22 +26,25 @@ import net.i2p.util.Log; */ public class MessageId extends DataStructureImpl { private final static Log _log = new Log(MessageId.class); - private int _messageId; + private long _messageId; public MessageId() { setMessageId(-1); } + public MessageId(long id) { + setMessageId(id); + } - public int getMessageId() { + public long getMessageId() { return _messageId; } - public void setMessageId(int id) { + public void setMessageId(long id) { _messageId = id; } public void readBytes(InputStream in) throws DataFormatException, IOException { - _messageId = (int) DataHelper.readLong(in, 4); + _messageId = DataHelper.readLong(in, 4); } public void writeBytes(OutputStream out) throws DataFormatException, IOException { @@ -55,7 +58,7 @@ public class MessageId extends DataStructureImpl { } public int hashCode() { - return getMessageId(); + return (int)getMessageId(); } public String toString() { diff --git a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java index b51e9c2fc096588261911ab236629d755bb99fb2..1a8bddad9accff05db5a0b907ffa353fde4a62dd 100644 --- a/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessagePayloadMessage.java @@ -27,29 +27,29 @@ import net.i2p.util.Log; public class MessagePayloadMessage extends I2CPMessageImpl { private final static Log _log = new Log(MessagePayloadMessage.class); public final static int MESSAGE_TYPE = 31; - private SessionId _sessionId; - private MessageId _messageId; + private long _sessionId; + private long _messageId; private Payload _payload; public MessagePayloadMessage() { - setSessionId(null); - setMessageId(null); + setSessionId(-1); + setMessageId(-1); setPayload(null); } - public SessionId getSessionId() { + public long getSessionId() { return _sessionId; } - public void setSessionId(SessionId id) { + public void setSessionId(long id) { _sessionId = id; } - public MessageId getMessageId() { + public long getMessageId() { return _messageId; } - public void setMessageId(MessageId id) { + public void setMessageId(long id) { _messageId = id; } @@ -63,10 +63,8 @@ public class MessagePayloadMessage extends I2CPMessageImpl { protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { try { - _sessionId = new SessionId(); - _sessionId.readBytes(in); - _messageId = new MessageId(); - _messageId.readBytes(in); + _sessionId = DataHelper.readLong(in, 2); + _messageId = DataHelper.readLong(in, 4); _payload = new Payload(); _payload.readBytes(in); } catch (DataFormatException dfe) { @@ -84,9 +82,9 @@ public class MessagePayloadMessage extends I2CPMessageImpl { * */ public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { - if (_sessionId == null) + if (_sessionId <= 0) throw new I2CPMessageException("Unable to write out the message, as the session ID has not been defined"); - if (_messageId == null) + if (_messageId < 0) throw new I2CPMessageException("Unable to write out the message, as the message ID has not been defined"); if (_payload == null) throw new I2CPMessageException("Unable to write out the message, as the payload has not been defined"); @@ -95,8 +93,8 @@ public class MessagePayloadMessage extends I2CPMessageImpl { try { DataHelper.writeLong(out, 4, size); DataHelper.writeLong(out, 1, getType()); - DataHelper.writeLong(out, 2, _sessionId.getSessionId()); - DataHelper.writeLong(out, 4, _messageId.getMessageId()); + DataHelper.writeLong(out, 2, _sessionId); + DataHelper.writeLong(out, 4, _messageId); DataHelper.writeLong(out, 4, _payload.getSize()); out.write(_payload.getEncryptedData()); } catch (DataFormatException dfe) { diff --git a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java index e11a8cacaf5b02a2fec749a5a67c75616a0022b3..917b53ba0694962ea8ad84e4c7c7f6182ba9aaa6 100644 --- a/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java +++ b/core/java/src/net/i2p/data/i2cp/MessageStatusMessage.java @@ -12,6 +12,7 @@ package net.i2p.data.i2cp; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -26,8 +27,8 @@ import net.i2p.util.Log; public class MessageStatusMessage extends I2CPMessageImpl { private final static Log _log = new Log(SessionStatusMessage.class); public final static int MESSAGE_TYPE = 22; - private SessionId _sessionId; - private MessageId _messageId; + private long _sessionId; + private long _messageId; private long _nonce; private long _size; private int _status; @@ -40,18 +41,18 @@ public class MessageStatusMessage extends I2CPMessageImpl { public final static int STATUS_SEND_GUARANTEED_FAILURE = 5; public MessageStatusMessage() { - setSessionId(null); + setSessionId(-1); setStatus(-1); - setMessageId(null); + setMessageId(-1); setSize(-1); setNonce(-1); } - public SessionId getSessionId() { + public long getSessionId() { return _sessionId; } - public void setSessionId(SessionId id) { + public void setSessionId(long id) { _sessionId = id; } @@ -63,11 +64,11 @@ public class MessageStatusMessage extends I2CPMessageImpl { _status = status; } - public MessageId getMessageId() { + public long getMessageId() { return _messageId; } - public void setMessageId(MessageId id) { + public void setMessageId(long id) { _messageId = id; } @@ -108,10 +109,8 @@ public class MessageStatusMessage extends I2CPMessageImpl { protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { try { - _sessionId = new SessionId(); - _sessionId.readBytes(in); - _messageId = new MessageId(); - _messageId.readBytes(in); + _sessionId = DataHelper.readLong(in, 2); + _messageId = DataHelper.readLong(in, 4); _status = (int) DataHelper.readLong(in, 1); _size = DataHelper.readLong(in, 4); _nonce = DataHelper.readLong(in, 4); @@ -120,20 +119,32 @@ public class MessageStatusMessage extends I2CPMessageImpl { } } - protected byte[] doWriteMessage() throws I2CPMessageException, IOException { - if ((_sessionId == null) || (_messageId == null) || (_status < 0) || (_nonce <= 0)) - throw new I2CPMessageException("Unable to write out the message as there is not enough data"); - ByteArrayOutputStream os = new ByteArrayOutputStream(64); + + /** + * Override to reduce mem churn + */ + public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { + int len = 2 + // sessionId + 4 + // messageId + 1 + // status + 4 + // size + 4; // nonce + try { - _sessionId.writeBytes(os); - _messageId.writeBytes(os); - DataHelper.writeLong(os, 1, _status); - DataHelper.writeLong(os, 4, _size); - DataHelper.writeLong(os, 4, _nonce); + DataHelper.writeLong(out, 4, len); + DataHelper.writeLong(out, 1, getType()); + DataHelper.writeLong(out, 2, _sessionId); + DataHelper.writeLong(out, 4, _messageId); + DataHelper.writeLong(out, 1, _status); + DataHelper.writeLong(out, 4, _size); + DataHelper.writeLong(out, 4, _nonce); } catch (DataFormatException dfe) { - throw new I2CPMessageException("Error writing out the message data", dfe); + throw new I2CPMessageException("Unable to write the message length or type", dfe); } - return os.toByteArray(); + } + + protected byte[] doWriteMessage() throws I2CPMessageException, IOException { + throw new UnsupportedOperationException("This shouldn't be called... use writeMessage(out)"); } public int getType() { diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java index 4c27ca9cce0e1ae9da09c966dfa3b9adbb3dd806..473dd4f8c8f5ddf7f5100d293dcb4d5da04d7aac 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageBeginMessage.java @@ -12,6 +12,7 @@ package net.i2p.data.i2cp; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -26,50 +27,62 @@ import net.i2p.util.Log; public class ReceiveMessageBeginMessage extends I2CPMessageImpl { private final static Log _log = new Log(ReceiveMessageBeginMessage.class); public final static int MESSAGE_TYPE = 6; - private SessionId _sessionId; - private MessageId _messageId; + private long _sessionId; + private long _messageId; public ReceiveMessageBeginMessage() { - setSessionId(null); - setMessageId(null); + setSessionId(-1); + setMessageId(-1); } - public SessionId getSessionId() { + public long getSessionId() { return _sessionId; } - public void setSessionId(SessionId id) { + public void setSessionId(long id) { _sessionId = id; } - public MessageId getMessageId() { + public long getMessageId() { return _messageId; } - public void setMessageId(MessageId id) { + public void setMessageId(long id) { _messageId = id; } protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { try { - _sessionId = new SessionId(); - _sessionId.readBytes(in); - _messageId = new MessageId(); - _messageId.readBytes(in); + _sessionId = DataHelper.readLong(in, 2); + _messageId = DataHelper.readLong(in, 4); } catch (DataFormatException dfe) { throw new I2CPMessageException("Unable to load the message data", dfe); } } protected byte[] doWriteMessage() throws I2CPMessageException, IOException { - if ((_sessionId == null) || (_messageId == null)) - throw new I2CPMessageException("Unable to write out the message as there is not enough data"); - byte rv[] = new byte[2+4]; - DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId()); - DataHelper.toLong(rv, 2, 4, _messageId.getMessageId()); - return rv; + throw new UnsupportedOperationException("This shouldn't be called... use writeMessage(out)"); } + + /** + * Override to reduce mem churn + */ + public void writeMessage(OutputStream out) throws I2CPMessageException, IOException { + int len = 2 + // sessionId + 4; // messageId + + try { + DataHelper.writeLong(out, 4, len); + DataHelper.writeLong(out, 1, getType()); + DataHelper.writeLong(out, 2, _sessionId); + DataHelper.writeLong(out, 4, _messageId); + } catch (DataFormatException dfe) { + throw new I2CPMessageException("Unable to write the message length or type", dfe); + } + } + + public int getType() { return MESSAGE_TYPE; } diff --git a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java index eb7adccade79fc516dca87528123817798ee5e90..9ecb016a0a6d7bebc7d64c6a8a731b61e507ab2d 100644 --- a/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java +++ b/core/java/src/net/i2p/data/i2cp/ReceiveMessageEndMessage.java @@ -26,47 +26,45 @@ import net.i2p.util.Log; public class ReceiveMessageEndMessage extends I2CPMessageImpl { private final static Log _log = new Log(ReceiveMessageEndMessage.class); public final static int MESSAGE_TYPE = 7; - private SessionId _sessionId; - private MessageId _messageId; + private long _sessionId; + private long _messageId; public ReceiveMessageEndMessage() { - setSessionId(null); - setMessageId(null); + setSessionId(-1); + setMessageId(-1); } - public SessionId getSessionId() { + public long getSessionId() { return _sessionId; } - public void setSessionId(SessionId id) { + public void setSessionId(long id) { _sessionId = id; } - public MessageId getMessageId() { + public long getMessageId() { return _messageId; } - public void setMessageId(MessageId id) { + public void setMessageId(long id) { _messageId = id; } protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { try { - _sessionId = new SessionId(); - _sessionId.readBytes(in); - _messageId = new MessageId(); - _messageId.readBytes(in); + _sessionId = DataHelper.readLong(in, 2); + _messageId = DataHelper.readLong(in, 4); } catch (DataFormatException dfe) { throw new I2CPMessageException("Unable to load the message data", dfe); } } protected byte[] doWriteMessage() throws I2CPMessageException, IOException { - if ((_sessionId == null) || (_messageId == null)) + if ((_sessionId < 0) || (_messageId < 0)) throw new I2CPMessageException("Unable to write out the message as there is not enough data"); byte rv[] = new byte[2+4]; - DataHelper.toLong(rv, 0, 2, _sessionId.getSessionId()); - DataHelper.toLong(rv, 2, 4, _messageId.getMessageId()); + DataHelper.toLong(rv, 0, 2, _sessionId); + DataHelper.toLong(rv, 2, 4, _messageId); return rv; } diff --git a/history.txt b/history.txt index c6303a94e02b121a70e28b11a62b079d2ba35d40..c30f9e23dafd9b41ce42f8824f4dc03b6e291506 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,10 @@ -$Id: history.txt,v 1.215 2005/07/20 14:24:47 jrandom Exp $ +$Id: history.txt,v 1.216 2005/07/21 17:37:16 jrandom Exp $ + +2005-07-22 jrandom + * Use the small thread pool for I2PTunnelHTTPServer (already used for + I2PTunnelServer) + * Minor memory churn reduction in I2CP + * Small stats update 2005-07-21 jrandom * Fix in the SDK for a bug which would manifest itself as misrouted diff --git a/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java b/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java index 94368d92f63351caf6f5193cb1599f1c97f31135..0b5571a57ea9c69800953de076fb2033e626b553 100644 --- a/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java +++ b/router/java/src/net/i2p/data/i2np/DeliveryInstructions.java @@ -216,60 +216,94 @@ public class DeliveryInstructions extends DataStructureImpl { val = val | fmode; if (getDelayRequested()) val = val | FLAG_DELAY; - _log.debug("getFlags() = " + val); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("getFlags() = " + val); return val; } private byte[] getAdditionalInfo() throws DataFormatException { - ByteArrayOutputStream baos = new ByteArrayOutputStream(64); - try { - if (getEncrypted()) { - if (_encryptionKey == null) throw new DataFormatException("Encryption key is not set"); - _encryptionKey.writeBytes(baos); + int additionalSize = 0; + if (getEncrypted()) { + if (_encryptionKey == null) throw new DataFormatException("Encryption key is not set"); + additionalSize += SessionKey.KEYSIZE_BYTES; + } + switch (getDeliveryMode()) { + case FLAG_MODE_LOCAL: if (_log.shouldLog(Log.DEBUG)) - _log.debug("IsEncrypted"); - } else { + _log.debug("mode = local"); + break; + case FLAG_MODE_DESTINATION: + if (_destinationHash == null) throw new DataFormatException("Destination hash is not set"); + additionalSize += Hash.HASH_LENGTH; + break; + case FLAG_MODE_ROUTER: + if (_routerHash == null) throw new DataFormatException("Router hash is not set"); + additionalSize += Hash.HASH_LENGTH; + break; + case FLAG_MODE_TUNNEL: + if ( (_routerHash == null) || (_tunnelId == null) ) throw new DataFormatException("Router hash or tunnel ID is not set"); + additionalSize += Hash.HASH_LENGTH; + additionalSize += 4; // tunnelId + break; + } + + if (getDelayRequested()) { + additionalSize += 4; + } + + byte rv[] = new byte[additionalSize]; + int offset = 0; + + if (getEncrypted()) { + if (_encryptionKey == null) throw new DataFormatException("Encryption key is not set"); + System.arraycopy(_encryptionKey.getData(), 0, rv, offset, SessionKey.KEYSIZE_BYTES); + offset += SessionKey.KEYSIZE_BYTES; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("IsEncrypted"); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Is NOT Encrypted"); + } + switch (getDeliveryMode()) { + case FLAG_MODE_LOCAL: + if (_log.shouldLog(Log.DEBUG)) + _log.debug("mode = local"); + break; + case FLAG_MODE_DESTINATION: + if (_destinationHash == null) throw new DataFormatException("Destination hash is not set"); + System.arraycopy(_destinationHash.getData(), 0, rv, offset, Hash.HASH_LENGTH); + offset += Hash.HASH_LENGTH; if (_log.shouldLog(Log.DEBUG)) - _log.debug("Is NOT Encrypted"); - } - switch (getDeliveryMode()) { - case FLAG_MODE_LOCAL: - if (_log.shouldLog(Log.DEBUG)) - _log.debug("mode = local"); - break; - case FLAG_MODE_DESTINATION: - if (_destinationHash == null) throw new DataFormatException("Destination hash is not set"); - _destinationHash.writeBytes(baos); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("mode = destination, hash = " + _destinationHash); - break; - case FLAG_MODE_ROUTER: - if (_routerHash == null) throw new DataFormatException("Router hash is not set"); - _routerHash.writeBytes(baos); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("mode = router, routerHash = " + _routerHash); - break; - case FLAG_MODE_TUNNEL: - if ( (_routerHash == null) || (_tunnelId == null) ) throw new DataFormatException("Router hash or tunnel ID is not set"); - _routerHash.writeBytes(baos); - _tunnelId.writeBytes(baos); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("mode = tunnel, tunnelId = " + _tunnelId.getTunnelId() - + ", routerHash = " + _routerHash); - break; - } - if (getDelayRequested()) { + _log.debug("mode = destination, hash = " + _destinationHash); + break; + case FLAG_MODE_ROUTER: + if (_routerHash == null) throw new DataFormatException("Router hash is not set"); + System.arraycopy(_routerHash.getData(), 0, rv, offset, Hash.HASH_LENGTH); + offset += Hash.HASH_LENGTH; if (_log.shouldLog(Log.DEBUG)) - _log.debug("delay requested: " + getDelaySeconds()); - DataHelper.writeLong(baos, 4, getDelaySeconds()); - } else { + _log.debug("mode = router, routerHash = " + _routerHash); + break; + case FLAG_MODE_TUNNEL: + if ( (_routerHash == null) || (_tunnelId == null) ) throw new DataFormatException("Router hash or tunnel ID is not set"); + System.arraycopy(_routerHash.getData(), 0, rv, offset, Hash.HASH_LENGTH); + offset += Hash.HASH_LENGTH; + DataHelper.toLong(rv, offset, 4, _tunnelId.getTunnelId()); + offset += 4; if (_log.shouldLog(Log.DEBUG)) - _log.debug("delay NOT requested"); - } - } catch (IOException ioe) { - throw new DataFormatException("Unable to write out additional info", ioe); + _log.debug("mode = tunnel, tunnelId = " + _tunnelId.getTunnelId() + + ", routerHash = " + _routerHash); + break; + } + if (getDelayRequested()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("delay requested: " + getDelaySeconds()); + DataHelper.toLong(rv, offset, 4, getDelaySeconds()); + offset += 4; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("delay NOT requested"); } - return baos.toByteArray(); + return rv; } public void writeBytes(OutputStream out) throws DataFormatException, IOException { diff --git a/router/java/src/net/i2p/data/i2np/GarlicClove.java b/router/java/src/net/i2p/data/i2np/GarlicClove.java index c8f7679aa311b09112a9f1f4421f47a4dcde189f..3e10ba45f94607ccda2328d7b2d911aef7a31562 100644 --- a/router/java/src/net/i2p/data/i2np/GarlicClove.java +++ b/router/java/src/net/i2p/data/i2np/GarlicClove.java @@ -107,19 +107,29 @@ public class GarlicClove extends DataStructureImpl { public void writeBytes(OutputStream out) throws DataFormatException, IOException { - StringBuffer error = new StringBuffer(); - if (_instructions == null) + StringBuffer error = null; + if (_instructions == null) { + if (error == null) error = new StringBuffer(); error.append("No instructions "); - if (_msg == null) + } + if (_msg == null) { + if (error == null) error = new StringBuffer(); error.append("No message "); - if (_cloveId < 0) + } + if (_cloveId < 0) { + if (error == null) error = new StringBuffer(); error.append("CloveID < 0 [").append(_cloveId).append("] "); - if (_expiration == null) + } + if (_expiration == null) { + if (error == null) error = new StringBuffer(); error.append("Expiration is null "); - if (_certificate == null) + } + if (_certificate == null) { + if (error == null) error = new StringBuffer(); error.append("Certificate is null "); + } - if (error.length() > 0) + if ( (error != null) && (error.length() > 0) ) throw new DataFormatException(error.toString()); _instructions.writeBytes(out); diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 3ae5b7df08592bd7d9f0a974edc79a83756c587f..01616a7f6883d63d9e34bf41033d50fe911457f6 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.206 $ $Date: 2005/07/20 14:24:47 $"; + public final static String ID = "$Revision: 1.207 $ $Date: 2005/07/21 17:37:15 $"; public final static String VERSION = "0.5.0.7"; - public final static long BUILD = 18; + public final static long BUILD = 19; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java index cb1a17f7aa39b77bfac8cd993ea4577c1265c080..42a6fcb880672aae1e36776d37a7d0f3514f9d17 100644 --- a/router/java/src/net/i2p/router/client/ClientConnectionRunner.java +++ b/router/java/src/net/i2p/router/client/ClientConnectionRunner.java @@ -292,8 +292,8 @@ public class ClientConnectionRunner { _log.debug("Acking message send [accepted]" + id + " / " + nonce + " for sessionId " + _sessionId, new Exception("sendAccepted")); MessageStatusMessage status = new MessageStatusMessage(); - status.setMessageId(id); - status.setSessionId(_sessionId); + status.setMessageId(id.getMessageId()); + status.setSessionId(_sessionId.getSessionId()); status.setSize(0L); status.setNonce(nonce); status.setStatus(MessageStatusMessage.STATUS_SEND_ACCEPTED); @@ -491,8 +491,8 @@ public class ClientConnectionRunner { if (_dead) return; MessageStatusMessage msg = new MessageStatusMessage(); - msg.setMessageId(_messageId); - msg.setSessionId(_sessionId); + msg.setMessageId(_messageId.getMessageId()); + msg.setSessionId(_sessionId.getSessionId()); msg.setNonce(2); msg.setSize(0); if (_success) diff --git a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java index a0c8c10a92ac95d48a9b24a3ff7fbbe9da6446a2..d75e27fb481dce8b64faec858b2ae0ce87c7f5b3 100644 --- a/router/java/src/net/i2p/router/client/ClientMessageEventListener.java +++ b/router/java/src/net/i2p/router/client/ClientMessageEventListener.java @@ -179,8 +179,8 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi _log.debug("Handling recieve begin: id = " + message.getMessageId()); MessagePayloadMessage msg = new MessagePayloadMessage(); msg.setMessageId(message.getMessageId()); - msg.setSessionId(_runner.getSessionId()); - Payload payload = _runner.getPayload(message.getMessageId()); + msg.setSessionId(_runner.getSessionId().getSessionId()); + Payload payload = _runner.getPayload(new MessageId(message.getMessageId())); if (payload == null) { if (_log.shouldLog(Log.ERROR)) _log.error("Payload for message id [" + message.getMessageId() @@ -202,7 +202,7 @@ class ClientMessageEventListener implements I2CPMessageReader.I2CPMessageEventLi * */ private void handleReceiveEnd(I2CPMessageReader reader, ReceiveMessageEndMessage message) { - _runner.removePayload(message.getMessageId()); + _runner.removePayload(new MessageId(message.getMessageId())); } private void handleDestroySession(I2CPMessageReader reader, DestroySessionMessage message) { diff --git a/router/java/src/net/i2p/router/client/ClientWriterRunner.java b/router/java/src/net/i2p/router/client/ClientWriterRunner.java index dca6ac58d2f398f9a5ddb56244f9a60331f82d25..bf13648772fba67d4d305e39bdac1b2aafbd9f3f 100644 --- a/router/java/src/net/i2p/router/client/ClientWriterRunner.java +++ b/router/java/src/net/i2p/router/client/ClientWriterRunner.java @@ -61,25 +61,27 @@ class ClientWriterRunner implements Runnable { } } public void run() { + List messages = new ArrayList(64); + List messageTimes = new ArrayList(64); + List switchList = null; + while (!_runner.getIsDead()) { - List messages = null; - List messageTimes = null; - synchronized (_dataLock) { if (_messagesToWrite.size() <= 0) try { _dataLock.wait(); } catch (InterruptedException ie) {} if (_messagesToWrite.size() > 0) { - messages = new ArrayList(_messagesToWrite.size()); - messageTimes = new ArrayList(_messagesToWriteTimes.size()); - messages.addAll(_messagesToWrite); - messageTimes.addAll(_messagesToWriteTimes); - _messagesToWrite.clear(); - _messagesToWriteTimes.clear(); + switchList = _messagesToWrite; + _messagesToWrite = messages; + messages = switchList; + + switchList = _messagesToWriteTimes; + _messagesToWriteTimes = messageTimes; + messageTimes = switchList; } } - if (messages != null) { + if (messages.size() > 0) { for (int i = 0; i < messages.size(); i++) { I2CPMessage msg = (I2CPMessage)messages.get(i); Long when = (Long)messageTimes.get(i); @@ -92,7 +94,9 @@ class ClientWriterRunner implements Runnable { + (_context.clock().now()-when.longValue()) + " for " + msg.getClass().getName()); } - } + } + messages.clear(); + messageTimes.clear(); } } } diff --git a/router/java/src/net/i2p/router/client/MessageReceivedJob.java b/router/java/src/net/i2p/router/client/MessageReceivedJob.java index 54750438ac49bbe59e22481add07883309e785fb..d0c767828f296fd312d7668c66a16e762114390a 100644 --- a/router/java/src/net/i2p/router/client/MessageReceivedJob.java +++ b/router/java/src/net/i2p/router/client/MessageReceivedJob.java @@ -56,8 +56,8 @@ class MessageReceivedJob extends JobImpl { _log.debug("Sending message available: " + id + " to sessionId " + _runner.getSessionId() + " (with nonce=1)", new Exception("available")); MessageStatusMessage msg = new MessageStatusMessage(); - msg.setMessageId(id); - msg.setSessionId(_runner.getSessionId()); + msg.setMessageId(id.getMessageId()); + msg.setSessionId(_runner.getSessionId().getSessionId()); msg.setSize(size); msg.setNonce(1); msg.setStatus(MessageStatusMessage.STATUS_AVAILABLE); diff --git a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java index 7de9ec65966e51f5ce5b56b23c5420de62ef7741..505cd5de206fc996bef62c3db9911ba12189a268 100644 --- a/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java +++ b/router/java/src/net/i2p/router/networkdb/HandleDatabaseLookupMessageJob.java @@ -49,6 +49,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { _log = getContext().logManager().getLog(HandleDatabaseLookupMessageJob.class); getContext().statManager().createRateStat("netDb.lookupsHandled", "How many netDb lookups have we handled?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); getContext().statManager().createRateStat("netDb.lookupsMatched", "How many netDb lookups did we have the data for?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); + getContext().statManager().createRateStat("netDb.lookupsMatchedLeaseSet", "How many netDb leaseSet lookups did we have the data for?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); getContext().statManager().createRateStat("netDb.lookupsMatchedReceivedPublished", "How many netDb lookups did we have the data for that were published to us?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); getContext().statManager().createRateStat("netDb.lookupsMatchedLocalClosest", "How many netDb lookups for local data were received where we are the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); getContext().statManager().createRateStat("netDb.lookupsMatchedLocalNotClosest", "How many netDb lookups for local data were received where we are NOT the closest peers?", "NetworkDatabase", new long[] { 5*60*1000l, 60*60*1000l, 24*60*60*1000l }); @@ -130,6 +131,7 @@ public class HandleDatabaseLookupMessageJob extends JobImpl { if (data instanceof LeaseSet) { msg.setLeaseSet((LeaseSet)data); msg.setValueType(DatabaseStoreMessage.KEY_TYPE_LEASESET); + getContext().statManager().addRateData("netDb.lookupsMatchedLeaseSet", 1, 0); } else if (data instanceof RouterInfo) { msg.setRouterInfo((RouterInfo)data); msg.setValueType(DatabaseStoreMessage.KEY_TYPE_ROUTERINFO); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index c119bd2d003c37e4eda0b819cf7166ab07eaee56..e3d424ee2935d567a6959d99ba1f4748c6850fe2 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -100,6 +100,8 @@ public class ACKSender implements Runnable { _context.statManager().addRateData("udp.sendACKCount", ackBitfields.size(), 0); _context.statManager().addRateData("udp.sendACKRemaining", remaining, 0); now = _context.clock().now(); + if (lastSend < 0) + lastSend = now - 1; _context.statManager().addRateData("udp.ackFrequency", now-lastSend, now-wanted); //_context.statManager().getStatLog().addData(peer.getRemoteHostId().toString(), "udp.peer.sendACKCount", ackBitfields.size(), 0); UDPPacket ack = _builder.buildACK(peer, ackBitfields); diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index 92aa974d6aa911d358d30511ec39a656d94993e2..278762ee2998340f350311600ada0e0d0ee8c391 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -41,7 +41,11 @@ public class PacketHandler { _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 }); - _context.statManager().createRateStat("udp.droppedInvalid", "How old the packet we dropped due to invalidity was", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.droppedInvalidUnkown", "How old the packet we dropped due to invalidity (unkown type) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.droppedInvalidReestablish", "How old the packet we dropped due to invalidity (doesn't use existing key, not an establishment) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.droppedInvalidEstablish", "How old the packet we dropped due to invalidity (establishment, bad key) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.droppedInvalidInboundEstablish", "How old the packet we dropped due to invalidity (inbound establishment, bad key) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); + _context.statManager().createRateStat("udp.droppedInvalidSkew", "How skewed the packet we dropped due to invalidity (valid except bad skew) was", "udp", new long[] { 10*60*1000, 60*60*1000 }); } public void startup() { @@ -158,7 +162,7 @@ public class PacketHandler { } else { if (_log.shouldLog(Log.WARN)) _log.warn("Validation with existing con failed, and validation as reestablish failed too. DROP"); - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidReestablish", packet.getLifetime(), packet.getExpiration()); } return; } @@ -177,7 +181,7 @@ public class PacketHandler { if (!isValid) { if (_log.shouldLog(Log.WARN)) _log.warn("Invalid introduction packet received: " + packet, new Exception("path")); - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidEstablish", packet.getLifetime(), packet.getExpiration()); return; } else { if (_log.shouldLog(Log.INFO)) @@ -224,7 +228,7 @@ public class PacketHandler { // on earlier state packets receivePacket(reader, packet); } else { - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidInboundEstablish", packet.getLifetime(), packet.getExpiration()); } } @@ -283,12 +287,12 @@ public class PacketHandler { if (skew > GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) _log.warn("Packet too far in the future: " + new Date(sendOn/1000) + ": " + packet); - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidSkew", skew, packet.getExpiration()); return; } else if (skew < 0 - GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) _log.warn("Packet too far in the past: " + new Date(sendOn/1000) + ": " + packet); - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidSkew", 0-skew, packet.getExpiration()); return; } @@ -324,7 +328,7 @@ public class PacketHandler { default: if (_log.shouldLog(Log.WARN)) _log.warn("Unknown payload type: " + reader.readPayloadType()); - _context.statManager().addRateData("udp.droppedInvalid", packet.getLifetime(), packet.getExpiration()); + _context.statManager().addRateData("udp.droppedInvalidUnknown", packet.getLifetime(), packet.getExpiration()); return; } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPSender.java b/router/java/src/net/i2p/router/transport/udp/UDPSender.java index e73ce471578a17c9db6145be246a894026c100bc..debc1df0f7f9dbff9591410c46710ef7a5e0a657 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPSender.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPSender.java @@ -168,9 +168,11 @@ public class UDPSender { } long sendTime = _context.clock().now() - before; _context.statManager().addRateData("udp.socketSendTime", sendTime, packet.getLifetime()); - _context.statManager().addRateData("udp.sendBWThrottleTime", afterBW - acquireTime, acquireTime - packet.getBegin()); + long throttleTime = afterBW - acquireTime; + if (throttleTime > 10) + _context.statManager().addRateData("udp.sendBWThrottleTime", throttleTime, acquireTime - packet.getBegin()); if (packet.getMarkedType() == 1) - _context.statManager().addRateData("udp.sendACKTime", afterBW - acquireTime, packet.getLifetime()); + _context.statManager().addRateData("udp.sendACKTime", throttleTime, packet.getLifetime()); _context.statManager().addRateData("udp.pushTime", packet.getLifetime(), packet.getLifetime()); _context.statManager().addRateData("udp.sendPacketSize", size, packet.getLifetime()); } catch (IOException ioe) { diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index 9482445f998c65181fec81d69600e7269b0d4d82..63c8200139e2d6f4d7ad8f85de0349d8654b6ca3 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -128,6 +128,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _expireEvent = new ExpirePeerEvent(); _context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); + _context.statManager().createRateStat("udp.droppedPeerInactive", "How long ago did we receive from a dropped peer (duration == session lifetime)", "udp", new long[] { 60*60*1000, 24*60*60*1000 }); } public void startup() { @@ -315,6 +316,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority boolean addRemotePeerState(PeerState peer) { if (_log.shouldLog(Log.INFO)) _log.info("Add remote peer state: " + peer); + long oldEstablishedOn = -1; PeerState oldPeer = null; if (peer.getRemotePeer() != null) { synchronized (_peersByIdent) { @@ -323,6 +325,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority // should we transfer the oldPeer's RTT/RTO/etc? nah // or perhaps reject the new session? nah, // using the new one allow easier reconnect + oldEstablishedOn = oldPeer.getKeyEstablishedTime(); } } } @@ -339,6 +342,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if ( (oldPeer != null) && (oldPeer != peer) ) { //_peersByRemoteHost.put(remoteString, oldPeer); //return false; + oldEstablishedOn = oldPeer.getKeyEstablishedTime(); } } @@ -353,6 +357,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _expireEvent.add(peer); + if (oldEstablishedOn > 0) + _context.statManager().addRateData("udp.alreadyConnected", oldEstablishedOn, 0); return true; } @@ -367,6 +373,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority long now = _context.clock().now(); _context.statManager().addRateData("udp.droppedPeer", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); _context.shitlist().shitlistRouter(peer.getRemotePeer(), "dropped after too many retries"); + } else { + long now = _context.clock().now(); + _context.statManager().addRateData("udp.droppedPeerInactive", now - peer.getLastReceiveTime(), now - peer.getKeyEstablishedTime()); } synchronized (_peersByIdent) { _peersByIdent.remove(peer.getRemotePeer());