当前位置: 移动技术网 > IT编程>脚本编程>Go语言 > golang websocket 服务端的实现

golang websocket 服务端的实现

2019年09月09日  | 移动技术网IT编程  | 我要评论

创建一个websocket的服务端

package smile

import (
  "errors"
  "log"
  "net/http"
  "sync"
  "time"

  "github.com/gorilla/websocket"
)

const (
  // 允许等待的写入时间
  writewait = 10 * time.second

  // time allowed to read the next pong message from the peer.
  pongwait = 60 * time.second

  // send pings to peer with this period. must be less than pongwait.
  pingperiod = (pongwait * 9) / 10

  // maximum message size allowed from peer.
  maxmessagesize = 512
)

// 最大的连接id,每次连接都加1 处理
var maxconnid int64

// 客户端读写消息
type wsmessage struct {
  // websocket.textmessage 消息类型
  messagetype int
  data    []byte
}

// ws 的所有连接
// 用于广播
var wsconnall map[int64]*wsconnection

var upgrader = websocket.upgrader{
  readbuffersize: 1024,
  writebuffersize: 1024,
  // 允许所有的cors 跨域请求,正式环境可以关闭
  checkorigin: func(r *http.request) bool {
    return true
  },
}

// 客户端连接
type wsconnection struct {
  wssocket *websocket.conn // 底层websocket
  inchan  chan *wsmessage // 读队列
  outchan chan *wsmessage // 写队列

  mutex   sync.mutex // 避免重复关闭管道,加锁处理
  isclosed bool
  closechan chan byte // 关闭通知
  id    int64
}

func wshandler(resp http.responsewriter, req *http.request) {
  // 应答客户端告知升级连接为websocket
  wssocket, err := upgrader.upgrade(resp, req, nil)
  if err != nil {
    log.println("升级为websocket失败", err.error())
    return
  }
  maxconnid++
  // todo 如果要控制连接数可以计算,wsconnall长度
  // 连接数保持一定数量,超过的部分不提供服务
  wsconn := &wsconnection{
    wssocket: wssocket,
    inchan:  make(chan *wsmessage, 1000),
    outchan:  make(chan *wsmessage, 1000),
    closechan: make(chan byte),
    isclosed: false,
    id:    maxconnid,
  }
  wsconnall[maxconnid] = wsconn
  log.println("当前在线人数", len(wsconnall))

  // 处理器,发送定时信息,避免意外关闭
  go wsconn.processloop()
  // 读协程
  go wsconn.wsreadloop()
  // 写协程
  go wsconn.wswriteloop()
}

// 处理队列中的消息
func (wsconn *wsconnection) processloop() {
  // 处理消息队列中的消息
  // 获取到消息队列中的消息,处理完成后,发送消息给客户端
  for {
    msg, err := wsconn.wsread()
    if err != nil {
      log.println("获取消息出现错误", err.error())
      break
    }
    log.println("接收到消息", string(msg.data))
    // 修改以下内容把客户端传递的消息传递给处理程序
    err = wsconn.wswrite(msg.messagetype, msg.data)
    if err != nil {
      log.println("发送消息给客户端出现错误", err.error())
      break
    }
  }
}

// 处理消息队列中的消息
func (wsconn *wsconnection) wsreadloop() {
  // 设置消息的最大长度
  wsconn.wssocket.setreadlimit(maxmessagesize)
  wsconn.wssocket.setreaddeadline(time.now().add(pongwait))
  for {
    // 读一个message
    msgtype, data, err := wsconn.wssocket.readmessage()
    if err != nil {
      websocket.isunexpectedcloseerror(err, websocket.closegoingaway, websocket.closeabnormalclosure)
      log.println("消息读取出现错误", err.error())
      wsconn.close()
      return
    }
    req := &wsmessage{
      msgtype,
      data,
    }
    // 放入请求队列,消息入栈
    select {
    case wsconn.inchan <- req:
    case <-wsconn.closechan:
      return
    }
  }
}

// 发送消息给客户端
func (wsconn *wsconnection) wswriteloop() {
  ticker := time.newticker(pingperiod)
  defer func() {
    ticker.stop()
  }()
  for {
    select {
    // 取一个应答
    case msg := <-wsconn.outchan:
      // 写给websocket
      if err := wsconn.wssocket.writemessage(msg.messagetype, msg.data); err != nil {
        log.println("发送消息给客户端发生错误", err.error())
        // 切断服务
        wsconn.close()
        return
      }
    case <-wsconn.closechan:
      // 获取到关闭通知
      return
    case <-ticker.c:
      // 出现超时情况
      wsconn.wssocket.setwritedeadline(time.now().add(writewait))
      if err := wsconn.wssocket.writemessage(websocket.pingmessage, nil); err != nil {
        return
      }
    }
  }
}

// 写入消息到队列中
func (wsconn *wsconnection) wswrite(messagetype int, data []byte) error {
  select {
  case wsconn.outchan <- &wsmessage{messagetype, data}:
  case <-wsconn.closechan:
    return errors.new("连接已经关闭")
  }
  return nil
}

// 读取消息队列中的消息
func (wsconn *wsconnection) wsread() (*wsmessage, error) {
  select {
  case msg := <-wsconn.inchan:
    // 获取到消息队列中的消息
    return msg, nil
  case <-wsconn.closechan:

  }
  return nil, errors.new("连接已经关闭")
}

// 关闭连接
func (wsconn *wsconnection) close() {
  log.println("关闭连接被调用了")
  wsconn.wssocket.close()
  wsconn.mutex.lock()
  defer wsconn.mutex.unlock()
  if wsconn.isclosed == false {
    wsconn.isclosed = true
    // 删除这个连接的变量
    delete(wsconnall, wsconn.id)
    close(wsconn.closechan)
  }
}

// 启动程序
func startwebsocket(addrport string) {
  wsconnall = make(map[int64]*wsconnection)
  http.handlefunc("/ws", wshandler)
  http.listenandserve(addrport, nil)
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网