Refactor cleanup mechanism to prevent resource leaks in datagram and raw connections

This commit is contained in:
eyedeekay
2025-09-30 15:31:23 -04:00
parent 16ce59a662
commit 34df6905a5
10 changed files with 89 additions and 39 deletions

View File

@@ -84,8 +84,8 @@ func (ds *DatagramSession) DialContext(ctx context.Context, destination string)
go conn.reader.receiveLoop()
}
// Set up finalizer to prevent resource leaks if Close() is not called
conn.setFinalizer()
// Set up cleanup to prevent resource leaks if Close() is not called
conn.addCleanup()
logger.Debug("Successfully created datagram connection")
return conn, nil
@@ -158,8 +158,8 @@ func (ds *DatagramSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PA
go conn.reader.receiveLoop()
}
// Set up finalizer to prevent resource leaks if Close() is not called
conn.setFinalizer()
// Set up cleanup to prevent resource leaks if Close() is not called
conn.addCleanup()
logger.Debug("Successfully created I2P datagram connection")
return conn, nil

View File

@@ -86,9 +86,7 @@ func (c *DatagramConn) Close() error {
c.closed = true
// Clear the finalizer since we're cleaning up explicitly
c.clearFinalizer()
// Close reader and writer - these are owned by this connection
c.clearCleanup() // Close reader and writer - these are owned by this connection
if c.reader != nil {
c.reader.Close()
}
@@ -192,12 +190,30 @@ func datagramConnFinalizer(c *DatagramConn) {
c.mu.Unlock()
}
// setFinalizer sets up automatic cleanup for the connection to prevent resource leaks
func (c *DatagramConn) setFinalizer() {
runtime.SetFinalizer(c, datagramConnFinalizer)
// cleanupDatagramConn is called by AddCleanup to ensure resources are cleaned up
// even if the user forgets to call Close(). This prevents goroutine leaks.
func cleanupDatagramConn(c *DatagramConn) {
c.mu.Lock()
if !c.closed {
log.Warn("DatagramConn was garbage collected without being closed - cleaning up resources")
c.closed = true
if c.reader != nil {
c.reader.Close()
}
}
c.mu.Unlock()
}
// clearFinalizer removes the finalizer when Close() is called explicitly
func (c *DatagramConn) clearFinalizer() {
runtime.SetFinalizer(c, nil)
// addCleanup sets up automatic cleanup for the connection to prevent resource leaks
func (c *DatagramConn) addCleanup() {
c.cleanup = runtime.AddCleanup(c, cleanupDatagramConn, c)
}
// clearCleanup removes the cleanup when Close() is called explicitly
func (c *DatagramConn) clearCleanup() {
var zero runtime.Cleanup
if c.cleanup != zero {
c.cleanup.Stop()
c.cleanup = zero
}
}

View File

@@ -1,6 +1,7 @@
package datagram
import (
"runtime"
"sync"
"time"
@@ -79,4 +80,5 @@ type DatagramConn struct {
remoteAddr *i2pkeys.I2PAddr
mu sync.RWMutex
closed bool
cleanup runtime.Cleanup
}

View File

@@ -75,8 +75,8 @@ func (rs *RawSession) DialContext(ctx context.Context, destination string) (net.
go conn.reader.receiveLoop()
}
// Set up finalizer to prevent resource leaks if Close() is not called
conn.setFinalizer()
// Set up cleanup to prevent resource leaks if Close() is not called
conn.addCleanup()
logger.WithField("session_id", rs.ID()).Debug("Successfully created raw connection")
return conn, nil
@@ -142,8 +142,8 @@ func (rs *RawSession) DialI2PContext(ctx context.Context, addr i2pkeys.I2PAddr)
go conn.reader.receiveLoop()
}
// Set up finalizer to prevent resource leaks if Close() is not called
conn.setFinalizer()
// Set up cleanup to prevent resource leaks if Close() is not called
conn.addCleanup()
logger.WithField("session_id", rs.ID()).Debug("Successfully created I2P raw connection")
return conn, nil

View File

@@ -49,8 +49,8 @@ func TestFinalizerPreventsGoroutineLeaks(t *testing.T) {
}
}(conn.reader)
// Set up the finalizer (this is what our fix does)
conn.setFinalizer()
// Set up the cleanup (this is what our fix does)
conn.addCleanup()
}
// Connections go out of scope here without Close() being called
}()
@@ -110,8 +110,8 @@ func TestExplicitCloseSkipsFinalizer(t *testing.T) {
}
}(conn.reader)
// Set up the finalizer
conn.setFinalizer()
// Set up the cleanup
conn.addCleanup()
// Wait a bit for the goroutine to start
time.Sleep(50 * time.Millisecond)

