Work on refactoring and potential concurrency issues, but I think primary is broken on master, users should stick to tagged versions unless they want to help

This commit is contained in:
eyedeekay
2024-11-23 00:47:49 -05:00
parent d8dafef068
commit dd2d0c029f
7 changed files with 260 additions and 221 deletions

212
config.go
View File

@ -1,17 +1,35 @@
package sam3
import (
"fmt"
"math/rand"
"net"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
"github.com/go-i2p/i2pkeys"
)
const DEFAULT_LEASESET_TYPE = "i2cp.leaseSetEncType=4"
type SessionOptions struct {
Style string
SignatureType string
FromPort string
ToPort string
Protocol string
UDPPort int
}
// Add transport options
type TransportOptions struct {
UseCompression string
FastReceive string
Reliability string
IdleTimeout time.Duration
}
// I2PConfig is a struct which manages I2P configuration options
type I2PConfig struct {
SamHost string
@ -21,11 +39,8 @@ type I2PConfig struct {
SamMin string
SamMax string
Fromport string
Toport string
Style string
TunType string
SessionOptions
TransportOptions
DestinationKeys i2pkeys.I2PKeys
@ -45,11 +60,7 @@ type I2PConfig struct {
OutVariance string
InBackupQuantity string
OutBackupQuantity string
FastRecieve string
UseCompression string
MessageReliability string
CloseIdle string
CloseIdleTime string
ReduceIdle string
ReduceIdleTime string
ReduceIdleQuantity string
@ -133,9 +144,9 @@ func (f *I2PConfig) FromPort() string {
log.Debug("SAM version < 3.1, FromPort not applicable")
return ""
}
if f.Fromport != "0" {
log.WithField("fromPort", f.Fromport).Debug("FromPort set")
return " FROM_PORT=" + f.Fromport + " "
if f.SessionOptions.FromPort != "0" {
log.WithField("fromPort", f.SessionOptions.FromPort).Debug("FromPort set")
return " FROM_PORT=" + f.SessionOptions.FromPort + " "
}
log.Debug("FromPort not set")
return ""
@ -147,9 +158,9 @@ func (f *I2PConfig) ToPort() string {
log.Debug("SAM version < 3.1, ToPort not applicable")
return ""
}
if f.Toport != "0" {
log.WithField("toPort", f.Toport).Debug("ToPort set")
return " TO_PORT=" + f.Toport + " "
if f.SessionOptions.ToPort != "0" {
log.WithField("toPort", f.SessionOptions.ToPort).Debug("ToPort set")
return " TO_PORT=" + f.SessionOptions.ToPort + " "
}
log.Debug("ToPort not set")
return ""
@ -157,9 +168,9 @@ func (f *I2PConfig) ToPort() string {
// SessionStyle returns the session style setting in the form of "STYLE=style"
func (f *I2PConfig) SessionStyle() string {
if f.Style != "" {
log.WithField("style", f.Style).Debug("Session style set")
return " STYLE=" + f.Style + " "
if f.SessionOptions.Style != "" {
log.WithField("style", f.SessionOptions.Style).Debug("Session style set")
return " STYLE=" + f.SessionOptions.Style + " "
}
log.Debug("Using default STREAM style")
return " STYLE=STREAM "
@ -178,8 +189,8 @@ func (f *I2PConfig) samMax() float64 {
// MinSAM returns the minimum SAM version required in major.minor form
func (f *I2PConfig) MinSAM() string {
if f.SamMin == "" {
log.Debug("Using default MinSAM: 3.0")
return "3.0"
log.Debug("Using default MinSAM: 3.1")
return "3.1"
}
log.WithField("minSAM", f.SamMin).Debug("MinSAM set")
return f.SamMin
@ -188,8 +199,8 @@ func (f *I2PConfig) MinSAM() string {
// MaxSAM returns the maximum SAM version required in major.minor form
func (f *I2PConfig) MaxSAM() string {
if f.SamMax == "" {
log.Debug("Using default MaxSAM: 3.1")
return "3.1"
log.Debug("Using default MaxSAM: 3.3")
return "3.3"
}
log.WithField("maxSAM", f.SamMax).Debug("MaxSAM set")
return f.SamMax
@ -231,9 +242,9 @@ func (f *I2PConfig) EncryptLease() string {
// Reliability returns the message reliability setting in the form of "i2cp.messageReliability=reliability"
func (f *I2PConfig) Reliability() string {
if f.MessageReliability != "" {
log.WithField("reliability", f.MessageReliability).Debug("Message reliability set")
return " i2cp.messageReliability=" + f.MessageReliability + " "
if f.TransportOptions.Reliability != "" {
log.WithField("reliability", f.TransportOptions.Reliability).Debug("Message reliability set")
return " i2cp.messageReliability=" + f.TransportOptions.Reliability + " "
}
log.Debug("Message reliability not set")
return ""
@ -258,9 +269,9 @@ func (f *I2PConfig) Close() string {
if f.CloseIdle == "true" {
log.WithFields(logrus.Fields{
"closeIdle": f.CloseIdle,
"closeIdleTime": f.CloseIdleTime,
"closeIdleTime": f.TransportOptions.IdleTimeout.String(),
}).Debug("Close idle settings applied")
return "i2cp.closeOnIdle=" + f.CloseIdle + "i2cp.closeIdleTime=" + f.CloseIdleTime
return "i2cp.closeOnIdle=" + f.CloseIdle + "i2cp.closeIdleTime=" + f.TransportOptions.IdleTimeout.String()
}
log.Debug("Close idle settings not applied")
return ""
@ -275,8 +286,8 @@ func (f *I2PConfig) DoZero() string {
if f.OutAllowZeroHop == "true" {
r += " outbound.allowZeroHop= " + f.OutAllowZeroHop + " "
}
if f.FastRecieve == "true" {
r += " " + f.FastRecieve + " "
if f.TransportOptions.FastReceive == "true" {
r += " " + f.TransportOptions.FastReceive + " "
}
log.WithField("zeroHopSettings", r).Debug("Zero hop settings applied")
return r
@ -297,7 +308,7 @@ func (f *I2PConfig) Print() []string {
"outbound.quantity=" + f.OutQuantity,
f.DoZero(),
//"i2cp.fastRecieve=" + f.FastRecieve,
"i2cp.gzip=" + f.UseCompression,
"i2cp.gzip=" + f.TransportOptions.UseCompression,
f.Reduce(),
f.Close(),
f.Reliability(),
@ -355,41 +366,48 @@ func (f *I2PConfig) LeaseSetEncryptionType() string {
return "i2cp.leaseSetEncType=" + f.LeaseSetEncryption
}
const DEFAULT_LEASESET_TYPE = "i2cp.leaseSetEncType=4"
// NewConfig returns a new config with default values or updates them with functional arguments
func NewConfig(opts ...func(*I2PConfig) error) (*I2PConfig, error) {
var config I2PConfig
config.SamHost = "127.0.0.1"
config.SamPort = "7656"
config.SamMin = "3.0"
config.SamMax = "3.3"
config.TunName = ""
config.TunType = "server"
config.Style = "STREAM"
config.InLength = "3"
config.OutLength = "3"
config.InQuantity = "2"
config.OutQuantity = "2"
config.InVariance = "1"
config.OutVariance = "1"
config.InBackupQuantity = "3"
config.OutBackupQuantity = "3"
config.InAllowZeroHop = "false"
config.OutAllowZeroHop = "false"
config.EncryptLeaseSet = "false"
config.LeaseSetKey = ""
config.LeaseSetPrivateKey = ""
config.LeaseSetPrivateSigningKey = ""
config.FastRecieve = "false"
config.UseCompression = "true"
config.ReduceIdle = "false"
config.ReduceIdleTime = "15"
config.ReduceIdleQuantity = "4"
config.CloseIdle = "false"
config.CloseIdleTime = "300000"
config.MessageReliability = "none"
config.LeaseSetEncryption = DEFAULT_LEASESET_TYPE
config := I2PConfig{
SamHost: "127.0.0.1",
SamPort: "7656",
SamMin: "3.0",
SamMax: "3.3",
TunName: "",
InLength: "3",
OutLength: "3",
InQuantity: "2",
OutQuantity: "2",
InVariance: "1",
OutVariance: "1",
InBackupQuantity: "3",
OutBackupQuantity: "3",
InAllowZeroHop: "false",
OutAllowZeroHop: "false",
EncryptLeaseSet: "false",
LeaseSetKey: "",
LeaseSetPrivateKey: "",
LeaseSetPrivateSigningKey: "",
ReduceIdle: "false",
ReduceIdleTime: "15",
ReduceIdleQuantity: "1",
CloseIdle: "false",
LeaseSetEncryption: DEFAULT_LEASESET_TYPE,
SessionOptions: SessionOptions{
Style: "STREAM",
SignatureType: "EdDSA_SHA512_Ed25519",
FromPort: "",
ToPort: "",
Protocol: "",
UDPPort: 0,
},
TransportOptions: TransportOptions{
UseCompression: "true",
FastReceive: "false",
Reliability: "none",
IdleTimeout: 5 * time.Minute,
},
}
for _, o := range opts {
if err := o(&config); err != nil {
return nil, err
@ -397,67 +415,3 @@ func NewConfig(opts ...func(*I2PConfig) error) (*I2PConfig, error) {
}
return &config, nil
}
// Options a map of options
type Options map[string]string
// AsList obtain sam options as list of strings
func (opts Options) AsList() (ls []string) {
for k, v := range opts {
ls = append(ls, fmt.Sprintf("%s=%s", k, v))
}
return
}
// Config is the config type for the sam connector api for i2p which allows applications to 'speak' with i2p
type Config struct {
Addr string
Opts Options
Session string
Keyfile string
}
// StreamSession create new sam connector from config with a stream session
func (cfg *Config) StreamSession() (session *StreamSession, err error) {
// connect
var s *SAM
s, err = NewSAM(cfg.Addr)
if err == nil {
// ensure keys exist
var keys i2pkeys.I2PKeys
keys, err = s.EnsureKeyfile(cfg.Keyfile)
if err == nil {
// create session
session, err = s.NewStreamSession(cfg.Session, keys, cfg.Opts.AsList())
}
}
return
}
// DatagramSession create new sam datagram session from config
func (cfg *Config) DatagramSession() (session *DatagramSession, err error) {
// connect
var s *SAM
s, err = NewSAM(cfg.Addr)
if err == nil {
// ensure keys exist
var keys i2pkeys.I2PKeys
keys, err = s.EnsureKeyfile(cfg.Keyfile)
if err == nil {
// determine udp port
var portstr string
_, portstr, err = net.SplitHostPort(cfg.Addr)
if IgnorePortError(err) == nil {
var port int
port, err = strconv.Atoi(portstr)
if err == nil && port > 0 {
// udp port is 1 lower
port--
// create session
session, err = s.NewDatagramSession(cfg.Session, keys, cfg.Opts.AsList(), port)
}
}
}
}
return
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"strconv"
"sync"
"time"
"github.com/sirupsen/logrus"
@ -146,8 +147,14 @@ func (s *DatagramSession) RemoteAddr() net.Addr {
// implements net.PacketConn
func (s *DatagramSession) ReadFrom(b []byte) (n int, addr net.Addr, err error) {
log.Debug("Reading datagram")
// extra bytes to read the remote address of incomming datagram
buf := make([]byte, len(b)+4096)
// Use sync.Pool for buffers
bufPool := sync.Pool{
New: func() interface{} {
return make([]byte, len(b)+4096)
},
}
buf := bufPool.Get().([]byte)
defer bufPool.Put(buf)
for {
// very basic protection: only accept incomming UDP messages from the IP of the SAM bridge

View File

@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
)
@ -235,7 +236,7 @@ func SetLeaseSetPrivateSigningKey(s string) func(*SAMEmit) error {
// SetMessageReliability sets the host of the SAMEmit's SAM bridge
func SetMessageReliability(s string) func(*SAMEmit) error {
return func(c *SAMEmit) error {
c.I2PConfig.MessageReliability = s
c.I2PConfig.TransportOptions.Reliability = s
log.WithField("messageReliability", s).Debug("Set message reliability")
return nil
}
@ -284,10 +285,10 @@ func SetCompress(b bool) func(*SAMEmit) error {
func SetFastRecieve(b bool) func(*SAMEmit) error {
return func(c *SAMEmit) error {
if b {
c.I2PConfig.FastRecieve = "true"
c.I2PConfig.TransportOptions.FastReceive = "true"
return nil
}
c.I2PConfig.FastRecieve = "false"
c.I2PConfig.TransportOptions.FastReceive = "false"
log.WithField("fastReceive", b).Debug("Set fast receive")
return nil
}
@ -363,10 +364,10 @@ func SetCloseIdle(b bool) func(*SAMEmit) error {
// SetCloseIdleTime sets the time to wait before closing tunnels to idle levels
func SetCloseIdleTime(u int) func(*SAMEmit) error {
return func(c *SAMEmit) error {
c.I2PConfig.CloseIdleTime = "300000"
c.I2PConfig.TransportOptions.IdleTimeout = 300000
if u >= 6 {
idleTime := strconv.Itoa((u * 60) * 1000)
c.I2PConfig.CloseIdleTime = idleTime
idleTime := (u * 60) * 1000
c.I2PConfig.TransportOptions.IdleTimeout = time.Duration(idleTime)
log.WithFields(logrus.Fields{
"minutes": u,
"milliseconds": idleTime,
@ -381,9 +382,9 @@ func SetCloseIdleTime(u int) func(*SAMEmit) error {
// SetCloseIdleTimeMs sets the time to wait before closing tunnels to idle levels in milliseconds
func SetCloseIdleTimeMs(u int) func(*SAMEmit) error {
return func(c *SAMEmit) error {
c.I2PConfig.CloseIdleTime = "300000"
c.I2PConfig.TransportOptions.IdleTimeout = 300000
if u >= 300000 {
c.I2PConfig.CloseIdleTime = strconv.Itoa(u)
c.I2PConfig.TransportOptions.IdleTimeout = time.Duration(u)
log.WithField("closeIdleTimeMs", u).Debug("Set close idle time in milliseconds")
return nil
}

22
emit.go
View File

@ -100,6 +100,28 @@ func (e *SAMEmit) AcceptBytes() []byte {
return []byte(e.Accept())
}
// Extend SAMEmit with additional command generation
func (e *SAMEmit) GenerateSessionCmd(style string) string {
cmd := &strings.Builder{}
cmd.WriteString("SESSION CREATE ")
cmd.WriteString(e.I2PConfig.SessionStyle())
if e.I2PConfig.FromPort() != "" {
cmd.WriteString(" FROM_PORT=" + e.I2PConfig.FromPort())
}
if e.I2PConfig.ToPort() != "" {
cmd.WriteString(" TO_PORT=" + e.I2PConfig.ToPort())
}
cmd.WriteString(e.I2PConfig.ID())
cmd.WriteString(e.I2PConfig.DestinationKey())
cmd.WriteString(e.OptStr())
cmd.WriteString("\n")
return cmd.String()
}
func NewEmit(opts ...func(*SAMEmit) error) (*SAMEmit, error) {
var emit SAMEmit
for _, o := range opts {

View File

@ -7,6 +7,7 @@ import (
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
@ -39,6 +40,7 @@ type PrimarySession struct {
Config SAMEmit
stsess map[string]*StreamSession
dgsess map[string]*DatagramSession
sync.RWMutex
// from string
// to string
}
@ -96,32 +98,39 @@ func (sam *PrimarySession) Dial(network, addr string) (net.Conn, error) {
// DialTCP implements x/dialer
func (sam *PrimarySession) DialTCP(network string, laddr, raddr net.Addr) (net.Conn, error) {
log.WithFields(logrus.Fields{"network": network, "laddr": laddr, "raddr": raddr}).Debug("DialTCP() called")
sam.RLock()
ts, ok := sam.stsess[network+raddr.String()[0:4]]
var err error
sam.RUnlock()
if !ok {
ts, err = sam.NewUniqueStreamSubSession(network + raddr.String()[0:4])
sam.Lock()
ts, err := sam.NewUniqueStreamSubSession(network + raddr.String()[0:4])
if err != nil {
log.WithError(err).Error("Failed to create new unique stream sub-session")
sam.Unlock()
return nil, err
}
sam.stsess[network+raddr.String()[0:4]] = ts
ts, _ = sam.stsess[network+raddr.String()[0:4]]
sam.Unlock()
}
return ts.Dial(network, raddr.String())
}
func (sam *PrimarySession) DialTCPI2P(network, laddr, raddr string) (net.Conn, error) {
log.WithFields(logrus.Fields{"network": network, "laddr": laddr, "raddr": raddr}).Debug("DialTCPI2P() called")
sam.RLock()
ts, ok := sam.stsess[network+raddr[0:4]]
var err error
sam.RUnlock()
if !ok {
ts, err = sam.NewUniqueStreamSubSession(network + laddr)
sam.Lock()
ts, err := sam.NewUniqueStreamSubSession(network + laddr)
if err != nil {
log.WithError(err).Error("Failed to create new unique stream sub-session")
sam.Unlock()
return nil, err
}
sam.stsess[network+raddr[0:4]] = ts
ts, _ = sam.stsess[network+raddr[0:4]]
sam.Unlock()
}
return ts.Dial(network, raddr)
}
@ -129,32 +138,38 @@ func (sam *PrimarySession) DialTCPI2P(network, laddr, raddr string) (net.Conn, e
// DialUDP implements x/dialer
func (sam *PrimarySession) DialUDP(network string, laddr, raddr net.Addr) (net.PacketConn, error) {
log.WithFields(logrus.Fields{"network": network, "laddr": laddr, "raddr": raddr}).Debug("DialUDP() called")
sam.RLock()
ds, ok := sam.dgsess[network+raddr.String()[0:4]]
var err error
sam.RUnlock()
if !ok {
ds, err = sam.NewDatagramSubSession(network+raddr.String()[0:4], 0)
sam.Lock()
ds, err := sam.NewDatagramSubSession(network+raddr.String()[0:4], 0)
if err != nil {
log.WithError(err).Error("Failed to create new datagram sub-session")
sam.Unlock()
return nil, err
}
sam.dgsess[network+raddr.String()[0:4]] = ds
ds, _ = sam.dgsess[network+raddr.String()[0:4]]
sam.Unlock()
}
return ds.Dial(network, raddr.String())
}
func (sam *PrimarySession) DialUDPI2P(network, laddr, raddr string) (*DatagramSession, error) {
log.WithFields(logrus.Fields{"network": network, "laddr": laddr, "raddr": raddr}).Debug("DialUDPI2P() called")
sam.RLock()
ds, ok := sam.dgsess[network+raddr[0:4]]
var err error
sam.RUnlock()
if !ok {
ds, err = sam.NewDatagramSubSession(network+laddr, 0)
sam.Lock()
ds, err := sam.NewDatagramSubSession(network+laddr, 0)
if err != nil {
log.WithError(err).Error("Failed to create new datagram sub-session")
sam.Unlock()
return nil, err
}
sam.dgsess[network+raddr[0:4]] = ds
ds, _ = sam.dgsess[network+raddr[0:4]]
sam.Unlock()
}
return ds.Dial(network, raddr)
}
@ -209,7 +224,19 @@ func (sam *SAM) newPrimarySession(primarySessionSwitch, id string, keys i2pkeys.
}
ssesss := make(map[string]*StreamSession)
dsesss := make(map[string]*DatagramSession)
return &PrimarySession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), Sig_NONE, sam.Config, ssesss, dsesss}, nil
return &PrimarySession{
samAddr: sam.Config.I2PConfig.Sam(),
id: id,
conn: conn,
keys: keys,
Timeout: time.Duration(600 * time.Second),
Deadline: time.Now(),
sigType: Sig_NONE,
Config: sam.Config,
stsess: ssesss,
dgsess: dsesss,
RWMutex: sync.RWMutex{},
}, nil
}
// Creates a new PrimarySession with the I2CP- and PRIMARYinglib options as
@ -228,7 +255,19 @@ func (sam *SAM) NewPrimarySessionWithSignature(id string, keys i2pkeys.I2PKeys,
}
ssesss := make(map[string]*StreamSession)
dsesss := make(map[string]*DatagramSession)
return &PrimarySession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, sam.Config, ssesss, dsesss}, nil
return &PrimarySession{
samAddr: sam.Config.I2PConfig.Sam(),
id: id,
conn: conn,
keys: keys,
Timeout: time.Duration(600 * time.Second),
Deadline: time.Now(),
sigType: sigType,
Config: sam.Config,
stsess: ssesss,
dgsess: dsesss,
RWMutex: sync.RWMutex{},
}, nil
}
// Creates a new session with the style of either "STREAM", "DATAGRAM" or "RAW",

View File

@ -41,14 +41,14 @@ func Test_PrimaryStreamingDial(t *testing.T) {
}
defer ss.Close()
fmt.Println("\tNotice: This may fail if your I2P node is not well integrated in the I2P network.")
fmt.Println("\tLooking up i2p-projekt.i2p")
forumAddr, err := earlysam.Lookup("i2p-projekt.i2p")
fmt.Println("\tLooking up idk.i2p")
forumAddr, err := earlysam.Lookup("idk.i2p")
if err != nil {
fmt.Println(err.Error())
t.Fail()
return
}
fmt.Println("\tDialing i2p-projekt.i2p(", forumAddr.Base32(), forumAddr.DestHash().Hash(), ")")
fmt.Println("\tDialing idk.i2p(", forumAddr.Base32(), forumAddr.DestHash().Hash(), ")")
conn, err := ss.DialI2P(forumAddr)
if err != nil {
fmt.Println(err.Error())
@ -65,9 +65,9 @@ func Test_PrimaryStreamingDial(t *testing.T) {
buf := make([]byte, 4096)
n, err := conn.Read(buf)
if !strings.Contains(strings.ToLower(string(buf[:n])), "http") && !strings.Contains(strings.ToLower(string(buf[:n])), "html") {
fmt.Printf("\tProbably failed to StreamSession.DialI2P(i2p-projekt.i2p)? It replied %d bytes, but nothing that looked like http/html", n)
fmt.Printf("\tProbably failed to StreamSession.DialI2P(idk.i2p)? It replied %d bytes, but nothing that looked like http/html", n)
} else {
fmt.Println("\tRead HTTP/HTML from i2p-projekt.i2p")
fmt.Println("\tRead HTTP/HTML from idk.i2p")
}
}

View File

@ -2,11 +2,13 @@ package sam3
import (
"bufio"
"context"
"errors"
"io"
"net"
"strconv"
"strings"
"time"
"github.com/sirupsen/logrus"
@ -83,79 +85,93 @@ func ExtractPairInt(input, value string) int {
func ExtractDest(input string) string {
log.WithField("input", input).Debug("ExtractDest called")
dest := strings.Split(input, " ")[0]
log.WithField("dest", dest).Debug("Destination extracted")
log.WithField("dest", dest).Debug("Destination extracted", dest)
return strings.Split(input, " ")[0]
}
// accept a new inbound connection
func (l *StreamListener) AcceptI2P() (*SAMConn, error) {
log.Debug("StreamListener.AcceptI2P() called")
s, err := NewSAM(l.session.samAddr)
if err == nil {
log.Debug("Connected to SAM bridge")
// we connected to sam
// send accept() command
_, err = io.WriteString(s.conn, "STREAM ACCEPT ID="+l.id+" SILENT=false\n")
if err != nil {
log.WithError(err).Error("Failed to send STREAM ACCEPT command")
s.Close()
return nil, err
}
// read reply
rd := bufio.NewReader(s.conn)
// read first line
line, err := rd.ReadString(10)
if err != nil {
log.WithError(err).Error("Failed to read SAM bridge response")
s.Close()
return nil, err
}
log.WithField("response", line).Debug("Received SAM bridge response")
log.Println(line)
if strings.HasPrefix(line, "STREAM STATUS RESULT=OK") {
// we gud read destination line
destline, err := rd.ReadString(10)
if err != nil {
if err == io.EOF {
return nil, errors.New("connection closed after OK")
}
return nil, errors.New("error reading destination: " + err.Error())
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Validate destination format
dest := ExtractDest(destline)
if !strings.HasPrefix(dest, "DEST=") {
return nil, errors.New("invalid destination format")
}
if err == nil {
dest := ExtractDest(destline)
l.session.from = ExtractPairString(destline, "FROM_PORT")
l.session.to = ExtractPairString(destline, "TO_PORT")
// return wrapped connection
dest = strings.Trim(dest, "\n")
log.WithFields(logrus.Fields{
"dest": dest,
"from": l.session.from,
"to": l.session.to,
}).Debug("Accepted new I2P connection")
return &SAMConn{
laddr: l.laddr,
raddr: i2pkeys.I2PAddr(dest),
Conn: s.conn,
}, nil
} else {
log.WithError(err).Error("Failed to read destination line")
done := make(chan struct{})
var conn *SAMConn
var err error
go func() {
defer close(done)
log.Debug("StreamListener.AcceptI2P() called")
s, err := NewSAM(l.session.samAddr)
if err == nil {
log.Debug("Connected to SAM bridge")
// we connected to sam
// send accept() command
_, err = io.WriteString(s.conn, "STREAM ACCEPT ID="+l.id+" SILENT=false\n")
if err != nil {
log.WithError(err).Error("Failed to send STREAM ACCEPT command")
s.Close()
return nil, err
}
// read reply
rd := bufio.NewReader(s.conn)
// read first line
line, err := rd.ReadString(10)
if err != nil {
log.WithError(err).Error("Failed to read SAM bridge response")
s.Close()
return
}
log.WithField("response", line).Debug("Received SAM bridge response")
log.Println(line)
if strings.HasPrefix(line, "STREAM STATUS RESULT=OK") {
// we gud read destination line
destline, err := rd.ReadString(10)
if err != nil {
if err == io.EOF {
err = errors.New("connection closed after OK")
}
err = errors.New("error reading destination: " + err.Error())
}
if err == nil {
// Validate destination format
dest := ExtractDest(destline)
if !strings.HasPrefix(dest, "") {
err = errors.New("invalid destination format")
}
l.session.from = ExtractPairString(destline, "FROM_PORT")
l.session.to = ExtractPairString(destline, "TO_PORT")
// return wrapped connection
dest = strings.Trim(dest, "\n")
log.WithFields(logrus.Fields{
"dest": dest,
"from": l.session.from,
"to": l.session.to,
}).Debug("Accepted new I2P connection")
conn = &SAMConn{
laddr: l.laddr,
raddr: i2pkeys.I2PAddr(dest),
Conn: s.conn,
}
err = nil
} else {
log.WithError(err).Error("Failed to read destination line")
s.Close()
return
}
} else {
log.WithField("line", line).Error("Invalid SAM response")
s.Close()
err = errors.New("invalid sam line: " + line)
}
} else {
log.WithField("line", line).Error("Invalid SAM response")
log.WithError(err).Error("Failed to connect to SAM bridge")
s.Close()
return nil, errors.New("invalid sam line: " + line)
}
} else {
log.WithError(err).Error("Failed to connect to SAM bridge")
s.Close()
return nil, err
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-done:
return conn, err
}
}