Zip and sign su3 files concurrently

This commit is contained in:
Matt Drollette
2014-12-14 14:54:12 -06:00
parent ffab6ad5e9
commit fc0d63b4da
2 changed files with 110 additions and 23 deletions

View File

@@ -52,7 +52,11 @@ func NewReseedCommand() cli.Command {
cli.IntFlag{ cli.IntFlag{
Name: "numRi", Name: "numRi",
Value: 75, Value: 75,
Usage: "Number of routerInfos to include in each SU3 file", Usage: "Number of routerInfos to include in each su3 file",
},
cli.IntFlag{
Name: "numSu3",
Usage: "Number of su3 files to build",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "interval", Name: "interval",
@@ -91,9 +95,8 @@ func reseedAction(c *cli.Context) {
// use at most half of the cores // use at most half of the cores
cpus := runtime.NumCPU() cpus := runtime.NumCPU()
if cpus >= 4 { runtime.GOMAXPROCS(cpus)
runtime.GOMAXPROCS(cpus / 2) log.Printf("Using %d CPU cores.\n", cpus)
}
// load our signing privKey // load our signing privKey
// @todo: generate a new signing key if one doesn't exist // @todo: generate a new signing key if one doesn't exist
@@ -110,6 +113,7 @@ func reseedAction(c *cli.Context) {
reseeder.SigningKey = privKey reseeder.SigningKey = privKey
reseeder.SignerId = []byte(signerId) reseeder.SignerId = []byte(signerId)
reseeder.NumRi = c.Int("numRi") reseeder.NumRi = c.Int("numRi")
reseeder.NumSu3 = c.Int("numSu3")
reseeder.RebuildInterval = reloadIntvl reseeder.RebuildInterval = reloadIntvl
reseeder.Start() reseeder.Start()

View File

@@ -12,6 +12,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"regexp" "regexp"
"sync"
"time" "time"
"github.com/MDrollette/go-i2p/su3" "github.com/MDrollette/go-i2p/su3"
@@ -52,16 +53,15 @@ type ReseederImpl struct {
SignerId []byte SignerId []byte
NumRi int NumRi int
RebuildInterval time.Duration RebuildInterval time.Duration
numSu3 int NumSu3 int
} }
func NewReseeder(netdb NetDbProvider) *ReseederImpl { func NewReseeder(netdb NetDbProvider) *ReseederImpl {
return &ReseederImpl{ return &ReseederImpl{
netdb: netdb, netdb: netdb,
su3s: make(chan [][]byte), su3s: make(chan [][]byte),
NumRi: 50, NumRi: 75,
RebuildInterval: 12 * time.Hour, RebuildInterval: 12 * time.Hour,
numSu3: 50,
} }
} }
@@ -104,49 +104,108 @@ func (rs *ReseederImpl) Start() chan bool {
} }
func (rs *ReseederImpl) rebuild() error { func (rs *ReseederImpl) rebuild() error {
log.Println("Rebuilding su3s...") log.Println("Rebuilding su3 cache...")
// get all RIs from netdb provider
ris, err := rs.netdb.RouterInfos() ris, err := rs.netdb.RouterInfos()
if nil != err { if nil != err {
return fmt.Errorf("Unable to get routerInfos: %s", err) return fmt.Errorf("Unable to get routerInfos: %s", err)
} }
// fail if we don't have enough RIs to make a single reseed file
if rs.NumRi > len(ris) { if rs.NumRi > len(ris) {
return fmt.Errorf("Not enough routerInfos.") return fmt.Errorf("Not enough routerInfos.")
} }
newSu3s := make([][]byte, rs.numSu3) // build a pipeline ris -> seeds -> su3
for i := 0; i < rs.numSu3; i++ { seedsChan := rs.seedsProducer(ris)
var seeds Seeds // fan-in multiple builders
for _, z := range rand.Perm(rs.NumRi) { su3Chan := fanIn(rs.su3Builder(seedsChan), rs.su3Builder(seedsChan), rs.su3Builder(seedsChan))
seeds = append(seeds, ris[z])
}
gs, err := rs.CreateSu3(seeds)
if nil != err {
return err
}
// read from su3 chan and append to su3s slice
var newSu3s [][]byte
for gs := range su3Chan {
data, err := gs.MarshalBinary() data, err := gs.MarshalBinary()
if nil != err { if nil != err {
return err return err
} }
newSu3s[i] = data newSu3s = append(newSu3s, data)
} }
// use this new set of su3s
rs.su3s <- newSu3s rs.su3s <- newSu3s
log.Println("Done rebuilding.") log.Println("Done rebuilding.")
return nil return nil
} }
func (rs *ReseederImpl) PeerSu3Bytes(peer Peer) ([]byte, error) { func (rs *ReseederImpl) seedsProducer(ris []routerInfo) <-chan Seeds {
hashMod := peer.Hash() % rs.numSu3 lenRis := len(ris)
// if NumSu3 is not specified, then we determine the "best" number based on the number of RIs
var numSu3s int
if rs.NumSu3 != 0 {
numSu3s = rs.NumSu3
} else {
switch {
case lenRis > 4000:
numSu3s = 300
case lenRis > 3000:
numSu3s = 200
case lenRis > 2000:
numSu3s = 100
case lenRis > 1000:
numSu3s = 75
default:
numSu3s = 50
}
}
// @todo: only use a portion of available RIs
log.Printf("Building %d su3 files each containing %d out of %d routerInfos.\n", numSu3s, rs.NumRi, lenRis)
out := make(chan Seeds)
go func() {
for i := 0; i < numSu3s; i++ {
var seeds Seeds
unsorted := rand.Perm(lenRis)
for z := 0; z < rs.NumRi; z++ {
seeds = append(seeds, ris[unsorted[z]])
}
out <- seeds
}
close(out)
}()
return out
}
// su3Builder consumes seed batches from "in" and emits a signed su3 file
// for each batch on the returned channel. Batches that fail to build are
// logged and dropped. The output channel is closed once "in" is drained,
// so this is safe to fan-in with other builders.
func (rs *ReseederImpl) su3Builder(in <-chan Seeds) <-chan *su3.Su3File {
	results := make(chan *su3.Su3File)
	go func() {
		// close "results" when the input channel is exhausted
		defer close(results)
		for batch := range in {
			file, err := rs.CreateSu3(batch)
			if err != nil {
				// best-effort: skip this batch, keep building the rest
				log.Println(err)
				continue
			}
			results <- file
		}
	}()
	return results
}
func (rs *ReseederImpl) PeerSu3Bytes(peer Peer) ([]byte, error) {
m := <-rs.su3s m := <-rs.su3s
defer func() { rs.su3s <- m }() defer func() { rs.su3s <- m }()
return m[hashMod], nil return m[peer.Hash()%len(m)], nil
} }
func (rs *ReseederImpl) Seeds(p Peer) (Seeds, error) { func (rs *ReseederImpl) Seeds(p Peer) (Seeds, error) {
@@ -222,3 +281,27 @@ func (db *LocalNetDbImpl) RouterInfos() (routerInfos []routerInfo, err error) {
return return
} }
// fanIn merges any number of su3 file channels into a single channel.
// The merged channel is closed only after every input channel has been
// fully drained and closed by its sender.
func fanIn(inputs ...<-chan *su3.Su3File) <-chan *su3.Su3File {
	merged := make(chan *su3.Su3File, len(inputs))
	var pending sync.WaitGroup
	pending.Add(len(inputs))

	// one drain goroutine per input channel
	for _, src := range inputs {
		go func(ch <-chan *su3.Su3File) {
			defer pending.Done()
			for file := range ch {
				merged <- file
			}
		}(src)
	}

	// close "merged" once all drain goroutines have finished
	go func() {
		pending.Wait()
		close(merged)
	}()

	return merged
}