diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index 41de16ec43250ff72168957c07fe3426cbd46695..6a4b02b46de739bdf66956f6639bc1a15d24e0d3 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -93,7 +93,7 @@ public class Connection { * @return true if the packet should be sent */ boolean packetSendChoke() { - //if (true) return true; + if (true) return true; long writeExpire = _options.getWriteTimeout(); if (writeExpire > 0) writeExpire += _context.clock().now(); @@ -151,30 +151,6 @@ public class Connection { _outboundQueue.enqueue(packet); } - /* - void flushPackets() { - List toSend = null; - synchronized (_outboundPackets) { - for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { - PacketLocal packet = (PacketLocal)iter.next(); - long nextExpected = _options.getResendDelay() << packet.getNumSends(); - if (packet.getLastSend() + nextExpected <= _context.clock().now()) { - // we need to resend - if (toSend == null) toSend = new ArrayList(1); - toSend.add(packet); - } - } - } - - if (toSend != null) { - for (int i = 0; i < toSend.size(); i++) { - PacketLocal packet = (PacketLocal)toSend.get(i); - _lastSendTime = _context.clock().now(); - _outboundQueue.enqueue(packet); - } - } - } - */ List ackPackets(long ackThrough, long nacks[]) { List acked = null; synchronized (_outboundPackets) { @@ -362,6 +338,15 @@ public class Connection { resend = true; } if ( (resend) && (_packet.getAckTime() < 0) ) { + // revamp various fields, in case we need to ack more, etc + _packet.setAckThrough(getInputStream().getHighestBlockId()); + _packet.setNacks(getInputStream().getNacks()); + _packet.setOptionalDelay(getOptions().getChoke()); + _packet.setOptionalMaxSize(getOptions().getMaxMessageSize()); + _packet.setResendDelay(getOptions().getResendDelay()); + _packet.setReceiveStreamId(_receiveStreamId); + _packet.setSendStreamId(_sendStreamId); + if (_log.shouldLog(Log.DEBUG)) _log.debug("Resend packet " + _packet + " on " + Connection.this); _outboundQueue.enqueue(_packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java index 4973e028026587b7aedc4509b6dbd60ccb9aca5b..e4def6c9db0a59bfeb5c8e446f39f925ecec95e5 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -35,16 +35,19 @@ public class ConnectionPacketHandler { if (nextTime <= 0) { con.setNextSendTime(con.getOptions().getSendAckDelay() + _context.clock().now()); if (_log.shouldLog(Log.DEBUG)) - _log.debug("Scheduling ack in " + con.getOptions().getSendAckDelay() + "ms for received packet " + packet); + _log.error("Scheduling ack in " + con.getOptions().getSendAckDelay() + "ms for received packet " + packet); } else { if (_log.shouldLog(Log.DEBUG)) - _log.debug("Ack is already scheduled in " + nextTime + "ms, though we just received " + packet); + _log.debug("Ack is already scheduled in " + (nextTime-_context.clock().now()) + + "ms, though we just received " + packet); } } else { if (packet.getSequenceNum() > 0) { // take note of congestion con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); + if (_log.shouldLog(Log.WARN)) + _log.warn("congestion.. dup " + packet); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("ACK only packet received: " + packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 53d8710261cec97b7a2b93085a708efbb504faf6..080ab6917fa5e8420f593e51288f969e2f156296 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -95,7 +95,7 @@ public class MessageInputStream extends InputStream { if (_notYetReadyBlocks.containsKey(l)) { // ACK } else { - if (ids != null) + if (ids == null) ids = new ArrayList(4); ids.add(l); } @@ -210,8 +210,8 @@ public class MessageInputStream extends InputStream { expiration = _readTimeout + System.currentTimeMillis(); synchronized (_dataLock) { while (_readyDataBlocks.size() <= 0) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString()); if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { if (_log.shouldLog(Log.DEBUG)) @@ -244,8 +244,8 @@ public class MessageInputStream extends InputStream { } } - if (_log.shouldLog(Log.DEBUG)) - _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString()); + //if (_log.shouldLog(Log.DEBUG)) + // _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString()); // either was already ready, or we wait()ed and it arrived ByteArray cur = (ByteArray)_readyDataBlocks.get(0); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index dbc58556125f6d7a3985db505293c9cf65c16694..8229abd881048b80066f855cf4ff13fd68c20991 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -20,7 +20,7 @@ public class MessageOutputStream extends OutputStream { private boolean _closed; public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { - this(ctx, receiver, 64*1024); + this(ctx, receiver, Packet.MAX_PAYLOAD_SIZE); } public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { super(); @@ -37,31 +37,38 @@ public class MessageOutputStream extends OutputStream { } public void write(byte b[], int off, int len) throws IOException { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("write(b[], " + off + ", " + len + ")"); synchronized (_dataLock) { + int cur = off; int remaining = len; while (remaining > 0) { if (_valid + remaining < _buf.length) { // simply buffer the data, no flush - System.arraycopy(b, off, _buf, _valid, remaining); + System.arraycopy(b, cur, _buf, _valid, remaining); _valid += remaining; + cur += remaining; remaining = 0; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("write(...): appending valid = " + _valid + " remaining=" + remaining); } else { // buffer whatever we can fit then flush, // repeating until we've pushed all of the // data through int toWrite = _buf.length - _valid; - System.arraycopy(b, off, _buf, _valid, toWrite); + System.arraycopy(b, cur, _buf, _valid, toWrite); remaining -= toWrite; + cur += toWrite; _valid = _buf.length; if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(b[], " + off + ", " + len + "): valid = " + _valid); + _log.debug("write(...): flushing valid = " + _valid + " remaining=" + remaining); // this blocks until the packet is ack window is open. it // also throws InterruptedIOException if the write timeout // expires _dataReceiver.writeData(_buf, 0, _valid); if (_log.shouldLog(Log.DEBUG)) - _log.debug("write(b[], " + off + ", " + len + "): valid = " + _valid + " complete"); - _valid = 0; + _log.debug("write(...): flushing complete valid = " + _valid + " remaining=" + remaining); + _valid = 0; throwAnyError(); } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 8a89b82bb44ab0a72b73ab3d63deb26d4321c8ff..33580119613711af40de9586995eba83e89e7265 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -131,7 +131,7 @@ public class Packet { * ping reply (if receiveStreamId is set). */ public static final int FLAG_ECHO = (1 << 9); - + public static final int DEFAULT_MAX_SIZE = 32*1024; /** what stream is this packet a part of? */ @@ -195,9 +195,15 @@ public class Packet { public int getResendDelay() { return _resendDelay; } public void setResendDelay(int numSeconds) { _resendDelay = numSeconds; } + public static final int MAX_PAYLOAD_SIZE = 32*1024; + /** get the actual payload of the message. may be null */ public byte[] getPayload() { return _payload; } - public void setPayload(byte payload[]) { _payload = payload; } + public void setPayload(byte payload[]) { + _payload = payload; + if ( (payload != null) && (payload.length > MAX_PAYLOAD_SIZE) ) + throw new IllegalArgumentException("Too large payload: " + payload.length); + } /** is a particular flag set on this packet? */ public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } @@ -323,7 +329,12 @@ public class Packet { } if (_payload != null) { - System.arraycopy(_payload, 0, buffer, cur, _payload.length); + try { + System.arraycopy(_payload, 0, buffer, cur, _payload.length); + } catch (ArrayIndexOutOfBoundsException aioobe) { + System.err.println("payload.length: " + _payload.length + " buffer.length: " + buffer.length + " cur: " + cur); + throw aioobe; + } cur += _payload.length; } @@ -411,6 +422,8 @@ public class Packet { // skip ahead to the payload _payload = new byte[offset + length - payloadBegin]; + if (_payload.length > MAX_PAYLOAD_SIZE) + throw new IllegalArgumentException("length: " + length + " offset: " + offset + " begin: " + payloadBegin); System.arraycopy(buffer, payloadBegin, _payload, 0, _payload.length); // ok now lets go back and deal with the options diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 6d6954c1dab2dd4047e1ad41091bd758a7407439..c500fe9c22ab8cad7f59699f233cee435562fcae 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -93,15 +93,28 @@ public class PacketHandler { _log.warn("Syn packet reply on a stream we don't know about: " + packet); } } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet received on an unknown stream (and not a SYN): " + packet); + if (packet.getSendStreamId() == null) { + for (Iterator iter = _manager.listConnections().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId()) && + con.getAckedPackets() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Received additional packets before the syn on " + con + ": " + packet); + receiveKnownCon(con, packet); + return; + } + } + } if (_log.shouldLog(Log.WARN)) { - _log.warn("Packet received on an unknown stream (and not a SYN): " + packet); StringBuffer buf = new StringBuffer(128); Set cons = _manager.listConnections(); for (Iterator iter = cons.iterator(); iter.hasNext(); ) { Connection con = (Connection)iter.next(); buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); } - _log.warn("Other streams: " + buf.toString()); + _log.warn("Packet belongs to know other cons: " + packet + " connections: " + buf.toString()); } } } diff --git a/apps/streaming/java/test/net/i2p/client/streaming/EchoLargeTest.java b/apps/streaming/java/test/net/i2p/client/streaming/EchoLargeTest.java new file mode 100644 index 0000000000000000000000000000000000000000..5ff344daae155e207a2093c1e3fdeae45f9e035e --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/EchoLargeTest.java @@ -0,0 +1,202 @@ +package net.i2p.client.streaming; + +import java.io.InputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Date; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PClient; +import net.i2p.client.I2PClientFactory; +import net.i2p.client.I2PSession; +import net.i2p.data.Base64; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * + */ +public class EchoLargeTest { + private Log _log; + private I2PSession _client; + private I2PSession _server; + + public void test() { + try { + I2PAppContext context = I2PAppContext.getGlobalContext(); + _log = context.logManager().getLog(ConnectTest.class); + _log.debug("creating server session"); + _server = createSession(); + _log.debug("running server"); + runServer(context, _server); + _log.debug("creating client session"); + _client = createSession(); + _log.debug("running client"); + runClient(context, _client); + } catch (Exception e) { + _log.error("error running", e); + } + try { Thread.sleep(300*1000); } catch (Exception e) {} + } + + private void runClient(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ClientRunner(ctx, session)); + t.setName("client"); + t.setDaemon(true); + t.start(); + } + + private void runServer(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ServerRunner(ctx, session)); + t.setName("server"); + t.setDaemon(true); + t.start(); + } + + private class ServerRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ServerRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ServerRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PServerSocket ssocket = mgr.getServerSocket(); + _log.debug("server socket created"); + while (true) { + I2PSocket socket = ssocket.accept(); + _log.debug("socket accepted: " + socket); + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + _log.debug("server streams built"); + byte buf[] = new byte[128*1024]; + while (buf != null) { + for (int i = 0; i < buf.length; i++) { + int c = in.read(); + if (c == -1) { + buf = null; + break; + } else { + buf[i] = (byte)(c & 0xFF); + } + } + if (buf != null) { + _log.debug("* server read the full buffer"); + out.write(buf); + out.flush(); + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing the received server socket"); + socket.close(); + } + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private class ClientRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ClientRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ClientRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PSocket socket = mgr.connect(_server.getMyDestination()); + _log.debug("socket created"); + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + for (int i = 0; i < 3; i++) { + byte buf[] = new byte[128*1024]; + _context.random().nextBytes(buf); + byte orig[] = new byte[buf.length]; + System.arraycopy(buf, 0, orig, 0, buf.length); + out.write(buf); + _log.debug("client wrote a buffer"); + out.flush(); + _log.debug("client flushed"); + + byte rbuf[] = new byte[buf.length]; + for (int j = 0; j < buf.length; j++) { + int c = in.read(); + if (c == -1) { + buf = null; + break; + } else { + //_log.debug("client read: " + ((char)c)); + if (c < 0) c += 256; + rbuf[j] = (byte)(c & 0xFF); + } + } + if (buf != null) { + _log.debug("* client read a full buffer"); + int firstOff = -1; + for (int k = 0; k < orig.length; k++) { + if (orig[k] != rbuf[k]) { + firstOff = k; + break; + } + } + if (firstOff < 0) { + System.out.println("** Read match"); + } else { + System.out.println("** Read does not match: first off = " + firstOff); + _log.error("read does not match (first off = " + firstOff + "): \n" + + Base64.encode(orig) + "\n" + + Base64.encode(rbuf)); + } + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing the client socket"); + socket.close(); + _log.debug("socket closed"); + + Thread.sleep(5*1000); + System.exit(0); + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private I2PSession createSession() { + try { + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + Destination dest = client.createDestination(baos); + I2PSession sess = client.createSession(new ByteArrayInputStream(baos.toByteArray()), new Properties()); + sess.connect(); + return sess; + } catch (Exception e) { + _log.error("error running", e); + throw new RuntimeException("b0rk b0rk b0rk"); + } + } + + public static void main(String args[]) { + EchoLargeTest et = new EchoLargeTest(); + et.test(); + } +} diff --git a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java index 73f40d19c987a65656729763699e670b6fd3ff2f..c1d0b2145b265ceec1505e4cdf8424b139b902b2 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java @@ -22,17 +22,17 @@ public class MessageInputStreamTest { } public void testInOrder() { - byte orig[] = new byte[32*1024]; + byte orig[] = new byte[256*1024]; _context.random().nextBytes(orig); MessageInputStream in = new MessageInputStream(_context); - for (int i = 0; i < 32; i++) { + for (int i = 0; i < orig.length / 1024; i++) { byte msg[] = new byte[1024]; System.arraycopy(orig, i*1024, msg, 0, 1024); in.messageReceived(i, msg); } - byte read[] = new byte[32*1024]; + byte read[] = new byte[orig.length]; try { int howMany = DataHelper.read(in, read); if (howMany != orig.length) @@ -47,15 +47,15 @@ public class MessageInputStreamTest { } public void testRandomOrder() { - byte orig[] = new byte[32*1024]; + byte orig[] = new byte[256*1024]; _context.random().nextBytes(orig); MessageInputStream in = new MessageInputStream(_context); ArrayList order = new ArrayList(32); - for (int i = 0; i < 32; i++) + for (int i = 0; i < orig.length / 1024; i++) order.add(new Integer(i)); Collections.shuffle(order); - for (int i = 0; i < 32; i++) { + for (int i = 0; i < orig.length / 1024; i++) { byte msg[] = new byte[1024]; Integer cur = (Integer)order.get(i); System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024); @@ -63,7 +63,7 @@ public class MessageInputStreamTest { _log.debug("Injecting " + cur); } - byte read[] = new byte[32*1024]; + byte read[] = new byte[orig.length]; try { int howMany = DataHelper.read(in, read); if (howMany != orig.length) @@ -77,11 +77,45 @@ public class MessageInputStreamTest { } } + public void testRandomDups() { + byte orig[] = new byte[256*1024]; + _context.random().nextBytes(orig); + + MessageInputStream in = new MessageInputStream(_context); + for (int n = 0; n < 3; n++) { + ArrayList order = new ArrayList(32); + for (int i = 0; i < orig.length / 1024; i++) + order.add(new Integer(i)); + Collections.shuffle(order); + for (int i = 0; i < orig.length / 1024; i++) { + byte msg[] = new byte[1024]; + Integer cur = (Integer)order.get(i); + System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024); + in.messageReceived(cur.intValue(), msg); + _log.debug("Injecting " + cur); + } + } + + byte read[] = new byte[orig.length]; + try { + int howMany = DataHelper.read(in, read); + if (howMany != orig.length) + throw new RuntimeException("Failed test: not enough bytes read [" + howMany + "]"); + if (!DataHelper.eq(orig, read)) + throw new RuntimeException("Failed test: data read is not equal"); + + _log.info("Passed test: random dups"); + } catch (IOException ioe) { + throw new RuntimeException("IOError reading: " + ioe.getMessage()); + } + } + public static void main(String args[]) { MessageInputStreamTest t = new MessageInputStreamTest(); try { t.testInOrder(); t.testRandomOrder(); + t.testRandomDups(); } catch (Exception e) { e.printStackTrace(); } diff --git a/apps/streaming/java/test/net/i2p/client/streaming/MessageOutputStreamTest.java b/apps/streaming/java/test/net/i2p/client/streaming/MessageOutputStreamTest.java new file mode 100644 index 0000000000000000000000000000000000000000..d3b48cd7933e83c04d6c43e4423c1843ce760e41 --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/MessageOutputStreamTest.java @@ -0,0 +1,66 @@ +package net.i2p.client.streaming; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +import net.i2p.I2PAppContext; +import net.i2p.data.Base64; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; + +/** + * + */ +public class MessageOutputStreamTest { + private I2PAppContext _context; + private Log _log; + + public MessageOutputStreamTest() { + _context = I2PAppContext.getGlobalContext(); + _log = _context.logManager().getLog(MessageOutputStreamTest.class); + } + + public void test() { + Receiver receiver = new Receiver(); + MessageOutputStream out = new MessageOutputStream(_context, receiver); + byte buf[] = new byte[128*1024]; + _context.random().nextBytes(buf); + try { + out.write(buf); + out.flush(); + } catch (IOException ioe) { ioe.printStackTrace(); } + byte read[] = receiver.getData(); + int firstOff = -1; + for (int k = 0; k < buf.length; k++) { + if (buf[k] != read[k]) { + firstOff = k; + break; + } + } + if (firstOff < 0) { + System.out.println("** Read match"); + } else { + System.out.println("** Read does not match: first off = " + firstOff); + _log.error("read does not match (first off = " + firstOff + "): \n" + + Base64.encode(buf) + "\n" + + Base64.encode(read)); + } + } + + private class Receiver implements MessageOutputStream.DataReceiver { + private ByteArrayOutputStream _data; + public Receiver() { + _data = new ByteArrayOutputStream(); + } + public void writeData(byte[] buf, int off, int size) throws IOException { + _data.write(buf, off, size); + } + public byte[] getData() { return _data.toByteArray(); } + } + + public static void main(String args[]) { + MessageOutputStreamTest t = new MessageOutputStreamTest(); + t.test(); + } +}