
代码地址: https://github.com/Xuzan9396/zinx-ws
做这个项目初衷,主要因为自己公司做直播平台的,之前公司写了一套,websocket 封装的框架,主要做房间服务器,和 h5 小游戏服务器,但是由于感觉随着业务增大,后面感觉某些设计有缺陷,看了冰哥的设计模式, 打算跟着冰哥设计模式重写一个 websocket
后续会在自己的项目中使用,打算在直播间的小游戏,准备上线使用 "Name": "zin-ws -------gitxuzan", "Host": "127.0.0.1", "端口": "端口", "TcpPort": 8999, "最大连接数": "最大连接数", "MaxConn": 1000, "最大的包大小": "最大包大小", "MaxPackageSize": 4096, "worker 池子": "worker 池子 10 个并发处理读的数据", "WorkerPoolSize": 10 | MsgId | len | body |
|---|---|---|
| 协议号 ID | body 长度 | 二进制 body 长度 |
| uint32 | uint32 | []byte |
wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket")) 还有其他设置例如: wsconfig.WithWorkerSize(10) // 设置 10 个 worker 处理业务逻辑 wsconfig.WithMaxPackSize(4096) // 每个发送的包大小 4k wsconfig.WithMaxConn(1000) // 同时在线 1000 个连接 wsconfig.WithVersion() // 自定义本地版本 type LoginInfo struct { znet.BaseRouter } 例如上面写的 LoginInfo 继承 znet.BaseRouter 重写三个方法依次执行: PreHandle Handle PostHandle 同时要设置 router // 登录 s.AddRouter(1001, &LoginInfo{}) 1001 代表协议号,相当于协议投里面的 msgId,映射到具体某个业务,发送端需要发送对应的协议号 func (l *LoginInfo) PreHandle(request ziface.IRequest) { request 中 目前有发送,断开,获取当前属性,获取当前连接 } package main import ( wsconfig "github.com/Xuzan9396/ws/config" "github.com/Xuzan9396/ws/ziface" "github.com/Xuzan9396/ws/znet" "log" "time" ) func init() { log.SetFlags(log.Lshortfile | log.LstdFlags) } type LoginInfo struct { znet.BaseRouter } // 模拟登录逻辑 func (l *LoginInfo) PreHandle(request ziface.IRequest) { auth := false <-time.After(5 * time.Second) // 模拟业务 if auth == false { // 模拟登录认证失败,然后断开连接 request.GetConnetion().Stop() } } type PingInfo struct { znet.BaseRouter } type HelloInfo struct { znet.BaseRouter } func (p *PingInfo) PreHandle(request ziface.IRequest) { log.Printf("pre:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) } func (p *PingInfo) Handle(request ziface.IRequest) { log.Printf("Handle:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) } func (p *PingInfo) PostHandle(request ziface.IRequest) { log.Printf("post:%s,conntId:%d,,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) request.GetConnetion().SendMsg(request.GetMsgID(), []byte("回复 ping!")) } func (p *HelloInfo) PreHandle(request ziface.IRequest) { log.Printf("pre:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) } func (p *HelloInfo) Handle(request ziface.IRequest) { log.Printf("Handle:%s,conntId:%d,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) } func (p *HelloInfo) PostHandle(request ziface.IRequest) { log.Printf("post:%s,conntId:%d,,msgId:%d", request.GetData(), request.GetConnetion().GetConnID(), request.GetMsgID()) request.GetConnetion().SendMsg(request.GetMsgID(), []byte("回复 hello!")) } // 创建链接后初始化函数 func SetOnConnetStart(conn ziface.IConnection) { conn.SetProperty("name", "xuzan") res, bools := conn.GetProperty("name") if bools { log.Println("name", res.(string)) } conn.RemoveProperty("name") } func GetConnectNum(s ziface.IServer) { go func() { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: connNumTotal := s.GetConnMgr().Len() log.Println("连接数量:", connNumTotal) } } }() } func main() { //设置配置 wsconfig.SetWSConfig("127.0.0.1", 8999, wsconfig.WithName("gitxuzan ----- websocket")) // 创建一个 server 句柄 s := znet.NewServer() // 启动 sever s.SetOnConnStart(SetOnConnetStart) // 测试业务 s.AddRouter(1, &HelloInfo{}) // 其他业务 s.AddRouter(2, &PingInfo{}) // 登录 s.AddRouter(1001, &LoginInfo{}) // 监控长连接数量 GetConnectNum(s) s.Server() } package main import ( "flag" "github.com/Xuzan9396/ws/znet" "github.com/gorilla/websocket" "log" "net/http" "net/url" "os" "os/signal" "time" ) var addr = flag.String("addr", "127.0.0.1:8999", "http service address") func main() { flag.Parse() log.SetFlags(0) interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt) u := url.URL{Scheme: "ws", Host: *addr, Path: "/"} log.Printf("connecting to %s", u.String()) c, _, err := websocket.DefaultDialer.Dial(u.String(), http.Header{"User-Agent": {""}}) if err != nil { log.Fatal("dial:", err) } defer c.Close() log.Println("ws 连接成功") ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() p := znet.NewDataPack() by := []byte{'h', 'e', 'l', 'l', 'o'} resBytes, err := p.Pack(&znet.Message{ Id: 1, DataLen: uint32(len(by)), Data: by, }) byPing := []byte("ping") resPingBytes, _ := p.Pack(&znet.Message{ Id: 2, DataLen: uint32(len(byPing)), Data: byPing, }) timer := time.NewTimer(30 * time.Second) go read(c) for { select { case <-timer.C: // 模拟认证登录 sendMsg := []byte("login") sendMsgPack, _ := p.Pack(&znet.Message{ Id: 1001, DataLen: uint32(len(sendMsg)), Data: sendMsg, }) err := c.WriteMessage(websocket.BinaryMessage, sendMsgPack) if err != nil { log.Println("write:", err) timer.Stop() return } log.Println("login 写入成功:", string(sendMsg)) timer.Stop() case <-ticker.C: sendMsg := resBytes err := c.WriteMessage(websocket.BinaryMessage, sendMsg) if err != nil { log.Println("write:", err) return } log.Println("写入成功:", string(by)) err = c.WriteMessage(websocket.BinaryMessage, resPingBytes) if err != nil { log.Println("write:", err) return } log.Println("写入成功:", string(resPingBytes)) case <-interrupt: log.Println("interrupt") err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Println("write close:", err) return } } } } func read(c *websocket.Conn) { for { _, message, err := c.ReadMessage() if err != nil { log.Println("read:", err) return } p := znet.NewDataPack() img, err := p.Unpack(message) if err != nil { log.Println("read:", err) return } log.Printf("msgId:%d,recv: %s", img.GetMsgId(), img.GetData()) } } 1 Nnq 2023-04-02 05:56:51 +08:00 golang 现在好多相似的库,都快选择恐惧症了 |
2 fridaycatye 2023-04-02 09:14:16 +08:00 强,我也是看了他的教程学习 go |
3 pubby 2023-04-02 11:55:37 +08:00 我感觉 websocket 的痛点是 1 支持大量连接 2.和业务解耦 我们公司是做了一套 ws 的基础服务 分成 2 部分 [接入] 部分,做成分布式可水平任意数量扩展,部署在各种廉价云机器上。 [消息服务] 部分,一般部署在业务所在集群,和每个 [接入] 器之间建立(一条) ws 进行数据通信,维护所有客户端信息。 业务端使用 http/grpc 和 [消息服务] 进行收发消息。 这样业务根本不需要关心 ws 处理,只要做消息回调和消息发送的处理。 接入:客户端先 http 请求一个连接 token ,此时会告诉客户端连接 token ,以及连到哪个 [接入] 点 发送消息:业务端---http/grpc--> [消息服务] ---ws---> [接入器] ----ws----> 客户端 接收消息:客户端----ws----> [接入器] -----ws-----> [消息服务] ----http/grpc---> 业务端 [接入] 和 [消息服务] 之间只使用一条 ws 连接交换消息,这样传输链路上的各种网关就不需要维护大量 ws 连接了。 |
5 lesismal 2023-04-02 12:15:02 +08:00 @pubby 接入层如果也是用 go ,还可以考虑用我这个来承载大量连接降低硬件消耗: https://github.com/lesismal/nbio 如果不是用 go 而是用 nginx 那些,就不需要我这个了,除非为了一些功能开发方便 @gitxuzan @fridaycatye 刚看了一眼你们冰哥哥的代码,比如: https://github.com/aceld/zinx/blob/master/ztimer/timer.go#L76 这种定时器要在到期前一直占用一个协程。而标准库 time.AfterFunc 只要在到期时启动一个协程、执行完就退出。 冰的这种代码,太不适合真正的大项目了,也就玩玩小项目能干翻 py 这些。 学思路可以,别被这些理论派、缺少实战的 up 把自己带偏了。 这代码辣眼睛,不继续看了。 |
6 pubby 2023-04-02 12:27:56 +08:00 @lesismal 嗯,当时出发点就是堆机器,所以用了当时最可靠的 github.com/gorilla/websocket ( 2017 年) golang 各种解决单机 ws 能力的方案还是最近几年的事情。 |
8 lesismal 2023-04-02 12:46:58 +08:00 @gitxuzan @pubby 我这个能让你们代码更简单,老业务当然没必要去浪费时间替换,但是新业务的话,欢迎试驾。。。 https://github.com/lesismal/arpc 性能也还可以: https://colobu.com/2022/07/31/2022-rpc-frameworks-benchmarks 易用性和扩展性请看看示例,该有的基本都有了 名字带了 rpc 但其实是全功能的网络库,server 主动发消息都可以的,也不限制 rpc 的方式,也可以只是推送消息不需要另一端响应,client/server side 都可以做这些,做游戏网络库可以,做 IM 可以,做 RPC 可以,做推送服务之类的都可以。支持中间件之类的各种扩展。支持前端 js client 而且也能用 http ,所以 web 前端一把梭也可以。常见的游戏客户端引擎基本都支持 js ,所以用来做游戏也可以一把梭。 在一些对性能要求极致的比如 fps 游戏,当然我还是会自己定制网络库,把中间件之类的不必要的代码去掉,把协议头做更极致的优化。 |
9 neoblackcap 2023-04-02 18:15:07 +08:00 @Nnq 一个建议,如果你觉得这活的工作量不大,那么请自己实现自己维护。那么就不用去理解人家的库是怎么调用的。而且维护起来也更加轻松 |
10 Nnq 2023-04-03 02:18:41 +08:00 @neoblackcap 只是感觉大家都是反复造轮子的感觉,时间都放在重写维护上了,如果是一个东西企业内部重度使用的话还好,有些 lib 可能几年只用那么一次,之后都不一定有人维护了,文档要是也没有;基本上可以扔垃圾桶里了 |
11 bv 2023-04-03 09:41:58 +08:00 @lesismal #6 看 B 站上讲 GMP 调度,算是讲的最清晰的 UP 了,受益不少。但是 zinx 和这个 websocket 代码质量确实辣眼睛。 |
12 lesismal 2023-04-03 10:32:15 +08:00 @bv 虽然没怎么看,但隔三岔五就会看到有人夸赞,可以看出其实作为 golang 知识传播、做得算是不错了。知识培训机构这种和实战差别还是很大,尤其是好些人习惯了不管是啥自己先造个轮子再说,直接使用标准库 timer 比他这个好得多却非要画蛇添足。不只是 zinx ,其他一些培训机构、某些厂出的号称”架构师“的 go 框架也差不多 |