diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java b/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java index 6d459ae3279a42124de7c6fdc9f335839d0c2c17..912914bf5560b7022d1c0cd3005bb6e919facf3c 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/ByteCollector.java @@ -6,154 +6,152 @@ public class ByteCollector { int size; public ByteCollector() { - contents=new byte[80]; - size=0; + contents = new byte[80]; + size = 0; } public ByteCollector(byte[] b) { - this(); - append(b); + this(); + append(b); } public ByteCollector(byte b) { - this(); - append(b); + this(); + append(b); } - public ByteCollector append (byte b) { - ensureCapacity(size+1); - contents[size++]=b; - return this; + public ByteCollector append(byte b) { + ensureCapacity(size + 1); + contents[size++] = b; + return this; } - public ByteCollector append (byte[] b) { - ensureCapacity(size+b.length); - System.arraycopy(b,0,contents,size,b.length); - size+=b.length; - return this; + public ByteCollector append(byte[] b) { + ensureCapacity(size + b.length); + System.arraycopy(b, 0, contents, size, b.length); + size += b.length; + return this; } public ByteCollector append(byte[] b, int len) { - return append(b,0,len); + return append(b, 0, len); } public ByteCollector append(byte[] b, int off, int len) { - ensureCapacity(size+len); - System.arraycopy(b,off,contents,size,len); - size+=len; - return this; + ensureCapacity(size + len); + System.arraycopy(b, off, contents, size, len); + size += len; + return this; } - + public ByteCollector append(ByteCollector bc) { - // optimieren? - return append(bc.toByteArray()); + // optimieren? + return append(bc.toByteArray()); } public byte[] toByteArray() { - byte[] result=new byte[size]; - System.arraycopy(contents,0,result,0,size); - return result; + byte[] result = new byte[size]; + System.arraycopy(contents, 0, result, 0, size); + return result; } public byte[] startToByteArray(int maxlen) { - if (size < maxlen) { - byte[] res = toByteArray(); - clear(); - return res; - } else { - byte[] result = new byte[maxlen]; - System.arraycopy(contents,0,result,0,maxlen); - System.arraycopy(contents,maxlen,contents,0,size-maxlen); - size-=maxlen; - return result; - } - } - - public int getCurrentSize() { - return size; + if (size < maxlen) { + byte[] res = toByteArray(); + clear(); + return res; + } else { + byte[] result = new byte[maxlen]; + System.arraycopy(contents, 0, result, 0, maxlen); + System.arraycopy(contents, maxlen, contents, 0, size - maxlen); + size -= maxlen; + return result; + } + } + + public int getCurrentSize() { + return size; } public boolean ensureCapacity(int cap) { - if (contents.length<cap) { - int l=contents.length; - while (l<cap) { - l=(l*3)/2+1; - } - byte[] newcont=new byte[l]; - System.arraycopy(contents,0,newcont,0,size); - contents=newcont; - return true; - } - return false; + if (contents.length < cap) { + int l = contents.length; + while (l < cap) { + l = (l * 3) / 2 + 1; + } + byte[] newcont = new byte[l]; + System.arraycopy(contents, 0, newcont, 0, size); + contents = newcont; + return true; + } + return false; } public boolean isEmpty() { - return size==0; + return size == 0; } public int indexOf(ByteCollector bc) { - // optimieren? - return indexOf(bc.toByteArray()); + // optimieren? + return indexOf(bc.toByteArray()); } - + public int indexOf(byte b) { - // optimieren? - return indexOf(new byte[] {b}); + // optimieren? + return indexOf(new byte[] { b}); } - + public int indexOf(byte[] ba) { - loop: - for (int i=0;i<size-ba.length+1;i++) { - for (int j=0;j<ba.length;j++) { - if (contents[i+j]!=ba[j]) continue loop; - } - return i; - } - return -1; - } - + loop: for (int i = 0; i < size - ba.length + 1; i++) { + for (int j = 0; j < ba.length; j++) { + if (contents[i + j] != ba[j]) continue loop; + } + return i; + } + return -1; + } + public void clear() { - size=0; + size = 0; } public void clearAndShorten() { - size=0; - contents=new byte[80]; + size = 0; + contents = new byte[80]; } public String toString() { - return new String(toByteArray()); + return new String(toByteArray()); } public int hashCode() { - int h =0; - for (int i=0;i<size;i++) { - h+=contents[i]*contents[i]; - } - return h; + int h = 0; + for (int i = 0; i < size; i++) { + h += contents[i] * contents[i]; + } + return h; } public boolean equals(Object o) { - if (o instanceof ByteCollector) { - ByteCollector by=(ByteCollector)o; - if (size!=by.size) return false; - for (int i=0;i<size;i++) { - if (contents[i]!=by.contents[i]) return false; - } - return true; - } else { - return false; - } + if (o instanceof ByteCollector) { + ByteCollector by = (ByteCollector) o; + if (size != by.size) return false; + for (int i = 0; i < size; i++) { + if (contents[i] != by.contents[i]) return false; + } + return true; + } else { + return false; + } } public byte removeFirst() { - byte bb=contents[0]; - if (size==0) - throw new IllegalArgumentException("ByteCollector is empty"); - if(size>1) - System.arraycopy(contents,1,contents,0,--size); - else - size=0; - return bb; - } -} + byte bb = contents[0]; + if (size == 0) throw new IllegalArgumentException("ByteCollector is empty"); + if (size > 1) + System.arraycopy(contents, 1, contents, 0, --size); + else + size = 0; + return bb; + } +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java index 8461d687b9dcace15b0254d575f3c7a990ed6656..6f46364d6370389b83ec111e118926abc8d617f0 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocket.java @@ -11,7 +11,7 @@ public interface I2PServerSocket { * Closes the socket. */ public void close() throws I2PException; - + /** * Waits for the next socket connecting. If a remote user tried to make a * connection and the local application wasn't .accept()ing new connections, @@ -26,4 +26,4 @@ public interface I2PServerSocket { * Access the manager which is coordinating the server socket */ public I2PSocketManager getManager(); -} +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java index 3cafcb9d68ac304c0dc55de3731c15de29836cde..102a84af2aafa4d87cfe511aecb9c8733efcd982 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PServerSocketImpl.java @@ -10,41 +10,44 @@ import net.i2p.util.Log; class I2PServerSocketImpl implements I2PServerSocket { private final static Log _log = new Log(I2PServerSocketImpl.class); private I2PSocketManager mgr; - private I2PSocket cached=null; // buffer one socket here - + private I2PSocket cached = null; // buffer one socket here + public I2PServerSocketImpl(I2PSocketManager mgr) { - this.mgr = mgr; + this.mgr = mgr; } - + public synchronized I2PSocket accept() throws I2PException { - while(cached == null) { - myWait(); - } - I2PSocket ret=cached; - cached=null; - notifyAll(); - _log.debug("TIMING: handed out accept result "+ret.hashCode()); - return ret; + while (cached == null) { + myWait(); + } + I2PSocket ret = cached; + cached = null; + notifyAll(); + _log.debug("TIMING: handed out accept result " + ret.hashCode()); + return ret; } - - public synchronized boolean getNewSocket(I2PSocket s){ - while(cached != null) { - myWait(); - } - cached=s; - notifyAll(); - return true; + + public synchronized boolean getNewSocket(I2PSocket s) { + while (cached != null) { + myWait(); + } + cached = s; + notifyAll(); + return true; } - + public void close() throws I2PException { - //noop + //noop } - + private void myWait() { - try{ - wait(); - } catch (InterruptedException ex) {} + try { + wait(); + } catch (InterruptedException ex) { + } } - public I2PSocketManager getManager() { return mgr; } -} + public I2PSocketManager getManager() { + return mgr; + } +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java index a3a96e12b2d0a691e96d34074e053d31e0d2c81c..1dad61f8177f8fea0b90369dfa29dd32d45afa55 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocket.java @@ -36,4 +36,4 @@ public interface I2PSocket { * Closes the socket if not closed yet */ public void close() throws IOException; -} +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java index 74d4cbb976703f1215f756909f08995e306d413a..137358586d3aeec30064bd9db2c6fdb6d8dd3349 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketImpl.java @@ -18,9 +18,9 @@ import net.i2p.util.Log; class I2PSocketImpl implements I2PSocket { private final static Log _log = new Log(I2PSocketImpl.class); - public static final int MAX_PACKET_SIZE = 1024*32; - public static final int PACKET_DELAY=100; - + public static final int MAX_PACKET_SIZE = 1024 * 32; + public static final int PACKET_DELAY = 100; + private I2PSocketManager manager; private Destination local; private Destination remote; @@ -31,307 +31,306 @@ class I2PSocketImpl implements I2PSocket { private I2POutputStream out; private boolean outgoing; private Object flagLock = new Object(); - private boolean closed = false, sendClose=true, closed2=false; - - public I2PSocketImpl(Destination peer, I2PSocketManager mgr, - boolean outgoing, String localID) { - this.outgoing=outgoing; - manager = mgr; - remote = peer; - local = mgr.getSession().getMyDestination(); - in = new I2PInputStream(); - I2PInputStream pin = new I2PInputStream(); - out = new I2POutputStream(pin); - new I2PSocketRunner(pin); - this.localID = localID; + private boolean closed = false, sendClose = true, closed2 = false; + + public I2PSocketImpl(Destination peer, I2PSocketManager mgr, boolean outgoing, String localID) { + this.outgoing = outgoing; + manager = mgr; + remote = peer; + local = mgr.getSession().getMyDestination(); + in = new I2PInputStream(); + I2PInputStream pin = new I2PInputStream(); + out = new I2POutputStream(pin); + new I2PSocketRunner(pin); + this.localID = localID; } - + public String getLocalID() { - return localID; + return localID; } public void setRemoteID(String id) { - synchronized(remoteIDWaiter) { - remoteID=id; - remoteIDWaiter.notifyAll(); - } + synchronized (remoteIDWaiter) { + remoteID = id; + remoteIDWaiter.notifyAll(); + } } public String getRemoteID(boolean wait) throws InterruptedIOException { - return getRemoteID(wait, -1); + return getRemoteID(wait, -1); } + public String getRemoteID(boolean wait, long maxWait) throws InterruptedIOException { - long dieAfter = System.currentTimeMillis() + maxWait; - synchronized(remoteIDWaiter) { - while (wait && remoteID==null) { - try { - if (maxWait > 0) - remoteIDWaiter.wait(maxWait); - else - remoteIDWaiter.wait(); - } catch (InterruptedException ex) {} - - if ( (maxWait > 0) && (System.currentTimeMillis() > dieAfter) ) - throw new InterruptedIOException("Timed out waiting for remote ID"); - } - if (wait) { - _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) +" for "+this.hashCode()); - } - return remoteID; - } + long dieAfter = System.currentTimeMillis() + maxWait; + synchronized (remoteIDWaiter) { + while (wait && remoteID == null) { + try { + if (maxWait > 0) + remoteIDWaiter.wait(maxWait); + else + remoteIDWaiter.wait(); + } catch (InterruptedException ex) { + } + + if ((maxWait > 0) && (System.currentTimeMillis() > dieAfter)) + throw new InterruptedIOException("Timed out waiting for remote ID"); + } + if (wait) { + _log.debug("TIMING: RemoteID set to " + I2PSocketManager.getReadableForm(remoteID) + " for " + + this.hashCode()); + } + return remoteID; + } } public String getRemoteID() throws InterruptedIOException { - return getRemoteID(false); + return getRemoteID(false); } public void queueData(byte[] data) { - in.queueData(data); + in.queueData(data); } /** * Return the Destination of this side of the socket. */ - public Destination getThisDestination() { return local; } + public Destination getThisDestination() { + return local; + } /** * Return the destination of the peer. */ - public Destination getPeerDestination() { return remote; } + public Destination getPeerDestination() { + return remote; + } /** * Return an InputStream to read from the socket. */ - public InputStream getInputStream() throws IOException { - if ( (in == null) ) - throw new IOException("Not connected"); - return in; + public InputStream getInputStream() throws IOException { + if ((in == null)) throw new IOException("Not connected"); + return in; } /** * Return an OutputStream to write into the socket. */ public OutputStream getOutputStream() throws IOException { - if ( (out == null) ) - throw new IOException("Not connected"); - return out; + if ((out == null)) throw new IOException("Not connected"); + return out; } /** * Closes the socket if not closed yet */ public void close() throws IOException { - synchronized(flagLock) { - _log.debug("Closing connection"); - closed=true; - } - out.close(); - in.notifyClosed(); + synchronized (flagLock) { + _log.debug("Closing connection"); + closed = true; + } + out.close(); + in.notifyClosed(); } public void internalClose() { - synchronized(flagLock) { - closed=true; - closed2=true; - sendClose=false; - } - out.close(); - in.notifyClosed(); + synchronized (flagLock) { + closed = true; + closed2 = true; + sendClose = false; + } + out.close(); + in.notifyClosed(); } - private byte getMask(int add) { - return (byte)((outgoing?(byte)0xA0:(byte)0x50)+(byte)add); + return (byte) ((outgoing ? (byte) 0xA0 : (byte) 0x50) + (byte) add); } - + //-------------------------------------------------- public class I2PInputStream extends InputStream { - private ByteCollector bc = new ByteCollector(); - - public int read() throws IOException { - byte[] b = new byte[1]; - int res = read(b); - if (res == 1) return b[0] & 0xff; - if (res == -1) return -1; - throw new RuntimeException("Incorrect read() result"); - } - - public synchronized int read(byte[] b, int off, int len) throws IOException { - _log.debug("Read called: "+this.hashCode()); - if (len==0) return 0; - byte[] read = bc.startToByteArray(len); - while (read.length==0) { - synchronized(flagLock) { - if (closed){ - _log.debug("Closed is set, so closing stream: "+this.hashCode()); - return -1; - } - } - try { - wait(); - } catch (InterruptedException ex) {} - read = bc.startToByteArray(len); - } - if (read.length>len) throw new RuntimeException("BUG"); - System.arraycopy(read,0,b,off,read.length); - - if (_log.shouldLog(Log.DEBUG)) { - _log.debug("Read from I2PInputStream " + this.hashCode() - + " returned "+read.length+" bytes"); - } - //if (_log.shouldLog(Log.DEBUG)) { - // _log.debug("Read from I2PInputStream " + this.hashCode() - // + " returned "+read.length+" bytes:\n" - // + HexDump.dump(read)); - //} - return read.length; - } - - public int available() { - return bc.getCurrentSize(); - } - - public void queueData(byte[] data) { - queueData(data,0,data.length); - } - - public synchronized void queueData(byte[] data, int off, int len) { - _log.debug("Insert "+len+" bytes into queue: "+this.hashCode()); - bc.append(data, off, len); - notifyAll(); - } - - public synchronized void notifyClosed() { - notifyAll(); - } - + private ByteCollector bc = new ByteCollector(); + + public int read() throws IOException { + byte[] b = new byte[1]; + int res = read(b); + if (res == 1) return b[0] & 0xff; + if (res == -1) return -1; + throw new RuntimeException("Incorrect read() result"); + } + + public synchronized int read(byte[] b, int off, int len) throws IOException { + _log.debug("Read called: " + this.hashCode()); + if (len == 0) return 0; + byte[] read = bc.startToByteArray(len); + while (read.length == 0) { + synchronized (flagLock) { + if (closed) { + _log.debug("Closed is set, so closing stream: " + this.hashCode()); + return -1; + } + } + try { + wait(); + } catch (InterruptedException ex) { + } + read = bc.startToByteArray(len); + } + if (read.length > len) throw new RuntimeException("BUG"); + System.arraycopy(read, 0, b, off, read.length); + + if (_log.shouldLog(Log.DEBUG)) { + _log.debug("Read from I2PInputStream " + this.hashCode() + " returned " + read.length + " bytes"); + } + //if (_log.shouldLog(Log.DEBUG)) { + // _log.debug("Read from I2PInputStream " + this.hashCode() + // + " returned "+read.length+" bytes:\n" + // + HexDump.dump(read)); + //} + return read.length; + } + + public int available() { + return bc.getCurrentSize(); + } + + public void queueData(byte[] data) { + queueData(data, 0, data.length); + } + + public synchronized void queueData(byte[] data, int off, int len) { + _log.debug("Insert " + len + " bytes into queue: " + this.hashCode()); + bc.append(data, off, len); + notifyAll(); + } + + public synchronized void notifyClosed() { + notifyAll(); + } + } public class I2POutputStream extends OutputStream { - public I2PInputStream sendTo; - - public I2POutputStream(I2PInputStream sendTo) { - this.sendTo=sendTo; - } - public void write(int b) throws IOException { - write(new byte[] {(byte)b}); - } - - public void write (byte[] b, int off, int len) throws IOException { - sendTo.queueData(b,off,len); - } - - public void close() { - sendTo.notifyClosed(); - } + public I2PInputStream sendTo; + + public I2POutputStream(I2PInputStream sendTo) { + this.sendTo = sendTo; + } + + public void write(int b) throws IOException { + write(new byte[] { (byte) b}); + } + + public void write(byte[] b, int off, int len) throws IOException { + sendTo.queueData(b, off, len); + } + + public void close() { + sendTo.notifyClosed(); + } } public class I2PSocketRunner extends I2PThread { - public InputStream in; - - public I2PSocketRunner(InputStream in) { - _log.debug("Runner's input stream is: "+in.hashCode()); - this.in=in; - setName("SocketRunner from " + I2PSocketImpl.this.remote.calculateHash().toBase64().substring(0, 4)); - start(); - } - - public void run() { - byte[] buffer = new byte[MAX_PACKET_SIZE]; - ByteCollector bc = new ByteCollector(); - boolean sent = true; - try { - int len, bcsize; -// try { - while (true) { - len = in.read(buffer); - bcsize = bc.getCurrentSize(); - if (len != -1) { - bc.append(buffer,len); - } else if (bcsize == 0) { - break; - } - if ((bcsize < MAX_PACKET_SIZE) - && (in.available()==0)) { - _log.debug("Runner Point d: "+this.hashCode()); - - try { - Thread.sleep(PACKET_DELAY); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - if ((bcsize >= MAX_PACKET_SIZE) - || (in.available()==0) ) { - byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); - if (data.length > 0) { - _log.debug("Message size is: "+data.length); - sent = sendBlock(data); - if (!sent) { - _log.error("Error sending message to peer. Killing socket runner"); - break; - } - } - } - } - if ((bc.getCurrentSize() > 0) && sent) { - _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " - + "(input stream: " + in.hashCode() + "; " - + "queue size: " + bc.getCurrentSize() + ")"); - } - synchronized(flagLock) { - closed2=true; - } -// } catch (IOException ex) { -// if (_log.shouldLog(Log.INFO)) -// _log.info("Error reading and writing", ex); -// } - boolean sc; - synchronized(flagLock) { - sc=sendClose; - } // FIXME: Race here? - if (sc) { - _log.info("Sending close packet: "+outgoing); - byte[] packet = I2PSocketManager.makePacket - ((byte)(getMask(0x02)),remoteID, new byte[0]); - synchronized(manager.getSession()) { - sent = manager.getSession().sendMessage(remote, packet); - } - if (!sent) { - _log.error("Error sending close packet to peer"); - } - } - manager.removeSocket(I2PSocketImpl.this); - } catch (IOException ex) { - // WHOEVER removes this event on inconsistent - // state before fixing the inconsistent state (a - // reference on the socket in the socket manager - // etc.) will get hanged by me personally -- mihi - _log.error("Error running - **INCONSISTENT STATE!!!**", ex); - } catch (I2PException ex) { - _log.error("Error running - **INCONSISTENT STATE!!!**" , ex); - } - } - - private boolean sendBlock(byte data[]) throws I2PSessionException { - _log.debug("TIMING: Block to send for "+I2PSocketImpl.this.hashCode()); - if (remoteID==null) { - _log.error("NULL REMOTEID"); - return false; - } - byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, - data); - boolean sent; - synchronized(flagLock) { - if (closed2) return false; - } - synchronized(manager.getSession()) { - sent = manager.getSession().sendMessage(remote, packet); - } - return sent; - } + public InputStream in; + + public I2PSocketRunner(InputStream in) { + _log.debug("Runner's input stream is: " + in.hashCode()); + this.in = in; + setName("SocketRunner from " + I2PSocketImpl.this.remote.calculateHash().toBase64().substring(0, 4)); + start(); + } + + public void run() { + byte[] buffer = new byte[MAX_PACKET_SIZE]; + ByteCollector bc = new ByteCollector(); + boolean sent = true; + try { + int len, bcsize; + // try { + while (true) { + len = in.read(buffer); + bcsize = bc.getCurrentSize(); + if (len != -1) { + bc.append(buffer, len); + } else if (bcsize == 0) { + break; + } + if ((bcsize < MAX_PACKET_SIZE) && (in.available() == 0)) { + _log.debug("Runner Point d: " + this.hashCode()); + + try { + Thread.sleep(PACKET_DELAY); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + if ((bcsize >= MAX_PACKET_SIZE) || (in.available() == 0)) { + byte[] data = bc.startToByteArray(MAX_PACKET_SIZE); + if (data.length > 0) { + _log.debug("Message size is: " + data.length); + sent = sendBlock(data); + if (!sent) { + _log.error("Error sending message to peer. Killing socket runner"); + break; + } + } + } + } + if ((bc.getCurrentSize() > 0) && sent) { + _log.error("A SCARY MONSTER HAS EATEN SOME DATA! " + "(input stream: " + in.hashCode() + "; " + + "queue size: " + bc.getCurrentSize() + ")"); + } + synchronized (flagLock) { + closed2 = true; + } + // } catch (IOException ex) { + // if (_log.shouldLog(Log.INFO)) + // _log.info("Error reading and writing", ex); + // } + boolean sc; + synchronized (flagLock) { + sc = sendClose; + } // FIXME: Race here? + if (sc) { + _log.info("Sending close packet: " + outgoing); + byte[] packet = I2PSocketManager.makePacket((byte) (getMask(0x02)), remoteID, new byte[0]); + synchronized (manager.getSession()) { + sent = manager.getSession().sendMessage(remote, packet); + } + if (!sent) { + _log.error("Error sending close packet to peer"); + } + } + manager.removeSocket(I2PSocketImpl.this); + } catch (IOException ex) { + // WHOEVER removes this event on inconsistent + // state before fixing the inconsistent state (a + // reference on the socket in the socket manager + // etc.) will get hanged by me personally -- mihi + _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + } catch (I2PException ex) { + _log.error("Error running - **INCONSISTENT STATE!!!**", ex); + } + } + + private boolean sendBlock(byte data[]) throws I2PSessionException { + _log.debug("TIMING: Block to send for " + I2PSocketImpl.this.hashCode()); + if (remoteID == null) { + _log.error("NULL REMOTEID"); + return false; + } + byte[] packet = I2PSocketManager.makePacket(getMask(0x00), remoteID, data); + boolean sent; + synchronized (flagLock) { + if (closed2) return false; + } + synchronized (manager.getSession()) { + sent = manager.getSession().sendMessage(remote, packet); + } + return sent; + } } -} +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java index d660b203d0f76cf6d1b8a5f7f17b897648932b31..43bad0a6d1f0198f2e3a69656127cd98534bef61 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManager.java @@ -37,201 +37,199 @@ public class I2PSocketManager implements I2PSessionListener { private HashMap _inSockets; private I2PSocketOptions _defaultOptions; - public static final int PUBKEY_LENGTH=387; + public static final int PUBKEY_LENGTH = 387; - public I2PSocketManager() { - _session=null; - _serverSocket = new I2PServerSocketImpl(this); - _inSockets = new HashMap(16); - _outSockets = new HashMap(16); + _session = null; + _serverSocket = new I2PServerSocketImpl(this); + _inSockets = new HashMap(16); + _outSockets = new HashMap(16); } public I2PSession getSession() { - return _session; + return _session; } - - public void setSession(I2PSession session) { - _session = session; - if (session != null) - session.setSessionListener(this); + + public void setSession(I2PSession session) { + _session = session; + if (session != null) session.setSessionListener(this); } - + public void disconnected(I2PSession session) { - _log.error("Disconnected from the session"); + _log.error("Disconnected from the session"); } - + public void errorOccurred(I2PSession session, String message, Throwable error) { - _log.error("Error occurred: [" + message + "]", error); + _log.error("Error occurred: [" + message + "]", error); } - + public void messageAvailable(I2PSession session, int msgId, long size) { - try { - I2PSocketImpl s; - byte msg[] = session.receiveMessage(msgId); - if (msg.length == 1 && msg[0] == -1) { - _log.debug("Ping received"); - return; - } - if (msg.length <4) { - _log.error("==== packet too short ===="); - return; - } - int type = msg[0] & 0xff; - String id = new String(new byte[] {msg[1], msg[2], msg[3]}, - "ISO-8859-1"); - byte[] payload = new byte[msg.length-4]; - System.arraycopy(msg, 4, payload, 0, payload.length); - _log.debug("Message read: type = [" + Integer.toHexString(type) + - "] id = [" + getReadableForm(id)+ - "] payload length: " + payload.length + "]"); - synchronized(lock) { - switch(type) { - case 0x51: // ACK outgoing - s = (I2PSocketImpl) _outSockets.get(id); - if (s == null) { - _log.warn("No socket responsible for ACK packet"); - return; - } - if (payload.length==3 && s.getRemoteID(false)==null) { - String newID = new String(payload, - "ISO-8859-1"); - s.setRemoteID(newID); - return; - } else { - if (payload.length != 3) - _log.warn("Ack packet had " + payload.length + " bytes"); - else - _log.warn("Remote ID already exists? " + s.getRemoteID()); - return; - } - case 0x52: // disconnect outgoing - _log.debug("*Disconnect outgoing!"); - try { - s = (I2PSocketImpl) _outSockets.get(id); - if (payload.length==0 && s != null) { - s.internalClose(); - _outSockets.remove(id); - return; - } else { - if (payload.length > 0) - _log.warn("Disconnect packet had " + payload.length + " bytes"); - return; - } - } catch (Exception t) { - _log.error("Ignoring error on disconnect", t); - } - case 0x50: // packet send outgoing - _log.debug("*Packet send outgoing [" + payload.length + "]"); - s = (I2PSocketImpl) _outSockets.get(id); - if (s != null) { - s.queueData(payload); - return; - } else { - _log.error("Null socket with data available"); - throw new IllegalStateException("Null socket with data available"); - } - case 0xA1: // SYN incoming - _log.debug("*Syn!"); - if (payload.length==PUBKEY_LENGTH) { - String newLocalID = makeID(_inSockets); - Destination d = new Destination(); - d.readBytes(new ByteArrayInputStream(payload)); - - s = new I2PSocketImpl(d, this, false, - newLocalID); - s.setRemoteID(id); - if (_serverSocket.getNewSocket(s)) { - _inSockets.put(newLocalID, s); - byte[] packet = makePacket - ((byte)0x51, id, - newLocalID.getBytes("ISO-8859-1")); - boolean replySentOk = false; - synchronized(_session) { - replySentOk = _session.sendMessage(d, packet); - } - if (!replySentOk) { - _log.error("Error sending reply to " + - d.calculateHash().toBase64() + - " in response to a new con message", - new Exception("Failed creation")); - s.internalClose(); - } - } else { - byte[] packet = - (" "+id).getBytes("ISO-8859-1"); - packet[0]=0x52; - boolean nackSent = session.sendMessage(d, packet); - if (!nackSent) { - _log.error("Error sending NACK for session creation"); - } - s.internalClose(); - } - return; - } else { - _log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length + " != " + PUBKEY_LENGTH + ")"); - return; - } - case 0xA2: // disconnect incoming - _log.debug("*Disconnect incoming!"); - try { - s = (I2PSocketImpl) _inSockets.get(id); - if (payload.length==0 && s != null) { - s.internalClose(); - _inSockets.remove(id); - return; - } else { - if (payload.length > 0) - _log.warn("Disconnect packet had " + payload.length + " bytes"); - return; - } - } catch (Exception t) { - _log.error("Ignoring error on disconnect", t); - return; - } - case 0xA0: // packet send incoming - _log.debug("*Packet send incoming [" + payload.length + "]"); - s = (I2PSocketImpl) _inSockets.get(id); - if (s != null) { - s.queueData(payload); - return; - } else { - _log.error("Null socket with data available"); - throw new IllegalStateException("Null socket with data available"); - } - case 0xFF: // ignore - return; - } - _log.error("\n\n=============== Unknown packet! "+ - "============"+ - "\nType: "+(int)type+ - "\nID: " + getReadableForm(id)+ - "\nBase64'ed Data: "+Base64.encode(payload)+ - "\n\n\n"); - if (id != null) { - _inSockets.remove(id); - _outSockets.remove(id); - } - } - } catch (I2PException ise) { - _log.error("Error processing", ise); - } catch (IOException ioe) { - _log.error("Error processing", ioe); - } catch (IllegalStateException ise) { - _log.debug("Error processing", ise); - } + try { + I2PSocketImpl s; + byte msg[] = session.receiveMessage(msgId); + if (msg.length == 1 && msg[0] == -1) { + _log.debug("Ping received"); + return; + } + if (msg.length < 4) { + _log.error("==== packet too short ===="); + return; + } + int type = msg[0] & 0xff; + String id = new String(new byte[] { msg[1], msg[2], msg[3]}, "ISO-8859-1"); + byte[] payload = new byte[msg.length - 4]; + System.arraycopy(msg, 4, payload, 0, payload.length); + _log.debug("Message read: type = [" + Integer.toHexString(type) + "] id = [" + getReadableForm(id) + + "] payload length: " + payload.length + "]"); + synchronized (lock) { + switch (type) { + case 0x51: + // ACK outgoing + s = (I2PSocketImpl) _outSockets.get(id); + if (s == null) { + _log.warn("No socket responsible for ACK packet"); + return; + } + if (payload.length == 3 && s.getRemoteID(false) == null) { + String newID = new String(payload, "ISO-8859-1"); + s.setRemoteID(newID); + return; + } else { + if (payload.length != 3) + _log.warn("Ack packet had " + payload.length + " bytes"); + else + _log.warn("Remote ID already exists? " + s.getRemoteID()); + return; + } + case 0x52: + // disconnect outgoing + _log.debug("*Disconnect outgoing!"); + try { + s = (I2PSocketImpl) _outSockets.get(id); + if (payload.length == 0 && s != null) { + s.internalClose(); + _outSockets.remove(id); + return; + } else { + if (payload.length > 0) _log.warn("Disconnect packet had " + payload.length + " bytes"); + return; + } + } catch (Exception t) { + _log.error("Ignoring error on disconnect", t); + } + case 0x50: + // packet send outgoing + _log.debug("*Packet send outgoing [" + payload.length + "]"); + s = (I2PSocketImpl) _outSockets.get(id); + if (s != null) { + s.queueData(payload); + return; + } else { + _log.error("Null socket with data available"); + throw new IllegalStateException("Null socket with data available"); + } + case 0xA1: + // SYN incoming + _log.debug("*Syn!"); + if (payload.length == PUBKEY_LENGTH) { + String newLocalID = makeID(_inSockets); + Destination d = new Destination(); + d.readBytes(new ByteArrayInputStream(payload)); + + s = new I2PSocketImpl(d, this, false, newLocalID); + s.setRemoteID(id); + if (_serverSocket.getNewSocket(s)) { + _inSockets.put(newLocalID, s); + byte[] packet = makePacket((byte) 0x51, id, newLocalID.getBytes("ISO-8859-1")); + boolean replySentOk = false; + synchronized (_session) { + replySentOk = _session.sendMessage(d, packet); + } + if (!replySentOk) { + _log.error("Error sending reply to " + d.calculateHash().toBase64() + + " in response to a new con message", new Exception("Failed creation")); + s.internalClose(); + } + } else { + byte[] packet = (" " + id).getBytes("ISO-8859-1"); + packet[0] = 0x52; + boolean nackSent = session.sendMessage(d, packet); + if (!nackSent) { + _log.error("Error sending NACK for session creation"); + } + s.internalClose(); + } + return; + } else { + _log.error("Syn packet that has a payload not equal to the pubkey length (" + payload.length + + " != " + PUBKEY_LENGTH + ")"); + return; + } + case 0xA2: + // disconnect incoming + _log.debug("*Disconnect incoming!"); + try { + s = (I2PSocketImpl) _inSockets.get(id); + if (payload.length == 0 && s != null) { + s.internalClose(); + _inSockets.remove(id); + return; + } else { + if (payload.length > 0) _log.warn("Disconnect packet had " + payload.length + " bytes"); + return; + } + } catch (Exception t) { + _log.error("Ignoring error on disconnect", t); + return; + } + case 0xA0: + // packet send incoming + _log.debug("*Packet send incoming [" + payload.length + "]"); + s = (I2PSocketImpl) _inSockets.get(id); + if (s != null) { + s.queueData(payload); + return; + } else { + _log.error("Null socket with data available"); + throw new IllegalStateException("Null socket with data available"); + } + case 0xFF: + // ignore + return; + } + _log.error("\n\n=============== Unknown packet! " + "============" + "\nType: " + (int) type + + "\nID: " + getReadableForm(id) + "\nBase64'ed Data: " + Base64.encode(payload) + + "\n\n\n"); + if (id != null) { + _inSockets.remove(id); + _outSockets.remove(id); + } + } + } catch (I2PException ise) { + _log.error("Error processing", ise); + } catch (IOException ioe) { + _log.error("Error processing", ioe); + } catch (IllegalStateException ise) { + _log.debug("Error processing", ise); + } } - + public void reportAbuse(I2PSession session, int severity) { - _log.error("Abuse reported [" + severity + "]"); + _log.error("Abuse reported [" + severity + "]"); + } + + public void setDefaultOptions(I2PSocketOptions options) { + _defaultOptions = options; + } + + public I2PSocketOptions getDefaultOptions() { + return _defaultOptions; + } + + public I2PServerSocket getServerSocket() { + return _serverSocket; } - - public void setDefaultOptions(I2PSocketOptions options) { _defaultOptions = options; } - public I2PSocketOptions getDefaultOptions() { return _defaultOptions ; } - - public I2PServerSocket getServerSocket() { return _serverSocket; } - /** * Create a new connected socket (block until the socket is created) * @@ -239,104 +237,101 @@ public class I2PSocketManager implements I2PSessionListener { */ public I2PSocket connect(Destination peer, I2PSocketOptions options) throws I2PException { - String localID, lcID; - I2PSocketImpl s; - synchronized(lock) { - localID=makeID(_outSockets); - lcID=getReadableForm(localID); - s = new I2PSocketImpl(peer, this, true, localID); - _outSockets.put(s.getLocalID(),s); - } - try { - ByteArrayOutputStream pubkey = new ByteArrayOutputStream(); - _session.getMyDestination().writeBytes(pubkey); - String remoteID; - byte[] packet = makePacket((byte)0xA1, localID, - pubkey.toByteArray()); - boolean sent = false; - synchronized(_session) { - sent = _session.sendMessage(peer, packet); - } - if (!sent) { - _log.info("Unable to send & receive ack for SYN packet"); - synchronized(lock) { - _outSockets.remove(s.getLocalID()); - } - throw new I2PException("Unable to reach peer"); - } - remoteID = s.getRemoteID(true, options.getConnectTimeout()); - if ("".equals(remoteID)) { - throw new I2PException("Unable to reach peer"); - } - _log.debug("TIMING: s given out for remoteID "+getReadableForm(remoteID)); - return s; - } catch (InterruptedIOException ioe) { - _log.error("Timeout waiting for ack from syn for id " + getReadableForm(lcID), ioe); - synchronized(lock) { - _outSockets.remove(s.getLocalID()); - } - throw new I2PException("Timeout waiting for ack"); - } catch (IOException ex) { - _log.error("Error sending syn on id " + getReadableForm(lcID), ex); - synchronized(lock) { - _outSockets.remove(s.getLocalID()); - } - throw new I2PException("IOException occurred"); - } catch (I2PException ex) { - _log.info("Error sending syn on id " + getReadableForm(lcID), ex); - synchronized(lock) { - _outSockets.remove(s.getLocalID()); - } - throw ex; - } + String localID, lcID; + I2PSocketImpl s; + synchronized (lock) { + localID = makeID(_outSockets); + lcID = getReadableForm(localID); + s = new I2PSocketImpl(peer, this, true, localID); + _outSockets.put(s.getLocalID(), s); + } + try { + ByteArrayOutputStream pubkey = new ByteArrayOutputStream(); + _session.getMyDestination().writeBytes(pubkey); + String remoteID; + byte[] packet = makePacket((byte) 0xA1, localID, pubkey.toByteArray()); + boolean sent = false; + synchronized (_session) { + sent = _session.sendMessage(peer, packet); + } + if (!sent) { + _log.info("Unable to send & receive ack for SYN packet"); + synchronized (lock) { + _outSockets.remove(s.getLocalID()); + } + throw new I2PException("Unable to reach peer"); + } + remoteID = s.getRemoteID(true, options.getConnectTimeout()); + if ("".equals(remoteID)) { throw new I2PException("Unable to reach peer"); } + _log.debug("TIMING: s given out for remoteID " + getReadableForm(remoteID)); + return s; + } catch (InterruptedIOException ioe) { + _log.error("Timeout waiting for ack from syn for id " + getReadableForm(lcID), ioe); + synchronized (lock) { + _outSockets.remove(s.getLocalID()); + } + throw new I2PException("Timeout waiting for ack"); + } catch (IOException ex) { + _log.error("Error sending syn on id " + getReadableForm(lcID), ex); + synchronized (lock) { + _outSockets.remove(s.getLocalID()); + } + throw new I2PException("IOException occurred"); + } catch (I2PException ex) { + _log.info("Error sending syn on id " + getReadableForm(lcID), ex); + synchronized (lock) { + _outSockets.remove(s.getLocalID()); + } + throw ex; + } } - + public I2PSocket connect(Destination peer) throws I2PException { - return connect(peer, null); + return connect(peer, null); } - - /** + + /** * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. * */ public Set listSockets() { - Set sockets = new HashSet(8); - synchronized (lock) { - sockets.addAll(_inSockets.values()); - sockets.addAll(_outSockets.values()); - } - return sockets; + Set sockets = new HashSet(8); + synchronized (lock) { + sockets.addAll(_inSockets.values()); + sockets.addAll(_outSockets.values()); + } + return sockets; } - + /** * Ping the specified peer, returning true if they replied to the ping within * the timeout specified, false otherwise. This call blocks. * */ public boolean ping(Destination peer, long timeoutMs) { - try { - return _session.sendMessage(peer, new byte[] {(byte)0xFF}); - } catch (I2PException ex) { - _log.error("I2PException:",ex); - return false; - } + try { + return _session.sendMessage(peer, new byte[] { (byte) 0xFF}); + } catch (I2PException ex) { + _log.error("I2PException:", ex); + return false; + } } public void removeSocket(I2PSocketImpl sock) { - synchronized(lock) { - _inSockets.remove(sock.getLocalID()); - _outSockets.remove(sock.getLocalID()); - } + synchronized (lock) { + _inSockets.remove(sock.getLocalID()); + _outSockets.remove(sock.getLocalID()); + } } public static String getReadableForm(String id) { - try { - if (id.length() != 3) return "Bogus"; - return Base64.encode(id.getBytes("ISO-8859-1")); - } catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - return null; - } + try { + if (id.length() != 3) return "Bogus"; + return Base64.encode(id.getBytes("ISO-8859-1")); + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + return null; + } } /** @@ -345,21 +340,21 @@ public class I2PSocketManager implements I2PSessionListener { * @param uniqueIn map of already known local IDs so we don't collide. WARNING - NOT THREADSAFE! */ public static String makeID(HashMap uniqueIn) { - String newID; - try { - do { - int id = (int)(Math.random()*16777215+1); - byte[] nid = new byte[3]; - nid[0]=(byte)(id / 65536); - nid[1] = (byte)((id/256) % 256); - nid[2]= (byte)(id %256); - newID = new String(nid, "ISO-8859-1"); - } while (uniqueIn.get(newID) != null); - return newID; - } catch (UnsupportedEncodingException ex) { - ex.printStackTrace(); - return null; - } + String newID; + try { + do { + int id = (int) (Math.random() * 16777215 + 1); + byte[] nid = new byte[3]; + nid[0] = (byte) (id / 65536); + nid[1] = (byte) ((id / 256) % 256); + nid[2] = (byte) (id % 256); + newID = new String(nid, "ISO-8859-1"); + } while (uniqueIn.get(newID) != null); + return newID; + } catch (UnsupportedEncodingException ex) { + ex.printStackTrace(); + return null; + } } /** @@ -367,20 +362,17 @@ public class I2PSocketManager implements I2PSessionListener { * the given payload */ public static byte[] makePacket(byte type, String id, byte[] payload) { - try { - byte[] packet = new byte[payload.length+4]; - packet[0]=type; - byte[] temp = id.getBytes("ISO-8859-1"); - if (temp.length != 3) - throw new RuntimeException("Incorrect ID length: "+ - temp.length); - System.arraycopy(temp,0,packet,1,3); - System.arraycopy(payload,0,packet,4,payload.length); - return packet; - } catch (UnsupportedEncodingException ex) { - if (_log.shouldLog(Log.ERROR)) - _log.error("Error building the packet", ex); - return new byte[0]; - } + try { + byte[] packet = new byte[payload.length + 4]; + packet[0] = type; + byte[] temp = id.getBytes("ISO-8859-1"); + if (temp.length != 3) throw new RuntimeException("Incorrect ID length: " + temp.length); + System.arraycopy(temp, 0, packet, 1, 3); + System.arraycopy(payload, 0, packet, 4, payload.length); + return packet; + } catch (UnsupportedEncodingException ex) { + if (_log.shouldLog(Log.ERROR)) _log.error("Error building the packet", ex); + return new byte[0]; + } } -} +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java index 578e5499270b66bfc003c9b5bd5a962da99ae0b0..1f92a7952e1d4cc2a114e3c4baab34e8ae8c18b0 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketManagerFactory.java @@ -22,7 +22,7 @@ import net.i2p.util.Log; */ public class I2PSocketManagerFactory { private final static Log _log = new Log(I2PSocketManagerFactory.class); - + /** * Create a socket manager using a brand new destination connected to the * I2CP router on the local machine on the default port (7654). @@ -30,9 +30,9 @@ public class I2PSocketManagerFactory { * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager() { - return createManager("localhost", 7654, new Properties()); + return createManager("localhost", 7654, new Properties()); } - + /** * Create a socket manager using a brand new destination connected to the * I2CP router on the given machine reachable through the given port. @@ -40,21 +40,21 @@ public class I2PSocketManagerFactory { * @return the newly created socket manager, or null if there were errors */ public static I2PSocketManager createManager(String i2cpHost, int i2cpPort, Properties opts) { - I2PClient client = I2PClientFactory.createClient(); - ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512); - try { - Destination dest = client.createDestination(keyStream); - ByteArrayInputStream in = new ByteArrayInputStream(keyStream.toByteArray()); - return createManager(in, i2cpHost, i2cpPort, opts); - } catch (IOException ioe) { - _log.error("Error creating the destination for socket manager", ioe); - return null; - } catch (I2PException ie) { - _log.error("Error creating the destination for socket manager", ie); - return null; - } + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream keyStream = new ByteArrayOutputStream(512); + try { + Destination dest = client.createDestination(keyStream); + ByteArrayInputStream in = new ByteArrayInputStream(keyStream.toByteArray()); + return createManager(in, i2cpHost, i2cpPort, opts); + } catch (IOException ioe) { + _log.error("Error creating the destination for socket manager", ioe); + return null; + } catch (I2PException ie) { + _log.error("Error creating the destination for socket manager", ie); + return null; + } } - + /** * Create a socket manager using the destination loaded from the given private key * stream and connected to the I2CP router on the specified machine on the given @@ -62,24 +62,25 @@ public class I2PSocketManagerFactory { * * @return the newly created socket manager, or null if there were errors */ - public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, Properties opts) { - I2PClient client = I2PClientFactory.createClient(); - opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); - opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); - opts.setProperty(I2PClient.PROP_TCP_PORT, ""+i2cpPort); - try { - I2PSession session = client.createSession(myPrivateKeyStream, opts); - session.connect(); - return createManager(session); - } catch (I2PSessionException ise) { - _log.error("Error creating session for socket manager", ise); - return null; - } + public static I2PSocketManager createManager(InputStream myPrivateKeyStream, String i2cpHost, int i2cpPort, + Properties opts) { + I2PClient client = I2PClientFactory.createClient(); + opts.setProperty(I2PClient.PROP_RELIABILITY, I2PClient.PROP_RELIABILITY_GUARANTEED); + opts.setProperty(I2PClient.PROP_TCP_HOST, i2cpHost); + opts.setProperty(I2PClient.PROP_TCP_PORT, "" + i2cpPort); + try { + I2PSession session = client.createSession(myPrivateKeyStream, opts); + session.connect(); + return createManager(session); + } catch (I2PSessionException ise) { + _log.error("Error creating session for socket manager", ise); + return null; + } } - + private static I2PSocketManager createManager(I2PSession session) { - I2PSocketManager mgr = new I2PSocketManager(); - mgr.setSession(session); - return mgr; + I2PSocketManager mgr = new I2PSocketManager(); + mgr.setSession(session); + return mgr; } -} +} \ No newline at end of file diff --git a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java index 99feb7c09b7596c4459f43114823667dc2ab84d8..ee71cf72a612dc875830307cab6507528f88cd8f 100644 --- a/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java +++ b/apps/ministreaming/java/src/net/i2p/client/streaming/I2PSocketOptions.java @@ -7,15 +7,21 @@ package net.i2p.client.streaming; */ public class I2PSocketOptions { private long _connectTimeout; + public I2PSocketOptions() { - _connectTimeout = -1; + _connectTimeout = -1; } - + /** * How long we will wait for the ACK from a SYN, in milliseconds. * * @return milliseconds to wait, or -1 if we will wait indefinitely */ - public long getConnectTimeout() { return _connectTimeout; } - public void setConnectTimeout(long ms) { _connectTimeout = ms; } -} + public long getConnectTimeout() { + return _connectTimeout; + } + + public void setConnectTimeout(long ms) { + _connectTimeout = ms; + } +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/CheckSendStatusServlet.java b/apps/phttprelay/java/src/net/i2p/phttprelay/CheckSendStatusServlet.java index e303f733cdc3b87885aa48631ed95644cd80badc..b7c2af1b9481fccfae9b93e74c6d5b151cc56dc7 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/CheckSendStatusServlet.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/CheckSendStatusServlet.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -44,70 +45,70 @@ import javax.servlet.http.HttpServletResponse; */ public class CheckSendStatusServlet extends PHTTPRelayServlet { /* URL parameters on the check */ - + /** H(routerIdent).toBase64() of the target to receive the message */ - public final static String PARAM_SEND_TARGET = "target"; + public final static String PARAM_SEND_TARGET = "target"; /** msgId parameter */ public final static String PARAM_MSG_ID = "msgId"; public final static String PROP_STATUS = "status"; public final static String STATUS_PENDING = "pending"; public final static String STATUS_UNKNOWN = "unknown"; - + public void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - String target = req.getParameter(PARAM_SEND_TARGET); - String msgIdStr = req.getParameter(PARAM_MSG_ID); - - log("Checking status of [" + target + "] message [" + msgIdStr + "]"); - if (!isKnownMessage(target, msgIdStr)) { - log("Not known - its not pending"); - notPending(req, resp); - return; - } else { - log("Known - its still pending"); - pending(req, resp); - return; - } + String target = req.getParameter(PARAM_SEND_TARGET); + String msgIdStr = req.getParameter(PARAM_MSG_ID); + + log("Checking status of [" + target + "] message [" + msgIdStr + "]"); + if (!isKnownMessage(target, msgIdStr)) { + log("Not known - its not pending"); + notPending(req, resp); + return; + } else { + log("Known - its still pending"); + pending(req, resp); + return; + } } - + private boolean isKnownMessage(String target, String msgId) throws IOException { - if ( (target == null) || (target.trim().length() <= 0) ) return false; - if ( (msgId == null) || (msgId.trim().length() <= 0) ) return false; - File identDir = getIdentDir(target); - if (identDir.exists()) { - File identFile = new File(identDir, "identity.dat"); - if (identFile.exists()) { - // known and valid (maybe we need to check the file format... naw, fuck it - File msgFile = new File(identDir, "msg" + msgId + ".dat"); - if (msgFile.exists()) - return true; - else - return false; - } else { - return false; - } - } else { - return false; - } + if ((target == null) || (target.trim().length() <= 0)) return false; + if ((msgId == null) || (msgId.trim().length() <= 0)) return false; + File identDir = getIdentDir(target); + if (identDir.exists()) { + File identFile = new File(identDir, "identity.dat"); + if (identFile.exists()) { + // known and valid (maybe we need to check the file format... naw, fuck it + File msgFile = new File(identDir, "msg" + msgId + ".dat"); + if (msgFile.exists()) + return true; + else + return false; + } else { + return false; + } + } else { + return false; + } } - + private void pending(HttpServletRequest req, HttpServletResponse resp) throws IOException { - resp.setStatus(HttpServletResponse.SC_OK); - ServletOutputStream out = resp.getOutputStream(); - StringBuffer buf = new StringBuffer(); - buf.append(PROP_STATUS).append('=').append(STATUS_PENDING).append('\n'); - out.write(buf.toString().getBytes()); - out.flush(); - out.close(); + resp.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream out = resp.getOutputStream(); + StringBuffer buf = new StringBuffer(); + buf.append(PROP_STATUS).append('=').append(STATUS_PENDING).append('\n'); + out.write(buf.toString().getBytes()); + out.flush(); + out.close(); } - + private void notPending(HttpServletRequest req, HttpServletResponse resp) throws IOException { - resp.setStatus(HttpServletResponse.SC_OK); - ServletOutputStream out = resp.getOutputStream(); - StringBuffer buf = new StringBuffer(); - buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n'); - out.write(buf.toString().getBytes()); - out.flush(); - out.close(); + resp.setStatus(HttpServletResponse.SC_OK); + ServletOutputStream out = resp.getOutputStream(); + StringBuffer buf = new StringBuffer(); + buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n'); + out.write(buf.toString().getBytes()); + out.flush(); + out.close(); } -} +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/LockManager.java b/apps/phttprelay/java/src/net/i2p/phttprelay/LockManager.java index c84941191d47963a632a468f3f42bb789e8e3a9f..433203169f57d4dd123bb9c9fa996f3e5dd50c49 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/LockManager.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/LockManager.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -18,23 +19,26 @@ import java.util.Set; */ class LockManager { private volatile static Set _locks = new HashSet(); // target - + public static void lockIdent(String target) { - while (true) { - synchronized (_locks) { - if (!_locks.contains(target)) { - _locks.add(target); - return; - } - try { _locks.wait(1000); } catch (InterruptedException ie) {} - } - } + while (true) { + synchronized (_locks) { + if (!_locks.contains(target)) { + _locks.add(target); + return; + } + try { + _locks.wait(1000); + } catch (InterruptedException ie) { + } + } + } } - + public static void unlockIdent(String target) { - synchronized (_locks) { - _locks.remove(target); - _locks.notifyAll(); - } + synchronized (_locks) { + _locks.remove(target); + _locks.notifyAll(); + } } -} +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/PHTTPRelayServlet.java b/apps/phttprelay/java/src/net/i2p/phttprelay/PHTTPRelayServlet.java index 2cec78304ebf03e805cb0dba06bd494de90f7aa1..45744a2d20e3badb2d7cc647ca10b47dfd27c31a 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/PHTTPRelayServlet.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/PHTTPRelayServlet.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -25,49 +26,50 @@ abstract class PHTTPRelayServlet extends HttpServlet { /* config params */ /*public final static String PARAM_BASEDIR = "baseDir";*/ public final static String ENV_BASEDIR = "phttpRelay.baseDir"; - + /** match the clock fudge factor on the router, rather than importing the entire router cvs module */ - public final static long CLOCK_FUDGE_FACTOR = 1*60*1000; - + public final static long CLOCK_FUDGE_FACTOR = 1 * 60 * 1000; + protected String buildURL(HttpServletRequest req, String path) { - StringBuffer buf = new StringBuffer(); - buf.append(req.getScheme()).append("://"); - buf.append(req.getServerName()).append(":").append(req.getServerPort()); - buf.append(req.getContextPath()); - buf.append(path); - log("URL built: " + buf.toString()); - return buf.toString(); + StringBuffer buf = new StringBuffer(); + buf.append(req.getScheme()).append("://"); + buf.append(req.getServerName()).append(":").append(req.getServerPort()); + buf.append(req.getContextPath()); + buf.append(path); + log("URL built: " + buf.toString()); + return buf.toString(); } - + protected File getIdentDir(String target) throws IOException { - if ( (_baseDir == null) || (target == null) ) throw new IOException("dir not specified to deal with"); - File baseDir = new File(_baseDir); - if (!baseDir.exists()) { - boolean created = baseDir.mkdirs(); - log("Creating PHTTP Relay Base Directory: " + baseDir.getAbsolutePath() + " - ok? " + created); - } - File identDir = new File(baseDir, target); - log("Ident dir: " + identDir.getAbsolutePath()); - return identDir; + if ((_baseDir == null) || (target == null)) throw new IOException("dir not specified to deal with"); + File baseDir = new File(_baseDir); + if (!baseDir.exists()) { + boolean created = baseDir.mkdirs(); + log("Creating PHTTP Relay Base Directory: " + baseDir.getAbsolutePath() + " - ok? " + created); + } + File identDir = new File(baseDir, target); + log("Ident dir: " + identDir.getAbsolutePath()); + return identDir; } - + public void init(ServletConfig config) throws ServletException { - super.init(config); - String dir = System.getProperty(ENV_BASEDIR); - if (dir == null) { - _log.warn("Base directory for the polling http relay system not in the environment [" + ENV_BASEDIR +"]"); - _log.warn("Setting the base directory to ./relayDir for " + getServletName()); - _baseDir = ".relayDir"; - } else { - _baseDir = dir; - log("Loaded up " + getServletName() + " with base directory " + _baseDir); - } + super.init(config); + String dir = System.getProperty(ENV_BASEDIR); + if (dir == null) { + _log.warn("Base directory for the polling http relay system not in the environment [" + ENV_BASEDIR + "]"); + _log.warn("Setting the base directory to ./relayDir for " + getServletName()); + _baseDir = ".relayDir"; + } else { + _baseDir = dir; + log("Loaded up " + getServletName() + " with base directory " + _baseDir); + } } - + public void log(String msg) { - _log.debug(msg); + _log.debug(msg); } + public void log(String msg, Throwable t) { - _log.debug(msg, t); + _log.debug(msg, t); } -} +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/PollServlet.java b/apps/phttprelay/java/src/net/i2p/phttprelay/PollServlet.java index 7da0bcef6c74c77c5671c1c8669bf416334793b6..d1cf4acaf3652ad1b72a0096153e7949c0e66102 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/PollServlet.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/PollServlet.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -55,209 +56,208 @@ import net.i2p.util.Clock; * baseDir is the directory under which registrants and their pending messages are stored * */ -public class PollServlet extends PHTTPRelayServlet { +public class PollServlet extends PHTTPRelayServlet { /* URL parameters on the check */ - + /** H(routerIdent).toBase64() of the target to receive the message */ - public final static String PARAM_SEND_TARGET = "target"; - + public final static String PARAM_SEND_TARGET = "target"; + /** HTTP error code if the target is not known*/ public final static int CODE_UNKNOWN = HttpServletResponse.SC_NOT_FOUND; /** HTTP error code if the signature failed */ public final static int CODE_UNAUTHORIZED = HttpServletResponse.SC_UNAUTHORIZED; /** HTTP error code if everything is ok */ public final static int CODE_OK = HttpServletResponse.SC_OK; - + public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - byte data[] = getData(req); - if (data == null) return; - ByteArrayInputStream bais = new ByteArrayInputStream(data); - String target = getTarget(bais); - if (target == null) { - log("Target not specified"); - resp.sendError(CODE_UNKNOWN); - return; - } - - if (!isKnown(target)) { - resp.sendError(CODE_UNKNOWN); - return; - } - - if (!isAuthorized(target, bais)) { - resp.sendError(CODE_UNAUTHORIZED); - return; - } else { - log("Authorized access for target " + target); - } - - sendMessages(resp, target); + byte data[] = getData(req); + if (data == null) return; + ByteArrayInputStream bais = new ByteArrayInputStream(data); + String target = getTarget(bais); + if (target == null) { + log("Target not specified"); + resp.sendError(CODE_UNKNOWN); + return; + } + + if (!isKnown(target)) { + resp.sendError(CODE_UNKNOWN); + return; + } + + if (!isAuthorized(target, bais)) { + resp.sendError(CODE_UNAUTHORIZED); + return; + } else { + log("Authorized access for target " + target); + } + + sendMessages(resp, target); } - + private byte[] getData(HttpServletRequest req) throws ServletException, IOException { - ServletInputStream in = req.getInputStream(); - int len = req.getContentLength(); - byte data[] = new byte[len]; - int cur = 0; - int read = DataHelper.read(in, data); - if (read != len) { - log("Size read is incorrect [" + read + " instead of expected " + len + "]"); - return null; - } else { - log("Read data length: " + data.length + " in base64: " + Base64.encode(data)); - return data; - } + ServletInputStream in = req.getInputStream(); + int len = req.getContentLength(); + byte data[] = new byte[len]; + int cur = 0; + int read = DataHelper.read(in, data); + if (read != len) { + log("Size read is incorrect [" + read + " instead of expected " + len + "]"); + return null; + } else { + log("Read data length: " + data.length + " in base64: " + Base64.encode(data)); + return data; + } } - + private String getTarget(InputStream in) throws IOException { - StringBuffer buf = new StringBuffer(64); - int numBytes = 0; - int c = 0; - while ( (c = in.read()) != -1) { - if (c == (int)'&') break; - buf.append((char)c); - numBytes++; - if (numBytes > 128) { - log("Target didn't find the & after 128 bytes [" + buf.toString() + "]"); - return null; - } - } - if (buf.toString().indexOf("target=") != 0) { - log("Did not start with target= [" + buf.toString() + "]"); - return null; - } - return buf.substring("target=".length()); + StringBuffer buf = new StringBuffer(64); + int numBytes = 0; + int c = 0; + while ((c = in.read()) != -1) { + if (c == (int) '&') break; + buf.append((char) c); + numBytes++; + if (numBytes > 128) { + log("Target didn't find the & after 128 bytes [" + buf.toString() + "]"); + return null; + } + } + if (buf.toString().indexOf("target=") != 0) { + log("Did not start with target= [" + buf.toString() + "]"); + return null; + } + return buf.substring("target=".length()); } - + private void sendMessages(HttpServletResponse resp, String target) throws IOException { - log("Before lock " + target); - LockManager.lockIdent(target); - log("Locked " + target); - try { - File identDir = getIdentDir(target); - expire(identDir); - File messageFiles[] = identDir.listFiles(); - resp.setStatus(CODE_OK); - log("Sending back " + (messageFiles.length -1) + " messages"); - ServletOutputStream out = resp.getOutputStream(); - DataHelper.writeDate(out, new Date(Clock.getInstance().now())); - DataHelper.writeLong(out, 2, messageFiles.length -1); - for (int i = 0; i < messageFiles.length; i++) { - if ("identity.dat".equals(messageFiles[i].getName())) { - // skip - } else { - log("Message file " + messageFiles[i].getName() + " is " + messageFiles[i].length() + " bytes"); - DataHelper.writeLong(out, 4, messageFiles[i].length()); - writeFile(out, messageFiles[i]); - boolean deleted = messageFiles[i].delete(); - if (!deleted) { - log("!!!Error removing message file " + messageFiles[i].getAbsolutePath() + " - please delete!"); - } - } - } - out.flush(); - out.close(); - } catch (DataFormatException dfe) { - log("Error sending message", dfe); - } finally { - LockManager.unlockIdent(target); - log("Unlocked " + target); - } + log("Before lock " + target); + LockManager.lockIdent(target); + log("Locked " + target); + try { + File identDir = getIdentDir(target); + expire(identDir); + File messageFiles[] = identDir.listFiles(); + resp.setStatus(CODE_OK); + log("Sending back " + (messageFiles.length - 1) + " messages"); + ServletOutputStream out = resp.getOutputStream(); + DataHelper.writeDate(out, new Date(Clock.getInstance().now())); + DataHelper.writeLong(out, 2, messageFiles.length - 1); + for (int i = 0; i < messageFiles.length; i++) { + if ("identity.dat".equals(messageFiles[i].getName())) { + // skip + } else { + log("Message file " + messageFiles[i].getName() + " is " + messageFiles[i].length() + " bytes"); + DataHelper.writeLong(out, 4, messageFiles[i].length()); + writeFile(out, messageFiles[i]); + boolean deleted = messageFiles[i].delete(); + if (!deleted) { + log("!!!Error removing message file " + messageFiles[i].getAbsolutePath() + " - please delete!"); + } + } + } + out.flush(); + out.close(); + } catch (DataFormatException dfe) { + log("Error sending message", dfe); + } finally { + LockManager.unlockIdent(target); + log("Unlocked " + target); + } } - - private final static long EXPIRE_DELAY = 60*1000; // expire messages every minute - + + private final static long EXPIRE_DELAY = 60 * 1000; // expire messages every minute + private void expire(File identDir) throws IOException { - File files[] = identDir.listFiles(); - long now = System.currentTimeMillis(); - for (int i = 0 ; i < files.length; i++) { - if ("identity.dat".equals(files[i].getName())) { - continue; - } - if (files[i].lastModified() + EXPIRE_DELAY < now) { - log("Expiring " + files[i].getAbsolutePath()); - files[i].delete(); - } - } + File files[] = identDir.listFiles(); + long now = System.currentTimeMillis(); + for (int i = 0; i < files.length; i++) { + if ("identity.dat".equals(files[i].getName())) { + continue; + } + if (files[i].lastModified() + EXPIRE_DELAY < now) { + log("Expiring " + files[i].getAbsolutePath()); + files[i].delete(); + } + } } - + private void writeFile(ServletOutputStream out, File file) throws IOException { - FileInputStream fis = new FileInputStream(file); - try { - byte buf[] = new byte[4096]; - while (true) { - int read = DataHelper.read(fis, buf); - if (read > 0) - out.write(buf, 0, read); - else - break; - } - } finally { - fis.close(); - } + FileInputStream fis = new FileInputStream(file); + try { + byte buf[] = new byte[4096]; + while (true) { + int read = DataHelper.read(fis, buf); + if (read > 0) + out.write(buf, 0, read); + else + break; + } + } finally { + fis.close(); + } } - - + private boolean isKnown(String target) throws IOException { - File identDir = getIdentDir(target); - if (identDir.exists()) { - File identFile = new File(identDir, "identity.dat"); - if (identFile.exists()) { - // known and valid (maybe we need to check the file format... naw, fuck it - return true; - } else { - return false; - } - } else { - return false; - } + File identDir = getIdentDir(target); + if (identDir.exists()) { + File identFile = new File(identDir, "identity.dat"); + if (identFile.exists()) { + // known and valid (maybe we need to check the file format... naw, fuck it + return true; + } else { + return false; + } + } else { + return false; + } } - + private boolean isAuthorized(String target, InputStream in) throws IOException { - RouterIdentity ident = null; - try { - ident = getRouterIdentity(target); - } catch (DataFormatException dfe) { - log("Identity was not valid", dfe); - } - - if (ident == null) { - log("Identity not registered"); - return false; - } - - try { - long val = DataHelper.readLong(in, 4); - Signature sig = new Signature(); - sig.readBytes(in); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataHelper.writeLong(baos, 4, val); - if (DSAEngine.getInstance().verifySignature(sig, baos.toByteArray(), ident.getSigningPublicKey())) { - return true; - } else { - log("Signature does NOT match"); - return false; - } - } catch (DataFormatException dfe) { - log("Format error reading the nonce and signature", dfe); - return false; - } + RouterIdentity ident = null; + try { + ident = getRouterIdentity(target); + } catch (DataFormatException dfe) { + log("Identity was not valid", dfe); + } + + if (ident == null) { + log("Identity not registered"); + return false; + } + + try { + long val = DataHelper.readLong(in, 4); + Signature sig = new Signature(); + sig.readBytes(in); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataHelper.writeLong(baos, 4, val); + if (DSAEngine.getInstance().verifySignature(sig, baos.toByteArray(), ident.getSigningPublicKey())) { + return true; + } else { + log("Signature does NOT match"); + return false; + } + } catch (DataFormatException dfe) { + log("Format error reading the nonce and signature", dfe); + return false; + } } - + private RouterIdentity getRouterIdentity(String target) throws IOException, DataFormatException { - File identDir = getIdentDir(target); - if (identDir.exists()) { - File identFile = new File(identDir, "identity.dat"); - if (identFile.exists()) { - // known and valid (maybe we need to check the file format... naw, fuck it - RouterIdentity ident = new RouterIdentity(); - ident.readBytes(new FileInputStream(identFile)); - return ident; - } else { - return null; - } - } else { - return null; - } + File identDir = getIdentDir(target); + if (identDir.exists()) { + File identFile = new File(identDir, "identity.dat"); + if (identFile.exists()) { + // known and valid (maybe we need to check the file format... naw, fuck it + RouterIdentity ident = new RouterIdentity(); + ident.readBytes(new FileInputStream(identFile)); + return ident; + } else { + return null; + } + } else { + return null; + } } -} +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/RegisterServlet.java b/apps/phttprelay/java/src/net/i2p/phttprelay/RegisterServlet.java index 3c52fa498d8e7fbc2593579836d3bf1d232a8b77..822228b18b833ed9e94722f8e628e1e9d3bd96bc 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/RegisterServlet.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/RegisterServlet.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -66,89 +67,92 @@ public class RegisterServlet extends PHTTPRelayServlet { /* config params */ public final static String PARAM_POLL_PATH = "pollPath"; public final static String PARAM_SEND_PATH = "sendPath"; - + /* key=val keys sent back on registration */ - public final static String PROP_STATUS = "status"; + public final static String PROP_STATUS = "status"; public final static String PROP_POLL_URL = "pollURL"; public final static String PROP_SEND_URL = "sendURL"; public final static String PROP_TIME_OFFSET = "timeOffset"; // ms (local-remote) /* values for the PROP_STATUS */ - public final static String STATUS_FAILED = "failed"; + public final static String STATUS_FAILED = "failed"; public final static String STATUS_REGISTERED = "registered"; - + public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - ServletInputStream in = req.getInputStream(); - RouterIdentity ident = new RouterIdentity(); - try { - Date remoteTime = DataHelper.readDate(in); - long skew = getSkew(remoteTime); - ident.readBytes(in); - boolean ok = registerIdent(ident); - sendURLs(req, resp, skew, ok); - } catch (DataFormatException dfe) { - log("Invalid format for router identity posted", dfe); - } finally { - in.close(); - } + ServletInputStream in = req.getInputStream(); + RouterIdentity ident = new RouterIdentity(); + try { + Date remoteTime = DataHelper.readDate(in); + long skew = getSkew(remoteTime); + ident.readBytes(in); + boolean ok = registerIdent(ident); + sendURLs(req, resp, skew, ok); + } catch (DataFormatException dfe) { + log("Invalid format for router identity posted", dfe); + } finally { + in.close(); + } } - + private long getSkew(Date remoteDate) { - if (remoteDate == null) { - log("*ERROR: remote date was null"); - return Long.MAX_VALUE; - } else { - long diff = Clock.getInstance().now() - remoteDate.getTime(); - return diff; - } + if (remoteDate == null) { + log("*ERROR: remote date was null"); + return Long.MAX_VALUE; + } else { + long diff = Clock.getInstance().now() - remoteDate.getTime(); + return diff; + } } - + private boolean registerIdent(RouterIdentity ident) throws DataFormatException, IOException { - File identDir = getIdentDir(ident.getHash().toBase64()); - boolean created = identDir.mkdirs(); - File identFile = new File(identDir, "identity.dat"); - FileOutputStream fos = null; - try { - fos = new FileOutputStream(identFile); - ident.writeBytes(fos); - } finally { - if (fos != null) try { fos.close(); } catch (IOException ioe) {} - } - log("Identity registered into " + identFile.getAbsolutePath()); - return true; + File identDir = getIdentDir(ident.getHash().toBase64()); + boolean created = identDir.mkdirs(); + File identFile = new File(identDir, "identity.dat"); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(identFile); + ident.writeBytes(fos); + } finally { + if (fos != null) try { + fos.close(); + } catch (IOException ioe) { + } + } + log("Identity registered into " + identFile.getAbsolutePath()); + return true; } - + private void sendURLs(HttpServletRequest req, HttpServletResponse resp, long skew, boolean ok) throws IOException { - ServletOutputStream out = resp.getOutputStream(); - - log("*Debug: clock skew of " + skew + "ms (local-remote)"); - - StringBuffer buf = new StringBuffer(); - if (ok) { - buf.append(PROP_POLL_URL).append("=").append(buildURL(req, _pollPath)).append("\n"); - buf.append(PROP_SEND_URL).append("=").append(buildURL(req, _sendPath)).append("\n"); - buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n"); - buf.append(PROP_STATUS).append("=").append(STATUS_REGISTERED).append("\n"); - } else { - buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n"); - buf.append(PROP_STATUS).append("=").append(STATUS_FAILED).append("\n"); - } - out.write(buf.toString().getBytes()); - out.close(); + ServletOutputStream out = resp.getOutputStream(); + + log("*Debug: clock skew of " + skew + "ms (local-remote)"); + + StringBuffer buf = new StringBuffer(); + if (ok) { + buf.append(PROP_POLL_URL).append("=").append(buildURL(req, _pollPath)).append("\n"); + buf.append(PROP_SEND_URL).append("=").append(buildURL(req, _sendPath)).append("\n"); + buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n"); + buf.append(PROP_STATUS).append("=").append(STATUS_REGISTERED).append("\n"); + } else { + buf.append(PROP_TIME_OFFSET).append("=").append(skew).append("\n"); + buf.append(PROP_STATUS).append("=").append(STATUS_FAILED).append("\n"); + } + out.write(buf.toString().getBytes()); + out.close(); } - + public void init(ServletConfig config) throws ServletException { - super.init(config); - - String pollPath = config.getInitParameter(PARAM_POLL_PATH); - if (pollPath == null) - throw new ServletException("Polling path for the registration servlet required [" + PARAM_POLL_PATH + "]"); - else - _pollPath = pollPath; - String sendPath = config.getInitParameter(PARAM_SEND_PATH); - if (sendPath == null) - throw new ServletException("Sending path for the registration servlet required [" + PARAM_SEND_PATH + "]"); - else - _sendPath = sendPath; + super.init(config); + + String pollPath = config.getInitParameter(PARAM_POLL_PATH); + if (pollPath == null) + throw new ServletException("Polling path for the registration servlet required [" + PARAM_POLL_PATH + "]"); + else + _pollPath = pollPath; + String sendPath = config.getInitParameter(PARAM_SEND_PATH); + if (sendPath == null) + throw new ServletException("Sending path for the registration servlet required [" + PARAM_SEND_PATH + "]"); + else + _sendPath = sendPath; } -} +} \ No newline at end of file diff --git a/apps/phttprelay/java/src/net/i2p/phttprelay/SendServlet.java b/apps/phttprelay/java/src/net/i2p/phttprelay/SendServlet.java index 10d7be8720ae67d55b975a04390660ea0fac3d89..a09a4258c4a9e9a85711978e43cb035753530906 100644 --- a/apps/phttprelay/java/src/net/i2p/phttprelay/SendServlet.java +++ b/apps/phttprelay/java/src/net/i2p/phttprelay/SendServlet.java @@ -1,4 +1,5 @@ package net.i2p.phttprelay; + /* * free (adj.): unencumbered; not under the control of others * Written by jrandom in 2003 and released into the public domain @@ -63,256 +64,261 @@ import javax.servlet.http.HttpServletResponse; public class SendServlet extends PHTTPRelayServlet { private String _checkPath; private int _maxMessagesPerIdent; - + /* config params */ public final static String PARAM_CHECK_PATH = "checkPath"; public final static String PARAM_MAX_MESSAGES_PER_IDENT = "maxMessagesPerIdent"; - + /* URL parameters on the send */ - + /** H(routerIdent).toBase64() of the target to receive the message */ - public final static String PARAM_SEND_TARGET = "target"; + public final static String PARAM_SEND_TARGET = "target"; /** # ms to wait for the message to be delivered before failing it */ public final static String PARAM_SEND_TIMEOUTMS = "timeoutMs"; /** # bytes to be sent in the message */ public final static String PARAM_SEND_DATA_LENGTH = "dataLength"; /** sending router's time in ms */ public final static String PARAM_SEND_TIME = "localTime"; - + /** msgId parameter to access the check path servlet with (along side PARAM_SEND_TARGET) */ public final static String PARAM_MSG_ID = "msgId"; - /* key=val keys sent back on registration */ public final static String PROP_CHECK_URL = "statusCheckURL"; public final static String PROP_STATUS = "status"; public final static String STATUS_OK = "accepted"; public final static String STATUS_UNKNOWN = "unknown"; - private final static String STATUS_CLOCKSKEW = "clockSkew_"; /** prefix for (local-remote) */ - + private final static String STATUS_CLOCKSKEW = "clockSkew_"; + + /** prefix for (local-remote) */ + public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - ServletInputStream in = req.getInputStream(); - try { - int contentLen = req.getContentLength(); - String firstLine = getFirstLine(in, contentLen); - if (firstLine == null) { - return; - } - Map params = getParameters(firstLine); - String target = (String)params.get(PARAM_SEND_TARGET); - String timeoutStr = (String)params.get(PARAM_SEND_TIMEOUTMS); - String lenStr = (String)params.get(PARAM_SEND_DATA_LENGTH); - String remoteTimeStr = (String)params.get(PARAM_SEND_TIME); - long skew = 0; - try { - long remTime = Long.parseLong(remoteTimeStr); - skew = System.currentTimeMillis() - remTime; - } catch (Throwable t) { - skew = Long.MAX_VALUE; - log("*ERROR could not parse the remote time from [" + remoteTimeStr + "]"); - } + ServletInputStream in = req.getInputStream(); + try { + int contentLen = req.getContentLength(); + String firstLine = getFirstLine(in, contentLen); + if (firstLine == null) { return; } + Map params = getParameters(firstLine); + String target = (String) params.get(PARAM_SEND_TARGET); + String timeoutStr = (String) params.get(PARAM_SEND_TIMEOUTMS); + String lenStr = (String) params.get(PARAM_SEND_DATA_LENGTH); + String remoteTimeStr = (String) params.get(PARAM_SEND_TIME); + long skew = 0; + try { + long remTime = Long.parseLong(remoteTimeStr); + skew = System.currentTimeMillis() - remTime; + } catch (Throwable t) { + skew = Long.MAX_VALUE; + log("*ERROR could not parse the remote time from [" + remoteTimeStr + "]"); + } + + log("Target [" + target + "] timeout [" + timeoutStr + "] length [" + lenStr + "] skew [" + skew + "]"); - log("Target [" + target + "] timeout [" + timeoutStr + "] length [" + lenStr + "] skew [" + skew + "]"); + if ((skew > CLOCK_FUDGE_FACTOR) || (skew < 0 - CLOCK_FUDGE_FACTOR)) { + log("Attempt to send by a skewed router: skew = " + skew + "ms (local-remote)"); + failSkewed(req, resp, skew); + } - if ( (skew > CLOCK_FUDGE_FACTOR) || (skew < 0 - CLOCK_FUDGE_FACTOR) ) { - log("Attempt to send by a skewed router: skew = " + skew + "ms (local-remote)"); - failSkewed(req, resp, skew); - } - - if (!isValidTarget(target)) { - log("Attempt to send to an invalid target [" + target + "]"); - fail(req, resp, "Unknown or invalid target"); - return; - } + if (!isValidTarget(target)) { + log("Attempt to send to an invalid target [" + target + "]"); + fail(req, resp, "Unknown or invalid target"); + return; + } - long len = -1; - try { - len = Long.parseLong(lenStr); - } catch (Throwable t) { - log("Unable to parse length parameter [" + PARAM_SEND_DATA_LENGTH + "] (" + lenStr + ")"); - fail(req, resp, "Invalid length parameter"); - return; - } + long len = -1; + try { + len = Long.parseLong(lenStr); + } catch (Throwable t) { + log("Unable to parse length parameter [" + PARAM_SEND_DATA_LENGTH + "] (" + lenStr + ")"); + fail(req, resp, "Invalid length parameter"); + return; + } - int msgId = saveFile(in, resp, target, len); - if (msgId >= 0) { - sendSuccess(req, resp, target, msgId); - } else { - fail(req, resp, "Unable to queue up the message for delivery"); - } - } finally { - try { in.close(); } catch (IOException ioe) {} - } + int msgId = saveFile(in, resp, target, len); + if (msgId >= 0) { + sendSuccess(req, resp, target, msgId); + } else { + fail(req, resp, "Unable to queue up the message for delivery"); + } + } finally { + try { + in.close(); + } catch (IOException ioe) { + } + } } - - + private String getFirstLine(ServletInputStream in, int len) throws ServletException, IOException { - StringBuffer buf = new StringBuffer(128); - int numBytes = 0; - int c = 0; - while ( (c = in.read()) != -1) { - if (c == (int)'\n') break; - buf.append((char)c); - numBytes++; - if (numBytes > 512) { - log("First line is > 512 bytes [" + buf.toString() + "]"); - return null; - } - } - log("First line: " + buf.toString()); - return buf.toString(); + StringBuffer buf = new StringBuffer(128); + int numBytes = 0; + int c = 0; + while ((c = in.read()) != -1) { + if (c == (int) '\n') break; + buf.append((char) c); + numBytes++; + if (numBytes > 512) { + log("First line is > 512 bytes [" + buf.toString() + "]"); + return null; + } + } + log("First line: " + buf.toString()); + return buf.toString(); } - + private static Map getParameters(String line) { - //StringTokenizer tok = new StringTokenizer(line, "&=", true); - Map params = new HashMap(); - while (line != null) { - String key = null; - String val = null; - int firstAmp = line.indexOf('&'); - int firstEq = line.indexOf('='); - if (firstAmp > 0) { - key = line.substring(0, firstEq); - val = line.substring(firstEq+1, firstAmp); - line = line.substring(firstAmp+1); - params.put(key, val); - } else { - line = null; - } - } - return params; + //StringTokenizer tok = new StringTokenizer(line, "&=", true); + Map params = new HashMap(); + while (line != null) { + String key = null; + String val = null; + int firstAmp = line.indexOf('&'); + int firstEq = line.indexOf('='); + if (firstAmp > 0) { + key = line.substring(0, firstEq); + val = line.substring(firstEq + 1, firstAmp); + line = line.substring(firstAmp + 1); + params.put(key, val); + } else { + line = null; + } + } + return params; } - + private boolean isValidTarget(String target) throws IOException { - File identDir = getIdentDir(target); - if (identDir.exists()) { - File identFile = new File(identDir, "identity.dat"); - if (identFile.exists()) { - // known and valid (maybe we need to check the file format... naw, fuck it - String files[] = identDir.list(); - // we skip 1 because of identity.dat - if (files.length -1 > _maxMessagesPerIdent) { - log("Too many messages pending for " + target + ": " + (files.length-1)); - return false; - } else { - return true; - } - } else { - log("Ident directory exists, but identity does not... corrupt for " + target); - return false; - } - } else { - log("Unknown ident " + target); - return false; - } + File identDir = getIdentDir(target); + if (identDir.exists()) { + File identFile = new File(identDir, "identity.dat"); + if (identFile.exists()) { + // known and valid (maybe we need to check the file format... naw, fuck it + String files[] = identDir.list(); + // we skip 1 because of identity.dat + if (files.length - 1 > _maxMessagesPerIdent) { + log("Too many messages pending for " + target + ": " + (files.length - 1)); + return false; + } else { + return true; + } + } else { + log("Ident directory exists, but identity does not... corrupt for " + target); + return false; + } + } else { + log("Unknown ident " + target); + return false; + } } - + private int saveFile(InputStream in, HttpServletResponse resp, String target, long len) throws IOException { - File identDir = getIdentDir(target); - if (!identDir.exists()) return -1; - try { - LockManager.lockIdent(target); - int i = 0; - while (true) { - File curFile = new File(identDir, "msg" + i + ".dat"); - if (!curFile.exists()) { - boolean ok = writeFile(curFile, in, len); - if (ok) - return i; - else - return -1; - } - i++; - continue; - } - } finally { - LockManager.unlockIdent(target); - } + File identDir = getIdentDir(target); + if (!identDir.exists()) return -1; + try { + LockManager.lockIdent(target); + int i = 0; + while (true) { + File curFile = new File(identDir, "msg" + i + ".dat"); + if (!curFile.exists()) { + boolean ok = writeFile(curFile, in, len); + if (ok) + return i; + else + return -1; + } + i++; + continue; + } + } finally { + LockManager.unlockIdent(target); + } } - + private boolean writeFile(File file, InputStream in, long len) throws IOException { - long remaining = len; - FileOutputStream fos = null; - try { - fos = new FileOutputStream(file); - byte buf[] = new byte[4096]; - while (remaining > 0) { - int read = in.read(buf); - if (read == -1) - break; - remaining -= read; - if (read > 0) - fos.write(buf, 0, read); - } - } finally { - if (fos != null) { - try { fos.close(); } catch (IOException ioe) {} - } - if (remaining != 0) { - log("Invalid remaining bytes [" + remaining + " out of " + len + "] - perhaps message was cancelled partway through delivery? deleting " + file.getAbsolutePath()); - boolean deleted = file.delete(); - if (!deleted) - log("!!!Error deleting temporary file " + file.getAbsolutePath()); - return false; - } - } - return true; + long remaining = len; + FileOutputStream fos = null; + try { + fos = new FileOutputStream(file); + byte buf[] = new byte[4096]; + while (remaining > 0) { + int read = in.read(buf); + if (read == -1) break; + remaining -= read; + if (read > 0) fos.write(buf, 0, read); + } + } finally { + if (fos != null) { + try { + fos.close(); + } catch (IOException ioe) { + } + } + if (remaining != 0) { + log("Invalid remaining bytes [" + remaining + " out of " + len + + "] - perhaps message was cancelled partway through delivery? deleting " + file.getAbsolutePath()); + boolean deleted = file.delete(); + if (!deleted) log("!!!Error deleting temporary file " + file.getAbsolutePath()); + return false; + } + } + return true; } - - private void sendSuccess(HttpServletRequest req, HttpServletResponse resp, String target, int msgId) throws IOException { - ServletOutputStream out = resp.getOutputStream(); - StringBuffer buf = new StringBuffer(); - buf.append(PROP_STATUS).append('=').append(STATUS_OK).append('\n'); - buf.append(PROP_CHECK_URL).append('=').append(buildURL(req, _checkPath)); - buf.append('?'); - buf.append(PARAM_SEND_TARGET).append('=').append(target).append("&"); - buf.append(PARAM_MSG_ID).append('=').append(msgId).append("\n"); - out.write(buf.toString().getBytes()); - out.flush(); + + private void sendSuccess(HttpServletRequest req, HttpServletResponse resp, String target, int msgId) + throws IOException { + ServletOutputStream out = resp.getOutputStream(); + StringBuffer buf = new StringBuffer(); + buf.append(PROP_STATUS).append('=').append(STATUS_OK).append('\n'); + buf.append(PROP_CHECK_URL).append('=').append(buildURL(req, _checkPath)); + buf.append('?'); + buf.append(PARAM_SEND_TARGET).append('=').append(target).append("&"); + buf.append(PARAM_MSG_ID).append('=').append(msgId).append("\n"); + out.write(buf.toString().getBytes()); + out.flush(); } - + private void fail(HttpServletRequest req, HttpServletResponse resp, String err) throws IOException { - ServletOutputStream out = resp.getOutputStream(); - StringBuffer buf = new StringBuffer(); - buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n'); - out.write(buf.toString().getBytes()); - out.flush(); + ServletOutputStream out = resp.getOutputStream(); + StringBuffer buf = new StringBuffer(); + buf.append(PROP_STATUS).append('=').append(STATUS_UNKNOWN).append('\n'); + out.write(buf.toString().getBytes()); + out.flush(); } - + private void failSkewed(HttpServletRequest req, HttpServletResponse resp, long skew) throws IOException { - ServletOutputStream out = resp.getOutputStream(); - StringBuffer buf = new StringBuffer(); - buf.append(PROP_STATUS).append('=').append(STATUS_CLOCKSKEW).append(skew).append('\n'); - out.write(buf.toString().getBytes()); - out.flush(); + ServletOutputStream out = resp.getOutputStream(); + StringBuffer buf = new StringBuffer(); + buf.append(PROP_STATUS).append('=').append(STATUS_CLOCKSKEW).append(skew).append('\n'); + out.write(buf.toString().getBytes()); + out.flush(); } - + public void init(ServletConfig config) throws ServletException { - super.init(config); - - String checkPath = config.getInitParameter(PARAM_CHECK_PATH); - if (checkPath == null) - throw new ServletException("Check status path for the sending servlet required [" + PARAM_CHECK_PATH + "]"); - else - _checkPath = checkPath; - - String maxMessagesPerIdentStr = config.getInitParameter(PARAM_MAX_MESSAGES_PER_IDENT); - if (maxMessagesPerIdentStr == null) - throw new ServletException("Max messages per ident for the sending servlet required [" + PARAM_MAX_MESSAGES_PER_IDENT + "]"); - try { - _maxMessagesPerIdent = Integer.parseInt(maxMessagesPerIdentStr); - } catch (Throwable t) { - throw new ServletException("Valid max messages per ident for the sending servlet required [" + PARAM_MAX_MESSAGES_PER_IDENT + "]"); - } + super.init(config); + + String checkPath = config.getInitParameter(PARAM_CHECK_PATH); + if (checkPath == null) + throw new ServletException("Check status path for the sending servlet required [" + PARAM_CHECK_PATH + "]"); + else + _checkPath = checkPath; + + String maxMessagesPerIdentStr = config.getInitParameter(PARAM_MAX_MESSAGES_PER_IDENT); + if (maxMessagesPerIdentStr == null) + throw new ServletException("Max messages per ident for the sending servlet required [" + + PARAM_MAX_MESSAGES_PER_IDENT + "]"); + try { + _maxMessagesPerIdent = Integer.parseInt(maxMessagesPerIdentStr); + } catch (Throwable t) { + throw new ServletException("Valid max messages per ident for the sending servlet required [" + + PARAM_MAX_MESSAGES_PER_IDENT + "]"); + } } - + public static void main(String args[]) { - String line = "target=pp0ARjQiB~IKC-0FsMUsPEMrwR3gxVBZGRYfEr1IzHI=&timeoutMs=52068&dataLength=2691&"; - Map props = getParameters(line); - for (java.util.Iterator iter = props.keySet().iterator(); iter.hasNext(); ) { - String key = (String)iter.next(); - String val = (String)props.get(key); - System.out.println("[" + key + "] = [" + val + "]"); - } + String line = "target=pp0ARjQiB~IKC-0FsMUsPEMrwR3gxVBZGRYfEr1IzHI=&timeoutMs=52068&dataLength=2691&"; + Map props = getParameters(line); + for (java.util.Iterator iter = props.keySet().iterator(); iter.hasNext();) { + String key = (String) iter.next(); + String val = (String) props.get(key); + System.out.println("[" + key + "] = [" + val + "]"); + } } -} +} \ No newline at end of file diff --git a/apps/tests/echotester/BasicEchoTestAnalyzer.java b/apps/tests/echotester/BasicEchoTestAnalyzer.java index 5bab10e8ca20df19d0e030b952da4fa86270a96c..577db8feb06777541738433eadacd8891fef3c38 100644 --- a/apps/tests/echotester/BasicEchoTestAnalyzer.java +++ b/apps/tests/echotester/BasicEchoTestAnalyzer.java @@ -8,87 +8,81 @@ public class BasicEchoTestAnalyzer implements EchoTestAnalyzer { * printed. Default is every 20 events. */ private static int REPORT_DELAY = 20; - + private static int SUMMARY_SIZE = 100; public BasicEchoTestAnalyzer() { - this(20, 100); + this(20, 100); } - + public BasicEchoTestAnalyzer(int reportDelay, int summarySize) { - REPORT_DELAY = reportDelay; - SUMMARY_SIZE = summarySize; + REPORT_DELAY = reportDelay; + SUMMARY_SIZE = summarySize; } - - private int events = 0, - packetLosses = 0, - packetLossesDisconnect=0, - disconnects = 0, - disconnectsRefused = 0, - delayCount=0, - lastDelayPtr = 0; - private long minDelay=Long.MAX_VALUE, maxDelay = 0, delaySum=0; + + private int events = 0, packetLosses = 0, packetLossesDisconnect = 0, disconnects = 0, disconnectsRefused = 0, + delayCount = 0, lastDelayPtr = 0; + private long minDelay = Long.MAX_VALUE, maxDelay = 0, delaySum = 0; private long[] lastDelays = new long[SUMMARY_SIZE]; - - + public synchronized void packetLossOccurred(boolean beforeDisconnect) { - System.out.println("1: Packet lost"+ - (beforeDisconnect?" before disconnect":"")+ - "."); - packetLosses++; - if (beforeDisconnect) packetLossesDisconnect++; - countEvent(); + System.out.println("1: Packet lost" + (beforeDisconnect ? " before disconnect" : "") + "."); + packetLosses++; + if (beforeDisconnect) packetLossesDisconnect++; + countEvent(); } public synchronized void successOccurred(long delay) { - System.out.println("0: Delay = "+delay); - if (delay > maxDelay) maxDelay=delay; - if (delay < minDelay) minDelay=delay; - delaySum+=delay; - delayCount++; - lastDelays[lastDelayPtr++]=delay; - lastDelayPtr%=SUMMARY_SIZE; - countEvent(); + System.out.println("0: Delay = " + delay); + if (delay > maxDelay) maxDelay = delay; + if (delay < minDelay) minDelay = delay; + delaySum += delay; + delayCount++; + lastDelays[lastDelayPtr++] = delay; + lastDelayPtr %= SUMMARY_SIZE; + countEvent(); } public synchronized void disconnected(boolean refused) { - System.out.println("2: Disconnected"+ - (refused?" (connection refused)":"")+ - "."); - disconnects++; - if (refused) disconnectsRefused++; - countEvent(); + System.out.println("2: Disconnected" + (refused ? " (connection refused)" : "") + "."); + disconnects++; + if (refused) disconnectsRefused++; + countEvent(); } private void countEvent() { - events++; - if (events % REPORT_DELAY == 0) { - int packets = packetLosses+delayCount; - long delaySummary=0; - for (int i=0;i<SUMMARY_SIZE;i++) { - delaySummary+=lastDelays[i]; - } - System.out.println - ("++++++++++++++++ ECHO STATISTICS +++++++++++++++++++++++++"+ - "\n++ Number of total echo messages: "+packets+ - "\n++ No response for "+packetLosses+ - "\n++ (of which "+ packetLossesDisconnect+ - " due to a disconnect)"+ - "\n++ Disconnects: "+disconnects+ - "\n++ (of which "+disconnectsRefused+ - " due to 'connection refused')"+ - (disconnects>0 || true - ?"\n++ Average lost packets per disconnect: "+ - (packetLossesDisconnect/(float)disconnects) - :"")+ - "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++"+ - "\n++ Minimal delay: "+minDelay+ - "\n++ Average delay: "+(delaySum/(float)delayCount)+ - "\n++ Maximal delay: "+maxDelay+ - (delayCount >=SUMMARY_SIZE - ?"\n++ Average delay over last " + SUMMARY_SIZE + ": "+(delaySummary/(float)SUMMARY_SIZE) - :"")+ - "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++"); - } + events++; + if (events % REPORT_DELAY == 0) { + int packets = packetLosses + delayCount; + long delaySummary = 0; + for (int i = 0; i < SUMMARY_SIZE; i++) { + delaySummary += lastDelays[i]; + } + System.out.println("++++++++++++++++ ECHO STATISTICS +++++++++++++++++++++++++" + + "\n++ Number of total echo messages: " + + packets + + "\n++ No response for " + + packetLosses + + "\n++ (of which " + + packetLossesDisconnect + + " due to a disconnect)" + + "\n++ Disconnects: " + + disconnects + + "\n++ (of which " + + disconnectsRefused + + " due to 'connection refused')" + + (disconnects > 0 || true ? "\n++ Average lost packets per disconnect: " + + (packetLossesDisconnect / (float) disconnects) : "") + + "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++" + + "\n++ Minimal delay: " + + minDelay + + "\n++ Average delay: " + + (delaySum / (float) delayCount) + + "\n++ Maximal delay: " + + maxDelay + + (delayCount >= SUMMARY_SIZE ? "\n++ Average delay over last " + SUMMARY_SIZE + ": " + + (delaySummary / (float) SUMMARY_SIZE) : "") + + "\n++++++++++++++++++++++++++++++++++++++++++++++++++++++++"); + } } -} +} \ No newline at end of file diff --git a/apps/tests/echotester/EchoTestAnalyzer.java b/apps/tests/echotester/EchoTestAnalyzer.java index 3067a1ba7f1758156925770157c5002f36e8066f..96c5f715b7bf38da16ddbf2d1cc29155fda9f63a 100644 --- a/apps/tests/echotester/EchoTestAnalyzer.java +++ b/apps/tests/echotester/EchoTestAnalyzer.java @@ -15,5 +15,3 @@ public interface EchoTestAnalyzer { } - - diff --git a/apps/tests/echotester/EchoTester.java b/apps/tests/echotester/EchoTester.java index d2344848fdbba91bd8c5db18023753311d8f3a90..1fc0607d720ffbb6ff527adb0c58216d82e5217b 100644 --- a/apps/tests/echotester/EchoTester.java +++ b/apps/tests/echotester/EchoTester.java @@ -26,152 +26,142 @@ public class EchoTester extends Thread { /** * How long to wait between packets. Default is 6 seconds. */ - private static long PACKET_DELAY= 6000; + private static long PACKET_DELAY = 6000; /** * How many packets may be on the way before the connection is * seen as "broken" and disconnected. */ - private static final long MAX_PACKETS_QUEUED=50; // unused - - + private static final long MAX_PACKETS_QUEUED = 50; // unused + private EchoTestAnalyzer eta; private String host; private int port; // the following vars are synchronized via the lock. private Object lock = new Object(); - private long nextPacket=0; - private long nextUnreceived=0; - private boolean readerRunning=false; + private long nextPacket = 0; + private long nextUnreceived = 0; + private boolean readerRunning = false; public static void main(String[] args) { - if (args.length == 3) - PACKET_DELAY = Long.parseLong(args[2]); - new EchoTester(args[0], Integer.parseInt(args[1]), - new BasicEchoTestAnalyzer()); + if (args.length == 3) PACKET_DELAY = Long.parseLong(args[2]); + new EchoTester(args[0], Integer.parseInt(args[1]), new BasicEchoTestAnalyzer()); } - + public EchoTester(String host, int port, EchoTestAnalyzer eta) { - this.eta=eta; - this.host=host; - this.port=port; - start(); + this.eta = eta; + this.host = host; + this.port = port; + start(); } public void run() { - try { - while (true) { - Socket s; - try { - s = new Socket(host, port); - } catch (ConnectException ex) { - eta.disconnected(true); - Thread.sleep(PACKET_DELAY); - continue; - } - System.out.println("41: Connected to "+host+":"+port); - synchronized(lock) { - nextUnreceived=nextPacket; - } - Thread t = new ResponseReaderThread(s); - Writer w = new BufferedWriter(new OutputStreamWriter - (s.getOutputStream())); - while (true) { - long no; - synchronized(lock) { - no = nextPacket++; - } - try { - w.write(no+" "+System.currentTimeMillis()+"\n"); - w.flush(); - } catch (SocketException ex) { - break; - } - Thread.sleep(PACKET_DELAY); - } - s.close(); - t.join(); - synchronized(lock) { - if (readerRunning) { - System.out.println("*** WHY IS THIS THREAD STILL"+ - " RUNNING?"); - } - while (nextUnreceived < nextPacket) { - nextUnreceived++; - eta.packetLossOccurred(true); - } - if (nextUnreceived > nextPacket) { - System.out.println("*** WTF? "+nextUnreceived+" > "+ - nextPacket); - } - } - eta.disconnected(false); - } - } catch (InterruptedException ex) { - ex.printStackTrace(); - System.exit(1); // treat these errors as fatal - } catch (IOException ex) { - ex.printStackTrace(); - System.exit(1); // treat these errors as fatal - } + try { + while (true) { + Socket s; + try { + s = new Socket(host, port); + } catch (ConnectException ex) { + eta.disconnected(true); + Thread.sleep(PACKET_DELAY); + continue; + } + System.out.println("41: Connected to " + host + ":" + port); + synchronized (lock) { + nextUnreceived = nextPacket; + } + Thread t = new ResponseReaderThread(s); + Writer w = new BufferedWriter(new OutputStreamWriter(s.getOutputStream())); + while (true) { + long no; + synchronized (lock) { + no = nextPacket++; + } + try { + w.write(no + " " + System.currentTimeMillis() + "\n"); + w.flush(); + } catch (SocketException ex) { + break; + } + Thread.sleep(PACKET_DELAY); + } + s.close(); + t.join(); + synchronized (lock) { + if (readerRunning) { + System.out.println("*** WHY IS THIS THREAD STILL" + " RUNNING?"); + } + while (nextUnreceived < nextPacket) { + nextUnreceived++; + eta.packetLossOccurred(true); + } + if (nextUnreceived > nextPacket) { + System.out.println("*** WTF? " + nextUnreceived + " > " + nextPacket); + } + } + eta.disconnected(false); + } + } catch (InterruptedException ex) { + ex.printStackTrace(); + System.exit(1); // treat these errors as fatal + } catch (IOException ex) { + ex.printStackTrace(); + System.exit(1); // treat these errors as fatal + } } private class ResponseReaderThread extends Thread { - private Socket s; + private Socket s; - public ResponseReaderThread(Socket s) { - this.s=s; - synchronized(lock) { - readerRunning=true; - } - start(); - } + public ResponseReaderThread(Socket s) { + this.s = s; + synchronized (lock) { + readerRunning = true; + } + start(); + } - public void run() { - try { - BufferedReader br = new BufferedReader(new InputStreamReader - (s.getInputStream())); - String line; - int index; - while ((line=br.readLine()) != null) { - if ((index=line.indexOf(" ")) == -1) - continue; - long now, packetNumber, packetTime; - now = System.currentTimeMillis(); - try { - packetNumber = Long.parseLong - (line.substring(0,index)); - packetTime = Long.parseLong - (line.substring(index+1)); - } catch (NumberFormatException ex) { - System.out.println(ex.toString()); - continue; - } - synchronized (lock) { - while (packetNumber > nextUnreceived) { - nextUnreceived++; - eta.packetLossOccurred(false); - } - if (nextUnreceived > packetNumber) { - System.out.println("*** DOUBLE PACKET!"); - } else { - nextUnreceived++; - } - } - eta.successOccurred(now-packetTime); - } - } catch (SocketException ex) { - // ignore - } catch (IOException ex) { - ex.printStackTrace(); - System.exit(0); - } - synchronized(lock) { - readerRunning=false; - } - } + public void run() { + try { + BufferedReader br = new BufferedReader(new InputStreamReader(s.getInputStream())); + String line; + int index; + while ((line = br.readLine()) != null) { + if ((index = line.indexOf(" ")) == -1) continue; + long now, packetNumber, packetTime; + now = System.currentTimeMillis(); + try { + packetNumber = Long.parseLong(line.substring(0, index)); + packetTime = Long.parseLong(line.substring(index + 1)); + } catch (NumberFormatException ex) { + System.out.println(ex.toString()); + continue; + } + synchronized (lock) { + while (packetNumber > nextUnreceived) { + nextUnreceived++; + eta.packetLossOccurred(false); + } + if (nextUnreceived > packetNumber) { + System.out.println("*** DOUBLE PACKET!"); + } else { + nextUnreceived++; + } + } + eta.successOccurred(now - packetTime); + } + } catch (SocketException ex) { + // ignore + } catch (IOException ex) { + ex.printStackTrace(); + System.exit(0); + } + synchronized (lock) { + readerRunning = false; + } + } } -} +} \ No newline at end of file