client端发送数据

先来看一下smux发送数据的一个简单例子:


func client() {
// Get a TCP connection
conn, err := net.Dial(...)
if err != nil {
panic(err)
}

// Setup client side of smux
session, err := smux.Client(conn, nil)
if err != nil {
panic(err)
}

// Open a new stream
stream, err := session.OpenStream()
if err != nil {
panic(err)
}

// Stream implements io.ReadWriteCloser
stream.Write([]byte("ping"))
}

上面代码是官方的demo,首先是建立一个tcp链接,通过tcp的链接去初始化smux 的Client从而拿到一个session,session的定义其实就是一个链接,或者直译就是会话,然后通过OpenStream打开一个stream进行数据的传输。

Session的的流程如下,总的来说 一个Session就代表了一次会话,也就是一次链接,用一个链接来传输数据,多个stream共用一个通道传输

从上图来看,其实这个框架的大体设计是和http2.0的设计几乎是差不多的,感觉像是一个http2.0核心逻辑的缩小版本。

Session解析

先来看一下smux是如何拿到一个session的


func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
    if config == nil {
        config = DefaultConfig()
    }

    if err := VerifyConfig(config); err != nil {
        return nil, err
    }
    return newSession(config, conn, true), nil
}

func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {

    s := new(Session)
    //创建一个die chan,用来监听关闭消息
    s.die = make(chan struct{})
    s.conn = conn
    s.config = config
    //初始化stream map
    s.streams = make(map[uint32]*Stream)
    //初始化一个
    s.chAccepts = make(chan *Stream, defaultAcceptBacklog)
    //根据配置初始化MaxReceiveBuffer
    s.bucket = int32(config.MaxReceiveBuffer)
    //初始化一个数据包的通知chan
    s.bucketNotify = make(chan struct{}, 1)
    //初始化一个write消息chan
    s.writes = make(chan writeRequest)
    //  判断是否是client
    if client {
        s.nextStreamID = 1
    } else {
        s.nextStreamID = 0
    }
    //开启接收loop
    go s.recvLoop()
    //开启发送数据的loop
    go s.sendLoop()
    //开启keepalive
    go s.keepalive()
    return

Client函数主要是做一些配置校验,主要的初始化还是newSession进行的,上面的每一个参数的用处后面都会进行讲解,先来看一下核心的三个函数recvLoopsendLoopkeepalive

recvLoop解析

smux的通过复用主要是通过recvLoopsendLoop这两个函数来实现的,所有发送的数据,都会放倒这两个goroutine中去发送来实现通道复用

要看要理解recvloop需要理解几个东西:

  1. 数据包的格式
  2. frame的流程

1.数据包的格式

VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)

VERSION

先来看看version这个字段 总的来说目前没有什么用,整个项目中对这个字段目前还没有什么操作,但是总的来说,一个通信协议肯定是需要用version这个字段

CMD

CMD是具体的操作,下面是CMD的定义

const ( // cmds
    cmdSYN byte = iota // stream open
    cmdFIN             // stream close, a.k.a EOF mark
    cmdPSH             // data push
    cmdNOP             // no operation
)

cmdSYN 主要是打开一个Session通道,client端主动建立一个stream的映射,opentStream的部分代码如下

if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil {
        return nil, errors.Wrap(err, "writeFrame")
    }

cmdFIN主要是用来关闭通道的,比如传输完数据就对stream进行关闭

cmdPSH主要是发送数据的命令

cmdNOP 没有太大的具体含义,主要是给keepalive使用,用来探活的

LENGTH

数据包的大小

STREAMID

因为通道是共用的,所以需要通过STREAMID来区分数据,

DATA

具体的数据包

2.Frame流程

frame是Smux中最小的传输单位,一个数据包可能是一个frame也可能是多个frame,frame的大小是通过config.MaxFrameSize去决定的,默认是4096字节

type Frame struct {
    ver  byte
    cmd  byte
    sid  uint32
    data []byte
}

recvloop读取数据再解析

首先看一下服务端的简易实现:

func server() {
    // Accept a TCP connection
    conn, err := listener.Accept()
    if err != nil {
        panic(err)
    }

    // Setup server side of smux
    session, err := smux.Server(conn, nil)
    if err != nil {
        panic(err)
    }

    // Accept a stream
    stream, err := session.AcceptStream()
    if err != nil {
        panic(err)
    }

    // Listen for a message
    buf := make([]byte, 4)
    stream.Read(buf)
}

