diff --git a/router/java/src/net/i2p/router/StatisticsManager.java b/router/java/src/net/i2p/router/StatisticsManager.java
index 45f7523c8f1812bba66f2777e8fb110453c8b305..6142c658d882ee2b4b9845081ce55f1e36d69233 100644
--- a/router/java/src/net/i2p/router/StatisticsManager.java
+++ b/router/java/src/net/i2p/router/StatisticsManager.java
@@ -109,6 +109,8 @@ public class StatisticsManager implements Service {
             includeRate("netDb.lookupsMatched", stats, new long[] { 5*60*1000, 60*60*1000 });
             includeRate("netDb.storeSent", stats, new long[] { 5*60*1000, 60*60*1000 });
             includeRate("netDb.successPeers", stats, new long[] { 60*60*1000 });
+            includeRate("netDb.failedPeers", stats, new long[] { 60*60*1000 });
+            includeRate("netDb.searchCount", stats, new long[] { 3*60*60*1000});
             includeRate("transport.receiveMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
             includeRate("transport.sendMessageSize", stats, new long[] { 5*60*1000, 60*60*1000 });
             includeRate("client.sendAckTime", stats, new long[] { 60*60*1000, 24*60*60*1000l }, true);
diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java
index 7cf1bdd8f695984f6b730dd18e5ff7dc899bb9ec..429f62a3dd737b99fb96c5860a0df7f3df47fa2d 100644
--- a/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java
+++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJob.java
@@ -60,6 +60,7 @@ public class OutboundClientMessageJob extends JobImpl {
     private NextStepJob _nextStep;
     private LookupLeaseSetFailedJob _lookupLeaseSetFailed;
     private long _overallExpiration;
+    private boolean _shouldBundle;
     
     /**
      * final timeout (in milliseconds) that the outbound message will fail in.
@@ -77,6 +78,29 @@ public class OutboundClientMessageJob extends JobImpl {
     /** dont search for the lease more than 6 times */
     private final static int MAX_LEASE_LOOKUPS = 6;
     
+    /**
+     * If the client's config specifies shouldBundleReplyInfo=true, messages sent from
+     * that client to any peers will probabalistically include the sending destination's
+     * current LeaseSet (allowing the recipient to reply without having to do a full
+     * netDb lookup).  This should improve performance during the initial negotiations,
+     * but is not necessary for communication that isn't bidirectional.
+     *
+     */
+    public static final String BUNDLE_REPLY_LEASESET = "shouldBundleReplyInfo";
+    /**
+     * Allow the override of the frequency of bundling the reply info in with a message.
+     * The client app can specify bundleReplyInfoProbability=80 (for instance) and that
+     * will cause the router to include the sender's leaseSet with 80% of the messages
+     * sent to the peer.
+     *
+     */
+    public static final String BUNDLE_PROBABILITY = "bundleReplyInfoProbability";
+    /** 
+     * How often do messages include the reply leaseSet (out of every 100 tries).  
+     * Including it each time is probably overkill, but who knows.  
+     */
+    private static final int BUNDLE_PROBABILITY_DEFAULT = 30;
+    
     /**
      * Send the sucker
      */
@@ -105,6 +129,8 @@ public class OutboundClientMessageJob extends JobImpl {
             }
         }
         
+        _shouldBundle = getShouldBundle();
+        
         _overallExpiration = timeoutMs + _context.clock().now();
         _status = new OutboundClientMessageStatus(msg);
         _nextStep = new NextStepJob();
@@ -253,6 +279,25 @@ public class OutboundClientMessageJob extends JobImpl {
         }
     }
     
+    private boolean getShouldBundle() {
+        String wantBundle = _status.getMessage().getSenderConfig().getOptions().getProperty(BUNDLE_REPLY_LEASESET, "true");
+        if ("true".equals(wantBundle)) {
+            int probability = BUNDLE_PROBABILITY_DEFAULT;
+            String str = _status.getMessage().getSenderConfig().getOptions().getProperty(BUNDLE_PROBABILITY);
+            try { 
+                if (str != null) 
+                    probability = Integer.parseInt(str);
+            } catch (NumberFormatException nfe) {
+                if (_log.shouldLog(Log.WARN))
+                    _log.warn("Bundle leaseSet probability overridden incorrectly [" + str + "]", nfe);
+            }
+            if (probability >= _context.random().nextInt(100))
+                return true;
+            else
+                return false;
+        }
+    }
+    
     /**
      * Send the message to the specified tunnel by creating a new garlic message containing
      * the (already created) payload clove as well as a new delivery status message.  This garlic
@@ -267,11 +312,16 @@ public class OutboundClientMessageJob extends JobImpl {
         PublicKey key = _status.getLeaseSet().getEncryptionKey();
         SessionKey sessKey = new SessionKey();
         Set tags = new HashSet();
+        LeaseSet replyLeaseSet = null;
+        if (_shouldBundle) {
+            replyLeaseSet = _context.netDb().lookupLeaseSetLocally(_status.getFrom().calculateHash());
+        }
+        
         GarlicMessage msg = OutboundClientMessageJobHelper.createGarlicMessage(_context, token, 
                                                                                _overallExpiration, key, 
                                                                                _status.getClove(), 
                                                                                _status.getTo(), sessKey, 
-                                                                               tags, true);
+                                                                               tags, true, replyLeaseSet);
         
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("send(lease) - token expected " + token);
diff --git a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
index 28f553ccac7761268d7378c000b7f9a48ef4b847..51043e6cc721890479d724e9407675b18f139e21 100644
--- a/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
+++ b/router/java/src/net/i2p/router/message/OutboundClientMessageJobHelper.java
@@ -19,9 +19,11 @@ import net.i2p.data.Payload;
 import net.i2p.data.PublicKey;
 import net.i2p.data.SessionKey;
 import net.i2p.data.TunnelId;
+import net.i2p.data.LeaseSet;
 import net.i2p.data.i2np.DataMessage;
 import net.i2p.data.i2np.DeliveryInstructions;
 import net.i2p.data.i2np.DeliveryStatusMessage;
+import net.i2p.data.i2np.DatabaseStoreMessage;
 import net.i2p.data.i2np.GarlicMessage;
 import net.i2p.router.Router;
 import net.i2p.router.TunnelInfo;
@@ -50,23 +52,32 @@ class OutboundClientMessageJobHelper {
      *
      * For now, its just a tunneled DeliveryStatusMessage
      *
+     * @param bundledReplyLeaseSet if specified, the given LeaseSet will be packaged with the message (allowing
+     *                             much faster replies, since their netDb search will return almost instantly)
      */
-    static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, Payload data, Destination dest, SessionKey wrappedKey, Set wrappedTags, boolean requireAck) {
+    static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, 
+                                             Payload data, Destination dest, SessionKey wrappedKey, Set wrappedTags, 
+                                             boolean requireAck, LeaseSet bundledReplyLeaseSet) {
         PayloadGarlicConfig dataClove = buildDataClove(ctx, data, dest, expiration);
-        return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, dest, wrappedKey, wrappedTags, requireAck);
+        return createGarlicMessage(ctx, replyToken, expiration, recipientPK, dataClove, dest, wrappedKey, 
+                                   wrappedTags, requireAck, bundledReplyLeaseSet);
     }
     /**
      * Allow the app to specify the data clove directly, which enables OutboundClientMessage to resend the
      * same payload (including expiration and unique id) in different garlics (down different tunnels)
      *
      */
