Start abstracting away UDP session management

This commit is contained in:
eyedeekay
2024-11-24 19:11:04 -05:00
parent eea1f916cd
commit 82eeba6275
4 changed files with 271 additions and 136 deletions

140
common/udp.go Normal file
View File

@ -0,0 +1,140 @@
package common
import (
"errors"
"fmt"
"net"
"strconv"
"time"
"github.com/go-i2p/logger"
)
// Package common provides shared UDP common functionality for SAM sessions
//
// It handles:
// - UDP port validation and defaults
// - Address resolution
// - Connection setup
// - Logging
//
// Example Usage:
//
// cfg := &UDPSessionConfig{
// Port: 7655,
// ParentConn: samConn,
// Log: logger,
// }
//
// session, err := NewUDPSession(cfg)
// if err != nil {
// // Handle error
// }
// defer session.Close()
// UDPSessionConfig holds all UDP session configuration
type UDPSessionConfig struct {
Port int
ParentConn net.Conn
Log *logger.Logger
DefaultPort int
AllowZeroPort bool
Style string
FromPort string
ToPort string
ReadTimeout time.Duration
WriteTimeout time.Duration
}
// UDPSession represents an established UDP session
type UDPSession struct {
LocalAddr *net.UDPAddr
RemoteAddr *net.UDPAddr
Conn *net.UDPConn
}
func (u *UDPSession) SetReadTimeout(timeout time.Duration) error {
if u.Conn != nil {
return u.Conn.SetReadDeadline(time.Now().Add(timeout))
}
return nil
}
func (u *UDPSession) SetWriteTimeout(timeout time.Duration) error {
if u.Conn != nil {
return u.Conn.SetWriteDeadline(time.Now().Add(timeout))
}
return nil
}
func (u UDPSession) LocalPort() int {
return u.LocalAddr.Port
}
func (u UDPSession) Close() {
u.Conn.Close()
}
// NewUDPSession creates and configures a new UDP session
func NewUDPSession(cfg *UDPSessionConfig) (*UDPSession, error) {
if err := validatePort(cfg.Port, cfg.AllowZeroPort); err != nil {
cfg.Log.WithError(err).Error("Invalid UDP port configuration")
return nil, err
}
port := cfg.Port
if port == 0 {
port = cfg.DefaultPort
cfg.Log.WithField("port", port).Debug("Using default UDP port")
}
laddr, raddr, err := resolveAddresses(cfg.ParentConn, port)
if err != nil {
return nil, fmt.Errorf("address resolution failed: %w", err)
}
conn, err := net.ListenUDP("udp4", laddr)
if err != nil {
return nil, fmt.Errorf("UDP listen failed: %w", err)
}
return &UDPSession{
LocalAddr: laddr,
RemoteAddr: raddr,
Conn: conn,
}, nil
}
func validatePort(port int, allowZero bool) error {
if port < 0 || port > 65535 {
return errors.New("port must be between 0-65535")
}
if port == 0 && !allowZero {
return errors.New("port 0 not allowed in this context")
}
return nil
}
func resolveAddresses(parent net.Conn, remotePort int) (*net.UDPAddr, *net.UDPAddr, error) {
lhost, _, err := net.SplitHostPort(parent.LocalAddr().String())
if err != nil {
return nil, nil, err
}
laddr, err := net.ResolveUDPAddr("udp4", lhost+":0")
if err != nil {
return nil, nil, err
}
rhost, _, err := net.SplitHostPort(parent.RemoteAddr().String())
if err != nil {
return nil, nil, err
}
raddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(remotePort))
if err != nil {
return nil, nil, err
}
return laddr, raddr, nil
}

View File

