diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 436e4bb4c4bcbf1bd4feda9c7a7c40e76bc399fb..965ba31bff6c7c286473e9cf53a26c309a750ba3 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -89,7 +89,7 @@ class I2PServerSocketImpl implements I2PServerSocket { */ public boolean addWaitForAccept(I2PSocket s, long timeoutMs) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("addWaitForAccept [new socket arrived, pending: " + pendingSockets.size()); + _log.debug("addWaitForAccept [new socket arrived [" + s.toString() + "], pending: " + pendingSockets.size()); if (closing) { if (_log.shouldLog(Log.WARN)) @@ -111,7 +111,7 @@ class I2PServerSocketImpl implements I2PServerSocket { long now = clock.now(); if (now >= end) { if (_log.shouldLog(Log.INFO)) - _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms"); + _log.info("Expired while waiting for accept (time elapsed =" + (now - start) + "ms) for socket " + s.toString()); pendingSockets.remove(s); return false; } @@ -130,7 +130,7 @@ class I2PServerSocketImpl implements I2PServerSocket { } long now = clock.now(); if (_log.shouldLog(Log.DEBUG)) - _log.info("Socket accepted after " + (now-start) + "ms"); + _log.info("Socket accepted after " + (now-start) + "ms for socket " + s.toString()); return true; } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 7ea7db183ab50b97077d6b3bea19b50d657e8778..b06f29edb365dde4f1ca64fefff11c17cf9a99e7 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -29,7 +29,12 @@ class I2PSocketImpl implements I2PSocket { private Object remoteIDWaiter = new Object(); private I2PInputStream in; private I2POutputStream out; + private SocketErrorListener _socketErrorListener; private boolean outgoing; + private long _socketId; + private static long __socketId = 0; + private long _bytesRead = 0; + private long _bytesWritten = 0; private Object flagLock = new Object(); /** @@ -61,6 +66,7 @@ class I2PSocketImpl implements I2PSocket { this.outgoing = outgoing; manager = mgr; remote = peer; + _socketId = ++__socketId; local = mgr.getSession().getMyDestination(); in = new I2PInputStream(); I2PInputStream pin = new I2PInputStream(); @@ -153,6 +159,7 @@ class I2PSocketImpl implements I2PSocket { * @param data the data to inject into our local inputStream */ public void queueData(byte[] data) { + _bytesRead += data.length; in.queueData(data); } @@ -232,6 +239,17 @@ class I2PSocketImpl implements I2PSocket { in.setReadTimeout(ms); } + public void setSocketErrorListener(SocketErrorListener lsnr) { + _socketErrorListener = lsnr; + } + + void errorOccurred() { + if (_socketErrorListener != null) + _socketErrorListener.errorOccurred(); + } + + private String getPrefix() { return "[" + _socketId + "]: "; } + //-------------------------------------------------- private class I2PInputStream extends InputStream { @@ -256,7 +274,8 @@ class I2PSocketImpl implements I2PSocket { } public synchronized int read(byte[] b, int off, int len) throws IOException { - _log.debug("Read called: " + this.hashCode()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Read called: " + this.hashCode()); if (len == 0) return 0; long dieAfter = System.currentTimeMillis() + readTimeout; byte[] read = bc.startToByteArray(len); @@ -265,7 +284,8 @@ class I2PSocketImpl implements I2PSocket { while (read.length == 0) { synchronized (flagLock) { if (closed) { - _log.debug("Closed is set, so closing stream: " + hashCode()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug(getPrefix() + "Closed is set after reading " + _bytesRead + " and writing " + _bytesWritten + ", so closing stream: " + hashCode()); return -1; } } @@ -279,7 +299,7 @@ class I2PSocketImpl implements I2PSocket { if ((readTimeout >= 0) && (System.currentTimeMillis() >= dieAfter)) { - throw new InterruptedIOException("Timeout reading from I2PSocket (" + readTimeout + " msecs)"); + throw new InterruptedIOException(getPrefix() + "Timeout reading from I2PSocket (" + readTimeout + " msecs)"); } read = bc.startToByteArray(len); @@ -288,7 +308,7 @@ class I2PSocketImpl implements I2PSocket { System.arraycopy(read, 0, b, off, read.length); if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Read from I2PInputStream " + hashCode() + " returned " + _log.debug(getPrefix() + "Read from I2PInputStream " + hashCode() + " returned " + read.length + " bytes"); } //if (_log.shouldLog(Log.DEBUG)) { @@ -309,7 +329,7 @@ class I2PSocketImpl implements I2PSocket { public synchronized void queueData(byte[] data, int off, int len) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Insert " + len + " bytes into queue: " + hashCode()); + _log.debug(getPrefix() + "Insert " + len + " bytes into queue: " + hashCode()); bc.append(data, off, len); notifyAll(); } @@ -338,6 +358,7 @@ class I2PSocketImpl implements I2PSocket { } public void write(byte[] b, int off, int len) throws IOException { + _bytesWritten += len; sendTo.queueData(b, off, len); } @@ -353,10 +374,10 @@ class I2PSocketImpl implements I2PSocket { public I2PSocketRunner(InputStream in) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Runner's input stream is: " + in.hashCode()); + _log.debug(getPrefix() + "Runner's input stream is: " + in.hashCode()); this.in = in; String peer = I2PSocketImpl.this.remote.calculateHash().toBase64(); - setName("SocketRunner " + (++__runnerId) + " " + peer.substring(0, 4)); + setName("SocketRunner " + (++__runnerId) + "/" + _socketId + " " + peer.substring(0, 4)); start(); } @@ -378,7 +399,7 @@ class I2PSocketImpl implements I2PSocket { } if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Runner Point d: " + hashCode()); + _log.debug(getPrefix() + "Runner Point d: " + hashCode()); try { Thread.sleep(PACKET_DELAY); @@ -390,10 +411,11 @@ class I2PSocketImpl implements I2PSocket { byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); if (data.length > 0) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Message size is: " + data.length); + _log.debug(getPrefix() + "Message size is: " + data.length); boolean sent = sendBlock(data); if (!sent) { - _log.error("Error sending message to peer. Killing socket runner"); + _log.error(getPrefix() + "Error sending message to peer. Killing socket runner"); + errorOccurred(); return false; } } @@ -413,7 +435,7 @@ class I2PSocketImpl implements I2PSocket { packetsHandled++; } if ((bc.getCurrentSize() > 0) && (packetsHandled > 1)) { - _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + _log.error(getPrefix() + "A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + in.hashCode() + "; " + "queue size: " + bc.getCurrentSize() + ")"); } @@ -426,32 +448,33 @@ class I2PSocketImpl implements I2PSocket { } // FIXME: Race here? if (sc) { if (_log.shouldLog(Log.INFO)) - _log.info("Sending close packet: " + outgoing); + _log.info(getPrefix() + "Sending close packet: (we started? " + outgoing + ") after reading " + _bytesRead + " and writing " + _bytesWritten); byte[] packet = I2PSocketManager.makePacket(getMask(0x02), remoteID, new byte[0]); boolean sent = manager.getSession().sendMessage(remote, packet); if (!sent) { - _log.error("Error sending close packet to peer"); + _log.error(getPrefix() + "Error sending close packet to peer"); + errorOccurred(); } } manager.removeSocket(I2PSocketImpl.this); } catch (InterruptedIOException ex) { - _log.error("BUG! read() operations should not timeout!", ex); + _log.error(getPrefix() + "BUG! read() operations should not timeout!", ex); } catch (IOException ex) { // WHOEVER removes this event on inconsistent // state before fixing the inconsistent state (a // reference on the socket in the socket manager // etc.) will get hanged by me personally -- mihi - _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); } catch (I2PException ex) { - _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + _log.error(getPrefix() + "Error running - **INCONSISTENT STATE!!!**", ex); } } private boolean sendBlock(byte data[]) throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) - _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); + _log.debug(getPrefix() + "TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); if (remoteID == null) { - _log.error("NULL REMOTEID"); + _log.error(getPrefix() + "NULL REMOTEID"); return false; } byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data); @@ -463,4 +486,6 @@ class I2PSocketImpl implements I2PSocket { return sent; } } + + public String toString() { return "" + hashCode(); } } diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index f81d2abd16387d7557e9b8cca192ac1101642a08..833ba62b4825477a7e5ac4fd6722c0f8f60a3765 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -189,7 +189,7 @@ public class I2PSocketManager implements I2PSessionListener { s = (I2PSocketImpl) _outSockets.get(id); } - _log.debug("*Disconnect outgoing!"); + _log.debug("*Disconnect outgoing for socket " + s); try { if (s != null) { if (payload.length > 0) { @@ -207,7 +207,7 @@ public class I2PSocketManager implements I2PSessionListener { } return; } catch (Exception t) { - _log.error("Ignoring error on disconnect", t); + _log.error("Ignoring error on disconnect for socket " + s, t); } } @@ -225,7 +225,7 @@ public class I2PSocketManager implements I2PSessionListener { // packet send outgoing if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send outgoing [" + payload.length + "]"); + _log.debug("*Packet send outgoing [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); return; @@ -245,7 +245,6 @@ public class I2PSocketManager implements I2PSessionListener { */ private void synIncomingAvailable(String id, byte payload[], I2PSession session) throws DataFormatException, I2PSessionException { - _log.debug("*Syn!"); Destination d = new Destination(); d.fromByteArray(payload); @@ -259,6 +258,7 @@ public class I2PSocketManager implements I2PSessionListener { s.setRemoteID(id); } } + _log.debug("*Syn! for socket " + s); if (!acceptConnections) { // The app did not instantiate an I2PServerSocket @@ -283,7 +283,7 @@ public class I2PSocketManager implements I2PSessionListener { if (!replySentOk) { if (_log.shouldLog(Log.WARN)) _log.warn("Error sending reply to " + d.calculateHash().toBase64() - + " in response to a new con message", + + " in response to a new con message for socket " + s, new Exception("Failed creation")); s.internalClose(); } @@ -293,7 +293,7 @@ public class I2PSocketManager implements I2PSessionListener { packet[0] = CLOSE_OUT; boolean nackSent = session.sendMessage(d, packet); if (!nackSent) { - _log.warn("Error sending NACK for session creation"); + _log.warn("Error sending NACK for session creation for socket " + s); } s.internalClose(); } @@ -306,7 +306,6 @@ public class I2PSocketManager implements I2PSessionListener { * */ private void disconnectIncoming(String id, byte payload[]) { - _log.debug("*Disconnect incoming!"); I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _inSockets.get(id); @@ -315,6 +314,8 @@ public class I2PSocketManager implements I2PSessionListener { } } + _log.debug("*Disconnect incoming for socket " + s); + try { if (payload.length == 0 && s != null) { s.internalClose(); @@ -339,12 +340,13 @@ public class I2PSocketManager implements I2PSessionListener { * @throws IllegalStateException if the socket isn't open or isn't known */ private void sendIncoming(String id, byte payload[]) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("*Packet send incoming [" + payload.length + "]"); I2PSocketImpl s = null; synchronized (lock) { s = (I2PSocketImpl) _inSockets.get(id); } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("*Packet send incoming [" + payload.length + "] for socket " + s); if (s != null) { s.queueData(payload); @@ -422,7 +424,7 @@ public class I2PSocketManager implements I2PSessionListener { boolean sent = false; sent = _session.sendMessage(peer, packet); if (!sent) { - _log.info("Unable to send & receive ack for SYN packet"); + _log.info("Unable to send & receive ack for SYN packet for socket " + s); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -431,18 +433,18 @@ public class I2PSocketManager implements I2PSessionListener { remoteID = s.getRemoteID(true, options.getConnectTimeout()); if (remoteID == null) - throw new ConnectException("Connection refused by peer"); + throw new ConnectException("Connection refused by peer for socket " + s); if ("".equals(remoteID)) - throw new NoRouteToHostException("Unable to reach peer"); + throw new NoRouteToHostException("Unable to reach peer for socket " + s); if (_log.shouldLog(Log.DEBUG)) _log.debug("TIMING: s given out for remoteID " - + getReadableForm(remoteID)); + + getReadableForm(remoteID) + " for socket " + s); return s; } catch (InterruptedIOException ioe) { if (_log.shouldLog(Log.ERROR)) _log.error("Timeout waiting for ack from syn for id " - + getReadableForm(lcID), ioe); + + getReadableForm(lcID) + " for socket " + s, ioe); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -456,7 +458,7 @@ public class I2PSocketManager implements I2PSessionListener { throw ex; } catch (IOException ex) { if (_log.shouldLog(Log.ERROR)) - _log.error("Error sending syn on id " + getReadableForm(lcID), ex); + _log.error("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -464,7 +466,7 @@ public class I2PSocketManager implements I2PSessionListener { throw new I2PException("Unhandled IOException occurred"); } catch (I2PException ex) { if (_log.shouldLog(Log.INFO)) - _log.info("Error sending syn on id " + getReadableForm(lcID), ex); + _log.info("Error sending syn on id " + getReadableForm(lcID) + " for socket " + s, ex); synchronized (lock) { _outSockets.remove(s.getLocalID()); } @@ -577,7 +579,7 @@ public class I2PSocketManager implements I2PSessionListener { public void removeSocket(I2PSocketImpl sock) { synchronized (lock) { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\""); + _log.debug("Removing socket \"" + getReadableForm(sock.getLocalID()) + "\" [" + sock + "]"); _inSockets.remove(sock.getLocalID()); _outSockets.remove(sock.getLocalID()); lock.notify();