mirror of
https://github.com/go-i2p/go-sam-go.git
synced 2026-01-21 07:58:36 -05:00
fix data race in raw, partially fix data race in datagram1
This commit is contained in:
@@ -18,17 +18,16 @@ import (
|
||||
// concurrent access safely and provides proper error handling for network issues.
|
||||
// Example usage: datagram, err := reader.ReceiveDatagram()
|
||||
func (r *DatagramReader) ReceiveDatagram() (*Datagram, error) {
|
||||
// Check if the reader is closed before attempting to receive
|
||||
// This prevents operations on invalid readers and provides clear error messages
|
||||
// Hold read lock for the entire operation to prevent race with Close()
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
if r.closed {
|
||||
r.mu.RUnlock()
|
||||
return nil, oops.Errorf("reader is closed")
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
|
||||
// Use select to handle multiple channel operations atomically
|
||||
// This ensures proper handling of datagrams, errors, and close signals
|
||||
// The lock ensures that channels won't be closed while we're selecting on them
|
||||
select {
|
||||
case datagram := <-r.recvChan:
|
||||
// Successfully received a datagram from the network
|
||||
@@ -167,17 +166,10 @@ func (r *DatagramReader) processIncomingDatagram(logger *logger.Entry) bool {
|
||||
datagram, err := r.receiveDatagram()
|
||||
if err != nil {
|
||||
logger.WithError(err).Debug("Error receiving datagram")
|
||||
// Use non-blocking send to errorChan to prevent deadlock during shutdown
|
||||
select {
|
||||
case r.errorChan <- err:
|
||||
// Error successfully queued
|
||||
case <-r.closeChan:
|
||||
// Reader was closed while handling error
|
||||
return false
|
||||
default:
|
||||
// errorChan is full, drop the error to prevent blocking
|
||||
// This prevents deadlock during shutdown when no one reads errorChan
|
||||
logger.WithError(err).Debug("Dropped error due to full error channel")
|
||||
}
|
||||
return true
|
||||
}
|
||||
@@ -200,6 +192,11 @@ func (r *DatagramReader) processIncomingDatagram(logger *logger.Entry) bool {
|
||||
// the core functionality for the receive loop and handles protocol-specific details.
|
||||
// receiveDatagram performs the actual datagram reception from the SAM bridge
|
||||
func (r *DatagramReader) receiveDatagram() (*Datagram, error) {
|
||||
// Check if reader is closing before attempting expensive I/O operation
|
||||
if atomic.LoadInt32(&r.closing) == 1 {
|
||||
return nil, oops.Errorf("reader is closing")
|
||||
}
|
||||
|
||||
// Read data from the SAM connection
|
||||
// This blocks until data is available or an error occurs
|
||||
conn := r.session.Conn()
|
||||
|
||||
103
raw/race_test.go
Normal file
103
raw/race_test.go
Normal file
@@ -0,0 +1,103 @@
|
||||
package raw
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestSessionClosureRaceConditionFixed tests that Issue #1 from AUDIT.md is resolved
|
||||
// This validates that ReceiveDatagram() properly synchronizes with Close() operations
|
||||
func TestSessionClosureRaceConditionFixed(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping race condition test in short mode")
|
||||
}
|
||||
|
||||
// Test that demonstrates the fix works by ensuring clean error handling
|
||||
for attempt := 0; attempt < 10; attempt++ {
|
||||
reader := &RawReader{
|
||||
recvChan: make(chan *RawDatagram, 1),
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
closed: false,
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
|
||||
// Send close signal after a delay
|
||||
go func() {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
close(reader.closeChan)
|
||||
}()
|
||||
|
||||
// This should now cleanly return with "reader is closed" error
|
||||
// The fix ensures no race between the closed check and channel operation
|
||||
_, err := reader.ReceiveDatagram()
|
||||
|
||||
if err == nil {
|
||||
t.Error("Expected error when receiving on closed reader")
|
||||
} else if err.Error() != "reader is closed" {
|
||||
t.Errorf("Expected 'reader is closed' error, got: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionClosureRaceConditionWithChannels tests proper closure signaling
|
||||
func TestSessionClosureWithProperSignaling(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping race condition test in short mode")
|
||||
}
|
||||
|
||||
// Test proper closure pattern that works with the fix
|
||||
reader := &RawReader{
|
||||
recvChan: make(chan *RawDatagram, 1),
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
closed: false,
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
|
||||
// Test 1: Pre-closed reader should return immediately
|
||||
reader.mu.Lock()
|
||||
reader.closed = true
|
||||
reader.mu.Unlock()
|
||||
|
||||
_, err := reader.ReceiveDatagram()
|
||||
if err == nil || err.Error() != "reader is closed" {
|
||||
t.Errorf("Expected 'reader is closed' error for pre-closed reader, got: %v", err)
|
||||
}
|
||||
|
||||
// Test 2: Reader closed via channel signal should work correctly
|
||||
reader2 := &RawReader{
|
||||
recvChan: make(chan *RawDatagram, 1),
|
||||
errorChan: make(chan error, 1),
|
||||
closeChan: make(chan struct{}),
|
||||
doneChan: make(chan struct{}),
|
||||
closed: false,
|
||||
mu: sync.RWMutex{},
|
||||
}
|
||||
|
||||
// Start receive operation that will wait on channels
|
||||
resultChan := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := reader2.ReceiveDatagram()
|
||||
resultChan <- err
|
||||
}()
|
||||
|
||||
// Give the goroutine time to start and enter the select
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Close via channel signal - this should wake up the ReceiveDatagram
|
||||
close(reader2.closeChan)
|
||||
|
||||
// Wait for result
|
||||
select {
|
||||
case err := <-resultChan:
|
||||
if err == nil || err.Error() != "reader is closed" {
|
||||
t.Errorf("Expected 'reader is closed' error, got: %v", err)
|
||||
}
|
||||
case <-time.After(1 * time.Second):
|
||||
t.Error("ReceiveDatagram should have returned within 1 second")
|
||||
}
|
||||
}
|
||||
@@ -14,15 +14,16 @@ import (
|
||||
|
||||
// ReceiveDatagram receives a raw datagram from any source
|
||||
func (r *RawReader) ReceiveDatagram() (*RawDatagram, error) {
|
||||
// Check if closed first, but don't rely on this check for safety
|
||||
// Hold read lock for the entire operation to prevent race with Close()
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
if r.closed {
|
||||
r.mu.RUnlock()
|
||||
return nil, oops.Errorf("reader is closed")
|
||||
}
|
||||
r.mu.RUnlock()
|
||||
|
||||
// Use select with closeChan to handle concurrent close operations safely
|
||||
// The lock ensures that channels won't be closed while we're selecting on them
|
||||
select {
|
||||
case datagram := <-r.recvChan:
|
||||
return datagram, nil
|
||||
|
||||
Reference in New Issue
Block a user