Enhance logging throughout SAM connection and datagram handling for better traceability and error reporting

This commit is contained in:
eyedeekay
2025-10-20 20:57:51 -04:00
parent a92e0dc1a1
commit 42c4c03732
7 changed files with 212 additions and 12 deletions

View File

@@ -42,26 +42,39 @@ func NewFullSAMResolver(address string) (*SAMResolver, error) {
// Performs a lookup, probably this order: 1) routers known addresses, cached
// addresses, 3) by asking peers in the I2P network.
func (sam *SAMResolver) Resolve(name string) (i2pkeys.I2PAddr, error) {
log.WithField("name", name).Debug("Resolving name")
log.WithField("name", name).Debug("Starting name resolution")
// Trim away the port, if it appears
name = strings.Split(name, ":")[0]
log.WithField("name", name).Debug("Sending lookup request")
if err := sam.sendLookupRequest(name, false); err != nil {
log.WithField("name", name).WithError(err).Error("Failed to send lookup request")
return i2pkeys.I2PAddr(""), err
}
response, err := sam.readLookupResponse()
if err != nil {
log.WithField("name", name).WithError(err).Error("Failed to read lookup response")
return i2pkeys.I2PAddr(""), err
}
scanner, err := sam.prepareLookupScanner(response)
if err != nil {
log.WithField("name", name).WithError(err).Error("Failed to prepare lookup scanner")
return i2pkeys.I2PAddr(""), err
}
addr, _, err := sam.processLookupResponse(scanner, name)
if err != nil {
log.WithField("name", name).WithError(err).Error("Failed to process lookup response")
} else {
log.WithFields(logger.Fields{
"name": name,
"address": addr.Base32(),
}).Debug("Successfully resolved name")
}
return addr, err
}

View File

@@ -11,10 +11,15 @@ import (
// This is an internal helper function used during SAM instance initialization.
// Returns the established connection or an error if the connection fails.
func connectToSAM(address string) (net.Conn, error) {
log.WithField("address", address).Debug("Connecting to SAM bridge")
conn, err := net.Dial("tcp", address)
if err != nil {
log.WithField("address", address).WithError(err).Error("Failed to connect to SAM bridge")
return nil, oops.Errorf("failed to connect to SAM bridge at %s: %w", address, err)
}
log.WithField("address", address).Debug("Successfully connected to SAM bridge")
return conn, nil
}
@@ -22,24 +27,33 @@ func connectToSAM(address string) (net.Conn, error) {
// This internal function sends the HELLO message and ensures the SAM bridge supports the protocol.
// Returns an error if the handshake fails or the protocol version is unsupported.
func sendHelloAndValidate(conn net.Conn, s *SAM) error {
if _, err := conn.Write(s.SAMEmit.HelloBytes()); err != nil {
helloMessage := s.SAMEmit.HelloBytes()
log.WithField("message", string(helloMessage)).Debug("Sending HELLO message to SAM bridge")
if _, err := conn.Write(helloMessage); err != nil {
log.WithError(err).Error("Failed to send HELLO message")
return oops.Errorf("failed to send hello message: %w", err)
}
buf := make([]byte, 256)
n, err := conn.Read(buf)
if err != nil {
log.WithError(err).Error("Failed to read SAM HELLO response")
return oops.Errorf("failed to read SAM response: %w", err)
}
response := string(buf[:n])
log.WithField("response", response).Debug("Received SAM HELLO response")
switch {
case strings.Contains(response, HELLO_REPLY_OK):
log.Debug("SAM hello successful")
return nil
case response == HELLO_REPLY_NOVERSION:
log.Error("SAM bridge does not support SAMv3")
return oops.Errorf("SAM bridge does not support SAMv3")
default:
log.WithField("response", response).Error("Unexpected SAM response")
return oops.Errorf("unexpected SAM response: %s", response)
}
}

View File

@@ -1,6 +1,10 @@
package datagram
import "net"
import (
"net"
"github.com/go-i2p/logger"
)
// ReadFrom reads a datagram message from the session, storing it in p.
// It returns the number of bytes read (n), the sender's I2P address (addr),
@@ -18,14 +22,31 @@ import "net"
// }
// fmt.Printf("Received %d bytes from %s\n", n, addr.Base32())
func (ds *DatagramSession) ReadFrom(p []byte) (n int, addr net.Addr, err error) {
log.WithField("session_id", ds.ID()).Debug("ReadFrom: waiting for datagram")
dg, err := ds.ReceiveDatagram()
if err != nil {
log.WithField("session_id", ds.ID()).WithError(err).Error("ReadFrom: failed to receive datagram")
return 0, addr, err
}
bytesToCopy := len(dg.Data)
truncated := false
if len(p) < len(dg.Data) {
bytesToCopy = len(p)
truncated = true
copy(p, dg.Data[:len(p)])
} else {
copy(p, dg.Data)
}
log.WithFields(logger.Fields{
"session_id": ds.ID(),
"source": dg.Source.String(),
"bytes_read": bytesToCopy,
"datagram_size": len(dg.Data),
"truncated": truncated,
}).Debug("ReadFrom: datagram received successfully")
return len(dg.Data), dg.Source, nil
}

View File

@@ -4,6 +4,7 @@ import (
"net"
"github.com/go-i2p/i2pkeys"
"github.com/go-i2p/logger"
)
// WriteTo sends a datagram message to the specified I2P address.
@@ -28,12 +29,23 @@ import (
// }
// fmt.Printf("Sent %d bytes to %s\n", n, addr.Base32())
func (ds *DatagramSession) WriteTo(p []byte, addr net.Addr) (n int, err error) {
log.WithFields(logger.Fields{
"session_id": ds.ID(),
"destination": addr.String(),
"data_size": len(p),
}).Debug("WriteTo: sending datagram")
switch addr := addr.(type) {
case *i2pkeys.I2PAddr:
// Valid I2P address type, proceed to send datagram
case i2pkeys.I2PAddr:
// Valid I2P address type, proceed to send datagram
default:
log.WithFields(logger.Fields{
"session_id": ds.ID(),
"addr_type": addr.Network(),
"addr": addr.String(),
}).Error("WriteTo: invalid address type")
return 0, &net.OpError{
Op: "write",
Net: "i2p-datagram",
@@ -41,6 +53,21 @@ func (ds *DatagramSession) WriteTo(p []byte, addr net.Addr) (n int, err error) {
Err: &net.AddrError{Err: "invalid address type", Addr: addr.String()},
}
}
err = ds.SendDatagram(p, addr.(i2pkeys.I2PAddr))
if err != nil {
log.WithFields(logger.Fields{
"session_id": ds.ID(),
"destination": addr.String(),
"data_size": len(p),
}).WithError(err).Error("WriteTo: failed to send datagram")
} else {
log.WithFields(logger.Fields{
"session_id": ds.ID(),
"destination": addr.String(),
"bytes_sent": len(p),
}).Debug("WriteTo: datagram sent successfully")
}
return len(p), err
}

View File

@@ -4,8 +4,8 @@ import (
"net"
"time"
"github.com/samber/oops"
"github.com/go-i2p/logger"
"github.com/samber/oops"
)
// Read reads data from the connection into the provided buffer.
@@ -17,6 +17,10 @@ func (c *StreamConn) Read(b []byte) (int, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).Debug("Read attempted on closed connection")
return 0, oops.Errorf("connection is closed")
}
conn := c.conn
@@ -25,9 +29,16 @@ func (c *StreamConn) Read(b []byte) (int, error) {
n, err := conn.Read(b)
if err != nil {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"bytes_read": n,
}).WithError(err).Debug("Read error")
} else {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"bytes_read": n,
}).Debug("Read successful")
}
return n, err
}
@@ -41,6 +52,10 @@ func (c *StreamConn) Write(b []byte) (int, error) {
c.mu.RLock()
if c.closed {
c.mu.RUnlock()
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).Debug("Write attempted on closed connection")
return 0, oops.Errorf("connection is closed")
}
conn := c.conn
@@ -49,9 +64,17 @@ func (c *StreamConn) Write(b []byte) (int, error) {
n, err := conn.Write(b)
if err != nil {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"bytes_written": n,
"buffer_size": len(b),
}).WithError(err).Debug("Write error")
} else {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"bytes_written": n,
}).Debug("Write successful")
}
return n, err
}
@@ -113,6 +136,12 @@ func (c *StreamConn) RemoteAddr() net.Addr {
// connection timeouts for both read and write operations.
// Example usage: conn.SetDeadline(time.Now().Add(30*time.Second))
func (c *StreamConn) SetDeadline(t time.Time) error {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"deadline": t,
}).Debug("Setting connection deadline")
if err := c.SetReadDeadline(t); err != nil {
return err
}
@@ -130,9 +159,19 @@ func (c *StreamConn) SetReadDeadline(t time.Time) error {
c.mu.RUnlock()
if conn == nil {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).Error("SetReadDeadline called on nil connection")
return oops.Errorf("connection is nil")
}
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"read_deadline": t,
}).Debug("Setting read deadline")
return conn.SetReadDeadline(t)
}
@@ -147,8 +186,18 @@ func (c *StreamConn) SetWriteDeadline(t time.Time) error {
c.mu.RUnlock()
if conn == nil {
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
}).Error("SetWriteDeadline called on nil connection")
return oops.Errorf("connection is nil")
}
log.WithFields(logger.Fields{
"local": c.laddr.Base32(),
"remote": c.raddr.Base32(),
"write_deadline": t,
}).Debug("Setting write deadline")
return conn.SetWriteDeadline(t)
}

