automatically chunk datagrams

This commit is contained in:
eyedeekay
2024-11-23 14:19:02 -05:00
parent dd2d0c029f
commit ac4ef13405
5 changed files with 99 additions and 49 deletions

View File

@ -210,28 +210,75 @@ const (
// writing, maximum size is 31 kilobyte, but this may change in the future.
// Implements net.PacketConn.
func (s *DatagramSession) WriteTo(b []byte, addr net.Addr) (n int, err error) {
log.WithFields(logrus.Fields{
"addr": addr,
"datagramLen": len(b),
}).Debug("Writing datagram")
if len(b) > MAX_DATAGRAM_SIZE {
return 0, errors.New("datagram exceeds maximum size")
}
if len(b) > RECOMMENDED_SIZE {
log.Warning("datagram exceeds recommended size of 11KB")
}
if s.DatagramOptions != nil {
return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr))
}
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, b...)
n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr)
if err != nil {
log.WithError(err).Error("Failed to write to UDP")
} else {
log.WithField("bytesWritten", n).Debug("Datagram written successfully")
}
return n, err
log.WithFields(logrus.Fields{
"addr": addr,
"datagramLen": len(b),
}).Debug("Writing datagram")
if len(b) > MAX_DATAGRAM_SIZE {
return 0, errors.New("datagram exceeds maximum size")
}
// Use chunking for anything above recommended size
if len(b) > RECOMMENDED_SIZE {
return s.writeChunked(b, addr)
}
// Single message path
if s.DatagramOptions != nil {
return s.writeToWithOptions(b, addr.(i2pkeys.I2PAddr))
}
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, b...)
n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr)
if err != nil {
log.WithError(err).Error("Failed to write to UDP")
} else {
log.WithField("bytesWritten", n).Debug("Datagram written successfully")
}
return n, err
}
func (s *DatagramSession) writeChunked(b []byte, addr net.Addr) (total int, err error) {
chunkSize := RECOMMENDED_SIZE - 256 // Allow for header overhead
chunks := (len(b) + chunkSize - 1) / chunkSize
log.WithFields(logrus.Fields{
"totalSize": len(b),
"chunks": chunks,
}).Debug("Splitting datagram into chunks")
for i := 0; i < chunks; i++ {
start := i * chunkSize
end := start + chunkSize
if end > len(b) {
end = len(b)
}
chunk := b[start:end]
var n int
// Single write path that handles both cases
if s.DatagramOptions != nil {
n, err = s.writeToWithOptions(chunk, addr.(i2pkeys.I2PAddr))
} else {
header := []byte("3.1 " + s.id + " " + addr.String() + "\n")
msg := append(header, chunk...)
n, err = s.udpconn.WriteToUDP(msg, s.rUDPAddr)
}
if err != nil {
return total, fmt.Errorf("chunk %d/%d failed: %w", i+1, chunks, err)
}
total += n
if i < chunks-1 {
time.Sleep(50 * time.Millisecond)
}
}
return total, nil
}
type DatagramOptions struct {

View File

@ -225,14 +225,14 @@ func (sam *SAM) newPrimarySession(primarySessionSwitch, id string, keys i2pkeys.
ssesss := make(map[string]*StreamSession)
dsesss := make(map[string]*DatagramSession)
return &PrimarySession{
samAddr: sam.Config.I2PConfig.Sam(),
samAddr: sam.SAMEmit.I2PConfig.Sam(),
id: id,
conn: conn,
keys: keys,
Timeout: time.Duration(600 * time.Second),
Deadline: time.Now(),
sigType: Sig_NONE,
Config: sam.Config,
Config: sam.SAMEmit,
stsess: ssesss,
dgsess: dsesss,
RWMutex: sync.RWMutex{},
@ -256,14 +256,14 @@ func (sam *SAM) NewPrimarySessionWithSignature(id string, keys i2pkeys.I2PKeys,
ssesss := make(map[string]*StreamSession)
dsesss := make(map[string]*DatagramSession)
return &PrimarySession{
samAddr: sam.Config.I2PConfig.Sam(),
samAddr: sam.SAMEmit.I2PConfig.Sam(),
id: id,
conn: conn,
keys: keys,
Timeout: time.Duration(600 * time.Second),
Deadline: time.Now(),
sigType: sigType,
Config: sam.Config,
Config: sam.SAMEmit,
stsess: ssesss,
dgsess: dsesss,
RWMutex: sync.RWMutex{},

2
raw.go
View File

@ -82,7 +82,7 @@ func (s *SAM) NewRawSession(id string, keys i2pkeys.I2PKeys, options []string, u
"remoteUDPAddr": rUDPAddr,
}).Debug("Created new RawSession")
return &RawSession{s.Config.I2PConfig.Sam(), id, conn, udpconn, keys, rUDPAddr}, nil
return &RawSession{s.SAMEmit.I2PConfig.Sam(), id, conn, udpconn, keys, rUDPAddr}, nil
}
// Read one raw datagram sent to the destination of the DatagramSession. Returns

41
sam3.go
View File

@ -26,12 +26,12 @@ func init() {
// Used for controlling I2Ps SAMv3.
// This implements the "Control Socket" for all connections.
type SAM struct {
address string
conn net.Conn
resolver *SAMResolver
Config SAMEmit
keys *i2pkeys.I2PKeys
sigType int
address string
conn net.Conn
keys *i2pkeys.I2PKeys
sigType int
SAMEmit
*SAMResolver
}
const (
@ -73,24 +73,27 @@ func NewSAM(address string) (*SAM, error) {
log.WithError(err).Error("Failed to dial SAM address")
return nil, fmt.Errorf("error dialing to address '%s': %w", address, err)
}
if _, err := conn.Write(s.Config.HelloBytes()); err != nil {
if _, err := conn.Write(s.SAMEmit.HelloBytes()); err != nil {
log.WithError(err).Error("Failed to write hello message")
conn.Close()
return nil, fmt.Errorf("error writing to address '%s': %w", address, err)
}
buf := make([]byte, 256)
n, err := conn.Read(buf)
/*buf := make([]byte, 256)
n, err := conn.Read(buf)*/
reader := bufio.NewReader(conn)
response, err := reader.ReadString('\n')
if err != nil {
log.WithError(err).Error("Failed to read SAM response")
conn.Close()
return nil, fmt.Errorf("error reading onto buffer: %w", err)
return nil, fmt.Errorf("error reading SAM response: %w", err)
}
buf := []byte(response)
n := len(buf)
if strings.Contains(string(buf[:n]), "HELLO REPLY RESULT=OK") {
log.Debug("SAM hello successful")
s.Config.I2PConfig.SetSAMAddress(address)
s.SAMEmit.I2PConfig.SetSAMAddress(address)
s.conn = conn
// s.Config.I2PConfig.DestinationKeys = nil
s.resolver, err = NewSAMResolver(&s)
s.SAMResolver, err = NewSAMResolver(&s)
if err != nil {
log.WithError(err).Error("Failed to create SAM resolver")
return nil, fmt.Errorf("error creating resolver: %w", err)
@ -110,7 +113,7 @@ func NewSAM(address string) (*SAM, error) {
func (sam *SAM) Keys() (k *i2pkeys.I2PKeys) {
// TODO: copy them?
log.Debug("Retrieving SAM keys")
k = &sam.Config.I2PConfig.DestinationKeys
k = &sam.SAMEmit.I2PConfig.DestinationKeys
return
}
@ -121,7 +124,7 @@ func (sam *SAM) ReadKeys(r io.Reader) (err error) {
keys, err = i2pkeys.LoadKeysIncompat(r)
if err == nil {
log.Debug("Keys loaded successfully")
sam.Config.I2PConfig.DestinationKeys = keys
sam.SAMEmit.I2PConfig.DestinationKeys = keys
}
log.WithError(err).Error("Failed to load keys")
return
@ -134,7 +137,7 @@ func (sam *SAM) EnsureKeyfile(fname string) (keys i2pkeys.I2PKeys, err error) {
// transient
keys, err = sam.NewKeys()
if err == nil {
sam.Config.I2PConfig.DestinationKeys = keys
sam.SAMEmit.I2PConfig.DestinationKeys = keys
log.WithFields(logrus.Fields{
"keys": keys,
}).Debug("Generated new transient keys")
@ -146,7 +149,7 @@ func (sam *SAM) EnsureKeyfile(fname string) (keys i2pkeys.I2PKeys, err error) {
// make the keys
keys, err = sam.NewKeys()
if err == nil {
sam.Config.I2PConfig.DestinationKeys = keys
sam.SAMEmit.I2PConfig.DestinationKeys = keys
// save keys
var f io.WriteCloser
f, err = os.OpenFile(fname, os.O_WRONLY|os.O_CREATE, 0o600)
@ -163,7 +166,7 @@ func (sam *SAM) EnsureKeyfile(fname string) (keys i2pkeys.I2PKeys, err error) {
if err == nil {
keys, err = i2pkeys.LoadKeysIncompat(f)
if err == nil {
sam.Config.I2PConfig.DestinationKeys = keys
sam.SAMEmit.I2PConfig.DestinationKeys = keys
log.Debug("Loaded existing keys from file")
}
}
@ -227,7 +230,7 @@ func (sam *SAM) NewKeys(sigType ...string) (i2pkeys.I2PKeys, error) {
// addresses, 3) by asking peers in the I2P network.
func (sam *SAM) Lookup(name string) (i2pkeys.I2PAddr, error) {
log.WithField("name", name).Debug("Looking up address")
return sam.resolver.Resolve(name)
return sam.SAMResolver.Resolve(name)
}
// Creates a new session with the style of either "STREAM", "DATAGRAM" or "RAW",

View File

@ -99,7 +99,7 @@ func (sam *SAM) NewStreamSession(id string, keys i2pkeys.I2PKeys, options []stri
return nil, err
}
log.WithField("id", id).Debug("Created new StreamSession")
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), Sig_NONE, "0", "0"}, nil
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), Sig_NONE, "0", "0"}, nil
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
@ -111,7 +111,7 @@ func (sam *SAM) NewStreamSessionWithSignature(id string, keys i2pkeys.I2PKeys, o
return nil, err
}
log.WithFields(logrus.Fields{"id": id, "sigType": sigType}).Debug("Created new StreamSession with signature")
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, "0", "0"}, nil
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, "0", "0"}, nil
}
// Creates a new StreamSession with the I2CP- and streaminglib options as
@ -123,7 +123,7 @@ func (sam *SAM) NewStreamSessionWithSignatureAndPorts(id, from, to string, keys
return nil, err
}
log.WithFields(logrus.Fields{"id": id, "from": from, "to": to, "sigType": sigType}).Debug("Created new StreamSession with signature and ports")
return &StreamSession{sam.Config.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, from, to}, nil
return &StreamSession{sam.SAMEmit.I2PConfig.Sam(), id, conn, keys, time.Duration(600 * time.Second), time.Now(), sigType, from, to}, nil
}
// lookup name, convenience function