From f904b012e9e0ed1b329c9faab013c664ad422508 Mon Sep 17 00:00:00 2001
From: jrandom <jrandom>
Date: Sun, 17 Oct 2004 03:47:03 +0000
Subject: [PATCH] initial impl for the new streaming lib (saying this isn't
 done should be obvious, but the packet spec is at a save point)

---
 .../client/streaming/MessageInputStream.java  | 216 ++++++++++
 .../client/streaming/MessageOutputStream.java |  95 +++++
 .../src/net/i2p/client/streaming/Packet.java  | 389 ++++++++++++++++++
 .../streaming/MessageInputStreamTest.java     |  90 ++++
 4 files changed, 790 insertions(+)
 create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
 create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
 create mode 100644 apps/streaming/java/src/net/i2p/client/streaming/Packet.java
 create mode 100644 apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java

diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
new file mode 100644
index 0000000000..1e952f20be
--- /dev/null
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java
@@ -0,0 +1,216 @@
+package net.i2p.client.streaming;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InterruptedIOException;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import net.i2p.data.ByteArray;
+
+/**
+ * Stream that can be given messages out of order 
+ * yet present them in order.
+ *
+ */
+public class MessageInputStream extends InputStream {
+    /** 
+     * List of ByteArray objects of data ready to be read,
+     * with the first ByteArray at index 0, and the next
+     * actual byte to be read at _readyDataBlockIndex of 
+     * that array.
+     *
+     */
+    private List _readyDataBlocks;
+    private int _readyDataBlockIndex;
+    /** highest message ID used in the readyDataBlocks */
+    private long _highestReadyBlockId;
+    /** 
+     * Message ID (Long) to ByteArray for blocks received
+     * out of order when there are lower IDs not yet 
+     * received
+     */
+    private Map _notYetReadyBlocks;
+    /** 
+     * if we have received a flag saying there won't be later messages, EOF
+     * after we have cleared what we have received.
+     */
+    private boolean _closeReceived;
+    /** if we don't want any more data, ignore the data */
+    private boolean _locallyClosed;
+    private int _readTimeout;
+    
+    private Object _dataLock;
+    
+    public MessageInputStream() {
+        _readyDataBlocks = new ArrayList(4);
+        _readyDataBlockIndex = 0;
+        _highestReadyBlockId = -1;
+        _readTimeout = -1;
+        _notYetReadyBlocks = new HashMap(4);
+        _dataLock = new Object();
+        _closeReceived = false;
+        _locallyClosed = false;
+    }
+    
+    /** What is the highest block ID we've completely received through? */
+    public long getHighestReadyBockId() { 
+        synchronized (_dataLock) {
+            return _highestReadyBlockId; 
+        }
+    }
+    
+    /**
+     * Ascending list of block IDs greater than the highest
+     * ready block ID, or null if there aren't any.
+     *
+     */
+    public long[] getOutOfOrderBlocks() {
+        long blocks[] = null;
+        synchronized (_dataLock) {
+            int num = _notYetReadyBlocks.size();
+            if (num <= 0) return null;
+            blocks = new long[num];
+            int i = 0;
+            for (Iterator iter = _notYetReadyBlocks.keySet().iterator(); iter.hasNext(); ) {
+                Long id = (Long)iter.next();
+                blocks[i] = id.longValue();
+                i++;
+            }
+        }
+        Arrays.sort(blocks);
+        return blocks;
+    }
+    
+    /** how many blocks have we received that we still have holes before? */
+    public int getOutOfOrderBlockCount() { 
+        synchronized (_dataLock) { 
+            return _notYetReadyBlocks.size(); 
+        }
+    }
+  
+    /** 
+     * how long a read() call should block (if less than 0, block indefinitely,
+     * but if it is 0, do not block at all)
+     */
+    public int getReadTimeout() { return _readTimeout; }
+    public void setReadTimeout(int timeout) { _readTimeout = timeout; }
+    
+    /**
+     * A new message has arrived - toss it on the appropriate queue (moving 
+     * previously pending messages to the ready queue if it fills the gap, etc)
+     *
+     */
+    public void messageReceived(long messageId, byte payload[]) {
+        synchronized (_dataLock) {
+            if (messageId <= _highestReadyBlockId) return; // already received
+            if (_highestReadyBlockId + 1 == messageId) {
+                if (!_locallyClosed)
+                    _readyDataBlocks.add(new ByteArray(payload));
+                _highestReadyBlockId = messageId;
+                // now pull in any previously pending blocks
+                while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) {
+                    _readyDataBlocks.add(_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)));
+                    _highestReadyBlockId++;
+                }
+                _dataLock.notifyAll();
+            } else {
+                if (_locallyClosed) // dont need the payload, just the msgId in order
+                    _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null));
+                else
+                    _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload));
+            }
+        }
+    }
+    
+    public int read() throws IOException {
+        if (_locallyClosed) throw new IOException("Already locally closed");
+        synchronized (_dataLock) {
+            if (_readyDataBlocks.size() <= 0) {
+                if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) {
+                    return -1;
+                } else {
+                    if (_readTimeout < 0) {
+                        try { _dataLock.wait(); } catch (InterruptedException ie) { }
+                    } else if (_readTimeout > 0) {
+                        try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { }
+                    } else { // readTimeout == 0
+                        // noop, don't block
+                    }
+                    if (_readyDataBlocks.size() <= 0) {
+                        throw new InterruptedIOException("Timeout reading");
+                    }
+                }
+            }
+            
+            // either was already ready, or we wait()ed and it arrived
+            ByteArray cur = (ByteArray)_readyDataBlocks.get(0);
+            byte rv = cur.getData()[_readyDataBlockIndex++];
+            if (cur.getData().length <= _readyDataBlockIndex) {
+                _readyDataBlockIndex = 0;
+                _readyDataBlocks.remove(0);
+            }
+            return (rv < 0 ? rv + 256 : rv);
+        }
+    }
+    
+    public int available() throws IOException {
+        if (_locallyClosed) throw new IOException("Already closed, you wanker");
+        synchronized (_dataLock) {
+            if (_readyDataBlocks.size() <= 0) 
+                return 0;
+            int numBytes = 0;
+            for (int i = 0; i < _readyDataBlocks.size(); i++) {
+                ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
+                if (i == 0)
+                    numBytes += cur.getData().length - _readyDataBlockIndex;
+                else
+                    numBytes += cur.getData().length;
+            }
+            return numBytes;
+        }
+    }
+    
+    /**
+     * How many bytes are queued up for reading (or sitting in the out-of-order
+     * buffer)?
+     *
+     */
+    public int getTotalQueuedSize() {
+        synchronized (_dataLock) {
+            if (_locallyClosed) return 0;
+            int numBytes = 0;
+            for (int i = 0; i < _readyDataBlocks.size(); i++) {
+                ByteArray cur = (ByteArray)_readyDataBlocks.get(i);
+                if (i == 0)
+                    numBytes += cur.getData().length - _readyDataBlockIndex;
+                else
+                    numBytes += cur.getData().length;
+            }
+            for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
+                ByteArray cur = (ByteArray)iter.next();
+                numBytes += cur.getData().length;
+            }
+            return numBytes;
+        }
+    }
+    
+    public void close() {
+        synchronized (_dataLock) {
+            _readyDataBlocks.clear();
+             
+            // we don't need the data, but we do need to keep track of the messageIds
+            // received, so we can ACK accordingly
+            for (Iterator iter = _notYetReadyBlocks.values().iterator(); iter.hasNext(); ) {
+                ByteArray ba = (ByteArray)iter.next();
+                ba.setData(null);
+            }
+            _locallyClosed = true;
+        }
+    }
+}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
new file mode 100644
index 0000000000..2e1e985475
--- /dev/null
+++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java
@@ -0,0 +1,95 @@
+package net.i2p.client.streaming;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ *
+ */
+public class MessageOutputStream extends OutputStream {
+    private byte _buf[];
+    private int _valid;
+    private Object _dataLock;
+    private DataReceiver _dataReceiver;
+    private IOException _streamError;
+    
+    public MessageOutputStream(DataReceiver receiver) {
+        this(receiver, 64*1024);
+    }
+    public MessageOutputStream(DataReceiver receiver, int bufSize) {
+        super();
+        _buf = new byte[bufSize];
+        _dataReceiver = receiver;
+        _dataLock = new Object();
+    }
+    
+    public void write(byte b[]) throws IOException {
+        write(b, 0, b.length);
+    }
+    
+    public void write(byte b[], int off, int len) throws IOException {
+        synchronized (_dataLock) {
+            int remaining = len;
+            while (remaining > 0) {
+                if (_valid + remaining < _buf.length) {
+                    // simply buffer the data, no flush
+                    System.arraycopy(b, off, _buf, _valid, remaining);
+                    remaining = 0;
+                } else {
+                    // buffer whatever we can fit then flush,
+                    // repeating until we've pushed all of the
+                    // data through
+                    int toWrite = _buf.length - _valid;
+                    System.arraycopy(b, off, _buf, _valid, toWrite);
+                    remaining -= toWrite;
+                    _valid = _buf.length;
+                    _dataReceiver.writeData(_buf, 0, _valid);
+                    _valid = 0;
+                    throwAnyError();
+                }
+            }
+        }
+        throwAnyError();
+    }
+    
+    public void write(int b) throws IOException {
+        write(new byte[] { (byte)b }, 0, 1);
+        throwAnyError();
+    }
+    
+    public void flush() throws IOException {
+        synchronized (_dataLock) {
+            _dataReceiver.writeData(_buf, 0, _valid);
+            _valid = 0;
+        }
+        throwAnyError();
+    }
+    
+    private void throwAnyError() throws IOException {
+        if (_streamError != null) {
+            IOException ioe = _streamError;
+            _streamError = null;
+            throw ioe;
+        }
+    }
+    
+    void streamErrorOccurred(IOException ioe) {
+        _streamError = ioe;
+    }
+    
+    /** 
+     * called whenever the engine wants to push more data to the
+     * peer
+     *
+     */
+    void flushAvailable(DataReceiver target) {
+        synchronized (_dataLock) {
+            target.writeData(_buf, 0, _valid);
+            _valid = 0;
+        }
+    }
+    
+    public interface DataReceiver {
+        public void writeData(byte buf[], int off, int size);
+    }
+}
diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
new file mode 100644
index 0000000000..5f598b7ebb
--- /dev/null
+++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java
@@ -0,0 +1,389 @@
+package net.i2p.client.streaming;
+
+import java.util.Arrays;
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+import net.i2p.data.Destination;
+import net.i2p.data.Signature;
+import net.i2p.data.SigningPrivateKey;
+
+/**
+ * Contain a single packet transferred as part of a streaming connection.  
+ * The data format is as follows:<ul>
+ * <li>{@see #getSendStreamId sendStreamId} [4 byte value]</li>
+ * <li>{@see #getReceiveStreamId receiveStreamId} [4 byte value]</li>
+ * <li>{@see #getSequenceNum sequenceNum} [4 byte unsigned integer]</li>
+ * <li>{@see #getAckThrough ackThrough} [4 byte unsigned integer]</li>
+ * <li>number of NACKs [1 byte unsigned integer]</li>
+ * <li>that many {@see #getNacks NACKs}</li>
+ * <li>{@see #getResendDelay resendDelay} [1 byte integer]</li>
+ * <li>flags [2 byte value]</li>
+ * <li>option data size [2 byte integer]</li>
+ * <li>option data specified by those flags [0 or more bytes]</li>
+ * <li>payload [remaining packet size]</li>
+ * </ul>
+ *
+ * <p>The flags field above specifies some metadata about the packet, and in
+ * turn may require certain additional data to be included.  The flags are
+ * as follows (with any data structures specified added to the options area
+ * in the given order):</p><ol>
+ * <li>{@see #FLAG_SYNCHRONIZE}: no option data</li>
+ * <li>{@see #FLAG_CLOSE}: no option data</li>
+ * <li>{@see #FLAG_RESET}: no option data</li>
+ * <li>{@see #FLAG_SIGNATURE_INCLUDED}: {@see net.i2p.data.Signature}</li>
+ * <li>{@see #FLAG_SIGNATURE_REQUESTED}: no option data</li>
+ * <li>{@see #FLAG_FROM_INCLUDED}: {@see net.i2p.data.Destination}</li>
+ * <li>{@see #FLAG_DELAY_REQUESTED}: 1 byte integer</li>
+ * <li>{@see #FLAG_MAX_PACKET_SIZE_INCLUDED}: 2 byte integer</li>
+ * <li>{@see #FLAG_PROFILE_INTERACTIVE}: no option data</li>
+ * </ol>
+ *
+ * <p>If the signature is included, it uses the Destination's DSA key 
+ * to sign the entire header and payload with the space in the options 
+ * for the signature being set to all zeroes.</p>
+ *
+ */
+public class Packet {
+    private byte _sendStreamId[];
+    private byte _receiveStreamId[];
+    private long _sequenceNum;
+    private long _ackThrough;
+    private long _nacks[];
+    private int _resendDelay;
+    private int _flags;
+    private byte _payload[];
+    // the next four are set only if the flags say so
+    private Signature _optionSignature;
+    private Destination _optionFrom;
+    private int _optionDelay;
+    private int _optionMaxSize;
+    
+    /** 
+     * The receiveStreamId will be set to this when the packet doesn't know 
+     * what ID will be assigned by the remote peer (aka this is the initial
+     * synchronize packet)
+     *
+     */
+    public static final byte RECEIVE_STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 };
+    
+    /**
+     * This packet is creating a new socket connection (if the receiveStreamId
+     * is RECEIVE_STREAM_ID_UNKNOWN) or it is acknowledging a request to 
+     * create a connection and in turn is accepting the socket.
+     *
+     */
+    public static final int FLAG_SYNCHRONIZE = (1 << 0);
+    /**
+     * The sender of this packet will not be sending any more payload data.
+     */
+    public static final int FLAG_CLOSE = (1 << 1);
+    /**
+     * This packet is being sent to signify that the socket does not exist 
+     * (or, if in response to an initial synchronize packet, that the 
+     * connection was refused).
+     *
+     */
+    public static final int FLAG_RESET = (1 << 2);
+    /**
+     * This packet contains a DSA signature from the packet's sender.  This 
+     * signature is within the packet options.  All synchronize packets must
+     * have this flag set.
+     *
+     */
+    public static final int FLAG_SIGNATURE_INCLUDED = (1 << 3);
+    /**
+     * This packet wants the recipient to include signatures on subsequent
+     * packets sent to the creator of this packet.
+     */
+    public static final int FLAG_SIGNATURE_REQUESTED = (1 << 4);
+    /**
+     * This packet includes the full I2P destination of the packet's sender.
+     * The initial synchronize packet must have this flag set.
+     */
+    public static final int FLAG_FROM_INCLUDED = (1 << 5);
+    /**
+     * This packet includes an explicit request for the recipient to delay
+     * sending any packets with data for a given amount of time.
+     *
+     */
+    public static final int FLAG_DELAY_REQUESTED = (1 << 6);
+    /**
+     * This packet includes a request that the recipient not send any 
+     * subsequent packets with payloads greater than a specific size.
+     * If not set and no prior value was delivered, the maximum value 
+     * will be assumed (approximately 32KB).
+     *
+     */
+    public static final int FLAG_MAX_PACKET_SIZE_INCLUDED = (1 << 7);
+    /**
+     * If set, this packet is travelling as part of an interactive flow,
+     * meaning it is more lag sensitive than throughput sensitive.  aka
+     * send data ASAP rather than waiting around to send full packets.
+     *
+     */
+    public static final int FLAG_PROFILE_INTERACTIVE = (1 << 8);
+    
+    /** what stream is this packet a part of? */
+    public byte[] getSendStreamId() { return _sendStreamId; }
+    public void setSendStreamId(byte[] id) { _sendStreamId = id; }
+    
+    /** 
+     * what is the stream replies should be sent on?  if the 
+     * connection is still being built, this should be 
+     * {@see #RECEIVE_STREAM_ID_UNKNOWN}.
+     *
+     */
+    public byte[] getReceiveStreamId() { return _receiveStreamId; }
+    public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; }
+    
+    /** 0-indexed sequence number for this Packet in the sendStream */
+    public long getSequenceNum() { return _sequenceNum; }
+    public void setSequenceNum(long num) { _sequenceNum = num; }
+    
+    /** 
+     * what is the highest packet sequence number that received
+     * on the receiveStreamId?  This field is ignored on the initial
+     * connection packet (where receiveStreamId is the unknown id).
+     *
+     */
+    public long getAckThrough() { return _ackThrough; }
+    public void setAckThrough(long id) { _ackThrough = id; }
+    
+    /**
+     * What packet sequence numbers below the getAckThrough() value
+     * have not been received?  this may be null.
+     *
+     */
+    public long[] getNacks() { return _nacks; }
+    public void setNacks(long nacks[]) { _nacks = nacks; }
+    
+    /**
+     * How long is the creator of this packet going to wait before
+     * resending this packet (if it hasn't yet been ACKed).  The 
+     * value is seconds since the packet was created.
+     *
+     */
+    public int getResendDelay() { return _resendDelay; }
+    public void setResendDelay(int numSeconds) { _resendDelay = numSeconds; }
+    
+    /** get the actual payload of the message.  may be null */
+    public byte[] getPayload() { return _payload; }
+    public void setPayload(byte payload[]) { _payload = payload; }
+
+    /** is a particular flag set on this packet? */
+    public boolean isFlagSet(int flag) { return 0 != (_flags & flag); }
+    public void setFlag(int flag) { _flags |= flag; }
+
+    /** the signature on the packet (only included if the flag for it is set) */
+    public Signature getOptionalSignature() { return _optionSignature; }
+    public void setOptionalSignature(Signature sig) { _optionSignature = sig; }
+
+    /** the sender of the packet (only included if the flag for it is set) */
+    public Destination getOptionalFrom() { return _optionFrom; }
+    public void setOptionalFrom(Destination from) { _optionFrom = from; }
+    
+    /** 
+     * How many milliseconds the sender of this packet wants the recipient
+     * to wait before sending any more data (only valid if the flag for it is
+     * set) 
+     */
+    public int getOptionalDelay() { return _optionDelay; }
+    public void setOptionalDelay(int delayMs) { _optionDelay = delayMs; }
+    
+    /** 
+     * What is the largest payload the sender of this packet wants to receive?
+     *
+     */
+    public int getOptionalMaxSize() { return _optionMaxSize; }
+    public void setOptionalMaxSize(int numBytes) { _optionMaxSize = numBytes; }
+    
+    /**
+     * Write the packet to the buffer (starting at the offset) and return
+     * the number of bytes written.
+     *
+     * @throws IllegalStateException if there is data missing or otherwise b0rked
+     */
+    public int writePacket(byte buffer[], int offset) throws IllegalStateException {
+        return writePacket(buffer, offset, true);
+    }
+    /**
+     * @param includeSig if true, include the real signature, otherwise put zeroes
+     *                   in its place.
+     */
+    private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException {
+        int cur = offset;
+        System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length);
+        cur += _sendStreamId.length;
+        System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length);
+        cur += _receiveStreamId.length;
+        DataHelper.toLong(buffer, cur, 4, _sequenceNum);
+        cur += 4;
+        DataHelper.toLong(buffer, cur, 4, _ackThrough);
+        cur += 4;
+        if (_nacks != null) {
+            DataHelper.toLong(buffer, cur, 1, _nacks.length);
+            cur++;
+            for (int i = 0; i < _nacks.length; i++) {
+                DataHelper.toLong(buffer, cur, 4, _nacks[i]);
+                cur += 4;
+            }
+        } else {
+            DataHelper.toLong(buffer, cur, 1, 0);
+            cur++;
+        }
+        DataHelper.toLong(buffer, cur, 1, _resendDelay);
+        cur++;
+        DataHelper.toLong(buffer, cur, 2, _flags);
+        cur += 2;
+
+        int optionSize = 0;
+        if (isFlagSet(FLAG_DELAY_REQUESTED))
+            optionSize += 1;
+        if (isFlagSet(FLAG_FROM_INCLUDED))
+            optionSize += _optionFrom.size();
+        if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED))
+            optionSize += 2;
+        if (isFlagSet(FLAG_SIGNATURE_INCLUDED))
+            optionSize += Signature.SIGNATURE_BYTES;
+        
+        DataHelper.toLong(buffer, cur, 2, optionSize);
+        cur += 2;
+        
+        if (isFlagSet(FLAG_DELAY_REQUESTED)) {
+            DataHelper.toLong(buffer, cur, 1, _optionDelay);
+            cur++;
+        }
+        if (isFlagSet(FLAG_FROM_INCLUDED)) {
+            cur += _optionFrom.writeBytes(buffer, cur);
+        }
+        if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) {
+            DataHelper.toLong(buffer, cur, 2, _optionMaxSize);
+            cur += 2;
+        }
+        if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) {
+            if (includeSig)
+                System.arraycopy(_optionSignature.getData(), 0, buffer, cur, Signature.SIGNATURE_BYTES);
+            else // we're signing (or validating)
+                Arrays.fill(buffer, cur, Signature.SIGNATURE_BYTES, (byte)0x0);
+            cur += Signature.SIGNATURE_BYTES;
+        }
+        
+        if (_payload != null) {
+            System.arraycopy(_payload, 0, buffer, cur, _payload.length);
+            cur += _payload.length;
+        }
+        
+        return cur - offset;
+    }
+    
+    /**
+     * Read the packet from the buffer (starting at the offset) and return
+     * the number of bytes read.
+     *
+     * @param buffer packet buffer containing the data
+     * @param offset index into the buffer to start readign
+     * @param length how many bytes within the buffer past the offset are 
+     *               part of the packet?
+     *
+     * @throws IllegalArgumentException if the data is b0rked
+     */
+    public void readPacket(byte buffer[], int offset, int length) throws IllegalArgumentException {
+        int cur = offset;
+        _sendStreamId = new byte[4];
+        System.arraycopy(buffer, cur, _sendStreamId, 0, 4);
+        cur += 4;
+        _receiveStreamId = new byte[4];
+        System.arraycopy(buffer, cur, _receiveStreamId, 0, 4);
+        cur += 4;
+        _sequenceNum = DataHelper.fromLong(buffer, cur, 4);
+        cur += 4;
+        _ackThrough = DataHelper.fromLong(buffer, cur, 4);
+        cur += 4;
+        int numNacks = (int)DataHelper.fromLong(buffer, cur, 1);
+        cur++;
+        if (numNacks > 0) {
+            _nacks = new long[numNacks];
+            for (int i = 0; i < numNacks; i++) {
+                _nacks[i] = DataHelper.fromLong(buffer, cur, 4);
+                cur += 4;
+            }
+        } else {
+            _nacks = null;
+        }
+        _resendDelay = (int)DataHelper.fromLong(buffer, cur, 1);
+        cur++;
+        _flags = (int)DataHelper.fromLong(buffer, cur, 2);
+        cur += 2;
+        
+        int optionSize = (int)DataHelper.fromLong(buffer, cur, 2);
+        cur += 2;
+        int payloadBegin = cur + optionSize;
+        
+        // skip ahead to the payload
+        _payload = new byte[offset + length - payloadBegin];
+        System.arraycopy(buffer, payloadBegin, _payload, 0, _payload.length);
+        
+        // ok now lets go back and deal with the options
+        if (isFlagSet(FLAG_DELAY_REQUESTED)) {
+            _optionDelay = (int)DataHelper.fromLong(buffer, cur, 1);
+            cur++;
+        }
+        if (isFlagSet(FLAG_FROM_INCLUDED)) {
+            _optionFrom = new Destination();
+            cur += _optionFrom.readBytes(buffer, cur);
+        }
+        if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) {
+            _optionMaxSize = (int)DataHelper.fromLong(buffer, cur, 2);
+            cur += 2;
+        }
+        if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) {
+            Signature sig = new Signature();
+            byte buf[] = new byte[Signature.SIGNATURE_BYTES];
+            System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES);
+            sig.setData(buf);
+            cur += Signature.SIGNATURE_BYTES;
+        }
+    }
+    
+    /**
+     * Determine whether the signature on the data is valid.  
+     *
+     * @return true if the signature exists and validates against the data, 
+     *         false otherwise.
+     */
+    public boolean verifySignature(I2PAppContext ctx, Destination from, byte buffer[]) {
+        if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false;
+        if (_optionSignature == null) return false;
+        
+        int size = writePacket(buffer, 0, false);
+        return ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey());
+    }
+
+    /**
+     * Sign and write the packet to the buffer (starting at the offset) and return
+     * the number of bytes written.
+     *
+     * @throws IllegalStateException if there is data missing or otherwise b0rked
+     */
+    public int writeSignedPacket(byte buffer[], int offset, I2PAppContext ctx, SigningPrivateKey key) throws IllegalStateException {
+        setFlag(FLAG_SIGNATURE_INCLUDED);
+        int size = writePacket(buffer, offset, false);
+        _optionSignature = ctx.dsa().sign(buffer, offset, size, key);
+        // jump into the signed data and inject the signature where we 
+        // previously placed a bunch of zeroes
+        int signatureOffset = offset 
+                              + 4 // sendStreamId
+                              + 4 // receiveStreamId
+                              + 4 // sequenceNum
+                              + 4 // ackThrough
+                              + (_nacks != null ? 4*_nacks.length + 1 : 1)
+                              + 1 // resendDelay
+                              + 2 // flags
+                              + 2 // optionSize
+                              + (isFlagSet(FLAG_DELAY_REQUESTED) ? 1 : 0)
+                              + (isFlagSet(FLAG_FROM_INCLUDED) ? _optionFrom.size() : 0)
+                              + (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED) ? 2 : 0);
+        System.arraycopy(_optionSignature.getData(), 0, buffer, signatureOffset, Signature.SIGNATURE_BYTES);
+        return size;
+    }
+}
diff --git a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java
new file mode 100644
index 0000000000..753f16b352
--- /dev/null
+++ b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java
@@ -0,0 +1,90 @@
+package net.i2p.client.streaming;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+
+import net.i2p.I2PAppContext;
+import net.i2p.data.DataHelper;
+import net.i2p.util.Log;
+
+/**
+ * Stress test the MessageInputStream
+ */
+public class MessageInputStreamTest {
+    private I2PAppContext _context;
+    private Log _log;
+    
+    public MessageInputStreamTest() {
+        _context = I2PAppContext.getGlobalContext();
+        _log = _context.logManager().getLog(MessageInputStreamTest.class);
+    }
+    
+    public void testInOrder() {
+        byte orig[] = new byte[32*1024];
+        _context.random().nextBytes(orig);
+        
+        MessageInputStream in = new MessageInputStream();
+        for (int i = 0; i < 32; i++) {
+            byte msg[] = new byte[1024];
+            System.arraycopy(orig, i*1024, msg, 0, 1024);
+            in.messageReceived(i, msg);
+        }
+        
+        byte read[] = new byte[32*1024];
+        try {
+            int howMany = DataHelper.read(in, read);
+            if (howMany != orig.length)
+                throw new RuntimeException("Failed test: not enough bytes read [" + howMany + "]");
+            if (!DataHelper.eq(orig, read))
+                throw new RuntimeException("Failed test: data read is not equal");
+            
+            _log.info("Passed test: in order");
+        } catch (IOException ioe) {
+            throw new RuntimeException("IOError reading: " + ioe.getMessage());
+        }
+    }
+    
+    public void testRandomOrder() {
+        byte orig[] = new byte[32*1024];
+        _context.random().nextBytes(orig);
+        
+        MessageInputStream in = new MessageInputStream();
+        ArrayList order = new ArrayList(32);
+        for (int i = 0; i < 32; i++)
+            order.add(new Integer(i));
+        Collections.shuffle(order);
+        for (int i = 0; i < 32; i++) {
+            byte msg[] = new byte[1024];
+            Integer cur = (Integer)order.get(i);
+            System.arraycopy(orig, cur.intValue()*1024, msg, 0, 1024);
+            in.messageReceived(cur.intValue(), msg);
+            _log.debug("Injecting " + cur);
+        }
+        
+        byte read[] = new byte[32*1024];
+        try {
+            int howMany = DataHelper.read(in, read);
+            if (howMany != orig.length)
+                throw new RuntimeException("Failed test: not enough bytes read [" + howMany + "]");
+            if (!DataHelper.eq(orig, read))
+                throw new RuntimeException("Failed test: data read is not equal");
+            
+            _log.info("Passed test: random order");
+        } catch (IOException ioe) {
+            throw new RuntimeException("IOError reading: " + ioe.getMessage());
+        }
+    }
+    
+    public static void main(String args[]) {
+        MessageInputStreamTest t = new MessageInputStreamTest();
+        try {
+            t.testInOrder();
+            t.testRandomOrder();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        try { Thread.sleep(10*1000); } catch (InterruptedException ie) {}
+    }
+}
-- 
GitLab