View File

@@ -8,8 +8,8 @@ import (
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
"github.com/samber/oops"
"github.com/go-i2p/logger"
"github.com/samber/oops"
)
// Dial establishes a connection to the specified destination using the default context.
@@ -36,12 +36,27 @@ func (d *StreamDialer) DialI2P(addr i2pkeys.I2PAddr) (*StreamConn, error) {
// default timeout and provides fine-grained control over connection establishment.
// Example usage: conn, err := dialer.DialContext(ctx, "destination.b32.i2p")
func (d *StreamDialer) DialContext(ctx context.Context, destination string) (*StreamConn, error) {
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"destination": destination,
}).Debug("DialContext: starting connection with destination resolution")
// First resolve the destination
addr, err := d.session.sam.Lookup(destination)
if err != nil {
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"destination": destination,
}).WithError(err).Error("DialContext: failed to resolve destination")
return nil, oops.Errorf("failed to resolve destination %s: %w", destination, err)
}
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"destination": destination,
"resolved": addr.Base32(),
}).Debug("DialContext: destination resolved successfully")
return d.DialI2PContext(ctx, addr)
}
@@ -76,6 +91,7 @@ func (d *StreamDialer) validateSessionState() error {
defer d.session.mu.RUnlock()
if d.session.closed {
log.WithField("session_id", d.session.ID()).Error("Dial attempted on closed session")
return oops.Errorf("session is closed")
}
return nil
@@ -91,11 +107,25 @@ func (d *StreamDialer) logDialAttempt(addr i2pkeys.I2PAddr) {
// createSAMConnection creates a new SAM connection for the dial operation.
func (d *StreamDialer) createSAMConnection() (*common.SAM, error) {
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"sam_address": d.session.sam.Sam(),
}).Debug("Creating SAM connection for dial")
sam, err := common.NewSAM(d.session.sam.Sam())
if err != nil {
log.WithError(err).Error("Failed to create SAM connection")
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"sam_address": d.session.sam.Sam(),
}).WithError(err).Error("Failed to create SAM connection")
return nil, oops.Errorf("failed to create SAM connection: %w", err)
}
log.WithFields(logger.Fields{
"session_id": d.session.ID(),
"sam_address": d.session.sam.Sam(),
}).Debug("SAM connection created successfully")
return sam, nil
}

