From 258244fed8a2a0069404de2f17590d8c0b8bb887 Mon Sep 17 00:00:00 2001 From: jrandom Date: Sat, 13 Nov 2004 09:49:31 +0000 Subject: [PATCH] * ack packets with a payload, even if they have ID=0 (duh) * properly implement the connection timeout * make sure we clear the outbound packets on close * don't b0rk on repeated close() calls --- .../net/i2p/client/streaming/Connection.java | 11 +++++- .../client/streaming/ConnectionManager.java | 35 ++++++++++++++----- .../streaming/ConnectionPacketHandler.java | 4 +-- .../i2p/client/streaming/I2PSocketFull.java | 2 +- .../client/streaming/MessageOutputStream.java | 14 +++++++- .../src/net/i2p/client/streaming/Packet.java | 3 ++ .../i2p/client/streaming/PacketHandler.java | 26 +++++++------- .../net/i2p/client/streaming/PacketQueue.java | 10 +++++- 8 files changed, 77 insertions(+), 28 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index af5b6bc59f..f64ecce99f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -68,7 +68,7 @@ public class Connection { /** wait up to 5 minutes after disconnection so we can ack/close packets */ public static int DISCONNECT_TIMEOUT = 5*60*1000; - /** lets be sane.. no more than 32 packets in the air in each dir */ + /** lets be sane- no more than 32 packets in the air in each dir */ public static final int MAX_WINDOW_SIZE = 32; public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { @@ -350,6 +350,15 @@ public class Connection { + toString()); _connectionManager.removeConnection(this); } + + synchronized (_outboundPackets) { + for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { + PacketLocal pl = (PacketLocal)iter.next(); + pl.cancelled(); + } + _outboundPackets.clear(); + _outboundPackets.notifyAll(); + } } private class DisconnectEvent implements SimpleTimer.TimedEvent { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 8f75886665..2fc679e2ad 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -10,6 +10,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PSession; import net.i2p.data.ByteArray; +import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.SessionKey; import net.i2p.util.SimpleTimer; @@ -63,6 +64,20 @@ public class ConnectionManager { return (Connection)_connectionByInboundId.get(new ByteArray(id)); } } + /** + * not guaranteed to be unique, but in case we receive more than one packet + * on an inbound connection that we havent ack'ed yet... + */ + Connection getConnectionByOutboundId(byte[] id) { + synchronized (_connectionLock) { + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (DataHelper.eq(con.getSendStreamId(), id)) + return con; + } + } + return null; + } public void setAllowIncomingConnections(boolean allow) { _connectionHandler.setActive(allow); @@ -138,21 +153,20 @@ public class ConnectionManager { public Connection connect(Destination peer, ConnectionOptions opts) { Connection con = null; byte receiveId[] = new byte[4]; + _context.random().nextBytes(receiveId); long expiration = _context.clock().now() + opts.getConnectTimeout(); if (opts.getConnectTimeout() <= 0) expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; _numWaiting++; while (true) { - if (expiration < _context.clock().now()) { + long remaining = expiration - _context.clock().now(); + if (remaining <= 0) { if (_log.shouldLog(Log.WARN)) _log.warn("Refusing to connect since we have exceeded our max of " + _maxConcurrentStreams + " connections"); _numWaiting--; return null; } - con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); - con.setRemotePeer(peer); - _context.random().nextBytes(receiveId); boolean reject = false; synchronized (_connectionLock) { if (locked_tooManyStreams()) { @@ -165,18 +179,21 @@ public class ConnectionManager { _numWaiting--; return null; } - - reject = true; + + // no remaining streams, lets wait a bit + try { _connectionLock.wait(remaining); } catch (InterruptedException ie) {} } else { + con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con.setRemotePeer(peer); + ByteArray ba = new ByteArray(receiveId); while (_connectionByInboundId.containsKey(ba)) { _context.random().nextBytes(receiveId); } _connectionByInboundId.put(ba, con); + break; // stop looping as a psuedo-wait } } - if (!reject) - break; } // ok we're in... @@ -223,12 +240,14 @@ public class ConnectionManager { con.disconnect(false, false); } _connectionByInboundId.clear(); + _connectionLock.notifyAll(); } } public void removeConnection(Connection con) { synchronized (_connectionLock) { _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId())); + _connectionLock.notifyAll(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index c0e5647f4e..b135822468 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -56,9 +56,9 @@ public class ConnectionPacketHandler { _log.debug("Scheduling ack in " + delay + "ms for received packet " + packet); } } else { - if (packet.getSequenceNum() > 0) { + if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) ) { // take note of congestion - con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); + //con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); if (_log.shouldLog(Log.WARN)) _log.warn("congestion.. dup " + packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index a7877d713a..089733c0f1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -23,7 +23,7 @@ public class I2PSocketFull implements I2PSocket { _connection.getOutputStream().close(); _connection.disconnect(true); } else { - throw new IOException("Not connected"); + //throw new IOException("Not connected"); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index aa29c62828..a1f37ab045 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -5,6 +5,8 @@ import java.io.InterruptedIOException; import java.io.OutputStream; import net.i2p.I2PAppContext; +import net.i2p.data.ByteArray; +import net.i2p.util.ByteCache; import net.i2p.util.Log; /** @@ -21,15 +23,17 @@ public class MessageOutputStream extends OutputStream { private boolean _closed; private long _written; private int _writeTimeout; + private ByteCache _dataCache; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); } public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { super(); + _dataCache = ByteCache.getInstance(128, bufSize); _context = ctx; _log = ctx.logManager().getLog(MessageOutputStream.class); - _buf = new byte[bufSize]; + _buf = _dataCache.acquire().getData(); // new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); _written = 0; @@ -130,6 +134,10 @@ public class MessageOutputStream extends OutputStream { _closed = true; flush(); _log.debug("Output stream closed after writing " + _written); + if (_buf != null) { + _dataCache.release(new ByteArray(_buf)); + _buf = null; + } } public void closeInternal() { _closed = true; @@ -143,6 +151,10 @@ public class MessageOutputStream extends OutputStream { } _dataLock.notifyAll(); } + if (_buf != null) { + _dataCache.release(new ByteArray(_buf)); + _buf = null; + } } public boolean getClosed() { return _closed; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 9cbd27b92b..914ed5841e 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -206,6 +206,9 @@ public class Packet { if ( (payload != null) && (payload.length > MAX_PAYLOAD_SIZE) ) throw new IllegalArgumentException("Too large payload: " + payload.length); } + public int getPayloadSize() { + return (_payload == null ? 0 : _payload.length); + } /** is a particular flag set on this packet? */ public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index fba0302709..9db51eb9cf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -175,20 +175,18 @@ public class PacketHandler { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); if (sendId == null) { - for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) { - Connection con = (Connection)iter.next(); - if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) { - if (con.getAckedPackets() <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("Received additional packets before the syn on " + con + ": " + packet); - receiveKnownCon(con, packet); - return; - } else { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets()); - receiveKnownCon(con, packet); - return; - } + Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId()); + if (con != null) { + if (con.getAckedPackets() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received additional packets before the syn on " + con + ": " + packet); + receiveKnownCon(con, packet); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("hrmph, received while ack of syn was in flight on " + con + ": " + packet + " acked: " + con.getAckedPackets()); + receiveKnownCon(con, packet); + return; } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index f01da61d36..6597865d7f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -7,7 +7,9 @@ import java.util.HashSet; import net.i2p.I2PAppContext; import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; +import net.i2p.data.ByteArray; import net.i2p.data.SessionKey; +import net.i2p.util.ByteCache; import net.i2p.util.Log; /** @@ -18,14 +20,20 @@ class PacketQueue { private Log _log; private I2PSession _session; private byte _buf[]; + private ByteCache _cache = ByteCache.getInstance(64, 36*1024); public PacketQueue(I2PAppContext context, I2PSession session) { _context = context; _session = session; - _buf = new byte[36*1024]; + _buf = _cache.acquire().getData(); // new byte[36*1024]; _log = context.logManager().getLog(PacketQueue.class); } + protected void finalize() throws Throwable { + _cache.release(new ByteArray(_buf)); + super.finalize(); + } + /** * Add a new packet to be sent out ASAP */