I2P Address: [http://git.idk.i2p]

Skip to content
Snippets Groups Projects
Commit 0b599c45 authored by sponge's avatar sponge
Browse files

Refactored code.

Piles of pedantic lock checks to ensure we don't get deadlocked, it's ugly.
parent 7f3f6dfd
No related branches found
No related tags found
No related merge requests found
...@@ -113,7 +113,7 @@ public class BOB { ...@@ -113,7 +113,7 @@ public class BOB {
public final static String PROP_BOB_PORT = "BOB.port"; public final static String PROP_BOB_PORT = "BOB.port";
public final static String PROP_BOB_HOST = "BOB.host"; public final static String PROP_BOB_HOST = "BOB.host";
private static int maxConnections = 0; private static int maxConnections = 0;
private static nickname database; private static NamedDB database;
/** /**
* Log a warning * Log a warning
...@@ -141,7 +141,7 @@ public class BOB { ...@@ -141,7 +141,7 @@ public class BOB {
* @param args * @param args
*/ */
public static void main(String[] args) { public static void main(String[] args) {
database = new nickname(); database = new NamedDB();
int i = 0; int i = 0;
boolean save = false; boolean save = false;
// Set up all defaults to be passed forward to other threads. // Set up all defaults to be passed forward to other threads.
...@@ -212,10 +212,10 @@ public class BOB { ...@@ -212,10 +212,10 @@ public class BOB {
Socket server; Socket server;
while((i++ < maxConnections) || (maxConnections == 0)) { while((i++ < maxConnections) || (maxConnections == 0)) {
//doCMDS connection; //DoCMDS connection;
server = listener.accept(); server = listener.accept();
doCMDS conn_c = new doCMDS(server, props, database, _log); DoCMDS conn_c = new DoCMDS(server, props, database, _log);
Thread t = new Thread(conn_c); Thread t = new Thread(conn_c);
t.start(); t.start();
} }
......
This diff is collapsed.
...@@ -40,7 +40,7 @@ import net.i2p.util.Log; ...@@ -40,7 +40,7 @@ import net.i2p.util.Log;
*/ */
public class I2Plistener implements Runnable { public class I2Plistener implements Runnable {
private nickname info, database; private NamedDB info, database;
private Log _log; private Log _log;
private int tgwatch; private int tgwatch;
public I2PSocketManager socketManager; public I2PSocketManager socketManager;
...@@ -53,7 +53,7 @@ public class I2Plistener implements Runnable { ...@@ -53,7 +53,7 @@ public class I2Plistener implements Runnable {
* @param database * @param database
* @param _log * @param _log
*/ */
I2Plistener(I2PSocketManager S, nickname info, nickname database, Log _log) { I2Plistener(I2PSocketManager S, NamedDB info, NamedDB database, Log _log) {
this.database = database; this.database = database;
this.info = info; this.info = info;
this._log = _log; this._log = _log;
......
...@@ -36,7 +36,7 @@ import net.i2p.client.streaming.I2PSocket; ...@@ -36,7 +36,7 @@ import net.i2p.client.streaming.I2PSocket;
public class I2PtoTCP implements Runnable { public class I2PtoTCP implements Runnable {
private I2PSocket I2P; private I2PSocket I2P;
private nickname info, database; private NamedDB info, database;
private Socket sock; private Socket sock;
/** /**
...@@ -46,65 +46,93 @@ public class I2PtoTCP implements Runnable { ...@@ -46,65 +46,93 @@ public class I2PtoTCP implements Runnable {
* @param info * @param info
* @param database * @param database
*/ */
I2PtoTCP(I2PSocket I2Psock, nickname info, nickname database) { I2PtoTCP(I2PSocket I2Psock, NamedDB info, NamedDB database) {
this.I2P = I2Psock; this.I2P = I2Psock;
this.info = info; this.info = info;
this.database = database; this.database = database;
} }
private void rlock() throws Exception {
database.getReadLock();
info.getReadLock();
}
private void runlock() throws Exception {
database.releaseReadLock();
info.releaseReadLock();
}
/** /**
* I2P stream to TCP stream thread starter * I2P stream to TCP stream thread starter
* *
*/ */
public void run() { public void run() {
String host;
try { int port;
database.getReadLock(); boolean tell;
info.getReadLock(); die: {
String host = info.get("OUTHOST").toString(); try {
int port = Integer.parseInt(info.get("OUTPORT").toString());
boolean tell = info.get("QUIET").equals(Boolean.FALSE);
info.releaseReadLock();
database.releaseReadLock();
sock = new Socket(host, port);
// make readers/writers
InputStream in = sock.getInputStream();
OutputStream out = sock.getOutputStream();
InputStream Iin = I2P.getInputStream();
OutputStream Iout = I2P.getOutputStream();
I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default
if(tell) {
// tell who is connecting
out.write(I2P.getPeerDestination().toBase64().getBytes());
out.write(10); // nl
out.flush(); // not really needed, but...
}
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout, info, database); // app -> I2P
TCPio conn_a = new TCPio(Iin, out, info, database); // I2P -> app
Thread t = new Thread(conn_c, "TCPioA");
Thread q = new Thread(conn_a, "TCPioB");
// Fire!
t.start();
q.start();
while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
try { try {
Thread.sleep(10); //sleep for 10 ms rlock();
} catch(InterruptedException e) { } catch(Exception e) {
// nop break die;
} }
} try {
host = info.get("OUTHOST").toString();
port = Integer.parseInt(info.get("OUTPORT").toString());
tell = info.get("QUIET").equals(Boolean.FALSE);
} catch(Exception e) {
runlock();
break die;
}
try {
runlock();
} catch(Exception e) {
break die;
}
sock = new Socket(host, port);
// make readers/writers
InputStream in = sock.getInputStream();
OutputStream out = sock.getOutputStream();
InputStream Iin = I2P.getInputStream();
OutputStream Iout = I2P.getOutputStream();
I2P.setReadTimeout(0); // temp bugfix, this *SHOULD* be the default
} catch(Exception e) { if(tell) {
} // tell who is connecting
out.write(I2P.getPeerDestination().toBase64().getBytes());
out.write(10); // nl
out.flush(); // not really needed, but...
}
// setup to cross the streams
TCPio conn_c = new TCPio(in, Iout, info, database); // app -> I2P
TCPio conn_a = new TCPio(Iin, out, info, database); // I2P -> app
Thread t = new Thread(conn_c, "TCPioA");
Thread q = new Thread(conn_a, "TCPioB");
// Fire!
t.start();
q.start();
while(t.isAlive() && q.isAlive()) { // AND is used here to kill off the other thread
try {
Thread.sleep(10); //sleep for 10 ms
} catch(InterruptedException e) {
// nop
}
}
} catch(Exception e) {
break die;
}
} // die
try { try {
I2P.close(); I2P.close();
} catch(Exception e) { } catch(Exception e) {
tell = false;
} }
try { try {
sock.close(); sock.close();
} catch(Exception e) { } catch(Exception e) {
tell = false;
} }
} }
} }
...@@ -41,7 +41,7 @@ import net.i2p.util.Log; ...@@ -41,7 +41,7 @@ import net.i2p.util.Log;
*/ */
public class MUXlisten implements Runnable { public class MUXlisten implements Runnable {
private nickname database, info; private NamedDB database, info;
private Log _log; private Log _log;
private I2PSocketManager socketManager; private I2PSocketManager socketManager;
private ByteArrayInputStream prikey; private ByteArrayInputStream prikey;
...@@ -51,6 +51,7 @@ public class MUXlisten implements Runnable { ...@@ -51,6 +51,7 @@ public class MUXlisten implements Runnable {
private int backlog = 50; // should this be more? less? private int backlog = 50; // should this be more? less?
boolean go_out; boolean go_out;
boolean come_in; boolean come_in;
/** /**
* Constructor Will fail if INPORT is occupied. * Constructor Will fail if INPORT is occupied.
* *
...@@ -60,7 +61,7 @@ public class MUXlisten implements Runnable { ...@@ -60,7 +61,7 @@ public class MUXlisten implements Runnable {
* @throws net.i2p.I2PException * @throws net.i2p.I2PException
* @throws java.io.IOException * @throws java.io.IOException
*/ */
MUXlisten(nickname database, nickname info, Log _log) throws I2PException, IOException { MUXlisten(NamedDB database, NamedDB info, Log _log) throws I2PException, IOException, RuntimeException {
int port = 0; int port = 0;
InetAddress host = null; InetAddress host = null;
this.database = database; this.database = database;
...@@ -90,7 +91,7 @@ public class MUXlisten implements Runnable { ...@@ -90,7 +91,7 @@ public class MUXlisten implements Runnable {
if(this.come_in) { if(this.come_in) {
this.listener = new ServerSocket(port, backlog, host); this.listener = new ServerSocket(port, backlog, host);
} }
// Everything is OK as far as we can tell. // Everything is OK as far as we can tell.
this.database.getWriteLock(); this.database.getWriteLock();
this.info.getWriteLock(); this.info.getWriteLock();
...@@ -99,82 +100,143 @@ public class MUXlisten implements Runnable { ...@@ -99,82 +100,143 @@ public class MUXlisten implements Runnable {
this.database.releaseWriteLock(); this.database.releaseWriteLock();
} }
private void rlock() throws Exception {
database.getReadLock();
info.getReadLock();
}
private void runlock() throws Exception {
database.releaseReadLock();
info.releaseReadLock();
}
private void wlock() throws Exception {
database.getWriteLock();
info.getWriteLock();
}
private void wunlock() throws Exception {
info.releaseWriteLock();
database.releaseWriteLock();
}
/** /**
* MUX sockets, fire off a thread to connect, get destination info, and do I/O * MUX sockets, fire off a thread to connect, get destination info, and do I/O
* *
*/ */
public void run() { public void run() {
this.database.getWriteLock();
this.info.getWriteLock();
info.add("RUNNING", Boolean.TRUE);
info.add("STARTING", Boolean.FALSE);
this.info.releaseWriteLock();
this.database.releaseWriteLock();
try { try {
tg = new ThreadGroup(N); wlock();
try {
// toss the connections to a new threads. info.add("RUNNING", Boolean.TRUE);
// will wrap with TCP and UDP when UDP works info.add("STARTING", Boolean.FALSE);
} catch(Exception e) {
if(go_out) { wunlock();
// I2P -> TCP return;
I2Plistener conn = new I2Plistener(socketManager, info, database, _log);
Thread t = new Thread(tg, conn, "BOBI2Plistener " + N);
t.start();
}
if(come_in) {
// TCP -> I2P
TCPlistener conn = new TCPlistener(listener, socketManager, info, database, _log);
Thread q = new Thread(tg, conn, "BOBTCPlistener" + N);
q.start();
} }
} catch(Exception e) {
return;
}
try {
wunlock();
} catch(Exception e) {
return;
}
boolean spin = true; quit: {
while(spin) { try {
try { tg = new ThreadGroup(N);
Thread.sleep(1000); //sleep for 1000 ms (One second) die: {
} catch(InterruptedException e) { // toss the connections to a new threads.
// nop // will wrap with TCP and UDP when UDP works
if(go_out) {
// I2P -> TCP
I2Plistener conn = new I2Plistener(socketManager, info, database, _log);
Thread t = new Thread(tg, conn, "BOBI2Plistener " + N);
t.start();
}
if(come_in) {
// TCP -> I2P
TCPlistener conn = new TCPlistener(listener, socketManager, info, database, _log);
Thread q = new Thread(tg, conn, "BOBTCPlistener" + N);
q.start();
}
boolean spin = true;
while(spin) {
try {
Thread.sleep(1000); //sleep for 1000 ms (One second)
} catch(InterruptedException e) {
// nop
}
try {
rlock();
try {
spin = info.get("STOPPING").equals(Boolean.FALSE);
} catch(Exception e) {
runlock();
break die;
}
} catch(Exception e) {
break die;
}
try {
runlock();
} catch(Exception e) {
break die;
}
}
try {
wlock();
try {
info.add("RUNNING", Boolean.FALSE);
} catch(Exception e) {
wunlock();
break die;
}
} catch(Exception e) {
break die;
}
try {
wunlock();
} catch(Exception e) {
break die;
}
} // die
// wait for child threads and thread groups to die
while(tg.activeCount() + tg.activeGroupCount() != 0) {
try {
Thread.sleep(1000); //sleep for 1000 ms (One second)
} catch(InterruptedException ex) {
// nop
}
} }
tg.destroy();
this.database.getReadLock(); // Zap reference to the ThreadGroup so the JVM can GC it.
this.info.getReadLock(); tg = null;
spin = info.get("STOPPING").equals(Boolean.FALSE); } catch(Exception e) {
this.database.releaseReadLock(); break quit;
this.info.releaseReadLock();
} }
} // quit
this.database.getWriteLock(); socketManager.destroySocketManager();
this.info.getWriteLock(); // zero out everything, just incase.
info.add("RUNNING", Boolean.FALSE); try {
this.info.releaseWriteLock(); wlock();
this.database.releaseWriteLock(); try {
info.add("STARTING", Boolean.FALSE);
// wait for child threads and thread groups to die info.add("STOPPING", Boolean.FALSE);
while(tg.activeCount() + tg.activeGroupCount() != 0) { info.add("RUNNING", Boolean.FALSE);
try { } catch(Exception e) {
Thread.sleep(1000); //sleep for 1000 ms (One second) wunlock();
} catch(InterruptedException ex) { return;
// nop
}
} }
tg.destroy(); wunlock();
// Zap reference to the ThreadGroup so the JVM can GC it.
tg = null;
} catch(Exception e) { } catch(Exception e) {
return;
} }
socketManager.destroySocketManager();
// zero out everything, just incase.
this.database.getWriteLock();
this.info.getWriteLock();
info.add("STARTING", Boolean.FALSE);
info.add("STOPPING", Boolean.FALSE);
info.add("RUNNING", Boolean.FALSE);
this.info.releaseWriteLock();
this.database.releaseWriteLock();
} }
} }
...@@ -28,7 +28,7 @@ package net.i2p.BOB; ...@@ -28,7 +28,7 @@ package net.i2p.BOB;
* *
* @author sponge * @author sponge
*/ */
public class nickname { public class NamedDB {
private volatile Object[][] data; private volatile Object[][] data;
private volatile int index, writersWaiting, readers; private volatile int index, writersWaiting, readers;
...@@ -37,7 +37,7 @@ public class nickname { ...@@ -37,7 +37,7 @@ public class nickname {
* make initial NULL object * make initial NULL object
* *
*/ */
public nickname() { public NamedDB() {
this.data = new Object[1][2]; this.data = new Object[1][2];
this.index = this.writersWaiting = this.readers = 0; this.index = this.writersWaiting = this.readers = 0;
} }
...@@ -76,8 +76,9 @@ public class nickname { ...@@ -76,8 +76,9 @@ public class nickname {
* Find objects in the array, returns it's index or throws exception * Find objects in the array, returns it's index or throws exception
* @param key * @param key
* @return an objects index * @return an objects index
* @throws ArrayIndexOutOfBoundsException when key does not exist
*/ */
public int idx(Object key) { public int idx(Object key) throws ArrayIndexOutOfBoundsException {
for(int i = 0; i < index; i++) { for(int i = 0; i < index; i++) {
if(key.equals(data[i][0])) { if(key.equals(data[i][0])) {
return i; return i;
...@@ -115,7 +116,6 @@ public class nickname { ...@@ -115,7 +116,6 @@ public class nickname {
} }
index -= didsomething; index -= didsomething;
data = olddata; data = olddata;
} }
/** /**
......
...@@ -36,7 +36,7 @@ public class TCPio implements Runnable { ...@@ -36,7 +36,7 @@ public class TCPio implements Runnable {
private InputStream Ain; private InputStream Ain;
private OutputStream Aout; private OutputStream Aout;
private nickname info, database; private NamedDB info, database;
/** /**
* Constructor * Constructor
...@@ -46,16 +46,13 @@ public class TCPio implements Runnable { ...@@ -46,16 +46,13 @@ public class TCPio implements Runnable {
* @param info * @param info
* @param database * @param database
*/ */
TCPio(InputStream Ain, OutputStream Aout, nickname info, nickname database) { TCPio(InputStream Ain, OutputStream Aout, NamedDB info, NamedDB database) {
this.Ain = Ain; this.Ain = Ain;
this.Aout = Aout; this.Aout = Aout;
this.info = info; this.info = info;
this.database = database; this.database = database;
} }
/**
* kill off the streams, to hopefully cause an IOException in the thread in order to kill it.
*/
/** /**
* Copy from source to destination... * Copy from source to destination...
* and yes, we are totally OK to block here on writes, * and yes, we are totally OK to block here on writes,
...@@ -73,17 +70,18 @@ public class TCPio implements Runnable { ...@@ -73,17 +70,18 @@ public class TCPio implements Runnable {
spin = info.get("RUNNING").equals(Boolean.TRUE); spin = info.get("RUNNING").equals(Boolean.TRUE);
info.releaseReadLock(); info.releaseReadLock();
database.releaseReadLock(); database.releaseReadLock();
b = Ain.read(a, 0, 1); b = Ain.read(a, 0, 1);
// System.out.println(info.get("NICKNAME").toString() + " " + b); // System.out.println(info.get("NICKNAME").toString() + " " + b);
if(b > 0) { if(b > 0) {
Aout.write(a, 0, 1); Aout.write(a, 0, b);
// Aout.flush(); too slow!
} else if(b == 0) { } else if(b == 0) {
try { Thread.yield(); // this should act like a mini sleep.
// Thread.yield(); if(Ain.available() == 0) {
Thread.sleep(10); try {
} catch(InterruptedException ex) { // Thread.yield();
Thread.sleep(10);
} catch(InterruptedException ex) {
}
} }
} else { } else {
/* according to the specs: /* according to the specs:
...@@ -97,6 +95,8 @@ public class TCPio implements Runnable { ...@@ -97,6 +95,8 @@ public class TCPio implements Runnable {
} }
} }
} catch(Exception e) { } catch(Exception e) {
// Eject!!! Eject!!!
return;
} }
} }
} }
...@@ -40,7 +40,7 @@ import net.i2p.util.Log; ...@@ -40,7 +40,7 @@ import net.i2p.util.Log;
*/ */
public class TCPlistener implements Runnable { public class TCPlistener implements Runnable {
private nickname info, database; private NamedDB info, database;
private Log _log; private Log _log;
private int tgwatch; private int tgwatch;
public I2PSocketManager socketManager; public I2PSocketManager socketManager;
...@@ -54,7 +54,7 @@ public class TCPlistener implements Runnable { ...@@ -54,7 +54,7 @@ public class TCPlistener implements Runnable {
* @param database * @param database
* @param _log * @param _log
*/ */
TCPlistener(ServerSocket listener, I2PSocketManager S, nickname info, nickname database, Log _log) { TCPlistener(ServerSocket listener, I2PSocketManager S, NamedDB info, NamedDB database, Log _log) {
this.database = database; this.database = database;
this.info = info; this.info = info;
this._log = _log; this._log = _log;
......
...@@ -45,7 +45,7 @@ import net.i2p.i2ptunnel.I2PTunnel; ...@@ -45,7 +45,7 @@ import net.i2p.i2ptunnel.I2PTunnel;
public class TCPtoI2P implements Runnable { public class TCPtoI2P implements Runnable {
private I2PSocket I2P; private I2PSocket I2P;
private nickname info, database; private NamedDB info, database;
private Socket sock; private Socket sock;
private I2PSocketManager socketManager; private I2PSocketManager socketManager;
...@@ -57,7 +57,7 @@ public class TCPtoI2P implements Runnable { ...@@ -57,7 +57,7 @@ public class TCPtoI2P implements Runnable {
* @return line of text as a String * @return line of text as a String
* @throws Exception * @throws Exception
*/ */
public static String Lread(InputStream in) throws Exception { private static String lnRead(InputStream in) throws Exception {
String S; String S;
int b; int b;
char c; char c;
...@@ -87,7 +87,7 @@ public class TCPtoI2P implements Runnable { ...@@ -87,7 +87,7 @@ public class TCPtoI2P implements Runnable {
* @param info * @param info
* @param database * @param database
*/ */
TCPtoI2P(I2PSocketManager i2p, Socket socket, nickname info, nickname database) { TCPtoI2P(I2PSocketManager i2p, Socket socket, NamedDB info, NamedDB database) {
this.sock = socket; this.sock = socket;
this.info = info; this.info = info;
this.database = database; this.database = database;
...@@ -119,7 +119,7 @@ public class TCPtoI2P implements Runnable { ...@@ -119,7 +119,7 @@ public class TCPtoI2P implements Runnable {
InputStream in = sock.getInputStream(); InputStream in = sock.getInputStream();
OutputStream out = sock.getOutputStream(); OutputStream out = sock.getOutputStream();
try { try {
line = Lread(in); line = lnRead(in);
input = line.toLowerCase(); input = line.toLowerCase();
Destination dest = null; Destination dest = null;
......
...@@ -45,7 +45,7 @@ import net.i2p.util.Log; ...@@ -45,7 +45,7 @@ import net.i2p.util.Log;
*/ */
public class UDPIOthread implements I2PSessionListener, Runnable { public class UDPIOthread implements I2PSessionListener, Runnable {
private nickname info; private NamedDB info;
private Log _log; private Log _log;
private Socket socket; private Socket socket;
private DataInputStream in; private DataInputStream in;
...@@ -61,7 +61,7 @@ public class UDPIOthread implements I2PSessionListener, Runnable { ...@@ -61,7 +61,7 @@ public class UDPIOthread implements I2PSessionListener, Runnable {
* @param socket * @param socket
* @param _session * @param _session
*/ */
UDPIOthread(nickname info, Log _log, Socket socket, I2PSession _session) { UDPIOthread(NamedDB info, Log _log, Socket socket, I2PSession _session) {
this.info = info; this.info = info;
this._log = _log; this._log = _log;
this.socket = socket; this.socket = socket;
......
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment