Compare commits

9 Commits

Author SHA1 Message Date
eyedeekay
61419dc4a1 disable Tor and I2P in CI 2025-07-21 20:54:11 -04:00
eyedeekay
ef78a24b4a fix test 2025-07-21 20:44:15 -04:00
eyedeekay
4766493cde break down complex functions 2025-07-21 20:16:10 -04:00
eyedeekay
bf17009909 more refactoring 2025-07-21 19:49:39 -04:00
eyedeekay
bfb60c2c0d break down complex functions 2025-07-21 19:20:59 -04:00
eyedeekay
5310d6d5de function breakdown reduce cyclomatic complexity, make it easier to debug 2025-07-21 18:42:26 -04:00
eyedeekay
6d08c32e71 Reduce cyclomatic complexity of listener function 2025-07-21 18:26:59 -04:00
eyedeekay
12ea259682 fix panic on deadlock 2025-07-21 17:54:27 -04:00
eyedeekay
02e53243b8 work on deadlock thing that probably never actually happens 2025-07-21 16:46:22 -04:00
11 changed files with 396 additions and 217 deletions

2
.gitignore vendored
View File

@@ -24,7 +24,7 @@ go.work.sum
# env file
.env
/*keys
/data-dir*
data-dir*
mp
metaproxy
test*

2
go.mod
View File

@@ -11,7 +11,6 @@ require (
require (
github.com/cretz/bine v0.2.0 // indirect
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 // indirect
github.com/go-i2p/i2pkeys v0.33.92 // indirect
github.com/go-i2p/sam3 v0.33.92 // indirect
github.com/oklog/ulid/v2 v2.1.1 // indirect
@@ -24,5 +23,4 @@ require (
golang.org/x/net v0.42.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.12.0 // indirect
)

34
go.sum
View File

@@ -3,29 +3,17 @@ github.com/cretz/bine v0.2.0/go.mod h1:WU4o9QR9wWp8AVKtTM1XD5vUHkEqnf2vVSo6dBqbe
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3 h1:czQe9f4Y+UFwqMKvMVsWDG/HWKdIN6uPelIkfoCjAsk=
github.com/go-i2p/go-limit v0.0.0-20250203203028-656fa82e1cb3/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014 h1:USgEZEPweXNRwXJjbsIvGz9q+ADSIdo+rMxzyuqbHbQ=
github.com/go-i2p/go-limit v0.0.0-20250718203732-4734f182b014/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8 h1:eKJ5X3wWYg/ln8C2G9Ibp3bUn8Qg8sYe28Uthypmb9g=
github.com/go-i2p/go-limit v0.0.0-20250718212214-52e5c6fec5d8/go.mod h1:4jjmVRhvKj47sQ6B6wdDhN1IrEZunE6KwkYLQx/BeVE=
github.com/go-i2p/i2pkeys v0.0.0-20241108200332-e4f5ccdff8c4/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708 h1:Tiy9IBwi21maNpK74yCdHursJJMkyH7w87tX1nXGWzg=
github.com/go-i2p/i2pkeys v0.33.10-0.20241113193422-e10de5e60708/go.mod h1:m5TlHjPZrU5KbTd7Lr+I2rljyC6aJ88HdkeMQXV0U0E=
github.com/go-i2p/i2pkeys v0.33.92 h1:e2vx3vf7tNesaJ8HmAlGPOcfiGM86jzeIGxh27I9J2Y=
github.com/go-i2p/i2pkeys v0.33.92/go.mod h1:BRURQ/twxV0WKjZlFSKki93ivBi+MirZPWudfwTzMpE=
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c h1:VTiECn3dFEmUlZjto+wOwJ7SSJTHPLyNprQMR5HzIMI=
github.com/go-i2p/logger v0.0.0-20241123010126-3050657e5d0c/go.mod h1:te7Zj3g3oMeIl8uBXAgO62UKmZ6m6kHRNg1Mm+X8Hzk=
github.com/go-i2p/onramp v0.33.92 h1:Dk3A0SGpdEw829rSjW2LqN8o16pUvuhiN0vn36z7Gpc=
github.com/go-i2p/onramp v0.33.92/go.mod h1:5sfB8H2xk05gAS2K7XAUZ7ekOfwGJu3tWF0fqdXzJG4=
github.com/go-i2p/sam3 v0.33.9 h1:3a+gunx75DFc6jxloUZTAVJbdP6736VU1dy2i7I9fKA=
github.com/go-i2p/sam3 v0.33.9/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
github.com/go-i2p/sam3 v0.33.92 h1:TVpi4GH7Yc7nZBiE1QxLjcZfnC4fI/80zxQz1Rk36BA=
github.com/go-i2p/sam3 v0.33.92/go.mod h1:oDuV145l5XWKKafeE4igJHTDpPwA0Yloz9nyKKh92eo=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/oklog/ulid/v2 v2.1.0 h1:+9lhoxAP56we25tyYETBBY1YLA2SaoLvUFgrP2miPJU=
github.com/oklog/ulid/v2 v2.1.0/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/oklog/ulid/v2 v2.1.1 h1:suPZ4ARWLOJLegGFiZZ1dFAkqzhMjL3J1TzI+5wHz8s=
github.com/oklog/ulid/v2 v2.1.1/go.mod h1:rcEKHmBBKfef9DhnvX7y1HZBYxjXb0cP5ExxNsTT1QQ=
github.com/opd-ai/wileedot v0.0.0-20241217172720-521d4175e624 h1:FXCTQV93+31Yj46zpYbd41es+EYgT7qi4RK6KSVrGQM=
@@ -35,12 +23,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/samber/lo v1.50.0 h1:XrG0xOeHs+4FQ8gJR97zDz5uOFMW7OwFWiFVzqopKgY=
github.com/samber/lo v1.50.0/go.mod h1:RjZyNk6WSnUFRKK6EyOhsRJMqft3G+pg7dCWHQCWvsc=
github.com/samber/lo v1.51.0 h1:kysRYLbHy/MB7kQZf5DSN50JHmMsNEdeY24VzJFu7wI=
github.com/samber/lo v1.51.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0=
github.com/samber/oops v1.18.0 h1:NnoCdxlOg/ajFos8HIC0+dV8S6cZRcrjW1WrfZe+GOc=
github.com/samber/oops v1.18.0/go.mod h1:DcZbba2s+PzSx14vY6HjvhV1FDsGOZ1TJg7T/ZZARBQ=
github.com/samber/oops v1.19.0 h1:sfZAwC8MmTXBRRyNc4Z1utuTPBx+hFKF5fJ9DEQRZfw=
github.com/samber/oops v1.19.0/go.mod h1:+f+61dbiMxEMQ8gw/zTxW2pk+YGobaDM4glEHQtPOww=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
@@ -49,42 +33,28 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw=
go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8=
go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ=
go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I=
go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4=
go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ=
go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4=
go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo=
golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM=
golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/time v0.12.0 h1:ScB/8o8olJvc+CQPWrK3fPZNfh7qgwCrY0zJmoEQLSE=
golang.org/x/time v0.12.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -2,6 +2,7 @@ package meta
import (
"net"
"strings"
"sync/atomic"
"time"
)
@@ -9,74 +10,104 @@ import (
// handleListener runs in a separate goroutine for each added listener
// and forwards accepted connections to the connCh channel.
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
defer func() {
// Recover from any panic to ensure WaitGroup.Done() is always called
if r := recover(); r != nil {
log.Printf("PANIC in listener goroutine for %s: %v", id, r)
}
log.Printf("Listener goroutine for %s exiting", id)
ml.listenerWg.Done()
}()
defer ml.recoverAndCleanup(id)
for {
// First check if the MetaListener is closed
select {
case <-ml.closeCh:
log.Printf("MetaListener closed, stopping %s listener", id)
if ml.shouldStopListener(id) {
return
default:
}
// Set a deadline for Accept to prevent blocking indefinitely
if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(1 * time.Second))
}
ml.setAcceptDeadline(listener)
conn, err := listener.Accept()
if err != nil {
// Check if this is a timeout error (which we expect due to our deadline)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
if ml.handleAcceptError(id, err) {
continue
}
// Check if this is any other temporary error
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Printf("Temporary error in %s listener: %v, retrying in 100ms", id, err)
time.Sleep(100 * time.Millisecond)
continue
}
// Check if the listener was closed (expected during shutdown)
if atomic.LoadInt64(&ml.isClosed) != 0 {
log.Printf("Listener %s closed during shutdown", id)
return
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
select {
case ml.removeListenerCh <- id:
// Successfully signaled for removal
case <-ml.closeCh:
// MetaListener is closing, no need to signal removal
}
return
}
// If we reach here, we have a valid connection
log.Printf("Listener %s accepted connection from %s", id, conn.RemoteAddr())
// Try to forward the connection, but don't block indefinitely
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id)
case <-ml.closeCh:
log.Printf("MetaListener closing while forwarding connection, closing connection")
conn.Close()
return
case <-time.After(5 * time.Second):
// If we can't forward within 5 seconds, something is seriously wrong
log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr())
conn.Close()
}
ml.forwardConnection(id, conn)
}
}
// recoverAndCleanup handles panic recovery and ensures proper cleanup for listener goroutines.
func (ml *MetaListener) recoverAndCleanup(id string) {
if r := recover(); r != nil {
log.Printf("PANIC in listener goroutine for %s: %v", id, r)
}
log.Printf("Listener goroutine for %s exiting", id)
ml.listenerWg.Done()
}
// shouldStopListener checks if the MetaListener is closed and should stop processing.
func (ml *MetaListener) shouldStopListener(id string) bool {
select {
case <-ml.closeCh:
log.Printf("MetaListener closed, stopping %s listener", id)
return true
default:
return false
}
}
// setAcceptDeadline sets a deadline for Accept to prevent blocking indefinitely.
func (ml *MetaListener) setAcceptDeadline(listener net.Listener) {
if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(1 * time.Second))
}
}
// handleAcceptError processes errors from listener.Accept() and determines if processing should continue.
// Returns true if the listener should continue processing, false if it should stop.
func (ml *MetaListener) handleAcceptError(id string, err error) bool {
// Check if this is a timeout error (which we expect due to our deadline)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return true
}
// For other network errors, check if it's retryable based on common patterns
errStr := err.Error()
if strings.Contains(errStr, "connection reset") || strings.Contains(errStr, "broken pipe") ||
strings.Contains(errStr, "resource temporarily unavailable") {
log.Printf("Retryable error in %s listener: %v, retrying in 100ms", id, err)
time.Sleep(100 * time.Millisecond)
return true
}
// Check if the listener was closed (expected during shutdown)
if atomic.LoadInt64(&ml.isClosed) != 0 {
log.Printf("Listener %s closed during shutdown", id)
return false
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
ml.signalListenerRemoval(id)
return false
}
// signalListenerRemoval attempts to signal that a listener should be removed.
func (ml *MetaListener) signalListenerRemoval(id string) {
select {
case ml.removeListenerCh <- id:
// Successfully signaled for removal
case <-ml.closeCh:
// MetaListener is closing, no need to signal removal
}
}
// forwardConnection attempts to forward a connection through the connection channel.
func (ml *MetaListener) forwardConnection(id string, conn net.Conn) {
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id)
case <-ml.closeCh:
log.Printf("MetaListener closing while forwarding connection, closing connection")
conn.Close()
case <-time.After(5 * time.Second):
// If we can't forward within 5 seconds, something is seriously wrong
log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr())
conn.Close()
}
}

View File

@@ -15,6 +15,11 @@ func (ml *MetaListener) Accept() (net.Conn, error) {
return nil, ErrListenerClosed
}
return ml.waitForConnection()
}
// waitForConnection waits for the next available connection from any managed listener.
func (ml *MetaListener) waitForConnection() (net.Conn, error) {
for {
select {
case result, ok := <-ml.connCh:
@@ -47,7 +52,21 @@ func (ml *MetaListener) Close() error {
// Signal all goroutines to stop first, before clearing listeners map
close(ml.closeCh)
// Close all listeners first to stop accepting new connections
// Close all listeners and collect any errors
errs := ml.closeAllListeners()
// Clear the listeners map since they're all closed
ml.listeners = make(map[string]net.Listener)
ml.mu.Unlock()
// Wait for all listener goroutines to exit gracefully
ml.waitForListenerShutdown()
return ml.formatCloseErrors(errs)
}
// closeAllListeners closes all managed listeners and returns any errors encountered.
func (ml *MetaListener) closeAllListeners() []error {
var errs []error
for id, listener := range ml.listeners {
if err := listener.Close(); err != nil {
@@ -55,13 +74,11 @@ func (ml *MetaListener) Close() error {
errs = append(errs, err)
}
}
return errs
}
// Clear the listeners map since they're all closed
ml.listeners = make(map[string]net.Listener)
ml.mu.Unlock()
// Allow a brief grace period for handlers to finish processing current connections
// waitForListenerShutdown waits for all listener goroutines to exit with a grace period.
func (ml *MetaListener) waitForListenerShutdown() {
gracePeriod := 100 * time.Millisecond
done := make(chan struct{})
@@ -80,12 +97,13 @@ func (ml *MetaListener) Close() error {
ml.listenerWg.Wait()
}
log.Printf("All listener goroutines have exited")
}
// Return combined errors if any
// formatCloseErrors combines multiple close errors into a single error or returns nil.
func (ml *MetaListener) formatCloseErrors(errs []error) error {
if len(errs) > 0 {
return fmt.Errorf("errors closing listeners: %v", errs)
}
return nil
}

View File

@@ -2,6 +2,7 @@ package mirror
import (
"os"
"strconv"
"sync"
"testing"
"time"
@@ -34,9 +35,10 @@ func TestFixedConcurrentListenDataRace(t *testing.T) {
wg.Add(1)
go func(id int) {
defer wg.Done()
port := 5000 + id
portstr := strconv.Itoa(port)
// Use different addresses to avoid network conflicts
addr := "test-fixed-" + string(rune('a'+id)) + ":5000"
addr := "test-fixed-" + string(rune('a'+id)) + ":" + portstr
// This should now be safe thanks to mutex protection
listener, err := mirror.Listen(addr, "")

View File

@@ -48,6 +48,11 @@ func (m *Mirror) Close() error {
log.Println("Garlic closed")
}
}
// Clear the maps to prevent reuse of closed instances
m.Onions = make(map[string]*onramp.Onion)
m.Garlics = make(map[string]*onramp.Garlic)
log.Println("Mirror closed")
return nil
}
@@ -93,9 +98,8 @@ func NewMirror(name string) (*Mirror, error) {
return ml, nil
}
func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
log.Println("Starting Mirror Listener")
// get the port:
// parsePortFromName extracts the port from a name string, defaulting to "3000" if parsing fails.
func parsePortFromName(name string) string {
_, port, err := net.SplitHostPort(name)
if err != nil {
// check if host is an IP address
@@ -104,8 +108,11 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
}
port = "3000"
}
hiddenTls := hiddenTls(port)
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certDir(), hiddenTls)
return port
}
// setupLocalTCPListener creates and configures a local TCP listener with hardening.
func setupLocalTCPListener(port string, metaListener *meta.MetaListener) (*net.TCPListener, error) {
localAddr := net.JoinHostPort("127.0.0.1", port)
listener, err := net.Listen("tcp", localAddr)
if err != nil {
@@ -117,127 +124,235 @@ func (ml Mirror) Listen(name, addr string) (net.Listener, error) {
log.Fatal(err)
}
log.Printf("TCP listener created on %s\n", localAddr)
if err := ml.AddListener(port, hardenedListener); err != nil {
if err := metaListener.AddListener(port, hardenedListener); err != nil {
return nil, err
}
log.Printf("HTTP Local listener added http://%s\n", tcpListener.Addr())
log.Println("Checking for existing onion and garlic listeners")
listenerId := fmt.Sprintf("metalistener-%s-%s", name, port)
log.Println("Listener ID:", listenerId)
return tcpListener, nil
}
// Protect map access with mutex to prevent data race
// ensureHiddenServiceListeners creates onion and garlic listeners if they don't exist.
func (ml *Mirror) ensureHiddenServiceListeners(port, listenerId string) error {
ml.mu.Lock()
defer ml.mu.Unlock()
// Check if onion and garlic listeners already exist
if ml.Onions[port] == nil && !DisableTor() {
// make a new onion listener
// and add it to the map
log.Println("Creating new onion listener")
onion, err := onramp.NewOnion(listenerId)
if err != nil {
ml.mu.Unlock()
return nil, err
return err
}
log.Println("Onion listener created for port", port)
ml.Onions[port] = onion
}
if ml.Garlics[port] == nil && !DisableI2P() {
// make a new garlic listener
// and add it to the map
log.Println("Creating new garlic listener")
garlic, err := onramp.NewGarlic(listenerId, "127.0.0.1:7656", onramp.OPT_WIDE)
if err != nil {
ml.mu.Unlock()
return nil, err
return err
}
log.Println("Garlic listener created for port", port)
ml.Garlics[port] = garlic
}
ml.mu.Unlock()
if hiddenTls {
// make sure an onion and a garlic listener exist at ml.Onions[port] and ml.Garlics[port]
// and listen on them, check existence first
if !DisableTor() {
ml.mu.RLock()
onionInstance := ml.Onions[port]
ml.mu.RUnlock()
return nil
}
onionListener, err := onionInstance.ListenTLS()
if err != nil {
return nil, err
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
}
log.Printf("OnionTLS listener added https://%s\n", onionListener.Addr())
}
if !DisableI2P() {
ml.mu.RLock()
garlicInstance := ml.Garlics[port]
ml.mu.RUnlock()
// addOnionListener adds an onion listener to the meta listener, either TLS or regular.
func (ml *Mirror) addOnionListener(port string, metaListener *meta.MetaListener, useTLS bool) error {
if DisableTor() {
return nil
}
garlicListener, err := garlicInstance.ListenTLS()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("GarlicTLS listener added https://%s\n", garlicListener.Addr())
}
onionInstance, err := ml.getOnionInstance(port)
if err != nil {
return err
}
listener, protocol, err := ml.createOnionListener(onionInstance, useTLS)
if err != nil {
return err
}
return ml.registerOnionListener(listener, metaListener, protocol, useTLS)
}
// getOnionInstance retrieves the onion instance for the specified port.
func (ml *Mirror) getOnionInstance(port string) (*onramp.Onion, error) {
ml.mu.RLock()
defer ml.mu.RUnlock()
onionInstance := ml.Onions[port]
if onionInstance == nil {
return nil, fmt.Errorf("no onion instance found for port %s", port)
}
return onionInstance, nil
}
// createOnionListener creates either a TLS or regular onion listener.
func (ml *Mirror) createOnionListener(onionInstance *onramp.Onion, useTLS bool) (net.Listener, string, error) {
var listener net.Listener
var err error
var protocol string
if useTLS {
listener, err = onionInstance.ListenTLS()
protocol = "https"
} else {
if !DisableTor() {
ml.mu.RLock()
onionInstance := ml.Onions[port]
ml.mu.RUnlock()
onionListener, err := onionInstance.Listen()
if err != nil {
return nil, err
}
oid := fmt.Sprintf("onion-%s", onionListener.Addr().String())
if err := ml.AddListener(oid, onionListener); err != nil {
return nil, err
}
log.Printf("Onion listener added http://%s\n", onionListener.Addr())
}
if !DisableI2P() {
ml.mu.RLock()
garlicInstance := ml.Garlics[port]
ml.mu.RUnlock()
garlicListener, err := garlicInstance.Listen()
if err != nil {
return nil, err
}
gid := fmt.Sprintf("garlic-%s", garlicListener.Addr().String())
if err := ml.AddListener(gid, garlicListener); err != nil {
return nil, err
}
log.Printf("Garlic listener added http://%s\n", garlicListener.Addr())
}
listener, err = onionInstance.Listen()
protocol = "http"
}
if addr != "" {
cfg := wileedot.Config{
Domain: name,
AllowedDomains: []string{name},
CertDir: certDir(),
Email: addr,
}
tlsListener, err := wileedot.New(cfg)
if err != nil {
return nil, err
}
tid := fmt.Sprintf("tls-%s", tlsListener.Addr().String())
if err := ml.AddListener(tid, tlsListener); err != nil {
return nil, err
}
log.Printf("TLS listener added https://%s\n", tlsListener.Addr())
return listener, protocol, err
}
// registerOnionListener registers the onion listener with the meta listener and logs the result.
func (ml *Mirror) registerOnionListener(listener net.Listener, metaListener *meta.MetaListener, protocol string, useTLS bool) error {
oid := fmt.Sprintf("onion-%s", listener.Addr().String())
if err := metaListener.AddListener(oid, listener); err != nil {
return err
}
return &ml, nil
tlsPrefix := ""
if useTLS {
tlsPrefix = "TLS"
}
log.Printf("Onion%s listener added %s://%s\n", tlsPrefix, protocol, listener.Addr())
return nil
}
// addGarlicListener adds a garlic listener to the meta listener, either TLS or regular.
func (ml *Mirror) addGarlicListener(port string, metaListener *meta.MetaListener, useTLS bool) error {
if DisableI2P() {
return nil
}
garlicInstance, err := ml.getGarlicInstance(port)
if err != nil {
return err
}
listener, protocol, err := ml.createGarlicListener(garlicInstance, useTLS)
if err != nil {
return err
}
return ml.registerGarlicListener(listener, metaListener, protocol, useTLS)
}
// getGarlicInstance retrieves the garlic instance for the specified port.
func (ml *Mirror) getGarlicInstance(port string) (*onramp.Garlic, error) {
ml.mu.RLock()
defer ml.mu.RUnlock()
garlicInstance := ml.Garlics[port]
if garlicInstance == nil {
return nil, fmt.Errorf("no garlic instance found for port %s", port)
}
return garlicInstance, nil
}
// createGarlicListener creates either a TLS or regular garlic listener.
func (ml *Mirror) createGarlicListener(garlicInstance *onramp.Garlic, useTLS bool) (net.Listener, string, error) {
var listener net.Listener
var err error
var protocol string
if useTLS {
listener, err = garlicInstance.ListenTLS()
protocol = "https"
} else {
listener, err = garlicInstance.Listen()
protocol = "http"
}
return listener, protocol, err
}
// registerGarlicListener registers the garlic listener with the meta listener and logs the result.
func (ml *Mirror) registerGarlicListener(listener net.Listener, metaListener *meta.MetaListener, protocol string, useTLS bool) error {
gid := fmt.Sprintf("garlic-%s", listener.Addr().String())
if err := metaListener.AddListener(gid, listener); err != nil {
return err
}
tlsPrefix := ""
if useTLS {
tlsPrefix = "TLS"
}
log.Printf("Garlic%s listener added %s://%s\n", tlsPrefix, protocol, listener.Addr())
return nil
}
// setupTLSListener creates and adds a TLS listener using wileedot if addr is provided.
func setupTLSListener(name, addr string, metaListener *meta.MetaListener) error {
if addr == "" {
return nil
}
cfg := wileedot.Config{
Domain: name,
AllowedDomains: []string{name},
CertDir: certDir(),
Email: addr,
}
tlsListener, err := wileedot.New(cfg)
if err != nil {
return err
}
tid := fmt.Sprintf("tls-%s", tlsListener.Addr().String())
if err := metaListener.AddListener(tid, tlsListener); err != nil {
return err
}
log.Printf("TLS listener added https://%s\n", tlsListener.Addr())
return nil
}
// Listen creates a comprehensive network listener that supports multiple protocols.
// It sets up TCP, onion, garlic, and optionally TLS listeners.
func (ml *Mirror) Listen(name, addr string) (net.Listener, error) {
log.Println("Starting Mirror Listener")
// Create a new MetaListener for this specific Listen() call
newMetaListener := meta.NewMetaListener()
// Parse port from name
port := parsePortFromName(name)
hiddenTls := hiddenTls(port)
log.Printf("Actual args: name: '%s' addr: '%s' certDir: '%s' hiddenTls: '%t'\n", name, addr, certDir(), hiddenTls)
// Setup local TCP listener
_, err := setupLocalTCPListener(port, newMetaListener)
if err != nil {
return nil, err
}
// Ensure hidden service listeners exist
listenerId := fmt.Sprintf("metalistener-%s-%s", name, port)
log.Println("Listener ID:", listenerId)
log.Println("Checking for existing onion and garlic listeners")
if err := ml.ensureHiddenServiceListeners(port, listenerId); err != nil {
return nil, err
}
// Add onion and garlic listeners
if err := ml.addOnionListener(port, newMetaListener, hiddenTls); err != nil {
return nil, err
}
if err := ml.addGarlicListener(port, newMetaListener, hiddenTls); err != nil {
return nil, err
}
// Setup TLS listener if email address is provided
if err := setupTLSListener(name, addr, newMetaListener); err != nil {
return nil, err
}
return newMetaListener, nil
}
// Listen creates a new Mirror instance and sets up listeners for TLS, Onion, and Garlic.

View File

@@ -1,6 +1,8 @@
package mirror
import (
"os"
"strconv"
"sync"
"testing"
"time"
@@ -8,8 +10,14 @@ import (
// TestConcurrentListenDataRace tests for data race in concurrent Listen() calls
func TestConcurrentListenDataRace(t *testing.T) {
// Enable race detector with go test -race
// This test reproduces the data race in Mirror.Listen()
// Disable Tor and I2P to avoid deadlocks in concurrent access to shared connections
// This isolates the test to focus on the map race condition in Mirror.Listen()
os.Setenv("DISABLE_TOR", "true")
os.Setenv("DISABLE_I2P", "true")
defer func() {
os.Unsetenv("DISABLE_TOR")
os.Unsetenv("DISABLE_I2P")
}()
mirror, err := NewMirror("test-mirror:3001")
if err != nil {
@@ -29,8 +37,9 @@ func TestConcurrentListenDataRace(t *testing.T) {
// Use different ports to avoid "address already in use" errors
// The race condition occurs on the map access, not the network binding
port := 4000 + id
addr := "test-" + string(rune('a'+id)) + ":" + string(rune('0'+port%10))
port := 14000 + id
portstr := strconv.Itoa(port)
addr := "test-" + string(rune('a'+id)) + ":" + portstr
// This should cause concurrent map access on ml.Onions and ml.Garlics
listener, err := mirror.Listen(addr, "")
@@ -69,26 +78,34 @@ func TestConcurrentListenDataRace(t *testing.T) {
// TestSequentialListenWorks verifies that sequential Listen calls work correctly
func TestSequentialListenWorks(t *testing.T) {
// Disable Tor and I2P to avoid timeout issues in testing environment
os.Setenv("DISABLE_TOR", "true")
os.Setenv("DISABLE_I2P", "true")
defer func() {
os.Unsetenv("DISABLE_TOR")
os.Unsetenv("DISABLE_I2P")
}()
mirror, err := NewMirror("test-sequential:3002")
if err != nil {
t.Fatalf("Failed to create mirror: %v", err)
}
defer mirror.Close()
// Sequential calls should work fine
listener1, err := mirror.Listen("test-seq-1:3002", "")
// Sequential calls should work fine with different ports
listener1, err := mirror.Listen("test-seq-1:3003", "")
if err != nil {
t.Fatalf("First Listen() failed: %v", err)
}
if listener1 != nil {
listener1.Close()
if listener1 == nil {
t.Fatal("First Listen() returned nil listener")
}
listener2, err := mirror.Listen("test-seq-2:3002", "")
listener2, err := mirror.Listen("test-seq-2:3004", "")
if err != nil {
t.Fatalf("Second Listen() failed: %v", err)
}
if listener2 != nil {
listener2.Close()
if listener2 == nil {
t.Fatal("Second Listen() returned nil listener")
}
}

View File

@@ -1,6 +1,7 @@
package mirror
import (
"os"
"sync"
"testing"
"time"
@@ -10,6 +11,14 @@ import (
// TestConcurrentMapAccessDataRace tests for data race in concurrent map access
func TestConcurrentMapAccessDataRace(t *testing.T) {
// Disable network services to prevent actual connections during testing
os.Setenv("DISABLE_TOR", "1")
os.Setenv("DISABLE_I2P", "1")
defer func() {
os.Unsetenv("DISABLE_TOR")
os.Unsetenv("DISABLE_I2P")
}()
// This test directly reproduces the data race on the maps
// Create a Mirror with empty maps like in Listen()
mirror := &Mirror{

View File

@@ -204,7 +204,7 @@ func main() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
log.Printf("Proxy server starting, forwarding to %s:%d (max concurrent connections: %d)", *host, *port, *maxConns)
log.Printf("Proxy server starting on %d, forwarding to %s:%d (max concurrent connections: %d)", *listenPort, *host, *port, *maxConns)
// Start accepting connections in a separate goroutine
go func() {

View File

@@ -81,22 +81,41 @@ func (hl *hardenedListener) Accept() (net.Conn, error) {
// hardenConnection applies security and performance settings to a TCP connection.
func (hl *hardenedListener) hardenConnection(conn *net.TCPConn) error {
// Enable TCP keep-alive to detect dead connections
if err := hl.configureKeepAlive(conn); err != nil {
return err
}
if err := hl.configureLatencyOptimization(conn); err != nil {
return err
}
if err := hl.configureBufferSizes(conn); err != nil {
return err
}
return nil
}
// configureKeepAlive enables TCP keep-alive functionality to detect dead connections.
func (hl *hardenedListener) configureKeepAlive(conn *net.TCPConn) error {
if err := conn.SetKeepAlive(true); err != nil {
return err
}
// Set keep-alive interval for timely detection of connection issues
if err := conn.SetKeepAlivePeriod(keepAliveInterval); err != nil {
return err
}
// Disable Nagle's algorithm for lower latency
if err := conn.SetNoDelay(true); err != nil {
return err
}
return nil
}
// Set socket buffer sizes for optimal throughput
// configureLatencyOptimization disables Nagle's algorithm for reduced latency.
func (hl *hardenedListener) configureLatencyOptimization(conn *net.TCPConn) error {
return conn.SetNoDelay(true)
}
// configureBufferSizes sets socket buffer sizes for optimal throughput.
func (hl *hardenedListener) configureBufferSizes(conn *net.TCPConn) error {
if err := conn.SetReadBuffer(socketBufferSize); err != nil {
return err
}