From 82eeba6275791ca1eef621d32738b163da52e6d9 Mon Sep 17 00:00:00 2001 From: eyedeekay Date: Sun, 24 Nov 2024 19:11:04 -0500 Subject: [PATCH] Start abstracting away UDP session management --- common/udp.go | 140 +++++++++++++++++++++++++++++ datagram.go | 197 ++++++++++++++++++++--------------------- primary.go | 68 +++++++------- primary_stream_test.go | 2 +- 4 files changed, 271 insertions(+), 136 deletions(-) create mode 100644 common/udp.go diff --git a/common/udp.go b/common/udp.go new file mode 100644 index 0000000..71903b0 --- /dev/null +++ b/common/udp.go @@ -0,0 +1,140 @@ +package common + +import ( + "errors" + "fmt" + "net" + "strconv" + "time" + + "github.com/go-i2p/logger" +) + +// Package common provides shared UDP common functionality for SAM sessions +// +// It handles: +// - UDP port validation and defaults +// - Address resolution +// - Connection setup +// - Logging +// +// Example Usage: +// +// cfg := &UDPSessionConfig{ +// Port: 7655, +// ParentConn: samConn, +// Log: logger, +// } +// +// session, err := NewUDPSession(cfg) +// if err != nil { +// // Handle error +// } +// defer session.Close() + +// UDPSessionConfig holds all UDP session configuration +type UDPSessionConfig struct { + Port int + ParentConn net.Conn + Log *logger.Logger + DefaultPort int + AllowZeroPort bool + Style string + FromPort string + ToPort string + ReadTimeout time.Duration + WriteTimeout time.Duration +} + +// UDPSession represents an established UDP session +type UDPSession struct { + LocalAddr *net.UDPAddr + RemoteAddr *net.UDPAddr + Conn *net.UDPConn +} + +func (u *UDPSession) SetReadTimeout(timeout time.Duration) error { + if u.Conn != nil { + return u.Conn.SetReadDeadline(time.Now().Add(timeout)) + } + return nil +} + +func (u *UDPSession) SetWriteTimeout(timeout time.Duration) error { + if u.Conn != nil { + return u.Conn.SetWriteDeadline(time.Now().Add(timeout)) + } + return nil +} + +func (u UDPSession) LocalPort() int { + return u.LocalAddr.Port +} + +func (u UDPSession) Close() { + u.Conn.Close() +} + +// NewUDPSession creates and configures a new UDP session +func NewUDPSession(cfg *UDPSessionConfig) (*UDPSession, error) { + if err := validatePort(cfg.Port, cfg.AllowZeroPort); err != nil { + cfg.Log.WithError(err).Error("Invalid UDP port configuration") + return nil, err + } + + port := cfg.Port + if port == 0 { + port = cfg.DefaultPort + cfg.Log.WithField("port", port).Debug("Using default UDP port") + } + + laddr, raddr, err := resolveAddresses(cfg.ParentConn, port) + if err != nil { + return nil, fmt.Errorf("address resolution failed: %w", err) + } + + conn, err := net.ListenUDP("udp4", laddr) + if err != nil { + return nil, fmt.Errorf("UDP listen failed: %w", err) + } + + return &UDPSession{ + LocalAddr: laddr, + RemoteAddr: raddr, + Conn: conn, + }, nil +} + +func validatePort(port int, allowZero bool) error { + if port < 0 || port > 65535 { + return errors.New("port must be between 0-65535") + } + if port == 0 && !allowZero { + return errors.New("port 0 not allowed in this context") + } + return nil +} + +func resolveAddresses(parent net.Conn, remotePort int) (*net.UDPAddr, *net.UDPAddr, error) { + lhost, _, err := net.SplitHostPort(parent.LocalAddr().String()) + if err != nil { + return nil, nil, err + } + + laddr, err := net.ResolveUDPAddr("udp4", lhost+":0") + if err != nil { + return nil, nil, err + } + + rhost, _, err := net.SplitHostPort(parent.RemoteAddr().String()) + if err != nil { + return nil, nil, err + } + + raddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(remotePort)) + if err != nil { + return nil, nil, err + } + + return laddr, raddr, nil +} \ No newline at end of file diff --git a/datagram.go b/datagram.go index 8412c71..96aa7ff 100644 --- a/datagram.go +++ b/datagram.go @@ -5,13 +5,13 @@ import ( "errors" "fmt" "net" - "strconv" "sync" "time" "github.com/sirupsen/logrus" "github.com/go-i2p/i2pkeys" + "github.com/go-i2p/sam3/common" ) // The DatagramSession implements net.PacketConn. It works almost like ordinary @@ -22,10 +22,9 @@ type DatagramSession struct { samAddr string // address to the sam bridge (ipv4:port) id string // tunnel name conn net.Conn // connection to sam bridge - udpconn *net.UDPConn // used to deliver datagrams keys i2pkeys.I2PKeys // i2p destination keys - rUDPAddr *net.UDPAddr // the SAM bridge UDP-port remoteAddr *i2pkeys.I2PAddr // optional remote I2P address + common.UDPSession *DatagramOptions } @@ -36,43 +35,25 @@ func (s *SAM) NewDatagramSession(id string, keys i2pkeys.I2PKeys, options []stri "id": id, "udpPort": udpPort, }).Debug("Creating new DatagramSession") - - if udpPort > 65335 || udpPort < 0 { - log.WithField("udpPort", udpPort).Error("Invalid UDP port") - return nil, errors.New("udpPort needs to be in the intervall 0-65335") + udpSessionConfig := &common.UDPSessionConfig{ + Port: udpPort, + ParentConn: s.conn, + Log: log, + DefaultPort: 7655, + AllowZeroPort: true, + // Add required session parameters + Style: "DATAGRAM", + FromPort: "0", // Allow dynamic port assignment + ToPort: "0", + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, } - if udpPort == 0 { - udpPort = 7655 - log.Debug("Using default UDP port 7655") - } - lhost, _, err := SplitHostPort(s.conn.LocalAddr().String()) + udpconn, err := common.NewUDPSession(udpSessionConfig) if err != nil { - log.WithError(err).Error("Failed to split local host port") - s.Close() + log.WithError(err).Error("Failed to create UDP session") return nil, err } - lUDPAddr, err := net.ResolveUDPAddr("udp4", lhost+":0") - if err != nil { - log.WithError(err).Error("Failed to resolve local UDP address") - return nil, err - } - udpconn, err := net.ListenUDP("udp4", lUDPAddr) - if err != nil { - log.WithError(err).Error("Failed to listen on UDP") - return nil, err - } - rhost, _, err := SplitHostPort(s.conn.RemoteAddr().String()) - if err != nil { - log.WithError(err).Error("Failed to split remote host port") - s.Close() - return nil, err - } - rUDPAddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(udpPort)) - if err != nil { - log.WithError(err).Error("Failed to resolve remote UDP address") - return nil, err - } - _, lport, err := net.SplitHostPort(udpconn.LocalAddr().String()) + _, lport, err := net.SplitHostPort(udpconn.Conn.LocalAddr().String()) if err != nil { log.WithError(err).Error("Failed to get local port") s.Close() @@ -84,10 +65,24 @@ func (s *SAM) NewDatagramSession(id string, keys i2pkeys.I2PKeys, options []stri return nil, err } if len(datagramOptions) > 0 { - return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, &datagramOptions[0]}, nil + return &DatagramSession{ + samAddr: s.address, + id: id, + conn: conn, + keys: keys, + UDPSession: *udpconn, + DatagramOptions: &datagramOptions[0], + }, nil } log.WithField("id", id).Info("DatagramSession created successfully") - return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, nil}, nil + // return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, nil}, nil + return &DatagramSession{ + samAddr: s.address, + id: id, + conn: conn, + keys: keys, + UDPSession: *udpconn, + }, nil } func (s *DatagramSession) B32() string { @@ -159,12 +154,12 @@ func (s *DatagramSession) ReadFrom(b []byte) (n int, addr net.Addr, err error) { for { // very basic protection: only accept incomming UDP messages from the IP of the SAM bridge var saddr *net.UDPAddr - n, saddr, err = s.udpconn.ReadFromUDP(buf) + n, saddr, err = s.UDPSession.Conn.ReadFromUDP(buf) if err != nil { log.WithError(err).Error("Failed to read from UDP") return 0, i2pkeys.I2PAddr(""), err } - if bytes.Equal(saddr.IP, s.rUDPAddr.IP) { + if bytes.Equal(saddr.IP, s.UDPSession.RemoteAddr.IP) { continue } break @@ -210,75 +205,75 @@ const ( // writing, maximum size is 31 kilobyte, but this may change in the future. // Implements net.PacketConn. func (s *DatagramSession) WriteTo(b []byte, addr net.Addr) (n int, err error) { - log.WithFields(logrus.Fields{ - "addr": addr, - "datagramLen": len(b), - }).Debug("Writing datagram") + log.WithFields(logrus.Fields{ + "addr": addr, + "datagramLen": len(b), + }).Debug("Writing datagram") - if len(b) > MAX_DATAGRAM_SIZE { - return 0, errors.New("datagram exceeds maximum size") - } + if len(b) > MAX_DATAGRAM_SIZE { + return 0, errors.New("datagram exceeds maximum size") + } - // Use chunking for anything above recommended size - if len(b) > RECOMMENDED_SIZE { - return s.writeChunked(b, addr) - } + // Use chunking for anything above recommended size + if len(b) > RECOMMENDED_SIZE { + return s.writeChunked(b, addr) + } - // Single message path - if s.DatagramOptions != nil { - return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr)) - } + // Single message path + if s.DatagramOptions != nil { + return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr)) + } - header := []byte("3.1 " + s.id + " " + addr.String() + "\n") - msg := append(header, b...) - n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr) - if err != nil { - log.WithError(err).Error("Failed to write to UDP") - } else { - log.WithField("bytesWritten", n).Debug("Datagram written successfully") - } - return n, err + header := []byte("3.1 " + s.id + " " + addr.String() + "\n") + msg := append(header, b...) + n, err = s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr) + if err != nil { + log.WithError(err).Error("Failed to write to UDP") + } else { + log.WithField("bytesWritten", n).Debug("Datagram written successfully") + } + return n, err } func (s *DatagramSession) writeChunked(b []byte, addr net.Addr) (total int, err error) { - chunkSize := RECOMMENDED_SIZE - 256 // Allow for header overhead - chunks := (len(b) + chunkSize - 1) / chunkSize + chunkSize := RECOMMENDED_SIZE - 256 // Allow for header overhead + chunks := (len(b) + chunkSize - 1) / chunkSize - log.WithFields(logrus.Fields{ - "totalSize": len(b), - "chunks": chunks, - }).Debug("Splitting datagram into chunks") + log.WithFields(logrus.Fields{ + "totalSize": len(b), + "chunks": chunks, + }).Debug("Splitting datagram into chunks") - for i := 0; i < chunks; i++ { - start := i * chunkSize - end := start + chunkSize - if end > len(b) { - end = len(b) - } + for i := 0; i < chunks; i++ { + start := i * chunkSize + end := start + chunkSize + if end > len(b) { + end = len(b) + } - chunk := b[start:end] - var n int + chunk := b[start:end] + var n int - // Single write path that handles both cases - if s.DatagramOptions != nil { - n, err = s.writeToWithOptions(chunk, addr.(i2pkeys.I2PAddr)) - } else { - header := []byte("3.1 " + s.id + " " + addr.String() + "\n") - msg := append(header, chunk...) - n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr) - } + // Single write path that handles both cases + if s.DatagramOptions != nil { + n, err = s.writeToWithOptions(chunk, addr.(i2pkeys.I2PAddr)) + } else { + header := []byte("3.1 " + s.id + " " + addr.String() + "\n") + msg := append(header, chunk...) + n, err = s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr) + } - if err != nil { - return total, fmt.Errorf("chunk %d/%d failed: %w", i+1, chunks, err) - } - total += n + if err != nil { + return total, fmt.Errorf("chunk %d/%d failed: %w", i+1, chunks, err) + } + total += n - if i < chunks-1 { - time.Sleep(50 * time.Millisecond) - } - } + if i < chunks-1 { + time.Sleep(50 * time.Millisecond) + } + } - return total, nil + return total, nil } type DatagramOptions struct { @@ -307,7 +302,7 @@ func (s *DatagramSession) writeToWithOptions(b []byte, addr i2pkeys.I2PAddr) (n header.WriteString("\n") msg := append(header.Bytes(), b...) - return s.udpconn.WriteToUDP(msg, s.rUDPAddr) + return s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr) } func (s *DatagramSession) Write(b []byte) (int, error) { @@ -319,7 +314,7 @@ func (s *DatagramSession) Write(b []byte) (int, error) { func (s *DatagramSession) Close() error { log.Debug("Closing DatagramSession") err := s.conn.Close() - err2 := s.udpconn.Close() + err2 := s.UDPSession.Conn.Close() if err != nil { log.WithError(err).Error("Failed to close connection") return err @@ -363,22 +358,22 @@ func (s *DatagramSession) Lookup(name string) (a net.Addr, err error) { // is seldom done. func (s *DatagramSession) SetDeadline(t time.Time) error { log.WithField("deadline", t).Debug("Setting deadline") - return s.udpconn.SetDeadline(t) + return s.UDPSession.Conn.SetDeadline(t) } // Sets read deadline for the DatagramSession. Implements net.PacketConn func (s *DatagramSession) SetReadDeadline(t time.Time) error { log.WithField("readDeadline", t).Debug("Setting read deadline") - return s.udpconn.SetReadDeadline(t) + return s.UDPSession.Conn.SetReadDeadline(t) } // Sets the write deadline for the DatagramSession. Implements net.Packetconn. func (s *DatagramSession) SetWriteDeadline(t time.Time) error { log.WithField("writeDeadline", t).Debug("Setting write deadline") - return s.udpconn.SetWriteDeadline(t) + return s.UDPSession.Conn.SetWriteDeadline(t) } func (s *DatagramSession) SetWriteBuffer(bytes int) error { log.WithField("bytes", bytes).Debug("Setting write buffer") - return s.udpconn.SetWriteBuffer(bytes) + return s.UDPSession.Conn.SetWriteBuffer(bytes) } diff --git a/primary.go b/primary.go index 267428b..4d17a52 100644 --- a/primary.go +++ b/primary.go @@ -13,6 +13,7 @@ import ( "github.com/sirupsen/logrus" "github.com/go-i2p/i2pkeys" + "github.com/go-i2p/sam3/common" ) const ( @@ -413,42 +414,25 @@ func (s *PrimarySession) I2PListener(name string) (*StreamListener, error) { // and if you set it to zero, it will use SAMs standard UDP port. func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramOptions ...DatagramOptions) (*DatagramSession, error) { log.WithFields(logrus.Fields{"id": id, "udpPort": udpPort}).Debug("NewDatagramSubSession called") - if udpPort > 65335 || udpPort < 0 { - log.WithField("udpPort", udpPort).Error("Invalid UDP port") - return nil, errors.New("udpPort needs to be in the intervall 0-65335") + udpSessionConfig := &common.UDPSessionConfig{ + Port: udpPort, + ParentConn: s.conn, + Log: log, + DefaultPort: 7655, + AllowZeroPort: true, + // Add required session parameters + Style: "DATAGRAM", + FromPort: "0", // Allow dynamic port assignment + ToPort: "0", + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, } - if udpPort == 0 { - udpPort = 7655 - log.Debug("Using default UDP port 7655") - } - lhost, _, err := SplitHostPort(s.conn.LocalAddr().String()) + udpConn, err := common.NewUDPSession(udpSessionConfig) if err != nil { - log.WithError(err).Error("Failed to split local host port") - s.Close() + log.WithError(err).Error("Failed to create UDP session") return nil, err } - lUDPAddr, err := net.ResolveUDPAddr("udp4", lhost+":0") - if err != nil { - log.WithError(err).Error("Failed to resolve local UDP address") - return nil, err - } - udpconn, err := net.ListenUDP("udp4", lUDPAddr) - if err != nil { - log.WithError(err).Error("Failed to listen on UDP") - return nil, err - } - rhost, _, err := SplitHostPort(s.conn.RemoteAddr().String()) - if err != nil { - log.WithError(err).Error("Failed to split remote host port") - s.Close() - return nil, err - } - rUDPAddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(udpPort)) - if err != nil { - log.WithError(err).Error("Failed to resolve remote UDP address") - return nil, err - } - _, lport, err := net.SplitHostPort(udpconn.LocalAddr().String()) + _, lport, err := net.SplitHostPort(udpConn.Conn.LocalAddr().String()) if err != nil { log.WithError(err).Error("Failed to get local port") s.Close() @@ -460,7 +444,15 @@ func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramO return nil, err } if len(datagramOptions) > 0 { - return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, &datagramOptions[0]}, nil + // return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, &datagramOptions[0]}, nil + return &DatagramSession{ + samAddr: s.Config.I2PConfig.Sam(), + id: id, + conn: conn, + keys: s.keys, + UDPSession: *udpConn, + DatagramOptions: &datagramOptions[0], + }, nil } opts := &DatagramOptions{ SendTags: 0, @@ -469,7 +461,15 @@ func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramO SendLeaseset: false, } log.WithFields(logrus.Fields{"id": id, "localPort": lport}).Debug("Created new datagram sub-session") - return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, opts}, nil + // return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, opts}, nil + return &DatagramSession{ + samAddr: s.Config.I2PConfig.Sam(), + id: id, + conn: conn, + keys: s.keys, + UDPSession: *udpConn, + DatagramOptions: opts, + }, nil } // Creates a new raw session. udpPort is the UDP port SAM is listening on, diff --git a/primary_stream_test.go b/primary_stream_test.go index c4fc350..da30da6 100644 --- a/primary_stream_test.go +++ b/primary_stream_test.go @@ -42,7 +42,7 @@ func Test_PrimaryStreamingDial(t *testing.T) { defer ss.Close() fmt.Println("\tNotice: This may fail if your I2P node is not well integrated in the I2P network.") fmt.Println("\tLooking up idk.i2p") - forumAddr, err := earlysam.Lookup("idk.i2p") + forumAddr, err := ss.Lookup("idk.i2p") if err != nil { fmt.Println(err.Error()) t.Fail()