Improve testing and deadlock handling

This commit is contained in:
eyedeekay
2025-10-01 11:53:18 -04:00
parent bdc993eb98
commit 848b14f76b
6 changed files with 70 additions and 30 deletions

View File

@@ -138,6 +138,11 @@ func (l *DatagramListener) dispatchPacketConnection(conn net.Conn) bool {
case <-l.closeChan:
conn.Close()
return false
default:
// Non-blocking: if acceptChan is full, drop the connection to prevent deadlock
logger.Warn("Accept channel full, dropping connection to prevent deadlock")
conn.Close()
return true
}
}

View File

@@ -70,15 +70,20 @@ func (r *DatagramReader) Close() error {
// This prevents the background goroutine from continuing to run
close(r.closeChan)
// Wait for the receive loop to confirm termination
// This ensures proper cleanup before returning
select {
case <-r.doneChan:
// Receive loop has confirmed it stopped
logger.Debug("Receive loop stopped")
case <-time.After(5 * time.Second):
// Timeout protection to prevent indefinite blocking
logger.Warn("Timeout waiting for receive loop to stop")
// Only wait for the receive loop if it was actually started
if r.loopStarted {
// Wait for the receive loop to confirm termination
// This ensures proper cleanup before returning
select {
case <-r.doneChan:
// Receive loop has confirmed it stopped
logger.Debug("Receive loop stopped")
case <-time.After(5 * time.Second):
// Timeout protection to prevent indefinite blocking
logger.Warn("Timeout waiting for receive loop to stop")
}
} else {
logger.Debug("Receive loop was never started, skipping wait")
}
// Clean up channels to prevent resource leaks
@@ -97,6 +102,11 @@ func (r *DatagramReader) Close() error {
// DATAGRAM RECEIVED responses and forwarding datagrams to the appropriate channels.
// It runs until the reader is closed and provides error handling for network issues.
func (r *DatagramReader) receiveLoop() {
// Mark that the receive loop has started to handle proper cleanup
r.mu.Lock()
r.loopStarted = true
r.mu.Unlock()
logger := r.initializeReceiveLoop()
defer r.signalReceiveLoopCompletion()

View File

@@ -58,14 +58,15 @@ func (s *DatagramSession) NewReader() *DatagramReader {
// Create reader with buffered channels for non-blocking operation
// The buffer size of 10 prevents blocking when multiple datagrams arrive rapidly
return &DatagramReader{
session: s,
recvChan: make(chan *Datagram, 10), // Buffer for incoming datagrams
errorChan: make(chan error, 1),
closeChan: make(chan struct{}),
doneChan: make(chan struct{}, 1),
closed: false,
mu: sync.RWMutex{},
closeOnce: sync.Once{},
session: s,
recvChan: make(chan *Datagram, 10), // Buffer for incoming datagrams
errorChan: make(chan error, 1),
closeChan: make(chan struct{}),
doneChan: make(chan struct{}, 1),
closed: false,
loopStarted: false,
mu: sync.RWMutex{},
closeOnce: sync.Once{},
}
}

View File

@@ -28,14 +28,15 @@ type DatagramSession struct {
// message processing and provides thread-safe access to received datagrams.
// Example usage: reader := session.NewReader(); datagram, err := reader.ReceiveDatagram()
type DatagramReader struct {
session *DatagramSession
recvChan chan *Datagram
errorChan chan error
closeChan chan struct{}
doneChan chan struct{}
closed bool
mu sync.RWMutex
closeOnce sync.Once
session *DatagramSession
recvChan chan *Datagram
errorChan chan error
closeChan chan struct{}
doneChan chan struct{}
closed bool
loopStarted bool
mu sync.RWMutex
closeOnce sync.Once
}
// DatagramWriter handles outgoing datagram transmission to I2P destinations.

View File

@@ -126,6 +126,11 @@ func (l *RawListener) dispatchConnection(conn *RawConn) bool {
case <-l.closeChan:
conn.Close()
return false
default:
// Non-blocking: if acceptChan is full, drop the connection to prevent deadlock
logger.Warn("Accept channel full, dropping connection to prevent deadlock")
conn.Close()
return true
}
}

View File

@@ -2,6 +2,7 @@ package raw
import (
"testing"
"time"
"github.com/go-i2p/go-sam-go/common"
"github.com/go-i2p/i2pkeys"
@@ -12,10 +13,27 @@ const testSAMAddr = "127.0.0.1:7656"
func setupTestSAM(t *testing.T) (*common.SAM, i2pkeys.I2PKeys) {
t.Helper()
sam, err := common.NewSAM(testSAMAddr)
if err != nil {
t.Fatalf("Failed to create SAM connection: %v", err)
// Add retry mechanism for resource exhaustion
var sam *common.SAM
var err error
for attempts := 0; attempts < 3; attempts++ {
sam, err = common.NewSAM(testSAMAddr)
if err == nil {
break
}
// Wait a bit before retrying in case of resource exhaustion
time.Sleep(time.Duration(attempts+1) * 500 * time.Millisecond)
}
if err != nil {
t.Fatalf("Failed to create SAM connection after retries: %v", err)
}
// Ensure SAM connection is closed when test completes
t.Cleanup(func() {
if sam != nil {
sam.Close()
}
})
keys, err := sam.NewKeys()
if err != nil {
@@ -49,8 +67,8 @@ func TestNewRawSession(t *testing.T) {
name: "session with small tunnel config",
id: "test_raw_small",
options: []string{
"inbound.length=0",
"outbound.length=0",
"inbound.length=1",
"outbound.length=1",
"inbound.lengthVariance=0",
"outbound.lengthVariance=0",
"inbound.quantity=1",