基于websocket单台机器支持百万连接分布式聊天(IM)系统

PkgGoDev Release Go Report Card OpenIssue ClosedIssue Stars Forks Stargazers over time

本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。

使用golang实现websocket通讯,单机可以支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。

本文内容比较长,如果直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,然后开始介绍这个项目,以及在Nginx中配置域名做webSocket的转发,然后介绍如何搭建一个分布式系统。

目录

1、项目说明

1.1 goWebSocket

本文将介绍如何实现一个基于websocket聊天(IM)分布式系统。

使用golang实现websocket通讯,单机支持百万连接,使用gin框架、nginx负载、可以水平部署、程序内部相互通讯、使用grpc通讯协议。

1.2 项目体验

2、介绍webSocket

2.1 webSocket 是什么

WebSocket 协议在2008年诞生,2011年成为国际标准。所有浏览器都已经支持了。

它的最大特点就是,服务器可以主动向客户端推送信息,客户端也可以主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。

2.2 webSocket的兼容性

浏览器开始支持webSocket的版本

golang、java、php、node.js、python、nginx 都有不错的支持

Android可以使用java-webSocket对webSocket支持

iOS 4.2及更高版本具有WebSockets支持

2.3 为什么要用webSocket

目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,然后返回结果,不可以服务端主动向某一个客户端主动发送数据

服务端处理一个请求

2.4 webSocket建立过程

客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息

Connection: Upgrade表明连接需要升级

Upgrade: websocket需要升级到 websocket协议

Sec-WebSocket-Version: 13 协议的版本为13

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应

# Request Headers
Connection: Upgrade
Host: im.20jd.com
Origin: http://im.20jd.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket

浏览器 Network

服务端接收到升级协议的请求,如果服务端支持升级协议会做如下响应

返回:

Status Code: 101 Switching Protocols 表示支持切换协议

# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket

websocket接收和发送数据

3、如何实现基于webSocket的长连接系统

3.1 使用go实现webSocket服务端

3.1.1 启动端口监听

go websocket.StartWebSocket()
// 启动程序
func StartWebSocket() {
	http.HandleFunc("/acc", wsPage)
	http.ListenAndServe(":8089", nil)
}

3.1.2 升级协议

func wsPage(w http.ResponseWriter, req *http.Request) {

	// 升级协议
	conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
		fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])

		return true
	}}).Upgrade(w, req, nil)
	if err != nil {
		http.NotFound(w, req)

		return
	}

	fmt.Println("webSocket 建立连接:", conn.RemoteAddr().String())

	currentTime := uint64(time.Now().Unix())
	client := NewClient(conn.RemoteAddr().String(), conn, currentTime)

	go client.read()
	go client.write()

	// 用户连接事件
	clientManager.Register <- client
}

3.1.3 客户端连接的管理

// 连接管理
type ClientManager struct {
	Clients     map[*Client]bool   // 全部的连接
	ClientsLock sync.RWMutex       // 读写锁
	Users       map[string]*Client // 登录的用户 // appID+uuid
	UserLock    sync.RWMutex       // 读写锁
	Register    chan *Client       // 连接连接处理
	Login       chan *login        // 用户登录处理
	Unregister  chan *Client       // 断开连接处理程序
	Broadcast   chan []byte        // 广播 向全部成员发送数据
}

// 初始化
func NewClientManager() (clientManager *ClientManager) {
	clientManager = &ClientManager{
		Clients:    make(map[*Client]bool),
		Users:      make(map[string]*Client),
		Register:   make(chan *Client, 1000),
		Login:      make(chan *login, 1000),
		Unregister: make(chan *Client, 1000),
		Broadcast:  make(chan []byte, 1000),
	}

	return
}

3.1.4 注册客户端的socket的写的异步处理程序

// 向客户端写数据
func (c *Client) write() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("write stop", string(debug.Stack()), r)

		}
	}()

	defer func() {
		clientManager.Unregister <- c
		c.Socket.Close()
		fmt.Println("Client发送数据 defer", c)
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// 发送数据错误 关闭连接
				fmt.Println("Client发送数据 关闭连接", c.Addr, "ok", ok)

				return
			}

			c.Socket.WriteMessage(websocket.TextMessage, message)
		}
	}
}

3.1.5 注册客户端的socket的读的异步处理程序

// 读取客户端数据
func (c *Client) read() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("write stop", string(debug.Stack()), r)
		}
	}()

	defer func() {
		fmt.Println("读取客户端数据 关闭send", c)
		close(c.Send)
	}()

	for {
		_, message, err := c.Socket.ReadMessage()
		if err != nil {
			fmt.Println("读取客户端数据 错误", c.Addr, err)

			return
		}

		// 处理程序
		fmt.Println("读取客户端数据 处理:", string(message))
		ProcessData(c, message)
	}
}

