From 0942a7f3ffea9d0a88d7cbbb8d8fd3461648f214 Mon Sep 17 00:00:00 2001 From: jrandom <jrandom> Date: Wed, 19 May 2004 15:14:30 +0000 Subject: [PATCH] truckloads of logging new async interface for error notification (e.g. you can get notified of an error prior to it throwing the IOException). This async is useful since the IOException can be delayed for up to a minute while waiting for the close packet to be delivered. The alternative is to fire off a new thread to do the closing, and we may want to go there later, but i'm not sure. --- .../client/streaming/I2PServerSocketImpl.java | 6 +- .../i2p/client/streaming/I2PSocketImpl.java | 61 +++++++++++++------ .../client/streaming/I2PSocketManager.java | 36 +++++------ 3 files changed, 65 insertions(+), 38 deletions(-) diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 436e4bb4c4..965ba31bff 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -89,7 +89,7 @@ class I2PServerSocketImpl implements I2PServerSocket { */ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("addWaitForAccept [new socket arrived, pending: " + pendingSockets.size()); + _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); if (closing) { if (_log.shouldLog(Log.WARN)) @@ -111,7 +111,7 @@ class I2PServerSocketImpl implements I2PServerSocket { long now = clock.now(); if (now >= end) { if (_log.shouldLog(Log.INFO)) - _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms"); + _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); pendingSockets.remove(s); return false; } @@ -130,7 +130,7 @@ class I2PServerSocketImpl implements I2PServerSocket { } long now = clock.now(); if (_log.shouldLog(Log.DEBUG)) - _log.info("Socket accepted after " + (now-start) + "ms"); + _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); return true; } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 7ea7db183a..b06f29edb3 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -29,7 +29,12 @@ class I2PSocketImpl implements I2PSocket { private Object remoteIDWaiter = new Object(); private I2PInputStream in; private I2POutputStream out; + private SocketErrorListener _socketErrorListener; private boolean outgoing; + private long _socketId; + private static long __socketId = 0; + private long _bytesRead = 0; + private long _bytesWritten = 0; private Object flagLock = new Object(); /** @@ -61,6 +66,7 @@ class I2PSocketImpl implements I2PSocket { this.outgoing = outgoing; manager = mgr; remote = peer; + _socketId = ++__socketId; local = mgr.getSession().getMyDestination(); in = new I2PInputStream(); I2PInputStream pin = new I2PInputStream(); @@ -153,6 +159,7 @@ class I2PSocketImpl implements I2PSocket { * @param data the data to inject into our local inputStream */ public void queueData(byte[] data) { + _bytesRead += data.length; in.queueData(data); } @@ -232,6 +239,17 @@ class I2PSocketImpl implements I2PSocket { in.setReadTimeout(ms); } + public void setSocketErrorListener(SocketErrorListener lsnr) { + _socketErrorListener = lsnr; + } + + void errorOccurred() { + if (_socketErrorListener != null) + _socketErrorListener.errorOccurred(); + } + + private String getPrefix() { return "[" + _socketId + "]: "; } + //-------------------------------------------------- private class I2PInputStream extends InputStream { @@ -256,7 +274,8 @@ class I2PSocketImpl implements I2PSocket { } public synchronized int read(byte[] b, int off, int len) throws IOException { - _log.debug("Read called: " + this.hashCode()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Read called: " + this.hashCode()); if (len == 0) return 0; long dieAfter = System.currentTimeMillis() + readTimeout; byte[] read = bc.startToByteArray(len); @@ -265,7 +284,8 @@ class I2PSocketImpl implements I2PSocket { while (read.length == 0) { synchronized (flagLock) { if (closed) { - _log.debug("Closed is set, so closing stream: " + hashCode()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode()); return -1; } } @@ -279,7 +299,7 @@ class I2PSocketImpl implements I2PSocket { if ((readTimeout >= 0) && (System.currentTimeMillis() >= dieAfter)) { - throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)"); + throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)"); } read = bc.startToByteArray(len); @@ -288,7 +308,7 @@ class I2PSocketImpl implements I2PSocket { System.arraycopy(read, 0, b, off, read.length); if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Read from I2PInputStream " + hashCode() + " returned " + _log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned " + read.length + " bytes"); } //if (_log.shouldLog(Log.DEBUG)) { @@ -309,7 +329,7 @@ class I2PSocketImpl implements I2PSocket { public synchronized void queueData(byte[] data, int off, int len) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Insert " + len + " bytes into queue: " + hashCode()); + _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); bc.append(data, off, len); notifyAll(); } @@ -338,6 +358,7 @@ class I2PSocketImpl implements I2PSocket { } public void write(byte[] b, int off, int len) throws IOException { + _bytesWritten += len; sendTo.queueData(b, off, len); } @@ -353,10 +374,10 @@ class I2PSocketImpl implements I2PSocket { public I2PSocketRunner(InputStream in) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Runner's input stream is: " + in.hashCode()); + _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode()); this.in = in; String peer = I2PSocketImpl.this.remote.calculateHash().toBase64(); - setName("SocketRunner " + (++__runnerId) + " " + peer.substring(0, 4)); + setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4)); start(); } @@ -378,7 +399,7 @@ class I2PSocketImpl implements I2PSocket { } if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Runner Point d: " + hashCode()); + _log.debug(getPrefix() + "Runner Point d: " + hashCode()); try { Thread.sleep(PACKET_DELAY); @@ -390,10 +411,11 @@ class I2PSocketImpl implements I2PSocket { byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); if (data.length > 0) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message size is: " + data.length); + _log.debug(getPrefix() + "Message size is: " + data.length); boolean sent = sendBlock(data); if (!sent) { - _log.error("Error sending message to peer. Killing socket runner"); + _log.error(getPrefix() + "Error sending message to peer. Killing socket runner"); + errorOccurred(); return false; } } @@ -413,7 +435,7 @@ class I2PSocketImpl implements I2PSocket { packetsHandled++; } if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) { - _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + _log.error(getPrefix() + "A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + in.hashCode() + "; " + "queue size: " + bc.getCurrentSize() + ")"); } @@ -426,32 +448,33 @@ class I2PSocketImpl implements I2PSocket { } // FIXME: Race here? if (sc) { if (_log.shouldLog(Log.INFO)) - _log.info("Sending close packet: " + outgoing); + _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten); byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]); boolean sent = manager.getSession().sendMessage(remote, packet); if (!sent) { - _log.error("Error sending close packet to peer"); + _log.error(getPrefix() + "Error sending close packet to peer"); + errorOccurred(); } } manager.removeSocket(I2PSocketImpl.this); } catch (InterruptedIOException ex) { - _log.error("BUG! read() operations should not timeout!", ex); + _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex); } catch (IOException ex) { // WHOEVER removes this event on inconsistent // state before fixing the inconsistent state (a // reference on the socket in the socket manager // etc.) will get hanged by me personally -- mihi - _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); } catch (I2PException ex) { - _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); } } private boolean sendBlock(byte data[]) throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); + _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); if (remoteID == null) { - _log.error("NULL REMOTEID"); + _log.error(getPrefix() + "NULL REMOTEID"); return false; } byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data); @@ -463,4 +486,6 @@ class I2PSocketImpl implements I2PSocket { return sent; } } + + public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index f81d2abd16..833ba62b48 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -189,7 +189,7 @@ public class I2PSocketManager implements I2PSessionListener { s = (I2PSocketImpl) _outSockets.get(id); } - _log.debug("*Disconnect outgoing!"); + _log.debug("*Disconnect outgoing for socket " + s); try { if (s != null) { if (payload.length > 0) { @@ -207,7 +207,7 @@ public class I2PSocketManager implements I2PSessionListener { } return; } catch (Exception t) { - _log.error("Ignoring error on disconnect", t); + _log.error("Ignoring error on disconnect for socket " + s, t); } } @@ -225,7 +225,7 @@ public class I2PSocketManager implements I2PSessionListener { // packet send outgoing if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send outgoing [" + payload.length + "]"); + _log.debug("*Packet send outgoing [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); return; @@ -245,7 +245,6 @@ public class I2PSocketManager implements I2PSessionListener { */ private void synIncomingAvailable(String id, byte payload[], I2PSession session) throws DataFormatException, I2PSessionException { - _log.debug("*Syn!"); Destination d = new Destination(); d.fromByteArray(payload); @@ -259,6 +258,7 @@ public class I2PSocketManager implements I2PSessionListener { s.setRemoteID(id); } } + _log.debug("*Syn! for socket " + s); if (!acceptConnections) { // The app did not instantiate an I2PServerSocket @@ -283,7 +283,7 @@ public class I2PSocketManager implements I2PSessionListener { if (!replySentOk) { if (_log.shouldLog(Log.WARN)) _log.warn("Error sending reply to " + d.calculateHash().toBase64() - + " in response to a new con message", + + " in response to a new con message for socket " + s, new Exception("Failed creation")); s.internalClose(); } @@ -293,7 +293,7 @@ public class I2PSocketManager implements I2PSessionListener { packet[0] = CLOSE_OUT; boolean nackSent = session.sendMessage(d, packet); if (!nackSent) { - _log.warn("Error sending NACK for session creation"); + _log.warn("Error sending NACK for session creation for socket " + s); } s.internalClose(); } @@ -306,7 +306,6 @@ public class I2PSocketManager implements I2PSessionListener { * */ private void disconnectIncoming(String id, byte payload[]) { - _log.debug("*Disconnect incoming!"); I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _inSockets.get(id); @@ -315,6 +314,8 @@ public class I2PSocketManager implements I2PSessionListener { } } + _log.debug("*Disconnect incoming for socket " + s); + try { if (payload.length == 0 && s != null) { s.internalClose(); @@ -339,12 +340,13 @@ public class I2PSocketManager implements I2PSessionListener { * @throws IllegalStateException if the socket isn't open or isn't known */ private void sendIncoming(String id, byte payload[]) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send incoming [" + payload.length + "]"); I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _inSockets.get(id); } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("*Packet send incoming [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); @@ -422,7 +424,7 @@ public class I2PSocketManager implements I2PSessionListener { boolean sent = false; sent = _session.sendMessage(peer, packet); if (!sent) { - _log.info("Unable to send & receive ack for SYN packet"); + _log.info("Unable to send & receive ack for SYN packet for socket " + s); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -431,18 +433,18 @@ public class I2PSocketManager implements I2PSessionListener { remoteID = s.getRemoteID(true, options.getConnectTimeout()); if (remoteID == null) - throw new ConnectException("Connection refused by peer"); + throw new ConnectException("Connection refused by peer for socket " + s); if ("".equals(remoteID)) - throw new NoRouteToHostException("Unable to reach peer"); + throw new NoRouteToHostException("Unable to reach peer for socket " + s); if (_log.shouldLog(Log.DEBUG)) _log.debug("TIMING: s given out for remoteID " - + getReadableForm(remoteID)); + + getReadableForm(remoteID) + " for socket " + s); return s; } catch (InterruptedIOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Timeout waiting for ack from syn for id " - + getReadableForm(lcID), ioe); + + getReadableForm(lcID) + " for socket " + s, ioe); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -456,7 +458,7 @@ public class I2PSocketManager implements I2PSessionListener { throw ex; } catch (IOException ex) { if (_log.shouldLog(Log.ERROR)) - _log.error("Error sending syn on id " + getReadableForm(lcID), ex); + _log.error("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -464,7 +466,7 @@ public class I2PSocketManager implements I2PSessionListener { throw new I2PException("Unhandled IOException occurred"); } catch (I2PException ex) { if (_log.shouldLog(Log.INFO)) - _log.info("Error sending syn on id " + getReadableForm(lcID), ex); + _log.info("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -577,7 +579,7 @@ public class I2PSocketManager implements I2PSessionListener { public void removeSocket(I2PSocketImpl sock) { synchronized (lock) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\""); + _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\" [" + sock + "]"); _inSockets.remove(sock.getLocalID()); _outSockets.remove(sock.getLocalID()); lock.notify(); -- GitLab