From 74a57abfb456db7f115450d443d032d869bbcc84 Mon Sep 17 00:00:00 2001 From: zzz <zzz@mail.i2p> Date: Sat, 12 Oct 2013 17:39:49 +0000 Subject: [PATCH] Streaming: - Make I2PSocketFull.close() nonblocking; it will now cause any user-side writes blocked in I/O (Connection.packetSendChoke()) to throw an exception (tickets #629, #1041) - Don't ignore InterruptedExceptions; throw InterruptedIOException - Back out static disconnect exception - MessageInputStream locking fixes - Cleanups I2PSnark: - Close socket before closing output stream to avoid blocking in Peer.disconnect(), and prevent Peer.disconnect() loop --- .../java/src/org/klomp/snark/Peer.java | 29 ++++++-- .../src/org/klomp/snark/PeerConnectionIn.java | 2 +- .../org/klomp/snark/PeerConnectionOut.java | 7 +- .../net/i2p/client/streaming/Connection.java | 31 ++++++--- .../streaming/ConnectionPacketHandler.java | 5 +- .../i2p/client/streaming/I2PSocketFull.java | 21 ++++-- .../client/streaming/MessageInputStream.java | 32 ++++++--- .../client/streaming/MessageOutputStream.java | 53 +++++++++----- .../src/net/i2p/client/streaming/Packet.java | 9 +++ .../net/i2p/client/streaming/PacketLocal.java | 69 +++++++++++-------- 10 files changed, 176 insertions(+), 82 deletions(-) diff --git a/apps/i2psnark/java/src/org/klomp/snark/Peer.java b/apps/i2psnark/java/src/org/klomp/snark/Peer.java index cdcac33432..821c39db8d 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/Peer.java +++ b/apps/i2psnark/java/src/org/klomp/snark/Peer.java @@ -28,6 +28,8 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; @@ -68,8 +70,10 @@ public class Peer implements Comparable private I2PSocket sock; private boolean deregister = true; - private static long __id; - private long _id; + private static final AtomicLong __id = new AtomicLong(); + private final long _id; + private final AtomicBoolean _disconnected = new AtomicBoolean(); + final static long CHECK_PERIOD = PeerCoordinator.CHECK_PERIOD; // 40 seconds final static int RATE_DEPTH = PeerCoordinator.RATE_DEPTH; // make following arrays RATE_DEPTH long private long uploaded_old[] = {-1,-1,-1}; @@ -98,7 +102,7 @@ public class Peer implements Comparable this.my_id = my_id; this.infohash = infohash; this.metainfo = metainfo; - _id = ++__id; + _id = __id.incrementAndGet(); //_log.debug("Creating a new peer with " + peerID.toString(), new Exception("creating")); } @@ -123,7 +127,7 @@ public class Peer implements Comparable byte[] id = handshake(in, out); this.peerID = new PeerID(id, sock.getPeerDestination()); - _id = ++__id; + _id = __id.incrementAndGet(); if (_log.shouldLog(Log.DEBUG)) _log.debug("Creating a new peer " + peerID.toString(), new Exception("creating " + _id)); } @@ -457,6 +461,8 @@ public class Peer implements Comparable void disconnect() { + if (!_disconnected.compareAndSet(false, true)) + return; PeerState s = state; if (s != null) { @@ -476,9 +482,11 @@ public class Peer implements Comparable PeerConnectionIn in = s.in; if (in != null) in.disconnect(); - PeerConnectionOut out = s.out; - if (out != null) - out.disconnect(); + // this is blocking in streaming, so do this after closing the socket + // so it won't really block + //PeerConnectionOut out = s.out; + //if (out != null) + // out.disconnect(); PeerListener pl = s.listener; if (pl != null) pl.disconnected(this); @@ -492,6 +500,13 @@ public class Peer implements Comparable _log.warn("Error disconnecting " + toString(), ioe); } } + if (s != null) { + // this is blocking in streaming, so do this after closing the socket + // so it won't really block + PeerConnectionOut out = s.out; + if (out != null) + out.disconnect(); + } } /** diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java index c92e30bf96..32ed0473ce 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionIn.java @@ -65,7 +65,7 @@ class PeerConnectionIn implements Runnable try { din.close(); } catch (IOException ioe) { - _log.warn("Error closing the stream from " + peer, ioe); + //_log.warn("Error closing the stream from " + peer, ioe); } } } diff --git a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java index dc64f885f1..9d6862a734 100644 --- a/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java +++ b/apps/i2psnark/java/src/org/klomp/snark/PeerConnectionOut.java @@ -56,7 +56,6 @@ class PeerConnectionOut implements Runnable _id = ++__id; lastSent = System.currentTimeMillis(); - quit = false; } public void startup() { @@ -66,7 +65,7 @@ class PeerConnectionOut implements Runnable /** * Continuesly monitors for more outgoing messages that have to be send. - * Stops if quit is true of an IOException occurs. + * Stops if quit is true or an IOException occurs. */ public void run() { @@ -215,13 +214,13 @@ class PeerConnectionOut implements Runnable thread.interrupt(); sendQueue.clear(); - sendQueue.notify(); + sendQueue.notifyAll(); } if (dout != null) { try { dout.close(); } catch (IOException ioe) { - _log.warn("Error closing the stream to " + peer, ioe); + //_log.warn("Error closing the stream to " + peer, ioe); } } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 4bb2ebc720..fbc99d4399 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -168,7 +168,7 @@ class Connection { * @return true if the packet should be sent, false for a fatal error * will return false after 5 minutes even if timeoutMs is <= 0. */ - boolean packetSendChoke(long timeoutMs) { + public boolean packetSendChoke(long timeoutMs) throws IOException, InterruptedException { long start = _context.clock().now(); long writeExpire = start + timeoutMs; // only used if timeoutMs > 0 boolean started = false; @@ -183,8 +183,10 @@ class Connection { // no need to wait until the other side has ACKed us before sending the first few wsize // packets through // Incorrect assumption, the constructor defaults _connected to true --Sponge - if (!_connected.get()) - return false; + if (!_connected.get()) + throw new IOException("disconnected"); + if (_outputStream.getClosed()) + throw new IOException("output stream closed"); started = true; // Try to keep things moving even during NACKs and retransmissions... // Limit unacked packets to the window @@ -207,12 +209,24 @@ class Connection { if (_log.shouldLog(Log.DEBUG)) _log.debug("Outbound window is full (" + unacked + "/" + wsz + "/" + _activeResends + "), waiting " + timeLeft); - try { _outboundPackets.wait(Math.min(timeLeft,250l)); } catch (InterruptedException ie) { if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); return false;} + try { + _outboundPackets.wait(Math.min(timeLeft,250l)); + } catch (InterruptedException ie) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends +")"); + throw ie; + } } else { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends // + "), waiting indefinitely"); - try { _outboundPackets.wait(250); } catch (InterruptedException ie) {if (_log.shouldLog(Log.DEBUG)) _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); return false;} //10*1000 + try { + _outboundPackets.wait(250); + } catch (InterruptedException ie) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("InterruptedException while Outbound window is full (" + _outboundPackets.size() + "/" + _activeResends + ")"); + throw ie; + } //10*1000 } } else { _context.statManager().addRateData("stream.chokeSizeEnd", _outboundPackets.size(), _context.clock().now() - start); @@ -222,6 +236,9 @@ class Connection { } } + /** + * Notify all threads waiting in packetSendChoke() + */ void windowAdjusted() { synchronized (_outboundPackets) { _outboundPackets.notifyAll(); @@ -710,8 +727,6 @@ class Connection { } } - private static final IOException DISCON_IOE = new IOException("disconnected!"); - /** * Must be called when we are done with this connection. * Final disconnect. Remove from conn manager. @@ -729,7 +744,7 @@ class Connection { _outputStream.destroy(); _receiver.destroy(); _activityTimer.cancel(); - _inputStream.streamErrorOccurred(DISCON_IOE); + _inputStream.streamErrorOccurred(new IOException("disconnected")); if (_log.shouldLog(Log.INFO)) _log.info("Connection disconnect complete: " diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 31360c8280..2ac2ba96e5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -4,6 +4,7 @@ import java.util.List; import net.i2p.I2PAppContext; import net.i2p.I2PException; +import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; @@ -200,8 +201,8 @@ class ConnectionPacketHandler { final long lastSendTime = con.getLastSendTime(); if (_log.shouldLog(Log.WARN)) - _log.warn(String.format("%s congestion.. dup packet %s now %d ackDelay %d lastSend %d", - con, packet, now, ackDelay, lastSendTime)); + _log.warn(String.format("%s congestion.. dup packet %s ackDelay %d lastSend %d ago", + con, packet, now, ackDelay, DataHelper.formatDuration(now - lastSendTime))); final long nextSendTime = lastSendTime + ackDelay; if (nextSendTime <= now) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java index 08c9aaf02b..3652bcf4ef 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -33,6 +33,15 @@ class I2PSocketFull implements I2PSocket { _remotePeer = _localPeer = null; } + /** + * Closes this socket. + * + * Nonblocking as of 0.9.9: + * Any thread currently blocked in an I/O operation upon this socket will throw an IOException. + * Once a socket has been closed, it is not available for further networking use + * (i.e. can't be reconnected or rebound). A new socket needs to be created. + * Closing this socket will also close the socket's InputStream and OutputStream. + */ public void close() throws IOException { if (!_closed.compareAndSet(false,true)) { // log a trace to find out why @@ -42,15 +51,13 @@ class I2PSocketFull implements I2PSocket { Connection c = _connection; if (c == null) return; if (c.getIsConnected()) { - OutputStream out = c.getOutputStream(); - try { - out.close(); - } catch (IOException ioe) { - // ignore any write error, as we want to keep on and kill the - // con (thanks Complication!) - } MessageInputStream in = c.getInputStream(); in.close(); + MessageOutputStream out = c.getOutputStream(); + out.closeInternal(); + // this will cause any thread waiting in Connection.packetSendChoke() + // to throw an IOE + c.windowAdjusted(); } else { //throw new IOException("Not connected"); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 27a90bab22..792faa5b90 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import java.io.IOException; import java.io.InputStream; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -287,12 +288,12 @@ class MessageInputStream extends InputStream { @Override public int read(byte target[], int offset, int length) throws IOException { - if (_locallyClosed) throw new IOException("Already locally closed"); - throwAnyError(); long expiration = -1; if (_readTimeout > 0) expiration = _readTimeout + System.currentTimeMillis(); synchronized (_dataLock) { + if (_locallyClosed) throw new IOException("Already locally closed"); + throwAnyError(); for (int i = 0; i < length; i++) { if ( (_readyDataBlocks.isEmpty()) && (i == 0) ) { // ok, we havent found anything, so lets block until we get @@ -312,7 +313,13 @@ class MessageInputStream extends InputStream { if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i + ") with no timeout: " + toString()); - try { _dataLock.wait(); } catch (InterruptedException ie) { } + try { + _dataLock.wait(); + } catch (InterruptedException ie) { + IOException ioe2 = new InterruptedIOException("Interrupted read"); + ioe2.initCause(ie); + throw ioe2; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i + ") with no timeout complete: " + toString()); @@ -321,7 +328,13 @@ class MessageInputStream extends InputStream { if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i + ") with timeout: " + _readTimeout + ": " + toString()); - try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } + try { + _dataLock.wait(_readTimeout); + } catch (InterruptedException ie) { + IOException ioe2 = new InterruptedIOException("Interrupted read"); + ioe2.initCause(ie); + throw ioe2; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("read(...," + offset+", " + length+ ")[" + i + ") with timeout complete: " + _readTimeout + ": " + toString()); @@ -382,10 +395,10 @@ class MessageInputStream extends InputStream { @Override public int available() throws IOException { - if (_locallyClosed) throw new IOException("Already closed"); - throwAnyError(); int numBytes = 0; synchronized (_dataLock) { + if (_locallyClosed) throw new IOException("Already closed"); + throwAnyError(); for (int i = 0; i < _readyDataBlocks.size(); i++) { ByteArray cur = _readyDataBlocks.get(i); if (i == 0) @@ -467,14 +480,15 @@ class MessageInputStream extends InputStream { * */ void streamErrorOccurred(IOException ioe) { - if (_streamError == null) - _streamError = ioe; - _locallyClosed = true; synchronized (_dataLock) { + if (_streamError == null) + _streamError = ioe; + _locallyClosed = true; _dataLock.notifyAll(); } } + /** Caller must lock _dataLock */ private void throwAnyError() throws IOException { IOException ioe = _streamError; if (ioe != null) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 663f9e3667..5afb37358a 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -89,15 +89,17 @@ class MessageOutputStream extends OutputStream { _writeTimeout = ms; } + public int getWriteTimeout() { return _writeTimeout; } + public void setBufferSize(int size) { _nextBufferSize = size; } - @Override + @Override public void write(byte b[]) throws IOException { write(b, 0, b.length); } - @Override + @Override public void write(byte b[], int off, int len) throws IOException { if (_closed.get()) throw new IOException("Already closed"); if (_log.shouldLog(Log.DEBUG)) @@ -150,7 +152,13 @@ class MessageOutputStream extends OutputStream { // ok, we've actually added a new packet - lets wait until // its accepted into the queue before moving on (so that we // dont fill our buffer instantly) - ws.waitForAccept(_writeTimeout); + try { + ws.waitForAccept(_writeTimeout); + } catch (InterruptedException ie) { + IOException ioe2 = new InterruptedIOException("Interrupted write"); + ioe2.initCause(ie); + throw ioe2; + } if (!ws.writeAccepted()) { if (_log.shouldLog(Log.WARN)) _log.warn("Write not accepted of " + ws); @@ -296,7 +304,7 @@ class MessageOutputStream extends OutputStream { * * @throws IOException if the write fails */ - @Override + @Override public void flush() throws IOException { /* @throws InterruptedIOException if the write times out * Documented here, but doesn't belong in the javadoc. @@ -343,14 +351,20 @@ class MessageOutputStream extends OutputStream { // Wait a loooooong time, until we have the ACK if (_log.shouldLog(Log.DEBUG)) _log.debug("before waiting " + _writeTimeout + "ms for completion of " + ws); - if (_closed.get() && - ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || - (_writeTimeout <= 0) ) ) - ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); - else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ) - ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); - else - ws.waitForCompletion(_writeTimeout); + try { + if (_closed.get() && + ( (_writeTimeout > Connection.DISCONNECT_TIMEOUT) || + (_writeTimeout <= 0) ) ) + ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); + else if ( (_writeTimeout <= 0) || (_writeTimeout > Connection.DISCONNECT_TIMEOUT) ) + ws.waitForCompletion(Connection.DISCONNECT_TIMEOUT); + else + ws.waitForCompletion(_writeTimeout); + } catch (InterruptedException ie) { + IOException ioe2 = new InterruptedIOException("Interrupted flush"); + ioe2.initCause(ie); + throw ioe2; + } if (_log.shouldLog(Log.DEBUG)) _log.debug("after waiting " + _writeTimeout + "ms for completion of " + ws); if (ws.writeFailed() && (_writeTimeout > 0) ) @@ -466,6 +480,7 @@ class MessageOutputStream extends OutputStream { void flushAvailable(DataReceiver target) throws IOException { flushAvailable(target, true); } + void flushAvailable(DataReceiver target, boolean blocking) throws IOException { WriteStatus ws = null; long before = System.currentTimeMillis(); @@ -487,7 +502,13 @@ class MessageOutputStream extends OutputStream { _log.debug("Took " + (afterBuild-before) + "ms to build a packet? " + ws); if (blocking && ws != null) { - ws.waitForAccept(_writeTimeout); + try { + ws.waitForAccept(_writeTimeout); + } catch (InterruptedException ie) { + IOException ioe2 = new InterruptedIOException("Interrupted flush"); + ioe2.initCause(ie); + throw ioe2; + } if (ws.writeFailed()) throw new IOException("Flush available failed"); else if (!ws.writeAccepted()) @@ -526,7 +547,7 @@ class MessageOutputStream extends OutputStream { * Success means an ACK FROM THE FAR END. * @param maxWaitMs -1 = forever */ - public void waitForCompletion(int maxWaitMs); + public void waitForCompletion(int maxWaitMs) throws IOException, InterruptedException; /** * Wait until the data written is accepted into the outbound pool, @@ -534,9 +555,9 @@ class MessageOutputStream extends OutputStream { * which we throttle rather than accept arbitrary data and queue * @param maxWaitMs -1 = forever */ - public void waitForAccept(int maxWaitMs); + public void waitForAccept(int maxWaitMs) throws IOException, InterruptedException; - /** the write was accepted. aka did the socket not close? */ + /** Was the write was accepted. aka did the socket not close? */ public boolean writeAccepted(); /** did the write fail? */ public boolean writeFailed(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 6e10805f39..6fbe1dcce4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -254,6 +254,7 @@ class Packet { * @return Delay before resending a packet in seconds. */ public int getResendDelay() { return _resendDelay; } + /** * Unused. * Broken before release 0.7.8 @@ -267,17 +268,22 @@ class Packet { * @return the payload of the message, null if none. */ public ByteArray getPayload() { return _payload; } + public void setPayload(ByteArray payload) { _payload = payload; if ( (payload != null) && (payload.getValid() > MAX_PAYLOAD_SIZE) ) throw new IllegalArgumentException("Too large payload: " + payload.getValid()); } + public int getPayloadSize() { return (_payload == null ? 0 : _payload.getValid()); } + + /** does nothing right now */ public void releasePayload() { //_payload = null; } + public ByteArray acquirePayload() { _payload = new ByteArray(new byte[Packet.MAX_PAYLOAD_SIZE]); return _payload; @@ -288,13 +294,16 @@ class Packet { * @return true if set, false if not. */ public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } + public void setFlag(int flag) { _flags |= flag; } + public void setFlag(int flag, boolean set) { if (set) _flags |= flag; else _flags &= ~flag; } + public void setFlags(int flags) { _flags = flags; } /** the signature on the packet (only included if the flag for it is set) diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index 2ed3dd8440..7012b354ca 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -140,6 +140,7 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { if (_log.shouldLog(Log.DEBUG)) _log.debug("Cancelled! " + toString(), new Exception("cancelled")); } + public SimpleTimer2.TimedEvent getResendEvent() { return _resendEvent; } /** how long after packet creation was it acked? @@ -230,59 +231,71 @@ class PacketLocal extends Packet implements MessageOutputStream.WriteStatus { return buf; } + ////// begin WriteStatus methods + /** * Blocks until outbound window is not full. See Connection.packetSendChoke(). * @param maxWaitMs MessageOutputStream is the only caller, generally with -1 */ - public void waitForAccept(int maxWaitMs) { + public void waitForAccept(int maxWaitMs) throws IOException, InterruptedException { long before = _context.clock().now(); - int queued = _connection.getUnackedPacketsSent(); - int window = _connection.getOptions().getWindowSize(); - boolean accepted = _connection.packetSendChoke(maxWaitMs); - long after = _context.clock().now(); - if (accepted) { - _acceptedOn = after; - } else { - _acceptedOn = -1; - releasePayload(); + boolean accepted = false; + try { + // throws IOE or IE + accepted = _connection.packetSendChoke(maxWaitMs); + } finally { + if (accepted) { + _acceptedOn = _context.clock().now(); + } else { + _acceptedOn = -1; + releasePayload(); + } + if ( (_acceptedOn - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) { + int queued = _connection.getUnackedPacketsSent(); + int window = _connection.getOptions().getWindowSize(); + int afterQueued = _connection.getUnackedPacketsSent(); + _log.debug("Took " + (_acceptedOn - before) + "ms to get " + + (accepted ? "accepted" : "rejected") + + (_cancelledOn > 0 ? " and CANCELLED" : "") + + ", queued behind " + queued +" with a window size of " + window + + ", finally accepted with " + afterQueued + " queued: " + + toString()); + } } - int afterQueued = _connection.getUnackedPacketsSent(); - if ( (after - before > 1000) && (_log.shouldLog(Log.DEBUG)) ) - _log.debug("Took " + (after-before) + "ms to get " - + (accepted ? "accepted" : "rejected") - + (_cancelledOn > 0 ? " and CANCELLED" : "") - + ", queued behind " + queued +" with a window size of " + window - + ", finally accepted with " + afterQueued + " queued: " - + toString()); } /** block until the packet is acked from the far end */ - public void waitForCompletion(int maxWaitMs) { + public void waitForCompletion(int maxWaitMs) throws IOException, InterruptedException { long expiration = _context.clock().now()+maxWaitMs; - while (true) { - long timeRemaining = expiration - _context.clock().now(); - if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break; - try { + try { + while (true) { + long timeRemaining = expiration - _context.clock().now(); + if ( (timeRemaining <= 0) && (maxWaitMs > 0) ) break; synchronized (this) { if (_ackOn > 0) break; - if (_cancelledOn > 0) break; - if (!_connection.getIsConnected()) break; + if (!_connection.getIsConnected()) + throw new IOException("disconnected"); + if (_cancelledOn > 0) + throw new IOException("cancelled"); if (timeRemaining > 60*1000) timeRemaining = 60*1000; else if (timeRemaining <= 0) timeRemaining = 10*1000; wait(timeRemaining); } - } catch (InterruptedException ie) { }//{ break; } + } + } finally { + if (!writeSuccessful()) + releasePayload(); } - if (!writeSuccessful()) - releasePayload(); } public synchronized boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } public synchronized boolean writeFailed() { return _cancelledOn > 0; } public synchronized boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } + ////// end WriteStatus methods + /** Generate a pcap/tcpdump-compatible format, * so we can use standard debugging tools. */ -- GitLab