From e5d66f46c6116fa725f7b11c0aa5cab07f8dbeec Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Sun, 15 Aug 2004 20:48:35 +0000 Subject: [PATCH] deal with a race on close more zealous bc synchronization make sure we always close the streams explicitly logging --- .../i2p/client/streaming/I2PSocketImpl.java | 52 +++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 0d4e95d2c9..14d1dac3fc 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -346,7 +346,7 @@ class I2PSocketImpl implements I2PSocket { } boolean timedOut = false; - while (read.length == 0) { + while ( (read.length == 0) && (!inStreamClosed) ) { synchronized (flagLock) { if (closed) { if (_log.shouldLog(Log.DEBUG)) @@ -378,6 +378,9 @@ class I2PSocketImpl implements I2PSocket { } } if (read.length > len) throw new RuntimeException("BUG"); + if ( (inStreamClosed) && ( (read == null) || (read.length <= 0) ) ) + return -1; + System.arraycopy(read, 0, b, off, read.length); if (_log.shouldLog(Log.DEBUG)) { @@ -456,6 +459,8 @@ class I2PSocketImpl implements I2PSocket { synchronized (I2PInputStream.this) { I2PInputStream.this.notifyAll(); } + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getStreamPrefix() + "After insert " + len + " bytes into queue: " + hashCode()); } public void notifyClosed() { @@ -471,6 +476,8 @@ class I2PSocketImpl implements I2PSocket { inStreamClosed = true; bc.notifyAll(); } + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getStreamPrefix() + "After close"); } } @@ -518,10 +525,21 @@ class I2PSocketImpl implements I2PSocket { */ private boolean handleNextPacket(ByteCollector bc, byte buffer[]) throws IOException, I2PSessionException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket"); int len = in.read(buffer); - int bcsize = bc.getCurrentSize(); + int bcsize = 0; + synchronized (bc) { + bcsize = bc.getCurrentSize(); + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "handleNextPacket len=" + len + " bcsize=" + bcsize); + if (len != -1) { - bc.append(buffer, len); + synchronized (bc) { + bc.append(buffer, len); + } } else if (bcsize == 0) { // nothing left in the buffer, and read(..) got EOF (-1). // the bart the @@ -529,7 +547,7 @@ class I2PSocketImpl implements I2PSocket { } if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Runner Point d: " + hashCode()); + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Runner Point d: " + hashCode()); try { Thread.sleep(PACKET_DELAY); @@ -538,19 +556,22 @@ class I2PSocketImpl implements I2PSocket { } } if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) { - byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); + byte data[] = null; + synchronized (bc) { + data = bc.startToByteArray(MAX_PACKET_SIZE); + } if (data.length > 0) { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Message size is: " + data.length); + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message size is: " + data.length); boolean sent = sendBlock(data); if (!sent) { if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Error sending message to peer. Killing socket runner"); + _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + "Error sending message to peer. Killing socket runner"); errorOccurred(); return false; } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug(getPrefix() + "Message sent to peer"); + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + "Message sent to peer"); } } } @@ -567,7 +588,14 @@ class I2PSocketImpl implements I2PSocket { while (keepHandling) { keepHandling = handleNextPacket(bc, buffer); packetsHandled++; + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + ":" + Thread.currentThread().getName() + + "Packets handled: " + packetsHandled); } + if (_log.shouldLog(Log.INFO)) + _log.info(getPrefix() + ":" + Thread.currentThread().getName() + + "After handling packets, we're done. Packets handled: " + packetsHandled); + if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) { if (_log.shouldLog(Log.WARN)) _log.warn(getPrefix() + "We lost some data queued up due to a network send error (input stream: " @@ -583,16 +611,20 @@ class I2PSocketImpl implements I2PSocket { } // FIXME: Race here? if (sc) { if (_log.shouldLog(Log.INFO)) - _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten); + _log.info(getPrefix() + ":" + Thread.currentThread().getName() + + "Sending close packet: (we started? " + outgoing + + ") after reading " + _bytesRead + " and writing " + _bytesWritten); byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]); boolean sent = manager.getSession().sendMessage(remote, packet); if (!sent) { if (_log.shouldLog(Log.WARN)) - _log.warn(getPrefix() + "Error sending close packet to peer"); + _log.warn(getPrefix() + ":" + Thread.currentThread().getName() + + "Error sending close packet to peer"); errorOccurred(); } } manager.removeSocket(I2PSocketImpl.this); + internalClose(); } catch (InterruptedIOException ex) { _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex); } catch (IOException ex) { -- GitLab