Categorygithub.com/sniperHW/clustergo
modulepackage
0.0.0-20240826144011-0be7b7c8d863
Repository: https://github.com/sniperhw/clustergo.git
Documentation: pkg.go.dev

# README

游戏服务端框架

clustergo是一个简单的网络游戏服务端框架。可以快速构建服务器集群内部以及服务器与客户端时间的通信。集群内部采用tcp通信。服务器客户端之间 支持tcp,websocket,kcp通信方式。

服务器集群间支持两种通信模式:PRC及普通的消息传递。服务器节点使用一个逻辑地址作为标识,只要知道对端逻辑地址就可以与对端通信。集群内部通信节点会建立tcp连接,连接在首次通信请求时建立。

逻辑地址

各节点使用一个32位逻辑地址标识,逻辑地址被分为3段,高12位表示服务器组,中8位表示节点类型(255保留给harbor使用),低12位表示进程id。

消息投递与处理

cluster支持在节点之间传递protobuf消息和raw binary消息。如果只需要使用protobuf可以使用默认的SendPbMessage和RegisterPbMessageHandler。

如果需要支持不同的消息格式,可以使用SendBinMessage和RegisterBinMessageHandler自己处理消息的编码和解码。

rpc

clustergo默认采用pbrpc

将协议文件添加到pbrpc/proto目录中

例如对于echo服务,首先在proto目录添加echo.proto文件,之后填充文件内容如下:

syntax = "proto3";

option go_package = "../service/echo";

message request {
	string msg = 1;
}

message response {
    string msg = 1;
}

对于任务服务,请求参数必须命名为request,返回值必须命名为response,go_package永远设置为"../service/echo"。

在pbrpc目录执行./gen.sh,这个操作会在pbrpc/service目录下生成echo.pb.go和echo.go两个文件。

其中echo.go的内容如下:

package echo

import (
	"context"
	"time"

	"github.com/sniperHW/rpcgo"
	"github.com/sniperHW/clustergo"
	"github.com/sniperHW/clustergo/addr"
)

type Replyer struct {
	replyer *rpcgo.Replyer
}

func (this *Replyer) Reply(result *Response, err error) {
	this.replyer.Reply(result, err)
}

type EchoService interface {
	OnCall(context.Context, *Replyer, *Request)
}

func Register(o EchoService) {
	clustergo.RegisterRPC("echo", func(ctx context.Context, r *rpcgo.Replyer, arg *Request) {
		o.OnCall(ctx, &Replyer{replyer: r}, arg)
	})
}

func Call(ctx context.Context, peer addr.LogicAddr, arg *Request) (*Response, error) {
	var resp Response
	err := clustergo.Call(ctx, peer, "echo", arg, &resp)
	return &resp, err
}

func CallWithCallback(peer addr.LogicAddr, deadline time.Time, arg *Request, cb func(*Response, error)) func() bool {
	var resp Response
	var fn func(interface{}, error)
	if cb != nil {
		fn = func(ret interface{}, err error) {
			if ret != nil {
				cb(ret.(*Response), err)
			} else {
				cb(nil, err)
			}
		}
	}

	return clustergo.CallWithCallback(peer, deadline, "echo", arg, &resp, fn)
}

服务端

在服务端定义实现了EchoService的类型,通过Register注册服务:


package main

import (
	"context"

	"github.com/sniperHW/clustergo"
	"github.com/sniperHW/clustergo/addr"
	"github.com/sniperHW/clustergo/example/discovery"
	"github.com/sniperHW/clustergo/log/zap"
	"github.com/sniperHW/clustergo/pbrpc/service/echo"
)

//实现echo.EchoService 
type echoService struct {
}

func (e *echoService) OnCall(ctx context.Context, replyer *echo.Replyer, request *echo.Request) {
	clustergo.Logger().Debug("echo:", request.Msg)
	replyer.Reply(&echo.Response{Msg: request.Msg}, nil)
}

func main() {
	l := zap.NewZapLogger("1.1.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
	clustergo.InitLogger(l.Sugar())

	//注册服务
	echo.Register(&echoService{})

	localaddr, _ := addr.MakeLogicAddr("1.1.1")
	clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)

	clustergo.Wait()

}

客户端


package main

import (
	"context"

	"github.com/sniperHW/clustergo"
	"github.com/sniperHW/clustergo/addr"
	"github.com/sniperHW/clustergo/example/discovery"
	"github.com/sniperHW/clustergo/log/zap"
	"github.com/sniperHW/clustergo/pbrpc/service/echo"
)

func main() {
	l := zap.NewZapLogger("1.2.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
	clustergo.InitLogger(l.Sugar())
	localaddr, _ := addr.MakeLogicAddr("1.2.1")
	clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)

	//假设echo服务全部由逻辑地址type=1的节点提供,这里任意获取一个type=1的节点
	echoAddr, _ := clustergo.GetAddrByType(1)

	//执行10次同步调用
	for i := 0; i < 10; i++ {
		resp, err := echo.Call(context.TODO(), echoAddr, &echo.Request{Msg: "hello"})
		l.Sugar().Debug(resp, err)
	}
	clustergo.Stop()
	clustergo.Wait()
}


集群

逻辑地址最高字段位表示集群

