Implement subsession management for SAMv3.3 protocol: add methods for creating and removing subsessions, and update related session constructors to utilize existing subsessions.

This commit is contained in:
eyedeekay
2025-10-02 17:05:57 -04:00
parent 854a9e7a3f
commit 01e970872f
8 changed files with 339 additions and 34 deletions

View File

@@ -40,6 +40,13 @@ const (
SIG_DEFAULT = SIG_EdDSA_SHA512_Ed25519
)
// SESSION_ADD_OK indicates successful subsession addition to primary session.
// SESSION_REMOVE_OK indicates successful subsession removal from primary session.
const (
SESSION_ADD_OK = "SESSION STATUS RESULT=OK"
SESSION_REMOVE_OK = "SESSION STATUS RESULT=OK"
)
// SAM_RESULT_OK indicates successful SAM operation completion.
// SAM_RESULT_INVALID_KEY indicates SAM operation failed due to invalid key format.
// SAM_RESULT_KEY_NOT_FOUND indicates SAM operation failed due to missing key.

View File

@@ -172,3 +172,147 @@ func (sam *SAM) handleUnknownResponse(response string) error {
log.WithField("reply", response).Error("Unable to parse SAMv3 reply")
return oops.Errorf("Unable to parse SAMv3 reply: %v", response)
}
// AddSubSession adds a subsession to an existing PRIMARY session using the SESSION ADD command.
// This method implements the SAMv3.3 protocol for creating subsessions that share the same
// destination and tunnels as the primary session while providing separate protocol handling.
//
// Parameters:
// - style: Session style ("STREAM", "DATAGRAM", or "RAW")
// - id: Unique subsession identifier within the primary session scope
// - options: Additional SAM protocol options for the subsession
//
// The subsession inherits the destination from the primary session and uses the same
// tunnel infrastructure for enhanced efficiency. Each subsession must have a unique
// combination of style and port to enable proper routing of incoming traffic.
//
// Example usage:
//
// err := sam.AddSubSession("STREAM", "stream-sub-1", []string{"FROM_PORT=8080"})
func (sam *SAM) AddSubSession(style, id string, options []string) error {
log.WithFields(logrus.Fields{
"style": style,
"id": id,
"options": options,
}).Debug("Adding subsession to primary session")
message, err := sam.buildSessionAddMessage(style, id, options)
if err != nil {
return err
}
if err := sam.transmitSessionMessage(message); err != nil {
return err
}
response, err := sam.readSessionResponse()
if err != nil {
return err
}
return sam.parseSessionAddResponse(response, id)
}
// RemoveSubSession removes a subsession from the primary session using the SESSION REMOVE command.
// This method implements the SAMv3.3 protocol for cleanly terminating subsessions while
// keeping the primary session and other subsessions active.
//
// Parameters:
// - id: Unique subsession identifier to remove
//
// After removal, the subsession is closed and may not be used for sending or receiving data.
// The primary session and other subsessions remain unaffected by this operation.
//
// Example usage:
//
// err := sam.RemoveSubSession("stream-sub-1")
func (sam *SAM) RemoveSubSession(id string) error {
log.WithField("id", id).Debug("Removing subsession from primary session")
message := []byte("SESSION REMOVE ID=" + id + "\n")
log.WithField("message", string(message)).Debug("Sending SESSION REMOVE message")
if err := sam.transmitSessionMessage(message); err != nil {
return err
}
response, err := sam.readSessionResponse()
if err != nil {
return err
}
return sam.parseSessionRemoveResponse(response, id)
}
// buildSessionAddMessage constructs the SESSION ADD message with style, ID, and options.
func (sam *SAM) buildSessionAddMessage(style, id string, options []string) ([]byte, error) {
baseMsg := "SESSION ADD STYLE=" + style + " ID=" + id
extraStr := strings.Join(options, " ")
if extraStr != "" {
baseMsg += " " + extraStr
}
message := []byte(baseMsg + "\n")
log.WithField("message", string(message)).Debug("Built SESSION ADD message")
return message, nil
}
// parseSessionAddResponse parses the SAM response for SESSION ADD and returns appropriate errors.
func (sam *SAM) parseSessionAddResponse(response, id string) error {
if strings.HasPrefix(response, SESSION_ADD_OK) {
log.WithField("id", id).Debug("Successfully added subsession")
return nil
}
log.WithFields(logrus.Fields{
"id": id,
"response": response,
}).Error("Failed to add subsession")
return sam.handleErrorResponse(response)
}
// parseSessionRemoveResponse parses the SAM response for SESSION REMOVE and returns appropriate errors.
func (sam *SAM) parseSessionRemoveResponse(response, id string) error {
if strings.HasPrefix(response, SESSION_REMOVE_OK) {
log.WithField("id", id).Debug("Successfully removed subsession")
return nil
}
log.WithFields(logrus.Fields{
"id": id,
"response": response,
}).Error("Failed to remove subsession")
return sam.handleErrorResponse(response)
}
// NewBaseSessionFromSubsession creates a BaseSession for a subsession that has already been
// registered with a PRIMARY session using SESSION ADD. This constructor is used when the
// subsession is already registered with the SAM bridge and doesn't need a new session creation.
//
// This function is specifically designed for use with SAMv3.3 PRIMARY sessions where
// subsessions are created using SESSION ADD rather than SESSION CREATE commands.
//
// Parameters:
// - sam: SAM connection for data operations (separate from the primary session's control connection)
// - id: The subsession ID that was already registered with SESSION ADD
// - keys: The I2P keys from the primary session (shared across all subsessions)
//
// Returns a BaseSession ready for use without attempting to create a new SAM session.
func NewBaseSessionFromSubsession(sam *SAM, id string, keys i2pkeys.I2PKeys) (*BaseSession, error) {
log.WithField("id", id).Debug("Creating BaseSession from existing subsession")
// Create a BaseSession using the provided connection and shared keys
// The session is already registered with the SAM bridge via SESSION ADD
baseSession := &BaseSession{
id: id,
conn: sam.Conn,
keys: keys,
SAM: *sam,
}
log.WithField("id", id).Debug("Successfully created BaseSession from subsession")
return baseSession, nil
}

View File

@@ -50,6 +50,44 @@ func NewDatagramSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, option
return ds, nil
}
// NewDatagramSessionFromSubsession creates a DatagramSession for a subsession that has already been
// registered with a PRIMARY session using SESSION ADD. This constructor skips the session
// creation step since the subsession is already registered with the SAM bridge.
//
// This function is specifically designed for use with SAMv3.3 PRIMARY sessions where
// subsessions are created using SESSION ADD rather than SESSION CREATE commands.
//
// Parameters:
// - sam: SAM connection for data operations (separate from the primary session's control connection)
// - id: The subsession ID that was already registered with SESSION ADD
// - keys: The I2P keys from the primary session (shared across all subsessions)
// - options: Configuration options for the subsession
//
// Returns a DatagramSession ready for use without attempting to create a new SAM session.
func NewDatagramSessionFromSubsession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*DatagramSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"options": options,
})
logger.Debug("Creating DatagramSession from existing subsession")
// Create a BaseSession manually since the session is already registered
baseSession, err := common.NewBaseSessionFromSubsession(sam, id, keys)
if err != nil {
logger.WithError(err).Error("Failed to create base session from subsession")
return nil, oops.Errorf("failed to create datagram session from subsession: %w", err)
}
ds := &DatagramSession{
BaseSession: baseSession,
sam: sam,
options: options,
}
logger.Debug("Successfully created DatagramSession from subsession")
return ds, nil
}
// NewReader creates a DatagramReader for receiving datagrams from any source.
// This method initializes a new reader with buffered channels for asynchronous datagram
// reception. The reader must be started manually with receiveLoop() for continuous operation.

View File

