diff --git a/apps/sam/java/src/net/i2p/sam/MasterSession.java b/apps/sam/java/src/net/i2p/sam/MasterSession.java new file mode 100644 index 0000000000000000000000000000000000000000..6e6a5f07f0c386f91d688cdecaf3aa3b35fd6746 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/MasterSession.java @@ -0,0 +1,358 @@ +package net.i2p.sam; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; + +import net.i2p.I2PException; +import net.i2p.client.I2PSession; +import net.i2p.client.I2PSessionException; +import net.i2p.client.I2PSessionMuxedListener; +import net.i2p.client.streaming.I2PSocket; +import net.i2p.data.DataFormatException; +import net.i2p.data.DataHelper; +import net.i2p.data.Destination; +import net.i2p.util.Log; + +/** + * A session that does nothing, but implements interfaces for raw, datagram, and streaming + * for convenience. + * + * We extend SAMv3StreamSession as we must have it set up the I2PSession, in case + * user adds a STREAM session (and he probably will). + * This session receives all data from I2P, but you can't send any data on it. + * + * @since 0.9.25 + */ +class MasterSession extends SAMv3StreamSession implements SAMDatagramReceiver, SAMRawReceiver, + SAMMessageSess, I2PSessionMuxedListener { + private final SAMv3Handler handler; + private final SAMv3DatagramServer dgs; + private final Map<String, SAMMessageSess> sessions; + + /** + * Build a Session according to information + * registered with the given nickname + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + */ + public MasterSession(String nick, SAMv3DatagramServer dgServer, SAMv3Handler handler, Properties props) + throws IOException, DataFormatException, SAMException { + super(nick); + props.setProperty("net.i2p.streaming.enforceProtocol", "true"); + props.setProperty("i2cp.dontPublishLeaseSet", "false"); + props.setProperty("FROM_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); + props.setProperty("TO_PORT", Integer.toString(I2PSession.PORT_UNSPECIFIED)); + dgs = dgServer; + sessions = new ConcurrentHashMap<String, SAMMessageSess>(4); + this.handler = handler; + I2PSession isess = socketMgr.getSession(); + // if we get a RAW session added with 0/0, it will replace this, + // and we won't add this back if removed. + isess.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + } + + /** + * Add a session + * @return null for success, or error message + */ + public synchronized String add(String nick, String style, Properties props) { + SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); + if (rec != null || sessions.containsKey(nick)) + return "Duplicate ID " + nick; + int listenPort = I2PSession.PORT_ANY; + String slp = (String) props.remove("LISTEN_PORT"); + if (slp == null) + slp = props.getProperty("FROM_PORT"); + if (slp != null) { + try { + listenPort = Integer.parseInt(slp); + if (listenPort < 0 || listenPort > 65535) + return "Bad LISTEN_PORT " + slp; + // TODO enforce streaming listen port must be 0 or from port + } catch (NumberFormatException nfe) { + return "Bad LISTEN_PORT " + slp; + } + } + int listenProtocol; + SAMMessageSess sess; + // temp + try { + I2PSession isess = socketMgr.getSession(); + if (style.equals("RAW")) { + if (!props.containsKey("PORT")) + return "RAW subsession must specify PORT"; + listenProtocol = I2PSession.PROTO_DATAGRAM_RAW; + String spr = (String) props.remove("LISTEN_PROTOCOL"); + if (spr == null) + spr = props.getProperty("PROTOCOL"); + if (spr != null) { + try { + listenProtocol = Integer.parseInt(spr); + // RAW can't listen on streaming protocol + if (listenProtocol < 0 || listenProtocol > 255 || + listenProtocol == I2PSession.PROTO_STREAMING) + return "Bad RAW LISTEN_PPROTOCOL " + spr; + } catch (NumberFormatException nfe) { + return "Bad LISTEN_PROTOCOL " + spr; + } + } + sess = new SAMv3RawSession(nick, props, handler, isess, listenProtocol, listenPort, dgs); + } else if (style.equals("DATAGRAM")) { + if (!props.containsKey("PORT")) + return "DATAGRAM subsession must specify PORT"; + listenProtocol = I2PSession.PROTO_DATAGRAM; + sess = new SAMv3DatagramSession(nick, props, handler, isess, listenPort, dgs); + } else if (style.equals("STREAM")) { + listenProtocol = I2PSession.PROTO_STREAMING; + // FIXME need something that hangs off an existing dest + sess = new SAMv3StreamSession(nick, props, handler, socketMgr, listenPort); + } else { + return "Unrecognized SESSION STYLE " + style; + } + } catch (Exception e) { + // temp + return e.toString(); + } + + for (SAMMessageSess s : sessions.values()) { + if (listenProtocol == s.getListenProtocol() && + listenPort == s.getListenPort()) + return "Duplicate protocol " + listenProtocol + " and port " + listenPort; + } + + // add to session db and our map + rec = new SessionRecord(getDestination().toBase64(), props, handler); + try { + if (!SAMv3Handler.sSessionsHash.put(nick, rec)) + return "Duplicate ID " + nick; + sessions.put(nick, sess); + } catch (SessionsDB.ExistingIdException e) { + return e.toString(); + } catch (SessionsDB.ExistingDestException e) { + // fixme need new db method for dup dests + } + // listeners etc + + // all ok + return null; + } + + /** + * Remove a session + * @return null for success, or error message + */ + public synchronized String remove(String nick, Properties props) { + boolean ok = SAMv3Handler.sSessionsHash.del(nick); + SAMMessageSess sess = sessions.remove(nick); + if (sess != null) { + sess.close(); + // TODO if 0/0, add back this as listener? + } else { + ok = false; + } + if (!ok) + return "ID " + nick + " not found"; + // all ok + return null; + } + + /** + * @throws IOException always + */ + public void receiveDatagramBytes(Destination sender, byte[] data, int proto, + int fromPort, int toPort) throws IOException { + throw new IOException("master session"); + } + + /** + * Does nothing. + */ + public void stopDatagramReceiving() {} + + /** + * @throws IOException always + */ + public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException { + throw new IOException("master session"); + } + + /** + * Does nothing. + */ + public void stopRawReceiving() {} + + + + /////// stream session overrides + + /** @throws I2PException always */ + @Override + public void connect(SAMv3Handler handler, String dest, Properties props) throws I2PException { + throw new I2PException("master session"); + } + + /** @throws SAMException always */ + @Override + public void accept(SAMv3Handler handler, boolean verbose) throws SAMException { + throw new SAMException("master session"); + } + + /** @throws SAMException always */ + @Override + public void startForwardingIncoming(Properties props, boolean sendPorts) throws SAMException { + throw new SAMException("master session"); + } + + /** does nothing */ + @Override + public void stopForwardingIncoming() {} + + + ///// SAMMessageSess interface + + @Override + public int getListenProtocol() { + return I2PSession.PROTO_ANY; + } + + @Override + public int getListenPort() { + return I2PSession.PORT_ANY; + } + + /** + * Close the master session + */ + @Override + public void close() { + // close sessions? + super.close(); + } + + // I2PSessionMuxedImpl interface + + public void disconnected(I2PSession session) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P session disconnected"); + close(); + } + + public void errorOccurred(I2PSession session, String message, + Throwable error) { + if (_log.shouldLog(Log.DEBUG)) + _log.debug("I2P error: " + message, error); + close(); + } + + public void messageAvailable(I2PSession session, int msgId, long size) { + messageAvailable(session, msgId, size, I2PSession.PROTO_UNSPECIFIED, + I2PSession.PORT_UNSPECIFIED, I2PSession.PORT_UNSPECIFIED); + } + + /** @since 0.9.24 */ + public void messageAvailable(I2PSession session, int msgId, long size, + int proto, int fromPort, int toPort) { + try { + byte msg[] = session.receiveMessage(msgId); + if (msg == null) + return; + messageReceived(msg, proto, fromPort, toPort); + } catch (I2PSessionException e) { + _log.error("Error fetching I2P message", e); + close(); + } + } + + public void reportAbuse(I2PSession session, int severity) { + _log.warn("Abuse reported (severity: " + severity + ")"); + close(); + } + + private void messageReceived(byte[] msg, int proto, int fromPort, int toPort) { + if (_log.shouldWarn()) + _log.warn("Unhandled message received, length = " + msg.length + + " protocol: " + proto + " from port: " + fromPort + " to port: " + toPort); + } + + private class StreamAcceptor implements Runnable { + + public StreamAcceptor() { + } + + public void run() { + while (getSocketServer()!=null) { + + // wait and accept a connection from I2P side + I2PSocket i2ps; + try { + i2ps = getSocketServer().accept(); + if (i2ps == null) + continue; + } catch (SocketTimeoutException ste) { + continue; + } catch (ConnectException ce) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error accepting", ce); + try { Thread.sleep(50); } catch (InterruptedException ie) {} + continue; + } catch (I2PException ipe) { + if (_log.shouldLog(Log.WARN)) + _log.warn("Error accepting", ipe); + break; + } + int port = i2ps.getLocalPort(); + SAMMessageSess foundSess = null; + Collection<SAMMessageSess> all = sessions.values(); + for (Iterator<SAMMessageSess> iter = all.iterator(); iter.hasNext(); ) { + SAMMessageSess sess = iter.next(); + if (sess.getListenProtocol() != I2PSession.PROTO_STREAMING) { + // remove as we may be going around again below + iter.remove(); + continue; + } + if (sess.getListenPort() == port) { + foundSess = sess; + break; + } + } + // We never send streaming out as a raw packet to a default listener, + // and we don't allow raw to listen on streaming protocol, + // so we don't have to look for a default protocol, + // but we do have to look for a default port listener. + if (foundSess == null) { + for (SAMMessageSess sess : all) { + if (sess.getListenPort() == 0) { + foundSess = sess; + break; + } + } + } + if (foundSess != null) { + SAMv3StreamSession ssess = (SAMv3StreamSession) foundSess; + boolean ok = ssess.queueSocket(i2ps); + if (!ok) { + _log.logAlways(Log.WARN, "Accept queue overflow for " + ssess); + try { i2ps.close(); } catch (IOException ioe) {} + } + } else { + if (_log.shouldLog(Log.WARN)) + _log.warn("No subsession found for incoming streaming connection on port " + port); + } + } + } + } +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java index b25d6ad0b3b1911b11f7a75c5f8422f1a855b2b5..d72dfb5a48c4cd638384bb81b7e8f79584e10db3 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMDatagramSession.java @@ -45,7 +45,7 @@ class SAMDatagramSession extends SAMMessageSession { * @throws DataFormatException * @throws I2PSessionException */ - public SAMDatagramSession(String dest, Properties props, + protected SAMDatagramSession(String dest, Properties props, SAMDatagramReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(dest, props); @@ -73,6 +73,20 @@ class SAMDatagramSession extends SAMMessageSession { dgramMaker = new I2PDatagramMaker(getI2PSession()); } + /** + * Create a new SAM DATAGRAM session on an existing I2P session. + * + * @since 0.9.25 + */ + protected SAMDatagramSession(I2PSession sess, int listenPort, + SAMDatagramReceiver recv) throws IOException, + DataFormatException, I2PSessionException { + super(sess, I2PSession.PROTO_DATAGRAM, listenPort); + + this.recv = recv; + dgramMaker = new I2PDatagramMaker(getI2PSession()); + } + /** * Send bytes through a SAM DATAGRAM session. * diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java new file mode 100644 index 0000000000000000000000000000000000000000..97af7421250b6a18c9ee049233a10e26a2d3db92 --- /dev/null +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSess.java @@ -0,0 +1,46 @@ +package net.i2p.sam; + +import java.io.Closeable; + +import net.i2p.client.I2PSessionException; +import net.i2p.data.DataFormatException; +import net.i2p.data.Destination; + +/** + * Base interface for SAMMessageSession, which is the base for + * v1/v3 datagram and raw sessions. + * Also implemented by SAMStreamSession. + * + * @since 0.9.25 pulled from SAMMessageSession + */ +interface SAMMessageSess extends Closeable { + + /** + * Close a SAM message-based session. + */ + public void close(); + + /** + * Get the SAM message-based session Destination. + * + * @return The SAM message-based session Destination. + */ + public Destination getDestination(); + + /** + * Send bytes through a SAM message-based session. + * + * @param dest Destination + * @param data Bytes to be sent + * + * @return True if the data was sent, false otherwise + * @throws DataFormatException on unknown / bad dest + * @throws I2PSessionException on serious error, probably session closed + */ + public boolean sendBytes(String dest, byte[] data, int proto, + int fromPort, int toPort) throws DataFormatException, I2PSessionException; + + public int getListenProtocol(); + + public int getListenPort(); +} diff --git a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java index 1c6c90a15043425e6f2ae276cacffb7d502841b0..eca08f1df0f90ddf561d837f12f1b31447477d79 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMMessageSession.java @@ -33,11 +33,13 @@ import net.i2p.util.Log; * * @author human */ -abstract class SAMMessageSession implements Closeable { +abstract class SAMMessageSession implements SAMMessageSess { protected final Log _log; private final I2PSession session; private final SAMMessageSessionHandler handler; + private final int listenProtocol; + private final int listenPort; /** * Initialize a new SAM message-based session. @@ -68,6 +70,33 @@ abstract class SAMMessageSession implements Closeable { handler = new SAMMessageSessionHandler(destStream, props); session = handler.getSession(); + listenProtocol = I2PSession.PROTO_ANY; + listenPort = I2PSession.PORT_ANY; + // FIXME don't start threads in constructors + Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); + t.start(); + } + + /** + * Initialize a new SAM message-based session using an existing I2PSession. + * + * @param destStream Input stream containing the destination and private keys (same format as PrivateKeyFile) + * @param props Properties to setup the I2P session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + protected SAMMessageSession(I2PSession sess, int listenProtocol, int listenPort) + throws IOException, DataFormatException, I2PSessionException { + _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("Initializing SAM message-based session"); + + session = sess; + handler = new SAMMessageSessionHandler(session); + this.listenProtocol = listenProtocol; + this.listenPort = listenPort; // FIXME don't start threads in constructors Thread t = new I2PAppThread(handler, "SAMMessageSessionHandler"); t.start(); @@ -82,6 +111,20 @@ abstract class SAMMessageSession implements Closeable { return session.getMyDestination(); } + /** + * @since 0.9.25 + */ + public int getListenProtocol() { + return listenProtocol; + } + + /** + * @since 0.9.25 + */ + public int getListenPort() { + return listenPort; + } + /** * Send bytes through a SAM message-based session. * @@ -188,7 +231,7 @@ abstract class SAMMessageSession implements Closeable { * * @author human */ - class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener { + private class SAMMessageSessionHandler implements Runnable, I2PSessionMuxedListener { private final I2PSession _session; private final Object runningLock = new Object(); @@ -198,8 +241,8 @@ abstract class SAMMessageSession implements Closeable { * Create a new SAM message-based session handler * * @param destStream Input stream containing the destination keys - * @param props Properties to setup the I2P session - * @throws I2PSessionException + * @param props Properties to setup the I2P session + * @throws I2PSessionException */ public SAMMessageSessionHandler(InputStream destStream, Properties props) throws I2PSessionException { if (_log.shouldLog(Log.DEBUG)) @@ -218,7 +261,17 @@ abstract class SAMMessageSession implements Closeable { if (_log.shouldLog(Log.DEBUG)) _log.debug("I2P session connected"); - _session.addMuxedSessionListener(this, I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + _session.addMuxedSessionListener(this, listenProtocol, listenPort); + } + + /** + * Create a new SAM message-based session handler on an existing I2PSession + * + * @since 0.9.25 + */ + public SAMMessageSessionHandler(I2PSession sess) throws I2PSessionException { + _session = sess; + _session.addMuxedSessionListener(this, listenProtocol, listenPort); } /** @@ -257,7 +310,7 @@ abstract class SAMMessageSession implements Closeable { _log.debug("Shutting down SAM message-based session handler"); shutDown(); - session.removeListener(I2PSession.PROTO_ANY, I2PSession.PORT_ANY); + session.removeListener(listenProtocol, listenPort); try { if (_log.shouldLog(Log.DEBUG)) diff --git a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java index ab1cd76322e36d6ca305510d14b9f0b1f82fffb9..b92a5db698d4e405bc371f3b8816d0e043b439e7 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMRawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMRawSession.java @@ -39,7 +39,7 @@ class SAMRawSession extends SAMMessageSession { * @throws DataFormatException * @throws I2PSessionException */ - public SAMRawSession(String dest, Properties props, + protected SAMRawSession(String dest, Properties props, SAMRawReceiver recv) throws IOException, DataFormatException, I2PSessionException { super(dest, props); @@ -63,6 +63,18 @@ class SAMRawSession extends SAMMessageSession { this.recv = recv; } + /** + * Create a new SAM RAW session on an existing I2P session. + * + * @since 0.9.25 + */ + protected SAMRawSession(I2PSession sess, int listenProtocol, int listenPort, + SAMRawReceiver recv) throws IOException, + DataFormatException, I2PSessionException { + super(sess, listenProtocol, listenPort); + this.recv = recv; + } + /** * Send bytes through a SAM RAW session. * diff --git a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java index 14ed92b3eec9057c3f99449563c15045237e2565..44c7b1d0b2eb933d02b8387996f35e1924af1653 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMStreamSession.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.I2PClient; +import net.i2p.client.I2PSession; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; import net.i2p.client.streaming.I2PSocketManager; @@ -47,7 +48,7 @@ import net.i2p.util.Log; * * @author human */ -class SAMStreamSession { +class SAMStreamSession implements SAMMessageSess { protected final Log _log; @@ -68,6 +69,8 @@ class SAMStreamSession { // Can we create outgoing connections? protected final boolean canCreate; + private final int listenProtocol; + private final int listenPort; /** * should we flush every time we get a STREAM SEND, or leave that up to @@ -105,8 +108,8 @@ class SAMStreamSession { * @throws DataFormatException * @throws SAMException */ - public SAMStreamSession(InputStream destStream, String dir, - Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { + protected SAMStreamSession(InputStream destStream, String dir, + Properties props, SAMStreamReceiver recv) throws IOException, DataFormatException, SAMException { this.recv = recv; _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); @@ -170,8 +173,15 @@ class SAMStreamSession { forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)); + if (Boolean.parseBoolean(props.getProperty("i2p.streaming.enforceProtocol"))) + listenProtocol = I2PSession.PROTO_STREAMING; + else + listenProtocol = I2PSession.PROTO_ANY; + listenPort = I2PSession.PORT_ANY; + if (startAcceptor) { + // FIXME don't start threads in constructors server = new SAMStreamSessionServer(); Thread t = new I2PAppThread(server, "SAMStreamSessionServer"); @@ -180,6 +190,48 @@ class SAMStreamSession { server = null; } } + + /** + * Create a new SAM STREAM session on an existing socket manager. + * v3 only. + * + * @param props Properties to setup the I2P session + * @param recv Object that will receive incoming data + * @throws IOException + * @throws DataFormatException + * @throws SAMException + * @since 0.9.25 + */ + protected SAMStreamSession(I2PSocketManager mgr, Properties props, SAMStreamReceiver recv, int listenport) + throws IOException, DataFormatException, SAMException { + this.recv = recv; + _log = I2PAppContext.getGlobalContext().logManager().getLog(getClass()); + if (_log.shouldLog(Log.DEBUG)) + _log.debug("SAM STREAM session instantiated"); + canCreate = true; + Properties allprops = (Properties) System.getProperties().clone(); + allprops.putAll(props); + socketMgr = mgr; + socketMgr.addDisconnectListener(new DisconnectListener()); + forceFlush = Boolean.parseBoolean(allprops.getProperty(PROP_FORCE_FLUSH, DEFAULT_FORCE_FLUSH)); + listenProtocol = I2PSession.PROTO_STREAMING; + listenPort = listenport; + server = null; + } + + /* + * @since 0.9.25 + */ + public int getListenProtocol() { + return listenProtocol; + } + + /* + * @since 0.9.25 + */ + public int getListenPort() { + return listenPort; + } protected class DisconnectListener implements I2PSocketManager.DisconnectListener { public void sessionDisconnected() { @@ -304,6 +356,15 @@ class SAMStreamSession { return true; } + /** + * Unsupported + * @throws DataFormatException always + * @since 0.9.25 moved from subclass SAMv3StreamSession to implement SAMMessageSess + */ + public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException { + throw new DataFormatException(null); + } + /** * Create a new SAM STREAM session socket handler, detaching its thread. * diff --git a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java index e5ddb85e6c44c2dc72458aa4cba9d396947e7619..c181fba6b7a521b1f138eacc30f06c3b6e1c5059 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv1Handler.java @@ -40,13 +40,13 @@ import net.i2p.util.Log; */ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramReceiver, SAMStreamReceiver { - protected SAMRawSession rawSession; - protected SAMDatagramSession datagramSession; + protected SAMMessageSess rawSession; + protected SAMMessageSess datagramSession; protected SAMStreamSession streamSession; - protected SAMRawSession getRawSession() {return rawSession ;} - protected SAMDatagramSession getDatagramSession() {return datagramSession ;} - protected SAMStreamSession getStreamSession() {return streamSession ;} + protected final SAMMessageSess getRawSession() { return rawSession; } + protected final SAMMessageSess getDatagramSession() { return datagramSession; } + protected final SAMStreamSession getStreamSession() { return streamSession; } protected final long _id; private static final AtomicLong __id = new AtomicLong(); @@ -199,14 +199,14 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldWarn()) _log.warn("Error closing socket", e); } - if (getRawSession() != null) { - getRawSession().close(); + if (rawSession != null) { + rawSession.close(); } - if (getDatagramSession() != null) { - getDatagramSession().close(); + if (datagramSession != null) { + datagramSession.close(); } - if (getStreamSession() != null) { - getStreamSession().close(); + if (streamSession != null) { + streamSession.close(); } } } @@ -218,8 +218,8 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece try{ if (opcode.equals("CREATE")) { - if ((getRawSession() != null) || (getDatagramSession() != null) - || (getStreamSession() != null)) { + if ((rawSession != null) || (datagramSession != null) + || (streamSession != null)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("Trying to create a session, but one still exists"); return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Session already exists\"\n"); @@ -374,12 +374,12 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece Destination dest = null ; if (name.equals("ME")) { - if (getRawSession() != null) { - dest = getRawSession().getDestination(); - } else if (getStreamSession() != null) { - dest = getStreamSession().getDestination(); - } else if (getDatagramSession() != null) { - dest = getDatagramSession().getDestination(); + if (rawSession != null) { + dest = rawSession.getDestination(); + } else if (streamSession != null) { + dest = streamSession.getDestination(); + } else if (datagramSession != null) { + dest = datagramSession.getDestination(); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Lookup for SESSION destination, but session is null"); @@ -411,7 +411,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /* Parse and execute a DATAGRAM message */ protected boolean execDatagramMessage(String opcode, Properties props) { - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("DATAGRAM message received, but no DATAGRAM session exists"); return false; } @@ -478,7 +478,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece in.readFully(data); - if (!getDatagramSession().sendBytes(dest, data, proto, fromPort, toPort)) { + if (!datagramSession.sendBytes(dest, data, proto, fromPort, toPort)) { _log.error("DATAGRAM SEND failed"); // a message send failure is no reason to drop the SAM session // for raw and repliable datagrams, just carry on our merry way @@ -515,7 +515,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /* Parse and execute a RAW message */ protected boolean execRawMessage(String opcode, Properties props) { - if (getRawSession() == null) { + if (rawSession == null) { _log.error("RAW message received, but no RAW session exists"); return false; } @@ -591,7 +591,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece in.readFully(data); - if (!getRawSession().sendBytes(dest, data, proto, fromPort, toPort)) { + if (!rawSession.sendBytes(dest, data, proto, fromPort, toPort)) { _log.error("RAW SEND failed"); // a message send failure is no reason to drop the SAM session // for raw and repliable datagrams, just carry on our merry way @@ -628,7 +628,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /* Parse and execute a STREAM message */ protected boolean execStreamMessage(String opcode, Properties props) { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("STREAM message received, but no STREAM session exists"); return false; } @@ -695,13 +695,13 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } try { - if (!getStreamSession().sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) { + if (!streamSession.sendBytes(id, getClientSocket().socket().getInputStream(), size)) { // data)) { if (_log.shouldLog(Log.WARN)) _log.warn("STREAM SEND [" + size + "] failed"); // a message send failure is no reason to drop the SAM session // for style=stream, tell the client the stream failed, and kill the virtual connection.. boolean rv = writeString("STREAM CLOSED RESULT=CANT_REACH_PEER ID=" + id + " MESSAGE=\"Send of " + size + " bytes failed\"\n"); - getStreamSession().closeConnection(id); + streamSession.closeConnection(id); return rv; } @@ -756,7 +756,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece try { try { - if (!getStreamSession().connect(id, dest, props)) { + if (!streamSession.connect(id, dest, props)) { if (_log.shouldLog(Log.DEBUG)) _log.debug("STREAM connection failed"); return false; @@ -817,7 +817,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } } - boolean closed = getStreamSession().closeConnection(id); + boolean closed = streamSession.closeConnection(id); if ( (!closed) && (_log.shouldLog(Log.WARN)) ) _log.warn("Stream unable to be closed, but this is non fatal"); return true; @@ -835,7 +835,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece // SAMRawReceiver implementation public void receiveRawBytes(byte data[], int proto, int fromPort, int toPort) throws IOException { - if (getRawSession() == null) { + if (rawSession == null) { _log.error("BUG! Received raw bytes, but session is null!"); return; } @@ -861,7 +861,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopRawReceiving() invoked"); - if (getRawSession() == null) { + if (rawSession == null) { _log.error("BUG! Got raw receiving stop, but session is null!"); return; } @@ -877,7 +877,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece // SAMDatagramReceiver implementation public void receiveDatagramBytes(Destination sender, byte data[], int proto, int fromPort, int toPort) throws IOException { - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("BUG! Received datagram bytes, but session is null!"); return; } @@ -904,7 +904,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopDatagramReceiving() invoked"); - if (getDatagramSession() == null) { + if (datagramSession == null) { _log.error("BUG! Got datagram receiving stop, but session is null!"); return; } @@ -921,7 +921,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void streamSendAnswer( int id, String result, String bufferState ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Want to answer to stream SEND, but session is null!" ); return; @@ -939,7 +939,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void notifyStreamSendBufferFree( int id ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Stream outgoing buffer is free, but session is null!" ); return; @@ -953,7 +953,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece public void notifyStreamIncomingConnection(int id, Destination d) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Received stream connection, but session is null!"); return; } @@ -968,7 +968,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /** @param msg may be null */ public void notifyStreamOutgoingConnection ( int id, String result, String msg ) throws IOException { - if ( getStreamSession() == null ) + if ( streamSession == null ) { _log.error ( "BUG! Received stream connection, but session is null!" ); return; @@ -1011,7 +1011,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece } public void receiveStreamBytes(int id, ByteBuffer data) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("Received stream bytes, but session is null!"); return; } @@ -1032,7 +1032,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece /** @param msg may be null */ public void notifyStreamDisconnection(int id, String result, String msg) throws IOException { - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Received stream disconnection, but session is null!"); return; } @@ -1047,7 +1047,7 @@ class SAMv1Handler extends SAMHandler implements SAMRawReceiver, SAMDatagramRece if (_log.shouldLog(Log.DEBUG)) _log.debug("stopStreamReceiving() invoked", new Exception("stopped")); - if (getStreamSession() == null) { + if (streamSession == null) { _log.error("BUG! Got stream receiving stop, but session is null!"); return; } diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java index eadaaa7c36d9603691c42884345b60202c29b815..6022b1c798474447b247169861c3e02861162ad9 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3DatagramSession.java @@ -6,17 +6,18 @@ package net.i2p.sam; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress ; +import java.nio.ByteBuffer; import java.util.Properties; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.util.Log; -import java.net.InetSocketAddress; -import java.net.SocketAddress ; -import java.nio.ByteBuffer; class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDatagramReceiver { @@ -53,21 +54,28 @@ class SAMv3DatagramSession extends SAMDatagramSession implements Session, SAMDat this.handler = rec.getHandler(); Properties props = rec.getProps(); - String portStr = props.getProperty("PORT"); - if (portStr == null) { - if (_log.shouldDebug()) - _log.debug("receiver port not specified. Current socket will be used."); - this.clientAddress = null; - } else { - int port = Integer.parseInt(portStr); - String host = props.getProperty("HOST"); - if (host == null) { - host = rec.getHandler().getClientIP(); - if (_log.shouldDebug()) - _log.debug("no host specified. Taken from the client socket : " + host+':'+port); - } - this.clientAddress = new InetSocketAddress(host, port); - } + clientAddress = SAMv3RawSession.getSocketAddress(props, handler); + } + + /** + * Build a Datagram Session on an existing i2p session + * registered with the given nickname + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3DatagramSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess, + int listenPort, SAMv3DatagramServer dgServer) + throws IOException, DataFormatException, I2PSessionException { + super(isess, listenPort, null); // to be replaced by this + this.nick = nick ; + this.recv = this ; // replacement + this.server = dgServer; + this.handler = handler; + clientAddress = SAMv3RawSession.getSocketAddress(props, handler); } public void receiveDatagramBytes(Destination sender, byte[] data, int proto, diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java index fbce9c6ed08dbf52632b17934a0cba147c88cd8f..ce61f16871f9f78c418172979c41e6821c169088 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3Handler.java @@ -48,6 +48,7 @@ class SAMv3Handler extends SAMv1Handler { private Session session; + // TODO remove singleton, hang off SAMBridge like dgserver public static final SessionsDB sSessionsHash = new SessionsDB(); private volatile boolean stolenSocket; private volatile boolean streamForwardingSocket; @@ -369,9 +370,16 @@ class SAMv3Handler extends SAMv1Handler protected boolean execSessionMessage(String opcode, Properties props) { String dest = "BUG!"; - String nick = null ; boolean ok = false ; + String nick = (String) props.remove("ID"); + if (nick == null) + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n"); + + String style = (String) props.remove("STYLE"); + if (style == null && !opcode.equals("REMOVE")) + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n"); + try{ if (opcode.equals("CREATE")) { if ((this.getRawSession()!= null) || (this.getDatagramSession() != null) @@ -418,22 +426,6 @@ class SAMv3Handler extends SAMv1Handler return writeString("SESSION STATUS RESULT=INVALID_KEY\n"); } - - nick = (String) props.remove("ID"); - if (nick == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SESSION ID parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"ID not specified\"\n"); - } - - - String style = (String) props.remove("STYLE"); - if (style == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("SESSION STYLE parameter not specified"); - return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"No SESSION STYLE specified\"\n"); - } - // Unconditionally override what the client may have set // (iMule sets BestEffort) as None is more efficient // and the client has no way to access delivery notifications @@ -472,6 +464,13 @@ class SAMv3Handler extends SAMv1Handler SAMv3StreamSession v3 = newSAMStreamSession(nick); streamSession = v3; this.session = v3; + } else if (style.equals("MASTER")) { + SAMv3DatagramServer dgs = bridge.getV3DatagramServer(props); + MasterSession v3 = new MasterSession(nick, dgs, this, allProps); + streamSession = v3; + datagramSession = v3; + rawSession = v3; + this.session = v3; } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION STYLE: \"" + style +"\""); @@ -480,6 +479,22 @@ class SAMv3Handler extends SAMv1Handler ok = true ; return writeString("SESSION STATUS RESULT=OK DESTINATION=" + dest + "\n"); + } else if (opcode.equals("ADD") || opcode.equals("REMOVE")) { + // prevent trouble in finally block + ok = true; + if (streamSession != null || datagramSession != null || rawSession != null) + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"Not a MASTER session\"\n"); + MasterSession msess = (MasterSession) session; + String msg; + if (opcode.equals("ADD")) { + msg = msess.add(nick, style, props); + } else { + msg = msess.remove(nick, props); + } + if (msg == null) + return writeString("SESSION STATUS RESULT=OK MESSAGE=\"" + opcode + ' ' + nick + "\"\n"); + else + return writeString("SESSION STATUS RESULT=I2P_ERROR MESSAGE=\"" + msg + "\"\n"); } else { if (_log.shouldLog(Log.DEBUG)) _log.debug("Unrecognized SESSION message opcode: \"" diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java index d98762a368a9f7633c7f6260404b25e130190702..2137fbb56b408591877e432f7af11dfe778dc7f8 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3RawSession.java @@ -10,6 +10,7 @@ import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.Properties; +import net.i2p.client.I2PSession; import net.i2p.client.I2PSessionException; import net.i2p.data.DataFormatException; import net.i2p.data.DataHelper; @@ -19,7 +20,7 @@ import net.i2p.util.Log; * @author MKVore * */ -class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver { +class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver { private final String nick; private final SAMv3Handler handler; @@ -42,36 +43,62 @@ class SAMv3RawSession extends SAMRawSession implements Session, SAMRawReceiver throws IOException, DataFormatException, I2PSessionException { super(SAMv3Handler.sSessionsHash.get(nick).getDest(), SAMv3Handler.sSessionsHash.get(nick).getProps(), - SAMv3Handler.sSessionsHash.get(nick).getHandler() // to be replaced by this + null // to be replaced by this ); this.nick = nick ; this.recv = this ; // replacement this.server = dgServer; - SessionRecord rec = SAMv3Handler.sSessionsHash.get(nick); if (rec == null) throw new InterruptedIOException() ; this.handler = rec.getHandler(); Properties props = rec.getProps(); + clientAddress = getSocketAddress(props, handler); + _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && + Boolean.parseBoolean(props.getProperty("HEADER")); + } + + /** + * Build a Raw Session on an existing i2p session + * registered with the given nickname + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3RawSession(String nick, Properties props, SAMv3Handler handler, I2PSession isess, + int listenProtocol, int listenPort, SAMv3DatagramServer dgServer) + throws IOException, DataFormatException, I2PSessionException { + super(isess, listenProtocol, listenPort, null); // to be replace by this + this.nick = nick ; + this.recv = this ; // replacement + this.server = dgServer; + this.handler = handler; + clientAddress = getSocketAddress(props, handler); + _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && + Boolean.parseBoolean(props.getProperty("HEADER")); + } + + /** + * @return null if PORT not set + * @since 0.9.25 moved from constructor + */ + static SocketAddress getSocketAddress(Properties props, SAMv3Handler handler) { String portStr = props.getProperty("PORT") ; if (portStr == null) { - if (_log.shouldLog(Log.DEBUG)) - _log.debug("receiver port not specified. Current socket will be used."); - this.clientAddress = null; + return null; } else { int port = Integer.parseInt(portStr); String host = props.getProperty("HOST"); if ( host==null ) { - host = rec.getHandler().getClientIP(); - if (_log.shouldLog(Log.DEBUG)) - _log.debug("no host specified. Taken from the client socket : " + host +':'+port); + host = handler.getClientIP(); } - this.clientAddress = new InetSocketAddress(host, port); + return new InetSocketAddress(host, port); } - _sendHeader = ((handler.verMajor == 3 && handler.verMinor >= 2) || handler.verMajor > 3) && - Boolean.parseBoolean(props.getProperty("HEADER")); } - + public void receiveRawBytes(byte[] data, int proto, int fromPort, int toPort) throws IOException { if (this.clientAddress==null) { this.handler.receiveRawBytes(data, proto, fromPort, toPort); diff --git a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java index 164d9630b7ded40f08846be0b368db33ec21f9b5..bd6a33f37fdea27fb655ea791483cfcacd4aaa19 100644 --- a/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java +++ b/apps/sam/java/src/net/i2p/sam/SAMv3StreamSession.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.security.GeneralSecurityException; import java.util.Properties; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; @@ -30,6 +31,7 @@ import net.i2p.I2PAppContext; import net.i2p.I2PException; import net.i2p.client.streaming.I2PServerSocket; import net.i2p.client.streaming.I2PSocket; +import net.i2p.client.streaming.I2PSocketManager; import net.i2p.client.streaming.I2PSocketOptions; import net.i2p.data.DataFormatException; import net.i2p.data.Destination; @@ -46,13 +48,16 @@ import net.i2p.util.Log; class SAMv3StreamSession extends SAMStreamSession implements Session { - private static final int BUFFER_SIZE = 1024 ; + private static final int BUFFER_SIZE = 1024; + private static final int MAX_ACCEPT_QUEUE = 64; private final Object socketServerLock = new Object(); /** this is ONLY set for FORWARD, not for ACCEPT */ private I2PServerSocket socketServer; /** this is the count of active ACCEPT sockets */ private final AtomicInteger _acceptors = new AtomicInteger(); + /** for subsession only, null otherwise */ + private final LinkedBlockingQueue<I2PSocket> _acceptQueue; private static I2PSSLSocketFactory _sslSocketFactory; @@ -79,6 +84,55 @@ class SAMv3StreamSession extends SAMStreamSession implements Session getDB().get(login).getProps(), getDB().get(login).getHandler()); this.nick = login ; + _acceptQueue = null; + } + + /** + * Build a Datagram Session on an existing I2P session + * registered with the given nickname + * + * @param nick nickname of the session + * @throws IOException + * @throws DataFormatException + * @throws I2PSessionException + * @since 0.9.25 + */ + public SAMv3StreamSession(String login, Properties props, SAMv3Handler handler, I2PSocketManager mgr, + int listenPort) throws IOException, DataFormatException, SAMException { + super(mgr, props, handler, listenPort); + this.nick = login ; + _acceptQueue = new LinkedBlockingQueue<I2PSocket>(MAX_ACCEPT_QUEUE); + } + + /** + * Put a socket on the accept queue. + * Only for subsession, throws IllegalStateException otherwise. + * + * @return success, false if full + * @since 0.9.25 + */ + public boolean queueSocket(I2PSocket sock) { + if (_acceptQueue == null) + throw new IllegalStateException(); + return _acceptQueue.offer(sock); + } + + /** + * Take a socket from the accept queue. + * Only for subsession, throws IllegalStateException otherwise. + * + * @since 0.9.25 + */ + private I2PSocket acceptSocket() throws ConnectException { + if (_acceptQueue == null) + throw new IllegalStateException(); + try { + return _acceptQueue.take(); + } catch (InterruptedException ie) { + ConnectException ce = new ConnectException("interrupted"); + ce.initCause(ie); + throw ce; + } } public static SessionsDB getDB() @@ -185,10 +239,13 @@ class SAMv3StreamSession extends SAMStreamSession implements Session } } - I2PSocket i2ps; + I2PSocket i2ps = null; _acceptors.incrementAndGet(); try { - i2ps = socketMgr.getServerSocket().accept(); + if (_acceptQueue != null) + i2ps = acceptSocket(); + else + i2ps = socketMgr.getServerSocket().accept(); } finally { _acceptors.decrementAndGet(); } @@ -257,25 +314,23 @@ class SAMv3StreamSession extends SAMStreamSession implements Session this.socketServer = this.socketMgr.getServerSocket(); } - SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, this, verbose, sendPorts); + SocketForwarder forwarder = new SocketForwarder(host, port, isSSL, verbose, sendPorts); (new I2PAppThread(rec.getThreadGroup(), forwarder, "SAMV3StreamForwarder")).start(); } /** * Forward sockets from I2P to the host/port provided */ - private static class SocketForwarder implements Runnable + private class SocketForwarder implements Runnable { private final String host; private final int port; - private final SAMv3StreamSession session; private final boolean isSSL, verbose, sendPorts; SocketForwarder(String host, int port, boolean isSSL, - SAMv3StreamSession session, boolean verbose, boolean sendPorts) { + boolean verbose, boolean sendPorts) { this.host = host ; this.port = port ; - this.session = session ; this.verbose = verbose ; this.sendPorts = sendPorts; this.isSSL = isSSL; @@ -283,12 +338,15 @@ class SAMv3StreamSession extends SAMStreamSession implements Session public void run() { - while (session.getSocketServer()!=null) { + while (getSocketServer() != null) { // wait and accept a connection from I2P side I2PSocket i2ps; try { - i2ps = session.getSocketServer().accept(); + if (_acceptQueue != null) + i2ps = acceptSocket(); + else + i2ps = getSocketServer().accept(); if (i2ps == null) continue; } catch (SocketTimeoutException ste) { @@ -437,7 +495,7 @@ class SAMv3StreamSession extends SAMStreamSession implements Session } } - private I2PServerSocket getSocketServer() + protected I2PServerSocket getSocketServer() { synchronized ( this.socketServerLock ) { return this.socketServer ; @@ -479,13 +537,4 @@ class SAMv3StreamSession extends SAMStreamSession implements Session public void close() { socketMgr.destroySocketManager(); } - - /** - * Unsupported - * @throws DataFormatException always - */ - public boolean sendBytes(String s, byte[] b, int pr, int fp, int tp) throws DataFormatException - { - throw new DataFormatException(null); - } }