diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Connection.java b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java new file mode 100644 index 0000000000..41de16ec43 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/Connection.java @@ -0,0 +1,386 @@ +package net.i2p.client.streaming; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import net.i2p.I2PAppContext; +import net.i2p.client.I2PSession; +import net.i2p.data.Base64; +import net.i2p.data.Destination; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; + +/** + * Maintain the state controlling a streaming connection between two + * destinations. + * + */ +public class Connection { + private I2PAppContext _context; + private Log _log; + private ConnectionManager _connectionManager; + private Destination _remotePeer; + private byte _sendStreamId[]; + private byte _receiveStreamId[]; + private long _lastSendTime; + private long _lastSendId; + private boolean _resetReceived; + private boolean _connected; + private MessageInputStream _inputStream; + private MessageOutputStream _outputStream; + private SchedulerChooser _chooser; + private long _nextSendTime; + private long _ackedPackets; + private long _createdOn; + private long _closeSentOn; + private long _closeReceivedOn; + private int _unackedPacketsReceived; + /** Packet ID (Long) to PacketLocal for sent but unacked packets */ + private Map _outboundPackets; + private PacketQueue _outboundQueue; + private ConnectionPacketHandler _handler; + private ConnectionOptions _options; + private ConnectionDataReceiver _receiver; + private I2PSocketFull _socket; + /** set to an error cause if the connection could not be established */ + private String _connectionError; + + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler) { + this(ctx, manager, chooser, queue, handler, null); + } + public Connection(I2PAppContext ctx, ConnectionManager manager, SchedulerChooser chooser, PacketQueue queue, ConnectionPacketHandler handler, ConnectionOptions opts) { + _context = ctx; + _log = ctx.logManager().getLog(Connection.class); + _receiver = new ConnectionDataReceiver(ctx, this); + _inputStream = new MessageInputStream(ctx); + _outputStream = new MessageOutputStream(ctx, _receiver); + _chooser = chooser; + _outboundPackets = new TreeMap(); + _outboundQueue = queue; + _handler = handler; + _options = (opts != null ? opts : new ConnectionOptions()); + _lastSendId = -1; + _nextSendTime = -1; + _ackedPackets = 0; + _createdOn = ctx.clock().now(); + _closeSentOn = -1; + _closeReceivedOn = -1; + _unackedPacketsReceived = 0; + _connectionManager = manager; + _resetReceived = false; + _connected = true; + } + + public long getNextOutboundPacketNum() { + synchronized (this) { + return ++_lastSendId; + } + } + + void closeReceived() { + setCloseReceivedOn(_context.clock().now()); + _inputStream.closeReceived(); + } + + /** + * Block until there is an open outbound packet slot or the write timeout + * expires. + * + * @return true if the packet should be sent + */ + boolean packetSendChoke() { + //if (true) return true; + long writeExpire = _options.getWriteTimeout(); + if (writeExpire > 0) + writeExpire += _context.clock().now(); + while (true) { + long timeLeft = writeExpire - _context.clock().now(); + synchronized (_outboundPackets) { + if (_outboundPackets.size() >= _options.getWindowSize()) { + if (writeExpire > 0) { + if (timeLeft <= 0) return false; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Outbound window is full (" + _outboundPackets.size() + "/" + _options.getWindowSize() + "), waiting " + timeLeft); + try { _outboundPackets.wait(timeLeft); } catch (InterruptedException ie) {} + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Outbound window is full (" + _outboundPackets.size() + "), waiting indefinitely"); + try { _outboundPackets.wait(); } catch (InterruptedException ie) {} + } + } else { + return true; + } + } + } + } + + /** + * Flush any data that we can + */ + void sendAvailable() { + // this grabs the data, builds a packet, and queues it up via sendPacket + try { + _outputStream.flushAvailable(_receiver); + } catch (IOException ioe) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Error flushing available", ioe); + } + } + + void sendPacket(PacketLocal packet) { + setNextSendTime(-1); + _unackedPacketsReceived = 0; + if (_options.getRequireFullySigned()) { + packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED); + } + + if ( (packet.getSequenceNum() == 0) && (!packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) ) { + // ACK only, no retries + } else { + synchronized (_outboundPackets) { + _outboundPackets.put(new Long(packet.getSequenceNum()), packet); + } + SimpleTimer.getInstance().addEvent(new ResendPacketEvent(packet), _options.getResendDelay()); + } + _lastSendTime = _context.clock().now(); + _outboundQueue.enqueue(packet); + } + + /* + void flushPackets() { + List toSend = null; + synchronized (_outboundPackets) { + for (Iterator iter = _outboundPackets.values().iterator(); iter.hasNext(); ) { + PacketLocal packet = (PacketLocal)iter.next(); + long nextExpected = _options.getResendDelay() << packet.getNumSends(); + if (packet.getLastSend() + nextExpected <= _context.clock().now()) { + // we need to resend + if (toSend == null) toSend = new ArrayList(1); + toSend.add(packet); + } + } + } + + if (toSend != null) { + for (int i = 0; i < toSend.size(); i++) { + PacketLocal packet = (PacketLocal)toSend.get(i); + _lastSendTime = _context.clock().now(); + _outboundQueue.enqueue(packet); + } + } + } + */ + List ackPackets(long ackThrough, long nacks[]) { + List acked = null; + synchronized (_outboundPackets) { + for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { + Long id = (Long)iter.next(); + if (id.longValue() <= ackThrough) { + if (nacks != null) { + // linear search since its probably really tiny + for (int i = 0; i < nacks.length; i++) + if (nacks[i] == id.longValue()) + continue; // NACKed + } else { + // ACKed + if (acked == null) + acked = new ArrayList(1); + PacketLocal ackedPacket = (PacketLocal)_outboundPackets.get(id); + ackedPacket.ackReceived(); + acked.add(ackedPacket); + } + } else { + break; // _outboundPackets is ordered + } + } + if (acked != null) { + for (int i = 0; i < acked.size(); i++) { + PacketLocal p = (PacketLocal)acked.get(i); + _outboundPackets.remove(new Long(p.getSequenceNum())); + _ackedPackets++; + } + } + _outboundPackets.notifyAll(); + } + return acked; + } + + void eventOccurred() { + _chooser.getScheduler(this).eventOccurred(this); + } + + void resetReceived() { + _resetReceived = true; + _outputStream.streamErrorOccurred(new IOException("Reset received")); + _inputStream.streamErrorOccurred(new IOException("Reset received")); + } + public boolean getResetReceived() { return _resetReceived; } + + public boolean getIsConnected() { return _connected; } + + void disconnect(boolean cleanDisconnect) { + if (!_connected) return; + _connected = false; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Disconnecting " + toString(), new Exception("discon")); + + if (cleanDisconnect) { + // send close packets and schedule stuff... + try { + _outputStream.close(); + _inputStream.close(); + } catch (IOException ioe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error on clean disconnect", ioe); + } + } else { + doClose(); + synchronized (_outboundPackets) { + _outboundPackets.clear(); + } + _connectionManager.removeConnection(this); + } + } + + void disconnectComplete() { + _connectionManager.removeConnection(this); + } + + private void doClose() { + _outputStream.streamErrorOccurred(new IOException("Hard disconnect")); + _inputStream.closeReceived(); + } + + /** who are we talking with */ + public Destination getRemotePeer() { return _remotePeer; } + public void setRemotePeer(Destination peer) { _remotePeer = peer; } + + /** what stream do we send data to the peer on? */ + public byte[] getSendStreamId() { return _sendStreamId; } + public void setSendStreamId(byte[] id) { _sendStreamId = id; } + + /** what stream does the peer send data to us on? (may be null) */ + public byte[] getReceiveStreamId() { return _receiveStreamId; } + public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; } + + /** when did we last send anything to the peer? */ + public long getLastSendTime() { return _lastSendTime; } + public void setLastSendTime(long when) { _lastSendTime = when; } + + /** what was the last packet Id sent to the peer? */ + public long getLastSendId() { return _lastSendId; } + public void setLastSendId(long id) { _lastSendId = id; } + + public ConnectionOptions getOptions() { return _options; } + public void setOptions(ConnectionOptions opts) { _options = opts; } + + public I2PSession getSession() { return _connectionManager.getSession(); } + public I2PSocketFull getSocket() { return _socket; } + public void setSocket(I2PSocketFull socket) { _socket = socket; } + + public String getConnectionError() { return _connectionError; } + public void setConnectionError(String err) { _connectionError = err; } + + public ConnectionPacketHandler getPacketHandler() { return _handler; } + + /** + * when does the scheduler next want to send a packet? -1 if never. + * This should be set when we want to send on timeout, for instance, or + * want to delay an ACK. + */ + public long getNextSendTime() { return _nextSendTime; } + public void setNextSendTime(long when) { + if (_nextSendTime > 0) + if (_log.shouldLog(Log.DEBUG)) + _log.debug("set next send time to " + (when-_nextSendTime) + "ms after it was before ("+when+")"); + _nextSendTime = when; + } + + public long getAckedPackets() { return _ackedPackets; } + public long getCreatedOn() { return _createdOn; } + public long getCloseSentOn() { return _closeSentOn; } + public void setCloseSentOn(long when) { _closeSentOn = when; } + public long getCloseReceivedOn() { return _closeReceivedOn; } + public void setCloseReceivedOn(long when) { _closeReceivedOn = when; } + + public void incrementUnackedPacketsReceived() { _unackedPacketsReceived++; } + public int getUnackedPacketsReceived() { return _unackedPacketsReceived; } + public int getUnackedPacketsSent() { + synchronized (_outboundPackets) { + return _outboundPackets.size(); + } + } + + /** stream that the local peer receives data on */ + public MessageInputStream getInputStream() { return _inputStream; } + /** stream that the local peer sends data to the remote peer on */ + public MessageOutputStream getOutputStream() { return _outputStream; } + + public String toString() { + StringBuffer buf = new StringBuffer(128); + buf.append("[Connection "); + if (_receiveStreamId != null) + buf.append(Base64.encode(_receiveStreamId)); + else + buf.append("unknown"); + buf.append("<-->"); + if (_sendStreamId != null) + buf.append(Base64.encode(_sendStreamId)); + else + buf.append("unknown"); + buf.append(" unacked outbound: "); + synchronized (_outboundPackets) { + buf.append(_outboundPackets.size()).append(" ["); + for (Iterator iter = _outboundPackets.keySet().iterator(); iter.hasNext(); ) { + buf.append(((Long)iter.next()).longValue()).append(" "); + } + buf.append("] "); + } + buf.append("unacked inbound? ").append(getUnackedPacketsReceived()); + buf.append("]"); + return buf.toString(); + } + + /** + * Coordinate the resends of a given packet + */ + private class ResendPacketEvent implements SimpleTimer.TimedEvent { + private PacketLocal _packet; + public ResendPacketEvent(PacketLocal packet) { + _packet = packet; + } + + public void timeReached() { + boolean resend = false; + synchronized (_outboundPackets) { + if (_outboundPackets.containsKey(new Long(_packet.getSequenceNum()))) + resend = true; + } + if ( (resend) && (_packet.getAckTime() < 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Resend packet " + _packet + " on " + Connection.this); + _outboundQueue.enqueue(_packet); + + int numSends = _packet.getNumSends(); + if (numSends > _options.getMaxResends()) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Too many resends"); + disconnect(false); + } else { + long timeout = _options.getResendDelay() << numSends; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Scheduling resend in " + timeout + "ms"); + SimpleTimer.getInstance().addEvent(ResendPacketEvent.this, timeout); + } + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet acked before resend: " + _packet + " on " + Connection.this); + } + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java new file mode 100644 index 0000000000..1d854ca82a --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionDataReceiver.java @@ -0,0 +1,104 @@ +package net.i2p.client.streaming; + +import java.io.InterruptedIOException; +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + * + */ +class ConnectionDataReceiver implements MessageOutputStream.DataReceiver { + private I2PAppContext _context; + private Log _log; + private Connection _connection; + + public ConnectionDataReceiver(I2PAppContext ctx, Connection con) { + _context = ctx; + _log = ctx.logManager().getLog(ConnectionDataReceiver.class); + _connection = con; + } + + public void writeData(byte[] buf, int off, int size) throws InterruptedIOException { + if (!_connection.packetSendChoke()) + throw new InterruptedIOException("Timeout expired waiting to write"); + boolean doSend = true; + if ( (size <= 0) && (_connection.getLastSendId() >= 0) ) { + if (_connection.getOutputStream().getClosed()) { + if (_connection.getCloseSentOn() < 0) { + doSend = true; + } else { + // closed, no new data, and we've already sent a close packet + doSend = false; + } + } else { + // no new data, not closed, already synchronized + doSend = false; + } + } + + if (_connection.getUnackedPacketsReceived() > 0) + doSend = true; + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("writeData called: size="+size + " doSend=" + doSend + " con: " + _connection, new Exception("write called by")); + + if (doSend) { + PacketLocal packet = buildPacket(buf, off, size); + _connection.sendPacket(packet); + } else { + //_connection.flushPackets(); + } + } + + private boolean isAckOnly(int size) { + return ( (size <= 0) && // no data + (_connection.getLastSendId() >= 0) && // not a SYN + ( (!_connection.getOutputStream().getClosed()) || // not a CLOSE + (_connection.getOutputStream().getClosed() && + _connection.getCloseSentOn() > 0) )); // or it is a dup CLOSE + } + + private PacketLocal buildPacket(byte buf[], int off, int size) { + boolean ackOnly = isAckOnly(size); + PacketLocal packet = new PacketLocal(_context, _connection.getRemotePeer()); + byte data[] = new byte[size]; + System.arraycopy(buf, off, data, 0, size); + packet.setPayload(data); + if (ackOnly) + packet.setSequenceNum(0); + else + packet.setSequenceNum(_connection.getNextOutboundPacketNum()); + packet.setSendStreamId(_connection.getSendStreamId()); + packet.setReceiveStreamId(_connection.getReceiveStreamId()); + + packet.setAckThrough(_connection.getInputStream().getHighestBlockId()); + packet.setNacks(_connection.getInputStream().getNacks()); + packet.setOptionalDelay(_connection.getOptions().getChoke()); + packet.setOptionalMaxSize(_connection.getOptions().getMaxMessageSize()); + packet.setResendDelay(_connection.getOptions().getResendDelay()); + + if (_connection.getOptions().getProfile() == ConnectionOptions.PROFILE_INTERACTIVE) + packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, true); + else + packet.setFlag(Packet.FLAG_PROFILE_INTERACTIVE, false); + + packet.setFlag(Packet.FLAG_SIGNATURE_REQUESTED, _connection.getOptions().getRequireFullySigned()); + + if ( (!ackOnly) && (packet.getSequenceNum() <= 0) ) { + packet.setFlag(Packet.FLAG_SYNCHRONIZE); + packet.setOptionalFrom(_connection.getSession().getMyDestination()); + } + + if (_connection.getOutputStream().getClosed()) { + packet.setFlag(Packet.FLAG_CLOSE); + _connection.setCloseSentOn(_context.clock().now()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closed is set for a new packet on " + _connection + ": " + packet); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closed is not set for a new packet on " + _connection + ": " + packet); + } + return packet; + } + +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java new file mode 100644 index 0000000000..9bee67a1e6 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionHandler.java @@ -0,0 +1,126 @@ +package net.i2p.client.streaming; + +import java.util.ArrayList; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; +import net.i2p.util.SimpleTimer; + +/** + * Receive new connection attempts + */ +class ConnectionHandler { + private I2PAppContext _context; + private Log _log; + private ConnectionManager _manager; + private List _synQueue; + private boolean _active; + private int _acceptTimeout; + + /** max time after receiveNewSyn() and before the matched accept() */ + private static final int DEFAULT_ACCEPT_TIMEOUT = 3*1000; + + /** Creates a new instance of ConnectionHandler */ + public ConnectionHandler(I2PAppContext context, ConnectionManager mgr) { + _context = context; + _log = context.logManager().getLog(ConnectionHandler.class); + _manager = mgr; + _synQueue = new ArrayList(5); + _active = false; + _acceptTimeout = DEFAULT_ACCEPT_TIMEOUT; + } + + public void setActive(boolean active) { _active = active; } + public boolean getActive() { return _active; } + + public void receiveNewSyn(Packet packet) { + if (!_active) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Dropping new SYN request, as we're not listening"); + sendReset(packet); + return; + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Receive new SYN: " + packet + ": timeout in " + _acceptTimeout); + SimpleTimer.getInstance().addEvent(new TimeoutSyn(packet), _acceptTimeout); + synchronized (_synQueue) { + _synQueue.add(packet); + _synQueue.notifyAll(); + } + } + + public Connection accept(long timeoutMs) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accept("+ timeoutMs+") called"); + + long expiration = timeoutMs; + if (expiration > 0) + expiration += _context.clock().now(); + Packet syn = null; + synchronized (_synQueue) { + while ( _active && (_synQueue.size() <= 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Accept("+ timeoutMs+"): active=" + _active + " queue: " + _synQueue.size()); + if (timeoutMs <= 0) { + try { _synQueue.wait(); } catch (InterruptedException ie) {} + } else { + long remaining = expiration - _context.clock().now(); + if (remaining < 0) + break; + try { _synQueue.wait(remaining); } catch (InterruptedException ie) {} + } + } + if (_active && _synQueue.size() > 0) { + syn = (Packet)_synQueue.remove(0); + } + } + + if (syn != null) { + return _manager.receiveConnection(syn); + } else { + return null; + } + } + + private void sendReset(Packet packet) { + boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null); + if (!ok) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a spoofed SYN packet: they said they were " + packet.getOptionalFrom()); + return; + } + PacketLocal reply = new PacketLocal(_context, packet.getOptionalFrom()); + reply.setFlag(Packet.FLAG_RESET); + reply.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + reply.setAckThrough(packet.getSequenceNum()); + reply.setSendStreamId(packet.getReceiveStreamId()); + reply.setReceiveStreamId(null); + reply.setOptionalFrom(_manager.getSession().getMyDestination()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Sending RST: " + reply + " because of " + packet); + // this just sends the packet - no retries or whatnot + _manager.getPacketQueue().enqueue(reply); + } + + private class TimeoutSyn implements SimpleTimer.TimedEvent { + private Packet _synPacket; + public TimeoutSyn(Packet packet) { + _synPacket = packet; + } + + public void timeReached() { + boolean removed = false; + synchronized (_synQueue) { + removed = _synQueue.remove(_synPacket); + } + + if (removed) { + // timeout - send RST + sendReset(_synPacket); + } else { + // handled. noop + } + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java new file mode 100644 index 0000000000..9634de5d97 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionManager.java @@ -0,0 +1,200 @@ +package net.i2p.client.streaming; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PSession; +import net.i2p.data.ByteArray; +import net.i2p.data.Destination; + +/** + * Coordinate all of the connections for a single local destination. + * + * + */ +public class ConnectionManager { + private I2PAppContext _context; + private I2PSession _session; + private MessageHandler _messageHandler; + private PacketHandler _packetHandler; + private ConnectionHandler _connectionHandler; + private PacketQueue _outboundQueue; + private SchedulerChooser _schedulerChooser; + private ConnectionPacketHandler _conPacketHandler; + /** Inbound stream ID (ByteArray) to Connection map */ + private Map _connectionByInboundId; + /** Ping ID (ByteArray) to PingRequest */ + private Map _pendingPings; + private boolean _allowIncoming; + private Object _connectionLock; + + public ConnectionManager(I2PAppContext context, I2PSession session) { + _context = context; + _connectionByInboundId = new HashMap(32); + _pendingPings = new HashMap(4); + _connectionLock = new Object(); + _messageHandler = new MessageHandler(context, this); + _packetHandler = new PacketHandler(context, this); + _connectionHandler = new ConnectionHandler(context, this); + _schedulerChooser = new SchedulerChooser(context); + _conPacketHandler = new ConnectionPacketHandler(context); + _session = session; + session.setSessionListener(_messageHandler); + _outboundQueue = new PacketQueue(context, session); + _allowIncoming = false; + } + + Connection getConnectionByInboundId(byte[] id) { + synchronized (_connectionLock) { + return (Connection)_connectionByInboundId.get(new ByteArray(id)); + } + } + + public void setAllowIncomingConnections(boolean allow) { + _connectionHandler.setActive(allow); + } + public boolean getAllowIncomingConnections() { + return _connectionHandler.getActive(); + } + + /** + * Create a new connection based on the SYN packet we received. + * + * @return created Connection with the packet's data already delivered to + * it, or null if the syn's streamId was already taken + */ + public Connection receiveConnection(Packet synPacket) { + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler); + byte receiveId[] = new byte[4]; + _context.random().nextBytes(receiveId); + synchronized (_connectionLock) { + while (true) { + Connection oldCon = (Connection)_connectionByInboundId.put(new ByteArray(receiveId), con); + if (oldCon == null) { + break; + } else { + _connectionByInboundId.put(new ByteArray(receiveId), oldCon); + // receiveId already taken, try another + _context.random().nextBytes(receiveId); + } + } + } + + con.setReceiveStreamId(receiveId); + con.getPacketHandler().receivePacket(synPacket, con); + return con; + } + + /** + * Build a new connection to the given peer + */ + public Connection connect(Destination peer, ConnectionOptions opts) { + Connection con = new Connection(_context, this, _schedulerChooser, _outboundQueue, _conPacketHandler, opts); + con.setRemotePeer(peer); + byte receiveId[] = new byte[4]; + _context.random().nextBytes(receiveId); + synchronized (_connectionLock) { + ByteArray ba = new ByteArray(receiveId); + while (_connectionByInboundId.containsKey(ba)) { + _context.random().nextBytes(receiveId); + } + _connectionByInboundId.put(ba, con); + } + + con.setReceiveStreamId(receiveId); + con.eventOccurred(); + return con; + } + + public MessageHandler getMessageHandler() { return _messageHandler; } + public PacketHandler getPacketHandler() { return _packetHandler; } + public ConnectionHandler getConnectionHandler() { return _connectionHandler; } + public I2PSession getSession() { return _session; } + public PacketQueue getPacketQueue() { return _outboundQueue; } + + /** + * Something b0rked hard, so kill all of our connections without mercy. + * Don't bother sending close packets. + * + */ + public void disconnectAllHard() { + synchronized (_connectionLock) { + for (Iterator iter = _connectionByInboundId.values().iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + con.disconnect(false); + } + _connectionByInboundId.clear(); + } + } + + public void removeConnection(Connection con) { + synchronized (_connectionLock) { + _connectionByInboundId.remove(new ByteArray(con.getReceiveStreamId())); + } + } + + public Set listConnections() { + synchronized (_connectionLock) { + return new HashSet(_connectionByInboundId.values()); + } + } + + public boolean ping(Destination peer, long timeoutMs) { + PingRequest req = new PingRequest(); + byte id[] = new byte[4]; + _context.random().nextBytes(id); + ByteArray ba = new ByteArray(id); + + synchronized (_pendingPings) { + _pendingPings.put(ba, req); + } + + PacketLocal packet = new PacketLocal(_context, peer); + packet.setSendStreamId(id); + packet.setFlag(Packet.FLAG_ECHO); + packet.setFlag(Packet.FLAG_SIGNATURE_INCLUDED); + packet.setOptionalFrom(_session.getMyDestination()); + _outboundQueue.enqueue(packet); + + synchronized (req) { + if (!req.pongReceived()) + try { req.wait(timeoutMs); } catch (InterruptedException ie) {} + } + + synchronized (_pendingPings) { + _pendingPings.remove(ba); + } + + boolean ok = req.pongReceived(); + if (ok) { + _context.sessionKeyManager().tagsDelivered(peer.getPublicKey(), packet.getKeyUsed(), packet.getTagsSent()); + } + return ok; + } + + private class PingRequest { + private boolean _ponged; + public PingRequest() { _ponged = false; } + public void pong() { + synchronized (ConnectionManager.PingRequest.this) { + _ponged = true; + ConnectionManager.PingRequest.this.notifyAll(); + } + } + public boolean pongReceived() { return _ponged; } + } + + void receivePong(byte pingId[]) { + ByteArray ba = new ByteArray(pingId); + PingRequest req = null; + synchronized (_pendingPings) { + req = (PingRequest)_pendingPings.remove(ba); + } + if (req != null) + req.pong(); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java new file mode 100644 index 0000000000..89cbd5c51f --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionOptions.java @@ -0,0 +1,145 @@ +package net.i2p.client.streaming; + +import java.util.Properties; + +/** + * Define the current options for the con (and allow custom tweaking midstream) + * + */ +public class ConnectionOptions extends I2PSocketOptions { + private int _connectDelay; + private boolean _fullySigned; + private int _windowSize; + private int _receiveWindow; + private int _profile; + private int _rtt; + private int _resendDelay; + private int _sendAckDelay; + private int _maxMessageSize; + private int _choke; + private int _maxResends; + + public static final int PROFILE_BULK = 1; + public static final int PROFILE_INTERACTIVE = 2; + + public ConnectionOptions() { + super(); + init(null); + } + + public ConnectionOptions(I2PSocketOptions opts) { + super(opts); + init(null); + } + + public ConnectionOptions(ConnectionOptions opts) { + super(opts); + init(opts); + } + + private void init(ConnectionOptions opts) { + if (opts != null) { + setConnectDelay(opts.getConnectDelay()); + setProfile(opts.getProfile()); + setRTT(opts.getRTT()); + setRequireFullySigned(opts.getRequireFullySigned()); + setWindowSize(opts.getWindowSize()); + setResendDelay(opts.getResendDelay()); + setMaxMessageSize(opts.getMaxMessageSize()); + setChoke(opts.getChoke()); + setMaxResends(opts.getMaxResends()); + } else { + setConnectDelay(2*1000); + setProfile(PROFILE_BULK); + setMaxMessageSize(32*1024); + setRTT(5*1000); + setReceiveWindow(1); + setResendDelay(5*1000); + setSendAckDelay(1*1000); + setWindowSize(1); + setMaxResends(10); + } + } + + public ConnectionOptions(Properties opts) { + super(opts); + // load the options; + } + + /** + * how long will we wait after instantiating a new con + * before actually attempting to connect. If this is + * set to 0, connect ASAP. If it is greater than 0, wait + * until the output stream is flushed, the buffer fills, + * or that many milliseconds pass. + * + */ + public int getConnectDelay() { return _connectDelay; } + public void setConnectDelay(int delayMs) { _connectDelay = delayMs; } + + /** + * Do we want all packets in both directions to be signed, + * or can we deal with signatures on the SYN and FIN packets + * only? + * + */ + public boolean getRequireFullySigned() { return _fullySigned; } + public void setRequireFullySigned(boolean sign) { _fullySigned = sign; } + + /** + * How many messages will we send before waiting for an ACK? + * + */ + public int getWindowSize() { return _windowSize; } + public void setWindowSize(int numMsgs) { _windowSize = numMsgs; } + + /** after how many consecutive messages should we ack? */ + public int getReceiveWindow() { return _receiveWindow; } + public void setReceiveWindow(int numMsgs) { _receiveWindow = numMsgs; } + + /** + * What to set the round trip time estimate to (in milliseconds) + */ + public int getRTT() { return _rtt; } + public void setRTT(int ms) { _rtt = ms; } + + /** How long after sending a packet will we wait before resending? */ + public int getResendDelay() { return _resendDelay; } + public void setResendDelay(int ms) { _resendDelay = ms; } + + /** + * if there are packets we haven't ACKed yet and we don't + * receive _receiveWindow messages before + * (_lastSendTime+_sendAckDelay), send an ACK of what + * we have received so far. + * + */ + public int getSendAckDelay() { return _sendAckDelay; } + public void setSendAckDelay(int delayMs) { _sendAckDelay = delayMs; } + + /** What is the largest message we want to send or receive? */ + public int getMaxMessageSize() { return _maxMessageSize; } + public void setMaxMessageSize(int bytes) { _maxMessageSize = bytes; } + + /** + * how long we want to wait before any data is transferred on the + * connection in either direction + * + */ + public int getChoke() { return _choke; } + public void setChoke(int ms) { _choke = ms; } + + /** + * What profile do we want to use for this connection? + * + */ + public int getProfile() { return _profile; } + public void setProfile(int profile) { _profile = profile; } + + /** + * How many times will we try to send a message before giving up? + * + */ + public int getMaxResends() { return _maxResends; } + public void setMaxResends(int numSends) { _maxResends = numSends; } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java new file mode 100644 index 0000000000..4973e02802 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/ConnectionPacketHandler.java @@ -0,0 +1,174 @@ +package net.i2p.client.streaming; + +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * Receive a packet for a particular connection - placing the data onto the + * queue, marking packets as acked, updating various fields, etc. + * + */ +public class ConnectionPacketHandler { + private I2PAppContext _context; + private Log _log; + + /** rtt = rtt*RTT_DAMPENING + (1-RTT_DAMPENING)*currentPacketRTT */ + private static final double RTT_DAMPENING = 0.9; + + public ConnectionPacketHandler(I2PAppContext context) { + _context = context; + _log = context.logManager().getLog(ConnectionPacketHandler.class); + } + + /** distribute a packet to the connection specified */ + void receivePacket(Packet packet, Connection con) { + boolean ok = verifyPacket(packet, con); + if (!ok) return; + boolean isNew = con.getInputStream().messageReceived(packet.getSequenceNum(), packet.getPayload()); + if (isNew) { + con.incrementUnackedPacketsReceived(); + long nextTime = con.getNextSendTime(); + if (nextTime <= 0) { + con.setNextSendTime(con.getOptions().getSendAckDelay() + _context.clock().now()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Scheduling ack in " + con.getOptions().getSendAckDelay() + "ms for received packet " + packet); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Ack is already scheduled in " + nextTime + "ms, though we just received " + packet); + } + } else { + if (packet.getSequenceNum() > 0) { + // take note of congestion + con.getOptions().setResendDelay(con.getOptions().getResendDelay()*2); + //con.getOptions().setWindowSize(con.getOptions().getWindowSize()/2); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("ACK only packet received: " + packet); + } + } + List acked = con.ackPackets(packet.getAckThrough(), packet.getNacks()); + if ( (acked != null) && (acked.size() > 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug(acked.size() + " of our packets acked with " + packet); + // use the lowest RTT, since these would likely be bunched together, + // waiting for the most recent packet received before sending the ACK + int lowestRtt = -1; + for (int i = 0; i < acked.size(); i++) { + PacketLocal p = (PacketLocal)acked.get(i); + if ( (lowestRtt < 0) || (p.getAckTime() < lowestRtt) ) + lowestRtt = p.getAckTime(); + + // ACK the tags we delivered so we can use them + if ( (p.getKeyUsed() != null) && (p.getTagsSent() != null) + && (p.getTagsSent().size() > 0) ) { + _context.sessionKeyManager().tagsDelivered(p.getTo().getPublicKey(), + p.getKeyUsed(), + p.getTagsSent()); + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Packet acked: " + p); + } + int oldRTT = con.getOptions().getRTT(); + int newRTT = (int)(RTT_DAMPENING*oldRTT + (1-RTT_DAMPENING)*lowestRtt); + con.getOptions().setRTT(newRTT); + } + + con.eventOccurred(); + } + + /** + * Make sure this packet is ok and that we can continue processing its data. + * + * @return true if the packet is ok for this connection, false if we shouldn't + * continue processing. + */ + private boolean verifyPacket(Packet packet, Connection con) { + if (packet.isFlagSet(Packet.FLAG_RESET)) { + verifyReset(packet, con); + return false; + } else { + boolean sigOk = verifySignature(packet, con); + + if (con.getSendStreamId() == null) { + if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + con.setSendStreamId(packet.getReceiveStreamId()); + con.setRemotePeer(packet.getOptionalFrom()); + return true; + } else { + // neither RST nor SYN and we dont have the stream id yet? nuh uh + if (_log.shouldLog(Log.WARN)) + _log.warn("Packet without RST or SYN where we dont know stream ID: " + + packet); + return false; + } + } else { + if (!DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Packet received with the wrong reply stream id: " + + con + " / " + packet); + return false; + } else { + return true; + } + } + } + } + + /** + * Make sure this RST packet is valid, and if it is, act on it. + */ + private void verifyReset(Packet packet, Connection con) { + if (DataHelper.eq(con.getReceiveStreamId(), packet.getSendStreamId())) { + boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null); + if (!ok) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received unsigned / forged RST on " + con); + return; + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Reset received"); + // ok, valid RST + con.resetReceived(); + con.eventOccurred(); + + // no further processing + return; + } + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a packet for the wrong connection? wtf: " + + con + " / " + packet); + return; + } + } + + /** + * Verify the signature if necessary. + * + * @return false only if the signature was required and it was invalid + */ + private boolean verifySignature(Packet packet, Connection con) { + // verify the signature if necessary + if (con.getOptions().getRequireFullySigned() || + packet.isFlagSet(Packet.FLAG_SYNCHRONIZE) || + packet.isFlagSet(Packet.FLAG_CLOSE) ) { + // we need a valid signature + Destination from = con.getRemotePeer(); + if (from == null) + from = packet.getOptionalFrom(); + boolean sigOk = packet.verifySignature(_context, from, null); + if (!sigOk) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received unsigned / forged packet: " + packet); + return false; + } + if (packet.isFlagSet(Packet.FLAG_CLOSE)) + con.closeReceived(); + } + return true; + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java new file mode 100644 index 0000000000..8109b0c166 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PServerSocketFull.java @@ -0,0 +1,24 @@ +package net.i2p.client.streaming; + +import java.net.ConnectException; +import net.i2p.I2PException; + +/** + * Bridge to allow accepting new connections + * + */ +public class I2PServerSocketFull implements I2PServerSocket { + private I2PSocketManagerFull _socketManager; + + public I2PServerSocketFull(I2PSocketManagerFull mgr) { + _socketManager = mgr; + } + + public I2PSocket accept() throws I2PException { + return _socketManager.receiveSocket(); + } + + public void close() { _socketManager.getConnectionManager().setAllowIncomingConnections(false); } + + public I2PSocketManager getManager() { return _socketManager; } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java new file mode 100644 index 0000000000..413d8a9b55 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketFull.java @@ -0,0 +1,67 @@ +package net.i2p.client.streaming; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import net.i2p.data.Destination; + +/** + * Bridge between the full streaming lib and the I2PSocket API + * + */ +public class I2PSocketFull implements I2PSocket { + private Connection _connection; + private I2PSocket.SocketErrorListener _listener; + + public I2PSocketFull(Connection con) { + _connection = con; + } + + public void close() throws IOException { + if (_connection.getIsConnected()) { + _connection.disconnect(true); + } else { + throw new IOException("Not connected"); + } + } + + public InputStream getInputStream() { + return _connection.getInputStream(); + } + + public I2PSocketOptions getOptions() { + return _connection.getOptions(); + } + + public OutputStream getOutputStream() throws IOException { + return _connection.getOutputStream(); + } + + public Destination getPeerDestination() { + return _connection.getRemotePeer(); + } + + public long getReadTimeout() { + return _connection.getOptions().getReadTimeout(); + } + + public Destination getThisDestination() { + return _connection.getSession().getMyDestination(); + } + + public void setOptions(I2PSocketOptions options) { + if (options instanceof ConnectionOptions) + _connection.setOptions((ConnectionOptions)options); + else + _connection.setOptions(new ConnectionOptions(options)); + } + + public void setReadTimeout(long ms) { + _connection.getOptions().setReadTimeout(ms); + } + + public void setSocketErrorListener(I2PSocket.SocketErrorListener lsnr) { + _listener = lsnr; + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java new file mode 100644 index 0000000000..4b98544f9a --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/I2PSocketManagerFull.java @@ -0,0 +1,175 @@ +package net.i2p.client.streaming; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.io.UnsupportedEncodingException; +import java.net.ConnectException; +import java.net.NoRouteToHostException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Properties; +import java.util.Set; + +import net.i2p.I2PAppContext; +import net.i2p.I2PException; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.client.I2PSessionListener; +import net.i2p.data.Destination; +import net.i2p.util.Log; + + +/** + * Centralize the coordination and multiplexing of the local client's streaming. + * There should be one I2PSocketManager for each I2PSession, and if an application + * is sending and receiving data through the streaming library using an + * I2PSocketManager, it should not attempt to call I2PSession's setSessionListener + * or receive any messages with its .receiveMessage + * + */ +public class I2PSocketManagerFull implements I2PSocketManager { + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private I2PServerSocketFull _serverSocket; + private ConnectionOptions _defaultOptions; + private long _acceptTimeout; + private String _name; + private static int __managerId = 0; + private ConnectionManager _connectionManager; + + /** + * How long to wait for the client app to accept() before sending back CLOSE? + * This includes the time waiting in the queue. Currently set to 5 seconds. + */ + private static final long ACCEPT_TIMEOUT_DEFAULT = 5*1000; + + public I2PSocketManagerFull(I2PAppContext context, I2PSession session, Properties opts, String name) { + _context = context; + _session = session; + _log = _context.logManager().getLog(I2PSocketManagerFull.class); + _connectionManager = new ConnectionManager(_context, _session); + _name = name + " " + (++__managerId); + _acceptTimeout = ACCEPT_TIMEOUT_DEFAULT; + _defaultOptions = new ConnectionOptions(opts); + _serverSocket = new I2PServerSocketFull(this); + } + + public I2PSession getSession() { + return _session; + } + + public ConnectionManager getConnectionManager() { + return _connectionManager; + } + + public I2PSocket receiveSocket() throws I2PException { + if (_session.isClosed()) throw new I2PException("Session closed"); + Connection con = _connectionManager.getConnectionHandler().accept(-1); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("receiveSocket() called: " + con); + if (con != null) { + I2PSocketFull sock = new I2PSocketFull(con); + con.setSocket(sock); + return sock; + } else { + return null; + } + } + + /** + * Ping the specified peer, returning true if they replied to the ping within + * the timeout specified, false otherwise. This call blocks. + * + */ + public boolean ping(Destination peer, long timeoutMs) { + return _connectionManager.ping(peer, timeoutMs); + } + + /** + * How long should we wait for the client to .accept() a socket before + * sending back a NACK/Close? + * + * @param ms milliseconds to wait, maximum + */ + public void setAcceptTimeout(long ms) { _acceptTimeout = ms; } + public long getAcceptTimeout() { return _acceptTimeout; } + + public void setDefaultOptions(I2PSocketOptions options) { + _defaultOptions = new ConnectionOptions(options); + } + + public I2PSocketOptions getDefaultOptions() { + return _defaultOptions; + } + + public I2PServerSocket getServerSocket() { + _connectionManager.setAllowIncomingConnections(true); + return _serverSocket; + } + + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * @param options I2P socket options to be used for connecting + * + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer, I2PSocketOptions options) + throws I2PException, NoRouteToHostException { + if (_connectionManager.getSession().isClosed()) + throw new I2PException("Session is closed"); + Connection con = _connectionManager.connect(peer, new ConnectionOptions(options)); + I2PSocketFull socket = new I2PSocketFull(con); + con.setSocket(socket); + if (con.getConnectionError() != null) { + con.disconnect(false); + throw new NoRouteToHostException(con.getConnectionError()); + } + return socket; + } + + /** + * Create a new connected socket (block until the socket is created) + * + * @param peer Destination to connect to + * + * @throws NoRouteToHostException if the peer is not found or not reachable + * @throws I2PException if there is some other I2P-related problem + */ + public I2PSocket connect(Destination peer) throws I2PException, NoRouteToHostException { + return connect(peer, _defaultOptions); + } + + /** + * Destroy the socket manager, freeing all the associated resources. This + * method will block untill all the managed sockets are closed. + * + */ + public void destroySocketManager() { + _connectionManager.disconnectAllHard(); + _connectionManager.setAllowIncomingConnections(false); + // should we destroy the _session too? + } + + /** + * Retrieve a set of currently connected I2PSockets, either initiated locally or remotely. + * + */ + public Set listSockets() { + Set connections = _connectionManager.listConnections(); + Set rv = new HashSet(connections.size()); + for (Iterator iter = connections.iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + if (con.getSocket() != null) + rv.add(con.getSocket()); + } + return rv; + } + + public String getName() { return _name; } + public void setName(String name) { _name = name; } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java new file mode 100644 index 0000000000..ca1a416c96 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageHandler.java @@ -0,0 +1,78 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionListener; +import net.i2p.client.I2PSessionException; +import net.i2p.util.Log; + +/** + * + */ +public class MessageHandler implements I2PSessionListener { + private ConnectionManager _manager; + private I2PAppContext _context; + private Log _log; + + public MessageHandler(I2PAppContext ctx, ConnectionManager mgr) { + _manager = mgr; + _context = ctx; + _log = ctx.logManager().getLog(MessageHandler.class); + } + + /** Instruct the client that the given session has received a message with + * size # of bytes. + * @param session session to notify + * @param msgId message number available + * @param size size of the message + */ + public void messageAvailable(I2PSession session, int msgId, long size) { + byte data[] = null; + try { + data = session.receiveMessage(msgId); + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error receiving the message", ise); + return; + } + Packet packet = new Packet(); + try { + packet.readPacket(data, 0, data.length); + _manager.getPacketHandler().receivePacket(packet); + } catch (IllegalArgumentException iae) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Received an invalid packet", iae); + } + } + + /** Instruct the client that the session specified seems to be under attack + * and that the client may wish to move its destination to another router. + * @param session session to report abuse to + * @param severity how bad the abuse is + */ + public void reportAbuse(I2PSession session, int severity) { + if (_log.shouldLog(Log.ERROR)) + _log.error("Abuse reported with severity " + severity); + _manager.disconnectAllHard(); + } + + /** + * Notify the client that the session has been terminated + * + */ + public void disconnected(I2PSession session) { + if (_log.shouldLog(Log.ERROR)) + _log.error("I2PSession disconnected"); + _manager.disconnectAllHard(); + } + + /** + * Notify the client that some error occurred + * + */ + public void errorOccurred(I2PSession session, String message, Throwable error) { + if (_log.shouldLog(Log.ERROR)) + _log.error("error occurred: " + message, error); + _manager.disconnectAllHard(); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java index 1e952f20be..53d8710261 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageInputStream.java @@ -11,7 +11,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import net.i2p.I2PAppContext; import net.i2p.data.ByteArray; +import net.i2p.util.Log; /** * Stream that can be given messages out of order @@ -19,6 +21,8 @@ import net.i2p.data.ByteArray; * */ public class MessageInputStream extends InputStream { + private I2PAppContext _context; + private Log _log; /** * List of ByteArray objects of data ready to be read, * with the first ByteArray at index 0, and the next @@ -30,6 +34,8 @@ public class MessageInputStream extends InputStream { private int _readyDataBlockIndex; /** highest message ID used in the readyDataBlocks */ private long _highestReadyBlockId; + /** highest overall message ID */ + private long _highestBlockId; /** * Message ID (Long) to ByteArray for blocks received * out of order when there are lower IDs not yet @@ -44,13 +50,17 @@ public class MessageInputStream extends InputStream { /** if we don't want any more data, ignore the data */ private boolean _locallyClosed; private int _readTimeout; + private IOException _streamError; private Object _dataLock; - public MessageInputStream() { + public MessageInputStream(I2PAppContext ctx) { + _context = ctx; + _log = ctx.logManager().getLog(MessageInputStream.class); _readyDataBlocks = new ArrayList(4); _readyDataBlockIndex = 0; _highestReadyBlockId = -1; + _highestBlockId = -1; _readTimeout = -1; _notYetReadyBlocks = new HashMap(4); _dataLock = new Object(); @@ -65,6 +75,42 @@ public class MessageInputStream extends InputStream { } } + public long getHighestBlockId() { + synchronized (_dataLock) { + return _highestBlockId; + } + } + + /** + * Retrieve the message IDs that are holes in our sequence - ones + * past the highest ready ID and below the highest received message + * ID. This may return null if there are no such IDs. + * + */ + public long[] getNacks() { + List ids = null; + synchronized (_dataLock) { + for (long i = _highestReadyBlockId + 1; i < _highestBlockId; i++) { + Long l = new Long(i); + if (_notYetReadyBlocks.containsKey(l)) { + // ACK + } else { + if (ids != null) + ids = new ArrayList(4); + ids.add(l); + } + } + } + if (ids != null) { + long rv[] = new long[ids.size()]; + for (int i = 0; i < rv.length; i++) + rv[i] = ((Long)ids.get(i)).longValue(); + return rv; + } else { + return null; + } + } + /** * Ascending list of block IDs greater than the highest * ready block ID, or null if there aren't any. @@ -101,56 +147,110 @@ public class MessageInputStream extends InputStream { public int getReadTimeout() { return _readTimeout; } public void setReadTimeout(int timeout) { _readTimeout = timeout; } + public void closeReceived() { + synchronized (_dataLock) { + _closeReceived = true; + } + } + /** * A new message has arrived - toss it on the appropriate queue (moving - * previously pending messages to the ready queue if it fills the gap, etc) + * previously pending messages to the ready queue if it fills the gap, etc). * + * @return true if this is a new packet, false if it is a dup */ - public void messageReceived(long messageId, byte payload[]) { + public boolean messageReceived(long messageId, byte payload[]) { synchronized (_dataLock) { - if (messageId <= _highestReadyBlockId) return; // already received + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received " + messageId + " with " + payload.length); + if (messageId <= _highestReadyBlockId) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("ignoring dup message " + messageId); + return false; // already received + } + if (messageId > _highestBlockId) + _highestBlockId = messageId; + if (_highestReadyBlockId + 1 == messageId) { - if (!_locallyClosed) + if (!_locallyClosed && payload.length > 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("accepting bytes as ready: " + payload.length); _readyDataBlocks.add(new ByteArray(payload)); + } _highestReadyBlockId = messageId; // now pull in any previously pending blocks while (_notYetReadyBlocks.containsKey(new Long(_highestReadyBlockId + 1))) { - _readyDataBlocks.add(_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1))); + ByteArray ba = (ByteArray)_notYetReadyBlocks.get(new Long(_highestReadyBlockId + 1)); + if ( (ba != null) && (ba.getData() != null) && (ba.getData().length > 0) ) { + _readyDataBlocks.add(ba); + } + + if (_log.shouldLog(Log.DEBUG)) + _log.debug("making ready the block " + _highestReadyBlockId); _highestReadyBlockId++; } _dataLock.notifyAll(); } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("message is out of order: " + messageId); if (_locallyClosed) // dont need the payload, just the msgId in order _notYetReadyBlocks.put(new Long(messageId), new ByteArray(null)); else _notYetReadyBlocks.put(new Long(messageId), new ByteArray(payload)); } } + return true; } public int read() throws IOException { if (_locallyClosed) throw new IOException("Already locally closed"); + throwAnyError(); + long expiration = -1; + if (_readTimeout > 0) + expiration = _readTimeout + System.currentTimeMillis(); synchronized (_dataLock) { - if (_readyDataBlocks.size() <= 0) { + while (_readyDataBlocks.size() <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with readyBlocks.size = " + _readyDataBlocks.size() + " on " + toString()); + if ( (_notYetReadyBlocks.size() <= 0) && (_closeReceived) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() got EOF: " + toString()); return -1; } else { if (_readTimeout < 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with no timeout: " + toString()); try { _dataLock.wait(); } catch (InterruptedException ie) { } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with no timeout complete: " + toString()); + throwAnyError(); } else if (_readTimeout > 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with timeout: " + _readTimeout + ": " + toString()); try { _dataLock.wait(_readTimeout); } catch (InterruptedException ie) { } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with timeout complete: " + _readTimeout + ": " + toString()); + throwAnyError(); } else { // readTimeout == 0 // noop, don't block + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() with nonblocking setup: " + toString()); } if (_readyDataBlocks.size() <= 0) { - throw new InterruptedIOException("Timeout reading"); + if ( (_readTimeout > 0) && (expiration > System.currentTimeMillis()) ) + throw new InterruptedIOException("Timeout reading (timeout=" + _readTimeout + ")"); } } } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("read() readyBlocks = " + _readyDataBlocks.size() + ": " + toString()); + // either was already ready, or we wait()ed and it arrived ByteArray cur = (ByteArray)_readyDataBlocks.get(0); - byte rv = cur.getData()[_readyDataBlockIndex++]; + byte rv = cur.getData()[_readyDataBlockIndex]; + _readyDataBlockIndex++; if (cur.getData().length <= _readyDataBlockIndex) { _readyDataBlockIndex = 0; _readyDataBlocks.remove(0); @@ -161,6 +261,7 @@ public class MessageInputStream extends InputStream { public int available() throws IOException { if (_locallyClosed) throw new IOException("Already closed, you wanker"); + throwAnyError(); synchronized (_dataLock) { if (_readyDataBlocks.size() <= 0) return 0; @@ -213,4 +314,23 @@ public class MessageInputStream extends InputStream { _locallyClosed = true; } } + + /** + * Stream b0rked, die with the given error + * + */ + void streamErrorOccurred(IOException ioe) { + _streamError = ioe; + synchronized (_dataLock) { + _dataLock.notifyAll(); + } + } + + private void throwAnyError() throws IOException { + if (_streamError != null) { + IOException ioe = _streamError; + _streamError = null; + throw ioe; + } + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java index 2e1e985475..dbc5855612 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/MessageOutputStream.java @@ -3,24 +3,33 @@ package net.i2p.client.streaming; import java.io.IOException; import java.io.OutputStream; +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + /** * */ public class MessageOutputStream extends OutputStream { + private I2PAppContext _context; + private Log _log; private byte _buf[]; private int _valid; private Object _dataLock; private DataReceiver _dataReceiver; private IOException _streamError; + private boolean _closed; - public MessageOutputStream(DataReceiver receiver) { - this(receiver, 64*1024); + public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver) { + this(ctx, receiver, 64*1024); } - public MessageOutputStream(DataReceiver receiver, int bufSize) { + public MessageOutputStream(I2PAppContext ctx, DataReceiver receiver, int bufSize) { super(); + _context = ctx; + _log = ctx.logManager().getLog(MessageOutputStream.class); _buf = new byte[bufSize]; _dataReceiver = receiver; _dataLock = new Object(); + _closed = false; } public void write(byte b[]) throws IOException { @@ -34,6 +43,7 @@ public class MessageOutputStream extends OutputStream { if (_valid + remaining < _buf.length) { // simply buffer the data, no flush System.arraycopy(b, off, _buf, _valid, remaining); + _valid += remaining; remaining = 0; } else { // buffer whatever we can fit then flush, @@ -43,7 +53,14 @@ public class MessageOutputStream extends OutputStream { System.arraycopy(b, off, _buf, _valid, toWrite); remaining -= toWrite; _valid = _buf.length; + if (_log.shouldLog(Log.DEBUG)) + _log.debug("write(b[], " + off + ", " + len + "): valid = " + _valid); + // this blocks until the packet is ack window is open. it + // also throws InterruptedIOException if the write timeout + // expires _dataReceiver.writeData(_buf, 0, _valid); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("write(b[], " + off + ", " + len + "): valid = " + _valid + " complete"); _valid = 0; throwAnyError(); } @@ -59,12 +76,26 @@ public class MessageOutputStream extends OutputStream { public void flush() throws IOException { synchronized (_dataLock) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("flush(): valid = " + _valid); + // this blocks until the packet is ack window is open. it + // also throws InterruptedIOException if the write timeout + // expires _dataReceiver.writeData(_buf, 0, _valid); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("flush(): valid = " + _valid + " complete"); _valid = 0; } throwAnyError(); } + public void close() throws IOException { + _closed = true; + flush(); + } + + public boolean getClosed() { return _closed; } + private void throwAnyError() throws IOException { if (_streamError != null) { IOException ioe = _streamError; @@ -82,7 +113,7 @@ public class MessageOutputStream extends OutputStream { * peer * */ - void flushAvailable(DataReceiver target) { + void flushAvailable(DataReceiver target) throws IOException { synchronized (_dataLock) { target.writeData(_buf, 0, _valid); _valid = 0; @@ -90,6 +121,6 @@ public class MessageOutputStream extends OutputStream { } public interface DataReceiver { - public void writeData(byte buf[], int off, int size); + public void writeData(byte buf[], int off, int size) throws IOException; } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java index 5f598b7ebb..8a89b82bb4 100644 --- a/apps/streaming/java/src/net/i2p/client/streaming/Packet.java +++ b/apps/streaming/java/src/net/i2p/client/streaming/Packet.java @@ -2,6 +2,7 @@ package net.i2p.client.streaming; import java.util.Arrays; import net.i2p.I2PAppContext; +import net.i2p.data.Base64; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.Signature; @@ -42,6 +43,9 @@ import net.i2p.data.SigningPrivateKey; * to sign the entire header and payload with the space in the options * for the signature being set to all zeroes.

* + *

If the sequenceNum is 0 and the SYN is not set, this is a plain ACK + * packet that should not be ACKed

+ * */ public class Packet { private byte _sendStreamId[]; @@ -64,11 +68,11 @@ public class Packet { * synchronize packet) * */ - public static final byte RECEIVE_STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 }; + public static final byte STREAM_ID_UNKNOWN[] = new byte[] { 0x00, 0x00, 0x00, 0x00 }; /** * This packet is creating a new socket connection (if the receiveStreamId - * is RECEIVE_STREAM_ID_UNKNOWN) or it is acknowledging a request to + * is STREAM_ID_UNKNOWN) or it is acknowledging a request to * create a connection and in turn is accepting the socket. * */ @@ -122,19 +126,44 @@ public class Packet { * */ public static final int FLAG_PROFILE_INTERACTIVE = (1 << 8); + /** + * If set, this packet is a ping (if sendStreamId is set) or a + * ping reply (if receiveStreamId is set). + */ + public static final int FLAG_ECHO = (1 << 9); + + public static final int DEFAULT_MAX_SIZE = 32*1024; /** what stream is this packet a part of? */ - public byte[] getSendStreamId() { return _sendStreamId; } - public void setSendStreamId(byte[] id) { _sendStreamId = id; } + public byte[] getSendStreamId() { + if ( (_sendStreamId == null) || (DataHelper.eq(_sendStreamId, STREAM_ID_UNKNOWN)) ) + return null; + else + return _sendStreamId; + } + public void setSendStreamId(byte[] id) { + _sendStreamId = id; + if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) ) + _sendStreamId = null; + } /** * what is the stream replies should be sent on? if the * connection is still being built, this should be - * {@see #RECEIVE_STREAM_ID_UNKNOWN}. + * null. * */ - public byte[] getReceiveStreamId() { return _receiveStreamId; } - public void setReceiveStreamId(byte[] id) { _receiveStreamId = id; } + public byte[] getReceiveStreamId() { + if ( (_receiveStreamId == null) || (DataHelper.eq(_receiveStreamId, STREAM_ID_UNKNOWN)) ) + return null; + else + return _receiveStreamId; + } + public void setReceiveStreamId(byte[] id) { + _receiveStreamId = id; + if ( (id != null) && (DataHelper.eq(id, STREAM_ID_UNKNOWN)) ) + _receiveStreamId = null; + } /** 0-indexed sequence number for this Packet in the sendStream */ public long getSequenceNum() { return _sequenceNum; } @@ -173,14 +202,27 @@ public class Packet { /** is a particular flag set on this packet? */ public boolean isFlagSet(int flag) { return 0 != (_flags & flag); } public void setFlag(int flag) { _flags |= flag; } + public void setFlag(int flag, boolean set) { + if (set) + _flags |= flag; + else + _flags &= ~flag; + } /** the signature on the packet (only included if the flag for it is set) */ public Signature getOptionalSignature() { return _optionSignature; } - public void setOptionalSignature(Signature sig) { _optionSignature = sig; } + public void setOptionalSignature(Signature sig) { + setFlag(FLAG_SIGNATURE_INCLUDED, sig != null); + _optionSignature = sig; + } /** the sender of the packet (only included if the flag for it is set) */ public Destination getOptionalFrom() { return _optionFrom; } - public void setOptionalFrom(Destination from) { _optionFrom = from; } + public void setOptionalFrom(Destination from) { + setFlag(FLAG_FROM_INCLUDED, from != null); + if (from == null) throw new RuntimeException("from is null!?"); + _optionFrom = from; + } /** * How many milliseconds the sender of this packet wants the recipient @@ -188,14 +230,20 @@ public class Packet { * set) */ public int getOptionalDelay() { return _optionDelay; } - public void setOptionalDelay(int delayMs) { _optionDelay = delayMs; } + public void setOptionalDelay(int delayMs) { + setFlag(FLAG_DELAY_REQUESTED, delayMs > 0); + _optionDelay = delayMs; + } - /** + /** * What is the largest payload the sender of this packet wants to receive? * */ public int getOptionalMaxSize() { return _optionMaxSize; } - public void setOptionalMaxSize(int numBytes) { _optionMaxSize = numBytes; } + public void setOptionalMaxSize(int numBytes) { + setFlag(FLAG_MAX_PACKET_SIZE_INCLUDED, numBytes > 0); + _optionMaxSize = numBytes; + } /** * Write the packet to the buffer (starting at the offset) and return @@ -212,13 +260,19 @@ public class Packet { */ private int writePacket(byte buffer[], int offset, boolean includeSig) throws IllegalStateException { int cur = offset; - System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length); - cur += _sendStreamId.length; - System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length); - cur += _receiveStreamId.length; - DataHelper.toLong(buffer, cur, 4, _sequenceNum); + if (_sendStreamId != null) + System.arraycopy(_sendStreamId, 0, buffer, cur, _sendStreamId.length); + else + System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); cur += 4; - DataHelper.toLong(buffer, cur, 4, _ackThrough); + if (_receiveStreamId != null) + System.arraycopy(_receiveStreamId, 0, buffer, cur, _receiveStreamId.length); + else + System.arraycopy(STREAM_ID_UNKNOWN, 0, buffer, cur, STREAM_ID_UNKNOWN.length); + cur += 4; + DataHelper.toLong(buffer, cur, 4, _sequenceNum > 0 ? _sequenceNum : 0); + cur += 4; + DataHelper.toLong(buffer, cur, 4, _ackThrough > 0 ? _ackThrough : 0); cur += 4; if (_nacks != null) { DataHelper.toLong(buffer, cur, 1, _nacks.length); @@ -231,7 +285,7 @@ public class Packet { DataHelper.toLong(buffer, cur, 1, 0); cur++; } - DataHelper.toLong(buffer, cur, 1, _resendDelay); + DataHelper.toLong(buffer, cur, 1, _resendDelay > 0 ? _resendDelay : 0); cur++; DataHelper.toLong(buffer, cur, 2, _flags); cur += 2; @@ -250,21 +304,21 @@ public class Packet { cur += 2; if (isFlagSet(FLAG_DELAY_REQUESTED)) { - DataHelper.toLong(buffer, cur, 1, _optionDelay); + DataHelper.toLong(buffer, cur, 1, _optionDelay > 0 ? _optionDelay : 0); cur++; } if (isFlagSet(FLAG_FROM_INCLUDED)) { cur += _optionFrom.writeBytes(buffer, cur); } if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) { - DataHelper.toLong(buffer, cur, 2, _optionMaxSize); + DataHelper.toLong(buffer, cur, 2, _optionMaxSize > 0 ? _optionMaxSize : DEFAULT_MAX_SIZE); cur += 2; } if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) { if (includeSig) System.arraycopy(_optionSignature.getData(), 0, buffer, cur, Signature.SIGNATURE_BYTES); else // we're signing (or validating) - Arrays.fill(buffer, cur, Signature.SIGNATURE_BYTES, (byte)0x0); + Arrays.fill(buffer, cur, cur+Signature.SIGNATURE_BYTES, (byte)0x0); cur += Signature.SIGNATURE_BYTES; } @@ -272,10 +326,46 @@ public class Packet { System.arraycopy(_payload, 0, buffer, cur, _payload.length); cur += _payload.length; } - + return cur - offset; } + + /** + * how large would this packet be if we wrote it + */ + public int writtenSize() throws IllegalStateException { + int size = 0; + size += _sendStreamId.length; + size += _receiveStreamId.length; + size += 4; // sequenceNum + size += 4; // ackThrough + if (_nacks != null) { + size++; // nacks length + size += 4 * _nacks.length; + } else { + size++; // nacks length + } + size++; // resendDelay + size += 2; // flags + + if (isFlagSet(FLAG_DELAY_REQUESTED)) + size += 1; + if (isFlagSet(FLAG_FROM_INCLUDED)) + size += _optionFrom.size(); + if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) + size += 2; + if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) + size += Signature.SIGNATURE_BYTES; + + size += 2; // option size + + if (_payload != null) { + size += _payload.length; + } + + return size; + } /** * Read the packet from the buffer (starting at the offset) and return * the number of bytes read. @@ -337,10 +427,10 @@ public class Packet { cur += 2; } if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) { - Signature sig = new Signature(); + _optionSignature = new Signature(); byte buf[] = new byte[Signature.SIGNATURE_BYTES]; System.arraycopy(buffer, cur, buf, 0, Signature.SIGNATURE_BYTES); - sig.setData(buf); + _optionSignature.setData(buf); cur += Signature.SIGNATURE_BYTES; } } @@ -355,6 +445,8 @@ public class Packet { if (!isFlagSet(FLAG_SIGNATURE_INCLUDED)) return false; if (_optionSignature == null) return false; + if (buffer == null) + buffer = new byte[writtenSize()]; int size = writePacket(buffer, 0, false); return ctx.dsa().verifySignature(_optionSignature, buffer, 0, size, from.getSigningPublicKey()); } @@ -386,4 +478,33 @@ public class Packet { System.arraycopy(_optionSignature.getData(), 0, buffer, signatureOffset, Signature.SIGNATURE_BYTES); return size; } + + public String toString() { + return "Packet " + _sequenceNum + " on " + toId(_sendStreamId) + + "<-->" + toId(_receiveStreamId) + ": " + toFlagString() + + " ACK through " + _ackThrough + + " size: " + (_payload != null ? _payload.length : 0); + } + + private static final String toId(byte id[]) { + if (id == null) + return Base64.encode(STREAM_ID_UNKNOWN); + else + return Base64.encode(id); + } + + private final String toFlagString() { + StringBuffer buf = new StringBuffer(32); + if (isFlagSet(FLAG_CLOSE)) buf.append(" CLOSE"); + if (isFlagSet(FLAG_DELAY_REQUESTED)) buf.append(" DELAY"); + if (isFlagSet(FLAG_ECHO)) buf.append(" ECHO"); + if (isFlagSet(FLAG_FROM_INCLUDED)) buf.append(" FROM"); + if (isFlagSet(FLAG_MAX_PACKET_SIZE_INCLUDED)) buf.append(" MAXSIZE"); + if (isFlagSet(FLAG_PROFILE_INTERACTIVE)) buf.append(" INTERACTIVE"); + if (isFlagSet(FLAG_RESET)) buf.append(" RESET"); + if (isFlagSet(FLAG_SIGNATURE_INCLUDED)) buf.append(" SIG"); + if (isFlagSet(FLAG_SIGNATURE_REQUESTED)) buf.append(" SIGREQ"); + if (isFlagSet(FLAG_SYNCHRONIZE)) buf.append(" SYN"); + return buf.toString(); + } } diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java new file mode 100644 index 0000000000..6d6954c1da --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketHandler.java @@ -0,0 +1,155 @@ +package net.i2p.client.streaming; + +import java.util.Date; +import java.util.Iterator; +import java.util.Set; + +import net.i2p.I2PAppContext; +import net.i2p.data.Base64; +import net.i2p.data.DataHelper; +import net.i2p.util.Log; + +/** + * receive a packet and dispatch it correctly to the connection specified, + * the server socket, or queue a reply RST packet. + * + */ +public class PacketHandler { + private ConnectionManager _manager; + private I2PAppContext _context; + private Log _log; + + public PacketHandler(I2PAppContext ctx, ConnectionManager mgr) { + _manager = mgr; + _context = ctx; + _log = ctx.logManager().getLog(PacketHandler.class); + } + + void receivePacket(Packet packet) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("packet received: " + packet); + + byte sendId[] = packet.getSendStreamId(); + if (!isNonZero(sendId)) + sendId = null; + + Connection con = (sendId != null ? _manager.getConnectionByInboundId(sendId) : null); + if (con != null) { + receiveKnownCon(con, packet); + System.out.println(new Date() + ": Receive packet " + packet + " on con " + con); + } else { + receiveUnknownCon(packet, sendId); + System.out.println(new Date() + ": Receive packet " + packet + " on an unknown con"); + } + } + + private void receiveKnownCon(Connection con, Packet packet) { + // the packet is pointed at a stream ID we're receiving on + if (isValidMatch(con.getSendStreamId(), packet.getReceiveStreamId())) { + // the packet's receive stream ID also matches what we expect + if (_log.shouldLog(Log.DEBUG)) + _log.debug("receive valid: " + packet); + con.getPacketHandler().receivePacket(packet, con); + } else { + if (packet.isFlagSet(Packet.FLAG_RESET)) { + // refused + if (_log.shouldLog(Log.DEBUG)) + _log.debug("receive reset: " + packet); + con.getPacketHandler().receivePacket(packet, con); + } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + if ( (con.getSendStreamId() == null) || + (DataHelper.eq(con.getSendStreamId(), packet.getReceiveStreamId())) ) { + // con fully established, w00t + con.setSendStreamId(packet.getReceiveStreamId()); + con.getPacketHandler().receivePacket(packet, con); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Receive a syn packet with the wrong IDs: " + packet); + } + } else { + // someone is sending us a packet on the wrong stream + if (_log.shouldLog(Log.WARN)) + _log.warn("Received a packet on the wrong stream: " + packet); + } + } + } + + private void receiveUnknownCon(Packet packet, byte sendId[]) { + if (packet.isFlagSet(Packet.FLAG_ECHO)) { + if (packet.getSendStreamId() != null) { + receivePing(packet); + } else if (packet.getReceiveStreamId() != null) { + receivePong(packet); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Echo packet received with no stream IDs: " + packet); + } + } else if (packet.isFlagSet(Packet.FLAG_SYNCHRONIZE)) { + if (sendId == null) { + // this is the initial SYN to establish a connection + _manager.getConnectionHandler().receiveNewSyn(packet); + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("Syn packet reply on a stream we don't know about: " + packet); + } + } else { + if (_log.shouldLog(Log.WARN)) { + _log.warn("Packet received on an unknown stream (and not a SYN): " + packet); + StringBuffer buf = new StringBuffer(128); + Set cons = _manager.listConnections(); + for (Iterator iter = cons.iterator(); iter.hasNext(); ) { + Connection con = (Connection)iter.next(); + buf.append(Base64.encode(con.getReceiveStreamId())).append(" "); + } + _log.warn("Other streams: " + buf.toString()); + } + } + } + + private void receivePing(Packet packet) { + boolean ok = packet.verifySignature(_context, packet.getOptionalFrom(), null); + if (!ok) { + if (_log.shouldLog(Log.WARN)) { + if (packet.getOptionalFrom() == null) + _log.warn("Ping with no from (flagged? " + packet.isFlagSet(Packet.FLAG_FROM_INCLUDED) + ")"); + else if (packet.getOptionalSignature() == null) + _log.warn("Ping with no signature (flagged? " + packet.isFlagSet(Packet.FLAG_SIGNATURE_INCLUDED) + ")"); + else + _log.warn("Forged ping, discard (from=" + packet.getOptionalFrom().calculateHash().toBase64() + + " sig=" + packet.getOptionalSignature().toBase64() + ")"); + } + } else { + PacketLocal pong = new PacketLocal(_context, packet.getOptionalFrom()); + pong.setFlag(Packet.FLAG_ECHO, true); + pong.setFlag(Packet.FLAG_SIGNATURE_INCLUDED, false); + pong.setReceiveStreamId(packet.getSendStreamId()); + _manager.getPacketQueue().enqueue(pong); + } + } + + private void receivePong(Packet packet) { + _manager.receivePong(packet.getReceiveStreamId()); + } + + private static final boolean isValidMatch(byte conStreamId[], byte packetStreamId[]) { + if ( (conStreamId == null) || (packetStreamId == null) || + (conStreamId.length != packetStreamId.length) ) + return false; + + boolean nonZeroFound = false; + for (int i = 0; i < conStreamId.length; i++) { + if (conStreamId[i] != packetStreamId[i]) return false; + if (conStreamId[i] != 0x0) nonZeroFound = true; + } + return nonZeroFound; + } + + private static final boolean isNonZero(byte[] b) { + boolean nonZeroFound = false; + for (int i = 0; b != null && i < b.length; i++) { + if (b[i] != 0x0) + nonZeroFound = true; + } + return nonZeroFound; + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java new file mode 100644 index 0000000000..a2bb107d97 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketLocal.java @@ -0,0 +1,63 @@ +package net.i2p.client.streaming; + +import java.util.Set; + +import net.i2p.I2PAppContext; +import net.i2p.data.Destination; +import net.i2p.data.SessionKey; + +/** + * coordinate local attributes about a packet - send time, ack time, number of + * retries, etc. + */ +public class PacketLocal extends Packet { + private I2PAppContext _context; + private Destination _to; + private SessionKey _keyUsed; + private Set _tagsSent; + private long _createdOn; + private int _numSends; + private long _lastSend; + private long _ackOn; + + public PacketLocal(I2PAppContext ctx, Destination to) { + _context = ctx; + _createdOn = ctx.clock().now(); + _to = to; + _lastSend = -1; + } + + public Destination getTo() { return _to; } + public void setTo(Destination to) { _to = to; } + + public SessionKey getKeyUsed() { return _keyUsed; } + public void setKeyUsed(SessionKey key) { _keyUsed = key; } + + public Set getTagsSent() { return _tagsSent; } + public void setTagsSent(Set tags) { _tagsSent = tags; } + + public boolean shouldSign() { + return isFlagSet(FLAG_SIGNATURE_INCLUDED) || + isFlagSet(FLAG_SYNCHRONIZE) || + isFlagSet(FLAG_CLOSE); + } + + public long getCreatedOn() { return _createdOn; } + public void incrementSends() { + _numSends++; + _lastSend = _context.clock().now(); + } + public void ackReceived() { + if (_ackOn <= 0) + _ackOn = _context.clock().now(); + } + /** how long after packet creation was it acked? */ + public int getAckTime() { + if (_ackOn <= 0) + return -1; + else + return (int)(_ackOn - _createdOn); + } + public int getNumSends() { return _numSends; } + public long getLastSend() { return _lastSend; } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java new file mode 100644 index 0000000000..0681fc3744 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/PacketQueue.java @@ -0,0 +1,56 @@ +package net.i2p.client.streaming; + +import java.util.Set; +import java.util.HashSet; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.data.SessionKey; +import net.i2p.util.Log; + +/** + * + */ +class PacketQueue { + private I2PAppContext _context; + private Log _log; + private I2PSession _session; + private byte _buf[]; + + public PacketQueue(I2PAppContext context, I2PSession session) { + _context = context; + _session = session; + _buf = new byte[36*1024]; + _log = context.logManager().getLog(PacketQueue.class); + } + + /** + * Add a new packet to be sent out ASAP + */ + public void enqueue(PacketLocal packet) { + int size = 0; + if (packet.shouldSign()) + size = packet.writeSignedPacket(_buf, 0, _context, _session.getPrivateKey()); + else + size = packet.writePacket(_buf, 0); + + SessionKey keyUsed = new SessionKey(); + Set tagsSent = new HashSet(); + try { + // this should not block! + boolean sent = _session.sendMessage(packet.getTo(), _buf, 0, size, keyUsed, tagsSent); + if (!sent) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Send failed for " + packet); + } else { + packet.setKeyUsed(keyUsed); + packet.setTagsSent(tagsSent); + packet.incrementSends(); + } + } catch (I2PSessionException ise) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Unable to send the packet " + packet, ise); + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java new file mode 100644 index 0000000000..c52f6c4f53 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerChooser.java @@ -0,0 +1,62 @@ +package net.i2p.client.streaming; + +import java.util.ArrayList; +import java.util.List; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + * Examine a connection's state and pick the right scheduler for it. + * + */ +class SchedulerChooser { + private I2PAppContext _context; + private Log _log; + private TaskScheduler _nullScheduler; + /** list of TaskScheduler objects */ + private List _schedulers; + + public SchedulerChooser(I2PAppContext context) { + _context = context; + _log = context.logManager().getLog(SchedulerChooser.class); + _schedulers = createSchedulers(); + _nullScheduler = new NullScheduler(); + } + + public TaskScheduler getScheduler(Connection con) { + for (int i = 0; i < _schedulers.size(); i++) { + TaskScheduler scheduler = (TaskScheduler)_schedulers.get(i); + if (scheduler.accept(con)) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Scheduling for " + con + " with " + scheduler.getClass().getName()); + return scheduler; + } + } + return _nullScheduler; + } + + private List createSchedulers() { + List rv = new ArrayList(8); + rv.add(new SchedulerPreconnect(_context)); + rv.add(new SchedulerConnecting(_context)); + rv.add(new SchedulerReceived(_context)); + rv.add(new SchedulerConnectedBulk(_context)); + rv.add(new SchedulerClosing(_context)); + rv.add(new SchedulerClosed(_context)); + rv.add(new SchedulerDead(_context)); + return rv; + } + private class NullScheduler implements TaskScheduler { + private Log _log; + public NullScheduler() { + _log = _context.logManager().getLog(NullScheduler.class); + } + + public void eventOccurred(Connection con) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Event occurred on " + con, new Exception("source")); + } + public boolean accept(Connection con) { return true; } + }; +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java new file mode 100644 index 0000000000..1d6bf58309 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosed.java @@ -0,0 +1,51 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

Scheduler used for after both sides have had their close packets + * ACKed, but the final timeout hasn't passed.

+ * + *

Entry conditions:

+ * + *

Events:

+ * + *

Next states:

+ *
  • {@link SchedulerDead dead} - after the final timeout passes
  • + * + * + * + */ +class SchedulerClosed extends SchedulerImpl { + private Log _log; + public SchedulerClosed(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerClosed.class); + } + + static final long CLOSE_TIMEOUT = 30*1000; + + public boolean accept(Connection con) { + boolean ok = (con != null) && + (con.getCloseSentOn() > 0) && + (con.getCloseReceivedOn() > 0) && + (con.getUnackedPacketsReceived() <= 0) && + (con.getUnackedPacketsSent() <= 0) && + (!con.getResetReceived()) && + (con.getCloseSentOn() + CLOSE_TIMEOUT > _context.clock().now()); + return ok; + } + + public void eventOccurred(Connection con) { + long timeLeft = con.getCloseSentOn() + CLOSE_TIMEOUT - _context.clock().now(); + reschedule(timeLeft, con); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java new file mode 100644 index 0000000000..ae8a71e887 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerClosing.java @@ -0,0 +1,53 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

    Scheduler used for after both SYNs have been ACKed and both sides + * have closed the stream, but either we haven't ACKed their close or + * they haven't ACKed ours.

    + * + *

    Entry conditions:

    + * + *

    Events:

    + * + *

    Next states:

    + *
  • {@link SchedulerClosed closed} - after both sending and receiving ACKs on the CLOSE
  • + *
  • {@link SchedulerDead dead} - after sending or receiving a RESET
  • + * + * + */ +class SchedulerClosing extends SchedulerImpl { + private Log _log; + public SchedulerClosing(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerClosing.class); + } + + public boolean accept(Connection con) { + boolean ok = (con != null) && + (con.getCloseSentOn() > 0) && + (con.getCloseReceivedOn() > 0) && + ( (con.getUnackedPacketsReceived() > 0) || (con.getUnackedPacketsSent() > 0) ); + return ok; + } + + public void eventOccurred(Connection con) { + if (con.getNextSendTime() <= 0) + con.setNextSendTime(_context.clock().now() + con.getOptions().getSendAckDelay()); + long remaining = con.getNextSendTime() - _context.clock().now(); + if (remaining <= 0) + con.sendAvailable(); + else + reschedule(remaining, con); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java new file mode 100644 index 0000000000..e5229c7091 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnectedBulk.java @@ -0,0 +1,66 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

    Scheduler used for after our SYN has been sent and ACKed but one + * (or more) sides haven't closed the stream yet. In addition, the + * stream must be using the BULK profile, rather than the INTERACTIVE + * profile.

    + * + *

    Entry conditions:

    + * + *

    Events:

    + * + *

    Next states:

    + *
  • {@link SchedulerClosing closing} - after both sending and receiving a CLOSE
  • + *
  • {@link SchedulerClosed closed} - after both sending and receiving ACKs on the CLOSE
  • + *
  • {@link SchedulerDead dead} - after sending or receiving a RESET
  • + * + * + */ +class SchedulerConnectedBulk extends SchedulerImpl { + private Log _log; + public SchedulerConnectedBulk(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerConnectedBulk.class); + } + + public boolean accept(Connection con) { + boolean ok = (con != null) && + (con.getAckedPackets() > 0) && + (con.getOptions().getProfile() == ConnectionOptions.PROFILE_BULK) && + (!con.getResetReceived()) && + ( (con.getCloseSentOn() <= 0) || (con.getCloseReceivedOn() <= 0) ); + if (!ok) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("con: " + con + " closeSentOn: " + con.getCloseSentOn() + + " closeReceivedOn: " + con.getCloseReceivedOn()); + } + return ok; + } + + public void eventOccurred(Connection con) { + if (con.getNextSendTime() <= 0) + return; + + long timeTillSend = con.getNextSendTime() - _context.clock().now(); + + if (timeTillSend <= 0) { + con.setNextSendTime(-1); + con.sendAvailable(); + } else { + reschedule(timeTillSend, con); + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java new file mode 100644 index 0000000000..98fd92cc91 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerConnecting.java @@ -0,0 +1,74 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

    Scheduler used once we've sent our SYN but it hasn't been ACKed yet. + * This connection may or may not be locally created.

    + * + *

    Entry conditions:

    + * + *

    Events:

    + * + *

    Next states:

    + *
  • {@link SchedulerConnected connected} - after receiving an ACK
  • + *
  • {@link SchedulerClosing closing} - after both sending and receiving a CLOSE
  • + *
  • {@link SchedulerClosed closed} - after both sending and receiving ACKs on the CLOSE
  • + *
  • {@link SchedulerDead dead} - after sending or receiving a RESET
  • + * + * + */ +class SchedulerConnecting extends SchedulerImpl { + private Log _log; + + public SchedulerConnecting(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerConnecting.class); + } + + public boolean accept(Connection con) { + return (con != null) && + (con.getLastSendId() >= 0) && + (con.getAckedPackets() <= 0) && + (!con.getResetReceived()); + } + + public void eventOccurred(Connection con) { + long waited = _context.clock().now() - con.getCreatedOn(); + if ( (con.getOptions().getConnectTimeout() > 0) && + (con.getOptions().getConnectTimeout() <= waited) ) { + con.setConnectionError("Timeout waiting for ack (waited " + waited + "ms)"); + con.disconnect(false); + reschedule(0, con); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("waited too long: " + waited); + return; + } else { + if (con.getOptions().getConnectTimeout() > 0) + reschedule(con.getOptions().getConnectTimeout(), con); + } + /* + long timeTillSend = con.getNextSendTime() - _context.clock().now(); + if ( (timeTillSend <= 0) && (con.getNextSendTime() > 0) ) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("send next on " + con); + con.sendAvailable(); + con.setNextSendTime(-1); + } else { + if (con.getNextSendTime() > 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("time till send: " + timeTillSend + " on " + con); + reschedule(timeTillSend, con); + } + } + */ + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java new file mode 100644 index 0000000000..0f11690bdc --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerDead.java @@ -0,0 +1,47 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

    Scheduler used for after the final timeout has passed or the + * connection was reset.

    + * + *

    Entry conditions:

    + * + *

    Events:

    + * + *

    Next states:

    + *
  • None
  • + * + * + * + */ +class SchedulerDead extends SchedulerImpl { + private Log _log; + public SchedulerDead(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerDead.class); + } + + public boolean accept(Connection con) { + boolean ok = (con != null) && + (con.getResetReceived()) || + ((con.getCloseSentOn() > 0) && + (con.getCloseReceivedOn() > 0) && + (con.getUnackedPacketsReceived() <= 0) && + (con.getUnackedPacketsSent() <= 0) && + (con.getCloseSentOn() + SchedulerClosed.CLOSE_TIMEOUT <= _context.clock().now())); + return ok; + } + + public void eventOccurred(Connection con) { + con.disconnectComplete(); + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java new file mode 100644 index 0000000000..b6fbca2eb7 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerImpl.java @@ -0,0 +1,36 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.SimpleTimer; +import net.i2p.util.Log; + +/** + * Base scheduler + */ +abstract class SchedulerImpl implements TaskScheduler { + protected I2PAppContext _context; + private Log _log; + + public SchedulerImpl(I2PAppContext ctx) { + _context = ctx; + _log = ctx.logManager().getLog(SchedulerImpl.class); + } + + protected void reschedule(long msToWait, Connection con) { + SimpleTimer.getInstance().addEvent(new ConEvent(con), msToWait); + } + + private class ConEvent implements SimpleTimer.TimedEvent { + private Connection _connection; + private Exception _addedBy; + public ConEvent(Connection con) { + _connection = con; + _addedBy = new Exception("added by"); + } + public void timeReached() { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("firing event on " + _connection, _addedBy); + _connection.eventOccurred(); + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java new file mode 100644 index 0000000000..21d375de9f --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerPreconnect.java @@ -0,0 +1,54 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + *

    Scheduler used for locally created connections where we have not yet + * sent the initial SYN packet.

    + * + *

    Entry conditions:

    + * + *

    Events:

    + * + *

    Next states:

    + *
  • {@link SchedulerConnecting connecting} - after sending a packet
  • + * + */ +class SchedulerPreconnect extends SchedulerImpl { + private Log _log; + + public SchedulerPreconnect(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerPreconnect.class); + } + + public boolean accept(Connection con) { + return (con != null) && + (con.getSendStreamId() == null) && + (con.getLastSendId() < 0); + } + + public void eventOccurred(Connection con) { + if (con.getNextSendTime() < 0) + con.setNextSendTime(_context.clock().now() + con.getOptions().getConnectDelay()); + + long timeTillSend = con.getNextSendTime() - _context.clock().now(); + if (timeTillSend <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Send available for the SYN on " + con); + con.sendAvailable(); + con.setNextSendTime(-1); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Wait " + timeTillSend + " before sending the SYN on " + con); + reschedule(timeTillSend, con); + } + } +} diff --git a/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java new file mode 100644 index 0000000000..09836477e5 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/SchedulerReceived.java @@ -0,0 +1,44 @@ +package net.i2p.client.streaming; + +import net.i2p.I2PAppContext; +import net.i2p.util.Log; + +/** + * Scheduler used after receiving an inbound connection but before + * we have sent our own SYN. + * + */ +class SchedulerReceived extends SchedulerImpl { + private Log _log; + + public SchedulerReceived(I2PAppContext ctx) { + super(ctx); + _log = ctx.logManager().getLog(SchedulerReceived.class); + } + + public boolean accept(Connection con) { + return (con != null) && + (con.getLastSendId() < 0) && + (con.getSendStreamId() != null); + } + + public void eventOccurred(Connection con) { + if (con.getUnackedPacketsReceived() <= 0) { + if (_log.shouldLog(Log.WARN)) + _log.warn("hmm, state is received, but no unacked packets received?"); + return; + } + + long timeTillSend = con.getNextSendTime() - _context.clock().now(); + if (timeTillSend <= 0) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received con... send a packet"); + con.sendAvailable(); + con.setNextSendTime(-1); + } else { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("received con... time till next send: " + timeTillSend); + reschedule(timeTillSend, con); + } + } +} \ No newline at end of file diff --git a/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java b/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java new file mode 100644 index 0000000000..8657a20538 --- /dev/null +++ b/apps/streaming/java/src/net/i2p/client/streaming/TaskScheduler.java @@ -0,0 +1,22 @@ +package net.i2p.client.streaming; + +/** + * Coordinates what we do 'next'. The scheduler used by a connection is + * selected based upon its current state. + * + */ +interface TaskScheduler { + /** + * An event has occurred (timeout, message sent, or message received), + * so schedule what to do next based on our current state. + * + */ + public void eventOccurred(Connection con); + + /** + * Determine whether this scheduler is fit to operate against the + * given connection + * + */ + public boolean accept(Connection con); +} diff --git a/apps/streaming/java/test/net/i2p/client/streaming/ConnectTest.java b/apps/streaming/java/test/net/i2p/client/streaming/ConnectTest.java new file mode 100644 index 0000000000..381e007ba8 --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/ConnectTest.java @@ -0,0 +1,127 @@ +package net.i2p.client.streaming; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PClient; +import net.i2p.client.I2PClientFactory; +import net.i2p.client.I2PSession; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * + */ +public class ConnectTest { + private Log _log; + private I2PSession _client; + private I2PSession _server; + public void test() { + try { + I2PAppContext context = I2PAppContext.getGlobalContext(); + _log = context.logManager().getLog(ConnectTest.class); + _log.debug("creating server session"); + _server = createSession(); + _log.debug("running server"); + runServer(context, _server); + _log.debug("creating client session"); + _client = createSession(); + _log.debug("running client"); + runClient(context, _client); + } catch (Exception e) { + _log.error("error running", e); + } + try { Thread.sleep(30*1000); } catch (Exception e) {} + } + + private void runClient(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ClientRunner(ctx, session)); + t.setName("client"); + t.setDaemon(true); + t.start(); + } + + private void runServer(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ServerRunner(ctx, session)); + t.setName("server"); + t.setDaemon(true); + t.start(); + } + + private class ServerRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ServerRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ServerRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PServerSocket ssocket = mgr.getServerSocket(); + _log.debug("server socket created"); + while (true) { + I2PSocket socket = ssocket.accept(); + _log.debug("socket accepted: " + socket); + try { Thread.sleep(10*1000); } catch (InterruptedException ie) {} + socket.close(); + } + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private class ClientRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ClientRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ClientRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PSocket socket = mgr.connect(_server.getMyDestination()); + _log.debug("socket created"); + socket.close(); + _log.debug("socket closed"); + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private I2PSession createSession() { + try { + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + Destination dest = client.createDestination(baos); + I2PSession sess = client.createSession(new ByteArrayInputStream(baos.toByteArray()), new Properties()); + sess.connect(); + return sess; + } catch (Exception e) { + _log.error("error running", e); + throw new RuntimeException("b0rk b0rk b0rk"); + } + } + + public static void main(String args[]) { + ConnectTest ct = new ConnectTest(); + ct.test(); + } +} diff --git a/apps/streaming/java/test/net/i2p/client/streaming/EchoTest.java b/apps/streaming/java/test/net/i2p/client/streaming/EchoTest.java new file mode 100644 index 0000000000..70ebf899cb --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/EchoTest.java @@ -0,0 +1,179 @@ +package net.i2p.client.streaming; + +import java.io.InputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.util.Date; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PClient; +import net.i2p.client.I2PClientFactory; +import net.i2p.client.I2PSession; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * + */ +public class EchoTest { + private Log _log; + private I2PSession _client; + private I2PSession _server; + public void test() { + try { + I2PAppContext context = I2PAppContext.getGlobalContext(); + _log = context.logManager().getLog(ConnectTest.class); + _log.debug("creating server session"); + _server = createSession(); + _log.debug("running server"); + runServer(context, _server); + _log.debug("creating client session"); + _client = createSession(); + _log.debug("running client"); + runClient(context, _client); + } catch (Exception e) { + _log.error("error running", e); + } + try { Thread.sleep(300*1000); } catch (Exception e) {} + } + + private void runClient(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ClientRunner(ctx, session)); + t.setName("client"); + t.setDaemon(true); + t.start(); + } + + private void runServer(I2PAppContext ctx, I2PSession session) { + Thread t = new Thread(new ServerRunner(ctx, session)); + t.setName("server"); + t.setDaemon(true); + t.start(); + } + + private class ServerRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ServerRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ServerRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PServerSocket ssocket = mgr.getServerSocket(); + _log.debug("server socket created"); + while (true) { + I2PSocket socket = ssocket.accept(); + _log.debug("socket accepted: " + socket); + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + _log.debug("server streams built"); + byte buf[] = new byte[5]; + while (buf != null) { + for (int i = 0; i < buf.length; i++) { + int c = in.read(); + if (c == -1) { + buf = null; + break; + } else { + buf[i] = (byte)(c & 0xFF); + } + } + if (buf != null) { + _log.debug("* server read: " + new String(buf)); + out.write(buf); + out.flush(); + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing the received server socket"); + socket.close(); + } + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private class ClientRunner implements Runnable { + private I2PAppContext _context; + private I2PSession _session; + private Log _log; + public ClientRunner(I2PAppContext ctx, I2PSession session) { + _context = ctx; + _session = session; + _log = ctx.logManager().getLog(ClientRunner.class); + } + + public void run() { + try { + Properties opts = new Properties(); + I2PSocketManager mgr = new I2PSocketManagerFull(_context, _session, opts, "client"); + _log.debug("manager created"); + I2PSocket socket = mgr.connect(_server.getMyDestination()); + _log.debug("socket created"); + InputStream in = socket.getInputStream(); + OutputStream out = socket.getOutputStream(); + for (int i = 0; i < 3; i++) { + out.write("blah!".getBytes()); + _log.debug("client wrote a line"); + out.flush(); + _log.debug("client flushed"); + byte buf[] = new byte[5]; + + for (int j = 0; j < buf.length; j++) { + int c = in.read(); + if (c == -1) { + buf = null; + break; + } else { + //_log.debug("client read: " + ((char)c)); + buf[j] = (byte)(c & 0xFF); + } + } + if (buf != null) { + _log.debug("* client read: " + new String(buf)); + } + } + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Closing the client socket"); + socket.close(); + _log.debug("socket closed"); + + Thread.sleep(5*1000); + System.exit(0); + } catch (Exception e) { + _log.error("error running", e); + } + } + + } + + private I2PSession createSession() { + try { + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + Destination dest = client.createDestination(baos); + I2PSession sess = client.createSession(new ByteArrayInputStream(baos.toByteArray()), new Properties()); + sess.connect(); + return sess; + } catch (Exception e) { + _log.error("error running", e); + throw new RuntimeException("b0rk b0rk b0rk"); + } + } + + public static void main(String args[]) { + EchoTest et = new EchoTest(); + et.test(); + } +} diff --git a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java index 753f16b352..73f40d19c9 100644 --- a/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java +++ b/apps/streaming/java/test/net/i2p/client/streaming/MessageInputStreamTest.java @@ -25,7 +25,7 @@ public class MessageInputStreamTest { byte orig[] = new byte[32*1024]; _context.random().nextBytes(orig); - MessageInputStream in = new MessageInputStream(); + MessageInputStream in = new MessageInputStream(_context); for (int i = 0; i < 32; i++) { byte msg[] = new byte[1024]; System.arraycopy(orig, i*1024, msg, 0, 1024); @@ -50,7 +50,7 @@ public class MessageInputStreamTest { byte orig[] = new byte[32*1024]; _context.random().nextBytes(orig); - MessageInputStream in = new MessageInputStream(); + MessageInputStream in = new MessageInputStream(_context); ArrayList order = new ArrayList(32); for (int i = 0; i < 32; i++) order.add(new Integer(i)); diff --git a/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java new file mode 100644 index 0000000000..b5918b3b35 --- /dev/null +++ b/apps/streaming/java/test/net/i2p/client/streaming/PingTest.java @@ -0,0 +1,56 @@ +package net.i2p.client.streaming; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.util.Properties; + +import net.i2p.I2PAppContext; +import net.i2p.client.I2PClient; +import net.i2p.client.I2PClientFactory; +import net.i2p.client.I2PSession; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * + */ +public class PingTest { + public void test() { + try { + I2PAppContext context = I2PAppContext.getGlobalContext(); + I2PSession session = createSession(); + ConnectionManager mgr = new ConnectionManager(context, session); + Log log = context.logManager().getLog(PingTest.class); + for (int i = 0; i < 10; i++) { + log.debug("ping " + i); + long before = context.clock().now(); + boolean ponged = mgr.ping(session.getMyDestination(), 2*1000); + long after = context.clock().now(); + log.debug("ponged? " + ponged + " after " + (after-before) + "ms"); + } + } catch (Exception e) { + e.printStackTrace(); + } + try { Thread.sleep(30*1000); } catch (Exception e) {} + + } + + private I2PSession createSession() { + try { + I2PClient client = I2PClientFactory.createClient(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(512); + Destination dest = client.createDestination(baos); + I2PSession sess = client.createSession(new ByteArrayInputStream(baos.toByteArray()), new Properties()); + sess.connect(); + return sess; + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("b0rk b0rk b0rk"); + } + } + + public static void main(String args[]) { + PingTest pt = new PingTest(); + pt.test(); + } +}