服务端接收数据主要是AcceptStream()接收数据这个函数,这块主要是通过s.chAccepts接收stream


func (s *Session) AcceptStream() (*Stream, error) {
    //超时的实现
    var deadline <-chan time.Time
    if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() {
        timer := time.NewTimer(d.Sub(time.Now()))
        defer timer.Stop()
        deadline = timer.C
    }
    select {
    case stream := <-s.chAccepts:
        return stream, nil
    case <-deadline://如果超时也会直接返回
        return nil, errTimeout
    case <-s.die:
        return nil, errors.New(errBrokenPipe)
    }
}

理解了上面的概念后,再来继续看,首先会make一个buffer,最大的buffer字节是1<<16(65536字节) 加上header之后是65544,所以上面frame的config.MaxFrameSize最大不能超过1<<16(65536字节)

// recvLoop keeps on reading from underlying connection if tokens are available
func (s *Session) recvLoop() {
    //总的buffer长度是headerSize(sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength) 8字节
    //1<<16(65536字节)
    //总的就是(65544字节)
    buffer := make([]byte, (1<<16)+headerSize)
    for {
       //如果当前没有包 并且session没有关闭 那么就等待包
        for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
            <-s.bucketNotify
        }
        //读取frame
        if f, err := s.readFrame(buffer); err == nil {
            atomic.StoreInt32(&s.dataReady, 1)
            //根据frame中的cmd作出对应的操作
            switch f.cmd {
            case cmdNOP:
            case cmdSYN:
                s.streamLock.Lock()
                if _, ok := s.streams[f.sid]; !ok {
                    stream := newStream(f.sid, s.config.MaxFrameSize, s)
                    s.streams[f.sid] = stream
                    select {
                    case s.chAccepts <- stream:
                    case <-s.die:
                    }
                }
                s.streamLock.Unlock()
            case cmdFIN:
                s.streamLock.Lock()
                if stream, ok := s.streams[f.sid]; ok {
                    stream.markRST()
                    stream.notifyReadEvent()
                }
                s.streamLock.Unlock()
            case cmdPSH:
                s.streamLock.Lock()
                if stream, ok := s.streams[f.sid]; ok {
                    atomic.AddInt32(&s.bucket, -int32(len(f.data)))
                    stream.pushBytes(f.data)
                    stream.notifyReadEvent()
                }
                s.streamLock.Unlock()
            default:
                s.Close()
                return
            }
        } else {
            s.Close()
            return
        }
    }
}

上面的recvLoop主要支持这几个cmd,cmdNOPcmdSYNcmdFINcmdPSH,具体含义上面已经讲解这里不过多描述,这里看看他们的代码实现逻辑,cmdNOP没有任何实现就不用讲解了

cmdSYN

cmdSYN主要是创建一个stream用来接收数据,然后把数据传输到s.chAccepts通道中

case cmdSYN:
    s.streamLock.Lock()
    if _, ok := s.streams[f.sid]; !ok {
        stream := newStream(f.sid, s.config.MaxFrameSize, s)
        s.streams[f.sid] = stream
        select {
        case s.chAccepts <- stream:
        case <-s.die:
        }
    }
    s.streamLock.Unlock()

cmdFIN

cmdFIN主要是用来关闭stream的,数据传输完成后调用,这里主要是调用了stream的 markRSTnotifyReadEvent

case cmdFIN:
    s.streamLock.Lock()
    if stream, ok := s.streams[f.sid]; ok {
        stream.markRST()
        stream.notifyReadEvent()
    }
    s.streamLock.Unlock()


// mark this stream has been reset
func (s *Stream) markRST() {
    atomic.StoreInt32(&s.rstflag, 1)
}
// notify read event
func (s *Stream) notifyReadEvent() {
    select {
    case s.chReadEvent <- struct{}{}:
    default:
    }
}

func (s *Stream) Read(b []byte) (n int, err error) {
...
READ:
    .....
    if n > 0 {
        s.sess.returnTokens(n)
        return n, nil
    } else if atomic.LoadInt32(&s.rstflag) == 1 {
        _ = s.Close()
        return 0, io.EOF
    }

    select {
    case <-s.chReadEvent:
        goto READ
    case <-deadline:
        return n, errTimeout
    case <-s.die:
        return 0, errors.New(errBrokenPipe)
    }
    ....
}

markRSTrstflag设置成1用来标记rstflag已经可以close,然后调用notifyReadEvent触发select中的case <-s.chReadEvent:通过goto让代码重新跳转到READ部分读取数据,然后判断rstflag==1来关闭