@ -5,13 +5,13 @@ import (
"errors"
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/go-i2p/i2pkeys"
"github.com/go-i2p/sam3/common"
)
// The DatagramSession implements net.PacketConn. It works almost like ordinary
@ -22,10 +22,9 @@ type DatagramSession struct {
samAddr string // address to the sam bridge (ipv4:port)
id string // tunnel name
conn net.Conn // connection to sam bridge
udpconn *net.UDPConn // used to deliver datagrams
keys i2pkeys.I2PKeys // i2p destination keys
rUDPAddr *net.UDPAddr // the SAM bridge UDP-port
remoteAddr *i2pkeys.I2PAddr // optional remote I2P address
common.UDPSession
*DatagramOptions
}
@ -36,43 +35,25 @@ func (s *SAM) NewDatagramSession(id string, keys i2pkeys.I2PKeys, options []stri
"id": id,
"udpPort": udpPort,
}).Debug("Creating new DatagramSession")
if udpPort > 65335 || udpPort < 0 {
log.WithField("udpPort", udpPort).Error("Invalid UDP port")
return nil, errors.New("udpPort needs to be in the intervall 0-65335")
udpSessionConfig := &common.UDPSessionConfig{
Port: udpPort,
ParentConn: s.conn,
Log: log,
DefaultPort: 7655,
AllowZeroPort: true,
// Add required session parameters
Style: "DATAGRAM",
FromPort: "0", // Allow dynamic port assignment
ToPort: "0",
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
if udpPort == 0 {
udpPort = 7655
log.Debug("Using default UDP port 7655")
}
lhost, _, err := SplitHostPort(s.conn.LocalAddr().String())
udpconn, err := common.NewUDPSession(udpSessionConfig)
if err != nil {
log.WithError(err).Error("Failed to split local host port")
s.Close()
log.WithError(err).Error("Failed to create UDP session")
return nil, err
}
lUDPAddr, err := net.ResolveUDPAddr("udp4", lhost+":0")
if err != nil {
log.WithError(err).Error("Failed to resolve local UDP address")
return nil, err
}
udpconn, err := net.ListenUDP("udp4", lUDPAddr)
if err != nil {
log.WithError(err).Error("Failed to listen on UDP")
return nil, err
}
rhost, _, err := SplitHostPort(s.conn.RemoteAddr().String())
if err != nil {
log.WithError(err).Error("Failed to split remote host port")
s.Close()
return nil, err
}
rUDPAddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(udpPort))
if err != nil {
log.WithError(err).Error("Failed to resolve remote UDP address")
return nil, err
}
_, lport, err := net.SplitHostPort(udpconn.LocalAddr().String())
_, lport, err := net.SplitHostPort(udpconn.Conn.LocalAddr().String())
if err != nil {
log.WithError(err).Error("Failed to get local port")
s.Close()
@ -84,10 +65,24 @@ func (s *SAM) NewDatagramSession(id string, keys i2pkeys.I2PKeys, options []stri
return nil, err
}
if len(datagramOptions) > 0 {
return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, &datagramOptions[0]}, nil
return &DatagramSession{
samAddr: s.address,
id: id,
conn: conn,
keys: keys,
UDPSession: *udpconn,
DatagramOptions: &datagramOptions[0],
}, nil
}
log.WithField("id", id).Info("DatagramSession created successfully")
return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, nil}, nil
// return &DatagramSession{s.address, id, conn, udpconn, keys, rUDPAddr, nil, nil}, nil
return &DatagramSession{
samAddr: s.address,
id: id,
conn: conn,
keys: keys,
UDPSession: *udpconn,
}, nil
}
func (s *DatagramSession) B32() string {
@ -159,12 +154,12 @@ func (s *DatagramSession) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
for {
// very basic protection: only accept incomming UDP messages from the IP of the SAM bridge
var saddr *net.UDPAddr
n, saddr, err = s.udpconn.ReadFromUDP(buf)
n, saddr, err = s.UDPSession.Conn.ReadFromUDP(buf)
if err != nil {
log.WithError(err).Error("Failed to read from UDP")
return 0, i2pkeys.I2PAddr(""), err
}
if bytes.Equal(saddr.IP, s.rUDPAddr.IP) {
if bytes.Equal(saddr.IP, s.UDPSession.RemoteAddr.IP) {
continue
}
break
@ -210,75 +205,75 @@ const (
// writing, maximum size is 31 kilobyte, but this may change in the future.
// Implements net.PacketConn.
func (s *DatagramSession) WriteTo(b []byte, addr net.Addr) (n int, err error) {
log.WithFields(logrus.Fields{
"addr": addr,
"datagramLen": len(b),
}).Debug("Writing datagram")
log.WithFields(logrus.Fields{
"addr": addr,
"datagramLen": len(b),
}).Debug("Writing datagram")
if len(b) > MAX_DATAGRAM_SIZE {
return 0, errors.New("datagram exceeds maximum size")
}
if len(b) > MAX_DATAGRAM_SIZE {
return 0, errors.New("datagram exceeds maximum size")
}
// Use chunking for anything above recommended size
if len(b) > RECOMMENDED_SIZE {
return s.writeChunked(b, addr)
}
// Use chunking for anything above recommended size
if len(b) > RECOMMENDED_SIZE {
return s.writeChunked(b, addr)
}
// Single message path
if s.DatagramOptions != nil {
return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr))
}
// Single message path
if s.DatagramOptions != nil {
return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr))
}
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, b...)
n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr)
if err != nil {
log.WithError(err).Error("Failed to write to UDP")
} else {
log.WithField("bytesWritten", n).Debug("Datagram written successfully")
}
return n, err
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, b...)
n, err = s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr)
if err != nil {
log.WithError(err).Error("Failed to write to UDP")
} else {
log.WithField("bytesWritten", n).Debug("Datagram written successfully")
}
return n, err
}
func (s *DatagramSession) writeChunked(b []byte, addr net.Addr) (total int, err error) {
chunkSize := RECOMMENDED_SIZE - 256 // Allow for header overhead
chunks := (len(b) + chunkSize - 1) / chunkSize
chunkSize := RECOMMENDED_SIZE - 256 // Allow for header overhead
chunks := (len(b) + chunkSize - 1) / chunkSize
log.WithFields(logrus.Fields{
"totalSize": len(b),
"chunks": chunks,
}).Debug("Splitting datagram into chunks")
log.WithFields(logrus.Fields{
"totalSize": len(b),
"chunks": chunks,
}).Debug("Splitting datagram into chunks")
for i := 0; i < chunks; i++ {
start := i * chunkSize
end := start + chunkSize
if end > len(b) {
end = len(b)
}
for i := 0; i < chunks; i++ {
start := i * chunkSize
end := start + chunkSize
if end > len(b) {
end = len(b)
}
chunk := b[start:end]
var n int
chunk := b[start:end]
var n int
// Single write path that handles both cases
if s.DatagramOptions != nil {
n, err = s.writeToWithOptions(chunk, addr.(i2pkeys.I2PAddr))
} else {
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, chunk...)
n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr)
}
// Single write path that handles both cases
if s.DatagramOptions != nil {
n, err = s.writeToWithOptions(chunk, addr.(i2pkeys.I2PAddr))
} else {
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, chunk...)
n, err = s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr)
}
if err != nil {
return total, fmt.Errorf("chunk %d/%d failed: %w", i+1, chunks, err)
}
total += n
if err != nil {
return total, fmt.Errorf("chunk %d/%d failed: %w", i+1, chunks, err)
}
total += n
if i < chunks-1 {
time.Sleep(50 * time.Millisecond)
}
}
if i < chunks-1 {
time.Sleep(50 * time.Millisecond)
}
}
return total, nil
return total, nil
}
type DatagramOptions struct {
@ -307,7 +302,7 @@ func (s *DatagramSession) writeToWithOptions(b []byte, addr i2pkeys.I2PAddr) (n
header.WriteString("\n")
msg := append(header.Bytes(), b...)
return s.udpconn.WriteToUDP(msg, s.rUDPAddr)
return s.UDPSession.Conn.WriteToUDP(msg, s.UDPSession.RemoteAddr)
}
func (s *DatagramSession) Write(b []byte) (int, error) {
@ -319,7 +314,7 @@ func (s *DatagramSession) Write(b []byte) (int, error) {
func (s *DatagramSession) Close() error {
log.Debug("Closing DatagramSession")
err := s.conn.Close()
err2 := s.udpconn.Close()
err2 := s.UDPSession.Conn.Close()
if err != nil {
log.WithError(err).Error("Failed to close connection")
return err
@ -363,22 +358,22 @@ func (s *DatagramSession) Lookup(name string) (a net.Addr, err error) {
// is seldom done.
func (s *DatagramSession) SetDeadline(t time.Time) error {
log.WithField("deadline", t).Debug("Setting deadline")
return s.udpconn.SetDeadline(t)
return s.UDPSession.Conn.SetDeadline(t)
}
// Sets read deadline for the DatagramSession. Implements net.PacketConn
func (s *DatagramSession) SetReadDeadline(t time.Time) error {
log.WithField("readDeadline", t).Debug("Setting read deadline")
return s.udpconn.SetReadDeadline(t)
return s.UDPSession.Conn.SetReadDeadline(t)
}
// Sets the write deadline for the DatagramSession. Implements net.Packetconn.
func (s *DatagramSession) SetWriteDeadline(t time.Time) error {
log.WithField("writeDeadline", t).Debug("Setting write deadline")
return s.udpconn.SetWriteDeadline(t)
return s.UDPSession.Conn.SetWriteDeadline(t)
}
func (s *DatagramSession) SetWriteBuffer(bytes int) error {
log.WithField("bytes", bytes).Debug("Setting write buffer")
return s.udpconn.SetWriteBuffer(bytes)
return s.UDPSession.Conn.SetWriteBuffer(bytes)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/sirupsen/logrus"
"github.com/go-i2p/i2pkeys"
"github.com/go-i2p/sam3/common"
)
const (
@ -413,42 +414,25 @@ func (s *PrimarySession) I2PListener(name string) (*StreamListener, error) {
// and if you set it to zero, it will use SAMs standard UDP port.
func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramOptions ...DatagramOptions) (*DatagramSession, error) {
log.WithFields(logrus.Fields{"id": id, "udpPort": udpPort}).Debug("NewDatagramSubSession called")
if udpPort > 65335 || udpPort < 0 {
log.WithField("udpPort", udpPort).Error("Invalid UDP port")
return nil, errors.New("udpPort needs to be in the intervall 0-65335")
udpSessionConfig := &common.UDPSessionConfig{
Port: udpPort,
ParentConn: s.conn,
Log: log,
DefaultPort: 7655,
AllowZeroPort: true,
// Add required session parameters
Style: "DATAGRAM",
FromPort: "0", // Allow dynamic port assignment
ToPort: "0",
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
}
if udpPort == 0 {
udpPort = 7655
log.Debug("Using default UDP port 7655")
}
lhost, _, err := SplitHostPort(s.conn.LocalAddr().String())
udpConn, err := common.NewUDPSession(udpSessionConfig)
if err != nil {
log.WithError(err).Error("Failed to split local host port")
s.Close()
log.WithError(err).Error("Failed to create UDP session")
return nil, err
}
lUDPAddr, err := net.ResolveUDPAddr("udp4", lhost+":0")
if err != nil {
log.WithError(err).Error("Failed to resolve local UDP address")
return nil, err
}
udpconn, err := net.ListenUDP("udp4", lUDPAddr)
if err != nil {
log.WithError(err).Error("Failed to listen on UDP")
return nil, err
}
rhost, _, err := SplitHostPort(s.conn.RemoteAddr().String())
if err != nil {
log.WithError(err).Error("Failed to split remote host port")
s.Close()
return nil, err
}
rUDPAddr, err := net.ResolveUDPAddr("udp4", rhost+":"+strconv.Itoa(udpPort))
if err != nil {
log.WithError(err).Error("Failed to resolve remote UDP address")
return nil, err
}
_, lport, err := net.SplitHostPort(udpconn.LocalAddr().String())
_, lport, err := net.SplitHostPort(udpConn.Conn.LocalAddr().String())
if err != nil {
log.WithError(err).Error("Failed to get local port")
s.Close()
@ -460,7 +444,15 @@ func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramO
return nil, err
}
if len(datagramOptions) > 0 {
return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, &datagramOptions[0]}, nil
// return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, &datagramOptions[0]}, nil
return &DatagramSession{
samAddr: s.Config.I2PConfig.Sam(),
id: id,
conn: conn,
keys: s.keys,
UDPSession: *udpConn,
DatagramOptions: &datagramOptions[0],
}, nil
}
opts := &DatagramOptions{
SendTags: 0,
@ -469,7 +461,15 @@ func (s *PrimarySession) NewDatagramSubSession(id string, udpPort int, datagramO
SendLeaseset: false,
}
log.WithFields(logrus.Fields{"id": id, "localPort": lport}).Debug("Created new datagram sub-session")
return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, opts}, nil
// return &DatagramSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, s.keys, rUDPAddr, nil, opts}, nil
return &DatagramSession{
samAddr: s.Config.I2PConfig.Sam(),
id: id,
conn: conn,
keys: s.keys,
UDPSession: *udpConn,
DatagramOptions: opts,
}, nil
}
// Creates a new raw session. udpPort is the UDP port SAM is listening on,

View File

@ -42,7 +42,7 @@ func Test_PrimaryStreamingDial(t *testing.T) {
defer ss.Close()
fmt.Println("\tNotice: This may fail if your I2P node is not well integrated in the I2P network.")
fmt.Println("\tLooking up idk.i2p")
forumAddr, err := earlysam.Lookup("idk.i2p")
forumAddr, err := ss.Lookup("idk.i2p")
if err != nil {
fmt.Println(err.Error())
t.Fail()