From fc0d63b4daf9430f658ef0fda979235fe7023e7c Mon Sep 17 00:00:00 2001
From: Matt Drollette
Date: Sun, 14 Dec 2014 14:54:12 -0600
Subject: [PATCH] zip and sign su3s concurrently

---
 cmd/reseeder.go   |  12 +++--
 reseed/service.go | 121 ++++++++++++++++++++++++++++++++++++++--------
 2 files changed, 110 insertions(+), 23 deletions(-)

diff --git a/cmd/reseeder.go b/cmd/reseeder.go
index 8700edf..3d43854 100644
--- a/cmd/reseeder.go
+++ b/cmd/reseeder.go
@@ -52,7 +52,11 @@ func NewReseedCommand() cli.Command {
 		cli.IntFlag{
 			Name:  "numRi",
 			Value: 75,
-			Usage: "Number of routerInfos to include in each SU3 file",
+			Usage: "Number of routerInfos to include in each su3 file",
+		},
+		cli.IntFlag{
+			Name:  "numSu3",
+			Usage: "Number of su3 files to build",
 		},
 		cli.StringFlag{
 			Name:  "interval",
@@ -91,9 +95,8 @@ func reseedAction(c *cli.Context) {
 
 	// use at most half of the cores
 	cpus := runtime.NumCPU()
-	if cpus >= 4 {
-		runtime.GOMAXPROCS(cpus / 2)
-	}
+	runtime.GOMAXPROCS(cpus)
+	log.Printf("Using %d CPU cores.\n", cpus)
 
 	// load our signing privKey
 	// @todo: generate a new signing key if one doesn't exist
@@ -110,6 +113,7 @@ func reseedAction(c *cli.Context) {
 	reseeder.SigningKey = privKey
 	reseeder.SignerId = []byte(signerId)
 	reseeder.NumRi = c.Int("numRi")
+	reseeder.NumSu3 = c.Int("numSu3")
 	reseeder.RebuildInterval = reloadIntvl
 	reseeder.Start()
 
diff --git a/reseed/service.go b/reseed/service.go
index a4ab05d..9897340 100644
--- a/reseed/service.go
+++ b/reseed/service.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"path/filepath"
 	"regexp"
+	"sync"
 	"time"
 
 	"github.com/MDrollette/go-i2p/su3"
@@ -52,16 +53,15 @@ type ReseederImpl struct {
 	SignerId        []byte
 	NumRi           int
 	RebuildInterval time.Duration
-	numSu3          int
+	NumSu3          int
 }
 
 func NewReseeder(netdb NetDbProvider) *ReseederImpl {
 	return &ReseederImpl{
 		netdb:           netdb,
 		su3s:            make(chan [][]byte),
-		NumRi:           50,
+		NumRi:           75,
 		RebuildInterval: 12 * time.Hour,
-		numSu3:          50,
 	}
 }
 
@@ -104,49 +104,108 @@ func (rs *ReseederImpl) Start() chan bool {
 }
 
 func (rs *ReseederImpl) rebuild() error {
-	log.Println("Rebuilding su3s...")
+	log.Println("Rebuilding su3 cache...")
+
+	// get all RIs from netdb provider
 	ris, err := rs.netdb.RouterInfos()
 	if nil != err {
 		return fmt.Errorf("Unable to get routerInfos: %s", err)
 	}
 
+	// fail if we don't have enough RIs to make a single reseed file
 	if rs.NumRi > len(ris) {
 		return fmt.Errorf("Not enough routerInfos.")
 	}
 
-	newSu3s := make([][]byte, rs.numSu3)
-	for i := 0; i < rs.numSu3; i++ {
-		var seeds Seeds
-		for _, z := range rand.Perm(rs.NumRi) {
-			seeds = append(seeds, ris[z])
-		}
-
-		gs, err := rs.CreateSu3(seeds)
-		if nil != err {
-			return err
-		}
+	// build a pipeline ris -> seeds -> su3
+	seedsChan := rs.seedsProducer(ris)
+	// fan-in multiple builders
+	su3Chan := fanIn(rs.su3Builder(seedsChan), rs.su3Builder(seedsChan), rs.su3Builder(seedsChan))
 
+	// read from su3 chan and append to su3s slice
+	var newSu3s [][]byte
+	for gs := range su3Chan {
 		data, err := gs.MarshalBinary()
 		if nil != err {
 			return err
 		}
 
-		newSu3s[i] = data
+		newSu3s = append(newSu3s, data)
 	}
 
+	// use this new set of su3s
 	rs.su3s <- newSu3s
 
+	log.Println("Done rebuilding.")
 	return nil
 }
 
-func (rs *ReseederImpl) PeerSu3Bytes(peer Peer) ([]byte, error) {
-	hashMod := peer.Hash() % rs.numSu3
+func (rs *ReseederImpl) seedsProducer(ris []routerInfo) <-chan Seeds {
+	lenRis := len(ris)
+	// if NumSu3 is not specified, then we determine the "best" number based on the number of RIs
+	var numSu3s int
+	if rs.NumSu3 != 0 {
+		numSu3s = rs.NumSu3
+	} else {
+		switch {
+		case lenRis > 4000:
+			numSu3s = 300
+		case lenRis > 3000:
+			numSu3s = 200
+		case lenRis > 2000:
+			numSu3s = 100
+		case lenRis > 1000:
+			numSu3s = 75
+		default:
+			numSu3s = 50
+		}
+	}
+
+	// @todo: only use a portion of available RIs
+	log.Printf("Building %d su3 files each containing %d out of %d routerInfos.\n", numSu3s, rs.NumRi, lenRis)
+
+	out := make(chan Seeds)
+
+	go func() {
+		for i := 0; i < numSu3s; i++ {
+			var seeds Seeds
+			unsorted := rand.Perm(lenRis)
+			for z := 0; z < rs.NumRi; z++ {
+				seeds = append(seeds, ris[unsorted[z]])
+			}
+
+			out <- seeds
+		}
+		close(out)
+	}()
+
+	return out
+}
+
+func (rs *ReseederImpl) su3Builder(in <-chan Seeds) <-chan *su3.Su3File {
+	out := make(chan *su3.Su3File)
+	go func() {
+		for seeds := range in {
+			gs, err := rs.CreateSu3(seeds)
+			if nil != err {
+				log.Println(err)
+				continue
+			}
+
+			out <- gs
+		}
+		close(out)
+	}()
+	return out
+}
+
+func (rs *ReseederImpl) PeerSu3Bytes(peer Peer) ([]byte, error) {
 	m := <-rs.su3s
 	defer func() {
 		rs.su3s <- m
 	}()
 
-	return m[hashMod], nil
+	return m[peer.Hash()%len(m)], nil
 }
 
 func (rs *ReseederImpl) Seeds(p Peer) (Seeds, error) {
@@ -222,3 +281,27 @@ func (db *LocalNetDbImpl) RouterInfos() (routerInfos []routerInfo, err error) {
 
 	return
 }
+
+func fanIn(inputs ...<-chan *su3.Su3File) <-chan *su3.Su3File {
+	out := make(chan *su3.Su3File, len(inputs))
+
+	var wg sync.WaitGroup
+	wg.Add(len(inputs))
+	go func() {
+		// close "out" when we're done
+		wg.Wait()
+		close(out)
+	}()
+
+	// fan-in all the inputs to a single output
+	for _, input := range inputs {
+		go func(in <-chan *su3.Su3File) {
+			defer wg.Done()
+			for n := range in {
+				out <- n
+			}
+		}(input)
+	}
+
+	return out
+}
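
A standalone sketch (not part of the patch) of the concurrency layout the change above introduces in reseed/service.go: one producer goroutine feeds a channel, several worker goroutines drain it, and fanIn merges their outputs into a single channel that closes only after every worker has finished. The names produce and square below are illustrative stand-ins for seedsProducer and su3Builder, not code from the repository.

package main

import (
	"fmt"
	"sync"
)

// produce feeds n work items into a channel and closes it when done,
// playing the role that seedsProducer plays in the patch.
func produce(n int) <-chan int {
	out := make(chan int)
	go func() {
		for i := 0; i < n; i++ {
			out <- i
		}
		close(out)
	}()
	return out
}

// square consumes work items and emits results, playing the role of su3Builder.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			out <- v * v
		}
		close(out)
	}()
	return out
}

// fanIn merges several result channels into one, closing the merged
// channel once all inputs are drained (the same shape as the patch's fanIn).
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int, len(inputs))

	var wg sync.WaitGroup
	wg.Add(len(inputs))
	go func() {
		wg.Wait()
		close(out)
	}()

	for _, input := range inputs {
		go func(in <-chan int) {
			defer wg.Done()
			for v := range in {
				out <- v
			}
		}(input)
	}

	return out
}

func main() {
	work := produce(10)
	// three workers share one producer, mirroring the three su3Builder calls in rebuild()
	for v := range fanIn(square(work), square(work), square(work)) {
		fmt.Println(v)
	}
}

Because all three workers read from the same producer channel, each item is handled exactly once, and ranging over the merged channel terminates cleanly once the producer and all workers are done.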