diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java index a47d361a668e0e0495eaaebfec4f1578614d8606..1c79e97d1c1a3b0e0920f81f51c5522bfb50256c 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -73,6 +73,8 @@ public class Connection { private int _randomWait; private long _lifetimeBytesSent; + /** TBD for tcpdump-compatible ack output */ + private long _lowestBytesAckedThrough; private long _lifetimeBytesReceived; private long _lifetimeDupMessageSent; private long _lifetimeDupMessageReceived; @@ -742,7 +744,9 @@ public class Connection { public long getCongestionWindowEnd() { return _congestionWindowEnd; } public void setCongestionWindowEnd(long endMsg) { _congestionWindowEnd = endMsg; } + /** @return the highest outbound packet we have recieved an ack for */ public long getHighestAckedThrough() { return _highestAckedThrough; } + /** @deprecated unused */ public void setHighestAckedThrough(long msgNum) { _highestAckedThrough = msgNum; } public long getLastActivityOn() { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java index 382c984d9af303d0d58eb31b3ef0a46919cfda5a..9e5e5c9635fa2f6a6b6d45abaed15d54e5cfe365 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -175,6 +175,9 @@ public class ConnectionHandler { // between here and PacketHandler, causing the packet to loop forever.... _manager.getPacketHandler().receivePacketDirect(packet, false); } else { + // log it here, just before we kill it - dest will be unknown + ((PacketLocal)packet).logTCPDump(true); + // goodbye if (_log.shouldLog(Log.WARN)) _log.warn("Did not find con for queued non-syn packet, dropping: " + packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java index af44c41f24b1f0d02552f9241b5700732bb3a4f4..19308814806116d7474cae93fd234cbb69bdd1e9 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -176,6 +176,9 @@ public class ConnectionManager { } con.setReceiveStreamId(receiveId); + // finally, we know enough that we can log the packet with the conn filled in + ((PacketLocal)synPacket).setConnection(con); + ((PacketLocal)synPacket).logTCPDump(true); try { con.getPacketHandler().receivePacket(synPacket, con); } catch (I2PException ie) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java index 98165cf7de165a0b21bb518d12f191f532689caa..3a74a49848425a229cdb26b91d56754e68f35765 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -45,7 +45,9 @@ public class MessageHandler implements I2PSessionListener { return; } if (data == null) return; - Packet packet = new Packet(); + //Packet packet = new Packet(); + // for tcpdump + Packet packet = new PacketLocal(_context, null); try { packet.readPacket(data, 0, data.length); _manager.getPacketHandler().receivePacket(packet); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 2a6ae59ad1b4dcf6dc2224b94a450f5b7290a270..e077d2f194a8980b7e6c8bf6cfb3fdcb7c5c5ccf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -42,7 +42,7 @@ import net.i2p.util.Log; * <li>{@link #FLAG_MAX_PACKET_SIZE_INCLUDED}: 2 byte integer</li> * <li>{@link #FLAG_PROFILE_INTERACTIVE}: no option data</li> * <li>{@link #FLAG_ECHO}: no option data</li> - * <li>{@link #FLAG_NO_ACK}: no option data</li> + * <li>{@link #FLAG_NO_ACK}: no option data - this appears to be unused, we always ack, even for the first packet</li> * </ol> * * <p>If the signature is included, it uses the Destination's DSA key diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java index 19e62db0c9d74e795c1e48eac2c988c079da1fbb..a8233a26b162cc1259b95f509a496b9ddcfed26d 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -107,6 +107,8 @@ public class PacketHandler { receiveUnknownCon(packet, sendId, queueIfNoConn); displayPacket(packet, "UNKN", null); } + // Don't log here, wait until we have the conn to make the dumps easier to follow + //((PacketLocal)packet).logTCPDump(true); } private static final SimpleDateFormat _fmt = new SimpleDateFormat("HH:mm:ss.SSS"); @@ -127,6 +129,9 @@ public class PacketHandler { } private void receiveKnownCon(Connection con, Packet packet) { + // is this ok here or does it need to be below each packetHandler().receivePacket() ? + ((PacketLocal)packet).setConnection(con); + ((PacketLocal)packet).logTCPDump(true); if (packet.isFlagSet(Packet.FLAG_ECHO)) { if (packet.getSendStreamId() > 0) { if (con.getOptions().getAnswerPings()) @@ -266,8 +271,13 @@ public class PacketHandler { } if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + // logTCPDump() will be called in ConnectionManager.receiveConnection(), + // which is called by ConnectionHandler.receiveNewSyn(), + // after we have a new conn, which makes the logging better. _manager.getConnectionHandler().receiveNewSyn(packet); } else if (queueIfNoConn) { + // don't call logTCPDump() here, wait for it to find a conn + // We can get here on the 2nd+ packet if the 1st (SYN) packet // is still on the _synQueue in the ConnectionHandler, and // ConnectionManager.receiveConnection() hasn't run yet to put @@ -293,6 +303,8 @@ public class PacketHandler { //packet.releasePayload(); _manager.getConnectionHandler().receiveNewSyn(packet); } else { + // log it here, just before we kill it - dest will be unknown + ((PacketLocal)packet).logTCPDump(true); // don't queue again (infinite loop!) sendReset(packet); packet.releasePayload(); diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java index b1438a033ad63ec6c95e5a8483ce3014ddf92dbf..127f53131d122273e2eaacfa9fb6bbb07bad13cf 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -1,5 +1,6 @@ package net.i2p.client.streaming; +import java.io.IOException; import java.util.Set; import net.i2p.I2PAppContext; @@ -28,6 +29,9 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat private volatile int _nackCount; private volatile boolean _retransmitted; private SimpleTimer2.TimedEvent _resendEvent; + private static final Object initLock = new Object(); + private static boolean _initialized; + private static PcapWriter _pcapWriter; public PacketLocal(I2PAppContext ctx, Destination to) { this(ctx, to, null); @@ -42,6 +46,12 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat _cancelledOn = -1; _nackCount = 0; _retransmitted = false; + synchronized(initLock) { + if (!_initialized) { + initPcap(); + _initialized = true; + } + } } public Destination getTo() { return _to; } @@ -139,6 +149,8 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public int getNumSends() { return _numSends; } public long getLastSend() { return _lastSend; } public Connection getConnection() { return _connection; } + /** used to set the rcvd conn after the fact for incoming syn replies */ + public void setConnection(Connection con) { _connection = con; } public void incrementNACKs() { int cnt = ++_nackCount; @@ -242,4 +254,28 @@ public class PacketLocal extends Packet implements MessageOutputStream.WriteStat public boolean writeAccepted() { return _acceptedOn > 0 && _cancelledOn <= 0; } public boolean writeFailed() { return _cancelledOn > 0; } public boolean writeSuccessful() { return _ackOn > 0 && _cancelledOn <= 0; } + + static final String PCAP = "foo.pcap"; + private void initPcap() { + try { + _pcapWriter = new PcapWriter(_context, PCAP); + } catch (IOException ioe) { + System.err.println("pcap init ioe: " + ioe); + } + } + + /** Generate a pcap/tcpdump-compatible format, + * so we can use standard debugging tools. + */ + public void logTCPDump(boolean isInbound) { + if (!_log.shouldLog(Log.INFO)) return; + _log.info(toString()); + if (_pcapWriter != null) { + try { + _pcapWriter.write(this, isInbound); + } catch (IOException ioe) { + _log.warn("pcap write ioe: " + ioe); + } + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java index 8a4692ada76444ae972c52861b776d2453f6b4f5..4b6c69230efa04746a50dfd72a3e80c53c18d2bd 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -150,6 +150,7 @@ public class PacketQueue { Connection c = packet.getConnection(); String suffix = (c != null ? "wsize " + c.getOptions().getWindowSize() + " rto " + c.getOptions().getRTO() : null); _connectionManager.getPacketHandler().displayPacket(packet, "SEND", suffix); + ((PacketLocal)packet).logTCPDump(false); } if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PcapWriter.java b/apps/streaming/java/src/net/i2p/client/streaming/PcapWriter.java new file mode 100644 index 0000000000000000000000000000000000000000..724fb1ee30d714797eb74902e1183dc79f9933de --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/PcapWriter.java @@ -0,0 +1,289 @@ +package net.i2p.client.streaming; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Hash; + +/** + * Write a standard pcap file with a "TCP" packet that can be analyzed with + * standard tools such as wireshark. + * + * The source and dest "IP" and "port" are fake but are generated from the + * hashes of the Destinations and stream ID's, so they will be consistent. + * The local "IP" will always be of the form 127.0.x.y + * Initial IP for a conn will be 127.0.0.0 for the local and 0.0.0.0 for the remote. + * + * Reference: http://wiki.wireshark.org/Development/LibpcapFileFormat + * + * The Jpcap library http://netresearch.ics.uci.edu/kfujii/jpcap/doc/ + * was close to what I want, but it requires you to instantiate a "captor" + * before you can write a file, and it requires a native lib to do so, + * and even then, it only wants to read the file, not write it. + * + * We even calculate a correct TCP header checksum to keep the tools happy. + * We don't, however, convert I2P-style sequence numbers, which count packets, + * to TCP-style byte counts. We don't track a lowest-acked-thru byte count atm, really. + * + * We do represent the window size in bytes though, so that's real confusing. + * + * This is designed to debug the streaming lib, but there are not log calls for every + * single packet - pings and pongs, and various odd cases where received packets + * are dropped, are not logged. + * + * Yes we could dump it natively and write a wireshark dissector. That sounds hard. + * And we wouldn't get the TCP stream analysis built into the tools. + * + * @author zzz + */ +public class PcapWriter { + + /** big-endian, see file format ref - 24 bytes */ + private static final byte[] FILE_HEADER = { (byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4, + 0, 2, 0, 4, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, (byte) 0xff, (byte) 0xff, 0, 0, 0, 1 }; + + /** dummy macs and ethertype */ + private static final byte[] MAC_HEADER = { 1, 2, 3, 4, 5, 6, + 1, 2, 3, 4, 5, 6, + (byte) 0x80, 0 }; + private static final byte[] IP_HEADER_1 = { 0x45, 0 }; // the length goes after this + private static final byte[] IP_HEADER_2 = { 0x12, 0x34, 0x40, 0, 64, 6 }; // ID, flags, TTL and TCP + private static final byte[] UNK_IP = { (byte) 0xff, 0, 0, 0}; + private static final byte[] MY_UNK_IP = {127, 0, 0, 0}; + /** max # of streaming lib payload bytes to dump */ + private static final int MAX_PAYLOAD_BYTES = 10; + + private FileOutputStream _fos; + private I2PAppContext _context; + + public PcapWriter(I2PAppContext ctx, String file) throws IOException { + _context = ctx; + File f = new File(ctx.getLogDir(), file); + //if (f.exists()) { + // _fos = new FileOutputStream(f, true); + //} else { + _fos = new FileOutputStream(f); + _fos.write(FILE_HEADER); + //} + } + + public void close() { + FileOutputStream fos = _fos; + if (fos != null) { + try { + fos.close(); + } catch (IOException ioe) {} + _fos = null; + } + } + + public void write(PacketLocal pkt, boolean isInbound) throws IOException { + try { + wrt(pkt, isInbound); + } catch (DataFormatException dfe) { + dfe.printStackTrace(); + throw new IOException(dfe.toString()); + } + // remove me + _fos.flush(); + } + + private synchronized void wrt(PacketLocal pkt, boolean isInbound) throws IOException, DataFormatException { + FileOutputStream fos = _fos; + if (fos == null) + throw new IOException("Not open or already closed"); + Connection con = pkt.getConnection(); + int includeLen = Math.min(MAX_PAYLOAD_BYTES, pkt.getPayloadSize()); + + // PCAP Header + long now; + if (isInbound) + now = pkt.getCreatedOn(); + else + now = pkt.getLastSend(); + DataHelper.writeLong(fos, 4, now / 1000); + DataHelper.writeLong(fos, 4, 1000 * (now % 1000)); + DataHelper.writeLong(fos, 4, 54 + includeLen); // 14 MAC + 20 IP + 20 TCP + DataHelper.writeLong(fos, 4, 58 + pkt.getPayloadSize()); // 54 + MAC checksum + + // MAC Header 14 bytes + fos.write(MAC_HEADER); + + // IP 20 bytes total + // IP Header 12 bytes + int length = 20 + 20 + pkt.getPayloadSize(); + fos.write(IP_HEADER_1); + DataHelper.writeLong(fos, 2, length); // total IP length + fos.write(IP_HEADER_2); + + // src and dst IP 8 bytes + // make our side always start with 127.0.x.x + byte[] srcAddr, dstAddr; + if (isInbound) { + if (con != null) { + dstAddr = new byte[4]; + dstAddr[0] = 127; + dstAddr[1] = 0; + System.arraycopy(con.getSession().getMyDestination().calculateHash().getData(), 0, dstAddr, 2, 2); + } else + dstAddr = MY_UNK_IP; + + if (con != null && con.getRemotePeer() != null) + srcAddr = con.getRemotePeer().calculateHash().getData(); + else if (pkt.getOptionalFrom() != null) + srcAddr = pkt.getOptionalFrom().calculateHash().getData(); + else + srcAddr = UNK_IP; + } else { + if (con != null) { + srcAddr = new byte[4]; + srcAddr[0] = 127; + srcAddr[1] = 0; + System.arraycopy(con.getSession().getMyDestination().calculateHash().getData(), 0, srcAddr, 2, 2); + } else + srcAddr = MY_UNK_IP; + + if (con != null && con.getRemotePeer() != null) + dstAddr = con.getRemotePeer().calculateHash().getData(); + else + dstAddr = UNK_IP; + } + + // calculate and output the correct IP header checksum to keep the analyzers happy + int checksum = length; + checksum = update(checksum, IP_HEADER_1); + checksum = update(checksum, IP_HEADER_2); + checksum = update(checksum, srcAddr, 4); + checksum = update(checksum, dstAddr, 4); + DataHelper.writeLong(fos, 2, checksum ^ 0xffff); + + // IPs + fos.write(srcAddr, 0, 4); + fos.write(dstAddr, 0, 4); + + // TCP header 20 bytes total + // src and dst port 4 bytes + // the rcv ID is the source, and the send ID is the dest. + DataHelper.writeLong(fos, 2, pkt.getReceiveStreamId() & 0xffff); + DataHelper.writeLong(fos, 2, pkt.getSendStreamId() & 0xffff); + + // seq and acks 8 bytes + long seq; + // wireshark wants the seq # in a SYN packet to be one less than the first data packet, + // so let's set it to 0. ??????????? + if (pkt.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + seq = 0xffffffff; + else + seq = pkt.getSequenceNum(); + long acked = 0; + if (con != null) { + if (isInbound) + acked = getLowestAckedThrough(pkt, con); + else + acked = getLowestAckedThrough(pkt, con); + } + DataHelper.writeLong(fos, 4, pkt.getSequenceNum()); + DataHelper.writeLong(fos, 4, acked); + + // offset and flags 2 bytes + int flags = 0; + if (pkt.isFlagSet(Packet.FLAG_CLOSE)) + flags |= 0x01; + if (pkt.isFlagSet(Packet.FLAG_SYNCHRONIZE)) + flags |= 0x02; + if (pkt.isFlagSet(Packet.FLAG_RESET)) + flags |= 0x04; + if (!pkt.isFlagSet(Packet.FLAG_NO_ACK)) + flags |= 0x10; + // delay request -> ECE, not a perfect match, but ok for now + if (pkt.isFlagSet(Packet.FLAG_DELAY_REQUESTED)) + flags |= 0x40; + //if (pkt.isFlagSet(FLAG_DELAY_REQUESTED)) + // foo; + DataHelper.writeLong(fos, 1, 0x50); // 5 32-byte words + DataHelper.writeLong(fos, 1, flags); + + // window size 2 bytes + long window = ConnectionOptions.INITIAL_WINDOW_SIZE; + long msgSize = ConnectionOptions.DEFAULT_MAX_MESSAGE_SIZE; + if (con != null) { + if (isInbound) { + // try to represent what he thinks the window is, we don't really know + // this isn't really right, the lastsendid can get way ahead + window = acked + con.getOptions().getWindowSize() - con.getLastSendId(); + } else { + // following is from ConnectionPacketHandler + long ready = con.getInputStream().getHighestReadyBockId(); + int available = con.getOptions().getInboundBufferSize() - con.getInputStream().getTotalReadySize(); + int allowedBlocks = available/con.getOptions().getMaxMessageSize(); + window = (ready + allowedBlocks) - pkt.getSequenceNum(); + } + if (window < 0) + window = 0; + msgSize = con.getOptions().getMaxMessageSize(); + } + // messages -> bytes + window *= msgSize; + // for now we don't spoof window scaling + if (window > 65535) + window = 65535; + DataHelper.writeLong(fos, 2, window); + + // checksum and urgent pointer 4 bytes + DataHelper.writeLong(fos, 4, 0); + + // some data + if (includeLen > 0) + fos.write(pkt.getPayload().getData(), 0, includeLen); + } + + /** + * copied from Connection.ackPackets() + * + * This is really nasty, but if the packet has an ACK, then we + * find the lowest NACK, and we are acked thru the lowest - 1. + * + * If there is no ACK, then we could use the conn's highest acked through, + * for an inbound packet (containing acks for outbound packets) + * But it appears that all packets have ACKs, as FLAG_NO_ACK is never set. + * + * To do: Add the SACK option to the TCP header. + */ + private static long getLowestAckedThrough(PacketLocal pkt, Connection con) { + long nacks[] = pkt.getNacks(); + long lowest = pkt.getAckThrough(); // can return -1 but we increment below + if (nacks != null) { + for (int i = 0; i < nacks.length; i++) { + if (nacks[i] - 1 < lowest) + lowest = nacks[i] - 1; + } + } + // I2P ack is of current seq number; TCP is next expected seq number + // should be >= 0 now + lowest++; + // just in case + return Math.max(0, lowest); + } + + /** one's complement 2-byte checksum update */ + private static int update(int checksum, byte[] b) { + return update(checksum, b, b.length); + } + + private static int update(int checksum, byte[] b, int len) { + int rv = checksum; + for (int i = 0; i < len; i += 2) { + rv += ((b[i] << 8) & 0xff00) | (b[i+1] & 0xff); + if (rv > 0xffff) { + rv &= 0xffff; + rv++; + } + } + return rv; + } +} diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index c0533b1fff24a466102abad0cbec70342c3facc3..54efdc1a894ac482db9cf66c2426afe015c17c7b 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -271,7 +271,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { _demultiplexer.messageAvailable(I2PSessionMuxedImpl.this, msg.id, msg.size, msg.proto, msg.fromPort, msg.toPort); } catch (Exception e) { - _log.error("Error notifying app of message availability"); + _log.error("Error notifying app of message availability", e); } } } diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 3e5e8985dd5771e110f3b6c0871a03c61fb5e850..ffb1da55c82beaf475f94b497edc26bb8c67587b 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -20,7 +20,7 @@ public class RouterVersion { public final static String VERSION = CoreVersion.VERSION; public final static long BUILD = 12; /** for example "-test" */ - public final static String EXTRA = ""; + public final static String EXTRA = "-pcap"; public final static String FULL_VERSION = VERSION + "-" + BUILD + EXTRA; public static void main(String args[]) { System.out.println("I2P Router version: " + FULL_VERSION);