From 443abce6479608056f7a7c335fea3bf42fcee558 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sun, 26 Dec 2010 20:36:44 +0000 Subject: [PATCH] * I2CP: - Move BWLimits and DestLookup message support from I2PSimpleSession to I2PSessionImpl - Include the Hash in the DestReplyMessage on a failed lookup so the client may correlate replies - Add support for parallel lookups and BWLimits requests - Add support for specifying the timeout for DestLookups (can only be smaller than the router timeout for now) - Extend dest lookup router timeout from 10s to 15s --- .../src/org/klomp/snark/I2PSnarkUtil.java | 31 ++++- .../i2p/client/DestReplyMessageHandler.java | 11 +- .../client/I2PClientMessageHandlerMap.java | 6 + core/java/src/net/i2p/client/I2PSession.java | 14 ++- .../src/net/i2p/client/I2PSessionImpl.java | 106 +++++++++++++++++- .../src/net/i2p/client/I2PSimpleSession.java | 57 ---------- .../src/net/i2p/client/naming/LookupDest.java | 7 ++ .../net/i2p/data/i2cp/DestReplyMessage.java | 55 +++++++-- 8 files changed, 210 insertions(+), 77 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java index 001928d479..b36c0fdcbf 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java +++ b/apps/i2psnark/java/src/org/klomp/snark/I2PSnarkUtil.java @@ -14,11 +14,13 @@ import java.util.StringTokenizer; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketEepGet; import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketManagerFactory; +import net.i2p.data.Base32; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; import net.i2p.data.Hash; @@ -316,21 +318,44 @@ public class I2PSnarkUtil { } } + private static final int BASE32_HASH_LENGTH = 52; // 1 + Hash.HASH_LENGTH * 8 / 5 + /** Base64 Hash or Hash.i2p or name.i2p using naming service */ Destination getDestination(String ip) { if (ip == null) return null; if (ip.endsWith(".i2p")) { if (ip.length() < 520) { // key + ".i2p" - Destination dest = _context.namingService().lookup(ip); - if (dest != null) - return dest; + if (_manager != null && ip.length() == BASE32_HASH_LENGTH + 8 && ip.endsWith(".b32.i2p")) { + // Use existing I2PSession for b32 lookups if we have it + // This is much more efficient than using the naming service + I2PSession sess = _manager.getSession(); + if (sess != null) { + byte[] b = Base32.decode(ip.substring(0, BASE32_HASH_LENGTH)); + if (b != null) { + Hash h = new Hash(b); + if (_log.shouldLog(Log.INFO)) + _log.info("Using existing session for lookup of " + ip); + try { + return sess.lookupDest(h); + } catch (I2PSessionException ise) { + } + } + } + } + if (_log.shouldLog(Log.INFO)) + _log.info("Using naming service for lookup of " + ip); + return _context.namingService().lookup(ip); } + if (_log.shouldLog(Log.INFO)) + _log.info("Creating Destination for " + ip); try { return new Destination(ip.substring(0, ip.length()-4)); // sans .i2p } catch (DataFormatException dfe) { return null; } } else { + if (_log.shouldLog(Log.INFO)) + _log.info("Creating Destination for " + ip); try { return new Destination(ip); } catch (DataFormatException dfe) { diff --git a/core/java/src/net/i2p/client/DestReplyMessageHandler.java b/core/java/src/net/i2p/client/DestReplyMessageHandler.java index 573389cfb2..8d5527d59b 100644 --- a/core/java/src/net/i2p/client/DestReplyMessageHandler.java +++ b/core/java/src/net/i2p/client/DestReplyMessageHandler.java @@ -10,6 +10,9 @@ import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.DestReplyMessage; import net.i2p.util.Log; +import net.i2p.data.Destination; +import net.i2p.data.Hash; + /** * Handle I2CP dest replies from the router */ @@ -22,6 +25,12 @@ class DestReplyMessageHandler extends HandlerImpl { if (_log.shouldLog(Log.DEBUG)) _log.debug("Handle message " + message); DestReplyMessage msg = (DestReplyMessage) message; - ((I2PSimpleSession)session).destReceived(msg.getDestination()); + Destination d = msg.getDestination(); + if (d != null) + session.destReceived(d); + Hash h = msg.getHash(); + if (h != null) + session.destLookupFailed(h); + // else let it time out } } diff --git a/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java b/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java index 6f0d950514..76350cbb21 100644 --- a/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java +++ b/core/java/src/net/i2p/client/I2PClientMessageHandlerMap.java @@ -10,6 +10,8 @@ package net.i2p.client; */ import net.i2p.I2PAppContext; +import net.i2p.data.i2cp.BandwidthLimitsMessage; +import net.i2p.data.i2cp.DestReplyMessage; import net.i2p.data.i2cp.DisconnectMessage; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.data.i2cp.MessageStatusMessage; @@ -36,6 +38,8 @@ class I2PClientMessageHandlerMap { highest = Math.max(highest, MessagePayloadMessage.MESSAGE_TYPE); highest = Math.max(highest, MessageStatusMessage.MESSAGE_TYPE); highest = Math.max(highest, SetDateMessage.MESSAGE_TYPE); + highest = Math.max(highest, DestReplyMessage.MESSAGE_TYPE); + highest = Math.max(highest, BandwidthLimitsMessage.MESSAGE_TYPE); _handlers = new I2CPMessageHandler[highest+1]; _handlers[DisconnectMessage.MESSAGE_TYPE] = new DisconnectMessageHandler(context); @@ -44,6 +48,8 @@ class I2PClientMessageHandlerMap { _handlers[MessagePayloadMessage.MESSAGE_TYPE] = new MessagePayloadMessageHandler(context); _handlers[MessageStatusMessage.MESSAGE_TYPE] = new MessageStatusMessageHandler(context); _handlers[SetDateMessage.MESSAGE_TYPE] = new SetDateMessageHandler(context); + _handlers[DestReplyMessage.MESSAGE_TYPE] = new DestReplyMessageHandler(context); + _handlers[BandwidthLimitsMessage.MESSAGE_TYPE] = new BWLimitsMessageHandler(context); } public I2CPMessageHandler getHandler(int messageTypeId) { diff --git a/core/java/src/net/i2p/client/I2PSession.java b/core/java/src/net/i2p/client/I2PSession.java index 1998dad55a..cd20cfc2d6 100644 --- a/core/java/src/net/i2p/client/I2PSession.java +++ b/core/java/src/net/i2p/client/I2PSession.java @@ -138,13 +138,21 @@ public interface I2PSession { public SigningPrivateKey getPrivateKey(); /** - * Lookup up a Hash - * + * Lookup a Destination by Hash. + * Blocking. Waits a max of 10 seconds by default. */ public Destination lookupDest(Hash h) throws I2PSessionException; /** - * Get the current bandwidth limits + * Blocking. + * @param maxWait ms + * @since 0.8.3 + * @return null on failure + */ + public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException; + + /** + * Get the current bandwidth limits. Blocking. */ public int[] bandwidthLimits() throws I2PSessionException; diff --git a/core/java/src/net/i2p/client/I2PSessionImpl.java b/core/java/src/net/i2p/client/I2PSessionImpl.java index 8090e0eaed..e101ff7252 100644 --- a/core/java/src/net/i2p/client/I2PSessionImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionImpl.java @@ -15,7 +15,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.net.UnknownHostException; -import java.util.concurrent.ConcurrentHashMap; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; @@ -23,6 +22,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import net.i2p.I2PAppContext; import net.i2p.data.DataFormatException; @@ -33,6 +34,8 @@ import net.i2p.data.PrivateKey; import net.i2p.data.SessionKey; import net.i2p.data.SessionTag; import net.i2p.data.SigningPrivateKey; +import net.i2p.data.i2cp.DestLookupMessage; +import net.i2p.data.i2cp.GetBandwidthLimitsMessage; import net.i2p.data.i2cp.GetDateMessage; import net.i2p.data.i2cp.I2CPMessage; import net.i2p.data.i2cp.I2CPMessageException; @@ -95,6 +98,11 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa protected I2CPMessageProducer _producer; /** map of Long --> MessagePayloadMessage */ protected Map<Long, MessagePayloadMessage> _availableMessages; + + /** hashes of lookups we are waiting for */ + protected final LinkedBlockingQueue<LookupWaiter> _pendingLookups = new LinkedBlockingQueue(); + protected final Object _bwReceivedLock = new Object(); + protected int[] _bwLimits; protected I2PClientMessageHandlerMap _handlerMap; @@ -786,12 +794,104 @@ abstract class I2PSessionImpl implements I2PSession, I2CPMessageReader.I2CPMessa return buf.toString(); } + /** called by the message handler */ + void destReceived(Destination d) { + Hash h = d.calculateHash(); + for (LookupWaiter w : _pendingLookups) { + if (w.hash.equals(h)) { + w.destination = d; + synchronized (w) { + w.notifyAll(); + } + } + } + } + + /** called by the message handler */ + void destLookupFailed(Hash h) { + for (LookupWaiter w : _pendingLookups) { + if (w.hash.equals(h)) { + synchronized (w) { + w.notifyAll(); + } + } + } + } + + /** called by the message handler */ + void bwReceived(int[] i) { + _bwLimits = i; + synchronized (_bwReceivedLock) { + _bwReceivedLock.notifyAll(); + } + } + + /** + * Simple object to wait for lookup replies + * @since 0.8.3 + */ + private static class LookupWaiter { + /** the request */ + public final Hash hash; + /** the reply */ + public Destination destination; + + public LookupWaiter(Hash h) { + this.hash = h; + } + } + + /** + * Blocking. Waits a max of 10 seconds by default. + * See lookupDest with maxWait parameter to change. + * Implemented in 0.8.3 in I2PSessionImpl; + * previously was available only in I2PSimpleSession. + * Multiple outstanding lookups are now allowed. + * @return null on failure + */ public Destination lookupDest(Hash h) throws I2PSessionException { - return null; + return lookupDest(h, 10*1000); + } + + /** + * Blocking. + * @param maxWait ms + * @since 0.8.3 + * @return null on failure + */ + public Destination lookupDest(Hash h, long maxWait) throws I2PSessionException { + if (_closed) + return null; + LookupWaiter waiter = new LookupWaiter(h); + _pendingLookups.offer(waiter); + sendMessage(new DestLookupMessage(h)); + try { + synchronized (waiter) { + waiter.wait(maxWait); + } + } catch (InterruptedException ie) {} + _pendingLookups.remove(waiter); + return waiter.destination; } + /** + * Blocking. Waits a max of 5 seconds. + * But shouldn't take long. + * Implemented in 0.8.3 in I2PSessionImpl; + * previously was available only in I2PSimpleSession. + * Multiple outstanding lookups are now allowed. + * @return null on failure + */ public int[] bandwidthLimits() throws I2PSessionException { - return null; + if (_closed) + return null; + sendMessage(new GetBandwidthLimitsMessage()); + try { + synchronized (_bwReceivedLock) { + _bwReceivedLock.wait(5*1000); + } + } catch (InterruptedException ie) {} + return _bwLimits; } protected void updateActivity() { diff --git a/core/java/src/net/i2p/client/I2PSimpleSession.java b/core/java/src/net/i2p/client/I2PSimpleSession.java index ed9ec5cc36..e984b6d307 100644 --- a/core/java/src/net/i2p/client/I2PSimpleSession.java +++ b/core/java/src/net/i2p/client/I2PSimpleSession.java @@ -33,12 +33,6 @@ import net.i2p.util.I2PAppThread; * @author zzz */ class I2PSimpleSession extends I2PSessionImpl2 { - private boolean _destReceived; - private /* FIXME final FIXME */ Object _destReceivedLock; - private Destination _destination; - private boolean _bwReceived; - private /* FIXME final FIXME */ Object _bwReceivedLock; - private int[] _bwLimits; /** * Create a new session for doing naming and bandwidth queries only. Do not create a destination. @@ -104,57 +98,6 @@ class I2PSimpleSession extends I2PSessionImpl2 { } } - /** called by the message handler */ - void destReceived(Destination d) { - _destReceived = true; - _destination = d; - synchronized (_destReceivedLock) { - _destReceivedLock.notifyAll(); - } - } - - void bwReceived(int[] i) { - _bwReceived = true; - _bwLimits = i; - synchronized (_bwReceivedLock) { - _bwReceivedLock.notifyAll(); - } - } - - @Override - public Destination lookupDest(Hash h) throws I2PSessionException { - if (_closed) - return null; - _destReceivedLock = new Object(); - sendMessage(new DestLookupMessage(h)); - for (int i = 0; i < 10 && !_destReceived; i++) { - try { - synchronized (_destReceivedLock) { - _destReceivedLock.wait(1000); - } - } catch (InterruptedException ie) {} - } - _destReceived = false; - return _destination; - } - - @Override - public int[] bandwidthLimits() throws I2PSessionException { - if (_closed) - return null; - _bwReceivedLock = new Object(); - sendMessage(new GetBandwidthLimitsMessage()); - for (int i = 0; i < 5 && !_bwReceived; i++) { - try { - synchronized (_bwReceivedLock) { - _bwReceivedLock.wait(1000); - } - } catch (InterruptedException ie) {} - } - _bwReceived = false; - return _bwLimits; - } - /** * Only map message handlers that we will use */ diff --git a/core/java/src/net/i2p/client/naming/LookupDest.java b/core/java/src/net/i2p/client/naming/LookupDest.java index c90b4a6738..d131efade8 100644 --- a/core/java/src/net/i2p/client/naming/LookupDest.java +++ b/core/java/src/net/i2p/client/naming/LookupDest.java @@ -22,6 +22,13 @@ import net.i2p.data.Hash; * * All calls are blocking and return null on failure. * Timeout is set to 10 seconds in I2PSimpleSession. + * + * As of 0.8.3, standard I2PSessions support lookups, + * including multiple lookups in parallel, and overriding + * the default timeout. + * Using an existing I2PSession is much more efficient and + * flexible than using this class. + * */ class LookupDest { diff --git a/core/java/src/net/i2p/data/i2cp/DestReplyMessage.java b/core/java/src/net/i2p/data/i2cp/DestReplyMessage.java index d3b2df9e10..7aaba9c892 100644 --- a/core/java/src/net/i2p/data/i2cp/DestReplyMessage.java +++ b/core/java/src/net/i2p/data/i2cp/DestReplyMessage.java @@ -13,14 +13,18 @@ import java.io.InputStream; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; +import net.i2p.data.Hash; /** - * Response to DestLookupMessage - * + * Response to DestLookupMessage. + * As of 0.8.3, the response may include the hash from the request, indicating + * a failure for a specific request. + * Payload may be empty (failure), a Hash (failure), or a Destination. */ public class DestReplyMessage extends I2CPMessageImpl { public final static int MESSAGE_TYPE = 35; private Destination _dest; + private Hash _hash; public DestReplyMessage() { super(); @@ -30,23 +34,52 @@ public class DestReplyMessage extends I2CPMessageImpl { _dest = d; } + /** + * @param h non-null with non-null data + * @since 0.8.3 + */ + public DestReplyMessage(Hash h) { + _hash = h; + } + public Destination getDestination() { return _dest; } + /** + * @since 0.8.3 + */ + public Hash getHash() { + return _hash; + } + protected void doReadMessage(InputStream in, int size) throws I2CPMessageException, IOException { - try { - Destination d = new Destination(); - d.readBytes(in); - _dest = d; - } catch (DataFormatException dfe) { - _dest = null; // null dest allowed + if (size == 0) { + _dest = null; + _hash = null; + } else { + try { + if (size == Hash.HASH_LENGTH) { + Hash h = new Hash(); + h.readBytes(in); + _hash = h; + } else { + Destination d = new Destination(); + d.readBytes(in); + _dest = d; + } + } catch (DataFormatException dfe) { + _dest = null; + _hash = null; + } } } protected byte[] doWriteMessage() throws I2CPMessageException, IOException { - if (_dest == null) + if (_dest == null && _hash == null) return new byte[0]; // null response allowed + if (_dest == null && _hash != null) + return _hash.getData(); ByteArrayOutputStream os = new ByteArrayOutputStream(_dest.size()); try { _dest.writeBytes(os); @@ -65,7 +98,8 @@ public class DestReplyMessage extends I2CPMessageImpl { public boolean equals(Object object) { if ((object != null) && (object instanceof DestReplyMessage)) { DestReplyMessage msg = (DestReplyMessage) object; - return DataHelper.eq(getDestination(), msg.getDestination()); + return DataHelper.eq(getDestination(), msg.getDestination()) && + DataHelper.eq(getHash(), msg.getHash()); } return false; } @@ -75,6 +109,7 @@ public class DestReplyMessage extends I2CPMessageImpl { StringBuilder buf = new StringBuilder(); buf.append("[DestReplyMessage: "); buf.append("\n\tDestination: ").append(_dest); + buf.append("\n\tHash: ").append(_hash); buf.append("]"); return buf.toString(); } -- GitLab