diff --git a/apps/BOB/src/net/i2p/BOB/DoCMDS.java b/apps/BOB/src/net/i2p/BOB/DoCMDS.java index 16da28ce9c019e2b588cacf7d83d0983e260e6e0..1a2d2a19a6f0c7552e4798c3bbcfd701b4f657c3 100644 --- a/apps/BOB/src/net/i2p/BOB/DoCMDS.java +++ b/apps/BOB/src/net/i2p/BOB/DoCMDS.java @@ -1263,11 +1263,11 @@ public class DoCMDS implements Runnable { tunnel = new MUXlisten(database, nickinfo, _log); Thread t = new Thread(tunnel); t.start(); - try { - Thread.sleep(1000 * 10); // Slow down the startup. - } catch(InterruptedException ie) { - // ignore it - } + // try { + // Thread.sleep(1000 * 10); // Slow down the startup. + // } catch(InterruptedException ie) { + // // ignore it + // } out.println("OK tunnel starting"); } catch (I2PException e) { out.println("ERROR starting tunnel: " + e); diff --git a/apps/BOB/src/net/i2p/BOB/MUXlisten.java b/apps/BOB/src/net/i2p/BOB/MUXlisten.java index dc30c5445d0d31ca3e668e5b1cf14a491f21b112..cf65a0b128cbd10a8775146020f6355c90355d6e 100644 --- a/apps/BOB/src/net/i2p/BOB/MUXlisten.java +++ b/apps/BOB/src/net/i2p/BOB/MUXlisten.java @@ -133,7 +133,7 @@ public class MUXlisten implements Runnable { */ public void run() { I2PServerSocket SS = null; - int ticks = 1200; // Allow 120 seconds, no more. + int ticks = 100; // Allow 10 seconds, no more. try { wlock(); try { @@ -267,7 +267,6 @@ die: // System.out.println("MUXlisten: waiting for children"); if (tg.activeCount() + tg.activeGroupCount() != 0) { while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { - tg.interrupt(); // unwedge any blocking threads. ticks--; try { Thread.sleep(100); //sleep for 100 ms (One tenth second) @@ -308,7 +307,7 @@ die: } catch (Exception e) { // nop } - ticks = 600; // 60 seconds + ticks = 100; // 10 seconds if (tg.activeCount() + tg.activeGroupCount() != 0) { while ((tg.activeCount() + tg.activeGroupCount() != 0) && ticks != 0) { tg.interrupt(); // unwedge any blocking threads. @@ -325,10 +324,14 @@ die: // Zap reference to the ThreadGroup so the JVM can GC it. tg = null; } else { - System.out.println("BOB: MUXlisten: Can't kill threads. Please send the following dump to sponge@mail.i2p"); + System.out.println("BOB: MUXlisten: Forcibly killing threads."); System.out.println("\n\nBOB: MUXlisten: ThreadGroup dump BEGIN"); visit(tg, 0); System.out.println("BOB: MUXlisten: ThreadGroup dump END\n\n"); + nuke(tg,0); + tg.destroy(); + // Zap reference to the ThreadGroup so the JVM can GC it. + tg = null; } } @@ -354,7 +357,6 @@ die: wunlock(); } catch (Exception e) { } - } @@ -402,4 +404,37 @@ die: visit(groups[i], level + 1); } } + private static void nuke(ThreadGroup group, int level) { + // Get threads in `group' + int numThreads = group.activeCount(); + Thread[] threads = new Thread[numThreads * 2]; + numThreads = group.enumerate(threads, false); + // Enumerate each thread in `group' and stop it. + for (int i = 0; i < numThreads; i++) { + // Get thread + Thread thread = threads[i]; + try { + if(thread.isAlive()) thread.stop(); + } catch(SecurityException se) { + //nop + } + } + + // Get thread subgroups of `group' + int numGroups = group.activeGroupCount(); + ThreadGroup[] groups = new ThreadGroup[numGroups * 2]; + numGroups = group.enumerate(groups, false); + + // Recursively visit each subgroup + for (int i = 0; i < numGroups; i++) { + nuke(groups[i], level + 1); + } + try { + group.destroy(); + } catch (IllegalThreadStateException IE) { + //nop + } catch(SecurityException se) { + //nop + } + } } diff --git a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java index a3e5e57a7f870087cff8648ca15e39bce4d7d3d9..a906dbcf857d2dc8a25091efdef1ea7947d0f5be 100644 --- a/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java +++ b/core/java/src/net/i2p/client/I2PSessionMuxedImpl.java @@ -4,18 +4,16 @@ package net.i2p.client; * public domain */ -import java.io.IOException; import java.io.InputStream; import java.util.concurrent.LinkedBlockingQueue; -import java.util.HashSet; import java.util.Properties; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import net.i2p.I2PAppContext; import net.i2p.data.DataHelper; import net.i2p.data.Destination; import net.i2p.data.SessionKey; -import net.i2p.data.SessionTag; import net.i2p.data.i2cp.MessagePayloadMessage; import net.i2p.util.Log; import net.i2p.util.SimpleScheduler; @@ -97,6 +95,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * 255 disallowed * @param port 1-65535 or PORT_ANY for all */ + @Override public void addSessionListener(I2PSessionListener lsnr, int proto, int port) { _demultiplexer.addListener(lsnr, proto, port); } @@ -107,11 +106,13 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * @param proto 1-254 or 0 for all; 255 disallowed * @param port 1-65535 or 0 for all */ + @Override public void addMuxedSessionListener(I2PSessionMuxedListener l, int proto, int port) { _demultiplexer.addMuxedListener(l, proto, port); } /** removes the specified listener (only) */ + @Override public void removeListener(int proto, int port) { _demultiplexer.removeListener(proto, port); } @@ -149,6 +150,7 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { * @param fromPort 1-65535 or 0 for unset * @param toPort 1-65535 or 0 for unset */ + @Override public boolean sendMessage(Destination dest, byte[] payload, int offset, int size, SessionKey keyUsed, Set tagsSent, long expires, int proto, int fromPort, int toPort) @@ -198,24 +200,36 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { protected class MuxedAvailabilityNotifier extends AvailabilityNotifier { private LinkedBlockingQueue<MsgData> _msgs; - private boolean _alive; + private AtomicBoolean _alive = new AtomicBoolean(false); private static final int POISON_SIZE = -99999; public MuxedAvailabilityNotifier() { _msgs = new LinkedBlockingQueue(); } - - public void stopNotifying() { - _msgs.clear(); - if (_alive) { - _alive = false; - try { - _msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0)); - } catch (InterruptedException ie) {} + + @Override + public void stopNotifying() { + boolean again = true; + _msgs.clear(); + // Thread.yield(); + if (_alive.get()) { + // System.out.println("I2PSessionMuxedImpl.stopNotifying()"); + while(again) { + _msgs.clear(); + try { + _msgs.put(new MsgData(0, POISON_SIZE, 0, 0, 0)); + again = false; + // System.out.println("I2PSessionMuxedImpl.stopNotifying() success."); + } catch (InterruptedException ie) { + continue; + } + } } + _alive.set(false); } /** unused */ + @Override public void available(long msgId, int size) { throw new IllegalArgumentException("no"); } public void available(long msgId, int size, int proto, int fromPort, int toPort) { @@ -224,20 +238,24 @@ class I2PSessionMuxedImpl extends I2PSessionImpl2 implements I2PSession { } catch (InterruptedException ie) {} } + @Override public void run() { - _alive = true; - while (true) { - MsgData msg; + MsgData msg; + _alive.set(true); + while (_alive.get()) { try { msg = _msgs.take(); } catch (InterruptedException ie) { + System.out.println("I2PSessionMuxedImpl.run() InterruptedException " + String.valueOf(_msgs.size()) + " Messages, Alive " + _alive.toString()); continue; } - if (msg.size == POISON_SIZE) + if (msg.size == POISON_SIZE) { + // System.out.println("I2PSessionMuxedImpl.run() POISONED"); break; + } try { - _demultiplexer.messageAvailable(I2PSessionMuxedImpl.this, msg.id, - msg.size, msg.proto, msg.fromPort, msg.toPort); + _demultiplexer.messageAvailable(I2PSessionMuxedImpl.this, + msg.id, msg.size, msg.proto, msg.fromPort, msg.toPort); } catch (Exception e) { _log.error("Error notifying app of message availability"); } diff --git a/history.txt b/history.txt index 50315edcd199ca59741c0f48c30d5bc4525d01c7..29c4157ed119c708b0accaf7b3a1bf3be6ba9602 100644 --- a/history.txt +++ b/history.txt @@ -1,5 +1,32 @@ +2009-04-25 sponge + * I2PSessionMuxedImpl atomic fixes + * BOB fixes. This should be the final bug wack. Good Luck to everybody! + +2009-04-23 zzz + * Blocklist: cleanup + * eepget: handle -h, --help, bad options, etc. + (http://forum.i2p/viewtopic.php?p=16261#16261) + * Fragmenter: don't re-throw the corrupt fragment IllegalStateException, + to limit the damage - root cause still not found + * i2psnark: (http://forum.i2p/viewtopic.php?t=3317) + - Change file limit to 512 (was 256) + - Change size limit to 10GB (was 5GB) + - Change request size to 16KB (was 32KB) + - Change pipeline to 5 (was 3) + * logs.jsp: Move version info to the top + * Jetty: Fix temp dir name handling on windows, which was + causing susidns not to start + (http://forum.i2p/viewtopic.php?t=3364) + * NTCP: Prevent IllegalStateException + * PeerProfile: + - Replace a hot lock with concurrent RW lock + - Rewrite ugly IP Restriction code + - Also use transport IP in restriction code + * RouterConsole: Make summary bar a refreshing iframe + * Transport: Start the previously unused CleanupUnreachable + 2009-04-21 sponge - * Code janator work, basic corrections involving @Override, and + * Code janitor work, basic corrections involving @Override, and appling final where it is important. Also fixed some equals methods and commented places that need fixing. diff --git a/router/java/src/net/i2p/router/RouterVersion.java b/router/java/src/net/i2p/router/RouterVersion.java index 1d4797d6fc46971d6b640b96122df2e67994299a..a34e9238c0046cbb2b2de9472a6a1282eb962def 100644 --- a/router/java/src/net/i2p/router/RouterVersion.java +++ b/router/java/src/net/i2p/router/RouterVersion.java @@ -17,7 +17,7 @@ import net.i2p.CoreVersion; public class RouterVersion { public final static String ID = "$Revision: 1.548 $ $Date: 2008-06-07 23:00:00 $"; public final static String VERSION = CoreVersion.VERSION; - public final static long BUILD = 1; + public final static long BUILD = 3; public static void main(String args[]) { System.out.println("I2P Router version: " + VERSION + "-" + BUILD); System.out.println("Router ID: " + RouterVersion.ID);