-    static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, PayloadGarlicConfig dataClove, Destination dest, SessionKey wrappedKey, Set wrappedTags, boolean requireAck) {
-        GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, dest, requireAck);
+    static GarlicMessage createGarlicMessage(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, 
+                                             PayloadGarlicConfig dataClove, Destination dest, SessionKey wrappedKey, 
+                                             Set wrappedTags, boolean requireAck, LeaseSet bundledReplyLeaseSet) {
+        GarlicConfig config = createGarlicConfig(ctx, replyToken, expiration, recipientPK, dataClove, dest, requireAck, bundledReplyLeaseSet);
         GarlicMessage msg = GarlicMessageBuilder.buildMessage(ctx, config, wrappedKey, wrappedTags);
         return msg;
     }
     
-    private static GarlicConfig createGarlicConfig(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, PayloadGarlicConfig dataClove, Destination dest, boolean requireAck) {
+    private static GarlicConfig createGarlicConfig(RouterContext ctx, long replyToken, long expiration, PublicKey recipientPK, 
+                                                   PayloadGarlicConfig dataClove, Destination dest, boolean requireAck,
+                                                   LeaseSet bundledReplyLeaseSet) {
         Log log = ctx.logManager().getLog(OutboundClientMessageJobHelper.class);
         log.debug("Reply token: " + replyToken);
         GarlicConfig config = new GarlicConfig();
@@ -78,6 +89,11 @@ class OutboundClientMessageJobHelper {
             config.addClove(ackClove);
         }
         
+        if (bundledReplyLeaseSet != null) {
+            PayloadGarlicConfig leaseSetClove = buildLeaseSetClove(ctx, expiration, bundledReplyLeaseSet);
+            config.addClove(leaseSetClove);
+        }
+        
         DeliveryInstructions instructions = new DeliveryInstructions();
         instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
         instructions.setDelayRequested(false);
@@ -177,4 +193,32 @@ class OutboundClientMessageJobHelper {
         
         return clove;
     }
+    
+    
+    /**
+     * Build a clove that stores the leaseSet locally 
+     */
+    static PayloadGarlicConfig buildLeaseSetClove(RouterContext ctx, long expiration, LeaseSet replyLeaseSet) {
+        PayloadGarlicConfig clove = new PayloadGarlicConfig();
+        
+        DeliveryInstructions instructions = new DeliveryInstructions();
+        instructions.setDeliveryMode(DeliveryInstructions.DELIVERY_MODE_LOCAL);
+        instructions.setDelayRequested(false);
+        instructions.setDelaySeconds(0);
+        instructions.setEncrypted(false);
+        
+        clove.setCertificate(new Certificate(Certificate.CERTIFICATE_TYPE_NULL, null));
+        clove.setDeliveryInstructions(instructions);
+        clove.setExpiration(expiration);
+        clove.setId(ctx.random().nextInt(Integer.MAX_VALUE));
+        DatabaseStoreMessage msg = new DatabaseStoreMessage(ctx);
+        msg.setLeaseSet(replyLeaseSet);
+        msg.setMessageExpiration(new Date(expiration));
+        msg.setKey(replyLeaseSet.getDestination().calculateHash());
+        clove.setPayload(msg);
+        clove.setRecipientPublicKey(null);
+        clove.setRequestAck(false);
+        
+        return clove;
+    }
 }
diff --git a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
index c38426cbf534a54da4130469cbbc2d3317e162ce..d5e2cc302402077e8f67c58974894495a325f8ea 100644
--- a/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
+++ b/router/java/src/net/i2p/router/networkdb/kademlia/SearchJob.java
@@ -80,7 +80,8 @@ class SearchJob extends JobImpl {
         _context.statManager().createRateStat("netDb.successTime", "How long a successful search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
         _context.statManager().createRateStat("netDb.failedTime", "How long a failed search takes", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
         _context.statManager().createRateStat("netDb.successPeers", "How many peers are contacted in a successful search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
-        _context.statManager().createRateStat("netDb.failedPeers", "How many peers are contacted in a failed search", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
+        _context.statManager().createRateStat("netDb.failedPeers", "How many peers fail to respond to a lookup?", "Network Database", new long[] { 60*60*1000l, 24*60*60*1000l });
+        _context.statManager().createRateStat("netDb.searchCount", "Overall number of searches sent", "Network Database", new long[] { 60*60*1000l, 3*60*60*1000l, 24*60*60*1000l });
         if (_log.shouldLog(Log.DEBUG))
             _log.debug("Search (" + getClass().getName() + " for " + key.toBase64(), new Exception("Search enqueued by"));
     }
@@ -88,6 +89,7 @@ class SearchJob extends JobImpl {
     public void runJob() {
         if (_log.shouldLog(Log.INFO))
             _log.info(getJobId() + ": Searching for " + _state.getTarget()); // , getAddedBy());
+        _context.statManager().addRateData("netDb.searchCount", 1, 0);
         searchNext();
     }
 
@@ -474,6 +476,7 @@ class SearchJob extends JobImpl {
                 if (_log.shouldLog(Log.ERROR))
                     _log.error("NOT (!!) Penalizing peer for timeout on search: " + _peer.toBase64());
             }
+            _context.statManager().addRateData("netDb.failedPeers", 1, 0);
             searchNext();
         }
         public String getName() { return "Kademlia Search Failed"; }
@@ -509,7 +512,6 @@ class SearchJob extends JobImpl {
         if (_keepStats) {
             long time = _context.clock().now() - _state.getWhenStarted();
             _context.statManager().addRateData("netDb.failedTime", time, 0);
-            _context.statManager().addRateData("netDb.failedPeers", _state.getAttempted().size(), time);
         }
         if (_onFailure != null)
             _context.jobQueue().addJob(_onFailure);