@@ -132,13 +132,13 @@ func NewPrimarySessionWithSignature(sam *common.SAM, id string, keys i2pkeys.I2P
// while providing full StreamSession functionality for TCP-like reliable connections.
// Each sub-session must have a unique identifier within the primary session scope.
//
// The created sub-session inherits the primary session's keys and base configuration
// but can have additional stream-specific options for customizing behavior such as
// connection timeouts, buffer sizes, and other streaming parameters.
// This implementation uses the SAMv3.3 SESSION ADD protocol to properly register
// the subsession with the primary session's SAM connection, ensuring compliance
// with the I2P SAM protocol specification for PRIMARY session management.
//
// Example usage:
//
// streamSub, err := primary.NewStreamSubSession("tcp-handler", []string{"connect.timeout=30"})
// streamSub, err := primary.NewStreamSubSession("tcp-handler", []string{"FROM_PORT=8080"})
// listener, err := streamSub.Listen()
// conn, err := streamSub.Dial("destination.b32.i2p")
func (p *PrimarySession) NewStreamSubSession(id string, options []string) (*StreamSubSession, error) {
@@ -156,20 +156,29 @@ func (p *PrimarySession) NewStreamSubSession(id string, options []string) (*Stre
})
logger.Debug("Creating stream sub-session")
// Create a new SAM connection for the sub-session
// Sub-sessions need their own SAM connections but share keys with primary
// Add the subsession to the primary session using SESSION ADD
if err := p.sam.AddSubSession("STREAM", id, options); err != nil {
logger.WithError(err).Error("Failed to add stream subsession")
return nil, oops.Errorf("failed to create stream sub-session: %w", err)
}
// Create a new SAM connection for the sub-session data operations
// This connection will be used for STREAM CONNECT, STREAM ACCEPT, etc.
subSAM, err := p.createSubSAMConnection()
if err != nil {
logger.WithError(err).Error("Failed to create sub-SAM connection")
// Clean up the subsession registration
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create sub-SAM connection: %w", err)
}
// Create the stream session using the shared keys from the primary session
// This ensures the sub-session has the same I2P identity as the primary
streamSession, err := stream.NewStreamSession(subSAM, id, p.Keys(), options)
// Create the stream session using the new subsession constructor
// This avoids creating a duplicate session since it's already registered via SESSION ADD
streamSession, err := stream.NewStreamSessionFromSubsession(subSAM, id, p.Keys(), options)
if err != nil {
logger.WithError(err).Error("Failed to create stream session")
logger.WithError(err).Error("Failed to create stream session wrapper")
subSAM.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create stream sub-session: %w", err)
}
@@ -180,6 +189,7 @@ func (p *PrimarySession) NewStreamSubSession(id string, options []string) (*Stre
if err := p.registry.Register(id, subSession); err != nil {
logger.WithError(err).Error("Failed to register stream sub-session")
streamSession.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to register stream sub-session: %w", err)
}
@@ -192,13 +202,13 @@ func (p *PrimarySession) NewStreamSubSession(id string, options []string) (*Stre
// while providing full DatagramSession functionality for UDP-like authenticated messaging.
// Each sub-session must have a unique identifier within the primary session scope.
//
// The created sub-session inherits the primary session's keys and base configuration
// but can have additional datagram-specific options for customizing behavior such as
// message timeouts, reliability settings, and other datagram parameters.
// This implementation uses the SAMv3.3 SESSION ADD protocol to properly register
// the subsession with the primary session's SAM connection, ensuring compliance
// with the I2P SAM protocol specification for PRIMARY session management.
//
// Example usage:
//
// datagramSub, err := primary.NewDatagramSubSession("udp-handler", []string{"receive.timeout=60"})
// datagramSub, err := primary.NewDatagramSubSession("udp-handler", []string{"FROM_PORT=8080"})
// writer := datagramSub.NewWriter()
// reader := datagramSub.NewReader()
func (p *PrimarySession) NewDatagramSubSession(id string, options []string) (*DatagramSubSession, error) {
@@ -216,18 +226,28 @@ func (p *PrimarySession) NewDatagramSubSession(id string, options []string) (*Da
})
logger.Debug("Creating datagram sub-session")
// Create a new SAM connection for the sub-session
// Add the subsession to the primary session using SESSION ADD
if err := p.sam.AddSubSession("DATAGRAM", id, options); err != nil {
logger.WithError(err).Error("Failed to add datagram subsession")
return nil, oops.Errorf("failed to create datagram sub-session: %w", err)
}
// Create a new SAM connection for the sub-session data operations
subSAM, err := p.createSubSAMConnection()
if err != nil {
logger.WithError(err).Error("Failed to create sub-SAM connection")
// Clean up the subsession registration
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create sub-SAM connection: %w", err)
}
// Create the datagram session using the shared keys from the primary session
datagramSession, err := datagram.NewDatagramSession(subSAM, id, p.Keys(), options)
// Create the datagram session using a constructor that doesn't create a new session
// since the subsession is already registered via SESSION ADD
datagramSession, err := datagram.NewDatagramSessionFromSubsession(subSAM, id, p.Keys(), options)
if err != nil {
logger.WithError(err).Error("Failed to create datagram session")
logger.WithError(err).Error("Failed to create datagram session wrapper")
subSAM.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create datagram sub-session: %w", err)
}
@@ -238,6 +258,7 @@ func (p *PrimarySession) NewDatagramSubSession(id string, options []string) (*Da
if err := p.registry.Register(id, subSession); err != nil {
logger.WithError(err).Error("Failed to register datagram sub-session")
datagramSession.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to register datagram sub-session: %w", err)
}
@@ -250,13 +271,13 @@ func (p *PrimarySession) NewDatagramSubSession(id string, options []string) (*Da
// while providing full RawSession functionality for unrepliable datagram communication.
// Each sub-session must have a unique identifier within the primary session scope.
//
// The created sub-session inherits the primary session's keys and base configuration
// but can have additional raw-specific options for customizing behavior such as
// transmission parameters and other raw communication settings.
// This implementation uses the SAMv3.3 SESSION ADD protocol to properly register
// the subsession with the primary session's SAM connection, ensuring compliance
// with the I2P SAM protocol specification for PRIMARY session management.
//
// Example usage:
//
// rawSub, err := primary.NewRawSubSession("raw-sender", []string{"send.timeout=30"})
// rawSub, err := primary.NewRawSubSession("raw-sender", []string{"FROM_PORT=8080"})
// writer := rawSub.NewWriter()
// reader := rawSub.NewReader()
func (p *PrimarySession) NewRawSubSession(id string, options []string) (*RawSubSession, error) {
@@ -274,18 +295,28 @@ func (p *PrimarySession) NewRawSubSession(id string, options []string) (*RawSubS
})
logger.Debug("Creating raw sub-session")
// Create a new SAM connection for the sub-session
// Add the subsession to the primary session using SESSION ADD
if err := p.sam.AddSubSession("RAW", id, options); err != nil {
logger.WithError(err).Error("Failed to add raw subsession")
return nil, oops.Errorf("failed to create raw sub-session: %w", err)
}
// Create a new SAM connection for the sub-session data operations
subSAM, err := p.createSubSAMConnection()
if err != nil {
logger.WithError(err).Error("Failed to create sub-SAM connection")
// Clean up the subsession registration
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create sub-SAM connection: %w", err)
}
// Create the raw session using the shared keys from the primary session
rawSession, err := raw.NewRawSession(subSAM, id, p.Keys(), options)
// Create the raw session using a constructor that doesn't create a new session
// since the subsession is already registered via SESSION ADD
rawSession, err := raw.NewRawSessionFromSubsession(subSAM, id, p.Keys(), options)
if err != nil {
logger.WithError(err).Error("Failed to create raw session")
logger.WithError(err).Error("Failed to create raw session wrapper")
subSAM.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to create raw sub-session: %w", err)
}
@@ -296,6 +327,7 @@ func (p *PrimarySession) NewRawSubSession(id string, options []string) (*RawSubS
if err := p.registry.Register(id, subSession); err != nil {
logger.WithError(err).Error("Failed to register raw sub-session")
rawSession.Close()
p.sam.RemoveSubSession(id)
return nil, oops.Errorf("failed to register raw sub-session: %w", err)
}

View File

@@ -3,6 +3,7 @@ package primary
import (
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"
@@ -174,7 +175,8 @@ func TestPrimarySessionSubSessions(t *testing.T) {
t.Run("create datagram sub-session", func(t *testing.T) {
datagramSubID := "datagram_sub_1"
datagramSub, err := session.NewDatagramSubSession(datagramSubID, []string{})
// DATAGRAM subsessions require a PORT parameter per SAM v3.3 specification
datagramSub, err := session.NewDatagramSubSession(datagramSubID, []string{"PORT=8080"})
if err != nil {
t.Fatalf("Failed to create datagram sub-session: %v", err)
}
@@ -200,7 +202,8 @@ func TestPrimarySessionSubSessions(t *testing.T) {
t.Run("create raw sub-session", func(t *testing.T) {
rawSubID := "raw_sub_1"
rawSub, err := session.NewRawSubSession(rawSubID, []string{})
// RAW subsessions require a PORT parameter per SAM v3.3 specification
rawSub, err := session.NewRawSubSession(rawSubID, []string{"PORT=8081"})
if err != nil {
t.Fatalf("Failed to create raw sub-session: %v", err)
}
@@ -321,12 +324,12 @@ func TestPrimarySessionCascadeClose(t *testing.T) {
t.Fatalf("Failed to create stream sub-session: %v", err)
}
datagramSub, err := session.NewDatagramSubSession("datagram_cascade", []string{})
datagramSub, err := session.NewDatagramSubSession("datagram_cascade", []string{"PORT=8082"})
if err != nil {
t.Fatalf("Failed to create datagram sub-session: %v", err)
}
rawSub, err := session.NewRawSubSession("raw_cascade", []string{})
rawSub, err := session.NewRawSubSession("raw_cascade", []string{"PORT=8083"})
if err != nil {
t.Fatalf("Failed to create raw sub-session: %v", err)
}
@@ -394,9 +397,9 @@ func TestConcurrentSubSessionOperations(t *testing.T) {
case 0:
_, err = session.NewStreamSubSession(subSessionID, []string{})
case 1:
_, err = session.NewDatagramSubSession(subSessionID, []string{})
_, err = session.NewDatagramSubSession(subSessionID, []string{"PORT=" + strconv.Itoa(9000+id)})
case 2:
_, err = session.NewRawSubSession(subSessionID, []string{})
_, err = session.NewRawSubSession(subSessionID, []string{"PORT=" + strconv.Itoa(9100+id)})
}
if err != nil {

View File

@@ -45,6 +45,44 @@ func NewRawSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []s
return rs, nil
}
// NewRawSessionFromSubsession creates a RawSession for a subsession that has already been
// registered with a PRIMARY session using SESSION ADD. This constructor skips the session
// creation step since the subsession is already registered with the SAM bridge.
//
// This function is specifically designed for use with SAMv3.3 PRIMARY sessions where
// subsessions are created using SESSION ADD rather than SESSION CREATE commands.
//
// Parameters:
// - sam: SAM connection for data operations (separate from the primary session's control connection)
// - id: The subsession ID that was already registered with SESSION ADD
// - keys: The I2P keys from the primary session (shared across all subsessions)
// - options: Configuration options for the subsession
//
// Returns a RawSession ready for use without attempting to create a new SAM session.
func NewRawSessionFromSubsession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*RawSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"options": options,
})
logger.Debug("Creating RawSession from existing subsession")
// Create a BaseSession manually since the session is already registered
baseSession, err := common.NewBaseSessionFromSubsession(sam, id, keys)
if err != nil {
logger.WithError(err).Error("Failed to create base session from subsession")
return nil, oops.Errorf("failed to create raw session from subsession: %w", err)
}
rs := &RawSession{
BaseSession: baseSession,
sam: sam,
options: options,
}
logger.Debug("Successfully created RawSession from subsession")
return rs, nil
}
// NewReader creates a RawReader for receiving raw datagrams from any source.
// It initializes buffered channels for incoming datagrams and errors, returning nil if the session is closed.
// The caller must start the receive loop manually by calling receiveLoop() in a goroutine.

View File

@@ -295,7 +295,9 @@ func TestPrimarySessionSubSessions(t *testing.T) {
done := make(chan error, 1)
go func() {
datagramSub, err := primary.NewDatagramSubSession("datagram-sub-"+RandString(), Options_Small)
// DATAGRAM subsessions require a PORT parameter per SAM v3.3 specification
options := append(Options_Small, "PORT=8080")
datagramSub, err := primary.NewDatagramSubSession("datagram-sub-"+RandString(), options)
if err != nil {
done <- err
return
@@ -326,7 +328,9 @@ func TestPrimarySessionSubSessions(t *testing.T) {
done := make(chan error, 1)
go func() {
rawSub, err := primary.NewRawSubSession("raw-sub-"+RandString(), Options_Small)
// RAW subsessions require a PORT parameter per SAM v3.3 specification
options := append(Options_Small, "PORT=8081")
rawSub, err := primary.NewRawSubSession("raw-sub-"+RandString(), options)
if err != nil {
done <- err
return

View File

@@ -54,6 +54,45 @@ func NewStreamSession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options
return ss, nil
}
// NewStreamSessionFromSubsession creates a StreamSession for a subsession that has already been
// registered with a PRIMARY session using SESSION ADD. This constructor skips the session
// creation step since the subsession is already registered with the SAM bridge.
//
// This function is specifically designed for use with SAMv3.3 PRIMARY sessions where
// subsessions are created using SESSION ADD rather than SESSION CREATE commands.
//
// Parameters:
// - sam: SAM connection for data operations (separate from the primary session's control connection)
// - id: The subsession ID that was already registered with SESSION ADD
// - keys: The I2P keys from the primary session (shared across all subsessions)
// - options: Configuration options for the subsession
//
// Returns a StreamSession ready for use without attempting to create a new SAM session.
func NewStreamSessionFromSubsession(sam *common.SAM, id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
logger := log.WithFields(logrus.Fields{
"id": id,
"options": options,
})
logger.Debug("Creating StreamSession from existing subsession")
// Create a BaseSession manually since the session is already registered
// We need a way to create BaseSession from the common package
baseSession, err := common.NewBaseSessionFromSubsession(sam, id, keys)
if err != nil {
logger.WithError(err).Error("Failed to create base session from subsession")
return nil, oops.Errorf("failed to create stream session from subsession: %w", err)
}
ss := &StreamSession{
BaseSession: baseSession,
sam: sam,
options: options,
}
logger.Debug("Successfully created StreamSession from subsession")
return ss, nil
}
// NewStreamSessionWithSignature creates a new streaming session with a custom signature type for TCP-like I2P connections.
// This is the package-level function version that allows specifying cryptographic signature algorithms.
// It initializes the session with the provided SAM connection, session ID, cryptographic keys,