From 93180998451958e024e909d9ca773caf0b8f014b Mon Sep 17 00:00:00 2001 From: zzz Date: Tue, 24 Sep 2013 16:01:20 +0000 Subject: [PATCH] * Streaming: - Cleanups - Remove setCloseReceivedOn(), unused outside Connection - OR the isFlagSet parameter instead of multiple calls - Remove acked packets from _outboundPackets inside synced iterator - Short-circuit _outboundPackets iterator if empty - Small optimization if not logging in ConnectionPacketHandler - Stub out processing of close ack (ticket #1042) --- .../net/i2p/client/streaming/Connection.java | 51 +++++++++++-------- .../streaming/ConnectionPacketHandler.java | 20 +++++--- .../net/i2p/client/streaming/PacketLocal.java | 14 ++--- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 6d088a2ef..6dcfba412 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -151,13 +151,24 @@ class Connection { return _lastSendId.incrementAndGet(); } - void closeReceived() { - if (setCloseReceivedOn(_context.clock().now())) { + /** + * Notify that a close was received + */ + public void closeReceived() { + if (_closeReceivedOn.compareAndSet(0, _context.clock().now())) { _inputStream.closeReceived(); synchronized (_connectLock) { _connectLock.notifyAll(); } } } + /** + * Notify that a close that we sent was acked + * @since 0.9.9 + */ + public void ourCloseAcked() { + // todo + } + /** * This doesn't "send a choke". Rather, it blocks if the outbound window is full, * thus choking the sender that calls this. @@ -276,15 +287,16 @@ class Connection { } /** - * got a packet we shouldn't have, send 'em a reset - * + * Got a packet we shouldn't have, send 'em a reset. + * More than one reset may be sent. */ - void sendReset() { + public void sendReset() { scheduleDisconnectEvent(); long now = _context.clock().now(); if (_resetSentOn.get() + 10*1000 > now) return; // don't send resets too fast if (_resetReceived.get()) return; - _resetSentOn.compareAndSet(0, now); + // Unconditionally set + _resetSentOn.set(now); if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; PacketLocal reply = new PacketLocal(_context, _remotePeer); reply.setFlag(Packet.FLAG_RESET); @@ -411,9 +423,9 @@ class Connection { /** * Process the acks and nacks received in a packet - * @return List of packets acked or null + * @return List of packets acked for the first time, or null if none */ - List ackPackets(long ackThrough, long nacks[]) { + public List ackPackets(long ackThrough, long nacks[]) { // FIXME synch this part too? if (ackThrough < _highestAckedThrough) { // dupack which won't tell us anything @@ -433,7 +445,9 @@ class Connection { List acked = null; synchronized (_outboundPackets) { - for (Map.Entry e : _outboundPackets.entrySet()) { + if (!_outboundPackets.isEmpty()) { // short circuit iterator + for (Iterator> iter = _outboundPackets.entrySet().iterator(); iter.hasNext(); ) { + Map.Entry e = iter.next(); long id = e.getKey().longValue(); if (id <= ackThrough) { boolean nacked = false; @@ -451,10 +465,11 @@ class Connection { } if (!nacked) { // aka ACKed if (acked == null) - acked = new ArrayList(1); + acked = new ArrayList(8); PacketLocal ackedPacket = e.getValue(); ackedPacket.ackReceived(); acked.add(ackedPacket); + iter.remove(); } } else { // TODO @@ -475,11 +490,12 @@ class Connection { //nackedPacket.incrementNACKs(); break; // _outboundPackets is ordered } - } + } // for + } // !isEmpty() if (acked != null) { for (int i = 0; i < acked.size(); i++) { PacketLocal p = acked.get(i); - _outboundPackets.remove(Long.valueOf(p.getSequenceNum())); + // removed from _outboundPackets above in iterator _ackedPackets++; if (p.getNumSends() > 1) { _activeResends.decrementAndGet(); @@ -529,7 +545,8 @@ class Connection { _log.warn("Took " + elapsed + "ms to pump through " + sched + " on " + toString()); } - void resetReceived() { + /** notify that a reset was received */ + public void resetReceived() { if (!_resetReceived.compareAndSet(false, true)) return; scheduleDisconnectEvent(); @@ -815,6 +832,7 @@ class Connection { /** @return 0 if not sent */ public long getCloseSentOn() { return _closeSentOn.get(); } + /** notify that a close was sent */ public void setCloseSentOn(long when) { if (_closeSentOn.compareAndSet(0, when)) scheduleDisconnectEvent(); @@ -823,13 +841,6 @@ class Connection { /** @return 0 if not received */ public long getCloseReceivedOn() { return _closeReceivedOn.get(); } - /** - * @return true if the first close received, false otherwise - */ - public boolean setCloseReceivedOn(long when) { - return _closeReceivedOn.compareAndSet(0, when); - } - public void updateShareOpts() { if (_closeSentOn.get() > 0 && !_updatedShareOpts) { _connectionManager.updateShareOpts(this); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 333a4086b..b49ff104b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -54,7 +54,7 @@ class ConnectionPacketHandler { if (con.getHardDisconnected()) { if ( (packet.getSequenceNum() > 0) || (packet.getPayloadSize() > 0) || - (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) || (packet.isFlagSet(Packet.FLAG_CLOSE)) ) { + (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) ) { if (_log.shouldLog(Log.WARN)) _log.warn("Received a data packet after hard disconnect: " + packet + " on " + con); con.sendReset(); @@ -305,6 +305,8 @@ class ConnectionPacketHandler { _context.statManager().addRateData("stream.sendsBeforeAck", numSends, ackTime); + if (p.isFlagSet(Packet.FLAG_CLOSE)) + con.ourCloseAcked(); // ACK the tags we delivered so we can use them //if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null) @@ -317,15 +319,18 @@ class ConnectionPacketHandler { _log.debug("Packet acked after " + ackTime + "ms: " + p); } if (highestRTT > 0) { - int oldrtt = con.getOptions().getRTT(); - int oldrto = con.getOptions().getRTO(); - int olddev = con.getOptions().getRTTDev(); - con.getOptions().updateRTT(highestRTT); - if (_log.shouldLog(Log.INFO)) + if (_log.shouldLog(Log.INFO)) { + int oldrtt = con.getOptions().getRTT(); + int oldrto = con.getOptions().getRTO(); + int olddev = con.getOptions().getRTTDev(); + con.getOptions().updateRTT(highestRTT); _log.info("acked: " + acked.size() + " highestRTT: " + highestRTT + " RTT: " + oldrtt + " -> " + con.getOptions().getRTT() + " RTO: " + oldrto + " -> " + con.getOptions().getRTO() + " Dev: " + olddev + " -> " + con.getOptions().getRTTDev()); + } else { + con.getOptions().updateRTT(highestRTT); + } if (firstAck) { if (con.isInbound()) _context.statManager().addRateData("stream.con.initialRTT.in", highestRTT, 0); @@ -542,8 +547,7 @@ class ConnectionPacketHandler { private void verifySignature(Packet packet, Connection con) throws I2PException { // verify the signature if necessary if (con.getOptions().getRequireFullySigned() || - packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) || - packet.isFlagSet(Packet.FLAG_CLOSE) ) { + packet.isFlagSet(Packet.FLAG_SYNCHRONIZE | Packet.FLAG_CLOSE)) { // we need a valid signature Destination from = con.getRemotePeer(); if (from == null) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 1597504ff..b35e2f3d2 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -89,10 +89,10 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { } public boolean shouldSign() { - return isFlagSet(FLAG_SIGNATURE_INCLUDED) || - isFlagSet(FLAG_SYNCHRONIZE) || - isFlagSet(FLAG_CLOSE) || - isFlagSet(FLAG_ECHO); + return isFlagSet(FLAG_SIGNATURE_INCLUDED | + FLAG_SYNCHRONIZE | + FLAG_CLOSE | + FLAG_ECHO); } /** last minute update of ack fields, just before write/sign */ @@ -209,9 +209,9 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { if (_numSends > 1) buf.append(" sent ").append(_numSends).append(" times"); - if (isFlagSet(Packet.FLAG_SYNCHRONIZE) || - isFlagSet(Packet.FLAG_CLOSE) || - isFlagSet(Packet.FLAG_RESET)) { + if (isFlagSet(FLAG_SYNCHRONIZE | + FLAG_CLOSE | + FLAG_RESET)) { if (con != null) { buf.append(" from ");