From 42c4c037320c8f27cb65ed372380fce546036130 Mon Sep 17 00:00:00 2001 From: eyedeekay Date: Mon, 20 Oct 2025 20:57:51 -0400 Subject: [PATCH] Enhance logging throughout SAM connection and datagram handling for better traceability and error reporting --- common/resolver.go | 15 ++++++++++- common/sam3.go | 16 +++++++++++- datagram/read_from.go | 23 ++++++++++++++++- datagram/write_to.go | 27 ++++++++++++++++++++ stream/conn.go | 59 +++++++++++++++++++++++++++++++++++++++---- stream/dialer.go | 34 +++++++++++++++++++++++-- stream/listener.go | 50 ++++++++++++++++++++++++++++++++++-- 7 files changed, 212 insertions(+), 12 deletions(-) diff --git a/common/resolver.go b/common/resolver.go index 69b6eb612..596c78b95 100644 --- a/common/resolver.go +++ b/common/resolver.go @@ -42,26 +42,39 @@ func NewFullSAMResolver(address string) (*SAMResolver, error) { // Performs a lookup, probably this order: 1) routers known addresses, cached // addresses, 3) by asking peers in the I2P network. func (sam *SAMResolver) Resolve(name string) (i2pkeys.I2PAddr, error) { - log.WithField("name", name).Debug("Resolving name") + log.WithField("name", name).Debug("Starting name resolution") // Trim away the port, if it appears name = strings.Split(name, ":")[0] + log.WithField("name", name).Debug("Sending lookup request") + if err := sam.sendLookupRequest(name, false); err != nil { + log.WithField("name", name).WithError(err).Error("Failed to send lookup request") return i2pkeys.I2PAddr(""), err } response, err := sam.readLookupResponse() if err != nil { + log.WithField("name", name).WithError(err).Error("Failed to read lookup response") return i2pkeys.I2PAddr(""), err } scanner, err := sam.prepareLookupScanner(response) if err != nil { + log.WithField("name", name).WithError(err).Error("Failed to prepare lookup scanner") return i2pkeys.I2PAddr(""), err } addr, _, err := sam.processLookupResponse(scanner, name) + if err != nil { + log.WithField("name", name).WithError(err).Error("Failed to process lookup response") + } else { + log.WithFields(logger.Fields{ + "name": name, + "address": addr.Base32(), + }).Debug("Successfully resolved name") + } return addr, err } diff --git a/common/sam3.go b/common/sam3.go index 0e90ab443..0fcf48215 100644 --- a/common/sam3.go +++ b/common/sam3.go @@ -11,10 +11,15 @@ import ( // This is an internal helper function used during SAM instance initialization. // Returns the established connection or an error if the connection fails. func connectToSAM(address string) (net.Conn, error) { + log.WithField("address", address).Debug("Connecting to SAM bridge") + conn, err := net.Dial("tcp", address) if err != nil { + log.WithField("address", address).WithError(err).Error("Failed to connect to SAM bridge") return nil, oops.Errorf("failed to connect to SAM bridge at %s: %w", address, err) } + + log.WithField("address", address).Debug("Successfully connected to SAM bridge") return conn, nil } @@ -22,24 +27,33 @@ func connectToSAM(address string) (net.Conn, error) { // This internal function sends the HELLO message and ensures the SAM bridge supports the protocol. // Returns an error if the handshake fails or the protocol version is unsupported. func sendHelloAndValidate(conn net.Conn, s *SAM) error { - if _, err := conn.Write(s.SAMEmit.HelloBytes()); err != nil { + helloMessage := s.SAMEmit.HelloBytes() + log.WithField("message", string(helloMessage)).Debug("Sending HELLO message to SAM bridge") + + if _, err := conn.Write(helloMessage); err != nil { + log.WithError(err).Error("Failed to send HELLO message") return oops.Errorf("failed to send hello message: %w", err) } buf := make([]byte, 256) n, err := conn.Read(buf) if err != nil { + log.WithError(err).Error("Failed to read SAM HELLO response") return oops.Errorf("failed to read SAM response: %w", err) } response := string(buf[:n]) + log.WithField("response", response).Debug("Received SAM HELLO response") + switch { case strings.Contains(response, HELLO_REPLY_OK): log.Debug("SAM hello successful") return nil case response == HELLO_REPLY_NOVERSION: + log.Error("SAM bridge does not support SAMv3") return oops.Errorf("SAM bridge does not support SAMv3") default: + log.WithField("response", response).Error("Unexpected SAM response") return oops.Errorf("unexpected SAM response: %s", response) } } diff --git a/datagram/read_from.go b/datagram/read_from.go index 8633b2a27..7a5c92b9b 100644 --- a/datagram/read_from.go +++ b/datagram/read_from.go @@ -1,6 +1,10 @@ package datagram -import "net" +import ( + "net" + + "github.com/go-i2p/logger" +) // ReadFrom reads a datagram message from the session, storing it in p. // It returns the number of bytes read (n), the sender's I2P address (addr), @@ -18,14 +22,31 @@ import "net" // } // fmt.Printf("Received %d bytes from %s\n", n, addr.Base32()) func (ds *DatagramSession) ReadFrom(p []byte) (n int, addr net.Addr, err error) { + log.WithField("session_id", ds.ID()).Debug("ReadFrom: waiting for datagram") + dg, err := ds.ReceiveDatagram() if err != nil { + log.WithField("session_id", ds.ID()).WithError(err).Error("ReadFrom: failed to receive datagram") return 0, addr, err } + + bytesToCopy := len(dg.Data) + truncated := false if len(p) < len(dg.Data) { + bytesToCopy = len(p) + truncated = true copy(p, dg.Data[:len(p)]) } else { copy(p, dg.Data) } + + log.WithFields(logger.Fields{ + "session_id": ds.ID(), + "source": dg.Source.String(), + "bytes_read": bytesToCopy, + "datagram_size": len(dg.Data), + "truncated": truncated, + }).Debug("ReadFrom: datagram received successfully") + return len(dg.Data), dg.Source, nil } diff --git a/datagram/write_to.go b/datagram/write_to.go index 42724d768..e95129a85 100644 --- a/datagram/write_to.go +++ b/datagram/write_to.go @@ -4,6 +4,7 @@ import ( "net" "github.com/go-i2p/i2pkeys" + "github.com/go-i2p/logger" ) // WriteTo sends a datagram message to the specified I2P address. @@ -28,12 +29,23 @@ import ( // } // fmt.Printf("Sent %d bytes to %s\n", n, addr.Base32()) func (ds *DatagramSession) WriteTo(p []byte, addr net.Addr) (n int, err error) { + log.WithFields(logger.Fields{ + "session_id": ds.ID(), + "destination": addr.String(), + "data_size": len(p), + }).Debug("WriteTo: sending datagram") + switch addr := addr.(type) { case *i2pkeys.I2PAddr: // Valid I2P address type, proceed to send datagram case i2pkeys.I2PAddr: // Valid I2P address type, proceed to send datagram default: + log.WithFields(logger.Fields{ + "session_id": ds.ID(), + "addr_type": addr.Network(), + "addr": addr.String(), + }).Error("WriteTo: invalid address type") return 0, &net.OpError{ Op: "write", Net: "i2p-datagram", @@ -41,6 +53,21 @@ func (ds *DatagramSession) WriteTo(p []byte, addr net.Addr) (n int, err error) { Err: &net.AddrError{Err: "invalid address type", Addr: addr.String()}, } } + err = ds.SendDatagram(p, addr.(i2pkeys.I2PAddr)) + if err != nil { + log.WithFields(logger.Fields{ + "session_id": ds.ID(), + "destination": addr.String(), + "data_size": len(p), + }).WithError(err).Error("WriteTo: failed to send datagram") + } else { + log.WithFields(logger.Fields{ + "session_id": ds.ID(), + "destination": addr.String(), + "bytes_sent": len(p), + }).Debug("WriteTo: datagram sent successfully") + } + return len(p), err } diff --git a/stream/conn.go b/stream/conn.go index f8cb175ca..ccb9563dd 100644 --- a/stream/conn.go +++ b/stream/conn.go @@ -4,8 +4,8 @@ import ( "net" "time" - "github.com/samber/oops" "github.com/go-i2p/logger" + "github.com/samber/oops" ) // Read reads data from the connection into the provided buffer. @@ -17,6 +17,10 @@ func (c *StreamConn) Read(b []byte) (int, error) { c.mu.RLock() if c.closed { c.mu.RUnlock() + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + }).Debug("Read attempted on closed connection") return 0, oops.Errorf("connection is closed") } conn := c.conn @@ -25,9 +29,16 @@ func (c *StreamConn) Read(b []byte) (int, error) { n, err := conn.Read(b) if err != nil { log.WithFields(logger.Fields{ - "local": c.laddr.Base32(), - "remote": c.raddr.Base32(), + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "bytes_read": n, }).WithError(err).Debug("Read error") + } else { + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "bytes_read": n, + }).Debug("Read successful") } return n, err } @@ -41,6 +52,10 @@ func (c *StreamConn) Write(b []byte) (int, error) { c.mu.RLock() if c.closed { c.mu.RUnlock() + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + }).Debug("Write attempted on closed connection") return 0, oops.Errorf("connection is closed") } conn := c.conn @@ -49,9 +64,17 @@ func (c *StreamConn) Write(b []byte) (int, error) { n, err := conn.Write(b) if err != nil { log.WithFields(logger.Fields{ - "local": c.laddr.Base32(), - "remote": c.raddr.Base32(), + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "bytes_written": n, + "buffer_size": len(b), }).WithError(err).Debug("Write error") + } else { + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "bytes_written": n, + }).Debug("Write successful") } return n, err } @@ -113,6 +136,12 @@ func (c *StreamConn) RemoteAddr() net.Addr { // connection timeouts for both read and write operations. // Example usage: conn.SetDeadline(time.Now().Add(30*time.Second)) func (c *StreamConn) SetDeadline(t time.Time) error { + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "deadline": t, + }).Debug("Setting connection deadline") + if err := c.SetReadDeadline(t); err != nil { return err } @@ -130,9 +159,19 @@ func (c *StreamConn) SetReadDeadline(t time.Time) error { c.mu.RUnlock() if conn == nil { + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + }).Error("SetReadDeadline called on nil connection") return oops.Errorf("connection is nil") } + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "read_deadline": t, + }).Debug("Setting read deadline") + return conn.SetReadDeadline(t) } @@ -147,8 +186,18 @@ func (c *StreamConn) SetWriteDeadline(t time.Time) error { c.mu.RUnlock() if conn == nil { + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + }).Error("SetWriteDeadline called on nil connection") return oops.Errorf("connection is nil") } + log.WithFields(logger.Fields{ + "local": c.laddr.Base32(), + "remote": c.raddr.Base32(), + "write_deadline": t, + }).Debug("Setting write deadline") + return conn.SetWriteDeadline(t) } diff --git a/stream/dialer.go b/stream/dialer.go index b5203fdb3..718de4e17 100644 --- a/stream/dialer.go +++ b/stream/dialer.go @@ -8,8 +8,8 @@ import ( "github.com/go-i2p/go-sam-go/common" "github.com/go-i2p/i2pkeys" - "github.com/samber/oops" "github.com/go-i2p/logger" + "github.com/samber/oops" ) // Dial establishes a connection to the specified destination using the default context. @@ -36,12 +36,27 @@ func (d *StreamDialer) DialI2P(addr i2pkeys.I2PAddr) (*StreamConn, error) { // default timeout and provides fine-grained control over connection establishment. // Example usage: conn, err := dialer.DialContext(ctx, "destination.b32.i2p") func (d *StreamDialer) DialContext(ctx context.Context, destination string) (*StreamConn, error) { + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "destination": destination, + }).Debug("DialContext: starting connection with destination resolution") + // First resolve the destination addr, err := d.session.sam.Lookup(destination) if err != nil { + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "destination": destination, + }).WithError(err).Error("DialContext: failed to resolve destination") return nil, oops.Errorf("failed to resolve destination %s: %w", destination, err) } + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "destination": destination, + "resolved": addr.Base32(), + }).Debug("DialContext: destination resolved successfully") + return d.DialI2PContext(ctx, addr) } @@ -76,6 +91,7 @@ func (d *StreamDialer) validateSessionState() error { defer d.session.mu.RUnlock() if d.session.closed { + log.WithField("session_id", d.session.ID()).Error("Dial attempted on closed session") return oops.Errorf("session is closed") } return nil @@ -91,11 +107,25 @@ func (d *StreamDialer) logDialAttempt(addr i2pkeys.I2PAddr) { // createSAMConnection creates a new SAM connection for the dial operation. func (d *StreamDialer) createSAMConnection() (*common.SAM, error) { + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "sam_address": d.session.sam.Sam(), + }).Debug("Creating SAM connection for dial") + sam, err := common.NewSAM(d.session.sam.Sam()) if err != nil { - log.WithError(err).Error("Failed to create SAM connection") + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "sam_address": d.session.sam.Sam(), + }).WithError(err).Error("Failed to create SAM connection") return nil, oops.Errorf("failed to create SAM connection: %w", err) } + + log.WithFields(logger.Fields{ + "session_id": d.session.ID(), + "sam_address": d.session.sam.Sam(), + }).Debug("SAM connection created successfully") + return sam, nil } diff --git a/stream/listener.go b/stream/listener.go index c1f4b2cda..32b4316da 100644 --- a/stream/listener.go +++ b/stream/listener.go @@ -32,16 +32,26 @@ func (l *StreamListener) AcceptStream() (*StreamConn, error) { l.mu.RLock() if l.closed { l.mu.RUnlock() + log.WithField("session_id", l.session.ID()).Debug("AcceptStream called on closed listener") return nil, oops.Errorf("listener is closed") } l.mu.RUnlock() + log.WithField("session_id", l.session.ID()).Debug("Waiting for incoming connection") + select { case conn := <-l.acceptChan: + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "local": conn.LocalAddr().String(), + "remote": conn.RemoteAddr().String(), + }).Debug("Connection accepted from channel") return conn, nil case err := <-l.errorChan: + log.WithField("session_id", l.session.ID()).WithError(err).Error("Accept error received") return nil, err case <-l.closeChan: + log.WithField("session_id", l.session.ID()).Debug("Listener closed while waiting for connection") return nil, oops.Errorf("listener is closed") } } @@ -262,11 +272,25 @@ func (l *StreamListener) createAcceptSocket() (*common.SAM, error) { samAddress = "127.0.0.1:7656" } + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "sam_address": samAddress, + }).Debug("Creating SAM socket for ACCEPT") + sam, err := common.NewSAM(samAddress) if err != nil { + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "sam_address": samAddress, + }).WithError(err).Error("Failed to create SAM connection for ACCEPT") return nil, oops.Errorf("failed to create SAM connection for ACCEPT: %w", err) } + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "sam_address": samAddress, + }).Debug("Successfully created SAM socket for ACCEPT") + return sam, nil } @@ -320,30 +344,46 @@ func (l *StreamListener) parseStreamStatusResponse(sam *common.SAM, logger *logg // readDestinationLine waits for and reads the destination line when a connection arrives. // Format: "$destination FROM_PORT=nnn TO_PORT=nnn\n" (SAM 3.2+) or "$destination\n" (SAM 3.0/3.1) func (l *StreamListener) readDestinationLine(sam *common.SAM, logger *logger.Entry) (string, error) { + logger.Debug("Waiting for destination line from SAM bridge") + destBuf := make([]byte, 4096) n, err := sam.Read(destBuf) if err != nil { + logger.WithError(err).Error("Failed to read destination line") return "", oops.Errorf("failed to read destination: %w", err) } destLine := string(destBuf[:n]) - logger.WithField("destLine", destLine).Debug("Received destination") + logger.WithField("destLine", destLine).WithField("bytes_read", n).Debug("Received destination line") // Parse destination line // Format: "$destination FROM_PORT=nnn TO_PORT=nnn\n" (SAM 3.2+) // Or just: "$destination\n" (SAM 3.0/3.1) parts := strings.Fields(destLine) if len(parts) == 0 { + logger.Error("Empty destination line received") return "", oops.Errorf("empty destination line") } - return parts[0], nil + dest := parts[0] + logger.WithField("destination", dest).Debug("Parsed destination address") + + return dest, nil } // createStreamConnectionWithSocket creates a new StreamConn using the provided accept socket. func (l *StreamListener) createStreamConnectionWithSocket(dest string, sam *common.SAM) (*StreamConn, error) { + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "destination": dest, + }).Debug("Creating StreamConn from accepted connection") + remoteAddr, err := i2pkeys.NewI2PAddrFromString(dest) if err != nil { + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "destination": dest, + }).WithError(err).Error("Failed to parse remote address") return nil, oops.Errorf("failed to parse remote address: %w", err) } @@ -355,6 +395,12 @@ func (l *StreamListener) createStreamConnectionWithSocket(dest string, sam *comm raddr: remoteAddr, } + log.WithFields(logger.Fields{ + "session_id": l.session.ID(), + "local": streamConn.laddr.Base32(), + "remote": streamConn.raddr.Base32(), + }).Debug("Successfully created StreamConn") + return streamConn, nil }