AsyncFortunaRandomSource: Refactor refiller for concurrent and shutdown

Add shutdown hook for FortunaRandomSource
This commit is contained in:
zzz
2011-06-16 12:42:25 +00:00
parent 20ad7a44a7
commit 518fdd8c03
4 changed files with 95 additions and 70 deletions

View File

@@ -2,6 +2,7 @@ package gnu.crypto.prng;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import net.i2p.I2PAppContext;
import net.i2p.util.Log;
@@ -10,6 +11,12 @@ import net.i2p.util.Log;
* fortuna instance that tries to avoid blocking if at all possible by using separate
* filled buffer segments rather than one buffer (and blocking when that buffer's data
* has been eaten)
*
* Note that this class is not fully Thread safe!
* The following methods must be synchronized externally, they are not
* sycned here or in super():
* addRandomByte(), addRandomBytes(), nextByte(), nextBytes(), seed()
*
*/
public class AsyncFortunaStandalone extends FortunaStandalone implements Runnable {
/**
@@ -19,25 +26,23 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
private static final int DEFAULT_BUFFERS = 2;
private static final int DEFAULT_BUFSIZE = 256*1024;
private final int _bufferCount;
private final byte asyncBuffers[][];
private final int status[];
private int nextBuf = 0;
private final int _bufferSize;
/** the lock */
private final Object asyncBuffers = new Object();
private final I2PAppContext _context;
private final Log _log;
private volatile boolean _isRunning;
private Thread _refillThread;
private final LinkedBlockingQueue<AsyncBuffer> _fullBuffers;
private final LinkedBlockingQueue<AsyncBuffer> _emptyBuffers;
private AsyncBuffer _currentBuffer;
private static final int STATUS_NEED_FILL = 0;
private static final int STATUS_FILLING = 1;
private static final int STATUS_FILLED = 2;
private static final int STATUS_LIVE = 3;
public AsyncFortunaStandalone(I2PAppContext context) {
super();
_bufferCount = Math.max(context.getProperty("prng.buffers", DEFAULT_BUFFERS), 2);
int bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024);
asyncBuffers = new byte[_bufferCount][bufferSize];
status = new int[_bufferCount];
for (int i = 0; i < _bufferCount; i++)
status[i] = STATUS_NEED_FILL;
_bufferSize = Math.max(context.getProperty("prng.bufferSize", DEFAULT_BUFSIZE), 16*1024);
_emptyBuffers = new LinkedBlockingQueue(_bufferCount);
_fullBuffers = new LinkedBlockingQueue(_bufferCount);
_context = context;
context.statManager().createRequiredRateStat("prng.bufferWaitTime", "Delay for random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } );
context.statManager().createRequiredRateStat("prng.bufferFillTime", "Time to fill random number buffer (ms)", "Encryption", new long[] { 60*1000, 10*60*1000, 60*60*1000 } );
@@ -45,10 +50,27 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
}
public void startup() {
Thread refillThread = new Thread(this, "PRNG");
refillThread.setDaemon(true);
refillThread.setPriority(Thread.MIN_PRIORITY+1);
refillThread.start();
for (int i = 0; i < _bufferCount; i++)
_emptyBuffers.offer(new AsyncBuffer(_bufferSize));
_isRunning = true;
_refillThread = new Thread(this, "PRNG");
_refillThread.setDaemon(true);
_refillThread.setPriority(Thread.MIN_PRIORITY+1);
_refillThread.start();
}
/**
* Note - methods may hang or NPE or throw IllegalStateExceptions after this
* @since 0.8.8
*/
public void shutdown() {
_isRunning = false;
_emptyBuffers.clear();
_fullBuffers.clear();
_refillThread.interrupt();
// unsynchronized to avoid hanging, may NPE elsewhere
_currentBuffer = null;
buffer = null;
}
/** the seed is only propogated once the prng is started with startup() */
@@ -63,80 +85,67 @@ public class AsyncFortunaStandalone extends FortunaStandalone implements Runnabl
@Override
protected void allocBuffer() {}
private static class AsyncBuffer {
public final byte[] buffer;
public AsyncBuffer(int size) {
buffer = new byte[size];
}
}
/**
* make the next available filled buffer current, scheduling any unfilled
* buffers for refill, and blocking until at least one buffer is ready
*/
protected void rotateBuffer() {
synchronized (asyncBuffers) {
// wait until we get some filled
AsyncBuffer old = _currentBuffer;
if (old != null)
_emptyBuffers.offer(old);
long before = System.currentTimeMillis();
long waited = 0;
while (status[nextBuf] != STATUS_FILLED) {
//System.out.println(Thread.currentThread().getName() + ": Next PRNG buffer "
// + nextBuf + " isn't ready (" + status[nextBuf] + ")");
//new Exception("source").printStackTrace();
asyncBuffers.notifyAll();
AsyncBuffer nextBuffer = null;
while (nextBuffer == null) {
if (!_isRunning)
throw new IllegalStateException("shutdown");
try {
asyncBuffers.wait();
} catch (InterruptedException ie) {}
waited = System.currentTimeMillis()-before;
nextBuffer = _fullBuffers.take();
} catch (InterruptedException ie) {
continue;
}
}
long waited = System.currentTimeMillis()-before;
_context.statManager().addRateData("prng.bufferWaitTime", waited, 0);
if (waited > 10*1000 && _log.shouldLog(Log.WARN))
_log.warn(Thread.currentThread().getName() + ": Took " + waited
+ "ms for a full PRNG buffer to be found");
//System.out.println(Thread.currentThread().getName() + ": Switching to prng buffer " + nextBuf);
buffer = asyncBuffers[nextBuf];
status[nextBuf] = STATUS_LIVE;
int prev=nextBuf-1;
if (prev<0)
prev = _bufferCount-1;
if (status[prev] == STATUS_LIVE)
status[prev] = STATUS_NEED_FILL;
nextBuf++;
if (nextBuf >= _bufferCount)
nextBuf = 0;
asyncBuffers.notify();
_currentBuffer = nextBuffer;
buffer = nextBuffer.buffer;
}
}
/**
* The refiller thread
*/
public void run() {
while (true) {
int toFill = -1;
while (_isRunning) {
AsyncBuffer aBuff = null;
try {
synchronized (asyncBuffers) {
for (int i = 0; i < _bufferCount; i++) {
if (status[i] == STATUS_NEED_FILL) {
status[i] = STATUS_FILLING;
toFill = i;
break;
}
}
if (toFill == -1) {
//System.out.println(Thread.currentThread().getName() + ": All pending buffers full");
asyncBuffers.wait();
}
}
} catch (InterruptedException ie) {}
aBuff = _emptyBuffers.take();
} catch (InterruptedException ie) {
continue;
}
if (toFill != -1) {
//System.out.println(Thread.currentThread().getName() + ": Filling prng buffer " + toFill);
long before = System.currentTimeMillis();
doFill(asyncBuffers[toFill]);
doFill(aBuff.buffer);
long after = System.currentTimeMillis();
synchronized (asyncBuffers) {
status[toFill] = STATUS_FILLED;
//System.out.println(Thread.currentThread().getName() + ": Prng buffer " + toFill + " filled after " + (after-before));
asyncBuffers.notifyAll();
}
_fullBuffers.offer(aBuff);
_context.statManager().addRateData("prng.bufferFillTime", after - before, 0);
Thread.yield();
long waitTime = (after-before)*5;
if (waitTime <= 0) // somehow postman saw waitTime show up as negative
waitTime = 50;
try { Thread.sleep(waitTime); } catch (InterruptedException ie) {}
}
}
}

View File

@@ -44,6 +44,14 @@ public class FortunaRandomSource extends RandomSource implements EntropyHarveste
_haveNextGaussian = false;
}
/**
* Note - methods may hang or NPE or throw IllegalStateExceptions after this
* @since 0.8.8
*/
public void shutdown() {
_fortuna.shutdown();
}
@Override
public synchronized void setSeed(byte buf[]) {
_fortuna.addRandomBytes(buf);

View File

@@ -28,6 +28,12 @@ public class RandomSource extends SecureRandom implements EntropyHarvester {
private final EntropyHarvester _entropyHarvester;
protected final I2PAppContext _context;
/**
* Deprecated - do not instantiate this directly, as you won't get the
* good one (Fortuna). Use getInstance() or
* I2PAppContext.getGlobalContext().random() to get the FortunaRandomSource
* instance.
*/
public RandomSource(I2PAppContext context) {
super();
_context = context;
@@ -202,10 +208,4 @@ public class RandomSource extends SecureRandom implements EntropyHarvester {
rs.saveSeed();
}
}
// noop
private static class DummyEntropyHarvester implements EntropyHarvester {
public void feedEntropy(String source, long data, int bitoffset, int bits) {}
public void feedEntropy(String source, byte[] data, int offset, int len) {}
}
}

View File

@@ -45,6 +45,7 @@ import net.i2p.stat.RateStat;
import net.i2p.stat.StatManager;
import net.i2p.util.ByteCache;
import net.i2p.util.FileUtil;
import net.i2p.util.FortunaRandomSource;
import net.i2p.util.I2PAppThread;
import net.i2p.util.I2PThread;
import net.i2p.util.Log;
@@ -973,6 +974,13 @@ public class Router {
//try { _sessionKeyPersistenceHelper.shutdown(); } catch (Throwable t) { _log.log(Log.CRIT, "Error shutting down the session key manager", t); }
_context.deleteTempDir();
RouterContext.listContexts().remove(_context);
// shut down I2PAppContext tasks here
try {
((FortunaRandomSource)_context.random()).shutdown();
} catch (Throwable t) { _log.log(Log.CRIT, "Error shutting random()", t); }
// logManager shut down in finalShutdown()
finalShutdown(exitCode);
}