From b7696f9be2838afdb323f3ef5b1af65cab21439f Mon Sep 17 00:00:00 2001 From: Matt Drollette Date: Thu, 11 Dec 2014 08:24:06 -0600 Subject: [PATCH] refresh su3 cache --- cmd/reseeder.go | 25 ++++++++++++ reseed/server.go | 22 ++++------ reseed/service.go | 101 ++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 130 insertions(+), 18 deletions(-) diff --git a/cmd/reseeder.go b/cmd/reseeder.go index e1c2619..3dfe26f 100644 --- a/cmd/reseeder.go +++ b/cmd/reseeder.go @@ -3,6 +3,8 @@ package cmd import ( "fmt" "log" + "runtime" + "time" "github.com/MDrollette/go-i2p/reseed" "github.com/codegangsta/cli" @@ -39,6 +41,16 @@ func NewReseedCommand() cli.Command { Value: "reseed_private.pem", Usage: "Path to your su3 signing private key", }, + cli.IntFlag{ + Name: "numRi", + Value: 50, + Usage: "Number of routerInfos to include in each SU3 file", + }, + cli.StringFlag{ + Name: "interval", + Value: "12h", + Usage: "Duration between SU3 cache rebuilds (ex. 12h, 15m)", + }, }, } } @@ -50,6 +62,11 @@ func reseedAction(c *cli.Context) { return } + cpus := runtime.NumCPU() + if cpus >= 4 { + runtime.GOMAXPROCS(cpus / 2) + } + // load our signing privKey privKey, err := loadPrivateKey(c.String("keyfile")) if nil != err { @@ -60,9 +77,17 @@ func reseedAction(c *cli.Context) { netdb := reseed.NewLocalNetDb(netdbDir) // create a reseeder + intr, err := time.ParseDuration(c.String("interval")) + if nil != err { + log.Fatalf("'%s' is not a valid time duration\n", intr) + } + reseeder := reseed.NewReseeder(netdb) reseeder.SigningKey = privKey reseeder.SignerId = []byte("matt@drollette.com") + reseeder.NumRi = c.Int("numRI") + reseeder.RebuildInterval = intr + reseeder.Start() // create a server server := reseed.NewServer() diff --git a/reseed/server.go b/reseed/server.go index d1c20a0..1aea464 100644 --- a/reseed/server.go +++ b/reseed/server.go @@ -3,10 +3,10 @@ package reseed import ( "bytes" "crypto/tls" - "fmt" "io" "net/http" "os" + "strconv" "github.com/PuerkitoBio/throttled" "github.com/PuerkitoBio/throttled/store" @@ -28,7 +28,7 @@ func NewServer() *Server { h := &http.Server{TLSConfig: config} server := Server{h, nil} - th := throttled.RateLimit(throttled.PerHour(4), &throttled.VaryBy{RemoteAddr: true}, store.NewMemStore(10000)) + th := throttled.RateLimit(throttled.PerHour(120), &throttled.VaryBy{RemoteAddr: true}, store.NewMemStore(10000)) middlewareChain := alice.New(proxiedMiddleware, loggingMiddleware, verifyMiddleware, th.Throttle) @@ -39,26 +39,20 @@ func NewServer() *Server { return &server } -func (s *Server) indexHandler(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "Index") -} - func (s *Server) reseedHandler(w http.ResponseWriter, r *http.Request) { peer := s.Reseeder.Peer(r) - seeds, err := s.Reseeder.Seeds(peer) + su3, err := s.Reseeder.PeerSu3Bytes(peer) if nil != err { - http.Error(w, "500 Unable to provide seeds", http.StatusInternalServerError) + http.Error(w, "500 Unable to get SU3", http.StatusInternalServerError) return } - su3, err := s.Reseeder.CreateSu3(seeds) - if nil != err { - http.Error(w, "500 Unable to generate SU3", http.StatusInternalServerError) - return - } + w.Header().Set("Content-Disposition", "attachment; filename=i2pseeds.su3") + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", strconv.FormatInt(int64(len(su3)), 10)) - io.Copy(w, bytes.NewReader(su3.Bytes())) + io.Copy(w, bytes.NewReader(su3)) } func loggingMiddleware(next http.Handler) http.Handler { diff --git a/reseed/service.go b/reseed/service.go index b8782ae..2ba33dc 100644 --- a/reseed/service.go +++ b/reseed/service.go @@ -2,6 +2,8 @@ package reseed import ( "crypto/rsa" + "fmt" + "hash/crc32" "io/ioutil" "log" "math/rand" @@ -10,6 +12,7 @@ import ( "os" "path/filepath" "regexp" + "time" "github.com/MDrollette/go-i2p/su3" ) @@ -29,20 +32,110 @@ type Reseeder interface { Peer(r *http.Request) Peer // create an Su3 file from the given seeds CreateSu3(seeds Seeds) (*su3.Su3File, error) + // get signed su3 bytes for a peer + PeerSu3Bytes(peer Peer) ([]byte, error) } type ReseederImpl struct { - netdb NetDbProvider - SigningKey *rsa.PrivateKey - SignerId []byte + netdb NetDbProvider + su3s chan [][]byte + + SigningKey *rsa.PrivateKey + SignerId []byte + NumRi int + RebuildInterval time.Duration + numSu3 int } func NewReseeder(netdb NetDbProvider) *ReseederImpl { return &ReseederImpl{ - netdb: netdb, + netdb: netdb, + su3s: make(chan [][]byte), + NumRi: 50, + RebuildInterval: 12 * time.Hour, + numSu3: 50, } } +func (rs *ReseederImpl) Start() chan bool { + // atomic swapper + go func() { + var m [][]byte + for { + select { + case m = <-rs.su3s: + case rs.su3s <- m: + } + } + }() + + // init the cache + err := rs.rebuild() + if nil != err { + log.Println(err) + } + + ticker := time.NewTicker(rs.RebuildInterval) + quit := make(chan bool) + go func() { + for { + select { + case <-ticker.C: + err := rs.rebuild() + if nil != err { + log.Println(err) + } + case <-quit: + ticker.Stop() + return + } + } + }() + + return quit +} + +func (rs *ReseederImpl) rebuild() error { + log.Println("Rebuilding SU3s...") + ris, err := rs.netdb.RouterInfos() + if nil != err { + return fmt.Errorf("Unable to get routerInfos: %s", err) + } + + if rs.NumRi > len(ris) { + return fmt.Errorf("Not enough routerInfos.") + } + + newSu3s := make([][]byte, rs.numSu3) + for i := 0; i < rs.numSu3; i++ { + var seeds Seeds + for _, z := range rand.Perm(rs.NumRi) { + seeds = append(seeds, ris[z]) + } + + gs, err := rs.CreateSu3(seeds) + if nil != err { + return err + } + + newSu3s[i] = gs.Bytes() + } + + rs.su3s <- newSu3s + log.Println("Done rebuilding.") + + return nil +} + +func (rs *ReseederImpl) PeerSu3Bytes(peer Peer) ([]byte, error) { + hashMod := int(crc32.ChecksumIEEE([]byte(peer))) % rs.numSu3 + + m := <-rs.su3s + defer func() { rs.su3s <- m }() + + return m[hashMod], nil +} + func (rs *ReseederImpl) Seeds(p Peer) (Seeds, error) { all, err := rs.netdb.RouterInfos() if nil != err {