mirror of
https://github.com/go-i2p/go-i2p.git
synced 2025-08-19 09:45:28 -04:00
Compare commits
12 Commits
9c9f19b904
...
61008ce988
Author | SHA1 | Date | |
---|---|---|---|
![]() |
61008ce988 | ||
![]() |
294cd52343 | ||
![]() |
322b431e8f | ||
![]() |
79df514a2f | ||
![]() |
36762c61c5 | ||
![]() |
a8ac47431e | ||
![]() |
bd947ec2ca | ||
![]() |
61700c8a05 | ||
![]() |
d6b8918a14 | ||
![]() |
43aa3a72a6 | ||
![]() |
5d04d24bc5 | ||
![]() |
f168794ca3 |
@@ -177,94 +177,122 @@ var ERR_BUILD_REQUEST_RECORD_NOT_ENOUGH_DATA = oops.Errorf("not enough i2np buil
|
||||
|
||||
func ReadBuildRequestRecord(data []byte) (BuildRequestRecord, error) {
|
||||
log.Debug("Reading BuildRequestRecord")
|
||||
build_request_record := BuildRequestRecord{}
|
||||
|
||||
record := BuildRequestRecord{}
|
||||
|
||||
if err := parseTunnelIdentifiers(data, &record); err != nil {
|
||||
return record, err
|
||||
}
|
||||
|
||||
if err := parseSessionKeys(data, &record); err != nil {
|
||||
return record, err
|
||||
}
|
||||
|
||||
if err := parseMetadata(data, &record); err != nil {
|
||||
return record, err
|
||||
}
|
||||
|
||||
log.Debug("BuildRequestRecord read successfully")
|
||||
return record, nil
|
||||
}
|
||||
|
||||
// parseTunnelIdentifiers extracts tunnel and identity information from the record data.
|
||||
func parseTunnelIdentifiers(data []byte, record *BuildRequestRecord) error {
|
||||
receive_tunnel, err := readBuildRequestRecordReceiveTunnel(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read ReceiveTunnel")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.ReceiveTunnel = receive_tunnel
|
||||
record.ReceiveTunnel = receive_tunnel
|
||||
|
||||
our_ident, err := readBuildRequestRecordOurIdent(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read OurIdent")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.OurIdent = our_ident
|
||||
record.OurIdent = our_ident
|
||||
|
||||
next_tunnel, err := readBuildRequestRecordNextTunnel(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read NextTunnel")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.NextTunnel = next_tunnel
|
||||
record.NextTunnel = next_tunnel
|
||||
|
||||
next_ident, err := readBuildRequestRecordNextIdent(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read NextIdent")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.NextIdent = next_ident
|
||||
record.NextIdent = next_ident
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseSessionKeys extracts all cryptographic keys from the record data.
|
||||
func parseSessionKeys(data []byte, record *BuildRequestRecord) error {
|
||||
layer_key, err := readBuildRequestRecordLayerKey(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read LayerKey")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.LayerKey = layer_key
|
||||
record.LayerKey = layer_key
|
||||
|
||||
iv_key, err := readBuildRequestRecordIVKey(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read IVKey")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.IVKey = iv_key
|
||||
record.IVKey = iv_key
|
||||
|
||||
reply_key, err := readBuildRequestRecordReplyKey(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read ReplyKey")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.ReplyKey = reply_key
|
||||
record.ReplyKey = reply_key
|
||||
|
||||
reply_iv, err := readBuildRequestRecordReplyIV(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read ReplyIV")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.ReplyIV = reply_iv
|
||||
record.ReplyIV = reply_iv
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseMetadata extracts flags, timestamps, and padding from the record data.
|
||||
func parseMetadata(data []byte, record *BuildRequestRecord) error {
|
||||
flag, err := readBuildRequestRecordFlag(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read Flag")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.Flag = flag
|
||||
record.Flag = flag
|
||||
|
||||
request_time, err := readBuildRequestRecordRequestTime(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read RequestTime")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.RequestTime = request_time
|
||||
record.RequestTime = request_time
|
||||
|
||||
send_message_id, err := readBuildRequestRecordSendMessageID(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read SendMessageID")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.SendMessageID = send_message_id
|
||||
record.SendMessageID = send_message_id
|
||||
|
||||
padding, err := readBuildRequestRecordPadding(data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read Padding")
|
||||
return build_request_record, err
|
||||
return err
|
||||
}
|
||||
build_request_record.Padding = padding
|
||||
record.Padding = padding
|
||||
|
||||
log.Debug("BuildRequestRecord read successfully")
|
||||
return build_request_record, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func readBuildRequestRecordReceiveTunnel(data []byte) (tunnel.TunnelID, error) {
|
||||
|
18
lib/i2np/constants.go
Normal file
18
lib/i2np/constants.go
Normal file
@@ -0,0 +1,18 @@
|
||||
package i2np
|
||||
|
||||
// I2NP Message Type Constants
|
||||
// Moved from: header.go
|
||||
const (
|
||||
I2NP_MESSAGE_TYPE_DATABASE_STORE = 1
|
||||
I2NP_MESSAGE_TYPE_DATABASE_LOOKUP = 2
|
||||
I2NP_MESSAGE_TYPE_DATABASE_SEARCH_REPLY = 3
|
||||
I2NP_MESSAGE_TYPE_DELIVERY_STATUS = 10
|
||||
I2NP_MESSAGE_TYPE_GARLIC = 11
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_DATA = 18
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_GATEWAY = 19
|
||||
I2NP_MESSAGE_TYPE_DATA = 20
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_BUILD = 21
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_BUILD_REPLY = 22
|
||||
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD = 23
|
||||
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD_REPLY = 24
|
||||
)
|
@@ -61,21 +61,6 @@ data ::
|
||||
purpose -> actual message contents
|
||||
*/
|
||||
|
||||
const (
|
||||
I2NP_MESSAGE_TYPE_DATABASE_STORE = 1
|
||||
I2NP_MESSAGE_TYPE_DATABASE_LOOKUP = 2
|
||||
I2NP_MESSAGE_TYPE_DATABASE_SEARCH_REPLY = 3
|
||||
I2NP_MESSAGE_TYPE_DELIVERY_STATUS = 10
|
||||
I2NP_MESSAGE_TYPE_GARLIC = 11
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_DATA = 18
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_GATEWAY = 19
|
||||
I2NP_MESSAGE_TYPE_DATA = 20
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_BUILD = 21
|
||||
I2NP_MESSAGE_TYPE_TUNNEL_BUILD_REPLY = 22
|
||||
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD = 23
|
||||
I2NP_MESSAGE_TYPE_VARIABLE_TUNNEL_BUILD_REPLY = 24
|
||||
)
|
||||
|
||||
type I2NPNTCPHeader struct {
|
||||
Type int
|
||||
MessageID int
|
||||
|
@@ -241,7 +241,8 @@ func (mr *MessageRouter) RouteTunnelMessage(msg interface{}) error {
|
||||
|
||||
// CreateTunnelRecord creates a build request record with interface methods
|
||||
func CreateTunnelRecord(receiveTunnel, nextTunnel tunnel.TunnelID,
|
||||
ourIdent, nextIdent common.Hash) TunnelIdentifier {
|
||||
ourIdent, nextIdent common.Hash,
|
||||
) TunnelIdentifier {
|
||||
return &BuildRequestRecord{
|
||||
ReceiveTunnel: receiveTunnel,
|
||||
NextTunnel: nextTunnel,
|
||||
|
@@ -117,9 +117,9 @@ func (ks *RouterInfoKeystore) KeyID() string {
|
||||
return "fallback-" + hex.EncodeToString(randomBytes)
|
||||
}
|
||||
if len(public.Bytes()) > 10 {
|
||||
return string(public.Bytes()[:10])
|
||||
return hex.EncodeToString(public.Bytes()[:10])
|
||||
}
|
||||
return string(public.Bytes())
|
||||
return hex.EncodeToString(public.Bytes())
|
||||
}
|
||||
return ks.name
|
||||
}
|
||||
|
5
lib/netdb/constants.go
Normal file
5
lib/netdb/constants.go
Normal file
@@ -0,0 +1,5 @@
|
||||
package netdb
|
||||
|
||||
// Moved from: std.go
|
||||
// name of file to hold precomputed size of netdb
|
||||
const CacheFileName = "sizecache.txt"
|
@@ -16,108 +16,179 @@ type Entry struct {
|
||||
*lease_set.LeaseSet
|
||||
}
|
||||
|
||||
func (e *Entry) WriteTo(w io.Writer) (err error) {
|
||||
// Check if we have a RouterInfo to write
|
||||
// WriteTo writes the Entry to the provided writer.
|
||||
func (e *Entry) WriteTo(w io.Writer) error {
|
||||
if e.RouterInfo != nil {
|
||||
// Get the serialized bytes of the RouterInfo
|
||||
data, err := e.RouterInfo.Bytes()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to serialize RouterInfo: %w", err)
|
||||
}
|
||||
// Check if the data is empty
|
||||
if len(data) == 0 {
|
||||
return fmt.Errorf("RouterInfo data is empty")
|
||||
}
|
||||
|
||||
// Write the entry type indicator (1 for RouterInfo)
|
||||
if _, err = w.Write([]byte{1}); err != nil {
|
||||
return fmt.Errorf("failed to write entry type: %w", err)
|
||||
}
|
||||
|
||||
// Write the length as a 2-byte big-endian value
|
||||
lenBytes := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(lenBytes, uint16(len(data)))
|
||||
if _, err = w.Write(lenBytes); err != nil {
|
||||
return fmt.Errorf("failed to write length: %w", err)
|
||||
}
|
||||
|
||||
// Write the actual RouterInfo data
|
||||
if _, err = w.Write(data); err != nil {
|
||||
return fmt.Errorf("failed to write RouterInfo data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return e.writeRouterInfo(w)
|
||||
}
|
||||
|
||||
// Check if we have a LeaseSet to write
|
||||
if e.LeaseSet != nil {
|
||||
// Get the serialized bytes of the LeaseSet
|
||||
data, err := e.LeaseSet.Bytes()
|
||||
|
||||
// Write the entry type indicator (2 for LeaseSet)
|
||||
if _, err = w.Write([]byte{2}); err != nil {
|
||||
return fmt.Errorf("failed to write entry type: %w", err)
|
||||
}
|
||||
|
||||
// Write the length as a 2-byte big-endian value
|
||||
lenBytes := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(lenBytes, uint16(len(data)))
|
||||
if _, err = w.Write(lenBytes); err != nil {
|
||||
return fmt.Errorf("failed to write length: %w", err)
|
||||
}
|
||||
|
||||
// Write the actual LeaseSet data
|
||||
if _, err = w.Write(data); err != nil {
|
||||
return fmt.Errorf("failed to write LeaseSet data: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return e.writeLeaseSet(w)
|
||||
}
|
||||
|
||||
return fmt.Errorf("entry contains neither RouterInfo nor LeaseSet")
|
||||
}
|
||||
|
||||
func (e *Entry) ReadFrom(r io.Reader) (err error) {
|
||||
// Read the entry type indicator
|
||||
typeBytes := make([]byte, 1)
|
||||
if _, err = r.Read(typeBytes); err != nil {
|
||||
return fmt.Errorf("failed to read entry type: %w", err)
|
||||
// writeRouterInfo writes a RouterInfo entry to the writer.
|
||||
func (e *Entry) writeRouterInfo(w io.Writer) error {
|
||||
data, err := e.serializeRouterInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Read the length
|
||||
lenBytes := make([]byte, 2)
|
||||
if _, err = r.Read(lenBytes); err != nil {
|
||||
return fmt.Errorf("failed to read length: %w", err)
|
||||
}
|
||||
dataLen := binary.BigEndian.Uint16(lenBytes)
|
||||
return e.writeEntryData(w, 1, data)
|
||||
}
|
||||
|
||||
// Read the entry data
|
||||
data := make([]byte, dataLen)
|
||||
if _, err = io.ReadFull(r, data); err != nil {
|
||||
return fmt.Errorf("failed to read entry data: %w", err)
|
||||
// writeLeaseSet writes a LeaseSet entry to the writer.
|
||||
func (e *Entry) writeLeaseSet(w io.Writer) error {
|
||||
data, err := e.serializeLeaseSet()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Process based on entry type
|
||||
switch typeBytes[0] {
|
||||
case 1: // RouterInfo
|
||||
ri, _, err := router_info.ReadRouterInfo(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse RouterInfo: %w", err)
|
||||
}
|
||||
e.RouterInfo = &ri
|
||||
e.LeaseSet = nil
|
||||
return e.writeEntryData(w, 2, data)
|
||||
}
|
||||
|
||||
case 2: // LeaseSet
|
||||
ls, err := lease_set.ReadLeaseSet(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse LeaseSet: %w", err)
|
||||
}
|
||||
e.LeaseSet = &ls
|
||||
e.RouterInfo = nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unknown entry type: %d", typeBytes[0])
|
||||
// serializeRouterInfo serializes the RouterInfo and validates the result.
|
||||
func (e *Entry) serializeRouterInfo() ([]byte, error) {
|
||||
data, err := e.RouterInfo.Bytes()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize RouterInfo: %w", err)
|
||||
}
|
||||
|
||||
if len(data) == 0 {
|
||||
return nil, fmt.Errorf("RouterInfo data is empty")
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// serializeLeaseSet serializes the LeaseSet and validates the result.
|
||||
func (e *Entry) serializeLeaseSet() ([]byte, error) {
|
||||
data, err := e.LeaseSet.Bytes()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to serialize LeaseSet: %w", err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// writeEntryData writes entry data with type indicator and length prefix.
|
||||
func (e *Entry) writeEntryData(w io.Writer, entryType byte, data []byte) error {
|
||||
if err := e.writeEntryType(w, entryType); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := e.writeDataLength(w, len(data)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.writeData(w, data)
|
||||
}
|
||||
|
||||
// writeEntryType writes the entry type indicator.
|
||||
func (e *Entry) writeEntryType(w io.Writer, entryType byte) error {
|
||||
if _, err := w.Write([]byte{entryType}); err != nil {
|
||||
return fmt.Errorf("failed to write entry type: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeDataLength writes the data length as a 2-byte big-endian value.
|
||||
func (e *Entry) writeDataLength(w io.Writer, length int) error {
|
||||
lenBytes := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(lenBytes, uint16(length))
|
||||
if _, err := w.Write(lenBytes); err != nil {
|
||||
return fmt.Errorf("failed to write length: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// writeData writes the actual entry data.
|
||||
func (e *Entry) writeData(w io.Writer, data []byte) error {
|
||||
if _, err := w.Write(data); err != nil {
|
||||
return fmt.Errorf("failed to write data: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Entry) ReadFrom(r io.Reader) (err error) {
|
||||
entryType, err := e.readEntryType(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
data, err := e.readEntryData(r)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.processEntryData(entryType, data)
|
||||
}
|
||||
|
||||
// readEntryType reads and returns the entry type indicator from the reader.
|
||||
func (e *Entry) readEntryType(r io.Reader) (byte, error) {
|
||||
typeBytes := make([]byte, 1)
|
||||
if _, err := r.Read(typeBytes); err != nil {
|
||||
return 0, fmt.Errorf("failed to read entry type: %w", err)
|
||||
}
|
||||
return typeBytes[0], nil
|
||||
}
|
||||
|
||||
// readEntryData reads the length and data from the reader.
|
||||
func (e *Entry) readEntryData(r io.Reader) ([]byte, error) {
|
||||
dataLen, err := e.readDataLength(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data := make([]byte, dataLen)
|
||||
if _, err = io.ReadFull(r, data); err != nil {
|
||||
return nil, fmt.Errorf("failed to read entry data: %w", err)
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
||||
|
||||
// readDataLength reads and returns the data length from the reader.
|
||||
func (e *Entry) readDataLength(r io.Reader) (uint16, error) {
|
||||
lenBytes := make([]byte, 2)
|
||||
if _, err := r.Read(lenBytes); err != nil {
|
||||
return 0, fmt.Errorf("failed to read length: %w", err)
|
||||
}
|
||||
return binary.BigEndian.Uint16(lenBytes), nil
|
||||
}
|
||||
|
||||
// processEntryData processes the entry data based on the entry type.
|
||||
func (e *Entry) processEntryData(entryType byte, data []byte) error {
|
||||
switch entryType {
|
||||
case 1: // RouterInfo
|
||||
return e.processRouterInfoData(data)
|
||||
case 2: // LeaseSet
|
||||
return e.processLeaseSetData(data)
|
||||
default:
|
||||
return fmt.Errorf("unknown entry type: %d", entryType)
|
||||
}
|
||||
}
|
||||
|
||||
// processRouterInfoData processes RouterInfo data and sets the entry.
|
||||
func (e *Entry) processRouterInfoData(data []byte) error {
|
||||
ri, _, err := router_info.ReadRouterInfo(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse RouterInfo: %w", err)
|
||||
}
|
||||
e.RouterInfo = &ri
|
||||
e.LeaseSet = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// processLeaseSetData processes LeaseSet data and sets the entry.
|
||||
func (e *Entry) processLeaseSetData(data []byte) error {
|
||||
ls, err := lease_set.ReadLeaseSet(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to parse LeaseSet: %w", err)
|
||||
}
|
||||
e.LeaseSet = &ls
|
||||
e.RouterInfo = nil
|
||||
return nil
|
||||
}
|
||||
|
180
lib/netdb/kad.go
180
lib/netdb/kad.go
@@ -21,6 +21,12 @@ type KademliaResolver struct {
|
||||
pool *tunnel.Pool
|
||||
}
|
||||
|
||||
// peerDistance represents a peer with its calculated XOR distance
|
||||
type peerDistance struct {
|
||||
hash common.Hash
|
||||
distance []byte
|
||||
}
|
||||
|
||||
func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*router_info.RouterInfo, error) {
|
||||
log.WithFields(logrus.Fields{
|
||||
"hash": h,
|
||||
@@ -32,24 +38,48 @@ func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*route
|
||||
defer cancel()
|
||||
|
||||
// Try local lookup first
|
||||
ri := kr.NetworkDatabase.GetRouterInfo(h)
|
||||
if &ri == nil {
|
||||
log.WithField("hash", h).Debug("RouterInfo found locally")
|
||||
return &ri, nil
|
||||
if ri := kr.attemptLocalLookup(h); ri != nil {
|
||||
return ri, nil
|
||||
}
|
||||
|
||||
// If we don't have a tunnel pool, we can't do remote lookups
|
||||
// Technically, we could do a direct lookup, but that would require
|
||||
// the transport anyway which is what I'm working on now.
|
||||
if kr.pool == nil {
|
||||
return nil, fmt.Errorf("tunnel pool required for remote lookups")
|
||||
// Validate remote lookup capability
|
||||
if err := kr.validateRemoteLookupCapability(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Set up channels for the result and errors
|
||||
resultChan := make(chan *router_info.RouterInfo, 1)
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
// Start the Kademlia lookup in a goroutine
|
||||
// Start the remote lookup process
|
||||
kr.performRemoteLookup(ctx, h, timeout, resultChan, errChan)
|
||||
|
||||
// Wait for result, error, or timeout
|
||||
return kr.collectLookupResult(resultChan, errChan, ctx, timeout)
|
||||
}
|
||||
|
||||
// attemptLocalLookup tries to find the RouterInfo locally first.
|
||||
func (kr *KademliaResolver) attemptLocalLookup(h common.Hash) *router_info.RouterInfo {
|
||||
ri := kr.NetworkDatabase.GetRouterInfo(h)
|
||||
// Check if the RouterInfo is valid by comparing with an empty hash
|
||||
var emptyHash common.Hash
|
||||
if ri.IdentHash() != emptyHash {
|
||||
log.WithField("hash", h).Debug("RouterInfo found locally")
|
||||
return &ri
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateRemoteLookupCapability checks if remote lookups are possible.
|
||||
func (kr *KademliaResolver) validateRemoteLookupCapability() error {
|
||||
if kr.pool == nil {
|
||||
return fmt.Errorf("tunnel pool required for remote lookups")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// performRemoteLookup starts the Kademlia lookup process in a goroutine.
|
||||
func (kr *KademliaResolver) performRemoteLookup(ctx context.Context, h common.Hash, timeout time.Duration, resultChan chan *router_info.RouterInfo, errChan chan error) {
|
||||
go func() {
|
||||
// Find the closest peers we know to the target hash
|
||||
closestPeers := kr.findClosestPeers(h)
|
||||
@@ -59,33 +89,45 @@ func (kr *KademliaResolver) Lookup(h common.Hash, timeout time.Duration) (*route
|
||||
}
|
||||
|
||||
// Query each closest peer in parallel
|
||||
for _, peer := range closestPeers {
|
||||
go func(p common.Hash) {
|
||||
ri, err := kr.queryPeer(ctx, p, h)
|
||||
if err != nil {
|
||||
log.WithFields(logrus.Fields{
|
||||
"peer": p,
|
||||
"error": err,
|
||||
}).Debug("Peer query failed")
|
||||
return
|
||||
}
|
||||
|
||||
if ri != nil {
|
||||
resultChan <- ri
|
||||
}
|
||||
}(peer)
|
||||
}
|
||||
kr.queryClosestPeers(ctx, closestPeers, h, resultChan)
|
||||
|
||||
// Allow some time for queries to complete, but eventually give up
|
||||
select {
|
||||
case <-time.After(timeout - time.Second): // Leave 1s buffer for the main select
|
||||
errChan <- fmt.Errorf("kademlia lookup failed to find router info")
|
||||
case <-ctx.Done():
|
||||
// Context was already canceled, main select will handle it
|
||||
}
|
||||
kr.handleLookupTimeout(ctx, timeout, errChan)
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for result, error, or timeout
|
||||
// queryClosestPeers queries each of the closest peers in parallel.
|
||||
func (kr *KademliaResolver) queryClosestPeers(ctx context.Context, peers []common.Hash, target common.Hash, resultChan chan *router_info.RouterInfo) {
|
||||
for _, peer := range peers {
|
||||
go func(p common.Hash) {
|
||||
ri, err := kr.queryPeer(ctx, p, target)
|
||||
if err != nil {
|
||||
log.WithFields(logrus.Fields{
|
||||
"peer": p,
|
||||
"error": err,
|
||||
}).Debug("Peer query failed")
|
||||
return
|
||||
}
|
||||
|
||||
if ri != nil {
|
||||
resultChan <- ri
|
||||
}
|
||||
}(peer)
|
||||
}
|
||||
}
|
||||
|
||||
// handleLookupTimeout manages the timeout for the lookup operation.
|
||||
func (kr *KademliaResolver) handleLookupTimeout(ctx context.Context, timeout time.Duration, errChan chan error) {
|
||||
select {
|
||||
case <-time.After(timeout - time.Second): // Leave 1s buffer for the main select
|
||||
errChan <- fmt.Errorf("kademlia lookup failed to find router info")
|
||||
case <-ctx.Done():
|
||||
// Context was already canceled, main select will handle it
|
||||
}
|
||||
}
|
||||
|
||||
// collectLookupResult waits for and processes the lookup result.
|
||||
func (kr *KademliaResolver) collectLookupResult(resultChan chan *router_info.RouterInfo, errChan chan error, ctx context.Context, timeout time.Duration) (*router_info.RouterInfo, error) {
|
||||
select {
|
||||
case result := <-resultChan:
|
||||
// Store the result in our local database
|
||||
@@ -109,12 +151,19 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
|
||||
return []common.Hash{}
|
||||
}
|
||||
|
||||
// Calculate XOR distance for each peer
|
||||
type peerDistance struct {
|
||||
hash common.Hash
|
||||
distance []byte
|
||||
// Calculate XOR distances for all peers
|
||||
peers := kr.calculatePeerDistances(allRouterInfos, target)
|
||||
if len(peers) == 0 {
|
||||
log.Debug("No suitable peers found after filtering")
|
||||
return []common.Hash{}
|
||||
}
|
||||
|
||||
// Sort and select closest peers
|
||||
return kr.selectClosestPeers(peers, target, K)
|
||||
}
|
||||
|
||||
// calculatePeerDistances calculates XOR distances for all router infos.
|
||||
func (kr *KademliaResolver) calculatePeerDistances(allRouterInfos []router_info.RouterInfo, target common.Hash) []peerDistance {
|
||||
peers := make([]peerDistance, 0, len(allRouterInfos))
|
||||
|
||||
for _, ri := range allRouterInfos {
|
||||
@@ -126,10 +175,7 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
|
||||
}
|
||||
|
||||
// Calculate XOR distance between target and peer
|
||||
distance := make([]byte, len(target))
|
||||
for i := 0; i < len(target); i++ {
|
||||
distance[i] = target[i] ^ peerHash[i]
|
||||
}
|
||||
distance := kr.calculateXORDistance(target, peerHash)
|
||||
|
||||
peers = append(peers, peerDistance{
|
||||
hash: peerHash,
|
||||
@@ -137,23 +183,23 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
|
||||
})
|
||||
}
|
||||
|
||||
if len(peers) == 0 {
|
||||
log.Debug("No suitable peers found after filtering")
|
||||
return []common.Hash{}
|
||||
}
|
||||
return peers
|
||||
}
|
||||
|
||||
// calculateXORDistance calculates the XOR distance between two hashes.
|
||||
func (kr *KademliaResolver) calculateXORDistance(target, peer common.Hash) []byte {
|
||||
distance := make([]byte, len(target))
|
||||
for i := 0; i < len(target); i++ {
|
||||
distance[i] = target[i] ^ peer[i]
|
||||
}
|
||||
return distance
|
||||
}
|
||||
|
||||
// selectClosestPeers sorts peers by distance and returns the K closest ones.
|
||||
func (kr *KademliaResolver) selectClosestPeers(peers []peerDistance, target common.Hash, K int) []common.Hash {
|
||||
// Sort peers by XOR distance (closest first)
|
||||
sort.Slice(peers, func(i, j int) bool {
|
||||
// Compare distances byte by byte (big endian comparison)
|
||||
for k := 0; k < len(peers[i].distance); k++ {
|
||||
if peers[i].distance[k] < peers[j].distance[k] {
|
||||
return true
|
||||
}
|
||||
if peers[i].distance[k] > peers[j].distance[k] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false // Equal distances
|
||||
return kr.compareDistances(peers[i].distance, peers[j].distance)
|
||||
})
|
||||
|
||||
// Take up to K closest peers
|
||||
@@ -176,6 +222,19 @@ func (kr *KademliaResolver) findClosestPeers(target common.Hash) []common.Hash {
|
||||
return result
|
||||
}
|
||||
|
||||
// compareDistances compares two distance byte arrays (big endian comparison).
|
||||
func (kr *KademliaResolver) compareDistances(dist1, dist2 []byte) bool {
|
||||
for k := 0; k < len(dist1); k++ {
|
||||
if dist1[k] < dist2[k] {
|
||||
return true
|
||||
}
|
||||
if dist1[k] > dist2[k] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return false // Equal distances
|
||||
}
|
||||
|
||||
// queryPeer sends a lookup request to a specific peer through the tunnel
|
||||
func (kr *KademliaResolver) queryPeer(ctx context.Context, peer common.Hash, target common.Hash) (*router_info.RouterInfo, error) {
|
||||
// This would send a DatabaseLookup message through the tunnel to the peer
|
||||
@@ -187,14 +246,3 @@ func (kr *KademliaResolver) queryPeer(ctx context.Context, peer common.Hash, tar
|
||||
// Placeholder implementation that would need to be completed
|
||||
return nil, fmt.Errorf("peer query not implemented")
|
||||
}
|
||||
|
||||
// create a new resolver that stores result into a NetworkDatabase and uses a tunnel pool for the lookup
|
||||
func NewKademliaResolver(netDb NetworkDatabase, pool *tunnel.Pool) (r Resolver) {
|
||||
if pool != nil && netDb != nil {
|
||||
r = &KademliaResolver{
|
||||
NetworkDatabase: netDb,
|
||||
pool: pool,
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
17
lib/netdb/kademlia_resolver.go
Normal file
17
lib/netdb/kademlia_resolver.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package netdb
|
||||
|
||||
import (
|
||||
"github.com/go-i2p/go-i2p/lib/tunnel"
|
||||
)
|
||||
|
||||
// Moved from: kad.go
|
||||
// NewKademliaResolver creates a new resolver that stores result into a NetworkDatabase and uses a tunnel pool for the lookup
|
||||
func NewKademliaResolver(netDb NetworkDatabase, pool *tunnel.Pool) (r Resolver) {
|
||||
if pool != nil && netDb != nil {
|
||||
r = &KademliaResolver{
|
||||
NetworkDatabase: netDb,
|
||||
pool: pool,
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
316
lib/netdb/std.go
316
lib/netdb/std.go
@@ -9,6 +9,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-i2p/logger"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -28,51 +29,85 @@ var log = logger.GetGoI2PLogger()
|
||||
type StdNetDB struct {
|
||||
DB string
|
||||
RouterInfos map[common.Hash]Entry
|
||||
riMutex sync.Mutex // mutex for RouterInfos
|
||||
LeaseSets map[common.Hash]Entry
|
||||
lsMutex sync.Mutex // mutex for LeaseSets
|
||||
}
|
||||
|
||||
func NewStdNetDB(db string) StdNetDB {
|
||||
func NewStdNetDB(db string) *StdNetDB {
|
||||
log.WithField("db_path", db).Debug("Creating new StdNetDB")
|
||||
return StdNetDB{
|
||||
return &StdNetDB{
|
||||
DB: db,
|
||||
RouterInfos: make(map[common.Hash]Entry),
|
||||
riMutex: sync.Mutex{},
|
||||
LeaseSets: make(map[common.Hash]Entry),
|
||||
lsMutex: sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (db *StdNetDB) GetRouterInfo(hash common.Hash) (chnl chan router_info.RouterInfo) {
|
||||
log.WithField("hash", hash).Debug("Getting RouterInfo")
|
||||
|
||||
// Check memory cache first
|
||||
if ri, ok := db.RouterInfos[hash]; ok {
|
||||
log.Debug("RouterInfo found in memory cache")
|
||||
chnl = make(chan router_info.RouterInfo)
|
||||
chnl <- *ri.RouterInfo
|
||||
return
|
||||
}
|
||||
|
||||
// Load from file
|
||||
data, err := db.loadRouterInfoFromFile(hash)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to load RouterInfo from file")
|
||||
return nil
|
||||
}
|
||||
|
||||
chnl = make(chan router_info.RouterInfo)
|
||||
ri, err := db.parseAndCacheRouterInfo(hash, data)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse RouterInfo")
|
||||
return
|
||||
}
|
||||
|
||||
chnl <- ri
|
||||
return
|
||||
}
|
||||
|
||||
// loadRouterInfoFromFile loads RouterInfo data from the skiplist file.
|
||||
func (db *StdNetDB) loadRouterInfoFromFile(hash common.Hash) ([]byte, error) {
|
||||
fname := db.SkiplistFile(hash)
|
||||
buff := new(bytes.Buffer)
|
||||
if f, err := os.Open(fname); err != nil {
|
||||
log.WithError(err).Error("Failed to open RouterInfo file")
|
||||
return nil
|
||||
} else {
|
||||
if _, err := io.Copy(buff, f); err != nil {
|
||||
log.WithError(err).Error("Failed to read RouterInfo file")
|
||||
return nil
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
f, err := os.Open(fname)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open RouterInfo file: %w", err)
|
||||
}
|
||||
chnl = make(chan router_info.RouterInfo)
|
||||
ri, _, err := router_info.ReadRouterInfo(buff.Bytes())
|
||||
if err == nil {
|
||||
if _, ok := db.RouterInfos[hash]; !ok {
|
||||
log.Debug("Adding RouterInfo to memory cache")
|
||||
db.RouterInfos[hash] = Entry{
|
||||
RouterInfo: &ri,
|
||||
}
|
||||
}
|
||||
chnl <- ri
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to parse RouterInfo")
|
||||
defer f.Close()
|
||||
|
||||
if _, err := io.Copy(buff, f); err != nil {
|
||||
return nil, fmt.Errorf("failed to read RouterInfo file: %w", err)
|
||||
}
|
||||
return
|
||||
|
||||
return buff.Bytes(), nil
|
||||
}
|
||||
|
||||
// parseAndCacheRouterInfo parses RouterInfo data and adds it to the memory cache.
|
||||
func (db *StdNetDB) parseAndCacheRouterInfo(hash common.Hash, data []byte) (router_info.RouterInfo, error) {
|
||||
ri, _, err := router_info.ReadRouterInfo(data)
|
||||
if err != nil {
|
||||
return router_info.RouterInfo{}, fmt.Errorf("failed to parse RouterInfo: %w", err)
|
||||
}
|
||||
|
||||
// Add to cache if not already present
|
||||
if _, ok := db.RouterInfos[hash]; !ok {
|
||||
log.Debug("Adding RouterInfo to memory cache")
|
||||
db.RouterInfos[hash] = Entry{
|
||||
RouterInfo: &ri,
|
||||
}
|
||||
}
|
||||
|
||||
return ri, nil
|
||||
}
|
||||
|
||||
func (db *StdNetDB) GetAllRouterInfos() (ri []router_info.RouterInfo) {
|
||||
@@ -127,9 +162,6 @@ func (db *StdNetDB) Size() (routers int) {
|
||||
return
|
||||
}
|
||||
|
||||
// name of file to hold precomputed size of netdb
|
||||
const CacheFileName = "sizecache.txt"
|
||||
|
||||
// get filepath for storing netdb info cache
|
||||
func (db *StdNetDB) cacheFilePath() string {
|
||||
return filepath.Join(db.Path(), CacheFileName)
|
||||
@@ -150,71 +182,133 @@ func (db *StdNetDB) CheckFilePathValid(fpath string) bool {
|
||||
func (db *StdNetDB) RecalculateSize() (err error) {
|
||||
log.Debug("Recalculating NetDB size")
|
||||
count := 0
|
||||
err = filepath.Walk(db.Path(), func(fname string, info os.FileInfo, err error) error {
|
||||
if info.IsDir() {
|
||||
if !strings.HasPrefix(fname, db.Path()) {
|
||||
if db.Path() == fname {
|
||||
log.Debug("Reached end of NetDB directory")
|
||||
log.Debug("path==name time to exit")
|
||||
return nil
|
||||
}
|
||||
log.Debug("Outside of netDb dir time to exit", db.Path(), " ", fname)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
if db.CheckFilePathValid(fname) {
|
||||
log.WithField("file_name", fname).Debug("Reading RouterInfo file")
|
||||
log.Println("Reading in file:", fname)
|
||||
b, err := os.ReadFile(fname)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read RouterInfo file")
|
||||
return err
|
||||
}
|
||||
ri, _, err := router_info.ReadRouterInfo(b)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse RouterInfo")
|
||||
return err
|
||||
}
|
||||
ih := ri.IdentHash().Bytes()
|
||||
log.Printf("Read in IdentHash: %s", base32.EncodeToString(ih[:]))
|
||||
for _, addr := range ri.RouterAddresses() {
|
||||
log.Println(string(addr.Bytes()))
|
||||
log.WithField("address", string(addr.Bytes())).Debug("RouterInfo address")
|
||||
}
|
||||
if ent, ok := db.RouterInfos[ih]; !ok {
|
||||
log.Debug("Adding new RouterInfo to memory cache")
|
||||
db.RouterInfos[ri.IdentHash()] = Entry{
|
||||
RouterInfo: &ri,
|
||||
}
|
||||
} else {
|
||||
log.Debug("RouterInfo already in memory cache")
|
||||
log.Println("entry previously found in table", ent, fname)
|
||||
}
|
||||
ri = router_info.RouterInfo{}
|
||||
count++
|
||||
} else {
|
||||
log.WithField("file_path", fname).Warn("Invalid file path")
|
||||
log.Println("Invalid path error")
|
||||
}
|
||||
|
||||
// Walk through all files and count valid RouterInfos
|
||||
count, err = db.countValidRouterInfos()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to count RouterInfos")
|
||||
return err
|
||||
})
|
||||
if err == nil {
|
||||
log.WithField("count", count).Debug("Finished recalculating NetDB size")
|
||||
str := fmt.Sprintf("%d", count)
|
||||
var f *os.File
|
||||
f, err = os.OpenFile(db.cacheFilePath(), os.O_CREATE|os.O_WRONLY, 0o600)
|
||||
if err == nil {
|
||||
_, err = io.WriteString(f, str)
|
||||
f.Close()
|
||||
log.Debug("Updated NetDB size cache file")
|
||||
} else {
|
||||
log.WithError(err).Error("Failed to update NetDB size cache file")
|
||||
}
|
||||
} else {
|
||||
}
|
||||
|
||||
// Update the cache file with the count
|
||||
err = db.updateSizeCache(count)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to update NetDB size cache file")
|
||||
}
|
||||
return
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// countValidRouterInfos walks through the database directory and counts valid RouterInfo files.
|
||||
func (db *StdNetDB) countValidRouterInfos() (int, error) {
|
||||
count := 0
|
||||
err := filepath.Walk(db.Path(), func(fname string, info os.FileInfo, err error) error {
|
||||
return db.processWalkEntry(fname, info, err, &count)
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
log.WithField("count", count).Debug("Finished counting RouterInfos")
|
||||
}
|
||||
|
||||
return count, err
|
||||
}
|
||||
|
||||
// processWalkEntry handles each entry encountered during the directory walk.
|
||||
func (db *StdNetDB) processWalkEntry(fname string, info os.FileInfo, err error, count *int) error {
|
||||
if info.IsDir() {
|
||||
return db.handleDirectoryWalk(fname, err)
|
||||
}
|
||||
|
||||
if db.CheckFilePathValid(fname) {
|
||||
if err := db.processRouterInfoFile(fname, count); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.WithField("file_path", fname).Warn("Invalid file path")
|
||||
log.Println("Invalid path error")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// handleDirectoryWalk processes directory entries during the walk.
|
||||
func (db *StdNetDB) handleDirectoryWalk(fname string, err error) error {
|
||||
if !strings.HasPrefix(fname, db.Path()) {
|
||||
if db.Path() == fname {
|
||||
log.Debug("Reached end of NetDB directory")
|
||||
log.Debug("path==name time to exit")
|
||||
return nil
|
||||
}
|
||||
log.Debug("Outside of netDb dir time to exit", db.Path(), " ", fname)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// processRouterInfoFile reads and validates a single RouterInfo file.
|
||||
func (db *StdNetDB) processRouterInfoFile(fname string, count *int) error {
|
||||
log.WithField("file_name", fname).Debug("Reading RouterInfo file")
|
||||
log.Println("Reading in file:", fname)
|
||||
|
||||
b, err := os.ReadFile(fname)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to read RouterInfo file")
|
||||
return err
|
||||
}
|
||||
|
||||
ri, _, err := router_info.ReadRouterInfo(b)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse RouterInfo")
|
||||
return err
|
||||
}
|
||||
|
||||
// Process the RouterInfo
|
||||
db.logRouterInfoDetails(ri)
|
||||
db.cacheRouterInfo(ri, fname)
|
||||
(*count)++
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// logRouterInfoDetails logs details about the RouterInfo for debugging.
|
||||
func (db *StdNetDB) logRouterInfoDetails(ri router_info.RouterInfo) {
|
||||
ih := ri.IdentHash().Bytes()
|
||||
log.Printf("Read in IdentHash: %s", base32.EncodeToString(ih[:]))
|
||||
|
||||
for _, addr := range ri.RouterAddresses() {
|
||||
log.Println(string(addr.Bytes()))
|
||||
log.WithField("address", string(addr.Bytes())).Debug("RouterInfo address")
|
||||
}
|
||||
}
|
||||
|
||||
// cacheRouterInfo adds the RouterInfo to the in-memory cache if not already present.
|
||||
func (db *StdNetDB) cacheRouterInfo(ri router_info.RouterInfo, fname string) {
|
||||
ih := ri.IdentHash()
|
||||
if ent, ok := db.RouterInfos[ih]; !ok {
|
||||
log.Debug("Adding new RouterInfo to memory cache")
|
||||
db.RouterInfos[ri.IdentHash()] = Entry{
|
||||
RouterInfo: &ri,
|
||||
}
|
||||
} else {
|
||||
log.Debug("RouterInfo already in memory cache")
|
||||
log.Println("entry previously found in table", ent, fname)
|
||||
}
|
||||
}
|
||||
|
||||
// updateSizeCache writes the count to the cache file.
|
||||
func (db *StdNetDB) updateSizeCache(count int) error {
|
||||
str := fmt.Sprintf("%d", count)
|
||||
f, err := os.OpenFile(db.cacheFilePath(), os.O_CREATE|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
_, err = io.WriteString(f, str)
|
||||
if err == nil {
|
||||
log.Debug("Updated NetDB size cache file")
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// return true if the network db directory exists and is writable
|
||||
@@ -273,26 +367,50 @@ func (db *StdNetDB) Save() (err error) {
|
||||
// reseed if we have less than minRouters known routers
|
||||
// returns error if reseed failed
|
||||
func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
|
||||
if !db.isReseedRequired(minRouters) {
|
||||
return nil
|
||||
}
|
||||
|
||||
peers, err := db.retrievePeersFromBootstrap(b)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count := db.addNewRouterInfos(peers)
|
||||
log.WithField("added_routers", count).Info("Reseed completed successfully")
|
||||
|
||||
return db.updateCacheAfterReseed()
|
||||
}
|
||||
|
||||
// isReseedRequired checks if reseed is necessary based on current database size.
|
||||
func (db *StdNetDB) isReseedRequired(minRouters int) bool {
|
||||
log.WithField("min_routers", minRouters).Debug("Checking if reseed is necessary")
|
||||
if db.Size() > minRouters {
|
||||
log.Debug("Reseed not necessary")
|
||||
return nil
|
||||
return false
|
||||
}
|
||||
log.Warn("NetDB size below minimum, reseed required")
|
||||
return true
|
||||
}
|
||||
|
||||
// retrievePeersFromBootstrap gets peers from the bootstrap provider with timeout.
|
||||
func (db *StdNetDB) retrievePeersFromBootstrap(b bootstrap.Bootstrap) ([]router_info.RouterInfo, error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), reseed.DefaultDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Get peers from the bootstrap provider
|
||||
peersChan, err := b.GetPeers(ctx, 0) // Get as many peers as possible
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to get peers from bootstrap provider")
|
||||
return fmt.Errorf("bootstrap failed: %w", err)
|
||||
return nil, fmt.Errorf("bootstrap failed: %w", err)
|
||||
}
|
||||
|
||||
// Process the received peers
|
||||
return peersChan, nil
|
||||
}
|
||||
|
||||
// addNewRouterInfos processes and adds new RouterInfos from peers to the database.
|
||||
func (db *StdNetDB) addNewRouterInfos(peers []router_info.RouterInfo) int {
|
||||
count := 0
|
||||
for _, ri := range peersChan {
|
||||
for _, ri := range peers {
|
||||
hash := ri.IdentHash()
|
||||
if _, exists := db.RouterInfos[hash]; !exists {
|
||||
log.WithField("hash", hash).Debug("Adding new RouterInfo from reseed")
|
||||
@@ -302,15 +420,15 @@ func (db *StdNetDB) Reseed(b bootstrap.Bootstrap, minRouters int) (err error) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
return count
|
||||
}
|
||||
|
||||
log.WithField("added_routers", count).Info("Reseed completed successfully")
|
||||
|
||||
// Update the size cache
|
||||
err = db.RecalculateSize()
|
||||
// updateCacheAfterReseed updates the size cache after successful reseed operation.
|
||||
func (db *StdNetDB) updateCacheAfterReseed() error {
|
||||
err := db.RecalculateSize()
|
||||
if err != nil {
|
||||
log.WithError(err).Warn("Failed to update NetDB size cache after reseed")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/go-i2p/go-i2p/lib/bootstrap"
|
||||
)
|
||||
|
||||
// Moved from: netdb.go
|
||||
// resolves unknown RouterInfos given the hash of their RouterIdentity
|
||||
type Resolver interface {
|
||||
// resolve a router info by hash
|
||||
@@ -15,6 +16,7 @@ type Resolver interface {
|
||||
Lookup(hash common.Hash, timeout time.Duration) (*router_info.RouterInfo, error)
|
||||
}
|
||||
|
||||
// Moved from: netdb.go
|
||||
// i2p network database, storage of i2p RouterInfos
|
||||
type NetworkDatabase interface {
|
||||
// obtain a RouterInfo by its hash locally
|
@@ -27,10 +27,10 @@ type Router struct {
|
||||
*keys.RouterInfoKeystore
|
||||
// multi-transport manager
|
||||
*transport.TransportMuxer
|
||||
// netdb
|
||||
*netdb.StdNetDB
|
||||
// router configuration
|
||||
cfg *config.RouterConfig
|
||||
// netdb
|
||||
ndb netdb.StdNetDB
|
||||
// close channel
|
||||
closeChnl chan bool
|
||||
// running flag
|
||||
@@ -160,27 +160,27 @@ func (r *Router) Start() {
|
||||
// run i2p router mainloop
|
||||
func (r *Router) mainloop() {
|
||||
log.Debug("Entering router mainloop")
|
||||
r.ndb = netdb.NewStdNetDB(r.cfg.NetDb.Path)
|
||||
r.StdNetDB = netdb.NewStdNetDB(r.cfg.NetDb.Path)
|
||||
log.WithField("netdb_path", r.cfg.NetDb.Path).Debug("Created StdNetDB")
|
||||
// make sure the netdb is ready
|
||||
var e error
|
||||
if err := r.ndb.Ensure(); err != nil {
|
||||
if err := r.StdNetDB.Ensure(); err != nil {
|
||||
e = err
|
||||
log.WithError(err).Error("Failed to ensure NetDB")
|
||||
}
|
||||
if sz := r.ndb.Size(); sz >= 0 {
|
||||
if sz := r.StdNetDB.Size(); sz >= 0 {
|
||||
log.WithField("size", sz).Debug("NetDB Size: " + strconv.Itoa(sz))
|
||||
} else {
|
||||
log.Warn("Unable to determine NetDB size")
|
||||
}
|
||||
if r.ndb.Size() < r.cfg.Bootstrap.LowPeerThreshold {
|
||||
if r.StdNetDB.Size() < r.cfg.Bootstrap.LowPeerThreshold {
|
||||
log.Info("NetDB below threshold, initiating reseed")
|
||||
|
||||
// Create a bootstrap instance
|
||||
bootstrapper := bootstrap.NewReseedBootstrap(r.cfg.Bootstrap)
|
||||
|
||||
// Reseed the network database
|
||||
if err := r.ndb.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil {
|
||||
if err := r.StdNetDB.Reseed(bootstrapper, r.cfg.Bootstrap.LowPeerThreshold); err != nil {
|
||||
log.WithError(err).Warn("Initial reseed failed, continuing with limited NetDB")
|
||||
// Continue anyway, we might have some peers
|
||||
}
|
||||
|
@@ -136,18 +136,22 @@ func (rt *RouterTimestamper) performTimeQuery() bool {
|
||||
|
||||
lastFailed := true
|
||||
|
||||
if rt.priorityServers != nil {
|
||||
for _, servers := range rt.priorityServers {
|
||||
lastFailed = !rt.queryTime(servers, shortTimeout, preferIPv6)
|
||||
if !lastFailed {
|
||||
break
|
||||
}
|
||||
// Read configuration safely by creating copies under mutex protection
|
||||
rt.mutex.Lock()
|
||||
priorityServers := rt.priorityServers
|
||||
servers := rt.servers
|
||||
rt.mutex.Unlock()
|
||||
|
||||
for _, serverList := range priorityServers {
|
||||
lastFailed = !rt.queryTime(serverList, shortTimeout, preferIPv6)
|
||||
if !lastFailed {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if rt.priorityServers == nil || lastFailed {
|
||||
if len(priorityServers) == 0 || lastFailed {
|
||||
prefIPv6 := preferIPv6 && rt.secureRandBool(0.75)
|
||||
lastFailed = !rt.queryTime(rt.servers, defaultTimeout, prefIPv6)
|
||||
lastFailed = !rt.queryTime(servers, defaultTimeout, prefIPv6)
|
||||
}
|
||||
|
||||
rt.mutex.Lock()
|
||||
@@ -229,7 +233,13 @@ func (rt *RouterTimestamper) run() {
|
||||
rt.consecutiveFails = 0
|
||||
randomDelay := time.Duration(rand.Int63n(int64(rt.queryFrequency / 2)))
|
||||
sleepTime = rt.queryFrequency + randomDelay
|
||||
if rt.wellSynced {
|
||||
|
||||
// Safely read wellSynced with mutex protection
|
||||
rt.mutex.Lock()
|
||||
wellSynced := rt.wellSynced
|
||||
rt.mutex.Unlock()
|
||||
|
||||
if wellSynced {
|
||||
sleepTime *= 3
|
||||
}
|
||||
}
|
||||
@@ -276,7 +286,11 @@ func (rt *RouterTimestamper) runOnce() {
|
||||
func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration, preferIPv6 bool) bool {
|
||||
found := make([]time.Duration, rt.concurringServers)
|
||||
var expectedDelta time.Duration
|
||||
|
||||
// Safely set wellSynced to false with mutex protection
|
||||
rt.mutex.Lock()
|
||||
rt.wellSynced = false
|
||||
rt.mutex.Unlock()
|
||||
|
||||
for i := 0; i < rt.concurringServers; i++ {
|
||||
server := servers[rand.Intn(len(servers))]
|
||||
@@ -302,7 +316,10 @@ func (rt *RouterTimestamper) queryTime(servers []string, timeout time.Duration,
|
||||
if i == 0 {
|
||||
if absDuration(delta) < maxVariance {
|
||||
if absDuration(delta) < 500*time.Millisecond {
|
||||
// Safely set wellSynced to true with mutex protection
|
||||
rt.mutex.Lock()
|
||||
rt.wellSynced = true
|
||||
rt.mutex.Unlock()
|
||||
}
|
||||
break
|
||||
} else {
|
||||
@@ -329,6 +346,10 @@ func (rt *RouterTimestamper) stampTime(now time.Time) {
|
||||
}
|
||||
|
||||
func (rt *RouterTimestamper) updateConfig() {
|
||||
// Protect configuration updates with mutex to prevent race conditions
|
||||
rt.mutex.Lock()
|
||||
defer rt.mutex.Unlock()
|
||||
|
||||
serverList := defaultServerList
|
||||
rt.servers = strings.Split(serverList, ",")
|
||||
for i, server := range rt.servers {
|
||||
@@ -413,3 +434,30 @@ func (rt *RouterTimestamper) GetCurrentTime() time.Time {
|
||||
}
|
||||
return time.Now() // Fallback to system time
|
||||
}
|
||||
|
||||
// GetServers returns a copy of the current server list safely
|
||||
func (rt *RouterTimestamper) GetServers() []string {
|
||||
rt.mutex.Lock()
|
||||
defer rt.mutex.Unlock()
|
||||
|
||||
servers := make([]string, len(rt.servers))
|
||||
copy(servers, rt.servers)
|
||||
return servers
|
||||
}
|
||||
|
||||
// GetPriorityServers returns a copy of the current priority server lists safely
|
||||
func (rt *RouterTimestamper) GetPriorityServers() [][]string {
|
||||
rt.mutex.Lock()
|
||||
defer rt.mutex.Unlock()
|
||||
|
||||
if rt.priorityServers == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
priorityServers := make([][]string, len(rt.priorityServers))
|
||||
for i, serverList := range rt.priorityServers {
|
||||
priorityServers[i] = make([]string, len(serverList))
|
||||
copy(priorityServers[i], serverList)
|
||||
}
|
||||
return priorityServers
|
||||
}
|
||||
|
@@ -139,8 +139,8 @@ func TestTimestampNowWithRealNTP(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
t.Logf("NTP Servers: %v", timestamper.servers)
|
||||
t.Logf("Priority Servers: %v", timestamper.priorityServers)
|
||||
t.Logf("NTP Servers: %v", timestamper.GetServers())
|
||||
t.Logf("Priority Servers: %v", timestamper.GetPriorityServers())
|
||||
}
|
||||
|
||||
func TestWaitForInitialization(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user