316 lines
9.9 KiB
Go
316 lines
9.9 KiB
Go
package sam3
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"github.com/go-i2p/i2pkeys"
|
|
)
|
|
|
|
// Represents a streaming session.
|
|
type StreamSession struct {
|
|
samAddr string // address to the sam bridge (ipv4:port)
|
|
id string // tunnel name
|
|
conn net.Conn // connection to sam
|
|
keys i2pkeys.I2PKeys // i2p destination keys
|
|
Timeout time.Duration
|
|
Deadline time.Time
|
|
sigType string
|
|
from string
|
|
to string
|
|
}
|
|
|
|
// Read reads data from the stream.
|
|
func (s *StreamSession) Read(buf []byte) (int, error) {
|
|
return s.conn.Read(buf)
|
|
}
|
|
|
|
// Write sends data over the stream.
|
|
func (s *StreamSession) Write(data []byte) (int, error) {
|
|
return s.conn.Write(data)
|
|
}
|
|
|
|
func (s *StreamSession) SetDeadline(t time.Time) error {
|
|
log.WithField("deadline", t).Debug("Setting deadline for StreamSession")
|
|
return s.conn.SetDeadline(t)
|
|
}
|
|
|
|
func (s *StreamSession) SetReadDeadline(t time.Time) error {
|
|
log.WithField("readDeadline", t).Debug("Setting read deadline for StreamSession")
|
|
return s.conn.SetReadDeadline(t)
|
|
}
|
|
|
|
func (s *StreamSession) SetWriteDeadline(t time.Time) error {
|
|
log.WithField("writeDeadline", t).Debug("Setting write deadline for StreamSession")
|
|
return s.conn.SetWriteDeadline(t)
|
|
}
|
|
|
|
func (s *StreamSession) From() string {
|
|
return s.from
|
|
}
|
|
|
|
func (s *StreamSession) To() string {
|
|
return s.to
|
|
}
|
|
|
|
func (s *StreamSession) SignatureType() string {
|
|
return s.sigType
|
|
}
|
|
|
|
// Returns the local tunnel name of the I2P tunnel used for the stream session
|
|
func (s *StreamSession) ID() string {
|
|
return s.id
|
|
}
|
|
|
|
func (s *StreamSession) Close() error {
|
|
log.WithField("id", s.id).Debug("Closing StreamSession")
|
|
return s.conn.Close()
|
|
}
|
|
|
|
// Returns the I2P destination (the address) of the stream session
|
|
func (s *StreamSession) Addr() i2pkeys.I2PAddr {
|
|
return s.keys.Addr()
|
|
}
|
|
|
|
func (s *StreamSession) LocalAddr() net.Addr {
|
|
return s.keys.Addr()
|
|
}
|
|
|
|
// Returns the keys associated with the stream session
|
|
func (s *StreamSession) Keys() i2pkeys.I2PKeys {
|
|
return s.keys
|
|
}
|
|
|
|
// Creates a new StreamSession with the I2CP- and streaminglib options as
|
|
// specified. See the I2P documentation for a full list of options.
|
|
func (sam *SAM) NewStreamSession(id string, keys i2pkeys.I2PKeys, options []string) (*StreamSession, error) {
|
|
log.WithFields(logrus.Fields{"id": id, "options": options}).Debug("Creating new StreamSession")
|
|
conn, err := sam.newGenericSession("STREAM", id, keys, options, []string{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.WithField("id", id).Debug("Created new StreamSession")
|
|
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), Sig_NONE, "0", "0"}, nil
|
|
}
|
|
|
|
// Creates a new StreamSession with the I2CP- and streaminglib options as
|
|
// specified. See the I2P documentation for a full list of options.
|
|
func (sam *SAM) NewStreamSessionWithSignature(id string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
|
|
log.WithFields(logrus.Fields{"id": id, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature")
|
|
conn, err := sam.newGenericSessionWithSignature("STREAM", id, keys, sigType, options, []string{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.WithFields(logrus.Fields{"id": id, "sigType": sigType}).Debug("Created new StreamSession with signature")
|
|
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, "0", "0"}, nil
|
|
}
|
|
|
|
// Creates a new StreamSession with the I2CP- and streaminglib options as
|
|
// specified. See the I2P documentation for a full list of options.
|
|
func (sam *SAM) NewStreamSessionWithSignatureAndPorts(id, from, to string, keys i2pkeys.I2PKeys, options []string, sigType string) (*StreamSession, error) {
|
|
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "options": options, "sigType": sigType}).Debug("Creating new StreamSession with signature and ports")
|
|
conn, err := sam.newGenericSessionWithSignatureAndPorts("STREAM", id, from, to, keys, sigType, options, []string{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "sigType": sigType}).Debug("Created new StreamSession with signature and ports")
|
|
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, from, to}, nil
|
|
}
|
|
|
|
// lookup name, convenience function
|
|
func (s *StreamSession) Lookup(name string) (i2pkeys.I2PAddr, error) {
|
|
log.WithField("name", name).Debug("Looking up address")
|
|
sam, err := NewSAM(s.samAddr)
|
|
if err == nil {
|
|
addr, err := sam.Lookup(name)
|
|
defer sam.Close()
|
|
if err != nil {
|
|
log.WithError(err).Error("Lookup failed")
|
|
} else {
|
|
log.WithField("addr", addr).Debug("Lookup successful")
|
|
}
|
|
return addr, err
|
|
}
|
|
log.WithError(err).Error("Failed to create SAM instance for lookup")
|
|
return i2pkeys.I2PAddr(""), err
|
|
}
|
|
|
|
// context-aware dialer, eventually...
|
|
func (s *StreamSession) DialContext(ctx context.Context, n, addr string) (net.Conn, error) {
|
|
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContext called")
|
|
return s.DialContextI2P(ctx, n, addr)
|
|
}
|
|
|
|
// context-aware dialer, eventually...
|
|
func (s *StreamSession) DialContextI2P(ctx context.Context, n, addr string) (*SAMConn, error) {
|
|
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("DialContextI2P called")
|
|
if ctx == nil {
|
|
log.Panic("nil context")
|
|
panic("nil context")
|
|
}
|
|
deadline := s.deadline(ctx, time.Now())
|
|
if !deadline.IsZero() {
|
|
if d, ok := ctx.Deadline(); !ok || deadline.Before(d) {
|
|
subCtx, cancel := context.WithDeadline(ctx, deadline)
|
|
defer cancel()
|
|
ctx = subCtx
|
|
}
|
|
}
|
|
|
|
i2paddr, err := i2pkeys.NewI2PAddrFromString(addr)
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to create I2P address from string")
|
|
return nil, err
|
|
}
|
|
return s.DialI2P(i2paddr)
|
|
}
|
|
|
|
/*
|
|
func (s *StreamSession) Cancel() chan *StreamSession {
|
|
ch := make(chan *StreamSession)
|
|
ch <- s
|
|
return ch
|
|
}*/
|
|
|
|
func minNonzeroTime(a, b time.Time) time.Time {
|
|
if a.IsZero() {
|
|
return b
|
|
}
|
|
if b.IsZero() || a.Before(b) {
|
|
return a
|
|
}
|
|
return b
|
|
}
|
|
|
|
// deadline returns the earliest of:
|
|
// - now+Timeout
|
|
// - d.Deadline
|
|
// - the context's deadline
|
|
//
|
|
// Or zero, if none of Timeout, Deadline, or context's deadline is set.
|
|
func (s *StreamSession) deadline(ctx context.Context, now time.Time) (earliest time.Time) {
|
|
if s.Timeout != 0 { // including negative, for historical reasons
|
|
earliest = now.Add(s.Timeout)
|
|
}
|
|
if d, ok := ctx.Deadline(); ok {
|
|
earliest = minNonzeroTime(earliest, d)
|
|
}
|
|
return minNonzeroTime(earliest, s.Deadline)
|
|
}
|
|
|
|
// implement net.Dialer
|
|
func (s *StreamSession) Dial(n, addr string) (c net.Conn, err error) {
|
|
log.WithFields(logrus.Fields{"network": n, "addr": addr}).Debug("Dial called")
|
|
|
|
var i2paddr i2pkeys.I2PAddr
|
|
var host string
|
|
host, _, err = SplitHostPort(addr)
|
|
// log.Println("Dialing:", host)
|
|
if err = IgnorePortError(err); err == nil {
|
|
// check for name
|
|
if strings.HasSuffix(host, ".b32.i2p") || strings.HasSuffix(host, ".i2p") {
|
|
// name lookup
|
|
i2paddr, err = s.Lookup(host)
|
|
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Looked up I2P address")
|
|
} else {
|
|
// probably a destination
|
|
i2paddr, err = i2pkeys.NewI2PAddrFromBytes([]byte(host))
|
|
// i2paddr = i2pkeys.I2PAddr(host)
|
|
// log.Println("Destination:", i2paddr, err)
|
|
log.WithFields(logrus.Fields{"host": host, "i2paddr": i2paddr}).Debug("Created I2P address from bytes")
|
|
}
|
|
if err == nil {
|
|
return s.DialI2P(i2paddr)
|
|
}
|
|
}
|
|
log.WithError(err).Error("Dial failed")
|
|
return
|
|
}
|
|
|
|
// Dials to an I2P destination and returns a SAMConn, which implements a net.Conn.
|
|
func (s *StreamSession) DialI2P(addr i2pkeys.I2PAddr) (*SAMConn, error) {
|
|
log.WithField("addr", addr).Debug("DialI2P called")
|
|
sam, err := NewSAM(s.samAddr)
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to create new SAM instance")
|
|
return nil, err
|
|
}
|
|
conn := sam.conn
|
|
cmd := fmt.Sprintf("STREAM CONNECT ID=%s DESTINATION=%s FROM_PORT=%s TO_PORT=%s SILENT=false\n",
|
|
s.id,
|
|
addr.Base64(),
|
|
s.from,
|
|
s.to)
|
|
_, err = conn.Write([]byte(cmd))
|
|
if err != nil {
|
|
log.WithError(err).Error("Failed to write STREAM CONNECT command")
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
buf := make([]byte, 4096)
|
|
n, err := conn.Read(buf)
|
|
if err != nil && err != io.EOF {
|
|
log.WithError(err).Error("Failed to write STREAM CONNECT command")
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
scanner := bufio.NewScanner(bytes.NewReader(buf[:n]))
|
|
scanner.Split(bufio.ScanWords)
|
|
for scanner.Scan() {
|
|
switch scanner.Text() {
|
|
case "STREAM":
|
|
continue
|
|
case "STATUS":
|
|
continue
|
|
case "RESULT=OK":
|
|
log.Debug("Successfully connected to I2P destination")
|
|
return &SAMConn{s.keys.Addr(), addr, conn}, nil
|
|
case "RESULT=CANT_REACH_PEER":
|
|
log.Error("Can't reach peer")
|
|
conn.Close()
|
|
return nil, errors.New("Can not reach peer")
|
|
case "RESULT=I2P_ERROR":
|
|
log.Error("I2P internal error")
|
|
conn.Close()
|
|
return nil, errors.New("I2P internal error")
|
|
case "RESULT=INVALID_KEY":
|
|
log.Error("Invalid key - Stream Session")
|
|
conn.Close()
|
|
return nil, errors.New("Invalid key - Stream Session")
|
|
case "RESULT=INVALID_ID":
|
|
log.Error("Invalid tunnel ID")
|
|
conn.Close()
|
|
return nil, errors.New("Invalid tunnel ID")
|
|
case "RESULT=TIMEOUT":
|
|
log.Error("Connection timeout")
|
|
conn.Close()
|
|
return nil, errors.New("Timeout")
|
|
default:
|
|
log.WithField("error", scanner.Text()).Error("Unknown error")
|
|
conn.Close()
|
|
return nil, fmt.Errorf("Unknown error: %s : %s", scanner.Text(), string(buf[:n]))
|
|
}
|
|
}
|
|
log.Panic("Unexpected end of StreamSession.DialI2P()")
|
|
panic("sam3 go library error in StreamSession.DialI2P()")
|
|
}
|
|
|
|
// create a new stream listener to accept inbound connections
|
|
func (s *StreamSession) Listen() (*StreamListener, error) {
|
|
log.WithFields(logrus.Fields{"id": s.id, "laddr": s.keys.Addr()}).Debug("Creating new StreamListener")
|
|
return &StreamListener{
|
|
session: s,
|
|
id: s.id,
|
|
laddr: s.keys.Addr(),
|
|
}, nil
|
|
}
|