3.1.6 接收客户端数据并处理

{"seq":"1565336219141-266129","cmd":"login","data":{"userID":"马远","appID":101}}
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
/************************  请求数据  **************************/
// 通用请求数据格式
type Request struct {
	Seq  string      `json:"seq"`            // 消息的唯一ID
	Cmd  string      `json:"cmd"`            // 请求命令字
	Data interface{} `json:"data,omitempty"` // 数据 json
}

// 登录请求数据
type Login struct {
	ServiceToken string `json:"serviceToken"` // 验证用户是否登录
	AppID        uint32 `json:"appID,omitempty"`
	UserID       string `json:"userID,omitempty"`
}

// 心跳请求数据
type HeartBeat struct {
	UserID string `json:"userID,omitempty"`
}
/************************  响应数据  **************************/
type Head struct {
	Seq      string    `json:"seq"`      // 消息的ID
	Cmd      string    `json:"cmd"`      // 消息的cmd 动作
	Response *Response `json:"response"` // 消息体
}

type Response struct {
	Code    uint32      `json:"code"`
	CodeMsg string      `json:"codeMsg"`
	Data    interface{} `json:"data"` // 数据 json
}

3.1.7 使用路由的方式处理客户端的请求数据

// Websocket 路由
func WebsocketInit() {
	websocket.Register("login", websocket.LoginController)
	websocket.Register("heartbeat", websocket.HeartbeatController)
}

3.1.8 防止内存溢出和Goroutine不回收

client_manager.go

// 定时清理超时连接
func ClearTimeoutConnections() {
    currentTime := uint64(time.Now().Unix())

    for client := range clientManager.Clients {
        if client.IsHeartbeatTimeout(currentTime) {
            fmt.Println("心跳时间超时 关闭连接", client.Addr, client.UserID, client.LoginTime, client.HeartbeatTime)

            client.Socket.Close()
        }
    }
}

3.2 使用javaScript实现webSocket客户端

3.2.1 启动并注册监听程序

ws = new WebSocket("ws://127.0.0.1:8089/acc");

 
ws.onopen = function(evt) {
  console.log("Connection open ...");
};
 
ws.onmessage = function(evt) {
  console.log( "Received Message: " + evt.data);
  data_array = JSON.parse(evt.data);
  console.log( data_array);
};
 
ws.onclose = function(evt) {
  console.log("Connection closed.");
};

3.2.2 发送数据

登录:
ws.send('{"seq":"2323","cmd":"login","data":{"userID":"11","appID":101}}');

心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');

ping 查看服务是否正常:
ws.send('{"seq":"2325","cmd":"ping","data":{}}');

关闭连接:
ws.close();

3.3 发送消息

3.3.1 文本消息

客户端只要知道发送用户是谁,还有内容就可以显示文本消息,这里我们重点关注一下数据部分

target:定义接收的目标,目前未设置

type:消息的类型,text 文本消息 img 图片消息

msg:文本消息内容

from:消息的发送者

文本消息的结构:

{
  "seq": "1569080188418-747717",
  "cmd": "msg",
  "response": {
    "code": 200,
    "codeMsg": "Ok",
    "data": {
      "target": "",
      "type": "text",
      "msg": "hello",
      "from": "马超"
    }
  }
}

这样一个文本消息的结构就设计完成了,客户端在接收到消息内容就可以展现到 IM 界面上

3.3.2 图片和语言消息

发送图片消息,发送消息者的客户端需要先把图片上传到文件服务器,上传成功以后获得图片访问的 URL,然后由发送消息者的客户端需要将图片 URL 发送到 gowebsocket,gowebsocket 图片的消息格式发送给目标客户端,消息接收者客户端接收到图片的 URL 就可以显示图片消息。

图片消息的结构:

{
  "type": "img",
  "from": "马超",
  "url": "http://20jd.com/images/home_logo.png",
  "secret": "消息鉴权 secret",
  "size": {
    "width": 480,
    "height": 720
  }
}

语言消息、和视频消息和图片消息类似,都是先把文件上传服务器,然后通过 gowebsocket 传递文件的 URL,需要注意的是部分消息涉及到隐私的文件,文件访问的时候需要做好鉴权信息,不能让非接收用户也能查看到别人的消息内容。

4、goWebSocket 项目

4.1 项目说明

4.2 项目依赖

# 主要使用到的包
github.com/gin-gonic/gin@v1.4.0
github.com/redis/go-redis/v9
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf

4.3 项目启动

git clone git@github.com:link1st/gowebsocket.git
# 或
git clone https://github.com/link1st/gowebsocket.git
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改项目监听端口,redis连接等(默认127.0.0.1:3306)
vim app.yaml
# 返回项目目录,为以后启动做准备
cd ..
app:
  logFile: log/gin.log # 日志文件位置
  httpPort: 8080 # http端口
  webSocketPort: 8089 # webSocket端口
  rpcPort: 9001 # 分布式部署程序内部通讯端口
  httpUrl: 127.0.0.1:8080
  webSocketUrl:  127.0.0.1:8089


redis:
  addr: "localhost:6379"
  password: ""
  DB: 0
  poolSize: 30
  minIDleConns: 30
go run main.go

4.4 接口文档

4.4.1.1 接口说明
4.4.1 HTTP接口文档

线上:http://im.20jd.com

测试:http://im.20jd.com

4.4.1.2 聊天页面
参数 必填 类型 说明 示例
appID uint32 appID/房间ID 101
4.4.1.3 获取房间用户列表
参数 必填 类型 说明 示例
appID uint32 appID/房间ID 101
参数 必填 类型 说明 示例
code int 错误码 200
msg string 错误信息 Success
data array 返回数据
userCount int 房间内用户总数 1
userList list 用户列表
{
    "code": 200,
    "msg": "Success",
    "data": {
        "userCount": 1,
        "userList": [
            "黄帝"
        ]
    }
}
4.4.1.4 查询用户是否在线
参数 必填 类型 说明 示例
appID uint32 appID/房间ID 101
userID string 用户ID 黄帝
参数 必填 类型 说明 示例
code int 错误码 200
msg string 错误信息 Success
data array 返回数据
online bool 发送结果 true:在线 false:不在线 true
userID string 用户ID 黄帝
{
    "code": 200,
    "msg": "Success",
    "data": {
        "online": true,
        "userID": "黄帝"
    }
}
4.4.1.5 给用户发送消息
参数 必填 类型 说明 示例
appID uint32 appID/房间ID 101
userID string 用户id 黄帝
msgID string 消息ID 避免重复发送
message string 消息内容 hello
参数 必填 类型 说明 示例
code int 错误码 200
msg string 错误信息 Success
data array 返回数据
sendResults bool 发送结果 true:成功 false:失败 true
{
    "code": 200,
    "msg": "Success",
    "data": {
        "sendResults": true
    }
}
4.4.1.6 给全员用户发送消息
参数 必填 类型 说明 示例
appID uint32 appID/房间ID 101
userID string 用户id 黄帝
msgID string 消息ID 避免重复发送
message string 消息内容 hello
参数 必填 类型 说明 示例
code int 错误码 200
msg string 错误信息 Success
data array 返回数据
sendResults bool 发送结果 true:成功 false:失败 true
{
    "code": 200,
    "msg": "Success",
    "data": {
        "sendResults": true
    }
}
4.4.2 RPC接口文档
4.4.2.1 接口说明
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.protobuf";
option java_outer_classname = "ProtobufProto";


package protobuf;

// The AccServer service definition.
service AccServer {
    // 查询用户是否在线
    rpc QueryUsersOnline (QueryUsersOnlineReq) returns (QueryUsersOnlineRsp) {
    }
    // 发送消息
    rpc SendMsg (SendMsgReq) returns (SendMsgRsp) {
    }
    // 给这台机器的房间内所有用户发送消息
    rpc SendMsgAll (SendMsgAllReq) returns (SendMsgAllRsp) {
    }
    // 获取用户列表
    rpc GetUserList (GetUserListReq) returns (GetUserListRsp) {
    }
}

// 查询用户是否在线
message QueryUsersOnlineReq {
    uint32 appID = 1; // AppID
    string userID = 2; // 用户ID
}

message QueryUsersOnlineRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    bool online = 3;
}

// 发送消息
message SendMsgReq {
    string seq = 1; // 序列号
    uint32 appID = 2; // appID/房间ID
    string userID = 3; // 用户ID
    string cms = 4; // cms 动作: msg/enter/exit
    string type = 5; // type 消息类型,默认是 text
    string msg = 6; // msg
    bool isLocal = 7; // 是否查询本机 acc内部调用为:true(本机查询不到即结束)
}

message SendMsgRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    string sendMsgID = 3;
}

// 给这台机器的房间内所有用户发送消息
message SendMsgAllReq {
    string seq = 1; // 序列号
    uint32 appID = 2; // appID/房间ID
    string userID = 3; // 不发送的用户ID
    string cms = 4; // cms 动作: msg/enter/exit
    string type = 5; // type 消息类型,默认是 text
    string msg = 6; // msg
}

message SendMsgAllRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    string sendMsgID = 3;
}

// 获取用户列表
message GetUserListReq {
    uint32 appID = 1;
}

message GetUserListRsp {
    uint32 retCode = 1;
    string errMsg = 2;
    repeated string userID = 3;
}
4.4.2.2 查询用户是否在线
4.4.2.3 发送消息
4.4.2.4 给指定房间所有用户发送消息
4.4.2.5 获取房间内全部用户

5、webSocket项目Nginx配置

5.1 为什么要配置Nginx

5.2 nginx配置

upstream  go-im
{
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream  go-acc
{
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}


server {
    listen       80 ;
    server_name  im.20jd.com;
    index index.html index.htm ;


    location /acc {
        proxy_set_header Host $host;
        proxy_pass http://go-acc;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 10m;
    }

    location /
    {
        proxy_set_header Host $host;
        proxy_pass http://go-im;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 30m;
    }

    access_log  /link/log/nginx/access/im.log;
    error_log   /link/log/nginx/access/im.error.log;
}

5.3 问题处理

/link/server/tengine/sbin/nginx -t

nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
http{
	fastcgi_temp_file_write_size 128k;
..... # 需要添加的内容

    #support websocket
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

.....
    gzip on;
    
}

6、压测

6.1 Linux内核优化

被压测服务器需要保持100W长连接,客户和服务器端是通过socket通讯的,每个连接需要建立一个socket,程序需要保持100W长连接就需要单个程序能打开100W个文件句柄

# 查看系统默认的值
ulimit -n
# 设置最大打开文件数
ulimit -n 1000000

通过修改配置文件的方式修改程序最大打开句柄数

root soft nofile 1040000
root hard nofile 1040000

root soft nofile 1040000
root hard nproc 1040000

root soft core unlimited
root hard core unlimited

* soft nofile 1040000
* hard nofile 1040000

* soft nofile 1040000
* hard nproc 1040000

* soft core unlimited
* hard core unlimited

修改完成以后需要重启机器配置才能生效

file-max的值需要大于limits设置的值

# file-max 设置的值参考
cat /proc/sys/fs/file-max
12553500

vim /etc/sysctl.conf

# 配置参考
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
net.ipv4.ip_local_port_range = 1024 65000
net.ipv4.tcp_mem = 786432 2097152 3145728
net.ipv4.tcp_rmem = 4096 4096 16777216
net.ipv4.tcp_wmem = 4096 4096 16777216

sysctl -p 修改配置以后使得配置生效命令

6.2 压测准备

6.3 压测数据

连接数 内存
10000 281M
100000 2.7g
200000 5.4g
500000 13.1g
1000000 25.8g

7、如何基于webSocket实现一个分布式Im

7.1 说明

7.2 架构

用户连接时序图

分布是系统随机给用户发送消息

7.3 分布式系统部署

# app.yaml 配置文件信息
app:
  logFile: log/gin.log
  httpPort: 8080
  webSocketPort: 8089
  rpcPort: 9001
  httpUrl: im.20jd.com
  webSocketUrl:  im.20jd.com

# 在启动项目
go run main.go 

# 将第一个项目拷贝一份
cp -rf gowebsocket gowebsocket1
# app.yaml 修改配置文件
app:
  logFile: log/gin.log
  httpPort: 8081
  webSocketPort: 8090
  rpcPort: 9002
  httpUrl: im.20jd.com
  webSocketUrl:  im.20jd.com

# 在启动第二个项目
go run main.go 

在之前Nginx配置项中添加第二台机器的Ip和端口

upstream  go-im
{
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8081 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream  go-acc
{
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8090 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

查看请求是否落在两个项目上 实验两个用户分别连接不同的项目(gowebsocket和gowebsocket1)是否也可以相互发送消息

本项目只是演示了这个项目如何分布式部署,以及分布式部署以后模块如何进行相互通讯 完全解决系统没有单点的故障,还需 Nginx集群、redis cluster等

8、回顾和反思

8.1 在其它系统应用

8.2 已经实现的功能

IM实现细节:

8.2 需要完善、优化

8.3 总结

9、参考文献

维基百科 WebSocket

阮一峰 WebSocket教程

WebSocket协议:5分钟从入门到精通

go-stress-testing 单台机器100w连接压测实战

github 搜:link1st 查看项目 gowebsocket

https://github.com/link1st/gowebsocket

意见反馈

添加link1st的微信

赞助商