diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java index a35de82975f1df9b0fdf0bba032bece311ba2c96..22470e22b955e049d96ddbf3f5264ece7f149e67 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -20,6 +20,7 @@ public class ConnectionOptions extends I2PSocketOptions { private int _maxResends; private int _inactivityTimeout; private int _inactivityAction; + private int _inboundBufferSize; public static final int PROFILE_BULK = 1; public static final int PROFILE_INTERACTIVE = 2; @@ -59,6 +60,7 @@ public class ConnectionOptions extends I2PSocketOptions { setMaxResends(opts.getMaxResends()); setInactivityTimeout(opts.getInactivityTimeout()); setInactivityAction(opts.getInactivityAction()); + setInboundBufferSize(opts.getInboundBufferSize()); } else { setConnectDelay(2*1000); setProfile(PROFILE_BULK); @@ -72,6 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions { setWriteTimeout(-1); setInactivityTimeout(5*60*1000); setInactivityAction(INACTIVITY_ACTION_SEND); + setInboundBufferSize(256*1024); } } @@ -186,4 +189,11 @@ public class ConnectionOptions extends I2PSocketOptions { public int getInactivityAction() { return _inactivityAction; } public void setInactivityAction(int action) { _inactivityAction = action; } + + /** + * how much data are we willing to accept in our buffer? + * + */ + public int getInboundBufferSize() { return _inboundBufferSize; } + public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index c8038fc0adc427fec57bca71af79c3a1144a933a..552195fda883016c85e1a241f7edff66e442be39 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -7,6 +7,7 @@ import net.i2p.I2PException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; /** * Receive a packet for a particular connection - placing the data onto the @@ -27,6 +28,13 @@ public class ConnectionPacketHandler { boolean ok = verifyPacket(packet, con); if (!ok) return; con.packetReceived(); + if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet); + con.getOptions().setChoke(5*1000); + return; + } + con.getOptions().setChoke(0); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); // close *after* receiving the data, as well as after verifying the signatures / etc @@ -53,7 +61,8 @@ public class ConnectionPacketHandler { con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); if (_log.shouldLog(Log.WARN)) - _log.warn("congestion.. dup " + packet); + _log.warn("congestion.. dup " + packet); + SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay()); //con.incrementUnackedPacketsReceived(); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); } else { @@ -249,4 +258,22 @@ public class ConnectionPacketHandler { } } } + + private class AckDup implements SimpleTimer.TimedEvent { + private long _created; + private Connection _con; + public AckDup(Connection con) { + _created = _context.clock().now(); + _con = con; + } + public void timeReached() { + if (_con.getLastActivityOn() <= _created) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Last activity was a while ago, and we want to ack a dup"); + // we haven't done anything since receiving the dup, send an + // ack now + _con.ackImmediately(); + } + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java index 09836477e52d714da053032666643d584ad51d14..89d28b3546f720670cd3d2d846176be4c861c475 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java @@ -31,10 +31,15 @@ class SchedulerReceived extends SchedulerImpl { long timeTillSend = con.getNextSendTime() - _context.clock().now(); if (timeTillSend <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("received con... send a packet"); - con.sendAvailable(); - con.setNextSendTime(-1); + if (con.getNextSendTime() > 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received con... send a packet"); + con.sendAvailable(); + con.setNextSendTime(-1); + } else { + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + reschedule(con.getOptions().getSendAckDelay(), con); + } } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("received con... time till next send: " + timeTillSend);