View File

@@ -84,9 +84,7 @@ func (c *RawConn) Close() error {
c.closed = true
// Clear the finalizer since we're cleaning up explicitly
c.clearFinalizer()
// Close reader and writer - these are owned by this connection
c.clearCleanup() // Close reader and writer - these are owned by this connection
if c.reader != nil {
c.reader.Close()
}
@@ -196,12 +194,30 @@ func connFinalizer(c *RawConn) {
c.mu.Unlock()
}
// setFinalizer sets up automatic cleanup for the connection to prevent resource leaks
func (c *RawConn) setFinalizer() {
runtime.SetFinalizer(c, connFinalizer)
// cleanupResources is called by AddCleanup to ensure resources are cleaned up
// even if the user forgets to call Close(). This prevents goroutine leaks.
func cleanupRawConn(c *RawConn) {
c.mu.Lock()
if !c.closed {
log.Warn("RawConn was garbage collected without being closed - cleaning up resources")
c.closed = true
if c.reader != nil {
c.reader.Close()
}
}
c.mu.Unlock()
}
// clearFinalizer removes the finalizer when Close() is called explicitly
func (c *RawConn) clearFinalizer() {
runtime.SetFinalizer(c, nil)
// addCleanup sets up automatic cleanup for the connection to prevent resource leaks
func (c *RawConn) addCleanup() {
c.cleanup = runtime.AddCleanup(c, cleanupRawConn, c)
}
// clearCleanup removes the cleanup when Close() is called explicitly
func (c *RawConn) clearCleanup() {
var zero runtime.Cleanup
if c.cleanup != zero {
c.cleanup.Stop()
c.cleanup = zero
}
}

View File

@@ -1,6 +1,7 @@
package raw
import (
"runtime"
"sync"
"time"
@@ -54,6 +55,7 @@ type RawConn struct {
remoteAddr *i2pkeys.I2PAddr
mu sync.RWMutex
closed bool
cleanup runtime.Cleanup
}
// RawListener implements net.Listener for I2P raw connections

View File

@@ -69,8 +69,12 @@ func (l *StreamListener) Close() error {
// Unregister this listener from the session
l.session.unregisterListener(l)
// Remove the finalizer to prevent it from running on an already closed listener
runtime.SetFinalizer(l, nil)
// Remove the cleanup to prevent it from running on an already closed listener
var zero runtime.Cleanup
if l.cleanup != zero {
l.cleanup.Stop()
l.cleanup = zero
}
logger.Debug("Successfully closed StreamListener")
return nil
@@ -99,8 +103,12 @@ func (l *StreamListener) closeWithoutUnregister() error {
l.cancel()
}
// Remove the finalizer to prevent it from running on an already closed listener
runtime.SetFinalizer(l, nil)
// Remove the cleanup to prevent it from running on an already closed listener
var zero runtime.Cleanup
if l.cleanup != zero {
l.cleanup.Stop()
l.cleanup = zero
}
logger.Debug("Successfully closed StreamListener without unregister")
return nil

View File

@@ -11,6 +11,13 @@ import (
"github.com/sirupsen/logrus"
)
// cleanupStreamListener is called by AddCleanup to ensure the listener is closed and the goroutine is cleaned up
// This prevents goroutine leaks if the user forgets to call Close()
func cleanupStreamListener(l *StreamListener) {
log.Warn("StreamListener garbage collected without being closed, closing now to prevent goroutine leak")
l.Close()
}
// NewStreamSession creates a new streaming session for TCP-like I2P connections.
// It initializes the session with the provided SAM connection, session ID, cryptographic keys,
// and configuration options. The session provides both client and server capabilities for
@@ -75,12 +82,9 @@ func (s *StreamSession) Listen() (*StreamListener, error) {
cancel: cancel,
}
// Set a finalizer to ensure the listener is closed and the goroutine is cleaned up
// Set up cleanup to ensure the listener is closed and the goroutine is cleaned up
// This prevents goroutine leaks if the user forgets to call Close()
runtime.SetFinalizer(listener, func(l *StreamListener) {
logger.Warn("StreamListener garbage collected without being closed, closing now to prevent goroutine leak")
l.Close()
})
listener.cleanup = runtime.AddCleanup(listener, cleanupStreamListener, listener)
// Start accepting connections in a goroutine
go listener.acceptLoop()

View File

@@ -3,6 +3,7 @@ package stream
import (
"context"
"net"
"runtime"
"sync"
"time"
@@ -40,6 +41,7 @@ type StreamListener struct {
cancel context.CancelFunc
closed bool
mu sync.RWMutex
cleanup runtime.Cleanup
}
// StreamConn implements net.Conn for I2P streaming connections.