集群内的节点可以建立通信连接直接通信(harbor不受这个限制),跨集群节点通信请看下一小节。

默认情况下,集群A的节点无法通过GetAddrByType获取到集群B中的节点地址(如果已经获得对端的逻辑地址,则不受此限制)。如果希望将节点暴露到集群外,可以将节点的Export字段设置为true。

例如对于一个分服,开房间的游戏。

每个服属于一个集群,房间服务器作为公共服务,属于另外的集群。此时需要将战斗节点的Export字段设置为true。游戏服中的节点便可以通过GetAddrByType获取到可用的战斗节点。

跨集群通信

如果通信目标与自己不在同一集群内,则无法与目标建立直接通信连接,为了跨集群通信,需要启动harbor节点,消息路由如下:

本机 -> 本集群内harbor节点 -> 目标所在集群harbor节点 -> 目标节点

多集群部署示意图:

Alt text

如上图所示,有A,B两个集群,集群各自的harbor节点分别连接本集群的center以及harbors center。 当节点B向节点A发消息时,发现A不在本集群内无法建立直接通信,因此将消息发往harbor B,harbor B接收到路由请求后,根据目标地址 2.1.1将请求转发给harbor A(harbor A和harbor B连接了同一个harbors center,因此他们之间可以建立直接通信连接)。harbor A 接收到之后发现目标A可以直达,于是将消息发送给A。

Stream(在单个连接上建立多个流)

clustergo支持在同一cluster内的节点之间建立stream。典型的使用方式由gateway接受客户端连接,并且为每一个连接建立一个到gameserver的stream。在gameserver看来,每个stream代表一个客户端连接。

gameserver.go


package main

import (
	"github.com/sniperHW/clustergo"
	"github.com/sniperHW/clustergo/addr"
	"github.com/sniperHW/clustergo/example/discovery"
	"github.com/sniperHW/clustergo/logger/zap"
	"github.com/xtaci/smux"
)

func main() {
	l := zap.NewZapLogger("1.1.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
	clustergo.InitLogger(l.Sugar())
	localaddr, _ := addr.MakeLogicAddr("1.1.1")
	clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)
	clustergo.OnNewStream(func(s *smux.Stream) {
		//处理stream
		go func() {
			buff := make([]byte, 64)
			for {
				n, err := s.Read(buff)
				if err != nil {
					break
				}
				n, err = s.Write(buff[:n])
				if err != nil {
					break
				}
			}
			s.Close()
		}()
	})
	clustergo.Wait()
}


gateserver.go

package main

import (
	"io"
	"net"
	"sync"

	"github.com/sniperHW/netgo"
	"github.com/sniperHW/clustergo"
	"github.com/sniperHW/clustergo/addr"
	"github.com/sniperHW/clustergo/example/discovery"
	"github.com/sniperHW/clustergo/logger/zap"
)

func main() {
	l := zap.NewZapLogger("1.2.1.log", "./logfile", "debug", 1024*1024*100, 14, 28, true)
	clustergo.InitLogger(l.Sugar())
	localaddr, _ := addr.MakeLogicAddr("1.2.1")
	clustergo.Start(discovery.NewClient("127.0.0.1:8110"), localaddr)

	gameAddr, _ := clustergo.GetAddrByType(1)

	_, serve, _ := netgo.ListenTCP("tcp", "127.0.0.1:8113", func(conn *net.TCPConn) {
		go func() {
			//客户端连接到达,建立到1.1.1的stream
			cliStream, err := clustergo.OpenStream(gameAddr)
			if err != nil {
				conn.Close()
				return
			}

			defer func() {
				conn.Close()
				cliStream.Close()
			}()

			var wait sync.WaitGroup
			wait.Add(2)

			//将来自客户端的数据通过stream透传到1.1.1
			go func() {
				io.Copy(cliStream, conn)
				wait.Done()
			}()

			//将来自1.1.1的stream数据透传回客户端
			go func() {
				io.Copy(conn, cliStream)
				wait.Done()
			}()
			wait.Wait()
		}()
	})
	go serve()

	clustergo.Wait()
}

Membership

clustergo是为游戏服务端设计的分布式框架,游戏服务通常由一个个服务节点组成,每个节点提供一组内聚的功能服务。

Membership服务保存了所有成员的如下信息:

type Node struct {
	Addr      addr.Addr
	Export    bool //是否将节点暴露到cluster外部
	Available bool //是否可用,
}

节点启动时,首先连接Membership获取配置信息,之后通过配置的逻辑地址从配置中查询自身节点信息。如果找到则使用配置中的网络地址启动服务,否则启动失败。

节点发现

普通节点只会将配置中Cluster相同或Export为true的节点保存到本地。可以通过GetAddrByType来获取某个类型的可用节点(Available=true),如果有多个节点可用GetAddrByType将随机返回一个节点。

# Packages

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Functions

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Variables

.
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
连接建立前待发送消息缓冲区的大小,一旦缓冲区满,发送将返回ErrPendingQueueFull.
No description provided by the author
socket异步发送chan的大小.

# Structs

No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author
No description provided by the author

# Interfaces

No description provided by the author
No description provided by the author

# Type aliases

No description provided by the author
No description provided by the author