cmdPSH

通过sid拿到再session中保存的stream,然后把数据放进去,然后发送notifyReadEvent事件

case cmdPSH:
    s.streamLock.Lock()
    if stream, ok := s.streams[f.sid]; ok {
        atomic.AddInt32(&s.bucket, -int32(len(f.data)))
        stream.pushBytes(f.data)
        stream.notifyReadEvent()
    }
    s.streamLock.Unlock()

notifyReadEvent事件后,goto会重新触发下面这段代码

READ:
    //判断是否die或者超时
    select {
    case <-s.die:
        return 0, errors.New(errBrokenPipe)
    case <-deadline:
        return n, errTimeout
    default:
    }
    读取数据到b中
    s.bufferLock.Lock()
    n, err = s.buffer.Read(b)
    s.bufferLock.Unlock()
    //如果有数据
    if n > 0 {
        //就把读的都返回
        s.sess.returnTokens(n)
        return n, nil
    } else if atomic.LoadInt32(&s.rstflag) == 1 {
        _ = s.Close()
        return 0, io.EOF
    }
    //继续阻塞
    select {
    case <-s.chReadEvent:
        goto READ
    case <-deadline:
        return n, errTimeout
    case <-s.die:
        return 0, errors.New(errBrokenPipe)
    }

从上面代码可以看到如果每次读到数据之后都会直接返回,所以客户端需要显示调用stream的close来关闭通道,服务端判断异常io.EOF之后才能知道数据发送完毕

sendLoop发送数据解析

客户端发送数据的话需要通过调用OpenStream()方法,在client端和server端建立一个stream映射,具体流程如下

/ OpenStream is used to create a new stream
func (s *Session) OpenStream() (*Stream, error) {
    //如果已经关闭就直接返回了
    if s.IsClosed() {
        return nil, errors.New(errBrokenPipe)
    }
    // 用nextStreamIDLock 所加锁
    // generate stream id
    s.nextStreamIDLock.Lock()
    //如果s.goAway 大于0(这里主要判断sid有没有溢出)
    if s.goAway > 0 {
        s.nextStreamIDLock.Unlock()
        return nil, errors.New(errGoAway)
    }
    //拿到下一个streamId
    s.nextStreamID += 2
    sid := s.nextStreamID
    //处理溢出的情况
    if sid == sid%2 { // stream-id overflows
        s.goAway = 1
        s.nextStreamIDLock.Unlock()
        return nil, errors.New(errGoAway)
    }
    //解锁
    s.nextStreamIDLock.Unlock()
    //创建一个stream
    stream := newStream(sid, s.config.MaxFrameSize, s)
    //写入一个frame进行打开通道 
    if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil {
        return nil, errors.Wrap(err, "writeFrame")
    }
    //解锁
    s.streamLock.Lock()
    s.streams[sid] = stream
    s.streamLock.Unlock()
    return stream, nil
}

上面代码主要是通过发送一个指令cmdSYN到server端,建立映射,具体可以看上面相关cmdSYN的解析部分。

拿到一个stream之后就可以进行write数据,下面是write的逻辑


// Write implements net.Conn
func (s *Stream) Write(b []byte) (n int, err error) {
    //设置超时
    var deadline <-chan time.Time
    if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
        timer := time.NewTimer(d.Sub(time.Now()))
        defer timer.Stop()
        deadline = timer.C
    }
    //如果stream已经关闭就直接返回
    select {
    case <-s.die:
        return 0, errors.New(errBrokenPipe)
    default:
    }
    //拆分数据,然后组装frame,这块拆分比较简单,主要就是根据之前config中的frame最大值进行一个拆分数组而已
    frames := s.split(b, cmdPSH, s.id)
    sent := 0
    //遍历frames数组 分多个包发送
    for k := range frames {
        req := writeRequest{
            frame:  frames[k],
            result: make(chan writeResult, 1),
        }
        //把数据写入session中的writes通道
        select {
        case s.sess.writes <- req:
        case <-s.die:
            return sent, errors.New(errBrokenPipe)
        case <-deadline://处理超时
            return sent, errTimeout
        }
        //阻塞拿到每一个frame发送的结果
        select {
        case result := <-req.result:
            sent += result.n
            if result.err != nil {
                return sent, result.err
            }
        case <-s.die:
            return sent, errors.New(errBrokenPipe)
        case <-deadline:
            return sent, errTimeout
        }
    }
    return sent, nil
}

