• 9.3 链接的带缓冲的发包方法

    9.3 链接的带缓冲的发包方法

    我们之前给Connection提供了一个发消息的方法SendMsg(),这个是将数据发送到一个无缓冲的channel中msgChan。但是如果客户端链接比较多的话,如果对方处理不及时,可能会出现短暂的阻塞现象,我们可以做一个提供一定缓冲的发消息方法,做一些非阻塞的发送体验。

    zinx/ziface/iconnection.go

    1. //定义连接接口
    2. type IConnection interface {
    3. //启动连接,让当前连接开始工作
    4. Start()
    5. //停止连接,结束当前连接状态M
    6. Stop()
    7. //从当前连接获取原始的socket TCPConn
    8. GetTCPConnection() *net.TCPConn
    9. //获取当前连接ID
    10. GetConnID() uint32
    11. //获取远程客户端地址信息
    12. RemoteAddr() net.Addr
    13. //直接将Message数据发送数据给远程的TCP客户端(无缓冲)
    14. SendMsg(msgId uint32, data []byte) error
    15. //直接将Message数据发送给远程的TCP客户端(有缓冲)
    16. SendBuffMsg(msgId uint32, data []byte) error //添加带缓冲发送消息接口
    17. }

    zinx/znet/connection.go

    1. type Connection struct {
    2. //当前Conn属于哪个Server
    3. TcpServer ziface.IServer
    4. //当前连接的socket TCP套接字
    5. Conn *net.TCPConn
    6. //当前连接的ID 也可以称作为SessionID,ID全局唯一
    7. ConnID uint32
    8. //当前连接的关闭状态
    9. isClosed bool
    10. //消息管理MsgId和对应处理方法的消息管理模块
    11. MsgHandler ziface.IMsgHandle
    12. //告知该链接已经退出/停止的channel
    13. ExitBuffChan chan bool
    14. //无缓冲管道,用于读、写两个goroutine之间的消息通信
    15. msgChan chan []byte
    16. //有关冲管道,用于读、写两个goroutine之间的消息通信
    17. msgBuffChan chan []byte //定义channel成员
    18. }
    19. //创建连接的方法
    20. func NewConntion(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection{
    21. //初始化Conn属性
    22. c := &Connection{
    23. TcpServer:server,
    24. Conn: conn,
    25. ConnID: connID,
    26. isClosed: false,
    27. MsgHandler: msgHandler,
    28. ExitBuffChan: make(chan bool, 1),
    29. msgChan:make(chan []byte),
    30. msgBuffChan:make(chan []byte, utils.GlobalObject.MaxMsgChanLen), //不要忘记初始化
    31. }
    32. //将新创建的Conn添加到链接管理中
    33. c.TcpServer.GetConnMgr().Add(c)
    34. return c
    35. }

    然后将SendBuffMsg()方法实现一下:

    1. func (c *Connection) SendBuffMsg(msgId uint32, data []byte) error {
    2. if c.isClosed == true {
    3. return errors.New("Connection closed when send buff msg")
    4. }
    5. //将data封包,并且发送
    6. dp := NewDataPack()
    7. msg, err := dp.Pack(NewMsgPackage(msgId, data))
    8. if err != nil {
    9. fmt.Println("Pack error msg id = ", msgId)
    10. return errors.New("Pack error msg ")
    11. }
    12. //写回客户端
    13. c.msgBuffChan <- msg
    14. return nil
    15. }

    我们在Writer中也要有对msgBuffChan的数据监控:

    1. /*
    2. 写消息Goroutine, 用户将数据发送给客户端
    3. */
    4. func (c *Connection) StartWriter() {
    5. fmt.Println("[Writer Goroutine is running]")
    6. defer fmt.Println(c.RemoteAddr().String(), "[conn Writer exit!]")
    7. for {
    8. select {
    9. case data := <-c.msgChan:
    10. //有数据要写给客户端
    11. if _, err := c.Conn.Write(data); err != nil {
    12. fmt.Println("Send Data error:, ", err, " Conn Writer exit")
    13. return
    14. }
    15. //针对有缓冲channel需要些的数据处理
    16. case data, ok:= <-c.msgBuffChan:
    17. if ok {
    18. //有数据要写给客户端
    19. if _, err := c.Conn.Write(data); err != nil {
    20. fmt.Println("Send Buff Data error:, ", err, " Conn Writer exit")
    21. return
    22. }
    23. } else {
    24. break
    25. fmt.Println("msgBuffChan is Closed")
    26. }
    27. case <-c.ExitBuffChan:
    28. return
    29. }
    30. }
    31. }