From 9e5fe7d2b6b1021feed75ebf5156f3ae8688026f Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Sat, 16 Apr 2005 15:18:09 +0000 Subject: [PATCH] * fixed some stupid threading issues in the packet handler (duh) * use the new raw i2np message format (the previous corruptions were due to above) * add a new test component (UDPFlooder) which floods all peers at the rate desired * packet munging fix for highly fragmented messages * include basic slow start code * fixed the UDP peer rate refilling * cleaned up some nextSend scheduling --- .../net/i2p/data/i2np/I2NPMessageImpl.java | 4 +- .../i2p/router/transport/udp/ACKSender.java | 2 +- .../transport/udp/EstablishmentManager.java | 6 +- .../udp/InboundMessageFragments.java | 22 ++-- .../transport/udp/InboundMessageState.java | 6 +- .../router/transport/udp/MessageReceiver.java | 4 +- .../udp/OutboundMessageFragments.java | 37 +++--- .../transport/udp/OutboundMessageState.java | 34 ++++-- .../router/transport/udp/PacketBuilder.java | 19 ++- .../router/transport/udp/PacketHandler.java | 111 +++++++++--------- .../i2p/router/transport/udp/PeerState.java | 38 +++++- .../i2p/router/transport/udp/UDPFlooder.java | 97 +++++++++++++++ .../router/transport/udp/UDPPacketReader.java | 31 ++++- .../router/transport/udp/UDPTransport.java | 27 ++++- .../net/i2p/router/tunnel/pool/TestJob.java | 3 +- 15 files changed, 323 insertions(+), 118 deletions(-) create mode 100644 router/java/src/net/i2p/router/transport/udp/UDPFlooder.java diff --git a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java index 2e6f09149c..8dd718e2d3 100644 --- a/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java +++ b/router/java/src/net/i2p/data/i2np/I2NPMessageImpl.java @@ -35,7 +35,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM public final static long DEFAULT_EXPIRATION_MS = 1*60*1000; // 1 minute by default public final static int CHECKSUM_LENGTH = 1; //Hash.HASH_LENGTH; - private static final boolean RAW_FULL_SIZE = true; + private static final boolean RAW_FULL_SIZE = false; public I2NPMessageImpl(I2PAppContext context) { _context = context; @@ -123,7 +123,7 @@ public abstract class I2NPMessageImpl extends DataStructureImpl implements I2NPM + "data.len=" + data.length + " offset=" + offset + " cur=" + cur - + " wanted=" + size + "]"); + + " wanted=" + size + "]: " + getClass().getName()); SHA256EntryCache.CacheEntry cache = _context.sha().cache().acquire(size); Hash calc = _context.sha().calculateHash(data, cur, size, cache); diff --git a/router/java/src/net/i2p/router/transport/udp/ACKSender.java b/router/java/src/net/i2p/router/transport/udp/ACKSender.java index 8ec022e7c2..407bd07bba 100644 --- a/router/java/src/net/i2p/router/transport/udp/ACKSender.java +++ b/router/java/src/net/i2p/router/transport/udp/ACKSender.java @@ -23,7 +23,7 @@ public class ACKSender implements Runnable { _log = ctx.logManager().getLog(ACKSender.class); _fragments = fragments; _transport = transport; - _builder = new PacketBuilder(_context, _transport); + _builder = new PacketBuilder(_context); } public void run() { diff --git a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java index 04654f6b2f..e4d7e7d0fe 100644 --- a/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java +++ b/router/java/src/net/i2p/router/transport/udp/EstablishmentManager.java @@ -38,7 +38,7 @@ public class EstablishmentManager { _context = ctx; _log = ctx.logManager().getLog(EstablishmentManager.class); _transport = transport; - _builder = new PacketBuilder(ctx, _transport); + _builder = new PacketBuilder(ctx); _inboundStates = new HashMap(32); _outboundStates = new HashMap(32); _activityLock = new Object(); @@ -281,7 +281,7 @@ public class EstablishmentManager { _log.debug("Send created to: " + state.getRemoteHostInfo()); state.generateSessionKey(); - _transport.send(_builder.buildSessionCreatedPacket(state)); + _transport.send(_builder.buildSessionCreatedPacket(state, _transport.getExternalPort(), _transport.getIntroKey())); // if they haven't advanced to sending us confirmed packets in 5s, // repeat state.setNextSendTime(now + 5*1000); @@ -308,7 +308,7 @@ public class EstablishmentManager { // signs if we havent signed yet state.prepareSessionConfirmed(); - UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state); + UDPPacket packets[] = _builder.buildSessionConfirmedPackets(state, _context.router().getRouterInfo().getIdentity()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Send confirm to: " + state.getRemoteHostInfo()); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java index 1738e3da90..76d0cd39bd 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageFragments.java @@ -38,7 +38,7 @@ public class InboundMessageFragments { private static final int RECENTLY_COMPLETED_SIZE = 100; /** how frequently do we want to send ACKs to a peer? */ - private static final int ACK_FREQUENCY = 100; + private static final int ACK_FREQUENCY = 200; public InboundMessageFragments(RouterContext ctx, OutboundMessageFragments outbound, UDPTransport transport) { _context = ctx; @@ -132,7 +132,7 @@ public class InboundMessageFragments { messageComplete = true; messages.remove(messageId); - while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE) + while (_recentlyCompletedMessages.size() >= RECENTLY_COMPLETED_SIZE) _recentlyCompletedMessages.remove(0); _recentlyCompletedMessages.add(messageId); @@ -147,8 +147,6 @@ public class InboundMessageFragments { _context.statManager().addRateData("udp.receivedCompleteTime", state.getLifetime(), state.getLifetime()); _context.statManager().addRateData("udp.receivedCompleteFragments", state.getFragmentCount(), state.getLifetime()); - - _stateLock.notifyAll(); } else if (state.isExpired()) { messageExpired = true; messages.remove(messageId); @@ -160,6 +158,8 @@ public class InboundMessageFragments { if (!fragmentOK) break; } + + _stateLock.notifyAll(); } } @@ -167,13 +167,15 @@ public class InboundMessageFragments { if (data.readACKsIncluded()) { int fragments = 0; long acks[] = data.readACKs(); - _context.statManager().addRateData("udp.receivedACKs", acks.length, 0); - for (int i = 0; i < acks.length; i++) { - if (_log.shouldLog(Log.INFO)) - _log.info("Full ACK of message " + acks[i] + " received!"); - fragments += _outbound.acked(acks[i], from.getRemotePeer()); + if (acks != null) { + _context.statManager().addRateData("udp.receivedACKs", acks.length, 0); + for (int i = 0; i < acks.length; i++) { + if (_log.shouldLog(Log.INFO)) + _log.info("Full ACK of message " + acks[i] + " received!"); + fragments += _outbound.acked(acks[i], from.getRemotePeer()); + } + from.messageACKed(fragments * from.getMTU()); // estimated size } - from.messageACKed(fragments * from.getMTU()); // estimated size } if (data.readECN()) from.ECNReceived(); diff --git a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java index a07d45341f..ba60ebf6a2 100644 --- a/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/InboundMessageState.java @@ -50,7 +50,11 @@ public class InboundMessageState { public synchronized boolean receiveFragment(UDPPacketReader.DataReader data, int dataFragment) { int fragmentNum = data.readMessageFragmentNum(dataFragment); if ( (fragmentNum < 0) || (fragmentNum > _fragments.length)) { - _log.log(Log.CRIT, "Invalid fragment " + fragmentNum + ": " + data, new Exception("source")); + StringBuffer buf = new StringBuffer(1024); + buf.append("Invalid fragment ").append(fragmentNum); + buf.append(": ").append(data); + data.toRawString(buf); + _log.log(Log.CRIT, buf.toString(), new Exception("source")); return false; } if (_fragments[fragmentNum] == null) { diff --git a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java index c13071159a..8a709f5a1d 100644 --- a/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java +++ b/router/java/src/net/i2p/router/transport/udp/MessageReceiver.java @@ -61,8 +61,8 @@ public class MessageReceiver implements Runnable { m.setUniqueId(state.getMessageId()); return m; } catch (I2NPMessageException ime) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Message invalid: " + state, ime); + if (_log.shouldLog(Log.ERROR)) + _log.error("Message invalid: " + state, ime); return null; } catch (Exception e) { _log.log(Log.CRIT, "Error dealing with a message: " + state, e); diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java index 60c2503a2c..82cd9698d8 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageFragments.java @@ -42,7 +42,7 @@ public class OutboundMessageFragments { _transport = transport; _activeMessages = new ArrayList(MAX_ACTIVE); _nextPacketMessage = 0; - _builder = new PacketBuilder(ctx, _transport); + _builder = new PacketBuilder(ctx); _alive = true; _context.statManager().createRateStat("udp.sendVolleyTime", "Long it takes to send a full volley", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); _context.statManager().createRateStat("udp.sendConfirmTime", "How long it takes to send a message and get the ACK", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 }); @@ -89,10 +89,11 @@ public class OutboundMessageFragments { */ public void add(OutNetMessage msg) { OutboundMessageState state = new OutboundMessageState(_context); - state.initialize(msg); + boolean ok = state.initialize(msg); finishMessages(); synchronized (_activeMessages) { - _activeMessages.add(state); + if (ok) + _activeMessages.add(state); _activeMessages.notifyAll(); } } @@ -129,14 +130,14 @@ public class OutboundMessageFragments { // it can not have an OutNetMessage if the source is the // final after establishment message if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send a direct message: " + state); + _log.warn("Unable to send an expired direct message: " + state); } i--; } else if (state.getPushCount() > MAX_VOLLEYS) { _activeMessages.remove(i); _context.statManager().addRateData("udp.sendAggressiveFailed", state.getPushCount(), state.getLifetime()); - if (state.getPeer() != null) - state.getPeer().congestionOccurred(); + //if (state.getPeer() != null) + // state.getPeer().congestionOccurred(); if (state.getMessage() != null) { _transport.failed(state.getMessage()); @@ -144,7 +145,7 @@ public class OutboundMessageFragments { // it can not have an OutNetMessage if the source is the // final after establishment message if (_log.shouldLog(Log.WARN)) - _log.warn("Unable to send a direct message: " + state); + _log.warn("Unable to send a direct message after too many volleys: " + state); } i--; @@ -198,25 +199,31 @@ public class OutboundMessageFragments { int fragmentSize = state.fragmentSize(currentFragment); if (peer.allocateSendingBytes(fragmentSize)) { if (_log.shouldLog(Log.INFO)) - _log.info("Allocation of " + fragmentSize + " allowed"); + _log.info("Allocation of " + fragmentSize + " allowed with " + + peer.getSendWindowBytesRemaining() + + "/" + peer.getSendWindowBytes() + + " remaining" + + " for message " + state.getMessageId() + ": " + state); // for fairness, we move on in a round robin _nextPacketMessage = i + 1; if (state.getPushCount() != oldVolley) { _context.statManager().addRateData("udp.sendVolleyTime", state.getLifetime(), state.getFragmentCount()); - state.setNextSendTime(now + 500); + state.setNextSendTime(now + (1000-(now%1000)) + _context.random().nextInt(2000)); } else { if (peer.getSendWindowBytesRemaining() > 0) state.setNextSendTime(now); else - state.setNextSendTime(now + 50 ); + state.setNextSendTime(now + (1000-(now%1000))); } break; } else { if (_log.shouldLog(Log.WARN)) - _log.warn("Allocation of " + fragmentSize + " rejected"); - state.setNextSendTime(now + _context.random().nextInt(500)); + _log.warn("Allocation of " + fragmentSize + " rejected w/ wsize=" + peer.getSendWindowBytes() + + " available=" + peer.getSendWindowBytesRemaining() + + " for message " + state.getMessageId() + ": " + state); + state.setNextSendTime(now + (1000-(now%1000))); currentFragment = -1; } } @@ -229,15 +236,15 @@ public class OutboundMessageFragments { if (currentFragment < 0) { if (nextSend <= 0) { try { - _activeMessages.wait(100); + _activeMessages.wait(); } catch (InterruptedException ie) {} } else { // none of the packets were eligible for sending long delay = nextSend - now; if (delay <= 0) delay = 10; - if (delay > 500) - delay = 500; + if (delay > 1000) + delay = 1000; try { _activeMessages.wait(delay); } catch (InterruptedException ie) {} diff --git a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java index cff837dace..b38d2a82c7 100644 --- a/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java +++ b/router/java/src/net/i2p/router/transport/udp/OutboundMessageState.java @@ -4,7 +4,7 @@ import java.util.Arrays; import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.i2np.I2NPMessage; -import net.i2p.router.RouterContext; +import net.i2p.I2PAppContext; import net.i2p.router.OutNetMessage; import net.i2p.util.ByteCache; import net.i2p.util.Log; @@ -14,7 +14,7 @@ import net.i2p.util.Log; * */ public class OutboundMessageState { - private RouterContext _context; + private I2PAppContext _context; private Log _log; /** may be null if we are part of the establishment */ private OutNetMessage _message; @@ -35,19 +35,35 @@ public class OutboundMessageState { public static final int MAX_FRAGMENTS = 32; private static final ByteCache _cache = ByteCache.getInstance(64, MAX_FRAGMENTS*1024); - public OutboundMessageState(RouterContext context) { + public OutboundMessageState(I2PAppContext context) { _context = context; _log = _context.logManager().getLog(OutboundMessageState.class); _pushCount = 0; _maxSends = 0; } - public synchronized void initialize(OutNetMessage msg) { - initialize(msg, msg.getMessage(), null); + public synchronized boolean initialize(OutNetMessage msg) { + try { + initialize(msg, msg.getMessage(), null); + return true; + } catch (OutOfMemoryError oom) { + throw oom; + } catch (Exception e) { + _log.log(Log.CRIT, "Error initializing " + msg, e); + return false; + } } - public void initialize(I2NPMessage msg, PeerState peer) { - initialize(null, msg, peer); + public boolean initialize(I2NPMessage msg, PeerState peer) { + try { + initialize(null, msg, peer); + return true; + } catch (OutOfMemoryError oom) { + throw oom; + } catch (Exception e) { + _log.log(Log.CRIT, "Error initializing " + msg, e); + return false; + } } private void initialize(OutNetMessage m, I2NPMessage msg, PeerState peer) { @@ -227,6 +243,10 @@ public class OutboundMessageState { buf.append("Message ").append(_messageId); if (_fragmentSends != null) buf.append(" with ").append(_fragmentSends.length).append(" fragments"); + if (_messageBuf != null) + buf.append(" of size ").append(_messageBuf.getValid()); + buf.append(" volleys: ").append(_maxSends); + buf.append(" lifetime: ").append(getLifetime()); return buf.toString(); } } diff --git a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java index cdeb891695..609eb3d4c0 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketBuilder.java @@ -6,13 +6,14 @@ import java.util.Arrays; import java.util.Date; import java.util.List; +import net.i2p.I2PAppContext; import net.i2p.data.Base64; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; import net.i2p.data.Hash; +import net.i2p.data.RouterIdentity; import net.i2p.data.SessionKey; import net.i2p.data.Signature; -import net.i2p.router.RouterContext; import net.i2p.util.ByteCache; import net.i2p.util.Log; @@ -22,16 +23,14 @@ import net.i2p.util.Log; * */ public class PacketBuilder { - private RouterContext _context; + private I2PAppContext _context; private Log _log; - private UDPTransport _transport; private static final ByteCache _ivCache = ByteCache.getInstance(64, UDPPacket.IV_SIZE); - public PacketBuilder(RouterContext ctx, UDPTransport transport) { + public PacketBuilder(I2PAppContext ctx) { _context = ctx; _log = ctx.logManager().getLog(PacketBuilder.class); - _transport = transport; } public UDPPacket buildPacket(OutboundMessageState state, int fragment, PeerState peer) { @@ -136,7 +135,7 @@ public class PacketBuilder { * * @return ready to send packet, or null if there was a problem */ - public UDPPacket buildSessionCreatedPacket(InboundEstablishState state) { + public UDPPacket buildSessionCreatedPacket(InboundEstablishState state, int externalPort, SessionKey ourIntroKey) { UDPPacket packet = UDPPacket.acquire(_context); try { packet.getPacket().setAddress(InetAddress.getByAddress(state.getSentIP())); @@ -187,7 +186,7 @@ public class PacketBuilder { buf.append(" AliceIP: ").append(Base64.encode(state.getSentIP())); buf.append(" AlicePort: ").append(state.getSentPort()); buf.append(" BobIP: ").append(Base64.encode(state.getReceivedOurIP())); - buf.append(" BobPort: ").append(_transport.getExternalPort()); + buf.append(" BobPort: ").append(externalPort); buf.append(" RelayTag: ").append(state.getSentRelayTag()); buf.append(" SignedOn: ").append(state.getSentSignedOnTime()); buf.append(" signature: ").append(Base64.encode(state.getSentSignature().getData())); @@ -209,7 +208,7 @@ public class PacketBuilder { if ( (off % 16) != 0) off += 16 - (off % 16); packet.getPacket().setLength(off); - authenticate(packet, _transport.getIntroKey(), _transport.getIntroKey(), iv); + authenticate(packet, ourIntroKey, ourIntroKey, iv); setTo(packet, state.getSentIP(), state.getSentPort()); _ivCache.release(iv); return packet; @@ -279,8 +278,8 @@ public class PacketBuilder { * * @return ready to send packets, or null if there was a problem */ - public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state) { - byte identity[] = _context.router().getRouterInfo().getIdentity().toByteArray(); + public UDPPacket[] buildSessionConfirmedPackets(OutboundEstablishState state, RouterIdentity ourIdentity) { + byte identity[] = ourIdentity.toByteArray(); int numFragments = identity.length / MAX_IDENTITY_FRAGMENT_SIZE; if (numFragments * MAX_IDENTITY_FRAGMENT_SIZE != identity.length) numFragments++; diff --git a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java index dacd2d8d77..b2f76c3df4 100644 --- a/router/java/src/net/i2p/router/transport/udp/PacketHandler.java +++ b/router/java/src/net/i2p/router/transport/udp/PacketHandler.java @@ -20,17 +20,16 @@ import net.i2p.util.Log; * receiver's queue and pushing them as necessary. * */ -public class PacketHandler implements Runnable { +public class PacketHandler { private RouterContext _context; private Log _log; private UDPTransport _transport; private UDPEndpoint _endpoint; - private UDPPacketReader _reader; private EstablishmentManager _establisher; private InboundMessageFragments _inbound; private boolean _keepReading; - private static final int NUM_HANDLERS = 3; + private static final int NUM_HANDLERS = 4; public PacketHandler(RouterContext ctx, UDPTransport transport, UDPEndpoint endpoint, EstablishmentManager establisher, InboundMessageFragments inbound) { _context = ctx; @@ -39,7 +38,6 @@ public class PacketHandler implements Runnable { _endpoint = endpoint; _establisher = establisher; _inbound = inbound; - _reader = new UDPPacketReader(ctx); _context.statManager().createRateStat("udp.handleTime", "How long it takes to handle a received packet after its been pulled off the queue", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.queueTime", "How long after a packet is received can we begin handling it", "udp", new long[] { 10*60*1000, 60*60*1000 }); _context.statManager().createRateStat("udp.receivePacketSkew", "How long ago after the packet was sent did we receive it", "udp", new long[] { 10*60*1000, 60*60*1000 }); @@ -48,7 +46,7 @@ public class PacketHandler implements Runnable { public void startup() { _keepReading = true; for (int i = 0; i < NUM_HANDLERS; i++) { - I2PThread t = new I2PThread(this, "Packet handler " + i + ": " + _endpoint.getListenPort()); + I2PThread t = new I2PThread(new Handler(), "Packet handler " + i + ": " + _endpoint.getListenPort()); t.setDaemon(true); t.start(); } @@ -58,30 +56,37 @@ public class PacketHandler implements Runnable { _keepReading = false; } - public void run() { - while (_keepReading) { - UDPPacket packet = _endpoint.receive(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received the packet " + packet); - long queueTime = packet.getLifetime(); - long handleStart = _context.clock().now(); - handlePacket(packet); - long handleTime = _context.clock().now() - handleStart; - _context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime()); - _context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime()); - - if (handleTime > 1000) { - if (_log.shouldLog(Log.WARN)) - _log.warn("Took " + handleTime + " to process the packet " - + packet + ": " + _reader); + private class Handler implements Runnable { + private UDPPacketReader _reader; + public Handler() { + _reader = new UDPPacketReader(_context); + } + + public void run() { + while (_keepReading) { + UDPPacket packet = _endpoint.receive(); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received the packet " + packet); + long queueTime = packet.getLifetime(); + long handleStart = _context.clock().now(); + handlePacket(_reader, packet); + long handleTime = _context.clock().now() - handleStart; + _context.statManager().addRateData("udp.handleTime", handleTime, packet.getLifetime()); + _context.statManager().addRateData("udp.queueTime", queueTime, packet.getLifetime()); + + if (handleTime > 1000) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Took " + handleTime + " to process the packet " + + packet + ": " + _reader); + } + + // back to the cache with thee! + packet.release(); } - - // back to the cache with thee! - packet.release(); } } - private void handlePacket(UDPPacket packet) { + private void handlePacket(UDPPacketReader reader, UDPPacket packet) { if (packet == null) return; InetAddress remAddr = packet.getPacket().getAddress(); @@ -94,7 +99,7 @@ public class PacketHandler implements Runnable { if (est != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received IS for an inbound establishment"); - receivePacket(packet, est); + receivePacket(reader, packet, est); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received is not for an inbound establishment"); @@ -102,22 +107,22 @@ public class PacketHandler implements Runnable { if (oest != null) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received IS for an outbound establishment"); - receivePacket(packet, oest); + receivePacket(reader, packet, oest); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received is not for an inbound or outbound establishment"); // ok, not already known establishment, try as a new one - receivePacket(packet); + receivePacket(reader, packet); } } } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received IS for an existing peer"); - receivePacket(packet, state); + receivePacket(reader, packet, state); } } - private void receivePacket(UDPPacket packet, PeerState state) { + private void receivePacket(UDPPacketReader reader, UDPPacket packet, PeerState state) { boolean isValid = packet.validate(state.getCurrentMACKey()); if (!isValid) { if (state.getNextMACKey() != null) @@ -147,10 +152,10 @@ public class PacketHandler implements Runnable { packet.decrypt(state.getCurrentCipherKey()); } - handlePacket(packet, state, null, null); + handlePacket(reader, packet, state, null, null); } - private void receivePacket(UDPPacket packet) { + private void receivePacket(UDPPacketReader reader, UDPPacket packet) { boolean isValid = packet.validate(_transport.getIntroKey()); if (!isValid) { if (_log.shouldLog(Log.WARN)) @@ -162,10 +167,10 @@ public class PacketHandler implements Runnable { } packet.decrypt(_transport.getIntroKey()); - handlePacket(packet, null, null, null); + handlePacket(reader, packet, null, null, null); } - private void receivePacket(UDPPacket packet, InboundEstablishState state) { + private void receivePacket(UDPPacketReader reader, UDPPacket packet, InboundEstablishState state) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { StringBuffer buf = new StringBuffer(128); buf.append("Attempting to receive a packet on a known inbound state: "); @@ -182,7 +187,7 @@ public class PacketHandler implements Runnable { _log.warn("Valid introduction packet received for inbound con: " + packet); packet.decrypt(state.getCipherKey()); - handlePacket(packet, null, null, null); + handlePacket(reader, packet, null, null, null); return; } else { if (_log.shouldLog(Log.WARN)) @@ -192,10 +197,10 @@ public class PacketHandler implements Runnable { } // ok, we couldn't handle it with the established stuff, so fall back // on earlier state packets - receivePacket(packet); + receivePacket(reader, packet); } - private void receivePacket(UDPPacket packet, OutboundEstablishState state) { + private void receivePacket(UDPPacketReader reader, UDPPacket packet, OutboundEstablishState state) { if ( (state != null) && (_log.shouldLog(Log.DEBUG)) ) { StringBuffer buf = new StringBuffer(128); buf.append("Attempting to receive a packet on a known outbound state: "); @@ -213,7 +218,7 @@ public class PacketHandler implements Runnable { _log.warn("Valid introduction packet received for outbound established con: " + packet); packet.decrypt(state.getCipherKey()); - handlePacket(packet, null, state, null); + handlePacket(reader, packet, null, state, null); return; } } @@ -224,7 +229,7 @@ public class PacketHandler implements Runnable { if (_log.shouldLog(Log.WARN)) _log.warn("Valid introduction packet received for outbound established con with old intro key: " + packet); packet.decrypt(state.getIntroKey()); - handlePacket(packet, null, state, null); + handlePacket(reader, packet, null, state, null); return; } else { if (_log.shouldLog(Log.WARN)) @@ -233,7 +238,7 @@ public class PacketHandler implements Runnable { // ok, we couldn't handle it with the established stuff, so fall back // on earlier state packets - receivePacket(packet); + receivePacket(reader, packet); } /** let packets be up to 30s slow */ @@ -242,10 +247,10 @@ public class PacketHandler implements Runnable { /** * Parse out the interesting bits and honor what it says */ - private void handlePacket(UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { - _reader.initialize(packet); + private void handlePacket(UDPPacketReader reader, UDPPacket packet, PeerState state, OutboundEstablishState outState, InboundEstablishState inState) { + reader.initialize(packet); long now = _context.clock().now(); - long when = _reader.readTimestamp() * 1000; + long when = reader.readTimestamp() * 1000; long skew = now - when; if (skew > GRACE_PERIOD) { if (_log.shouldLog(Log.WARN)) @@ -263,31 +268,27 @@ public class PacketHandler implements Runnable { int fromPort = packet.getPacket().getPort(); String from = PeerState.calculateRemoteHostString(fromHost.getAddress(), fromPort); - switch (_reader.readPayloadType()) { + switch (reader.readPayloadType()) { case UDPPacket.PAYLOAD_TYPE_SESSION_REQUEST: - _establisher.receiveSessionRequest(from, fromHost, fromPort, _reader); + _establisher.receiveSessionRequest(from, fromHost, fromPort, reader); break; case UDPPacket.PAYLOAD_TYPE_SESSION_CONFIRMED: - _establisher.receiveSessionConfirmed(from, _reader); + _establisher.receiveSessionConfirmed(from, reader); break; case UDPPacket.PAYLOAD_TYPE_SESSION_CREATED: - _establisher.receiveSessionCreated(from, _reader); + _establisher.receiveSessionCreated(from, reader); break; case UDPPacket.PAYLOAD_TYPE_DATA: if (outState != null) state = _establisher.receiveData(outState); - handleData(packet, state); + if (_log.shouldLog(Log.INFO)) + _log.info("Received new DATA packet from " + state + ": " + packet); + _inbound.receiveData(state, reader.getDataReader()); break; default: if (_log.shouldLog(Log.WARN)) - _log.warn("Unknown payload type: " + _reader.readPayloadType()); + _log.warn("Unknown payload type: " + reader.readPayloadType()); return; } } - - private void handleData(UDPPacket packet, PeerState peer) { - if (_log.shouldLog(Log.INFO)) - _log.info("Received new DATA packet from " + peer + ": " + packet); - _inbound.receiveData(peer, _reader.getDataReader()); - } } diff --git a/router/java/src/net/i2p/router/transport/udp/PeerState.java b/router/java/src/net/i2p/router/transport/udp/PeerState.java index 33cfde591a..93d8da5bfa 100644 --- a/router/java/src/net/i2p/router/transport/udp/PeerState.java +++ b/router/java/src/net/i2p/router/transport/udp/PeerState.java @@ -79,6 +79,14 @@ public class PeerState { private int _sendWindowBytes; /** how many bytes can we send to the peer in the current second */ private int _sendWindowBytesRemaining; + private long _lastSendRefill; + private long _lastCongestionOccurred; + /** + * when sendWindowBytes is below this, grow the window size quickly, + * but after we reach it, grow it slowly + * + */ + private int _slowStartThreshold; /** what IP is the peer sending and receiving packets on? */ private byte[] _remoteIP; /** what port is the peer sending and receiving packets on? */ @@ -108,7 +116,8 @@ public class PeerState { private static final int DEFAULT_SEND_WINDOW_BYTES = 16*1024; private static final int MINIMUM_WINDOW_BYTES = DEFAULT_SEND_WINDOW_BYTES; - private static final int DEFAULT_MTU = 512; + private static final int MAX_SEND_WINDOW_BYTES = 1024*1024; + private static final int DEFAULT_MTU = 1024; public PeerState(I2PAppContext ctx) { _context = ctx; @@ -130,6 +139,9 @@ public class PeerState { _remoteWantsPreviousACKs = false; _sendWindowBytes = DEFAULT_SEND_WINDOW_BYTES; _sendWindowBytesRemaining = DEFAULT_SEND_WINDOW_BYTES; + _slowStartThreshold = MAX_SEND_WINDOW_BYTES/2; + _lastSendRefill = _context.clock().now(); + _lastCongestionOccurred = -1; _remoteIP = null; _remotePort = -1; _remoteRequiresIntroduction = false; @@ -298,9 +310,9 @@ public class PeerState { */ public boolean allocateSendingBytes(int size) { long now = _context.clock().now(); - if (_lastSendTime > 0) { - if (_lastSendTime + 1000 <= now) - _sendWindowBytesRemaining = _sendWindowBytes; + if (_lastSendRefill + 1000 <= now) { + _sendWindowBytesRemaining = _sendWindowBytes; + _lastSendRefill = now; } if (size <= _sendWindowBytesRemaining) { _sendWindowBytesRemaining -= size; @@ -334,6 +346,7 @@ public class PeerState { _mtu = mtu; _mtuLastChecked = _context.clock().now(); } + public int getSlowStartThreshold() { return _slowStartThreshold; } /** we received the message specified completely */ public void messageFullyReceived(Long messageId) { @@ -350,9 +363,16 @@ public class PeerState { * */ public void congestionOccurred() { + long now = _context.clock().now(); + if (_lastCongestionOccurred + 2000 > now) + return; // only shrink once every other second + _lastCongestionOccurred = now; + _sendWindowBytes /= 2; if (_sendWindowBytes < MINIMUM_WINDOW_BYTES) _sendWindowBytes = MINIMUM_WINDOW_BYTES; + if (_sendWindowBytes < _slowStartThreshold) + _slowStartThreshold = _sendWindowBytes; } /** pull off the ACKs (Long) to send to the peer */ @@ -368,7 +388,15 @@ public class PeerState { /** we sent a message which was ACKed containing the given # of bytes */ public void messageACKed(int bytesACKed) { _consecutiveSendingSecondsWithoutACKs = 0; - _sendWindowBytes += bytesACKed; + if (_sendWindowBytes <= _slowStartThreshold) { + _sendWindowBytes += bytesACKed; + } else { + double prob = bytesACKed / _sendWindowBytes; + if (_context.random().nextDouble() <= prob) + _sendWindowBytes += bytesACKed; + } + if (_sendWindowBytes > MAX_SEND_WINDOW_BYTES) + _sendWindowBytes = MAX_SEND_WINDOW_BYTES; _lastReceiveTime = _context.clock().now(); _messagesSent++; } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java new file mode 100644 index 0000000000..c737e31e11 --- /dev/null +++ b/router/java/src/net/i2p/router/transport/udp/UDPFlooder.java @@ -0,0 +1,97 @@ +package net.i2p.router.transport.udp; + +import java.util.ArrayList; +import java.util.List; +import net.i2p.data.RouterInfo; +import net.i2p.data.i2np.DataMessage; +import net.i2p.data.i2np.I2NPMessage; +import net.i2p.router.OutNetMessage; +import net.i2p.router.RouterContext; +import net.i2p.util.I2PThread; +import net.i2p.util.Log; + +/** + * + */ +class UDPFlooder implements Runnable { + private RouterContext _context; + private Log _log; + private UDPTransport _transport; + private List _peers; + private boolean _alive; + + public UDPFlooder(RouterContext ctx, UDPTransport transport) { + _context = ctx; + _log = ctx.logManager().getLog(UDPFlooder.class); + _transport = transport; + _peers = new ArrayList(4); + } + + public void addPeer(PeerState peer) { + synchronized (_peers) { + _peers.add(peer); + _peers.notifyAll(); + } + } + + public void startup() { + _alive = true; + I2PThread t = new I2PThread(this, "flooder"); + t.setDaemon(true); + t.start(); + } + + public void shutdown() { + _alive = false; + synchronized (_peers) { + _peers.notifyAll(); + } + } + + public void run() { + while (_alive) { + try { + synchronized (_peers) { + if (_peers.size() <= 0) + _peers.wait(); + } + } catch (InterruptedException ie) {} + + // peers always grows, so this is fairly safe + for (int i = 0; i < _peers.size(); i++) { + PeerState peer = (PeerState)_peers.get(i); + DataMessage m = new DataMessage(_context); + byte data[] = new byte[4096]; + _context.random().nextBytes(data); + m.setData(data); + m.setMessageExpiration(_context.clock().now() + 10*1000); + m.setUniqueId(_context.random().nextLong(I2NPMessage.MAX_ID_VALUE)); + if (true) { + OutNetMessage msg = new OutNetMessage(_context); + msg.setMessage(m); + msg.setExpiration(m.getMessageExpiration()); + msg.setPriority(500); + RouterInfo to = _context.netDb().lookupRouterInfoLocally(peer.getRemotePeer()); + if (to == null) + continue; + msg.setTarget(to); + _context.statManager().getStatLog().addData(peer.getRemotePeer().toBase64().substring(0,6), "udp.floodDataSent", 1, 0); + + _transport.send(msg); + } else { + _transport.send(m, peer); + } + } + long floodDelay = calcFloodDelay(); + try { Thread.sleep(floodDelay); } catch (InterruptedException ie) {} + } + } + + private long calcFloodDelay() { + try { + return Long.parseLong(_context.getProperty("udp.floodDelay", "30000")); + } catch (Exception e) { + return 30*1000; + } + } +} diff --git a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java index b9cf54299e..6ca4fe24c0 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPPacketReader.java @@ -52,7 +52,7 @@ public class UDPPacketReader { /** what type of payload is in here? */ public int readPayloadType() { // 3 highest order bits == payload type - return _message[_payloadBeginOffset] >>> 4; + return (_message[_payloadBeginOffset] & 0xFF) >>> 4; } /** does this packet include rekeying data? */ @@ -106,6 +106,11 @@ public class UDPPacketReader { } } + public void toRawString(StringBuffer buf) { + if (_message != null) + buf.append(Base64.encode(_message, _payloadBeginOffset, _payloadLength)); + } + /** Help read the SessionRequest payload */ public class SessionRequestReader { public static final int X_LENGTH = 256; @@ -187,7 +192,7 @@ public class UDPPacketReader { /** which fragment is this? */ public int readCurrentFragmentNum() { int readOffset = readBodyOffset(); - return _message[readOffset] >>> 4; + return (_message[readOffset] & 0xFF) >>> 4; } /** how many fragments will there be? */ public int readTotalFragmentNum() { @@ -323,7 +328,7 @@ public class UDPPacketReader { public int readMessageFragmentNum(int fragmentNum) { int off = getFragmentBegin(fragmentNum); off += 4; // messageId - return _message[off] >>> 3; + return (_message[off] & 0xFF) >>> 3; } public boolean readMessageIsLast(int fragmentNum) { int off = getFragmentBegin(fragmentNum); @@ -434,7 +439,13 @@ public class UDPPacketReader { for (int i = 0; i < numFragments; i++) { buf.append("containing messageId "); buf.append(DataHelper.fromLong(_message, off, 4)); - off += 5; // messageId+info + off += 4; + int fragNum = (_message[off] & 0XFF) >>> 3; + boolean isLast = (_message[off] & (1 << 2)) != 0; + off++; + buf.append(" frag# ").append(fragNum); + buf.append(" isLast? ").append(isLast); + buf.append(" info ").append((int)_message[off-1]); int size = (int)DataHelper.fromLong(_message, off, 2); buf.append(" with ").append(size).append(" bytes"); buf.append(' '); @@ -444,5 +455,17 @@ public class UDPPacketReader { return buf.toString(); } + + public void toRawString(StringBuffer buf) { + UDPPacketReader.this.toRawString(buf); + buf.append(" payload: "); + + int off = getFragmentBegin(0); // first fragment + off += 4; // messageId + off++; // fragment info + int size = (int)DataHelper.fromLong(_message, off, 2); + off += 2; + buf.append(Base64.encode(_message, off, size)); + } } } diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java index e37c87036d..4fb1f7743a 100644 --- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java +++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java @@ -49,6 +49,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private OutboundRefiller _refiller; private PacketPusher _pusher; private InboundMessageFragments _inboundFragments; + private UDPFlooder _flooder; /** list of RelayPeer objects for people who will relay to us */ private List _relayPeers; @@ -83,6 +84,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority private static final int PRIORITY_LIMITS[] = new int[] { 100, 200, 300, 400, 500, 1000 }; /** configure the priority queue with the given weighting per priority group */ private static final int PRIORITY_WEIGHT[] = new int[] { 1, 1, 1, 1, 1, 2 }; + + /** should we flood all UDP peers with the configured rate? */ + private static final boolean SHOULD_FLOOD_PEERS = false; public UDPTransport(RouterContext ctx) { super(ctx); @@ -101,6 +105,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _fragments = new OutboundMessageFragments(_context, this); _inboundFragments = new InboundMessageFragments(_context, _fragments, this); + _flooder = new UDPFlooder(_context, this); } public void startup() { @@ -118,6 +123,8 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _refiller.shutdown(); if (_inboundFragments != null) _inboundFragments.shutdown(); + if (_flooder != null) + _flooder.shutdown(); _introKey = new SessionKey(new byte[SessionKey.KEYSIZE_BYTES]); System.arraycopy(_context.routerHash().getData(), 0, _introKey.getData(), 0, SessionKey.KEYSIZE_BYTES); @@ -165,6 +172,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority if (_refiller == null) _refiller = new OutboundRefiller(_context, _fragments, _outboundMessages); + if (_flooder == null) + _flooder = new UDPFlooder(_context, this); + _endpoint.startup(); _establisher.startup(); _handler.startup(); @@ -173,9 +183,12 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _pusher = new PacketPusher(_context, _fragments, _endpoint.getSender()); _pusher.startup(); _refiller.startup(); + _flooder.startup(); } public void shutdown() { + if (_flooder != null) + _flooder.shutdown(); if (_refiller != null) _refiller.shutdown(); if (_handler != null) @@ -296,6 +309,9 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority _context.shitlist().unshitlistRouter(peer.getRemotePeer()); + if (SHOULD_FLOOD_PEERS) + _flooder.addPeer(peer); + return true; } @@ -321,6 +337,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority public String getStyle() { return STYLE; } public void send(OutNetMessage msg) { + if (msg == null) return; + if (msg.getTarget() == null) return; + if (msg.getTarget().getIdentity() == null) return; + Hash to = msg.getTarget().getIdentity().calculateHash(); if (getPeerState(to) != null) { if (_log.shouldLog(Log.DEBUG)) @@ -471,7 +491,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append("<table border=\"1\">\n"); buf.append(" <tr><td><b>Peer</b></td><td><b>Location</b></td>\n"); buf.append(" <td><b>Last send</b></td><td><b>Last recv</b></td>\n"); - buf.append(" <td><b>Lifetime</b></td><td><b>Window size</b></td>\n"); + buf.append(" <td><b>Lifetime</b></td><td><b>cwnd</b></td><td><b>ssthresh</b></td>\n"); buf.append(" <td><b>Sent</b></td><td><b>Received</b></td>\n"); buf.append(" </tr>\n"); out.write(buf.toString()); @@ -517,6 +537,10 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority buf.append(peer.getSendWindowBytes()); buf.append("</td>"); + buf.append("<td>"); + buf.append(peer.getSlowStartThreshold()); + buf.append("</td>"); + buf.append("<td>"); buf.append(peer.getMessagesSent()); buf.append("</td>"); @@ -533,7 +557,6 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority out.write("</table>\n"); } - /** * Cache the bid to reduce object churn */ diff --git a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java index 76fd5a15a1..5db5495147 100644 --- a/router/java/src/net/i2p/router/tunnel/pool/TestJob.java +++ b/router/java/src/net/i2p/router/tunnel/pool/TestJob.java @@ -240,7 +240,8 @@ class TestJob extends JobImpl { } public String getName() { return "Tunnel test timeout"; } public void runJob() { - _log.error("Timeout: found? " + _found, getAddedBy()); + if (_log.shouldLog(Log.WARN)) + _log.warn("Timeout: found? " + _found, getAddedBy()); if (!_found) testFailed(getContext().clock().now() - _started); } -- GitLab