diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java
index 9400efd3e6..16a2ee34bf 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/HTTPResponseOutputStream.java
@@ -37,6 +37,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
private byte _buf1[];
protected boolean _gzip;
private long _dataWritten;
+ private InternalGZIPInputStream _in;
private static final int CACHE_SIZE = 8*1024;
public HTTPResponseOutputStream(OutputStream raw) {
@@ -199,7 +200,7 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
protected void beginProcessing() throws IOException {
- out.flush();
+ //out.flush();
PipedInputStream pi = new PipedInputStream();
PipedOutputStream po = new PipedOutputStream(pi);
new I2PThread(new Pusher(pi, out), "HTTP decompresser").start();
@@ -207,22 +208,22 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
private class Pusher implements Runnable {
- private InputStream _in;
+ private InputStream _inRaw;
private OutputStream _out;
public Pusher(InputStream in, OutputStream out) {
- _in = in;
+ _inRaw = in;
_out = out;
}
public void run() {
OutputStream to = null;
- InternalGZIPInputStream in = null;
+ _in = null;
long start = System.currentTimeMillis();
long written = 0;
try {
- in = new InternalGZIPInputStream(_in);
+ _in = new InternalGZIPInputStream(_inRaw);
byte buf[] = new byte[8192];
int read = -1;
- while ( (read = in.read(buf)) != -1) {
+ while ( (read = _in.read(buf)) != -1) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Read " + read + " and writing it to the browser/streams");
_out.write(buf, 0, read);
@@ -230,16 +231,22 @@ class HTTPResponseOutputStream extends FilterOutputStream {
written += read;
}
if (_log.shouldLog(Log.INFO))
- _log.info("Decompressed: " + written + ", " + in.getTotalRead() + "/" + in.getTotalExpanded());
+ _log.info("Decompressed: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded());
} catch (IOException ioe) {
if (_log.shouldLog(Log.WARN))
- _log.warn("Error decompressing: " + written + ", " + in.getTotalRead() + "/" + in.getTotalExpanded(), ioe);
+ _log.warn("Error decompressing: " + written + ", " + _in.getTotalRead() + "/" + _in.getTotalExpanded(), ioe);
} finally {
- if (_out != null) try { _out.close(); } catch (IOException ioe) {}
+ if (_log.shouldLog(Log.WARN))
+ _log.warn("After decompression, written=" + written + " read=" + _in.getTotalRead()
+ + ", expanded=" + _in.getTotalExpanded() + ", remaining=" + _in.getRemaining()
+ + ", finished=" + _in.getFinished());
+ if (_out != null) try {
+ _out.close();
+ } catch (IOException ioe) {}
}
long end = System.currentTimeMillis();
- double compressed = in.getTotalRead();
- double expanded = in.getTotalExpanded();
+ double compressed = _in.getTotalRead();
+ double expanded = _in.getTotalExpanded();
double ratio = 0;
if (expanded > 0)
ratio = compressed/expanded;
@@ -255,6 +262,15 @@ class HTTPResponseOutputStream extends FilterOutputStream {
}
public long getTotalRead() { return super.inf.getTotalIn(); }
public long getTotalExpanded() { return super.inf.getTotalOut(); }
+ public long getRemaining() { return super.inf.getRemaining(); }
+ public boolean getFinished() { return super.inf.finished(); }
+ public String toString() {
+ return "Read: " + getTotalRead() + " expanded: " + getTotalExpanded() + " remaining: " + getRemaining() + " finished: " + getFinished();
+ }
+ }
+
+ public String toString() {
+ return super.toString() + ": " + _in;
}
public static void main(String args[]) {
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java
index 7688d41035..8c442bcc65 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPClientRunner.java
@@ -3,16 +3,13 @@
*/
package net.i2p.i2ptunnel;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.FilterOutputStream;
+import java.io.*;
import java.net.Socket;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
+import net.i2p.I2PAppContext;
import net.i2p.client.streaming.I2PSocket;
import net.i2p.data.ByteArray;
import net.i2p.data.DataHelper;
@@ -30,12 +27,38 @@ import net.i2p.util.Log;
*
*/
public class I2PTunnelHTTPClientRunner extends I2PTunnelRunner {
+ private Log _log;
public I2PTunnelHTTPClientRunner(Socket s, I2PSocket i2ps, Object slock, byte[] initialI2PData, List sockList, Runnable onTimeout) {
super(s, i2ps, slock, initialI2PData, sockList, onTimeout);
+ _log = I2PAppContext.getGlobalContext().logManager().getLog(I2PTunnelHTTPClientRunner.class);
}
protected OutputStream getSocketOut() throws IOException {
OutputStream raw = super.getSocketOut();
return new HTTPResponseOutputStream(raw);
}
+
+ protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
+ try {
+ i2pin.close();
+ i2pout.close();
+ } catch (IOException ioe) {
+ // ignore
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Unable to close the i2p socket output stream: " + i2pout, ioe);
+ }
+ try {
+ in.close();
+ out.close();
+ } catch (IOException ioe) {
+ // ignore
+ if (_log.shouldLog(Log.DEBUG))
+ _log.debug("Unable to close the browser output stream: " + out, ioe);
+ }
+ i2ps.close();
+ s.close();
+ t1.join(30*1000);
+ t2.join(30*1000);
+ }
+
}
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
index b7363fcda0..abccaa5538 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelHTTPServer.java
@@ -71,6 +71,10 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
if ( (_spoofHost != null) && (_spoofHost.trim().length() > 0) )
headers.setProperty("Host", _spoofHost);
headers.setProperty("Connection", "close");
+ // we keep the enc sent by the browser before clobbering it, since it may have
+ // been x-i2p-gzip
+ String enc = headers.getProperty("Accept-encoding");
+ headers.setProperty("Accept-encoding", "identity;q=1, *;q=0");
String modifiedHeader = formatHeaders(headers, command);
//String modifiedHeader = getModifiedHeader(socket);
@@ -91,7 +95,6 @@ public class I2PTunnelHTTPServer extends I2PTunnelServer {
if ( (val != null) && (!Boolean.valueOf(val).booleanValue()) )
allowGZIP = false;
}
- String enc = headers.getProperty("Accept-encoding");
if (_log.shouldLog(Log.INFO))
_log.info("HTTP server encoding header: " + enc);
if ( allowGZIP && (enc != null) && (enc.indexOf("x-i2p-gzip") >= 0) ) {
diff --git a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
index c26eccf47b..d3a8973d27 100644
--- a/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
+++ b/apps/i2ptunnel/java/src/net/i2p/i2ptunnel/I2PTunnelRunner.java
@@ -153,11 +153,8 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
onTimeout.run();
}
- // now one connection is dead - kill the other as well.
- s.close();
- i2ps.close();
- t1.join(30*1000);
- t2.join(30*1000);
+ // now one connection is dead - kill the other as well, after making sure we flush
+ close(out, in, i2pout, i2pin, s, i2ps, t1, t2);
} catch (InterruptedException ex) {
if (_log.shouldLog(Log.ERROR))
_log.error("Interrupted", ex);
@@ -188,6 +185,27 @@ public class I2PTunnelRunner extends I2PThread implements I2PSocket.SocketErrorL
}
}
+ protected void close(OutputStream out, InputStream in, OutputStream i2pout, InputStream i2pin, Socket s, I2PSocket i2ps, Thread t1, Thread t2) throws InterruptedException, IOException {
+ try {
+ out.flush();
+ } catch (IOException ioe) {
+ // ignore
+ }
+ try {
+ i2pout.flush();
+ } catch (IOException ioe) {
+ // ignore
+ }
+ in.close();
+ i2pin.close();
+ // ok, yeah, there's a race here in theory, if data comes in after flushing and before
+ // closing, but its better than before...
+ s.close();
+ i2ps.close();
+ t1.join(30*1000);
+ t2.join(30*1000);
+ }
+
public void errorOccurred() {
synchronized (finishLock) {
finished = true;
diff --git a/apps/routerconsole/jsp/config.jsp b/apps/routerconsole/jsp/config.jsp
index e500a20ddc..3cd2455581 100644
--- a/apps/routerconsole/jsp/config.jsp
+++ b/apps/routerconsole/jsp/config.jsp
@@ -29,10 +29,14 @@
External UDP address:
- Require SSU introductions through NAT hole punching?
+ Require SSU introductions?
/>
-
If you can't poke a hole in your NAT or firewall to allow unsolicited UDP packets to reach the
- router, as detected with the Status: ERR-Reject, then you will need SSU introductions.
+
If you can, please poke a hole in your NAT or firewall to allow unsolicited UDP packets to reach
+ you on your external UDP address. If you can't, I2P now includes supports UDP hole punching
+ with "SSU introductions" - peers who will relay a request from someone you don't know to your
+ router for your router so that you can make an outbound connection to them. I2P will use these
+ introductions automatically if it detects that the port is not forwarded (as shown by
+ the Status: OK (NAT) line), or you can manually require them here.
Users behind symmetric NATs, such as OpenBSD's pf, are not currently supported.
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
index 9d6f53c134..4980d2c9d9 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java
@@ -24,8 +24,8 @@ public class Connection {
private Log _log;
private ConnectionManager _connectionManager;
private Destination _remotePeer;
- private byte _sendStreamId[];
- private byte _receiveStreamId[];
+ private long _sendStreamId;
+ private long _receiveStreamId;
private long _lastSendTime;
private long _lastSendId;
private boolean _resetReceived;
@@ -205,7 +205,7 @@ public class Connection {
_resetSent = true;
if (_resetSentOn <= 0)
_resetSentOn = _context.clock().now();
- if ( (_remotePeer == null) || (_sendStreamId == null) ) return;
+ if ( (_remotePeer == null) || (_sendStreamId <= 0) ) return;
PacketLocal reply = new PacketLocal(_context, _remotePeer);
reply.setFlag(Packet.FLAG_RESET);
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
@@ -521,12 +521,12 @@ public class Connection {
public void setRemotePeer(Destination peer) { _remotePeer = peer; }
/** what stream do we send data to the peer on? */
- public byte[] getSendStreamId() { return _sendStreamId; }
- public void setSendStreamId(byte[] id) { _sendStreamId = id; }
+ public long getSendStreamId() { return _sendStreamId; }
+ public void setSendStreamId(long id) { _sendStreamId = id; }
/** stream the peer sends data to us on. (may be null) */
- public byte[] getReceiveStreamId() { return _receiveStreamId; }
- public void setReceiveStreamId(byte[] id) {
+ public long getReceiveStreamId() { return _receiveStreamId; }
+ public void setReceiveStreamId(long id) {
_receiveStreamId = id;
synchronized (_connectLock) { _connectLock.notifyAll(); }
}
@@ -653,7 +653,7 @@ public class Connection {
void waitForConnect() {
long expiration = _context.clock().now() + _options.getConnectTimeout();
while (true) {
- if (_connected && (_receiveStreamId != null) && (_sendStreamId != null) ) {
+ if (_connected && (_receiveStreamId > 0) && (_sendStreamId > 0) ) {
// w00t
if (_log.shouldLog(Log.DEBUG))
_log.debug("waitForConnect(): Connected and we have stream IDs");
@@ -793,13 +793,13 @@ public class Connection {
public String toString() {
StringBuffer buf = new StringBuffer(128);
buf.append("[Connection ");
- if (_receiveStreamId != null)
- buf.append(Base64.encode(_receiveStreamId));
+ if (_receiveStreamId > 0)
+ buf.append(Packet.toId(_receiveStreamId));
else
buf.append("unknown");
buf.append("<-->");
- if (_sendStreamId != null)
- buf.append(Base64.encode(_sendStreamId));
+ if (_sendStreamId > 0)
+ buf.append(Packet.toId(_sendStreamId));
else
buf.append("unknown");
buf.append(" wsize: ").append(_options.getWindowSize());
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
index 2deb203d3b..471cc97a87 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java
@@ -127,7 +127,7 @@ class ConnectionHandler {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(packet.getSequenceNum());
reply.setSendStreamId(packet.getReceiveStreamId());
- reply.setReceiveStreamId(null);
+ reply.setReceiveStreamId(0);
reply.setOptionalFrom(_manager.getSession().getMyDestination());
if (_log.shouldLog(Log.DEBUG))
_log.debug("Sending RST: " + reply + " because of " + packet);
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
index 9da1b4aec2..4670033be0 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java
@@ -31,9 +31,9 @@ public class ConnectionManager {
private PacketQueue _outboundQueue;
private SchedulerChooser _schedulerChooser;
private ConnectionPacketHandler _conPacketHandler;
- /** Inbound stream ID (ByteArray) to Connection map */
+ /** Inbound stream ID (Long) to Connection map */
private Map _connectionByInboundId;
- /** Ping ID (ByteArray) to PingRequest */
+ /** Ping ID (Long) to PingRequest */
private Map _pendingPings;
private boolean _allowIncoming;
private int _maxConcurrentStreams;
@@ -71,16 +71,16 @@ public class ConnectionManager {
_context.statManager().createRateStat("stream.receiveActive", "How many streams are active when a new one is received (period being not yet dropped)", "Stream", new long[] { 60*60*1000, 24*60*60*1000 });
}
- Connection getConnectionByInboundId(byte[] id) {
+ Connection getConnectionByInboundId(long id) {
synchronized (_connectionLock) {
- return (Connection)_connectionByInboundId.get(new ByteArray(id));
+ return (Connection)_connectionByInboundId.get(new Long(id));
}
}
/**
* not guaranteed to be unique, but in case we receive more than one packet
* on an inbound connection that we havent ack'ed yet...
*/
- Connection getConnectionByOutboundId(byte[] id) {
+ Connection getConnectionByOutboundId(long id) {
synchronized (_connectionLock) {
for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) {
Connection con = (Connection)iter.next();
@@ -107,8 +107,7 @@ public class ConnectionManager {
*/
public Connection receiveConnection(Packet synPacket) {
Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, new ConnectionOptions(_defaultOptions));
- byte receiveId[] = new byte[4];
- _context.random().nextBytes(receiveId);
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
boolean reject = false;
int active = 0;
int total = 0;
@@ -122,16 +121,13 @@ public class ConnectionManager {
reject = true;
} else {
while (true) {
- ByteArray ba = new ByteArray(receiveId);
- Connection oldCon = (Connection)_connectionByInboundId.put(ba, con);
+ Connection oldCon = (Connection)_connectionByInboundId.put(new Long(receiveId), con);
if (oldCon == null) {
break;
} else {
- _connectionByInboundId.put(ba, oldCon);
+ _connectionByInboundId.put(new Long(receiveId), oldCon);
// receiveId already taken, try another
- // (need to realloc receiveId, as ba.getData() points to the old value)
- receiveId = new byte[4];
- _context.random().nextBytes(receiveId);
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
}
}
@@ -148,7 +144,7 @@ public class ConnectionManager {
reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
reply.setAckThrough(synPacket.getSequenceNum());
reply.setSendStreamId(synPacket.getReceiveStreamId());
- reply.setReceiveStreamId(null);
+ reply.setReceiveStreamId(0);
reply.setOptionalFrom(_session.getMyDestination());
// this just sends the packet - no retries or whatnot
_outboundQueue.enqueue(reply);
@@ -160,7 +156,7 @@ public class ConnectionManager {
con.getPacketHandler().receivePacket(synPacket, con);
} catch (I2PException ie) {
synchronized (_connectionLock) {
- _connectionByInboundId.remove(new ByteArray(receiveId));
+ _connectionByInboundId.remove(new Long(receiveId));
}
return null;
}
@@ -179,8 +175,7 @@ public class ConnectionManager {
*/
public Connection connect(Destination peer, ConnectionOptions opts) {
Connection con = null;
- byte receiveId[] = new byte[4];
- _context.random().nextBytes(receiveId);
+ long receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
long expiration = _context.clock().now() + opts.getConnectTimeout();
if (opts.getConnectTimeout() <= 0)
expiration = _context.clock().now() + DEFAULT_STREAM_DELAY_MAX;
@@ -213,11 +208,10 @@ public class ConnectionManager {
con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts);
con.setRemotePeer(peer);
- ByteArray ba = new ByteArray(receiveId);
- while (_connectionByInboundId.containsKey(ba)) {
- _context.random().nextBytes(receiveId);
+ while (_connectionByInboundId.containsKey(new Long(receiveId))) {
+ receiveId = _context.random().nextLong(Packet.MAX_STREAM_ID-1)+1;
}
- _connectionByInboundId.put(ba, con);
+ _connectionByInboundId.put(new Long(receiveId), con);
break; // stop looping as a psuedo-wait
}
}
@@ -284,7 +278,7 @@ public class ConnectionManager {
public void removeConnection(Connection con) {
boolean removed = false;
synchronized (_connectionLock) {
- Object o = _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId()));
+ Object o = _connectionByInboundId.remove(new Long(con.getReceiveStreamId()));
removed = (o == con);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Connection removed? " + removed + " remaining: "
@@ -320,11 +314,9 @@ public class ConnectionManager {
return ping(peer, timeoutMs, blocking, null, null, null);
}
public boolean ping(Destination peer, long timeoutMs, boolean blocking, SessionKey keyToUse, Set tagsToSend, PingNotifier notifier) {
- byte id[] = new byte[4];
- _context.random().nextBytes(id);
- ByteArray ba = new ByteArray(id);
+ Long id = new Long(_context.random().nextLong(Packet.MAX_STREAM_ID-1)+1);
PacketLocal packet = new PacketLocal(_context, peer);
- packet.setSendStreamId(id);
+ packet.setSendStreamId(id.longValue());
packet.setFlag(Packet.FLAG_ECHO);
packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED);
packet.setOptionalFrom(_session.getMyDestination());
@@ -336,7 +328,7 @@ public class ConnectionManager {
PingRequest req = new PingRequest(peer, packet, notifier);
synchronized (_pendingPings) {
- _pendingPings.put(ba, req);
+ _pendingPings.put(id, req);
}
_outboundQueue.enqueue(packet);
@@ -349,10 +341,10 @@ public class ConnectionManager {
}
synchronized (_pendingPings) {
- _pendingPings.remove(ba);
+ _pendingPings.remove(id);
}
} else {
- SimpleTimer.getInstance().addEvent(new PingFailed(ba, notifier), timeoutMs);
+ SimpleTimer.getInstance().addEvent(new PingFailed(id, notifier), timeoutMs);
}
boolean ok = req.pongReceived();
@@ -364,17 +356,17 @@ public class ConnectionManager {
}
private class PingFailed implements SimpleTimer.TimedEvent {
- private ByteArray _ba;
+ private Long _id;
private PingNotifier _notifier;
- public PingFailed(ByteArray ba, PingNotifier notifier) {
- _ba = ba;
+ public PingFailed(Long id, PingNotifier notifier) {
+ _id = id;
_notifier = notifier;
}
public void timeReached() {
boolean removed = false;
synchronized (_pendingPings) {
- Object o = _pendingPings.remove(_ba);
+ Object o = _pendingPings.remove(_id);
if (o != null)
removed = true;
}
@@ -411,11 +403,10 @@ public class ConnectionManager {
public boolean pongReceived() { return _ponged; }
}
- void receivePong(byte pingId[]) {
- ByteArray ba = new ByteArray(pingId);
+ void receivePong(long pingId) {
PingRequest req = null;
synchronized (_pendingPings) {
- req = (PingRequest)_pendingPings.remove(ba);
+ req = (PingRequest)_pendingPings.remove(new Long(pingId));
}
if (req != null)
req.pong();
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
index 83fc55266a..2e845e8f11 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java
@@ -96,10 +96,8 @@ public class ConnectionPacketHandler {
boolean allowAck = true;
if ( (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) &&
- ( (packet.getSendStreamId() == null) ||
- (packet.getReceiveStreamId() == null) ||
- (DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) ||
- (DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) ) )
+ ( (packet.getSendStreamId() <= 0) ||
+ (packet.getReceiveStreamId() <= 0) ) )
allowAck = false;
if (allowAck)
@@ -160,9 +158,7 @@ public class ConnectionPacketHandler {
}
}
- if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) &&
- ((packet.getSendStreamId() == null) ||
- DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN) ) ) {
+ if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) && (packet.getSendStreamId() <= 0) ) {
// don't honor the ACK 0 in SYN packets received when the other side
// has obviously not seen our messages
} else {
@@ -197,8 +193,8 @@ public class ConnectionPacketHandler {
// could actually be acking data (this fixes the buggered up ack of packet 0 problem).
// this is called after packet verification, which places the stream IDs as necessary if
// the SYN verifies (so if we're acking w/out stream IDs, no SYN has been received yet)
- if ( (packet != null) && (packet.getSendStreamId() != null) && (packet.getReceiveStreamId() != null) &&
- (con != null) && (con.getSendStreamId() != null) && (con.getReceiveStreamId() != null) &&
+ if ( (packet != null) && (packet.getSendStreamId() > 0) && (packet.getReceiveStreamId() > 0) &&
+ (con != null) && (con.getSendStreamId() > 0) && (con.getReceiveStreamId() > 0) &&
(!DataHelper.eq(packet.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(packet.getReceiveStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
(!DataHelper.eq(con.getSendStreamId(), Packet.STREAM_ID_UNKNOWN)) &&
@@ -335,7 +331,7 @@ public class ConnectionPacketHandler {
} else {
verifySignature(packet, con);
- if (con.getSendStreamId() == null) {
+ if (con.getSendStreamId() <= 0) {
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) {
con.setSendStreamId(packet.getReceiveStreamId());
con.setRemotePeer(packet.getOptionalFrom());
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
index ea8f373701..bdeac20608 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
@@ -51,8 +51,8 @@ import net.i2p.util.ByteCache;
*
*/
public class Packet {
- private byte _sendStreamId[];
- private byte _receiveStreamId[];
+ private long _sendStreamId;
+ private long _receiveStreamId;
private long _sequenceNum;
private long _ackThrough;
private long _nacks[];
@@ -72,7 +72,9 @@ public class Packet {
* synchronize packet)
*
*/
- public static final byte STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 };
+ public static final long STREAM_ID_UNKNOWN = 0l;
+
+ public static final long MAX_STREAM_ID = 0xffffffffl;
/**
* This packet is creating a new socket connection (if the receiveStreamId
@@ -149,17 +151,8 @@ public class Packet {
}
/** what stream is this packet a part of? */
- public byte[] getSendStreamId() {
- if ( (_sendStreamId == null) || (DataHelper.eq(_sendStreamId, STREAM_ID_UNKNOWN)) )
- return null;
- else
- return _sendStreamId;
- }
- public void setSendStreamId(byte[] id) {
- _sendStreamId = id;
- if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) )
- _sendStreamId = null;
- }
+ public long getSendStreamId() { return _sendStreamId; }
+ public void setSendStreamId(long id) { _sendStreamId = id; }
/**
* Stream that replies should be sent on. if the
@@ -167,17 +160,8 @@ public class Packet {
* null.
*
*/
- public byte[] getReceiveStreamId() {
- if ( (_receiveStreamId == null) || (DataHelper.eq(_receiveStreamId, STREAM_ID_UNKNOWN)) )
- return null;
- else
- return _receiveStreamId;
- }
- public void setReceiveStreamId(byte[] id) {
- _receiveStreamId = id;
- if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) )
- _receiveStreamId = null;
- }
+ public long getReceiveStreamId() { return _receiveStreamId; }
+ public void setReceiveStreamId(long id) { _receiveStreamId = id; }
/** 0-indexed sequence number for this Packet in the sendStream */
public long getSequenceNum() { return _sequenceNum; }
@@ -312,15 +296,9 @@ public class Packet {
*/
private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException {
int cur = offset;
- if ( (_sendStreamId != null) && (_sendStreamId.length == 4) )
- System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length);
- else
- System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
+ DataHelper.toLong(buffer, cur, 4, (_sendStreamId >= 0 ? _sendStreamId : STREAM_ID_UNKNOWN));
cur += 4;
- if ( (_receiveStreamId != null) && (_receiveStreamId.length == 4) )
- System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length);
- else
- System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length);
+ DataHelper.toLong(buffer, cur, 4, (_receiveStreamId >= 0 ? _receiveStreamId : STREAM_ID_UNKNOWN));
cur += 4;
DataHelper.toLong(buffer, cur, 4, _sequenceNum > 0 ? _sequenceNum : 0);
cur += 4;
@@ -398,7 +376,7 @@ public class Packet {
size += 4; // sequenceNum
size += 4; // ackThrough
if (_nacks != null) {
- size++; // nacks length
+ size++; // nacks length
size += 4 * _nacks.length;
} else {
size++; // nacks length
@@ -440,11 +418,9 @@ public class Packet {
if (length < 22) // min header size
throw new IllegalArgumentException("Too small: len=" + buffer.length);
int cur = offset;
- _sendStreamId = new byte[4];
- System.arraycopy(buffer, cur, _sendStreamId, 0, 4);
+ _sendStreamId = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
- _receiveStreamId = new byte[4];
- System.arraycopy(buffer, cur, _receiveStreamId, 0, 4);
+ _receiveStreamId = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
_sequenceNum = DataHelper.fromLong(buffer, cur, 4);
cur += 4;
@@ -593,11 +569,8 @@ public class Packet {
return buf;
}
- static final String toId(byte id[]) {
- if (id == null)
- return Base64.encode(STREAM_ID_UNKNOWN);
- else
- return Base64.encode(id);
+ static final String toId(long id) {
+ return Base64.encode(DataHelper.toLong(4, id));
}
private final String toFlagString() {
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
index 72799f2fdf..310c5e36be 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java
@@ -97,11 +97,9 @@ public class PacketHandler {
//if (_log.shouldLog(Log.DEBUG))
// _log.debug("packet received: " + packet);
- byte sendId[] = packet.getSendStreamId();
- if (!isNonZero(sendId))
- sendId = null;
+ long sendId = packet.getSendStreamId();
- Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null);
+ Connection con = (sendId > 0 ? _manager.getConnectionByInboundId(sendId) : null);
if (con != null) {
receiveKnownCon(con, packet);
displayPacket(packet, "RECV", "wsize " + con.getOptions().getWindowSize() + " rto " + con.getOptions().getRTO());
@@ -127,9 +125,9 @@ public class PacketHandler {
private void receiveKnownCon(Connection con, Packet packet) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
- if (packet.getSendStreamId() != null) {
+ if (packet.getSendStreamId() > 0) {
receivePing(packet);
- } else if (packet.getReceiveStreamId() != null) {
+ } else if (packet.getReceiveStreamId() > 0) {
receivePong(packet);
} else {
if (_log.shouldLog(Log.WARN))
@@ -162,9 +160,9 @@ public class PacketHandler {
_log.warn("Received forged reset for " + con, ie);
}
} else {
- if ( (con.getSendStreamId() == null) ||
+ if ( (con.getSendStreamId() <= 0) ||
(DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) {
- byte oldId[] =con.getSendStreamId();
+ long oldId =con.getSendStreamId();
if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) // con fully established, w00t
con.setSendStreamId(packet.getReceiveStreamId());
@@ -214,11 +212,11 @@ public class PacketHandler {
_manager.getPacketQueue().enqueue(reply);
}
- private void receiveUnknownCon(Packet packet, byte sendId[]) {
+ private void receiveUnknownCon(Packet packet, long sendId) {
if (packet.isFlagSet(Packet.FLAG_ECHO)) {
- if (packet.getSendStreamId() != null) {
+ if (packet.getSendStreamId() > 0) {
receivePing(packet);
- } else if (packet.getReceiveStreamId() != null) {
+ } else if (packet.getReceiveStreamId() > 0) {
receivePong(packet);
} else {
if (_log.shouldLog(Log.WARN))
@@ -228,7 +226,7 @@ public class PacketHandler {
} else {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Packet received on an unknown stream (and not an ECHO): " + packet);
- if (sendId == null) {
+ if (sendId <= 0) {
Connection con = _manager.getConnectionByOutboundId(packet.getReceiveStreamId());
if (con != null) {
if (con.getAckedPackets() <= 0) {
@@ -257,7 +255,7 @@ public class PacketHandler {
}
_log.warn("Packet belongs to no other cons: " + packet + " connections: "
+ buf.toString() + " sendId: "
- + (sendId != null ? Base64.encode(sendId) : " unknown"));
+ + (sendId > 0 ? Packet.toId(sendId) : " unknown"));
}
packet.releasePayload();
}
@@ -289,25 +287,7 @@ public class PacketHandler {
_manager.receivePong(packet.getReceiveStreamId());
}
- private static final boolean isValidMatch(byte conStreamId[], byte packetStreamId[]) {
- if ( (conStreamId == null) || (packetStreamId == null) ||
- (conStreamId.length != packetStreamId.length) )
- return false;
-
- boolean nonZeroFound = false;
- for (int i = 0; i < conStreamId.length; i++) {
- if (conStreamId[i] != packetStreamId[i]) return false;
- if (conStreamId[i] != 0x0) nonZeroFound = true;
- }
- return nonZeroFound;
- }
-
- private static final boolean isNonZero(byte[] b) {
- boolean nonZeroFound = false;
- for (int i = 0; b != null && i < b.length; i++) {
- if (b[i] != 0x0)
- nonZeroFound = true;
- }
- return nonZeroFound;
+ private static final boolean isValidMatch(long conStreamId, long packetStreamId) {
+ return ( (conStreamId == packetStreamId) && (conStreamId != 0) );
}
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java
index 167b5a5458..df09ff6d86 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java
@@ -41,7 +41,7 @@ class SchedulerClosed extends SchedulerImpl {
(!con.getResetReceived()) &&
(timeSinceClose < Connection.DISCONNECT_TIMEOUT);
boolean conTimeout = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
- con.getSendStreamId() == null &&
+ con.getSendStreamId() <= 0 &&
con.getLifetime() < Connection.DISCONNECT_TIMEOUT;
return (ok || conTimeout);
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java
index 3c0d4bda8e..2bff627c7a 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java
@@ -36,7 +36,7 @@ class SchedulerDead extends SchedulerImpl {
boolean nothingLeftToDo = (con.getDisconnectScheduledOn() > 0) &&
(timeSinceClose >= Connection.DISCONNECT_TIMEOUT);
boolean timedOut = (con.getOptions().getConnectTimeout() < con.getLifetime()) &&
- con.getSendStreamId() == null &&
+ con.getSendStreamId() <= 0 &&
con.getLifetime() >= Connection.DISCONNECT_TIMEOUT;
return nothingLeftToDo || timedOut;
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java
index 21d375de9f..091a7131f6 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java
@@ -31,7 +31,7 @@ class SchedulerPreconnect extends SchedulerImpl {
public boolean accept(Connection con) {
return (con != null) &&
- (con.getSendStreamId() == null) &&
+ (con.getSendStreamId() <= 0) &&
(con.getLastSendId() < 0);
}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
index 89d28b3546..343b716201 100644
--- a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
+++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java
@@ -19,7 +19,7 @@ class SchedulerReceived extends SchedulerImpl {
public boolean accept(Connection con) {
return (con != null) &&
(con.getLastSendId() < 0) &&
- (con.getSendStreamId() != null);
+ (con.getSendStreamId() > 0);
}
public void eventOccurred(Connection con) {
diff --git a/history.txt b/history.txt
index 3f1a04119e..c99c6dabdc 100644
--- a/history.txt
+++ b/history.txt
@@ -1,4 +1,13 @@
-$Id: history.txt,v 1.264 2005/09/25 04:29:01 jrandom Exp $
+$Id: history.txt,v 1.265 2005/09/25 18:52:58 jrandom Exp $
+
+2005-09-26 jrandom
+ * Reworded the SSU introductions config section (thanks duck!)
+ * Force identity content encoding for I2PTunnel httpserver requests
+ (thanks redzara!)
+ * Further x-i2p-gzip bugfixes for the end of streams
+ * Reduce the minimum bandwidth limits to 3KBps steady and burst (though
+ I2P's performance at 3KBps is another issue)
+ * Cleaned up some streaming lib structures
2005-09-25 jrandom
* Allow reseeding on the console if the netDb knows less than 30 peers,
diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java
index f8e230f722..abe0046008 100644
--- a/router/java/src/net/i2p/router/RouterVersion.java
+++ b/router/java/src/net/i2p/router/RouterVersion.java
@@ -15,9 +15,9 @@ import net.i2p.CoreVersion;
*
*/
public class RouterVersion {
- public final static String ID = "$Revision: 1.244 $ $Date: 2005/09/25 04:29:02 $";
+ public final static String ID = "$Revision: 1.245 $ $Date: 2005/09/25 18:52:58 $";
public final static String VERSION = "0.6.0.6";
- public final static long BUILD = 4;
+ public final static long BUILD = 5;
public static void main(String args[]) {
System.out.println("I2P Router version: " + VERSION + "-" + BUILD);
System.out.println("Router ID: " + RouterVersion.ID);
diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
index df71d27492..34300d18a4 100644
--- a/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
+++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthLimiter.java
@@ -567,7 +567,7 @@ public class FIFOBandwidthLimiter {
public void renderStatusHTML(Writer out) throws IOException {
long now = now();
StringBuffer buf = new StringBuffer(4096);
- buf.append(" Limiter status: ").append(getStatus().toString()).append(" \n");
+ buf.append(" Limiter status: ").append(getStatus().toString()).append(" \n");
buf.append("Pending bandwidth requests:
");
buf.append("
Inbound requests: ");
synchronized (_pendingInboundRequests) {
diff --git a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java
index 0218600414..b52c111299 100644
--- a/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java
+++ b/router/java/src/net/i2p/router/transport/FIFOBandwidthRefiller.java
@@ -33,19 +33,19 @@ class FIFOBandwidthRefiller implements Runnable {
// no longer allow unlimited bandwidth - the user must specify a value, and if they do not, it is 16KBps
public static final int DEFAULT_INBOUND_BANDWIDTH = 16;
public static final int DEFAULT_OUTBOUND_BANDWIDTH = 16;
- public static final int DEFAULT_INBOUND_BURST_BANDWIDTH = 32;
- public static final int DEFAULT_OUTBOUND_BURST_BANDWIDTH = 32;
+ public static final int DEFAULT_INBOUND_BURST_BANDWIDTH = 16;
+ public static final int DEFAULT_OUTBOUND_BURST_BANDWIDTH = 16;
public static final int DEFAULT_BURST_SECONDS = 60;
- /** For now, until there is some tuning and safe throttling, we set the floor at 6KBps inbound */
- public static final int MIN_INBOUND_BANDWIDTH = 5;
- /** For now, until there is some tuning and safe throttling, we set the floor at 6KBps outbound */
- public static final int MIN_OUTBOUND_BANDWIDTH = 5;
- /** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
- public static final int MIN_INBOUND_BANDWIDTH_PEAK = 10;
- /** For now, until there is some tuning and safe throttling, we set the floor at a 10 second burst */
- public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 10;
+ /** For now, until there is some tuning and safe throttling, we set the floor at 3KBps inbound */
+ public static final int MIN_INBOUND_BANDWIDTH = 3;
+ /** For now, until there is some tuning and safe throttling, we set the floor at 3KBps outbound */
+ public static final int MIN_OUTBOUND_BANDWIDTH = 3;
+ /** For now, until there is some tuning and safe throttling, we set the floor at a 3KBps during burst */
+ public static final int MIN_INBOUND_BANDWIDTH_PEAK = 3;
+ /** For now, until there is some tuning and safe throttling, we set the floor at a 3KBps during burst */
+ public static final int MIN_OUTBOUND_BANDWIDTH_PEAK = 3;
/**
* how often we replenish the queues.
diff --git a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
index 81865c24f5..c0dde851a5 100644
--- a/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
+++ b/router/java/src/net/i2p/router/transport/udp/UDPTransport.java
@@ -136,6 +136,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
_introManager = new IntroductionManager(_context, this);
_introducersSelectedOn = -1;
+ _context.statManager().createRateStat("udp.alreadyConnected", "What is the lifetime of a reestablished session", "udp", new long[] { 60*1000, 10*60*1000, 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.droppedPeer", "How long ago did we receive from a dropped peer (duration == session lifetime", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.droppedPeerInactive", "How long ago did we receive from a dropped peer (duration == session lifetime)", "udp", new long[] { 60*60*1000, 24*60*60*1000 });
_context.statManager().createRateStat("udp.statusOK", "How many times the peer test returned OK", "udp", new long[] { 5*60*1000, 20*60*1000, 60*60*1000, 24*60*60*1000 });
@@ -862,7 +863,7 @@ public class UDPTransport extends TransportImpl implements TimedWeightedPriority
long offsetTotal = 0;
StringBuffer buf = new StringBuffer(512);
- buf.append("UDP connections: ").append(peers.size()).append(" \n");
+ buf.append("UDP connections: ").append(peers.size()).append(" \n");
buf.append("