上面代码发送数据主要是通过s.sess.writes <- req: 把writeRequest放入到sendLoop中进行发送,sendLoop()是真正发送数据的地方

func (s *Session) sendLoop() {
    buf := make([]byte, (1<<16)+headerSize)
    for {
        select {
        case <-s.die:
            return
        case request, ok := <-s.writes://这里主要是接收来自通道`s.sess.writes <- req:` 中的数据
            if !ok {
                continue
            }
            //组装数据包
            buf[0] = request.frame.ver
            buf[1] = request.frame.cmd
            binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data)))
            binary.LittleEndian.PutUint32(buf[4:], request.frame.sid)
            copy(buf[headerSize:], request.frame.data)
            n, err := s.conn.Write(buf[:headerSize+len(request.frame.data)])

            n -= headerSize
            if n < 0 {
                n = 0
            }

            result := writeResult{
                n:   n,
                err: err,
            }
            //回传数据结果到result中
            request.result <- result
            close(request.result)
        }
    }
}

keepalive()解析

keepalive()这个函数来看,keepalive主要是用来做心跳检测的,从默认的配置来看,KeepAliveInterval代表的是10秒一次的心跳包 KeepAliveTimeout是30秒的keepalive,如果创建超过了30秒依然没有数据发送那么就会关闭掉

func DefaultConfig() *Config {
    return &Config{
        KeepAliveInterval: 10 * time.Second,
        KeepAliveTimeout:  30 * time.Second,
        MaxFrameSize:      4096,
        MaxReceiveBuffer:  4194304,
    }
}
func (s *Session) keepalive() {
    //创建两个定时任务
    tickerPing := time.NewTicker(s.config.KeepAliveInterval)
    tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout)
    defer tickerPing.Stop()
    defer tickerTimeout.Stop()
    for {
        select {
        case <-tickerPing.C:
            s.writeFrame(newFrame(cmdNOP, 0))
            s.notifyBucket() // force a signal to the recvLoop
        case <-tickerTimeout.C:
            if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) {
                s.Close()
                return
            }
        case <-s.die:
            return
        }
    }
}
func (s *Session) recvLoop() {
    buffer := make([]byte, (1<<16)+headerSize)
    for {
    //如果刚开始没有包 那么就会阻塞到这里
        for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
            <-s.bucketNotify
        }

        if f, err := s.readFrame(buffer); err == nil {
            atomic.StoreInt32(&s.dataReady, 1)
....

atomic.StoreInt32(&s.dataReady, 1)主要是用来标记有数据发送,让KeepAliveTimeout不会关闭掉

session的关闭逻辑

session关闭的话,就会关闭掉所有的stream,并且关闭掉网络链接,然后通过s.notifyBucket发送信息


func (s *Session) Close() (err error) {
    s.dieLock.Lock()

    select {
    case <-s.die:
        s.dieLock.Unlock()
        return errors.New(errBrokenPipe)
    default:
        close(s.die)
        s.dieLock.Unlock()
        s.streamLock.Lock()
        for k := range s.streams {
            s.streams[k].sessionClose()
        }
        s.streamLock.Unlock()
        s.notifyBucket()
        return s.conn.Close()
    }
}

下面是recvLoop函数,当没有包需要发送的时候就会在这里阻塞住,上面的s.notifyBucket()会激活这边的recvLoop,并且结束掉这个for循环

// recvLoop keeps on reading from underlying connection if tokens are available
func (s *Session) recvLoop() {
    buffer := make([]byte, (1<<16)+headerSize)
    for {
        for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
            <-s.bucketNotify
        }
....

close(s.die)这个函数会关闭掉所有正在select中的s.die

session的发送函数
func (s *Session) sendLoop() {
    buf := make([]byte, (1<<16)+headerSize)
    for {
        select {
        case <-s.die:
            return

总结

smux这个框架主要的作用是复用链接,提高传输效率的,总体来说逻辑相对比较简单,核心主要是在sendLoop()recvLoop()中,这块是两个负责发送数据和接收数据的,再来就是stream和session的定义,smux抽象出一个stream,客户端通过stream发送传输数据,从而减少每次传输数据都需要新建一个链接的这种方式,总的来说还是跟http2.0很相似的, 实际使用的时候其实还是需要再次封装,通过多个链接复用的方式才是最好的办法,能够让效率更高一些,具体可以看KCP协议,国内一个大神写的

results matching ""

    No results matching ""