# README
greatws
支持海量连接的websocket库,callback写法
处理流程
特性
- 支持 epoll/kqueue
- 低内存占用
- 高tps
- 对websocket的兼容性较高,完整实现rfc6455, rfc7692
暂不支持
- ssl
- windows
- io-uring
警告⚠️
早期阶段,暂时不建议生产使用
内容
例子-服务端
net http升级到websocket服务端
package main
import (
"fmt"
"github.com/antlabs/greatws"
)
type echoHandler struct{}
func (e *echoHandler) OnOpen(c *greatws.Conn) {
// fmt.Printf("OnOpen: %p\n", c)
}
func (e *echoHandler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
if err := c.WriteTimeout(op, msg, 3*time.Second); err != nil {
fmt.Println("write fail:", err)
}
// if err := c.WriteMessage(op, msg); err != nil {
// slog.Error("write fail:", err)
// }
}
func (e *echoHandler) OnClose(c *greatws.Conn, err error) {
errMsg := ""
if err != nil {
errMsg = err.Error()
}
slog.Error("OnClose:", errMsg)
}
type handler struct {
m *greatws.MultiEventLoop
}
func (h *handler) echo(w http.ResponseWriter, r *http.Request) {
c, err := greatws.Upgrade(w, r,
greatws.WithServerReplyPing(),
// greatws.WithServerDecompression(),
greatws.WithServerIgnorePong(),
greatws.WithServerCallback(&echoHandler{}),
// greatws.WithServerEnableUTF8Check(),
greatws.WithServerReadTimeout(5*time.Second),
greatws.WithServerMultiEventLoop(h.m),
)
if err != nil {
slog.Error("Upgrade fail:", "err", err.Error())
}
_ = c
}
func main() {
var h handler
h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
h.m.Start()
fmt.Printf("apiname:%s\n", h.m.GetApiName())
mux := &http.ServeMux{}
mux.HandleFunc("/autobahn", h.echo)
rawTCP, err := net.Listen("tcp", ":9001")
if err != nil {
fmt.Println("Listen fail:", err)
return
}
log.Println("non-tls server exit:", http.Serve(rawTCP, mux))
}
gin升级到websocket服务端
package main
import (
"fmt"
"github.com/antlabs/greatws"
"github.com/gin-gonic/gin"
)
type handler struct{
m *greatws.MultiEventLoop
}
func (h *handler) OnOpen(c *greatws.Conn) {
fmt.Printf("服务端收到一个新的连接")
}
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
// newMsg := make([]byte, len(msg))
// copy(newMsg, msg)
fmt.Printf("收到客户端消息:%s\n", msg)
c.WriteMessage(op, msg)
// os.Stdout.Write(msg)
}
func (h *handler) OnClose(c *greatws.Conn, err error) {
fmt.Printf("服务端连接关闭:%v\n", err)
}
func main() {
r := gin.Default()
var h handler
h.m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
h.m.Start()
r.GET("/", func(c *gin.Context) {
con, err := greatws.Upgrade(c.Writer, c.Request, greatws.WithServerCallback(h.m), greatws.WithServerMultiEventLoop(h.m))
if err != nil {
return
}
con.StartReadLoop()
})
r.Run()
}
客户端
package main
import (
"fmt"
"time"
"github.com/antlabs/greatws"
)
var m *greatws.MultiEventLoop
type handler struct{}
func (h *handler) OnOpen(c *greatws.Conn) {
fmt.Printf("客户端连接成功\n")
}
func (h *handler) OnMessage(c *greatws.Conn, op greatws.Opcode, msg []byte) {
// 如果msg的生命周期不是在OnMessage中结束,需要拷贝一份
// newMsg := make([]byte, len(msg))
// copy(newMsg, msg)
fmt.Printf("收到服务端消息:%s\n", msg)
c.WriteMessage(op, msg)
time.Sleep(time.Second)
}
func (h *handler) OnClose(c *greatws.Conn, err error) {
fmt.Printf("客户端端连接关闭:%v\n", err)
}
func main() {
m = greatws.NewMultiEventLoopMust(greatws.WithEventLoops(0), greatws.WithMaxEventNum(256), greatws.WithLogLevel(slog.LevelError)) // epoll, kqueue
m.Start()
c, err := greatws.Dial("ws://127.0.0.1:8080/", greatws.WithClientCallback(&handler{}), greatws.WithServerMultiEventLoop(h.m))
if err != nil {
fmt.Printf("连接失败:%v\n", err)
return
}
c.WriteMessage(opcode.Text, []byte("hello"))
time.Sleep(time.Hour) //demo里面等待下OnMessage 看下执行效果,因为greatws.Dial和WriteMessage都是非阻塞的函数调用,不会卡住主go程
}
配置函数
客户端配置参数
配置header
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientHTTPHeader(http.Header{
"h1": "v1",
"h2":"v2",
}))
}
配置握手时的超时时间
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDialTimeout(2 * time.Second))
}
配置自动回复ping消息
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReplyPing())
}
配置客户端最大读message
// 限制客户端最大服务返回返回的最大包是1024,如果超过这个大小报错
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientReadMaxMessage(1024))
配置客户端压缩和解压消息
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientDecompressAndCompress())
}
配置客户端上下文接管
func main() {
greatws.Dial("ws://127.0.0.1:12345/test", greatws.WithClientContextTakeover())
}
服务端配置参数
配置服务自动回复ping消息
func main() {
c, err := greatws.Upgrade(w, r, greatws.WithServerReplyPing())
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
配置服务端最大读message
func main() {
// 配置服务端读取客户端最大的包是1024大小, 超过该值报错
c, err := greatws.Upgrade(w, r, greatws.WithServerReadMaxMessage(1024))
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
配置服务端解压消息
func main() {
// 配置服务端读取客户端最大的包是1024大小, 超过该值报错
c, err := greatws.Upgrade(w, r, greatws.WithServerDecompression())
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
配置服务端压缩和解压消息
func main() {
c, err := greatws.Upgrade(w, r, greatws.WithServerDecompressAndCompress())
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
配置服务端上下文接管
func main() {
// 配置服务端读取客户端最大的包是1024大小, 超过该值报错
c, err := greatws.Upgrade(w, r, greatws.WithServerContextTakeover)
if err != nil {
fmt.Println("Upgrade fail:", err)
return
}
}
100w websocket长链接测试
e5 洋垃圾机器
- cpu=e5 2686(单路)
- memory=32GB
BenchType : BenchEcho
Framework : greatws
TPS : 106014
EER : 218.54
Min : 49.26us
Avg : 94.08ms
Max : 954.33ms
TP50 : 45.76ms
TP75 : 52.27ms
TP90 : 336.85ms
TP95 : 427.07ms
TP99 : 498.66ms
Used : 18.87s
Total : 2000000
Success : 2000000
Failed : 0
Conns : 1000000
Concurrency: 10000
Payload : 1024
CPU Min : 184.90%
CPU Avg : 485.10%
CPU Max : 588.31%
MEM Min : 563.40M
MEM Avg : 572.40M
MEM Max : 594.48M
5800h cpu
- cpu=5800h
- memory=64GB
BenchType : BenchEcho
Framework : greatws
TPS : 103544
EER : 397.07
Min : 26.51us
Avg : 95.79ms
Max : 1.34s
TP50 : 58.26ms
TP75 : 60.94ms
TP90 : 62.50ms
TP95 : 63.04ms
TP99 : 63.47ms
Used : 40.76s
Total : 5000000
Success : 4220634
Failed : 779366
Conns : 1000000
Concurrency: 10000
Payload : 1024
CPU Min : 30.54%
CPU Avg : 260.77%
CPU Max : 335.88%
MEM Min : 432.25M
MEM Avg : 439.71M
MEM Max : 449.62M
# Functions
No description provided by the author
初始化函数.
https://datatracker.ietf.org/doc/html/rfc6455#section-4.1 又是一顿if else, 咬文嚼字.
No description provided by the author
创建一个多路事件循环.
初始化一个多路事件循环,并且运行它.
No description provided by the author
No description provided by the author
StringToBytes 没有内存开销的转换
func StringToBytes(s string) (b []byte) { bh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) sh := (*reflect.StringHeader)(unsafe.Pointer(&s)) bh.Data = sh.Data bh.Len = sh.Len bh.Cap = sh.Len return b }.
No description provided by the author
最小业务goroutine数量, 控制业务go程数量 initCount: 初始化的协程数 min: 最小协程数 max: 最大协程数.
6.获取http header.
1.
0.
18.2 配置服务端Callback相关方法在io event loop中执行.
4.配置压缩.
21.1 设置客户端支持上下文接管, 默认不支持上下文接管.
20.2 配置自定义task, 需要确保传入的值是有效的,不然会panic.
21.2 配置压缩和解压缩.
10 配置解压缩.
15.2 配置延迟包的初始化buffer大小.
3.配置握手时的timeout,tcp连接的timeout.
No description provided by the author
No description provided by the author
2.配置http.Header.
6 配置忽略pong消息.
13.
14.2 配置最大延迟个数.client.
21.1 设置客户端最大窗口位数,使用上下文接管时,这个参数才有效.
20.2 配置event.
17.2 配置客户端OnClose.
仅仅配置OnMessae函数.
22.1 设置客户端最大可以读取的message的大小, 默认没有限制.
16.2 .设置客户端读超时时间.
配置自动回应ping frame, 当收到ping, 回一个pong.
默认模式 19.2 配置客户端使用stream模式处理请求,go程会复用,保证OnMessage的处理是有序的.
2.
1.配置tls.config.
默认stream2,忽略这个API 19.2 配置客户端使用stream模式处理请求,go程会复用,但是不保证顺序性,如果你的请求对时序有要求,请不要使用这个模式,.
No description provided by the author
关闭: 在解析循环中运行websocket OnOpen, OnMessage, OnClose 回调函数.
开启几个事件循环, 控制io go程数量.
设置日志级别.
设置每个事件循环一次返回的最大事件数量.
默认行为: 在解析循环中运行websocket OnOpen, OnMessage, OnClose 回调函数.
配置服务端回调函数.
配置服务端回调函数.
18.1 配置服务端Callback相关方法在io event loop中执行.
21.2 设置服务端支持上下文接管, 默认不支持上下文接管.
20.1 配置自定义task, 需要确保传入的值是有效的,不然会panic.
21.1 配置压缩和解压缩.
No description provided by the author
15.1 配置延迟包的初始化buffer大小.
11 关闭bufio clear hack优化.
3.关闭utf8检查.
No description provided by the author
13.
14.1 配置最大延迟个数.server.
22.2 设置服务端最大窗口位数, 使用上下文接管时,这个参数才有效.
20.1 配置event.
17。 只配置OnClose 17.1 配置服务端OnClose.
4.仅仅配置OnMessae函数 仅仅配置OnMessae函数.
22.2 设置服务端最大可以读取的message的大小,默认没有限制.
16.
5.
默认模式 19.1 配置服务端使用stream模式处理请求,go程会复用,保证OnMessage的处理是有序的.
2.
设置TCP_NODELAY 为false, 开启nagle算法 设置服务端TCP_NODELAY.
20.1 配置服务端使用unstream模式处理请求,go程会复用, 但是不保证顺序性,如果你的请求对时序有要求,请不要使用这个模式,.
7.
# Constants
No description provided by the author
No description provided by the author
No description provided by the author
DataCannotAccept 收到一个不能接受的数据类型.
EndpointGoingAway 对端正在消失.
No description provided by the author
No description provided by the author
NoExtensions 只用于客户端, 服务端返回扩展消息.
NormalClosure 正常关闭.
NotConsistentMessageType 表示对端正在终止连接, 消息类型不一致.
No description provided by the author
No description provided by the author
ProtocolError 表示对端由于协议错误正在终止连接.
ServerTerminating 服务端遇到意外情况, 中止请求.
TerminatingConnection 表示对端正在终止连接, 没有好用的错误, 可以用这个错误码表示.
No description provided by the author
TooBigMessage 消息太大, 不能处理, 关闭连接.
# Variables
默认MultiEventLoop.
conn已经被关闭.
No description provided by the author
close值不对.
No description provided by the author
close的值是空的.
事件循环为空.
事件循环没有启动.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
读超时.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
写超时.
No description provided by the author
# Structs
No description provided by the author
Config 配置 有两种方式可以配置相关值 1.
Conn大小改变历史,增加上下文接管,从<160到184.
No description provided by the author
1.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
# Type aliases
No description provided by the author
No description provided by the author
3.
2.
No description provided by the author
No description provided by the author
No description provided by the author
https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1 这里记录了各种状态码的含义.