View File

@@ -32,16 +32,26 @@ func (l *StreamListener) AcceptStream() (*StreamConn, error) {
l.mu.RLock()
if l.closed {
l.mu.RUnlock()
log.WithField("session_id", l.session.ID()).Debug("AcceptStream called on closed listener")
return nil, oops.Errorf("listener is closed")
}
l.mu.RUnlock()
log.WithField("session_id", l.session.ID()).Debug("Waiting for incoming connection")
select {
case conn := <-l.acceptChan:
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"local": conn.LocalAddr().String(),
"remote": conn.RemoteAddr().String(),
}).Debug("Connection accepted from channel")
return conn, nil
case err := <-l.errorChan:
log.WithField("session_id", l.session.ID()).WithError(err).Error("Accept error received")
return nil, err
case <-l.closeChan:
log.WithField("session_id", l.session.ID()).Debug("Listener closed while waiting for connection")
return nil, oops.Errorf("listener is closed")
}
}
@@ -262,11 +272,25 @@ func (l *StreamListener) createAcceptSocket() (*common.SAM, error) {
samAddress = "127.0.0.1:7656"
}
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"sam_address": samAddress,
}).Debug("Creating SAM socket for ACCEPT")
sam, err := common.NewSAM(samAddress)
if err != nil {
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"sam_address": samAddress,
}).WithError(err).Error("Failed to create SAM connection for ACCEPT")
return nil, oops.Errorf("failed to create SAM connection for ACCEPT: %w", err)
}
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"sam_address": samAddress,
}).Debug("Successfully created SAM socket for ACCEPT")
return sam, nil
}
@@ -320,30 +344,46 @@ func (l *StreamListener) parseStreamStatusResponse(sam *common.SAM, logger *logg
// readDestinationLine waits for and reads the destination line when a connection arrives.
// Format: "$destination FROM_PORT=nnn TO_PORT=nnn\n" (SAM 3.2+) or "$destination\n" (SAM 3.0/3.1)
func (l *StreamListener) readDestinationLine(sam *common.SAM, logger *logger.Entry) (string, error) {
logger.Debug("Waiting for destination line from SAM bridge")
destBuf := make([]byte, 4096)
n, err := sam.Read(destBuf)
if err != nil {
logger.WithError(err).Error("Failed to read destination line")
return "", oops.Errorf("failed to read destination: %w", err)
}
destLine := string(destBuf[:n])
logger.WithField("destLine", destLine).Debug("Received destination")
logger.WithField("destLine", destLine).WithField("bytes_read", n).Debug("Received destination line")
// Parse destination line
// Format: "$destination FROM_PORT=nnn TO_PORT=nnn\n" (SAM 3.2+)
// Or just: "$destination\n" (SAM 3.0/3.1)
parts := strings.Fields(destLine)
if len(parts) == 0 {
logger.Error("Empty destination line received")
return "", oops.Errorf("empty destination line")
}
return parts[0], nil
dest := parts[0]
logger.WithField("destination", dest).Debug("Parsed destination address")
return dest, nil
}
// createStreamConnectionWithSocket creates a new StreamConn using the provided accept socket.
func (l *StreamListener) createStreamConnectionWithSocket(dest string, sam *common.SAM) (*StreamConn, error) {
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"destination": dest,
}).Debug("Creating StreamConn from accepted connection")
remoteAddr, err := i2pkeys.NewI2PAddrFromString(dest)
if err != nil {
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"destination": dest,
}).WithError(err).Error("Failed to parse remote address")
return nil, oops.Errorf("failed to parse remote address: %w", err)
}
@@ -355,6 +395,12 @@ func (l *StreamListener) createStreamConnectionWithSocket(dest string, sam *comm
raddr: remoteAddr,
}
log.WithFields(logger.Fields{
"session_id": l.session.ID(),
"local": streamConn.laddr.Base32(),
"remote": streamConn.raddr.Base32(),
}).Debug("Successfully created StreamConn")
return streamConn, nil
}