I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 83165df7 authored by jrandom's avatar jrandom Committed by zzz
Browse files

* delay the ack of a syn

* make sure we ack duplicate messages received (if we aren't already doing so)
* implement a choke on the local buffer, in case we receive data faster than its
  removed from the i2psocket's MessageInputStream (handle via packet drop and
  explicit congestion notification)
parent 30074be5
No related branches found
No related tags found
No related merge requests found
...@@ -20,6 +20,7 @@ public class ConnectionOptions extends I2PSocketOptions { ...@@ -20,6 +20,7 @@ public class ConnectionOptions extends I2PSocketOptions {
private int _maxResends; private int _maxResends;
private int _inactivityTimeout; private int _inactivityTimeout;
private int _inactivityAction; private int _inactivityAction;
private int _inboundBufferSize;
public static final int PROFILE_BULK = 1; public static final int PROFILE_BULK = 1;
public static final int PROFILE_INTERACTIVE = 2; public static final int PROFILE_INTERACTIVE = 2;
...@@ -59,6 +60,7 @@ public class ConnectionOptions extends I2PSocketOptions { ...@@ -59,6 +60,7 @@ public class ConnectionOptions extends I2PSocketOptions {
setMaxResends(opts.getMaxResends()); setMaxResends(opts.getMaxResends());
setInactivityTimeout(opts.getInactivityTimeout()); setInactivityTimeout(opts.getInactivityTimeout());
setInactivityAction(opts.getInactivityAction()); setInactivityAction(opts.getInactivityAction());
setInboundBufferSize(opts.getInboundBufferSize());
} else { } else {
setConnectDelay(2*1000); setConnectDelay(2*1000);
setProfile(PROFILE_BULK); setProfile(PROFILE_BULK);
...@@ -72,6 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions { ...@@ -72,6 +74,7 @@ public class ConnectionOptions extends I2PSocketOptions {
setWriteTimeout(-1); setWriteTimeout(-1);
setInactivityTimeout(5*60*1000); setInactivityTimeout(5*60*1000);
setInactivityAction(INACTIVITY_ACTION_SEND); setInactivityAction(INACTIVITY_ACTION_SEND);
setInboundBufferSize(256*1024);
} }
} }
...@@ -186,4 +189,11 @@ public class ConnectionOptions extends I2PSocketOptions { ...@@ -186,4 +189,11 @@ public class ConnectionOptions extends I2PSocketOptions {
public int getInactivityAction() { return _inactivityAction; } public int getInactivityAction() { return _inactivityAction; }
public void setInactivityAction(int action) { _inactivityAction = action; } public void setInactivityAction(int action) { _inactivityAction = action; }
/**
* how much data are we willing to accept in our buffer?
*
*/
public int getInboundBufferSize() { return _inboundBufferSize; }
public void setInboundBufferSize(int bytes) { _inboundBufferSize = bytes; }
} }
...@@ -7,6 +7,7 @@ import net.i2p.I2PException; ...@@ -7,6 +7,7 @@ import net.i2p.I2PException;
import net.i2p.data.DataHelper; import net.i2p.data.DataHelper;
import net.i2p.data.Destination; import net.i2p.data.Destination;
import net.i2p.util.Log; import net.i2p.util.Log;
import net.i2p.util.SimpleTimer;
/** /**
* Receive a packet for a particular connection - placing the data onto the * Receive a packet for a particular connection - placing the data onto the
...@@ -27,6 +28,13 @@ public class ConnectionPacketHandler { ...@@ -27,6 +28,13 @@ public class ConnectionPacketHandler {
boolean ok = verifyPacket(packet, con); boolean ok = verifyPacket(packet, con);
if (!ok) return; if (!ok) return;
con.packetReceived(); con.packetReceived();
if (con.getInputStream().getTotalQueuedSize() > con.getOptions().getInboundBufferSize()) {
if (_log.shouldLog(Log.WARN))
_log.warn("Inbound buffer exceeded on connection " + con + ": dropping " + packet);
con.getOptions().setChoke(5*1000);
return;
}
con.getOptions().setChoke(0);
boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload());
// close *after* receiving the data, as well as after verifying the signatures / etc // close *after* receiving the data, as well as after verifying the signatures / etc
...@@ -53,7 +61,8 @@ public class ConnectionPacketHandler { ...@@ -53,7 +61,8 @@ public class ConnectionPacketHandler {
con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2);
//con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2);
if (_log.shouldLog(Log.WARN)) if (_log.shouldLog(Log.WARN))
_log.warn("congestion.. dup " + packet); _log.warn("congestion.. dup " + packet);
SimpleTimer.getInstance().addEvent(new AckDup(con), con.getOptions().getSendAckDelay());
//con.incrementUnackedPacketsReceived(); //con.incrementUnackedPacketsReceived();
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
} else { } else {
...@@ -249,4 +258,22 @@ public class ConnectionPacketHandler { ...@@ -249,4 +258,22 @@ public class ConnectionPacketHandler {
} }
} }
} }
private class AckDup implements SimpleTimer.TimedEvent {
private long _created;
private Connection _con;
public AckDup(Connection con) {
_created = _context.clock().now();
_con = con;
}
public void timeReached() {
if (_con.getLastActivityOn() <= _created) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Last activity was a while ago, and we want to ack a dup");
// we haven't done anything since receiving the dup, send an
// ack now
_con.ackImmediately();
}
}
}
} }
...@@ -31,10 +31,15 @@ class SchedulerReceived extends SchedulerImpl { ...@@ -31,10 +31,15 @@ class SchedulerReceived extends SchedulerImpl {
long timeTillSend = con.getNextSendTime() - _context.clock().now(); long timeTillSend = con.getNextSendTime() - _context.clock().now();
if (timeTillSend <= 0) { if (timeTillSend <= 0) {
if (_log.shouldLog(Log.DEBUG)) if (con.getNextSendTime() > 0) {
_log.debug("received con... send a packet"); if (_log.shouldLog(Log.DEBUG))
con.sendAvailable(); _log.debug("received con... send a packet");
con.setNextSendTime(-1); con.sendAvailable();
con.setNextSendTime(-1);
} else {
con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay());
reschedule(con.getOptions().getSendAckDelay(), con);
}
} else { } else {
if (_log.shouldLog(Log.DEBUG)) if (_log.shouldLog(Log.DEBUG))
_log.debug("received con... time till next send: " + timeTillSend); _log.debug("received con... time till next send: " + timeTillSend);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment