12 Commits

Author SHA1 Message Date
eyedeekay
61008ce988 fix test 2025-08-14 16:43:16 -04:00
eyedeekay
294cd52343 fix test 2025-08-14 16:40:12 -04:00
eyedeekay
322b431e8f move constants to own file 2025-08-14 16:34:01 -04:00
eyedeekay
79df514a2f Refactor ReadBuilRequestRecord 2025-08-14 16:27:11 -04:00
eyedeekay
36762c61c5 fix race in sntp 2025-08-14 16:22:20 -04:00
eyedeekay
a8ac47431e reorganize netDb directory 2025-08-14 16:15:46 -04:00
eyedeekay
bd947ec2ca gofmt 2025-08-14 15:58:59 -04:00
eyedeekay
61700c8a05 refactor long ReSeed function in lib/netdb 2025-08-14 15:53:38 -04:00
eyedeekay
d6b8918a14 refactor long ReadFrom and GetRouterInfo function in lib/netdb 2025-08-14 15:46:20 -04:00
eyedeekay
43aa3a72a6 refactor long WriteTo function in lib/netdb 2025-08-14 15:33:23 -04:00
eyedeekay
5d04d24bc5 Refactor long functions Lookup and findClosestPeers in the netdb package 2025-08-14 15:28:16 -04:00
eyedeekay
f168794ca3 start working on the netDb 2025-08-14 15:12:34 -04:00
14 changed files with 655 additions and 314 deletions

View File

@@ -177,94 +177,122 @@ var ERR_BUILD_REQUEST_RECORD_NOT_ENOUGH_DATA = oops.Errorf("not enough i2np buil
func ReadBuildRequestRecord(data []byte) (BuildRequestRecord, error) {
log.Debug("Reading BuildRequestRecord")
build_request_record := BuildRequestRecord{}
record := BuildRequestRecord{}
if err := parseTunnelIdentifiers(data, &record); err != nil {
return record, err
}
if err := parseSessionKeys(data, &record); err != nil {
return record, err
}
if err := parseMetadata(data, &record); err != nil {
return record, err
}
log.Debug("BuildRequestRecord read successfully")
return record, nil
}
// parseTunnelIdentifiers extracts tunnel and identity information from the record data.
func parseTunnelIdentifiers(data []byte, record *BuildRequestRecord) error {
receive_tunnel, err := readBuildRequestRecordReceiveTunnel(data)
if err != nil {
log.WithError(err).Error("Failed to read ReceiveTunnel")
return build_request_record, err
return err
}
build_request_record.ReceiveTunnel = receive_tunnel
record.ReceiveTunnel = receive_tunnel
our_ident, err := readBuildRequestRecordOurIdent(data)
if err != nil {
log.WithError(err).Error("Failed to read OurIdent")
return build_request_record, err
return err
}
build_request_record.OurIdent = our_ident
record.OurIdent = our_ident
next_tunnel, err := readBuildRequestRecordNextTunnel(data)
if err != nil {
log.WithError(err).Error("Failed to read NextTunnel")
return build_request_record, err
return err
}
build_request_record.NextTunnel = next_tunnel
record.NextTunnel = next_tunnel
next_ident, err := readBuildRequestRecordNextIdent(data)
if err != nil {
log.WithError(err).Error("Failed to read NextIdent")
return build_request_record, err
return err
}
build_request_record.NextIdent = next_ident
record.NextIdent = next_ident
return nil
}
// parseSessionKeys extracts all cryptographic keys from the record data.
func parseSessionKeys(data []byte, record *BuildRequestRecord) error {
layer_key, err := readBuildRequestRecordLayerKey(data)
if err != nil {
log.WithError(err).Error("Failed to read LayerKey")
return build_request_record, err
return err
}
build_request_record.LayerKey = layer_key
record.LayerKey = layer_key
iv_key, err := readBuildRequestRecordIVKey(data)
if err != nil {
log.WithError(err).Error("Failed to read IVKey")
return build_request_record, err
return err
}
build_request_record.IVKey = iv_key
record.IVKey = iv_key
reply_key, err := readBuildRequestRecordReplyKey(data)
if err != nil {
log.WithError(err).Error("Failed to read ReplyKey")
return build_request_record, err
return err
}
build_request_record.ReplyKey = reply_key
record.ReplyKey = reply_key
reply_iv, err := readBuildRequestRecordReplyIV(data)
if err != nil {
log.WithError(err).Error("Failed to read ReplyIV")
return build_request_record, err
return err
}
build_request_record.ReplyIV = reply_iv
record.ReplyIV = reply_iv
return nil
}
// parseMetadata extracts flags, timestamps, and padding from the record data.
func parseMetadata(data []byte, record *BuildRequestRecord) error {
flag, err := readBuildRequestRecordFlag(data)
if err != nil {
log.WithError(err).Error("Failed to read Flag")
return build_request_record, err
return err
}
build_request_record.Flag = flag
record.Flag = flag
request_time, err := readBuildRequestRecordRequestTime(data)
if err != nil {
log.WithError(err).Error("Failed to read RequestTime")
return build_request_record, err
return err
}
build_request_record.RequestTime = request_time
record.RequestTime = request_time
send_message_id, err := readBuildRequestRecordSendMessageID(data)
if err != nil {
log.WithError(err).Error("Failed to read SendMessageID")
return build_request_record, err
return err
}
build_request_record.SendMessageID = send_message_id
record.SendMessageID = send_message_id
padding, err := readBuildRequestRecordPadding(data)
if err != nil {
log.WithError(err).Error("Failed to read Padding")
return build_request_record, err
return err
}
build_request_record.Padding = padding
record.Padding = padding
log.Debug("BuildRequestRecord read successfully")
return build_request_record, nil
return nil
}
func readBuildRequestRecordReceiveTunnel(data []byte) (tunnel.TunnelID, error) {

18
lib/i2np/constants.go Normal file
View File

@@ -0,0 +1,18 @@
package i2np
// I2NP Message Type Constants
// Moved from: header.go
const (
I2NP_MESSAGE_TYPE_DATABASE_STORE = 1
I2NP_MESSAGE_TYPE_DATABASE_LOOKUP = 2
I2NP_MESSAGE_TYPE_DATABASE_SEARCH_REPLY = 3
I2NP_MESSAGE_TYPE_DELIVERY_STATUS = 10
I2NP_MESSAGE_TYPE_GARLIC = 11
I2NP_MESSAGE_TYPE_TUNNEL_DATA = 18
I2NP_MESSAGE_TYPE_TUNNEL_GATEWAY = 19
I2NP_MESSAGE_TYPE_DATA = 20
I2NP_MESSAGE_TYPE_TUNNEL_BUILD = 21
I2NP_MESSAGE_TYPE_TUNNEL_BUILD_REPLY = 22
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD = 23
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD_REPLY = 24
)

View File

@@ -61,21 +61,6 @@ data ::
purpose -> actual message contents
*/
const (
I2NP_MESSAGE_TYPE_DATABASE_STORE = 1
I2NP_MESSAGE_TYPE_DATABASE_LOOKUP = 2
I2NP_MESSAGE_TYPE_DATABASE_SEARCH_REPLY = 3
I2NP_MESSAGE_TYPE_DELIVERY_STATUS = 10
I2NP_MESSAGE_TYPE_GARLIC = 11
I2NP_MESSAGE_TYPE_TUNNEL_DATA = 18
I2NP_MESSAGE_TYPE_TUNNEL_GATEWAY = 19
I2NP_MESSAGE_TYPE_DATA = 20
I2NP_MESSAGE_TYPE_TUNNEL_BUILD = 21
I2NP_MESSAGE_TYPE_TUNNEL_BUILD_REPLY = 22
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD = 23
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD_REPLY = 24
)
type I2NPNTCPHeader struct {
Type int
MessageID int

View File

@@ -241,7 +241,8 @@ func (mr *MessageRouter) RouteTunnelMessage(msg interface{}) error {
// CreateTunnelRecord creates a build request record with interface methods
func CreateTunnelRecord(receiveTunnel, nextTunnel tunnel.TunnelID,
ourIdent, nextIdent common.Hash) TunnelIdentifier {
ourIdent, nextIdent common.Hash,
) TunnelIdentifier {
return &BuildRequestRecord{
ReceiveTunnel: receiveTunnel,
NextTunnel: nextTunnel,

View File

@@ -117,9 +117,9 @@ func (ks *RouterInfoKeystore) KeyID() string {
return "fallback-" + hex.EncodeToString(randomBytes)
}
if len(public.Bytes()) > 10 {
return string(public.Bytes()[:10])
return hex.EncodeToString(public.Bytes()[:10])
}
return string(public.Bytes())
return hex.EncodeToString(public.Bytes())
}
return ks.name
}

5
lib/netdb/constants.go Normal file
View File

@@ -0,0 +1,5 @@
package netdb
// Moved from: std.go
// name of file to hold precomputed size of netdb
const CacheFileName = "sizecache.txt"

View File

@@ -16,108 +16,179 @@ type Entry struct {
*lease_set.LeaseSet
}
func (e *Entry) WriteTo(w io.Writer) (err error) {
// Check if we have a RouterInfo to write
// WriteTo writes the Entry to the provided writer.
func (e *Entry) WriteTo(w io.Writer) error {
if e.RouterInfo != nil {
// Get the serialized bytes of the RouterInfo
data, err := e.RouterInfo.Bytes()
if err != nil {
return fmt.Errorf("failed to serialize RouterInfo: %w", err)
}
// Check if the data is empty
if len(data) == 0 {
return fmt.Errorf("RouterInfo data is empty")
}
// Write the entry type indicator (1 for RouterInfo)
if _, err = w.Write([]byte{1}); err != nil {
return fmt.Errorf("failed to write entry type: %w", err)
}
// Write the length as a 2-byte big-endian value
lenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(lenBytes, uint16(len(data)))
if _, err = w.Write(lenBytes); err != nil {
return fmt.Errorf("failed to write length: %w", err)
}
// Write the actual RouterInfo data
if _, err = w.Write(data); err != nil {
return fmt.Errorf("failed to write RouterInfo data: %w", err)
}
return nil
return e.writeRouterInfo(w)
}
// Check if we have a LeaseSet to write
if e.LeaseSet != nil {
// Get the serialized bytes of the LeaseSet
data, err := e.LeaseSet.Bytes()
// Write the entry type indicator (2 for LeaseSet)
if _, err = w.Write([]byte{2}); err != nil {
return fmt.Errorf("failed to write entry type: %w", err)
}
// Write the length as a 2-byte big-endian value
lenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(lenBytes, uint16(len(data)))
if _, err = w.Write(lenBytes); err != nil {
return fmt.Errorf("failed to write length: %w", err)
}
// Write the actual LeaseSet data
if _, err = w.Write(data); err != nil {
return fmt.Errorf("failed to write LeaseSet data: %w", err)
}
return nil
return e.writeLeaseSet(w)
}
return fmt.Errorf("entry contains neither RouterInfo nor LeaseSet")
}
func (e *Entry) ReadFrom(r io.Reader) (err error) {
// Read the entry type indicator
typeBytes := make([]byte, 1)
if _, err = r.Read(typeBytes); err != nil {
return fmt.Errorf("failed to read entry type: %w", err)
// writeRouterInfo writes a RouterInfo entry to the writer.
func (e *Entry) writeRouterInfo(w io.Writer) error {
data, err := e.serializeRouterInfo()
if err != nil {
return err
}
// Read the length
lenBytes := make([]byte, 2)
if _, err = r.Read(lenBytes); err != nil {
return fmt.Errorf("failed to read length: %w", err)
}
dataLen := binary.BigEndian.Uint16(lenBytes)
return e.writeEntryData(w, 1, data)
}
// Read the entry data
data := make([]byte, dataLen)
if _, err = io.ReadFull(r, data); err != nil {
return fmt.Errorf("failed to read entry data: %w", err)
// writeLeaseSet writes a LeaseSet entry to the writer.
func (e *Entry) writeLeaseSet(w io.Writer) error {
data, err := e.serializeLeaseSet()
if err != nil {
return err
}
// Process based on entry type
switch typeBytes[0] {
case 1: // RouterInfo
ri, _, err := router_info.ReadRouterInfo(data)
if err != nil {
return fmt.Errorf("failed to parse RouterInfo: %w", err)
}
e.RouterInfo = &ri
e.LeaseSet = nil
return e.writeEntryData(w, 2, data)
}
case 2: // LeaseSet
ls, err := lease_set.ReadLeaseSet(data)
if err != nil {
return fmt.Errorf("failed to parse LeaseSet: %w", err)
}
e.LeaseSet = &ls
e.RouterInfo = nil
default:
return fmt.Errorf("unknown entry type: %d", typeBytes[0])
// serializeRouterInfo serializes the RouterInfo and validates the result.
func (e *Entry) serializeRouterInfo() ([]byte, error) {
data, err := e.RouterInfo.Bytes()
if err != nil {
return nil, fmt.Errorf("failed to serialize RouterInfo: %w", err)
}
if len(data) == 0 {
return nil, fmt.Errorf("RouterInfo data is empty")
}
return data, nil
}
// serializeLeaseSet serializes the LeaseSet and validates the result.
func (e *Entry) serializeLeaseSet() ([]byte, error) {
data, err := e.LeaseSet.Bytes()
if err != nil {
return nil, fmt.Errorf("failed to serialize LeaseSet: %w", err)
}
return data, nil
}
// writeEntryData writes entry data with type indicator and length prefix.
func (e *Entry) writeEntryData(w io.Writer, entryType byte, data []byte) error {
if err := e.writeEntryType(w, entryType); err != nil {
return err
}
if err := e.writeDataLength(w, len(data)); err != nil {
return err
}
return e.writeData(w, data)
}
// writeEntryType writes the entry type indicator.
func (e *Entry) writeEntryType(w io.Writer, entryType byte) error {
if _, err := w.Write([]byte{entryType}); err != nil {
return fmt.Errorf("failed to write entry type: %w", err)
}
return nil
}
// writeDataLength writes the data length as a 2-byte big-endian value.
func (e *Entry) writeDataLength(w io.Writer, length int) error {
lenBytes := make([]byte, 2)
binary.BigEndian.PutUint16(lenBytes, uint16(length))
if _, err := w.Write(lenBytes); err != nil {
return fmt.Errorf("failed to write length: %w", err)
}
return nil
}
// writeData writes the actual entry data.
func (e *Entry) writeData(w io.Writer, data []byte) error {
if _, err := w.Write(data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
return nil
}
func (e *Entry) ReadFrom(r io.Reader) (err error) {
entryType, err := e.readEntryType(r)
if err != nil {
return err
}
data, err := e.readEntryData(r)
if err != nil {
return err
}
return e.processEntryData(entryType, data)
}
// readEntryType reads and returns the entry type indicator from the reader.
func (e *Entry) readEntryType(r io.Reader) (byte, error) {
typeBytes := make([]byte, 1)
if _, err := r.Read(typeBytes); err != nil {
return 0, fmt.Errorf("failed to read entry type: %w", err)
}
return typeBytes[0], nil
}
// readEntryData reads the length and data from the reader.
func (e *Entry) readEntryData(r io.Reader) ([]byte, error) {
dataLen, err := e.readDataLength(r)
if err != nil {
return nil, err
}
data := make([]byte, dataLen)
if _, err = io.ReadFull(r, data); err != nil {
return nil, fmt.Errorf("failed to read entry data: %w", err)
}
return data, nil
}
// readDataLength reads and returns the data length from the reader.
func (e *Entry) readDataLength(r io.Reader) (uint16, error) {
lenBytes := make([]byte, 2)
if _, err := r.Read(lenBytes); err != nil {
return 0, fmt.Errorf("failed to read length: %w", err)
}
return binary.BigEndian.Uint16(lenBytes), nil
}
// processEntryData processes the entry data based on the entry type.
func (e *Entry) processEntryData(entryType byte, data []byte) error {
switch entryType {
case 1: // RouterInfo
return e.processRouterInfoData(data)
case 2: // LeaseSet
return e.processLeaseSetData(data)
default:
return fmt.Errorf("unknown entry type: %d", entryType)
}
}
// processRouterInfoData processes RouterInfo data and sets the entry.
func (e *Entry) processRouterInfoData(data []byte) error {
ri, _, err := router_info.ReadRouterInfo(data)
if err != nil {
return fmt.Errorf("failed to parse RouterInfo: %w", err)
}
e.RouterInfo = &ri
e.LeaseSet = nil
return nil
}
// processLeaseSetData processes LeaseSet data and sets the entry.
func (e *Entry) processLeaseSetData(data []byte) error {
ls, err := lease_set.ReadLeaseSet(data)
if err != nil {
return fmt.Errorf("failed to parse LeaseSet: %w", err)
}
e.LeaseSet = &ls
e.RouterInfo = nil
return nil
}

View File

@@ -21,6 +21,12 @@ type KademliaResolver struct {
pool *tunnel.Pool
}
// peerDistance represents a peer with its calculated XOR distance
type peerDistance struct {
hash common.Hash
distance []byte
}
func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*router_info.RouterInfo, error) {
log.WithFields(logrus.Fields{
"hash": h,
@@ -32,24 +38,48 @@ func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*route
defer cancel()
// Try local lookup first
ri := kr.NetworkDatabase.GetRouterInfo(h)
if &ri == nil {
log.WithField("hash", h).Debug("RouterInfo found locally")
return &ri, nil
if ri := kr.attemptLocalLookup(h); ri != nil {
return ri, nil
}
// If we don't have a tunnel pool, we can't do remote lookups
// Technically, we could do a direct lookup, but that would require
// the transport anyway which is what I'm working on now.
if kr.pool == nil {
return nil, fmt.Errorf("tunnel pool required for remote lookups")
// Validate remote lookup capability
if err := kr.validateRemoteLookupCapability(); err != nil {
return nil, err
}
// Set up channels for the result and errors
resultChan := make(chan *router_info.RouterInfo, 1)
errChan := make(chan error, 1)
// Start the Kademlia lookup in a goroutine
// Start the remote lookup process
kr.performRemoteLookup(ctx, h, timeout, resultChan, errChan)
// Wait for result, error, or timeout
return kr.collectLookupResult(resultChan, errChan, ctx, timeout)
}
// attemptLocalLookup tries to find the RouterInfo locally first.
func (kr *KademliaResolver) attemptLocalLookup(h common.Hash) *router_info.RouterInfo {
ri := kr.NetworkDatabase.GetRouterInfo(h)
// Check if the RouterInfo is valid by comparing with an empty hash
var emptyHash common.Hash
if ri.IdentHash() != emptyHash {
log.WithField("hash", h).Debug("RouterInfo found locally")
return &ri
}
return nil
}
// validateRemoteLookupCapability checks if remote lookups are possible.
func (kr *KademliaResolver) validateRemoteLookupCapability() error {
if kr.pool == nil {
return fmt.Errorf("tunnel pool required for remote lookups")
}
return nil
}
// performRemoteLookup starts the Kademlia lookup process in a goroutine.
func (kr *KademliaResolver) performRemoteLookup(ctx context.Context, h common.Hash, timeout time.Duration, resultChan chan *router_info.RouterInfo, errChan chan error) {
go func() {
// Find the closest peers we know to the target hash
closestPeers := kr.findClosestPeers(h)
@@ -59,33 +89,45 @@ func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*route
}
// Query each closest peer in parallel
for _, peer := range closestPeers {
go func(p common.Hash) {
ri, err := kr.queryPeer(ctx, p, h)
if err != nil {
log.WithFields(logrus.Fields{
"peer": p,
"error": err,
}).Debug("Peer query failed")
return
}
if ri != nil {
resultChan <- ri
}
}(peer)
}
kr.queryClosestPeers(ctx, closestPeers, h, resultChan)
// Allow some time for queries to complete, but eventually give up
select {
case <-time.After(timeout - time.Second): // Leave 1s buffer for the main select
errChan <- fmt.Errorf("kademlia lookup failed to find router info")
case <-ctx.Done():
// Context was already canceled, main select will handle it
}
kr.handleLookupTimeout(ctx, timeout, errChan)
}()
}
// Wait for result, error, or timeout
// queryClosestPeers queries each of the closest peers in parallel.
func (kr *KademliaResolver) queryClosestPeers(ctx context.Context, peers []common.Hash, target common.Hash, resultChan chan *router_info.RouterInfo) {
for _, peer := range peers {
go func(p common.Hash) {
ri, err := kr.queryPeer(ctx, p, target)
if err != nil {
log.WithFields(logrus.Fields{
"peer": p,
"error": err,
}).Debug("Peer query failed")
return
}
if ri != nil {
resultChan <- ri
}
}(peer)
}
}
// handleLookupTimeout manages the timeout for the lookup operation.
func (kr *KademliaResolver) handleLookupTimeout(ctx context.Context, timeout time.Duration, errChan chan error) {
select {
case <-time.After(timeout - time.Second): // Leave 1s buffer for the main select
errChan <- fmt.Errorf("kademlia lookup failed to find router info")
case <-ctx.Done():
// Context was already canceled, main select will handle it
}
}
// collectLookupResult waits for and processes the lookup result.
func (kr *KademliaResolver) collectLookupResult(resultChan chan *router_info.RouterInfo, errChan chan error, ctx context.Context, timeout time.Duration) (*router_info.RouterInfo, error) {
select {
case result := <-resultChan:
// Store the result in our local database
@@ -109,12 +151,19 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
return []common.Hash{}
}
// Calculate XOR distance for each peer
type peerDistance struct {
hash common.Hash
distance []byte
// Calculate XOR distances for all peers
peers := kr.calculatePeerDistances(allRouterInfos, target)
if len(peers) == 0 {
log.Debug("No suitable peers found after filtering")
return []common.Hash{}
}
// Sort and select closest peers
return kr.selectClosestPeers(peers, target, K)
}
// calculatePeerDistances calculates XOR distances for all router infos.
func (kr *KademliaResolver) calculatePeerDistances(allRouterInfos []router_info.RouterInfo, target common.Hash) []peerDistance {
peers := make([]peerDistance, 0, len(allRouterInfos))
for _, ri := range allRouterInfos {
@@ -126,10 +175,7 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
}
// Calculate XOR distance between target and peer
distance := make([]byte, len(target))
for i := 0; i < len(target); i++ {
distance[i] = target[i] ^ peerHash[i]
}
distance := kr.calculateXORDistance(target, peerHash)
peers = append(peers, peerDistance{
hash: peerHash,
@@ -137,23 +183,23 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
})
}
if len(peers) == 0 {
log.Debug("No suitable peers found after filtering")
return []common.Hash{}
}
return peers
}
// calculateXORDistance calculates the XOR distance between two hashes.
func (kr *KademliaResolver) calculateXORDistance(target, peer common.Hash) []byte {
distance := make([]byte, len(target))
for i := 0; i < len(target); i++ {
distance[i] = target[i] ^ peer[i]
}
return distance
}
// selectClosestPeers sorts peers by distance and returns the K closest ones.
func (kr *KademliaResolver) selectClosestPeers(peers []peerDistance, target common.Hash, K int) []common.Hash {
// Sort peers by XOR distance (closest first)
sort.Slice(peers, func(i, j int) bool {
// Compare distances byte by byte (big endian comparison)
for k := 0; k < len(peers[i].distance); k++ {
if peers[i].distance[k] < peers[j].distance[k] {
return true
}
if peers[i].distance[k] > peers[j].distance[k] {
return false
}
}
return false // Equal distances
return kr.compareDistances(peers[i].distance, peers[j].distance)
})
// Take up to K closest peers
@@ -176,6 +222,19 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
return result
}
// compareDistances compares two distance byte arrays (big endian comparison).
func (kr *KademliaResolver) compareDistances(dist1, dist2 []byte) bool {
for k := 0; k < len(dist1); k++ {
if dist1[k] < dist2[k] {
return true
}
if dist1[k] > dist2[k] {
return false
}
}
return false // Equal distances
}
// queryPeer sends a lookup request to a specific peer through the tunnel
func (kr *KademliaResolver) queryPeer(ctx context.Context, peer common.Hash, target common.Hash) (*router_info.RouterInfo, error) {
// This would send a DatabaseLookup message through the tunnel to the peer
@@ -187,14 +246,3 @@ func (kr *KademliaResolver) queryPeer(ctx context.Context, peer common.Hash, tar
// Placeholder implementation that would need to be completed
return nil, fmt.Errorf("peer query not implemented")
}
// create a new resolver that stores result into a NetworkDatabase and uses a tunnel pool for the lookup
func NewKademliaResolver(netDb NetworkDatabase, pool *tunnel.Pool) (r Resolver) {
if pool != nil && netDb != nil {
r = &KademliaResolver{
NetworkDatabase: netDb,
pool: pool,
}
}
return
}

View File

@@ -0,0 +1,17 @@
package netdb
import (
"github.com/go-i2p/go-i2p/lib/tunnel"
)
// Moved from: kad.go
// NewKademliaResolver creates a new resolver that stores result into a NetworkDatabase and uses a tunnel pool for the lookup
func NewKademliaResolver(netDb NetworkDatabase, pool *tunnel.Pool) (r Resolver) {
if pool != nil && netDb != nil {
r = &KademliaResolver{
NetworkDatabase: netDb,
pool: pool,
}
}
return
}

View File

@@ -9,6 +9,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"github.com/go-i2p/logger"
"github.com/sirupsen/logrus"
@@ -28,51 +29,85 @@ var log = logger.GetGoI2PLogger()
type StdNetDB struct {
DB string
RouterInfos map[common.Hash]Entry
riMutex sync.Mutex // mutex for RouterInfos
LeaseSets map[common.Hash]Entry
lsMutex sync.Mutex // mutex for LeaseSets
}
func NewStdNetDB(db string) StdNetDB {
func NewStdNetDB(db string) *StdNetDB {
log.WithField("db_path", db).Debug("Creating new StdNetDB")
return StdNetDB{
return &StdNetDB{
DB: db,
RouterInfos: make(map[common.Hash]Entry),
riMutex: sync.Mutex{},
LeaseSets: make(map[common.Hash]Entry),
lsMutex: sync.Mutex{},
}
}
func (db *StdNetDB) GetRouterInfo(hash common.Hash) (chnl chan router_info.RouterInfo) {
log.WithField("hash", hash).Debug("Getting RouterInfo")
// Check memory cache first
if ri, ok := db.RouterInfos[hash]; ok {
log.Debug("RouterInfo found in memory cache")
chnl = make(chan router_info.RouterInfo)
chnl <- *ri.RouterInfo
return
}
// Load from file
data, err := db.loadRouterInfoFromFile(hash)
if err != nil {
log.WithError(err).Error("Failed to load RouterInfo from file")
return nil
}
chnl = make(chan router_info.RouterInfo)
ri, err := db.parseAndCacheRouterInfo(hash, data)
if err != nil {
log.WithError(err).Error("Failed to parse RouterInfo")
return
}
chnl <- ri
return
}
// loadRouterInfoFromFile loads RouterInfo data from the skiplist file.
func (db *StdNetDB) loadRouterInfoFromFile(hash common.Hash) ([]byte, error) {
fname := db.SkiplistFile(hash)
buff := new(bytes.Buffer)
if f, err := os.Open(fname); err != nil {
log.WithError(err).Error("Failed to open RouterInfo file")
return nil
} else {
if _, err := io.Copy(buff, f); err != nil {
log.WithError(err).Error("Failed to read RouterInfo file")
return nil
}
defer f.Close()
f, err := os.Open(fname)
if err != nil {
return nil, fmt.Errorf("failed to open RouterInfo file: %w", err)
}
chnl = make(chan router_info.RouterInfo)
ri, _, err := router_info.ReadRouterInfo(buff.Bytes())
if err == nil {
if _, ok := db.RouterInfos[hash]; !ok {
log.Debug("Adding RouterInfo to memory cache")
db.RouterInfos[hash] = Entry{
RouterInfo: &ri,
}
}
chnl <- ri
} else {
log.WithError(err).Error("Failed to parse RouterInfo")
defer f.Close()
if _, err := io.Copy(buff, f); err != nil {
return nil, fmt.Errorf("failed to read RouterInfo file: %w", err)
}
return
return buff.Bytes(), nil
}
// parseAndCacheRouterInfo parses RouterInfo data and adds it to the memory cache.
func (db *StdNetDB) parseAndCacheRouterInfo(hash common.Hash, data []byte) (router_info.RouterInfo, error) {
ri, _, err := router_info.ReadRouterInfo(data)
if err != nil {
return router_info.RouterInfo{}, fmt.Errorf("failed to parse RouterInfo: %w", err)
}
// Add to cache if not already present
if _, ok := db.RouterInfos[hash]; !ok {
log.Debug("Adding RouterInfo to memory cache")
db.RouterInfos[hash] = Entry{
RouterInfo: &ri,
}
}
return ri, nil
}
func (db *StdNetDB) GetAllRouterInfos() (ri []router_info.RouterInfo) {
@@ -127,9 +162,6 @@ func (db *StdNetDB) Size() (routers int) {
return
}
// name of file to hold precomputed size of netdb
const CacheFileName = "sizecache.txt"
// get filepath for storing netdb info cache
func (db *StdNetDB) cacheFilePath() string {
return filepath.Join(db.Path(), CacheFileName)
@@ -150,71 +182,133 @@ func (db *StdNetDB) CheckFilePathValid(fpath string) bool {
func (db *StdNetDB) RecalculateSize() (err error) {
log.Debug("Recalculating NetDB size")
count := 0
err = filepath.Walk(db.Path(), func(fname string, info os.FileInfo, err error) error {
if info.IsDir() {
if !strings.HasPrefix(fname, db.Path()) {
if db.Path() == fname {
log.Debug("Reached end of NetDB directory")
log.Debug("path==name time to exit")
return nil
}
log.Debug("Outside of netDb dir time to exit", db.Path(), " ", fname)
return err
}
return err
}
if db.CheckFilePathValid(fname) {
log.WithField("file_name", fname).Debug("Reading RouterInfo file")
log.Println("Reading in file:", fname)
b, err := os.ReadFile(fname)
if err != nil {
log.WithError(err).Error("Failed to read RouterInfo file")
return err
}
ri, _, err := router_info.ReadRouterInfo(b)
if err != nil {
log.WithError(err).Error("Failed to parse RouterInfo")
return err
}
ih := ri.IdentHash().Bytes()
log.Printf("Read in IdentHash: %s", base32.EncodeToString(ih[:]))
for _, addr := range ri.RouterAddresses() {
log.Println(string(addr.Bytes()))
log.WithField("address", string(addr.Bytes())).Debug("RouterInfo address")
}
if ent, ok := db.RouterInfos[ih]; !ok {
log.Debug("Adding new RouterInfo to memory cache")
db.RouterInfos[ri.IdentHash()] = Entry{
RouterInfo: &ri,
}
} else {
log.Debug("RouterInfo already in memory cache")
log.Println("entry previously found in table", ent, fname)
}
ri = router_info.RouterInfo{}
count++
} else {
log.WithField("file_path", fname).Warn("Invalid file path")
log.Println("Invalid path error")
}
// Walk through all files and count valid RouterInfos
count, err = db.countValidRouterInfos()
if err != nil {
log.WithError(err).Error("Failed to count RouterInfos")
return err
})
if err == nil {
log.WithField("count", count).Debug("Finished recalculating NetDB size")
str := fmt.Sprintf("%d", count)
var f *os.File
f, err = os.OpenFile(db.cacheFilePath(), os.O_CREATE|os.O_WRONLY, 0o600)
if err == nil {
_, err = io.WriteString(f, str)
f.Close()
log.Debug("Updated NetDB size cache file")
} else {
log.WithError(err).Error("Failed to update NetDB size cache file")
}
} else {
}
// Update the cache file with the count
err = db.updateSizeCache(count)
if err != nil {
log.WithError(err).Error("Failed to update NetDB size cache file")
}
return
return err
}
// countValidRouterInfos walks through the database directory and counts valid RouterInfo files.
func (db *StdNetDB) countValidRouterInfos() (int, error) {
count := 0
err := filepath.Walk(db.Path(), func(fname string, info os.FileInfo, err error) error {
return db.processWalkEntry(fname, info, err, &count)
})
if err == nil {
log.WithField("count", count).Debug("Finished counting RouterInfos")
}
return count, err
}
// processWalkEntry handles each entry encountered during the directory walk.
func (db *StdNetDB) processWalkEntry(fname string, info os.FileInfo, err error, count *int) error {
if info.IsDir() {
return db.handleDirectoryWalk(fname, err)
}
if db.CheckFilePathValid(fname) {
if err := db.processRouterInfoFile(fname, count); err != nil {
return err
}
} else {
log.WithField("file_path", fname).Warn("Invalid file path")
log.Println("Invalid path error")
}
return err
}
// handleDirectoryWalk processes directory entries during the walk.
func (db *StdNetDB) handleDirectoryWalk(fname string, err error) error {
if !strings.HasPrefix(fname, db.Path()) {
if db.Path() == fname {
log.Debug("Reached end of NetDB directory")
log.Debug("path==name time to exit")
return nil
}
log.Debug("Outside of netDb dir time to exit", db.Path(), " ", fname)
return err
}
return err
}
// processRouterInfoFile reads and validates a single RouterInfo file.
func (db *StdNetDB) processRouterInfoFile(fname string, count *int) error {
log.WithField("file_name", fname).Debug("Reading RouterInfo file")
log.Println("Reading in file:", fname)
b, err := os.ReadFile(fname)
if err != nil {
log.WithError(err).Error("Failed to read RouterInfo file")
return err
}
ri, _, err := router_info.ReadRouterInfo(b)
if err != nil {
log.WithError(err).Error("Failed to parse RouterInfo")
return err
}
// Process the RouterInfo
db.logRouterInfoDetails(ri)
db.cacheRouterInfo(ri, fname)
(*count)++
return nil
}
// logRouterInfoDetails logs details about the RouterInfo for debugging.
func (db *StdNetDB) logRouterInfoDetails(ri router_info.RouterInfo) {
ih := ri.IdentHash().Bytes()
log.Printf("Read in IdentHash: %s", base32.EncodeToString(ih[:]))
for _, addr := range ri.RouterAddresses() {
log.Println(string(addr.Bytes()))
log.WithField("address", string(addr.Bytes())).Debug("RouterInfo address")
}
}
// cacheRouterInfo adds the RouterInfo to the in-memory cache if not already present.
func (db *StdNetDB) cacheRouterInfo(ri router_info.RouterInfo, fname string) {
ih := ri.IdentHash()
if ent, ok := db.RouterInfos[ih]; !ok {
log.Debug("Adding new RouterInfo to memory cache")
db.RouterInfos[ri.IdentHash()] = Entry{
RouterInfo: &ri,
}
} else {
log.Debug("RouterInfo already in memory cache")
log.Println("entry previously found in table", ent, fname)
}
}
// updateSizeCache writes the count to the cache file.
func (db *StdNetDB) updateSizeCache(count int) error {
str := fmt.Sprintf("%d", count)
f, err := os.OpenFile(db.cacheFilePath(), os.O_CREATE|os.O_WRONLY, 0o600)
if err != nil {
return err
}
defer f.Close()
_, err = io.WriteString(f, str)
if err == nil {
log.Debug("Updated NetDB size cache file")
}
return err
}
// return true if the network db directory exists and is writable
@@ -273,26 +367,50 @@ func (db *StdNetDB) Save() (err error) {
// Reseed fetches fresh peers from the bootstrap provider when fewer
// than minRouters routers are currently known. It returns an error if
// the bootstrap exchange itself failed; a stale size cache afterwards
// is tolerated.
func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
	if !db.isReseedRequired(minRouters) {
		return nil
	}
	peers, fetchErr := db.retrievePeersFromBootstrap(b)
	if fetchErr != nil {
		return fetchErr
	}
	added := db.addNewRouterInfos(peers)
	log.WithField("added_routers", added).Info("Reseed completed successfully")
	return db.updateCacheAfterReseed()
}
// isReseedRequired checks if reseed is necessary based on current database size.
// It returns true when the number of known routers is at or below minRouters.
func (db *StdNetDB) isReseedRequired(minRouters int) bool {
	log.WithField("min_routers", minRouters).Debug("Checking if reseed is necessary")
	if db.Size() > minRouters {
		log.Debug("Reseed not necessary")
		// The stale `return nil` left over from the pre-refactor version
		// was removed: this function returns bool, not error.
		return false
	}
	log.Warn("NetDB size below minimum, reseed required")
	return true
}
// retrievePeersFromBootstrap gets peers from the bootstrap provider,
// bounding the whole exchange with reseed.DefaultDialTimeout. The
// stale single-value `return fmt.Errorf(...)` left over from the
// pre-refactor version was removed: this function returns two values.
func (db *StdNetDB) retrievePeersFromBootstrap(b bootstrap.Bootstrap) ([]router_info.RouterInfo, error) {
	ctx, cancel := context.WithTimeout(context.Background(), reseed.DefaultDialTimeout)
	defer cancel()
	// Get peers from the bootstrap provider; a count of 0 requests as
	// many peers as possible.
	peersChan, err := b.GetPeers(ctx, 0)
	if err != nil {
		log.WithError(err).Error("Failed to get peers from bootstrap provider")
		return nil, fmt.Errorf("bootstrap failed: %w", err)
	}
	return peersChan, nil
}
// addNewRouterInfos processes and adds new RouterInfos from peers to the database.
func (db *StdNetDB) addNewRouterInfos(peers []router_info.RouterInfo) int {
count := 0
for _, ri := range peersChan {
for _, ri := range peers {
hash := ri.IdentHash()
if _, exists := db.RouterInfos[hash]; !exists {
log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed")
@@ -302,15 +420,15 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
count++
}
}
return count
}
log.WithField("added_routers", count).Info("Reseed completed successfully")
// Update the size cache
err = db.RecalculateSize()
// updateCacheAfterReseed updates the size cache after successful reseed operation.
func (db *StdNetDB) updateCacheAfterReseed() error {
err := db.RecalculateSize()
if err != nil {
log.WithError(err).Warn("Failed to update NetDB size cache after reseed")
}
return nil
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/go-i2p/go-i2p/lib/bootstrap"
)
// Moved from: netdb.go
// resolves unknown RouterInfos given the hash of their RouterIdentity
type Resolver interface {
// resolve a router info by hash
@@ -15,6 +16,7 @@ type Resolver interface {
Lookup(hash common.Hash, timeout time.Duration) (*router_info.RouterInfo, error)
}
// Moved from: netdb.go
// i2p network database, storage of i2p RouterInfos
type NetworkDatabase interface {
// obtain a RouterInfo by its hash locally

View File

@@ -27,10 +27,10 @@ type Router struct {
*keys.RouterInfoKeystore
// multi-transport manager
*transport.TransportMuxer
// netdb
*netdb.StdNetDB
// router configuration
cfg *config.RouterConfig
// netdb
ndb netdb.StdNetDB
// close channel
closeChnl chan bool
// running flag
@@ -160,27 +160,27 @@ func (r *Router) Start() {
// run i2p router mainloop
func (r *Router) mainloop() {
log.Debug("Entering router mainloop")
r.ndb = netdb.NewStdNetDB(r.cfg.NetDb.Path)
r.StdNetDB = netdb.NewStdNetDB(r.cfg.NetDb.Path)
log.WithField("netdb_path", r.cfg.NetDb.Path).Debug("Created StdNetDB")
// make sure the netdb is ready
var e error
if err := r.ndb.Ensure(); err != nil {
if err := r.StdNetDB.Ensure(); err != nil {
e = err
log.WithError(err).Error("Failed to ensure NetDB")
}
if sz := r.ndb.Size(); sz >= 0 {
if sz := r.StdNetDB.Size(); sz >= 0 {
log.WithField("size", sz).Debug("NetDB Size: " + strconv.Itoa(sz))
} else {
log.Warn("Unable to determine NetDB size")
}
if r.ndb.Size() < r.cfg.Bootstrap.LowPeerThreshold {
if r.StdNetDB.Size() < r.cfg.Bootstrap.LowPeerThreshold {
log.Info("NetDB below threshold, initiating reseed")
// Create a bootstrap instance
bootstrapper := bootstrap.NewReseedBootstrap(r.cfg.Bootstrap)
// Reseed the network database
if err := r.ndb.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil {
if err := r.StdNetDB.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil {
log.WithError(err).Warn("Initial reseed failed, continuing with limited NetDB")
// Continue anyway, we might have some peers
}

View File

@@ -136,18 +136,22 @@ func (rt *RouterTimestamper) performTimeQuery() bool {
lastFailed := true
if rt.priorityServers != nil {
for _, servers := range rt.priorityServers {
lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6)
if !lastFailed {
break
}
// Read configuration safely by creating copies under mutex protection
rt.mutex.Lock()
priorityServers := rt.priorityServers
servers := rt.servers
rt.mutex.Unlock()
for _, serverList := range priorityServers {
lastFailed = !rt.queryTime(serverList, shortTimeout, preferIPv6)
if !lastFailed {
break
}
}
if rt.priorityServers == nil || lastFailed {
if len(priorityServers) == 0 || lastFailed {
prefIPv6 := preferIPv6 && rt.secureRandBool(0.75)
lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6)
lastFailed = !rt.queryTime(servers, defaultTimeout, prefIPv6)
}
rt.mutex.Lock()
@@ -229,7 +233,13 @@ func (rt *RouterTimestamper) run() {
rt.consecutiveFails = 0
randomDelay := time.Duration(rand.Int63n(int64(rt.queryFrequency / 2)))
sleepTime = rt.queryFrequency + randomDelay
if rt.wellSynced {
// Safely read wellSynced with mutex protection
rt.mutex.Lock()
wellSynced := rt.wellSynced
rt.mutex.Unlock()
if wellSynced {
sleepTime *= 3
}
}
@@ -276,7 +286,11 @@ func (rt *RouterTimestamper) runOnce() {
func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration, preferIPv6 bool) bool {
found := make([]time.Duration, rt.concurringServers)
var expectedDelta time.Duration
// Safely set wellSynced to false with mutex protection
rt.mutex.Lock()
rt.wellSynced = false
rt.mutex.Unlock()
for i := 0; i < rt.concurringServers; i++ {
server := servers[rand.Intn(len(servers))]
@@ -302,7 +316,10 @@ func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration,
if i == 0 {
if absDuration(delta) < maxVariance {
if absDuration(delta) < 500*time.Millisecond {
// Safely set wellSynced to true with mutex protection
rt.mutex.Lock()
rt.wellSynced = true
rt.mutex.Unlock()
}
break
} else {
@@ -329,6 +346,10 @@ func (rt *RouterTimestamper) stampTime(now time.Time) {
}
func (rt *RouterTimestamper) updateConfig() {
// Protect configuration updates with mutex to prevent race conditions
rt.mutex.Lock()
defer rt.mutex.Unlock()
serverList := defaultServerList
rt.servers = strings.Split(serverList, ",")
for i, server := range rt.servers {
@@ -413,3 +434,30 @@ func (rt *RouterTimestamper) GetCurrentTime() time.Time {
}
return time.Now() // Fallback to system time
}
// GetServers returns a copy of the current server list safely
// (callers receive a defensive copy and cannot mutate rt.servers).
func (rt *RouterTimestamper) GetServers() []string {
	rt.mutex.Lock()
	defer rt.mutex.Unlock()
	cloned := make([]string, len(rt.servers))
	copy(cloned, rt.servers)
	return cloned
}
// GetPriorityServers returns a copy of the current priority server
// lists safely. The outer and inner slices are both copied so callers
// cannot mutate shared state; nil is returned when no priority servers
// are configured.
func (rt *RouterTimestamper) GetPriorityServers() [][]string {
	rt.mutex.Lock()
	defer rt.mutex.Unlock()
	if rt.priorityServers == nil {
		return nil
	}
	cloned := make([][]string, len(rt.priorityServers))
	for idx, inner := range rt.priorityServers {
		cloned[idx] = make([]string, len(inner))
		copy(cloned[idx], inner)
	}
	return cloned
}

View File

@@ -139,8 +139,8 @@ func TestTimestampNowWithRealNTP(t *testing.T) {
}
}
t.Logf("NTP Servers: %v", timestamper.servers)
t.Logf("Priority Servers: %v", timestamper.priorityServers)
t.Logf("NTP Servers: %v", timestamper.GetServers())
t.Logf("Priority Servers: %v", timestamper.GetPriorityServers())
}
func TestWaitForInitialization(t *testing.T) {