I2P Address: [http://git.idk.i2p]

Verified Commit 0419665d authored by idk's avatar idk
Browse files

use separate incoming and outgoing queues

parent dab108c2
......@@ -30,7 +30,7 @@ func (c *NoiseTransport) Handshake(routerInfo router_info.RouterInfo) error {
defer session.(*NoiseSession).Mutex.Unlock()
c.Mutex.Lock()
// if c.config.isClient {
if err := session.(*NoiseSession).RunClientHandshake(); err != nil {
if err := session.(*NoiseSession).RunOutgoingHandshake(); err != nil {
return err
}
// Wake any other goroutines that are waiting for this handshake to
......
package noise
import "github.com/flynn/noise"
import (
"math"
"github.com/flynn/noise"
)
const (
NOISE_DH_CURVE25519 = 1
......@@ -16,7 +20,8 @@ const (
NOISE_PATTERN_XX = 9
NOISE_PATTERN_IK = 14
uint16Size = 2 // uint16 takes 2 bytes
uint16Size = 2 // uint16 takes 2 bytes
MaxPayloadSize = math.MaxUint16 - 16 /*mac size*/ - uint16Size /*data len*/
)
var ciphers = map[byte]noise.CipherFunc{
......
......@@ -49,7 +49,7 @@ func ComposeInitiatorHandshakeMessage(s noise.DHKey, rs []byte, payload []byte,
return
}
func (c *NoiseSession) RunIncomingHandshake() error {
func (c *NoiseSession) RunOutgoingHandshake() error {
var (
negData, msg []byte
state *noise.HandshakeState
......@@ -65,10 +65,12 @@ func (c *NoiseSession) RunIncomingHandshake() error {
return err
}
//read negotiation data
/*if err := c.readPacket(); err != nil {
if i2np, err := c.ReadNextI2NP(); err != nil {
return err
} else {
c.RecvQueue.Enqueue(i2np)
}
negotiationData := c.handshakeBuffer.Next(c.handshakeBuffer.Len())
/*negotiationData := c.handshakeBuffer.Next(c.handshakeBuffer.Len())
//read noise message
if err := c.readPacket(); err != nil {
return err
......@@ -76,21 +78,21 @@ func (c *NoiseSession) RunIncomingHandshake() error {
msg = c.handshakeBuffer.Next(c.handshakeBuffer.Len())
if len(negotiationData) != 0 || len(msg) == 0 {
return errors.New("Server returned error")
}
// cannot reuse msg for read, need another buf
inBlock := c.NoiseTransport.newBlock()
inBlock.reserve(len(msg))*/
}*/
//cannot reuse msg for read, need another buf
inBlock := newBlock()
//inBlock.reserve(len(msg))
var payload []byte
payload, c.CipherState, c.NoiseTransport.CipherState, err = state.ReadMessage(inBlock.data, msg)
/*if err != nil {
c.NoiseTransport.freeBlock(inBlock)
payload, c.CipherState, c.NoiseTransport.CipherState, err = state.ReadMessage(inBlock, msg)
if err != nil {
//c.NoiseTransport.freeBlock(inBlock)
return err
}*/
}
err = c.processCallback(state.PeerStatic(), payload)
if err != nil {
/*if err != nil {
c.NoiseTransport.freeBlock(inBlock)
return err
}
}*/
/*c.NoiseTransport.freeBlock(inBlock)
if c.CipherState == nil && c.NoiseTransport.CipherState == nil {
b := c.newBlock()
......
......@@ -17,13 +17,14 @@ import (
)
type NoiseSession struct {
*cb.Queue
router_info.RouterInfo
*noise.CipherState
sync.Mutex
*sync.Cond
*NoiseTransport
noise.DHKey
RecvQueue *cb.Queue
SendQueue *cb.Queue
VerifyCallback VerifyCallbackFunc
handshakeBuffer bytes.Buffer
activeCall int32
......@@ -64,11 +65,11 @@ func (s *NoiseSession) LocalAddr() net.Addr {
}
func (s *NoiseSession) QueueSendI2NP(msg i2np.I2NPMessage) {
s.Queue.Enqueue(msg)
s.SendQueue.Enqueue(msg)
}
func (s *NoiseSession) SendQueueSize() int {
return s.Queue.Size()
return s.SendQueue.Size()
}
func (s *NoiseSession) ReadNextI2NP() (i2np.I2NPMessage, error) {
......@@ -76,7 +77,8 @@ func (s *NoiseSession) ReadNextI2NP() (i2np.I2NPMessage, error) {
}
func (s *NoiseSession) Close() error {
s.Queue.Clear()
s.SendQueue.Clear()
s.RecvQueue.Clear()
return nil
}
......@@ -90,8 +92,8 @@ func (c *NoiseSession) processCallback(publicKey []byte, payload []byte) error {
}
// newBlock allocates a new packet, from hc's free list if possible.
func (h *NoiseSession) newBlock() *buffer {
return new(buffer)
func newBlock() []byte {
return make([]byte, MaxPayloadSize)
}
type VerifyCallbackFunc func(publicKey []byte, data []byte) error
......@@ -102,7 +104,8 @@ func NewNoiseTransportSession(ri router_info.RouterInfo) (transport.TransportSes
return nil, err
}
return &NoiseSession{
Queue: cb.New(1024),
SendQueue: cb.New(1024),
RecvQueue: cb.New(1024),
RouterInfo: ri,
Conn: socket,
}, nil
......
......@@ -24,7 +24,7 @@ func (c *NoiseSession) Write(b []byte) (int, error) {
break
}
}
if err := c.RunIncomingHandshake(); err != nil {
if err := c.RunOutgoingHandshake(); err != nil {
return 0, err
}
c.Mutex.Lock()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment