Compare commits

4 Commits

Author SHA1 Message Date
eyedeekay
cff9eef383 upgrade mirrorListener 2025-05-21 15:56:07 -04:00
eyedeekay
58490c00e7 refactor metalistener 2025-05-21 15:21:43 -04:00
eyedeekay
273f4a33fb use a buffer to fix the header rewriting 2025-05-21 14:53:34 -04:00
eyedeekay
2058a43096 Fix incoming host header 2025-05-21 13:01:31 -04:00
5 changed files with 129 additions and 75 deletions

View File

@@ -27,7 +27,7 @@ func main() {
if err := metaListener.AddListener("tcp", tcpListener); err != nil {
log.Fatalf("Failed to add TCP listener: %v", err)
}
log.Println("Added TCP listener on 127.0.0.1:8080")
log.Println("Added TCP listener on 127.0.0.1:8082")
// Create and add a Unix socket listener (on Unix systems)
socketPath := "/tmp/example.sock"
@@ -42,6 +42,7 @@ func main() {
log.Println("Added Unix socket listener on", socketPath)
}
}
log.Println("Starting http server...")
// Create a simple HTTP server using the meta listener
server := &http.Server{
@@ -49,6 +50,7 @@ func main() {
fmt.Fprintf(w, "Hello from MetaListener! You connected via: %s\n", r.Proto)
}),
}
log.Println("Server is ready to accept connections...")
// Handle server shutdown gracefully
stop := make(chan os.Signal, 1)

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"log"
"net"
"sync"
"time"
@@ -14,8 +15,6 @@ var (
ErrListenerClosed = errors.New("listener is closed")
// ErrNoListeners is returned when the meta listener has no active listeners
ErrNoListeners = errors.New("no active listeners")
// ErrInternalListenerFailure is returned when an internal listener fails
ErrInternalListenerFailure = errors.New("Internal listener error, shutting down metalistener for restart")
)
// MetaListener implements the net.Listener interface and manages multiple
@@ -27,8 +26,6 @@ type MetaListener struct {
listenerWg sync.WaitGroup
// connCh is used to receive connections from all managed listeners
connCh chan ConnResult
// errCh is used to receive errors from all managed listeners
errCh chan error
// closeCh signals all goroutines to stop
closeCh chan struct{}
// isClosed indicates whether the meta listener has been closed
@@ -47,8 +44,7 @@ type ConnResult struct {
func NewMetaListener() *MetaListener {
return &MetaListener{
listeners: make(map[string]net.Listener),
connCh: make(chan ConnResult),
errCh: make(chan error, 1), // Buffered to prevent blocking
connCh: make(chan ConnResult, 100), // Larger buffer for high connection volume
closeCh: make(chan struct{}),
}
}
@@ -102,58 +98,61 @@ func (ml *MetaListener) RemoveListener(id string) error {
// handleListener runs in a separate goroutine for each added listener
// and forwards accepted connections to the connCh channel.
func (ml *MetaListener) handleListener(id string, listener net.Listener) {
defer ml.listenerWg.Done()
defer func() {
log.Printf("Listener goroutine for %s exiting", id)
ml.listenerWg.Done()
}()
for {
conn, err := listener.Accept()
// First check if the MetaListener is closed
select {
case <-ml.closeCh:
// Meta listener is being closed, exit
log.Printf("MetaListener closed, stopping %s listener", id)
return
default:
// Continue processing
}
// Set a deadline for Accept to prevent blocking indefinitely
if deadline, ok := listener.(interface{ SetDeadline(time.Time) error }); ok {
deadline.SetDeadline(time.Now().Add(1 * time.Second))
}
conn, err := listener.Accept()
if err != nil {
// Check if this is a temporary error
// Check if this is a timeout error (which we expect due to our deadline)
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
// Check if this is any other temporary error
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
// For temporary errors, wait a bit and try again
log.Printf("Temporary error in %s listener: %v, retrying in 100ms", id, err)
time.Sleep(100 * time.Millisecond)
continue
}
// For non-temporary errors, check if listener was closed
ml.mu.RLock()
_, stillExists := ml.listeners[id]
ml.mu.RUnlock()
if stillExists {
// Create a combined error with both the standard message and original error details
combinedErr := fmt.Errorf("%w: listener %s error - %v",
ErrInternalListenerFailure, id, err)
// Send the combined error to notify Accept() calls
select {
case ml.errCh <- combinedErr:
default:
// Don't block if no one is reading errors
}
// Then close all listeners
go ml.Close()
}
log.Printf("Permanent error in %s listener: %v, stopping", id, err)
ml.mu.Lock()
delete(ml.listeners, id)
ml.mu.Unlock()
return
}
// Send the accepted connection to the connection channel
// If we reach here, we have a valid connection
log.Printf("Listener %s accepted connection from %s", id, conn.RemoteAddr())
// Try to forward the connection, but don't block indefinitely
select {
case ml.connCh <- ConnResult{Conn: conn, src: id}:
// Connection forwarded successfully
log.Printf("Connection from %s successfully forwarded via %s", conn.RemoteAddr(), id)
case <-ml.closeCh:
// If we're closing and got a connection, close it
log.Printf("MetaListener closing while forwarding connection, closing connection")
conn.Close()
return
case <-time.After(5 * time.Second):
// If we can't forward within 5 seconds, something is seriously wrong
log.Printf("WARNING: Connection forwarding timed out, closing connection from %s", conn.RemoteAddr())
conn.Close()
}
}
}
@@ -161,26 +160,31 @@ func (ml *MetaListener) handleListener(id string, listener net.Listener) {
// Accept implements the net.Listener Accept method.
// It waits for and returns the next connection from any of the managed listeners.
func (ml *MetaListener) Accept() (net.Conn, error) {
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
return nil, ErrListenerClosed
}
for {
ml.mu.RLock()
if ml.isClosed {
ml.mu.RUnlock()
return nil, ErrListenerClosed
}
if len(ml.listeners) == 0 {
if len(ml.listeners) == 0 {
ml.mu.RUnlock()
return nil, ErrNoListeners
}
ml.mu.RUnlock()
return nil, ErrNoListeners
}
ml.mu.RUnlock()
// Wait for either a connection, an error, or close signal
select {
case result := <-ml.connCh:
return result.Conn, nil
case err := <-ml.errCh:
return nil, err
case <-ml.closeCh:
return nil, ErrListenerClosed
// Wait for either a connection or close signal
select {
case result, ok := <-ml.connCh:
if !ok {
return nil, ErrListenerClosed
}
log.Printf("Accept returning connection from %s via %s",
result.RemoteAddr(), result.src)
return result.Conn, nil
case <-ml.closeCh:
return nil, ErrListenerClosed
}
}
}
@@ -194,6 +198,7 @@ func (ml *MetaListener) Close() error {
return nil
}
log.Printf("Closing MetaListener with %d listeners", len(ml.listeners))
ml.isClosed = true
// Signal all goroutines to stop
@@ -203,7 +208,8 @@ func (ml *MetaListener) Close() error {
var errs []error
for id, listener := range ml.listeners {
if err := listener.Close(); err != nil {
errs = append(errs, fmt.Errorf("failed to close listener %s: %w", id, err))
log.Printf("Error closing %s listener: %v", id, err)
errs = append(errs, err)
}
}
@@ -211,6 +217,7 @@ func (ml *MetaListener) Close() error {
// Wait for all listener goroutines to exit
ml.listenerWg.Wait()
log.Printf("All listener goroutines have exited")
// Return combined errors if any
if len(errs) > 0 {

View File

@@ -2,10 +2,13 @@ package mirror
import (
"bufio"
"bytes"
"crypto/tls"
"io"
"log"
"net"
"net/http"
"time"
)
// AddHeaders adds headers to the connection.
@@ -13,30 +16,56 @@ import (
// It only adds headers if the connection is an HTTP connection.
// It returns a net.Conn with the headers added.
func AddHeaders(conn net.Conn, headers map[string]string) net.Conn {
// read a request from the connection
// if the request is an HTTP request, add the headers
// if the request is not an HTTP request, return the connection as is
req, err := http.ReadRequest(bufio.NewReader(conn))
// Create a buffer to store the original request
var buf bytes.Buffer
teeReader := io.TeeReader(conn, &buf)
// Try to read the request, but also save it to our buffer
req, err := http.ReadRequest(bufio.NewReader(teeReader))
if err != nil {
log.Println("Error reading request:", err)
// if the request is not an HTTP request, return the connection as is
// Not an HTTP request or couldn't parse, return original connection
return conn
}
log.Println("Adding headers to connection:", req.Method, req.URL)
// Add our headers
for key, value := range headers {
req.Header.Add(key, value)
log.Println("Added header:", key, value)
}
// write the request back to the connection
if err := req.Write(conn); err != nil {
log.Println("Error writing request:", err)
// if there is an error writing the request, return the connection as is
return conn
// Create a pipe to connect our modified request with the output
pr, pw := io.Pipe()
// Write the modified request to one end of the pipe
go func() {
req.Write(pw)
// Then copy the rest of the original connection
io.Copy(pw, conn)
pw.Close()
}()
// Return a ReadWriter that reads from our pipe and writes to the original connection
return &readWriteConn{
Reader: pr,
Writer: conn,
conn: conn,
}
// If all goes well, return the connection with the headers added
return conn
}
// readWriteConn implements net.Conn
type readWriteConn struct {
io.Reader
io.Writer
conn net.Conn
}
// Implement the rest of net.Conn interface by delegating to the original connection
func (rwc *readWriteConn) Close() error { return rwc.conn.Close() }
func (rwc *readWriteConn) LocalAddr() net.Addr { return rwc.conn.LocalAddr() }
func (rwc *readWriteConn) RemoteAddr() net.Addr { return rwc.conn.RemoteAddr() }
func (rwc *readWriteConn) SetDeadline(t time.Time) error { return rwc.conn.SetDeadline(t) }
func (rwc *readWriteConn) SetReadDeadline(t time.Time) error { return rwc.conn.SetReadDeadline(t) }
func (rwc *readWriteConn) SetWriteDeadline(t time.Time) error { return rwc.conn.SetWriteDeadline(t) }
// Accept accepts a connection from the listener.
// It takes a net.Listener as input and returns a net.Conn with the headers added.
// It is used to accept connections from the meta listener and add headers to them.
@@ -56,11 +85,11 @@ func (ml *Mirror) Accept() (net.Conn, error) {
return nil, err
}
// If the handshake is successful, get the underlying connection
conn = tlsConn.NetConn()
//conn = tlsConn.NetConn()
}
host := map[string]string{
"Host": ml.MetaListener.Addr().String(),
"Host": conn.LocalAddr().String(),
"X-Forwarded-For": conn.RemoteAddr().String(),
"X-Forwarded-Proto": "http",
}

View File

@@ -21,37 +21,49 @@ type Mirror struct {
var _ net.Listener = &Mirror{}
func (m *Mirror) Close() error {
log.Println("Closing Mirror")
if err := m.MetaListener.Close(); err != nil {
log.Println("Error closing MetaListener:", err)
} else {
log.Println("MetaListener closed")
}
for _, onion := range m.Onions {
if err := onion.Close(); err != nil {
log.Println("Error closing Onion:", err)
} else {
log.Println("Onion closed")
}
}
for _, garlic := range m.Garlics {
if err := garlic.Close(); err != nil {
log.Println("Error closing Garlic:", err)
} else {
log.Println("Garlic closed")
}
}
log.Println("Mirror closed")
return nil
}
func NewMirror(name string) (*Mirror, error) {
log.Println("Creating new Mirror")
inner := meta.NewMetaListener()
name = strings.TrimSpace(name)
name = strings.ReplaceAll(name, " ", "")
if name == "" {
name = "mirror"
}
log.Printf("Creating new MetaListener with name: '%s'\n", name)
onion, err := onramp.NewOnion("metalistener-" + name)
if err != nil {
return nil, err
}
log.Println("Created new Onion manager")
garlic, err := onramp.NewGarlic("metalistener-"+name, "127.0.0.1:7656", onramp.OPT_WIDE)
if err != nil {
return nil, err
}
log.Println("Created new Garlic manager")
_, port, err := net.SplitHostPort(name)
if err != nil {
port = "3000"
@@ -65,6 +77,7 @@ func NewMirror(name string) (*Mirror, error) {
Onions: onions,
Garlics: garlics,
}
log.Printf("Mirror created with name: '%s' and port: '%s', '%s'\n", name, port, ml.MetaListener.Addr().String())
return ml, nil
}
@@ -81,7 +94,7 @@ func (ml Mirror) Listen(name, addr, certdir string, hiddenTls bool) (net.Listene
port = "3000"
}
if strings.HasSuffix(port, "22") {
log.Println("Port ends with 22, setting hiddenTls to true")
log.Println("Port ends with 22, setting hiddenTls to false")
log.Println("This is a workaround for the fact that the default port for SSH is 22")
log.Println("This is so self-configuring SSH servers can be used without TLS, which would make connecting to them wierd")
hiddenTls = false
@@ -98,6 +111,7 @@ func (ml Mirror) Listen(name, addr, certdir string, hiddenTls bool) (net.Listene
log.Printf("HTTP Local listener added http://%s\n", tcpListener.Addr())
log.Println("Checking for existing onion and garlic listeners")
listenerId := fmt.Sprintf("metalistener-%s-%s", name, port)
log.Println("Listener ID:", listenerId)
// Check if onion and garlic listeners already exist
if ml.Onions[port] == nil {
// make a new onion listener

View File

@@ -14,13 +14,15 @@ import (
func main() {
host := flag.String("host", "localhost", "Host to forward connections to")
port := flag.Int("port", 8080, "Port to forward connections to")
listenPort := flag.Int("listen-port", 3002, "Port to listen for incoming connections")
domain := flag.String("domain", "i2pgit.org", "Domain name for TLS listener")
email := flag.String("email", "example@example.com", "Email address for Let's Encrypt registration")
email := flag.String("email", "", "Email address for Let's Encrypt registration")
certDir := flag.String("certdir", "./certs", "Directory for storing certificates")
hiddenTls := flag.Bool("hidden-tls", false, "Enable hidden TLS")
flag.Parse()
addr := net.JoinHostPort(*domain, fmt.Sprintf("%d", *listenPort))
// Create a new meta listener
metaListener, err := mirror.Listen(*domain, *email, *certDir, *hiddenTls)
metaListener, err := mirror.Listen(addr, *email, *certDir, *hiddenTls)
if err != nil {
panic(err)
}