// Read copies received data into b and returns the number of bytes written.
//
// Data is served in this order: bytes left over from a message that did not
// fit into an earlier buffer, then the next message reported by the KCP
// receive queue. When nothing is available the call blocks until a read
// event, the deadline stored in s.rd, session shutdown, or an error event.
//
// It returns an error built from errBrokenPipe once the session is marked
// closed, errTimeout{} when a non-zero s.rd has passed, or whatever value is
// delivered on s.chErrorEvent.
func (s *UDPSession) Read(b []byte) (n int, err error) {
	for {
		s.mu.Lock()

		// Leftover bytes from a previously dequeued message go out first.
		if pending := s.bufptr; len(pending) > 0 {
			n = copy(b, pending)
			s.bufptr = pending[n:]
			s.mu.Unlock()
			return n, nil
		}

		if s.isClosed {
			s.mu.Unlock()
			return 0, errors.New(errBrokenPipe)
		}

		if msgLen := s.kcp.PeekSize(); msgLen > 0 {
			atomic.AddUint64(&DefaultSnmp.BytesReceived, uint64(msgLen))

			if msgLen <= len(b) {
				// The caller's buffer can hold the whole message, so
				// receive straight into it without staging.
				s.kcp.Recv(b)
				s.mu.Unlock()
				return msgLen, nil
			}

			// The message is larger than b: stage it in recvbuf (grown
			// only when its capacity is insufficient), hand out what
			// fits, and remember the remainder in bufptr.
			if msgLen > cap(s.recvbuf) {
				s.recvbuf = make([]byte, msgLen)
			}
			staged := s.recvbuf[:msgLen]
			s.recvbuf = staged
			s.kcp.Recv(staged)
			n = copy(b, staged)
			s.bufptr = staged[n:]
			s.mu.Unlock()
			return n, nil
		}

		// Nothing to read yet. Arm a timer only when a deadline is set; a
		// nil channel never fires, so the select below then waits on the
		// remaining cases alone.
		var (
			deadline *time.Timer
			expired  <-chan time.Time
		)
		if !s.rd.IsZero() {
			if time.Now().After(s.rd) {
				s.mu.Unlock()
				return 0, errTimeout{}
			}
			deadline = time.NewTimer(s.rd.Sub(time.Now()))
			expired = deadline.C
		}
		s.mu.Unlock()

		// Block without holding the lock. Every wake-up except an error
		// event re-runs the checks at the top of the loop.
		failed := false
		select {
		case <-s.chReadEvent:
		case <-expired:
		case <-s.die:
		case err = <-s.chErrorEvent:
			failed = true
		}

		if deadline != nil {
			deadline.Stop()
		}
		if failed {
			return n, err
		}
	}
}
PeekSize()
// PeekSize reports the byte length of the next message at the head of the
// receive queue without removing anything from it.
//
// It returns -1 when the queue is empty or when the head segment announces
// more fragments than are currently queued. Otherwise it returns the summed
// data length of the segments from the head up to and including the first
// segment whose frg is 0.
func (kcp *KCP) PeekSize() (length int) {
	if len(kcp.rcv_queue) == 0 {
		return -1
	}

	head := &kcp.rcv_queue[0]
	if head.frg == 0 {
		// Unfragmented message: the head segment is the whole payload.
		return len(head.data)
	}

	// The head's frg is treated as the number of segments that must still
	// follow it. Convert to int BEFORE adding one: doing the addition in
	// the field's own type can wrap to zero at that type's maximum value
	// (assuming frg is a narrow unsigned integer), which would bypass this
	// guard and report a partial message as complete.
	if len(kcp.rcv_queue) < int(head.frg)+1 {
		return -1
	}

	for i := range kcp.rcv_queue {
		seg := &kcp.rcv_queue[i]
		length += len(seg.data)
		if seg.frg == 0 {
			break
		}
	}
	return length
}