client端发送数据
先来看一下smux发送数据的一个简单例子:
func client() {
// Get a TCP connection
conn, err := net.Dial(...)
if err != nil {
panic(err)
}
// Setup client side of smux
session, err := smux.Client(conn, nil)
if err != nil {
panic(err)
}
// Open a new stream
stream, err := session.OpenStream()
if err != nil {
panic(err)
}
// Stream implements io.ReadWriteCloser
stream.Write([]byte("ping"))
}
上面代码是官方的demo,首先是建立一个tcp链接,通过tcp的链接去初始化smux 的Client从而拿到一个session,session的定义其实就是一个链接,或者直译就是会话,然后通过OpenStream打开一个stream进行数据的传输。
Session的的流程如下,总的来说 一个Session就代表了一次会话,也就是一次链接,用一个链接来传输数据,多个stream共用一个通道传输
从上图来看,其实这个框架的大体设计是和http2.0的设计几乎是差不多的,感觉像是一个http2.0核心逻辑的缩小版本。
Session解析
先来看一下smux是如何拿到一个session的
func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) {
if config == nil {
config = DefaultConfig()
}
if err := VerifyConfig(config); err != nil {
return nil, err
}
return newSession(config, conn, true), nil
}
func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session {
s := new(Session)
//创建一个die chan,用来监听关闭消息
s.die = make(chan struct{})
s.conn = conn
s.config = config
//初始化stream map
s.streams = make(map[uint32]*Stream)
//初始化一个
s.chAccepts = make(chan *Stream, defaultAcceptBacklog)
//根据配置初始化MaxReceiveBuffer
s.bucket = int32(config.MaxReceiveBuffer)
//初始化一个数据包的通知chan
s.bucketNotify = make(chan struct{}, 1)
//初始化一个write消息chan
s.writes = make(chan writeRequest)
// 判断是否是client
if client {
s.nextStreamID = 1
} else {
s.nextStreamID = 0
}
//开启接收loop
go s.recvLoop()
//开启发送数据的loop
go s.sendLoop()
//开启keepalive
go s.keepalive()
return
Client
函数主要是做一些配置校验,主要的初始化还是newSession
进行的,上面的每一个参数的用处后面都会进行讲解,先来看一下核心的三个函数recvLoop
、sendLoop
、keepalive
recvLoop解析
smux
的通过复用主要是通过recvLoop
、sendLoop
这两个函数来实现的,所有发送的数据,都会放倒这两个goroutine
中去发送来实现通道复用
要看要理解recvloop需要理解几个东西:
- 数据包的格式
- frame的流程
1.数据包的格式
VERSION(1B) | CMD(1B) | LENGTH(2B) | STREAMID(4B) | DATA(LENGTH)
VERSION
先来看看version这个字段 总的来说目前没有什么用,整个项目中对这个字段目前还没有什么操作,但是总的来说,一个通信协议肯定是需要用version这个字段
CMD
CMD是具体的操作,下面是CMD的定义
const ( // cmds
cmdSYN byte = iota // stream open
cmdFIN // stream close, a.k.a EOF mark
cmdPSH // data push
cmdNOP // no operation
)
cmdSYN
主要是打开一个Session通道,client端主动建立一个stream的映射,opentStream的部分代码如下
if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil {
return nil, errors.Wrap(err, "writeFrame")
}
cmdFIN
主要是用来关闭通道的,比如传输完数据就对stream进行关闭
cmdPSH
主要是发送数据的命令
cmdNOP
没有太大的具体含义,主要是给keepalive使用,用来探活的
LENGTH
数据包的大小
STREAMID
因为通道是共用的,所以需要通过STREAMID来区分数据,
DATA
具体的数据包
2.Frame流程
frame是Smux
中最小的传输单位,一个数据包可能是一个frame也可能是多个frame,frame的大小是通过config.MaxFrameSize
去决定的,默认是4096字节
type Frame struct {
ver byte
cmd byte
sid uint32
data []byte
}
recvloop读取数据再解析
首先看一下服务端的简易实现:
func server() {
// Accept a TCP connection
conn, err := listener.Accept()
if err != nil {
panic(err)
}
// Setup server side of smux
session, err := smux.Server(conn, nil)
if err != nil {
panic(err)
}
// Accept a stream
stream, err := session.AcceptStream()
if err != nil {
panic(err)
}
// Listen for a message
buf := make([]byte, 4)
stream.Read(buf)
}
服务端接收数据主要是AcceptStream()
接收数据这个函数,这块主要是通过s.chAccepts
接收stream
func (s *Session) AcceptStream() (*Stream, error) {
//超时的实现
var deadline <-chan time.Time
if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() {
timer := time.NewTimer(d.Sub(time.Now()))
defer timer.Stop()
deadline = timer.C
}
select {
case stream := <-s.chAccepts:
return stream, nil
case <-deadline://如果超时也会直接返回
return nil, errTimeout
case <-s.die:
return nil, errors.New(errBrokenPipe)
}
}
理解了上面的概念后,再来继续看,首先会make一个buffer,最大的buffer字节是1<<16(65536字节)
加上header之后是65544
,所以上面frame的config.MaxFrameSize
最大不能超过1<<16(65536字节)
// recvLoop keeps on reading from underlying connection if tokens are available
func (s *Session) recvLoop() {
//总的buffer长度是headerSize(sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength) 8字节
//1<<16(65536字节)
//总的就是(65544字节)
buffer := make([]byte, (1<<16)+headerSize)
for {
//如果当前没有包 并且session没有关闭 那么就等待包
for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
<-s.bucketNotify
}
//读取frame
if f, err := s.readFrame(buffer); err == nil {
atomic.StoreInt32(&s.dataReady, 1)
//根据frame中的cmd作出对应的操作
switch f.cmd {
case cmdNOP:
case cmdSYN:
s.streamLock.Lock()
if _, ok := s.streams[f.sid]; !ok {
stream := newStream(f.sid, s.config.MaxFrameSize, s)
s.streams[f.sid] = stream
select {
case s.chAccepts <- stream:
case <-s.die:
}
}
s.streamLock.Unlock()
case cmdFIN:
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
stream.markRST()
stream.notifyReadEvent()
}
s.streamLock.Unlock()
case cmdPSH:
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
atomic.AddInt32(&s.bucket, -int32(len(f.data)))
stream.pushBytes(f.data)
stream.notifyReadEvent()
}
s.streamLock.Unlock()
default:
s.Close()
return
}
} else {
s.Close()
return
}
}
}
上面的recvLoop主要支持这几个cmd,cmdNOP
、cmdSYN
、cmdFIN
、cmdPSH
,具体含义上面已经讲解这里不过多描述,这里看看他们的代码实现逻辑,cmdNOP
没有任何实现就不用讲解了
cmdSYN
cmdSYN
主要是创建一个stream用来接收数据,然后把数据传输到s.chAccepts通道中
case cmdSYN:
s.streamLock.Lock()
if _, ok := s.streams[f.sid]; !ok {
stream := newStream(f.sid, s.config.MaxFrameSize, s)
s.streams[f.sid] = stream
select {
case s.chAccepts <- stream:
case <-s.die:
}
}
s.streamLock.Unlock()
cmdFIN
cmdFIN
主要是用来关闭stream的,数据传输完成后调用,这里主要是调用了stream的 markRST
、notifyReadEvent
case cmdFIN:
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
stream.markRST()
stream.notifyReadEvent()
}
s.streamLock.Unlock()
// mark this stream has been reset
func (s *Stream) markRST() {
atomic.StoreInt32(&s.rstflag, 1)
}
// notify read event
func (s *Stream) notifyReadEvent() {
select {
case s.chReadEvent <- struct{}{}:
default:
}
}
func (s *Stream) Read(b []byte) (n int, err error) {
...
READ:
.....
if n > 0 {
s.sess.returnTokens(n)
return n, nil
} else if atomic.LoadInt32(&s.rstflag) == 1 {
_ = s.Close()
return 0, io.EOF
}
select {
case <-s.chReadEvent:
goto READ
case <-deadline:
return n, errTimeout
case <-s.die:
return 0, errors.New(errBrokenPipe)
}
....
}
markRST
把rstflag
设置成1用来标记rstflag已经可以close,然后调用notifyReadEvent
触发select中的case <-s.chReadEvent:
通过goto让代码重新跳转到READ部分读取数据,然后判断rstflag==1来关闭
cmdPSH
通过sid拿到再session中保存的stream,然后把数据放进去,然后发送notifyReadEvent
事件
case cmdPSH:
s.streamLock.Lock()
if stream, ok := s.streams[f.sid]; ok {
atomic.AddInt32(&s.bucket, -int32(len(f.data)))
stream.pushBytes(f.data)
stream.notifyReadEvent()
}
s.streamLock.Unlock()
notifyReadEvent
事件后,goto会重新触发下面这段代码
READ:
//判断是否die或者超时
select {
case <-s.die:
return 0, errors.New(errBrokenPipe)
case <-deadline:
return n, errTimeout
default:
}
读取数据到b中
s.bufferLock.Lock()
n, err = s.buffer.Read(b)
s.bufferLock.Unlock()
//如果有数据
if n > 0 {
//就把读的都返回
s.sess.returnTokens(n)
return n, nil
} else if atomic.LoadInt32(&s.rstflag) == 1 {
_ = s.Close()
return 0, io.EOF
}
//继续阻塞
select {
case <-s.chReadEvent:
goto READ
case <-deadline:
return n, errTimeout
case <-s.die:
return 0, errors.New(errBrokenPipe)
}
从上面代码可以看到如果每次读到数据之后都会直接返回,所以客户端需要显示调用stream的close来关闭通道,服务端判断异常io.EOF之后才能知道数据发送完毕
sendLoop发送数据解析
客户端发送数据的话需要通过调用OpenStream()方法,在client端和server端建立一个stream映射,具体流程如下
/ OpenStream is used to create a new stream
func (s *Session) OpenStream() (*Stream, error) {
//如果已经关闭就直接返回了
if s.IsClosed() {
return nil, errors.New(errBrokenPipe)
}
// 用nextStreamIDLock 所加锁
// generate stream id
s.nextStreamIDLock.Lock()
//如果s.goAway 大于0(这里主要判断sid有没有溢出)
if s.goAway > 0 {
s.nextStreamIDLock.Unlock()
return nil, errors.New(errGoAway)
}
//拿到下一个streamId
s.nextStreamID += 2
sid := s.nextStreamID
//处理溢出的情况
if sid == sid%2 { // stream-id overflows
s.goAway = 1
s.nextStreamIDLock.Unlock()
return nil, errors.New(errGoAway)
}
//解锁
s.nextStreamIDLock.Unlock()
//创建一个stream
stream := newStream(sid, s.config.MaxFrameSize, s)
//写入一个frame进行打开通道
if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil {
return nil, errors.Wrap(err, "writeFrame")
}
//解锁
s.streamLock.Lock()
s.streams[sid] = stream
s.streamLock.Unlock()
return stream, nil
}
上面代码主要是通过发送一个指令cmdSYN
到server端,建立映射,具体可以看上面相关cmdSYN
的解析部分。
拿到一个stream之后就可以进行write数据,下面是write的逻辑
// Write implements net.Conn
func (s *Stream) Write(b []byte) (n int, err error) {
//设置超时
var deadline <-chan time.Time
if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() {
timer := time.NewTimer(d.Sub(time.Now()))
defer timer.Stop()
deadline = timer.C
}
//如果stream已经关闭就直接返回
select {
case <-s.die:
return 0, errors.New(errBrokenPipe)
default:
}
//拆分数据,然后组装frame,这块拆分比较简单,主要就是根据之前config中的frame最大值进行一个拆分数组而已
frames := s.split(b, cmdPSH, s.id)
sent := 0
//遍历frames数组 分多个包发送
for k := range frames {
req := writeRequest{
frame: frames[k],
result: make(chan writeResult, 1),
}
//把数据写入session中的writes通道
select {
case s.sess.writes <- req:
case <-s.die:
return sent, errors.New(errBrokenPipe)
case <-deadline://处理超时
return sent, errTimeout
}
//阻塞拿到每一个frame发送的结果
select {
case result := <-req.result:
sent += result.n
if result.err != nil {
return sent, result.err
}
case <-s.die:
return sent, errors.New(errBrokenPipe)
case <-deadline:
return sent, errTimeout
}
}
return sent, nil
}
上面代码发送数据主要是通过s.sess.writes <- req:
把writeRequest放入到sendLoop中进行发送,sendLoop()
是真正发送数据的地方
func (s *Session) sendLoop() {
buf := make([]byte, (1<<16)+headerSize)
for {
select {
case <-s.die:
return
case request, ok := <-s.writes://这里主要是接收来自通道`s.sess.writes <- req:` 中的数据
if !ok {
continue
}
//组装数据包
buf[0] = request.frame.ver
buf[1] = request.frame.cmd
binary.LittleEndian.PutUint16(buf[2:], uint16(len(request.frame.data)))
binary.LittleEndian.PutUint32(buf[4:], request.frame.sid)
copy(buf[headerSize:], request.frame.data)
n, err := s.conn.Write(buf[:headerSize+len(request.frame.data)])
n -= headerSize
if n < 0 {
n = 0
}
result := writeResult{
n: n,
err: err,
}
//回传数据结果到result中
request.result <- result
close(request.result)
}
}
}
keepalive()解析
keepalive()
这个函数来看,keepalive主要是用来做心跳检测的,从默认的配置来看,KeepAliveInterval
代表的是10秒一次的心跳包
KeepAliveTimeout
是30秒的keepalive,如果创建超过了30秒依然没有数据发送那么就会关闭掉
func DefaultConfig() *Config {
return &Config{
KeepAliveInterval: 10 * time.Second,
KeepAliveTimeout: 30 * time.Second,
MaxFrameSize: 4096,
MaxReceiveBuffer: 4194304,
}
}
func (s *Session) keepalive() {
//创建两个定时任务
tickerPing := time.NewTicker(s.config.KeepAliveInterval)
tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout)
defer tickerPing.Stop()
defer tickerTimeout.Stop()
for {
select {
case <-tickerPing.C:
s.writeFrame(newFrame(cmdNOP, 0))
s.notifyBucket() // force a signal to the recvLoop
case <-tickerTimeout.C:
if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) {
s.Close()
return
}
case <-s.die:
return
}
}
}
func (s *Session) recvLoop() {
buffer := make([]byte, (1<<16)+headerSize)
for {
//如果刚开始没有包 那么就会阻塞到这里
for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
<-s.bucketNotify
}
if f, err := s.readFrame(buffer); err == nil {
atomic.StoreInt32(&s.dataReady, 1)
....
atomic.StoreInt32(&s.dataReady, 1)
主要是用来标记有数据发送,让KeepAliveTimeout
不会关闭掉
session的关闭逻辑
session关闭的话,就会关闭掉所有的stream,并且关闭掉网络链接,然后通过s.notifyBucket
发送信息
func (s *Session) Close() (err error) {
s.dieLock.Lock()
select {
case <-s.die:
s.dieLock.Unlock()
return errors.New(errBrokenPipe)
default:
close(s.die)
s.dieLock.Unlock()
s.streamLock.Lock()
for k := range s.streams {
s.streams[k].sessionClose()
}
s.streamLock.Unlock()
s.notifyBucket()
return s.conn.Close()
}
}
下面是recvLoop
函数,当没有包需要发送的时候就会在这里阻塞住,上面的s.notifyBucket()
会激活这边的recvLoop,并且结束掉这个for循环
// recvLoop keeps on reading from underlying connection if tokens are available
func (s *Session) recvLoop() {
buffer := make([]byte, (1<<16)+headerSize)
for {
for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() {
<-s.bucketNotify
}
....
而close(s.die)
这个函数会关闭掉所有正在select中的s.die
session的发送函数
func (s *Session) sendLoop() {
buf := make([]byte, (1<<16)+headerSize)
for {
select {
case <-s.die:
return
总结
smux
这个框架主要的作用是复用链接,提高传输效率的,总体来说逻辑相对比较简单,核心主要是在sendLoop()
和recvLoop()
中,这块是两个负责发送数据和接收数据的,再来就是stream和session的定义,smux
抽象出一个stream
,客户端通过stream发送传输数据,从而减少每次传输数据都需要新建一个链接的这种方式,总的来说还是跟http2.0很相似的, 实际使用的时候其实还是需要再次封装,通过多个链接复用的方式才是最好的办法,能够让效率更高一些,具体可以看KCP协议,国内一个大神写的