diff --git a/router/java/src/net/i2p/router/OutNetMessage.java b/router/java/src/net/i2p/router/OutNetMessage.java index e2c20c0db02b00634ab5a63b15d5a49156280940..c3cb4c4dba82071be658bf0c3f2d7a4bec0cd8d2 100644 --- a/router/java/src/net/i2p/router/OutNetMessage.java +++ b/router/java/src/net/i2p/router/OutNetMessage.java @@ -31,14 +31,14 @@ import net.i2p.util.Log; public class OutNetMessage implements CDPQEntry { private final Log _log; private final RouterContext _context; - private RouterInfo _target; - private I2NPMessage _message; - private int _messageTypeId; + private final RouterInfo _target; + private final I2NPMessage _message; + private final int _messageTypeId; /** cached message ID, for use after we discard the message */ - private long _messageId; - private long _messageSize; - private int _priority; - private long _expiration; + private final long _messageId; + private final long _messageSize; + private final int _priority; + private final long _expiration; private Job _onSend; private Job _onFailedSend; private ReplyJob _onReply; @@ -81,11 +81,37 @@ public class OutNetMessage implements CDPQEntry { public static final int PRIORITY_NETDB_HARVEST = 100; public static final int PRIORITY_LOWEST = 100; - public OutNetMessage(RouterContext context) { + /** + * Null msg and target (used in OutboundMessageRegistry only) + * @since 0.9.9 + */ + public OutNetMessage(RouterContext context, long expiration) { + this(context, null, expiration, -1, null); + } + + /** + * Standard constructor + * @param msg generally non-null + * @param target generally non-null + * @since 0.9.9 + */ + public OutNetMessage(RouterContext context, I2NPMessage msg, long expiration, int priority, RouterInfo target) { _context = context; _log = context.logManager().getLog(OutNetMessage.class); - _priority = -1; - _expiration = -1; + _message = msg; + if (msg != null) { + _messageTypeId = msg.getType(); + _messageId = msg.getUniqueId(); + _messageSize = _message.getMessageSize(); + } else { + _messageTypeId = 0; + _messageId = 0; + _messageSize = 0; + } + _priority = priority; + _expiration = expiration; + _target = target; + //_createdBy = new Exception("Created by"); _created = context.clock().now(); if (_log.shouldLog(Log.INFO)) @@ -160,7 +186,6 @@ public class OutNetMessage implements CDPQEntry { * */ public RouterInfo getTarget() { return _target; } - public void setTarget(RouterInfo target) { _target = target; } /** * Specifies the message to be sent @@ -168,15 +193,6 @@ public class OutNetMessage implements CDPQEntry { */ public I2NPMessage getMessage() { return _message; } - public void setMessage(I2NPMessage msg) { - _message = msg; - if (msg != null) { - _messageTypeId = msg.getType(); - _messageId = msg.getUniqueId(); - _messageSize = _message.getMessageSize(); - } - } - /** * For debugging only. * @return the simple class name @@ -190,18 +206,19 @@ public class OutNetMessage implements CDPQEntry { public long getMessageId() { return _messageId; } public long getMessageSize() { - if (_messageSize <= 0) { - _messageSize = _message.getMessageSize(); - } return _messageSize; } + /** + * Copies the message data to outbuffer. + * Used only by VM Comm System. + * @return the length, or -1 if message is null + */ public int getMessageData(byte outBuffer[]) { if (_message == null) { return -1; } else { int len = _message.toByteArray(outBuffer); - _messageSize = len; return len; } } @@ -213,7 +230,7 @@ public class OutNetMessage implements CDPQEntry { * */ public int getPriority() { return _priority; } - public void setPriority(int priority) { _priority = priority; } + /** * Specify the # ms since the epoch after which if the message has not been * sent the OnFailedSend job should be fired and the message should be @@ -222,7 +239,7 @@ public class OutNetMessage implements CDPQEntry { * */ public long getExpiration() { return _expiration; } - public void setExpiration(long expiration) { _expiration = expiration; } + /** * After the message is successfully passed to the router specified, the * given job is enqueued. @@ -230,6 +247,7 @@ public class OutNetMessage implements CDPQEntry { */ public Job getOnSendJob() { return _onSend; } public void setOnSendJob(Job job) { _onSend = job; } + /** * If the router could not be reached or the expiration passed, this job * is enqueued. @@ -237,18 +255,21 @@ public class OutNetMessage implements CDPQEntry { */ public Job getOnFailedSendJob() { return _onFailedSend; } public void setOnFailedSendJob(Job job) { _onFailedSend = job; } + /** * If the MessageSelector detects a reply, this job is enqueued * */ public ReplyJob getOnReplyJob() { return _onReply; } public void setOnReplyJob(ReplyJob job) { _onReply = job; } + /** * If the Message selector is specified but it doesn't find a reply before * its expiration passes, this job is enqueued. */ public Job getOnFailedReplyJob() { return _onFailedReply; } public void setOnFailedReplyJob(Job job) { _onFailedReply = job; } + /** * Defines a MessageSelector to find a reply to this message. * @@ -256,13 +277,13 @@ public class OutNetMessage implements CDPQEntry { public MessageSelector getReplySelector() { return _replySelector; } public void setReplySelector(MessageSelector selector) { _replySelector = selector; } - public void transportFailed(String transportStyle) { + public synchronized void transportFailed(String transportStyle) { if (_failedTransports == null) _failedTransports = new HashSet(2); _failedTransports.add(transportStyle); } - /** not thread safe - dont fail transports and iterate over this at the same time */ - public Set getFailedTransports() { + + public synchronized Set getFailedTransports() { return (_failedTransports == null ? Collections.EMPTY_SET : _failedTransports); } @@ -345,38 +366,8 @@ public class OutNetMessage implements CDPQEntry { * we may keep the object around for a while to use its ID, jobs, etc. */ public void discardData() { - if ( (_message != null) && (_messageSize <= 0) ) - _messageSize = _message.getMessageSize(); - //if (_log.shouldLog(Log.DEBUG)) { - // long timeToDiscard = _context.clock().now() - _created; - // _log.debug("Discard " + _messageSize + "byte " + getMessageType() + " message after " - // + timeToDiscard); - //} - _message = null; - //_context.statManager().addRateData("outNetMessage.timeToDiscard", timeToDiscard, timeToDiscard); - //_context.messageStateMonitor().outboundMessageDiscarded(); } - /* - public void finalize() throws Throwable { - if (_message != null) { - if (_log.shouldLog(Log.WARN)) { - StringBuilder buf = new StringBuilder(1024); - buf.append("Undiscarded ").append(_messageSize).append("byte "); - buf.append(_messageType).append(" message created "); - buf.append((_context.clock().now() - _created)).append("ms ago: "); - buf.append(_messageId); // .append(" to ").append(_target.calculateHash().toBase64()); - buf.append(", timing - \n"); - renderTimestamps(buf); - _log.warn(buf.toString(), _createdBy); - } - _context.messageStateMonitor().outboundMessageDiscarded(); - } - _context.messageStateMonitor().outboundMessageFinalized(); - super.finalize(); - } - */ - @Override public String toString() { StringBuilder buf = new StringBuilder(256); @@ -445,22 +436,4 @@ public class OutNetMessage implements CDPQEntry { return _fmt.format(d); } } - -/**** - - @Override - public int hashCode() { - int rv = DataHelper.hashCode(_message); - rv ^= DataHelper.hashCode(_target); - // the others are pretty much inconsequential - return rv; - } - - @Override - public boolean equals(Object obj) { - //if(obj == null) return false; - //if(!(obj instanceof OutNetMessage)) return false; - return obj == this; // two OutNetMessages are different even if they contain the same message - } -****/ } diff --git a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java index 8880a778547efbeda2fac5be2c59468596b94306..6d0cfdbcf2aebd8c4681f3b8a39edfb75f8bf271 100644 --- a/router/java/src/net/i2p/router/message/SendMessageDirectJob.java +++ b/router/java/src/net/i2p/router/message/SendMessageDirectJob.java @@ -129,16 +129,12 @@ public class SendMessageDirectJob extends JobImpl { Hash us = getContext().routerHash(); if (us.equals(to)) { if (_selector != null) { - OutNetMessage outM = new OutNetMessage(getContext()); - outM.setExpiration(_expiration); - outM.setMessage(_message); + OutNetMessage outM = new OutNetMessage(getContext(), _message, _expiration, _priority, _router); outM.setOnFailedReplyJob(_onFail); outM.setOnFailedSendJob(_onFail); outM.setOnReplyJob(_onSuccess); outM.setOnSendJob(_onSend); - outM.setPriority(_priority); outM.setReplySelector(_selector); - outM.setTarget(_router); getContext().messageRegistry().registerPending(outM); } @@ -152,16 +148,12 @@ public class SendMessageDirectJob extends JobImpl { + " to inbound message pool as it was destined for ourselves"); //_log.debug("debug", _createdBy); } else { - OutNetMessage msg = new OutNetMessage(getContext()); - msg.setExpiration(_expiration); - msg.setMessage(_message); + OutNetMessage msg = new OutNetMessage(getContext(), _message, _expiration, _priority, _router); msg.setOnFailedReplyJob(_onFail); msg.setOnFailedSendJob(_onFail); msg.setOnReplyJob(_onSuccess); msg.setOnSendJob(_onSend); - msg.setPriority(_priority); msg.setReplySelector(_selector); - msg.setTarget(_router); getContext().outNetMessagePool().add(msg); if (_log.shouldLog(Log.DEBUG)) _log.debug("Adding " + _message.getClass().getName() diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java index e124e19d51f0f80caa5fda1673196c9e16044b84..641796e6fec11f4bb058d9aebdfacb3ab070421f 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/FloodfillNetworkDatabaseFacade.java @@ -214,12 +214,7 @@ public class FloodfillNetworkDatabaseFacade extends KademliaNetworkDatabaseFacad msg.setReplyGateway(null); msg.setReplyToken(0); msg.setReplyTunnel(null); - OutNetMessage m = new OutNetMessage(_context); - m.setMessage(msg); - m.setOnFailedReplyJob(null); - m.setPriority(FLOOD_PRIORITY); - m.setTarget(target); - m.setExpiration(_context.clock().now()+FLOOD_TIMEOUT); + OutNetMessage m = new OutNetMessage(_context, msg, _context.clock().now()+FLOOD_TIMEOUT, FLOOD_PRIORITY, target); // note send failure but don't give credit on success // might need to change this Job floodFail = new FloodFailedJob(_context, peer); diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java index 3c47769f4e7f3508bdbc0d1700d067e11d113826..a91137edaf07eb2a204dc8e6befaefbeb59a48ff 100644 --- a/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java +++ b/router/java/src/net/i2p/router/networkdb/kademlia/StoreJob.java @@ -315,15 +315,11 @@ class StoreJob extends JobImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("sending store directly to " + peer.getIdentity().getHash()); - OutNetMessage m = new OutNetMessage(getContext()); - m.setExpiration(expiration); - m.setMessage(msg); + OutNetMessage m = new OutNetMessage(getContext(), msg, expiration, STORE_PRIORITY, peer); m.setOnFailedReplyJob(onFail); m.setOnFailedSendJob(onFail); m.setOnReplyJob(onReply); - m.setPriority(STORE_PRIORITY); m.setReplySelector(selector); - m.setTarget(peer); getContext().messageRegistry().registerPending(m); getContext().commSystem().processMessage(m); } diff --git a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java index 2b79babe321510535281ea5005ede51dba0ac4bf..5cf11573d6b2e043cf2f276aa3f8c15028b4a7fd 100644 --- a/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java +++ b/router/java/src/net/i2p/router/transport/OutboundMessageRegistry.java @@ -171,8 +171,7 @@ public class OutboundMessageRegistry { * @return an ONM where getMessage() is null. Use it to call unregisterPending() later if desired. */ public OutNetMessage registerPending(MessageSelector replySelector, ReplyJob onReply, Job onTimeout, int timeoutMs) { - OutNetMessage msg = new OutNetMessage(_context); - msg.setExpiration(_context.clock().now() + timeoutMs); + OutNetMessage msg = new OutNetMessage(_context, _context.clock().now() + timeoutMs); msg.setOnFailedReplyJob(onTimeout); msg.setOnFailedSendJob(onTimeout); msg.setOnReplyJob(onReply); diff --git a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java index 6b52b65c82a2af8b65d6b7cf981b9ab29a59bb12..7364fcbcc6395df5b2c773b60915e0f90e711d26 100644 --- a/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java +++ b/router/java/src/net/i2p/router/transport/ntcp/NTCPConnection.java @@ -489,15 +489,11 @@ class NTCPConnection { } public void enqueueInfoMessage() { - OutNetMessage infoMsg = new OutNetMessage(_context); - infoMsg.setExpiration(_context.clock().now()+10*1000); - DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); - dsm.setEntry(_context.router().getRouterInfo()); - infoMsg.setMessage(dsm); - infoMsg.setPriority(PRIORITY); RouterInfo target = _context.netDb().lookupRouterInfoLocally(_remotePeer.calculateHash()); if (target != null) { - infoMsg.setTarget(target); + DatabaseStoreMessage dsm = new DatabaseStoreMessage(_context); + dsm.setEntry(_context.router().getRouterInfo()); + OutNetMessage infoMsg = new OutNetMessage(_context, dsm, _context.clock().now()+10*1000, PRIORITY, target); infoMsg.beginSend(); _context.statManager().addRateData("ntcp.infoMessageEnqueued", 1); send(infoMsg); diff --git a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java index a2c055adefe0fd43e5c7fd6b0d139af9a3a0b92c..f46b1c235722152ff8041e367aa57cfa9b200299 100644 --- a/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/InboundGatewayReceiver.java @@ -57,11 +57,7 @@ class InboundGatewayReceiver implements TunnelGateway.Receiver { msg.setData(encrypted); msg.setTunnelId(_config.getSendTunnel()); - OutNetMessage out = new OutNetMessage(_context); - out.setMessage(msg); - out.setTarget(_target); - out.setExpiration(msg.getMessageExpiration()); - out.setPriority(PRIORITY); + OutNetMessage out = new OutNetMessage(_context, msg, msg.getMessageExpiration(), PRIORITY, _target); _context.outNetMessagePool().add(out); return msg.getUniqueId(); } diff --git a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java index a54b62684925a09260d8633ef847baa53250629e..c2a61ffd45df6b25081b0e01f5db6243dd95b843 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundMessageDistributor.java @@ -66,11 +66,7 @@ class OutboundMessageDistributor { _context.inNetMessagePool().add(m, null, null); return; } else { - OutNetMessage out = new OutNetMessage(_context); - out.setExpiration(_context.clock().now() + MAX_DISTRIBUTE_TIME); - out.setTarget(target); - out.setMessage(m); - out.setPriority(_priority); + OutNetMessage out = new OutNetMessage(_context, m, _context.clock().now() + MAX_DISTRIBUTE_TIME, _priority, target); if (_log.shouldLog(Log.DEBUG)) _log.debug("queueing outbound message to " + target.getIdentity().calculateHash()); diff --git a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java index db40ff00ad6d393d2b9132625d5b575e677f5c77..76ffa5d3dc5365702f535c61852c755a7ef3c057 100644 --- a/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java +++ b/router/java/src/net/i2p/router/tunnel/OutboundReceiver.java @@ -71,11 +71,7 @@ class OutboundReceiver implements TunnelGateway.Receiver { private void send(TunnelDataMessage msg, RouterInfo ri) { if (_log.shouldLog(Log.DEBUG)) _log.debug("forwarding encrypted data out " + _config + ": " + msg.getUniqueId()); - OutNetMessage m = new OutNetMessage(_context); - m.setMessage(msg); - m.setExpiration(msg.getMessageExpiration()); - m.setTarget(ri); - m.setPriority(_priority); + OutNetMessage m = new OutNetMessage(_context, msg, msg.getMessageExpiration(), _priority, ri); _context.outNetMessagePool().add(m); _config.incrementProcessedMessages(); } diff --git a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java index 758a991ffc8043b3bd1d524372a9c01452f3b654..fea7cca737cd46b94c0842cd09b589cf699e1d3a 100644 --- a/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java +++ b/router/java/src/net/i2p/router/tunnel/TunnelParticipant.java @@ -192,12 +192,8 @@ class TunnelParticipant { _context.messageHistory().wrap("TunnelDataMessage", oldId, "TunnelDataMessage", newId); msg.setUniqueId(newId); msg.setMessageExpiration(_context.clock().now() + 10*1000); - OutNetMessage m = new OutNetMessage(_context); msg.setTunnelId(config.getSendTunnel()); - m.setMessage(msg); - m.setExpiration(msg.getMessageExpiration()); - m.setTarget(ri); - m.setPriority(PRIORITY); + OutNetMessage m = new OutNetMessage(_context, msg, msg.getMessageExpiration(), PRIORITY, ri); if (_log.shouldLog(Log.DEBUG)) _log.debug("Forward on from " + _config + ": " + msg); _context.outNetMessagePool().add(m); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java index a2f936965b46238818f77e4e75c930d5821782a1..ebbe487d4ee52a90a20df9af446ac412dc6210ba 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildHandler.java @@ -781,11 +781,7 @@ class BuildHandler implements Runnable { if (!isOutEnd) { state.msg.setUniqueId(req.readReplyMessageId()); state.msg.setMessageExpiration(_context.clock().now() + NEXT_HOP_SEND_TIMEOUT); - OutNetMessage msg = new OutNetMessage(_context); - msg.setMessage(state.msg); - msg.setExpiration(state.msg.getMessageExpiration()); - msg.setPriority(PRIORITY); - msg.setTarget(nextPeerInfo); + OutNetMessage msg = new OutNetMessage(_context, state.msg, state.msg.getMessageExpiration(), PRIORITY, nextPeerInfo); if (response == 0) msg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg)); _context.outNetMessagePool().add(msg); @@ -814,11 +810,7 @@ class BuildHandler implements Runnable { _context.tunnelDispatcher().dispatch(m); } else { // ok, the gateway is some other peer, shove 'er across - OutNetMessage outMsg = new OutNetMessage(_context); - outMsg.setExpiration(m.getMessageExpiration()); - outMsg.setMessage(m); - outMsg.setPriority(PRIORITY); - outMsg.setTarget(nextPeerInfo); + OutNetMessage outMsg = new OutNetMessage(_context, m, m.getMessageExpiration(), PRIORITY, nextPeerInfo); if (response == 0) outMsg.setOnFailedSendJob(new TunnelBuildNextHopFailJob(_context, cfg)); _context.outNetMessagePool().add(outMsg); diff --git a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java index 1c71e7bb7143ac516df858b3b729afc273c0ebe5..32ad62b2450d0165ba5fd4f5acfc452f3841efc6 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java +++ b/router/java/src/net/i2p/router/tunnel/pool/BuildRequestor.java @@ -172,15 +172,11 @@ abstract class BuildRequestor { + " for " + cfg + " waiting for the reply of " + cfg.getReplyMessageId() + " with msgId=" + msg.getUniqueId()); // send it directly to the first hop - OutNetMessage outMsg = new OutNetMessage(ctx); // Add some fuzz to the TBM expiration to make it harder to guess how many hops // or placement in the tunnel msg.setMessageExpiration(ctx.clock().now() + BUILD_MSG_TIMEOUT + ctx.random().nextLong(20*1000)); // We set the OutNetMessage expiration much shorter, so that the // TunnelBuildFirstHopFailJob fires before the 13s build expiration. - outMsg.setExpiration(ctx.clock().now() + FIRST_HOP_TIMEOUT); - outMsg.setMessage(msg); - outMsg.setPriority(PRIORITY); RouterInfo peer = ctx.netDb().lookupRouterInfoLocally(cfg.getPeer(1)); if (peer == null) { if (log.shouldLog(Log.WARN)) @@ -188,7 +184,7 @@ abstract class BuildRequestor { exec.buildComplete(cfg, pool); return; } - outMsg.setTarget(peer); + OutNetMessage outMsg = new OutNetMessage(ctx, msg, ctx.clock().now() + FIRST_HOP_TIMEOUT, PRIORITY, peer); outMsg.setOnFailedSendJob(new TunnelBuildFirstHopFailJob(ctx, pool, cfg, exec)); ctx.outNetMessagePool().add(outMsg); }