Distributed WebSocket 是一个支持分布式部署的 WebSocket 服务框架,基于 Go 语言实现。它通过 Redis/Etcd 存储客户端连接信息,利用 gRPC 在服务节点间传递消息,实现了跨节点的实时通信。
- 支持单节点和分布式部署
- 基于 Redis/Etcd 的连接信息共享
- 通过 gRPC 实现跨节点消息传递
- 高性能的 WebSocket 连接处理
- 自动连接管理和消息广播
- 客户端连接状态监控和错误处理
┌──────────────────┐ ┌───────────────────┐ ┌───────────────┐ │ WebSocket 客户端 │────▶│ WebSocket 服务节点 │◀───▶│ Redis/Etcd │ └──────────────────┘ └───────────────────┘ └───────────────┘ │ ▲ ▼ │ ┌──────────────────┐ │ gRPC │ │ 服务接口 │ └──────────────────┘ 系统主要组件:
- WebSocket 客户端 - 浏览器或其他设备建立 WebSocket 连接
- WebSocket 服务节点 - 承载 WebSocket 连接,处理消息收发
- Redis/Etcd - 存储连接的元信息(connection_id ↔ 节点IP+端口)
- gRPC 服务 - 节点间通过 gRPC 互相发送消息
go get github.com/jayecc/go-websocket- Go 1.19+
- Redis
- gRPC
package main import ( "log" "net/http" "time" "github.com/gin-gonic/gin" websocket "github.com/jayecc/go-websocket" ) func main() { gin.SetMode(gin.ReleaseMode) gin.DisableConsoleColor() app := gin.Default() // 创建WebSocket Hub hub := websocket.NewHubRun() defer hub.Close() // 注册WebSocket路由 app.GET("/ws", func(ctx *gin.Context) { client := websocket.NewClient(hub, websocket.WithId("xxxx")) // 设置连接回调 client.OnConnect(func(conn *websocket.Client) { log.Printf("Client %s connected", conn.GetID()) }) // 设置消息处理回调 client.OnEvent(func(conn *websocket.Client, messageType int, message []byte) { log.Printf("Received message from client %s: %s", conn.GetID(), string(message)) // 发送服务器时间作为响应 response := time.Now().Format(time.RFC3339) conn.Emit([]byte(response)) }) // 设置断开连接回调 client.OnDisconnect(func(id string) { log.Printf("Client %s disconnected", id) }) // 设置错误处理回调 client.OnError(func(id string, err error) { log.Printf("Error from client %s: %v", id, err) }) // 建立WebSocket连接 if err := client.Conn(ctx.Writer, ctx.Request); err != nil { log.Printf("Failed to establish WebSocket connection: %v", err) ctx.String(http.StatusInternalServerError, "Failed to establish WebSocket connection") return } }) log.Fatal(app.Run(":8080")) }package main import ( "context" "log" "net" "net/http" "github.com/gin-gonic/gin" "github.com/go-redis/redis/v8" "github.com/grpc-ecosystem/go-grpc-middleware/recovery" "golang.org/x/sync/errgroup" "google.golang.org/grpc" websocket "github.com/jayecc/go-websocket" "github.com/jayecc/go-websocket/websocketpb" ) func main() { serverGroup := errgroup.Group{} grpcAddr := ":8081" httpAddr := ":8082" grpcHost := websocket.IP().String() + grpcAddr // 创建Redis客户端 redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, }) // 创建Redis存储 storage := websocket.NewRedisStorage(redisClient, "websocket") // 创建WebSocket Hub websocketHub := websocket.NewHubRun() defer websocketHub.Close() // 创建分布式WebSocket客户端 websocketClient := websocket.NewDistClient(storage) // 启动 gRPC 服务器 serverGroup.Go(func() error { lis, err := net.Listen("tcp", grpcAddr) if err != nil { return err } grpcServer := grpc.NewServer( grpc.UnaryInterceptor(grpcrecovery.UnaryServerInterceptor()), ) // 注册 gRPC 服务 websocketpb.RegisterWebsocketServer(grpcServer, websocket.NewDistServer(websocketHub)) return grpcServer.Serve(lis) }) // 启动 HTTP 服务器 serverGroup.Go(func() error { gin.SetMode(gin.ReleaseMode) gin.DisableConsoleColor() app := gin.Default() // 注册WebSocket路由 app.GET("/ws", func(ctx *gin.Context) { session := websocket.NewDistSession(websocketHub, storage, grpcHost, websocket.WithId("xxxx")) session.OnError(func(id string, err error) { log.Printf("OnError: %v\n", err) }) session.OnEvent(func(conn *websocket.Client, messageType int, message []byte) { log.Printf("OnEvent: %s\n", string(message)) // 广播消息到所有节点 log.Println(websocketClient.Broadcast(context.Background(), []byte("grpc广播消息"))) }) session.OnConnect(func(conn *websocket.Client) { log.Printf("OnConnect: %s\n", conn.GetID()) }) session.OnDisconnect(func(id string) { log.Printf("OnDisconnect: %s\n", id) }) if err := session.Conn(ctx.Writer, ctx.Request); err != nil { ctx.String(http.StatusInternalServerError, "Failed to establish WebSocket connection") return } }) return app.Run(httpAddr) }) log.Println(serverGroup.Wait()) }管理活跃的客户端连接和消息广播。
NewHub()- 创建 Hub 实例NewHubRun()- 创建并运行 Hub 实例Client(id string)- 根据 ID 获取客户端Broadcast(message []byte)- 广播消息Close()- 关闭 Hub
表示单个 WebSocket 客户端连接。
NewClient(hub *Hub, opts ...Option)- 创建客户端Conn(w http.ResponseWriter, r *http.Request)- 建立 WebSocket 连接Emit(message []byte)- 向客户端发送消息Broadcast(message []byte)- 广播消息GetID()- 获取客户端 IDClose()- 关闭客户端连接
分布式会话管理器。
NewDistSession(hub *Hub, storage Storage, addr string, opts ...Option)- 创建分布式会话OnConnect(handler func(conn *Client))- 设置连接回调OnEvent(handler func(conn *Client, messageType int, message []byte))- 设置消息回调OnError(handler func(id string, err error))- 设置错误回调OnDisconnect(handler func(id string))- 设置断开连接回调
分布式客户端,用于跨节点发送消息。
NewDistClient(storage Storage)- 创建分布式客户端Emit(ctx context.Context, id string, message []byte)- 向指定客户端发送消息Online(ctx context.Context, id string)- 检查客户端是否在线Broadcast(ctx context.Context, message []byte)- 广播消息到所有节点
存储接口,用于保存客户端连接信息。
Set(key string, value string)- 设置键值对Get(key string)- 获取值Del(key ...string)- 删除键Clear(host string)- 清理指定主机的连接All()- 获取所有连接信息
项目提供了 gRPC 服务接口,用于节点间通信:
Emit- 向指定客户端发送消息Online- 检查客户端是否在线Broadcast- 广播消息
redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: "", DB: 0, })storage := websocket.NewRedisStorage(redisClient, "websocket")- 错误处理:始终实现错误回调以监控连接问题
- 资源清理:使用
defer确保 Hub 和连接正确关闭 - 超时控制:为 gRPC 调用设置合适的超时时间
- 重试机制:对于关键操作实现重试逻辑
- 监控日志:记录连接状态和消息处理日志
MIT