From aef33548b35848b46991ab14cf8ddde142c982e1 Mon Sep 17 00:00:00 2001 From: jrandom Date: Mon, 26 Sep 2005 23:45:52 +0000 Subject: [PATCH] 2005-09-26 jrandom * Reworded the SSU introductions config section (thanks duck!) * Force identity content encoding for I2PTunnel httpserver requests (thanks redzara!) * Further x-i2p-gzip bugfixes for the end of streams * Reduce the minimum bandwidth limits to 3KBps steady and burst (though I2P's performance at 3KBps is another issue) * Cleaned up some streaming lib structures --- .../i2ptunnel/HTTPResponseOutputStream.java | 38 +++++++---- .../i2ptunnel/I2PTunnelHTTPClientRunner.java | 33 ++++++++-- .../i2p/i2ptunnel/I2PTunnelHTTPServer.java | 5 +- .../net/i2p/i2ptunnel/I2PTunnelRunner.java | 28 +++++++-- apps/routerconsole/jsp/config.jsp | 10 ++- .../net/i2p/client/streaming/Connection.java | 24 +++---- .../client/streaming/ConnectionHandler.java | 2 +- .../client/streaming/ConnectionManager.java | 63 ++++++++----------- .../streaming/ConnectionPacketHandler.java | 16 ++--- .../src/net/i2p/client/streaming/Packet.java | 59 +++++------------ .../i2p/client/streaming/PacketHandler.java | 46 ++++---------- .../i2p/client/streaming/SchedulerClosed.java | 2 +- .../i2p/client/streaming/SchedulerDead.java | 2 +- .../client/streaming/SchedulerPreconnect.java | 2 +- .../client/streaming/SchedulerReceived.java | 2 +- history.txt | 11 +++- .../src/net/i2p/router/RouterVersion.java | 4 +- .../transport/FIFOBandwidthLimiter.java | 2 +- .../transport/FIFOBandwidthRefiller.java | 20 +++--- .../router/transport/udp/UDPTransport.java | 3 +- 20 files changed, 193 insertions(+), 179 deletions(-) diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java index 9400efd3e..16a2ee34b 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java @@ -37,6 +37,7 @@ class HTTPResponseOutputStream extends FilterOutputStream { private byte _buf1[]; protected boolean _gzip; private long _dataWritten; + private InternalGZIPInputStream _in; private static final int CACHE_SIZE = 8*1024; public HTTPResponseOutputStream(OutputStream raw) { @@ -199,7 +200,7 @@ class HTTPResponseOutputStream extends FilterOutputStream { } protected void beginProcessing() throws IOException { - out.flush(); + //out.flush(); PipedInputStream pi = new PipedInputStream(); PipedOutputStream po = new PipedOutputStream(pi); new I2PThread(new Pusher(pi, out), "HTTP decompresser").start(); @@ -207,22 +208,22 @@ class HTTPResponseOutputStream extends FilterOutputStream { } private class Pusher implements Runnable { - private InputStream _in; + private InputStream _inRaw; private OutputStream _out; public Pusher(InputStream in, OutputStream out) { - _in = in; + _inRaw = in; _out = out; } public void run() { OutputStream to = null; - InternalGZIPInputStream in = null; + _in = null; long start = System.currentTimeMillis(); long written = 0; try { - in = new InternalGZIPInputStream(_in); + _in = new InternalGZIPInputStream(_inRaw); byte buf[] = new byte[8192]; int read = -1; - while ( (read = in.read(buf)) != -1) { + while ( (read = _in.read(buf)) != -1) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Read " + read + " and writing it to the browser/streams"); _out.write(buf, 0, read); @@ -230,16 +231,22 @@ class HTTPResponseOutputStream extends FilterOutputStream { written += read; } if (_log.shouldLog(Log.INFO)) - _log.info("Decompressed: " + written + ", " + in.getTotalRead() + "/" + in.getTotalExpanded()); + _log.info("Decompressed: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded()); } catch (IOException ioe) { if (_log.shouldLog(Log.WARN)) - _log.warn("Error decompressing: " + written + ", " + in.getTotalRead() + "/" + in.getTotalExpanded(), ioe); + _log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded(), ioe); } finally { - if (_out != null) try { _out.close(); } catch (IOException ioe) {} + if (_log.shouldLog(Log.WARN)) + _log.warn("After decompression, written=" + written + " read=" + _in.getTotalRead() + + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining() + + ", finished=" + _in.getFinished()); + if (_out != null) try { + _out.close(); + } catch (IOException ioe) {} } long end = System.currentTimeMillis(); - double compressed = in.getTotalRead(); - double expanded = in.getTotalExpanded(); + double compressed = _in.getTotalRead(); + double expanded = _in.getTotalExpanded(); double ratio = 0; if (expanded > 0) ratio = compressed/expanded; @@ -255,6 +262,15 @@ class HTTPResponseOutputStream extends FilterOutputStream { } public long getTotalRead() { return super.inf.getTotalIn(); } public long getTotalExpanded() { return super.inf.getTotalOut(); } + public long getRemaining() { return super.inf.getRemaining(); } + public boolean getFinished() { return super.inf.finished(); } + public String toString() { + return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished(); + } + } + + public String toString() { + return super.toString() + ": " + _in; } public static void main(String args[]) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java index 7688d4103..8c442bcc6 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java @@ -3,16 +3,13 @@ */ package net.i2p.i2ptunnel; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.FilterOutputStream; +import java.io.*; import java.net.Socket; import java.util.Iterator; import java.util.List; import java.util.Properties; +import net.i2p.I2PAppContext; import net.i2p.client.streaming.I2PSocket; import net.i2p.data.ByteArray; import net.i2p.data.DataHelper; @@ -30,12 +27,38 @@ import net.i2p.util.Log; * */ public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner { + private Log _log; public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) { super(s, i2ps, slock, initialI2PData, sockList, onTimeout); + _log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelHTTPClientRunner.class); } protected OutputStream getSocketOut() throws IOException { OutputStream raw = super.getSocketOut(); return new HTTPResponseOutputStream(raw); } + + protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException { + try { + i2pin.close(); + i2pout.close(); + } catch (IOException ioe) { + // ignore + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Unable to close the i2p socket output stream: " + i2pout, ioe); + } + try { + in.close(); + out.close(); + } catch (IOException ioe) { + // ignore + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Unable to close the browser output stream: " + out, ioe); + } + i2ps.close(); + s.close(); + t1.join(30*1000); + t2.join(30*1000); + } + } diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java index b7363fcda..abccaa553 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java @@ -71,6 +71,10 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { if ( (_spoofHost != null) && (_spoofHost.trim().length() > 0) ) headers.setProperty("Host", _spoofHost); headers.setProperty("Connection", "close"); + // we keep the enc sent by the browser before clobbering it, since it may have + // been x-i2p-gzip + String enc = headers.getProperty("Accept-encoding"); + headers.setProperty("Accept-encoding", "identity;q=1, *;q=0"); String modifiedHeader = formatHeaders(headers, command); //String modifiedHeader = getModifiedHeader(socket); @@ -91,7 +95,6 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer { if ( (val != null) && (!Boolean.valueOf(val).booleanValue()) ) allowGZIP = false; } - String enc = headers.getProperty("Accept-encoding"); if (_log.shouldLog(Log.INFO)) _log.info("HTTP server encoding header: " + enc); if ( allowGZIP && (enc != null) && (enc.indexOf("x-i2p-gzip") >= 0) ) { diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java index c26eccf47..d3a8973d2 100644 --- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java +++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java @@ -153,11 +153,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL onTimeout.run(); } - // now one connection is dead - kill the other as well. - s.close(); - i2ps.close(); - t1.join(30*1000); - t2.join(30*1000); + // now one connection is dead - kill the other as well, after making sure we flush + close(out, in, i2pout, i2pin, s, i2ps, t1, t2); } catch (InterruptedException ex) { if (_log.shouldLog(Log.ERROR)) _log.error("Interrupted", ex); @@ -188,6 +185,27 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL } } + protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException { + try { + out.flush(); + } catch (IOException ioe) { + // ignore + } + try { + i2pout.flush(); + } catch (IOException ioe) { + // ignore + } + in.close(); + i2pin.close(); + // ok, yeah, there's a race here in theory, if data comes in after flushing and before + // closing, but its better than before... + s.close(); + i2ps.close(); + t1.join(30*1000); + t2.join(30*1000); + } + public void errorOccurred() { synchronized (finishLock) { finished = true; diff --git a/apps/routerconsole/jsp/config.jsp b/apps/routerconsole/jsp/config.jsp index e500a20dd..3cd245558 100644 --- a/apps/routerconsole/jsp/config.jsp +++ b/apps/routerconsole/jsp/config.jsp @@ -29,10 +29,14 @@ External UDP address:
- Require SSU introductions through NAT hole punching? + Require SSU introductions? />
-

If you can't poke a hole in your NAT or firewall to allow unsolicited UDP packets to reach the - router, as detected with the Status: ERR-Reject, then you will need SSU introductions. +

If you can, please poke a hole in your NAT or firewall to allow unsolicited UDP packets to reach + you on your external UDP address. If you can't, I2P now includes supports UDP hole punching + with "SSU introductions" - peers who will relay a request from someone you don't know to your + router for your router so that you can make an outbound connection to them. I2P will use these + introductions automatically if it detects that the port is not forwarded (as shown by + the Status: OK (NAT) line), or you can manually require them here. Users behind symmetric NATs, such as OpenBSD's pf, are not currently supported.


diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 9d6f53c13..4980d2c9d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -24,8 +24,8 @@ public class Connection { private Log _log; private ConnectionManager _connectionManager; private Destination _remotePeer; - private byte _sendStreamId[]; - private byte _receiveStreamId[]; + private long _sendStreamId; + private long _receiveStreamId; private long _lastSendTime; private long _lastSendId; private boolean _resetReceived; @@ -205,7 +205,7 @@ public class Connection { _resetSent = true; if (_resetSentOn <= 0) _resetSentOn = _context.clock().now(); - if ( (_remotePeer == null) || (_sendStreamId == null) ) return; + if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return; PacketLocal reply = new PacketLocal(_context, _remotePeer); reply.setFlag(Packet.FLAG_RESET); reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); @@ -521,12 +521,12 @@ public class Connection { public void setRemotePeer(Destination peer) { _remotePeer = peer; } /** what stream do we send data to the peer on? */ - public byte[] getSendStreamId() { return _sendStreamId; } - public void setSendStreamId(byte[] id) { _sendStreamId = id; } + public long getSendStreamId() { return _sendStreamId; } + public void setSendStreamId(long id) { _sendStreamId = id; } /** stream the peer sends data to us on. (may be null) */ - public byte[] getReceiveStreamId() { return _receiveStreamId; } - public void setReceiveStreamId(byte[] id) { + public long getReceiveStreamId() { return _receiveStreamId; } + public void setReceiveStreamId(long id) { _receiveStreamId = id; synchronized (_connectLock) { _connectLock.notifyAll(); } } @@ -653,7 +653,7 @@ public class Connection { void waitForConnect() { long expiration = _context.clock().now() + _options.getConnectTimeout(); while (true) { - if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) { + if (_connected && (_receiveStreamId > 0) && (_sendStreamId > 0) ) { // w00t if (_log.shouldLog(Log.DEBUG)) _log.debug("waitForConnect(): Connected and we have stream IDs"); @@ -793,13 +793,13 @@ public class Connection { public String toString() { StringBuffer buf = new StringBuffer(128); buf.append("[Connection "); - if (_receiveStreamId != null) - buf.append(Base64.encode(_receiveStreamId)); + if (_receiveStreamId > 0) + buf.append(Packet.toId(_receiveStreamId)); else buf.append("unknown"); buf.append("<-->"); - if (_sendStreamId != null) - buf.append(Base64.encode(_sendStreamId)); + if (_sendStreamId > 0) + buf.append(Packet.toId(_sendStreamId)); else buf.append("unknown"); buf.append(" wsize: ").append(_options.getWindowSize()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 2deb203d3..471cc97a8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -127,7 +127,7 @@ class ConnectionHandler { reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setAckThrough(packet.getSequenceNum()); reply.setSendStreamId(packet.getReceiveStreamId()); - reply.setReceiveStreamId(null); + reply.setReceiveStreamId(0); reply.setOptionalFrom(_manager.getSession().getMyDestination()); if (_log.shouldLog(Log.DEBUG)) _log.debug("Sending RST: " + reply + " because of " + packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index 9da1b4aec..4670033be 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -31,9 +31,9 @@ public class ConnectionManager { private PacketQueue _outboundQueue; private SchedulerChooser _schedulerChooser; private ConnectionPacketHandler _conPacketHandler; - /** Inbound stream ID (ByteArray) to Connection map */ + /** Inbound stream ID (Long) to Connection map */ private Map _connectionByInboundId; - /** Ping ID (ByteArray) to PingRequest */ + /** Ping ID (Long) to PingRequest */ private Map _pendingPings; private boolean _allowIncoming; private int _maxConcurrentStreams; @@ -71,16 +71,16 @@ public class ConnectionManager { _context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 }); } - Connection getConnectionByInboundId(byte[] id) { + Connection getConnectionByInboundId(long id) { synchronized (_connectionLock) { - return (Connection)_connectionByInboundId.get(new ByteArray(id)); + return (Connection)_connectionByInboundId.get(new Long(id)); } } /** * not guaranteed to be unique, but in case we receive more than one packet * on an inbound connection that we havent ack'ed yet... */ - Connection getConnectionByOutboundId(byte[] id) { + Connection getConnectionByOutboundId(long id) { synchronized (_connectionLock) { for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); @@ -107,8 +107,7 @@ public class ConnectionManager { */ public Connection receiveConnection(Packet synPacket) { Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions)); - byte receiveId[] = new byte[4]; - _context.random().nextBytes(receiveId); + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; boolean reject = false; int active = 0; int total = 0; @@ -122,16 +121,13 @@ public class ConnectionManager { reject = true; } else { while (true) { - ByteArray ba = new ByteArray(receiveId); - Connection oldCon = (Connection)_connectionByInboundId.put(ba, con); + Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con); if (oldCon == null) { break; } else { - _connectionByInboundId.put(ba, oldCon); + _connectionByInboundId.put(new Long(receiveId), oldCon); // receiveId already taken, try another - // (need to realloc receiveId, as ba.getData() points to the old value) - receiveId = new byte[4]; - _context.random().nextBytes(receiveId); + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; } } } @@ -148,7 +144,7 @@ public class ConnectionManager { reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); reply.setAckThrough(synPacket.getSequenceNum()); reply.setSendStreamId(synPacket.getReceiveStreamId()); - reply.setReceiveStreamId(null); + reply.setReceiveStreamId(0); reply.setOptionalFrom(_session.getMyDestination()); // this just sends the packet - no retries or whatnot _outboundQueue.enqueue(reply); @@ -160,7 +156,7 @@ public class ConnectionManager { con.getPacketHandler().receivePacket(synPacket, con); } catch (I2PException ie) { synchronized (_connectionLock) { - _connectionByInboundId.remove(new ByteArray(receiveId)); + _connectionByInboundId.remove(new Long(receiveId)); } return null; } @@ -179,8 +175,7 @@ public class ConnectionManager { */ public Connection connect(Destination peer, ConnectionOptions opts) { Connection con = null; - byte receiveId[] = new byte[4]; - _context.random().nextBytes(receiveId); + long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; long expiration = _context.clock().now() + opts.getConnectTimeout(); if (opts.getConnectTimeout() <= 0) expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX; @@ -213,11 +208,10 @@ public class ConnectionManager { con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); con.setRemotePeer(peer); - ByteArray ba = new ByteArray(receiveId); - while (_connectionByInboundId.containsKey(ba)) { - _context.random().nextBytes(receiveId); + while (_connectionByInboundId.containsKey(new Long(receiveId))) { + receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1; } - _connectionByInboundId.put(ba, con); + _connectionByInboundId.put(new Long(receiveId), con); break; // stop looping as a psuedo-wait } } @@ -284,7 +278,7 @@ public class ConnectionManager { public void removeConnection(Connection con) { boolean removed = false; synchronized (_connectionLock) { - Object o = _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId())); + Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId())); removed = (o == con); if (_log.shouldLog(Log.DEBUG)) _log.debug("Connection removed? " + removed + " remaining: " @@ -320,11 +314,9 @@ public class ConnectionManager { return ping(peer, timeoutMs, blocking, null, null, null); } public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) { - byte id[] = new byte[4]; - _context.random().nextBytes(id); - ByteArray ba = new ByteArray(id); + Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1); PacketLocal packet = new PacketLocal(_context, peer); - packet.setSendStreamId(id); + packet.setSendStreamId(id.longValue()); packet.setFlag(Packet.FLAG_ECHO); packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); packet.setOptionalFrom(_session.getMyDestination()); @@ -336,7 +328,7 @@ public class ConnectionManager { PingRequest req = new PingRequest(peer, packet, notifier); synchronized (_pendingPings) { - _pendingPings.put(ba, req); + _pendingPings.put(id, req); } _outboundQueue.enqueue(packet); @@ -349,10 +341,10 @@ public class ConnectionManager { } synchronized (_pendingPings) { - _pendingPings.remove(ba); + _pendingPings.remove(id); } } else { - SimpleTimer.getInstance().addEvent(new PingFailed(ba, notifier), timeoutMs); + SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs); } boolean ok = req.pongReceived(); @@ -364,17 +356,17 @@ public class ConnectionManager { } private class PingFailed implements SimpleTimer.TimedEvent { - private ByteArray _ba; + private Long _id; private PingNotifier _notifier; - public PingFailed(ByteArray ba, PingNotifier notifier) { - _ba = ba; + public PingFailed(Long id, PingNotifier notifier) { + _id = id; _notifier = notifier; } public void timeReached() { boolean removed = false; synchronized (_pendingPings) { - Object o = _pendingPings.remove(_ba); + Object o = _pendingPings.remove(_id); if (o != null) removed = true; } @@ -411,11 +403,10 @@ public class ConnectionManager { public boolean pongReceived() { return _ponged; } } - void receivePong(byte pingId[]) { - ByteArray ba = new ByteArray(pingId); + void receivePong(long pingId) { PingRequest req = null; synchronized (_pendingPings) { - req = (PingRequest)_pendingPings.remove(ba); + req = (PingRequest)_pendingPings.remove(new Long(pingId)); } if (req != null) req.pong(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 83fc55266..2e845e8f1 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -96,10 +96,8 @@ public class ConnectionPacketHandler { boolean allowAck = true; if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) && - ( (packet.getSendStreamId() == null) || - (packet.getReceiveStreamId() == null) || - (DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) || - (DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) ) + ( (packet.getSendStreamId() <= 0) || + (packet.getReceiveStreamId() <= 0) ) ) allowAck = false; if (allowAck) @@ -160,9 +158,7 @@ public class ConnectionPacketHandler { } } - if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && - ((packet.getSendStreamId() == null) || - DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN) ) ) { + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && (packet.getSendStreamId() <= 0) ) { // don't honor the ACK 0 in SYN packets received when the other side // has obviously not seen our messages } else { @@ -197,8 +193,8 @@ public class ConnectionPacketHandler { // could actually be acking data (this fixes the buggered up ack of packet 0 problem). // this is called after packet verification, which places the stream IDs as necessary if // the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet) - if ( (packet != null) && (packet.getSendStreamId() != null) && (packet.getReceiveStreamId() != null) && - (con != null) && (con.getSendStreamId() != null) && (con.getReceiveStreamId() != null) && + if ( (packet != null) && (packet.getSendStreamId() > 0) && (packet.getReceiveStreamId() > 0) && + (con != null) && (con.getSendStreamId() > 0) && (con.getReceiveStreamId() > 0) && (!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) && (!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) && (!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) && @@ -335,7 +331,7 @@ public class ConnectionPacketHandler { } else { verifySignature(packet, con); - if (con.getSendStreamId() == null) { + if (con.getSendStreamId() <= 0) { if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { con.setSendStreamId(packet.getReceiveStreamId()); con.setRemotePeer(packet.getOptionalFrom()); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index ea8f37370..bdeac2060 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -51,8 +51,8 @@ import net.i2p.util.ByteCache; * */ public class Packet { - private byte _sendStreamId[]; - private byte _receiveStreamId[]; + private long _sendStreamId; + private long _receiveStreamId; private long _sequenceNum; private long _ackThrough; private long _nacks[]; @@ -72,7 +72,9 @@ public class Packet { * synchronize packet) * */ - public static final byte STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 }; + public static final long STREAM_ID_UNKNOWN = 0l; + + public static final long MAX_STREAM_ID = 0xffffffffl; /** * This packet is creating a new socket connection (if the receiveStreamId @@ -149,17 +151,8 @@ public class Packet { } /** what stream is this packet a part of? */ - public byte[] getSendStreamId() { - if ( (_sendStreamId == null) || (DataHelper.eq(_sendStreamId, STREAM_ID_UNKNOWN)) ) - return null; - else - return _sendStreamId; - } - public void setSendStreamId(byte[] id) { - _sendStreamId = id; - if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) ) - _sendStreamId = null; - } + public long getSendStreamId() { return _sendStreamId; } + public void setSendStreamId(long id) { _sendStreamId = id; } /** * Stream that replies should be sent on. if the @@ -167,17 +160,8 @@ public class Packet { * null. * */ - public byte[] getReceiveStreamId() { - if ( (_receiveStreamId == null) || (DataHelper.eq(_receiveStreamId, STREAM_ID_UNKNOWN)) ) - return null; - else - return _receiveStreamId; - } - public void setReceiveStreamId(byte[] id) { - _receiveStreamId = id; - if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) ) - _receiveStreamId = null; - } + public long getReceiveStreamId() { return _receiveStreamId; } + public void setReceiveStreamId(long id) { _receiveStreamId = id; } /** 0-indexed sequence number for this Packet in the sendStream */ public long getSequenceNum() { return _sequenceNum; } @@ -312,15 +296,9 @@ public class Packet { */ private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException { int cur = offset; - if ( (_sendStreamId != null) && (_sendStreamId.length == 4) ) - System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length); - else - System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); + DataHelper.toLong(buffer, cur, 4, (_sendStreamId >= 0 ? _sendStreamId : STREAM_ID_UNKNOWN)); cur += 4; - if ( (_receiveStreamId != null) && (_receiveStreamId.length == 4) ) - System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length); - else - System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); + DataHelper.toLong(buffer, cur, 4, (_receiveStreamId >= 0 ? _receiveStreamId : STREAM_ID_UNKNOWN)); cur += 4; DataHelper.toLong(buffer, cur, 4, _sequenceNum > 0 ? _sequenceNum : 0); cur += 4; @@ -398,7 +376,7 @@ public class Packet { size += 4; // sequenceNum size += 4; // ackThrough if (_nacks != null) { - size++; // nacks length + size++; // nacks length size += 4 * _nacks.length; } else { size++; // nacks length @@ -440,11 +418,9 @@ public class Packet { if (length < 22) // min header size throw new IllegalArgumentException("Too small: len=" + buffer.length); int cur = offset; - _sendStreamId = new byte[4]; - System.arraycopy(buffer, cur, _sendStreamId, 0, 4); + _sendStreamId = DataHelper.fromLong(buffer, cur, 4); cur += 4; - _receiveStreamId = new byte[4]; - System.arraycopy(buffer, cur, _receiveStreamId, 0, 4); + _receiveStreamId = DataHelper.fromLong(buffer, cur, 4); cur += 4; _sequenceNum = DataHelper.fromLong(buffer, cur, 4); cur += 4; @@ -593,11 +569,8 @@ public class Packet { return buf; } - static final String toId(byte id[]) { - if (id == null) - return Base64.encode(STREAM_ID_UNKNOWN); - else - return Base64.encode(id); + static final String toId(long id) { + return Base64.encode(DataHelper.toLong(4, id)); } private final String toFlagString() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 72799f2fd..310c5e36b 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -97,11 +97,9 @@ public class PacketHandler { //if (_log.shouldLog(Log.DEBUG)) // _log.debug("packet received: " + packet); - byte sendId[] = packet.getSendStreamId(); - if (!isNonZero(sendId)) - sendId = null; + long sendId = packet.getSendStreamId(); - Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); + Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null); if (con != null) { receiveKnownCon(con, packet); displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO()); @@ -127,9 +125,9 @@ public class PacketHandler { private void receiveKnownCon(Connection con, Packet packet) { if (packet.isFlagSet(Packet.FLAG_ECHO)) { - if (packet.getSendStreamId() != null) { + if (packet.getSendStreamId() > 0) { receivePing(packet); - } else if (packet.getReceiveStreamId() != null) { + } else if (packet.getReceiveStreamId() > 0) { receivePong(packet); } else { if (_log.shouldLog(Log.WARN)) @@ -162,9 +160,9 @@ public class PacketHandler { _log.warn("Received forged reset for " + con, ie); } } else { - if ( (con.getSendStreamId() == null) || + if ( (con.getSendStreamId() <= 0) || (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) { - byte oldId[] =con.getSendStreamId(); + long oldId =con.getSendStreamId(); if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) // con fully established, w00t con.setSendStreamId(packet.getReceiveStreamId()); @@ -214,11 +212,11 @@ public class PacketHandler { _manager.getPacketQueue().enqueue(reply); } - private void receiveUnknownCon(Packet packet, byte sendId[]) { + private void receiveUnknownCon(Packet packet, long sendId) { if (packet.isFlagSet(Packet.FLAG_ECHO)) { - if (packet.getSendStreamId() != null) { + if (packet.getSendStreamId() > 0) { receivePing(packet); - } else if (packet.getReceiveStreamId() != null) { + } else if (packet.getReceiveStreamId() > 0) { receivePong(packet); } else { if (_log.shouldLog(Log.WARN)) @@ -228,7 +226,7 @@ public class PacketHandler { } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Packet received on an unknown stream (and not an ECHO): " + packet); - if (sendId == null) { + if (sendId <= 0) { Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId()); if (con != null) { if (con.getAckedPackets() <= 0) { @@ -257,7 +255,7 @@ public class PacketHandler { } _log.warn("Packet belongs to no other cons: " + packet + " connections: " + buf.toString() + " sendId: " - + (sendId != null ? Base64.encode(sendId) : " unknown")); + + (sendId > 0 ? Packet.toId(sendId) : " unknown")); } packet.releasePayload(); } @@ -289,25 +287,7 @@ public class PacketHandler { _manager.receivePong(packet.getReceiveStreamId()); } - private static final boolean isValidMatch(byte conStreamId[], byte packetStreamId[]) { - if ( (conStreamId == null) || (packetStreamId == null) || - (conStreamId.length != packetStreamId.length) ) - return false; - - boolean nonZeroFound = false; - for (int i = 0; i < conStreamId.length; i++) { - if (conStreamId[i] != packetStreamId[i]) return false; - if (conStreamId[i] != 0x0) nonZeroFound = true; - } - return nonZeroFound; - } - - private static final boolean isNonZero(byte[] b) { - boolean nonZeroFound = false; - for (int i = 0; b != null && i < b.length; i++) { - if (b[i] != 0x0) - nonZeroFound = true; - } - return nonZeroFound; + private static final boolean isValidMatch(long conStreamId, long packetStreamId) { + return ( (conStreamId == packetStreamId) && (conStreamId != 0) ); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java index 167b5a545..df09ff6d8 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java @@ -41,7 +41,7 @@ class SchedulerClosed extends SchedulerImpl { (!con.getResetReceived()) && (timeSinceClose < Connection.DISCONNECT_TIMEOUT); boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) && - con.getSendStreamId() == null && + con.getSendStreamId() <= 0 && con.getLifetime() < Connection.DISCONNECT_TIMEOUT; return (ok || conTimeout); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java index 3c0d4bda8..2bff627c7 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java @@ -36,7 +36,7 @@ class SchedulerDead extends SchedulerImpl { boolean nothingLeftToDo = (con.getDisconnectScheduledOn() > 0) && (timeSinceClose >= Connection.DISCONNECT_TIMEOUT); boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) && - con.getSendStreamId() == null && + con.getSendStreamId() <= 0 && con.getLifetime() >= Connection.DISCONNECT_TIMEOUT; return nothingLeftToDo || timedOut; } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java index 21d375de9..091a7131f 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java @@ -31,7 +31,7 @@ class SchedulerPreconnect extends SchedulerImpl { public boolean accept(Connection con) { return (con != null) && - (con.getSendStreamId() == null) && + (con.getSendStreamId() <= 0) && (con.getLastSendId() < 0); } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java index 89d28b354..343b71620 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java @@ -19,7 +19,7 @@ class SchedulerReceived extends SchedulerImpl { public boolean accept(Connection con) { return (con != null) && (con.getLastSendId() < 0) && - (con.getSendStreamId() != null); + (con.getSendStreamId() > 0); } public void eventOccurred(Connection con) { diff --git a/history.txt b/history.txt index 3f1a04119..c99c6dabd 100644 --- a/history.txt +++ b/history.txt @@ -1,4 +1,13 @@ -$Id: history.txt,v 1.264 2005/09/25 04:29:01 jrandom Exp $ +$Id: history.txt,v 1.265 2005/09/25 18:52:58 jrandom Exp $ + +2005-09-26 jrandom + * Reworded the SSU introductions config section (thanks duck!) + * Force identity content encoding for I2PTunnel httpserver requests + (thanks redzara!) + * Further x-i2p-gzip bugfixes for the end of streams + * Reduce the minimum bandwidth limits to 3KBps steady and burst (though + I2P's performance at 3KBps is another issue) + * Cleaned up some streaming lib structures 2005-09-25 jrandom * Allow reseeding on the console if the netDb knows less than 30 peers, diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index f8e230f72..abe004600 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -15,9 +15,9 @@ import net.i2p.CoreVersion; * */ public class RouterVersion { - public final static String ID = "$Revision: 1.244 $ $Date: 2005/09/25 04:29:02 $"; + public final static String ID = "$Revision: 1.245 $ $Date: 2005/09/25 18:52:58 $"; public final static String VERSION = "0.6.0.6"; - public final static long BUILD = 4; + public final static long BUILD = 5; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID); diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java index df71d2749..34300d18a 100644 --- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java +++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java @@ -567,7 +567,7 @@ public class FIFOBandwidthLimiter { public void renderStatusHTML(Writer out) throws IOException { long now = now(); StringBuffer buf = new StringBuffer(4096); - buf.append("
Limiter status: ").append(getStatus().toString()).append("
\n"); + buf.append("
Limiter status: ").append(getStatus().toString()).append("
\n"); buf.append("Pending bandwidth requests: