Implementing distributed communication between the controller side and the agent side - 睿云智合

Posted on 2018-11-14 18:19:43

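1. Background
In a k8s distributed system, communication is an important part. This article shows how to use a messaging middleware for it.
The code in this article relies on the following technologies:
rabbitmq
redis
golang
For communication inside a k8s cluster and between clusters, we can use the same middleware: rabbitmq.

This article uses the two simplest modes: LB RPC calls and single-instance RPC calls.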
2. Distributed call structure

2.1 rabbitmq LB-mode calls


                               

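1. When agent1, agent2 and agent3 report that they are online at the same time, rabbitmq automatically routes each call to one instance, either controller1 or controller2, and that controller writes the status to redis. When controller1 or controller2 needs the status of all agents, both read it from redis, so they always see the same data.

2. When agent1, agent2 and agent3 fetch configuration, rabbitmq again routes each call to either controller1 or controller2. That controller reads the data from redis or mysql and returns it to the agent. The returned data is the same whether the call lands on controller1 or controller2.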
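
The LB behaviour described above can be tried out without a broker. The following is a minimal simulation, not the rabbitmq or xing API: a Go channel stands in for the shared queue, two goroutines stand in for controller1 and controller2, and a mutex-protected map stands in for redis. The names `store` and `runControllers` are made up for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for redis: every controller instance writes to the same place.
type store struct {
	mu     sync.Mutex
	online map[string]string // agent name -> controller that handled the report
}

func (s *store) setOnline(agent, handledBy string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.online[agent] = handledBy
}

// runControllers starts n controller workers that compete for messages on one
// shared queue, the way several consumers share a single queue on a broker.
// Each report is handled by exactly one worker.
func runControllers(n int, reports []string) *store {
	s := &store{online: make(map[string]string)}
	queue := make(chan string)
	var wg sync.WaitGroup
	for i := 1; i <= n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for agent := range queue {
				s.setOnline(agent, fmt.Sprintf("controller%d", id))
			}
		}(i)
	}
	for _, agent := range reports {
		queue <- agent
	}
	close(queue)
	wg.Wait()
	return s
}

func main() {
	s := runControllers(2, []string{"agent1", "agent2", "agent3"})
	// Which controller handled which agent varies between runs,
	// but the shared store always ends up holding all three agents.
	fmt.Println(len(s.online), "agents online")
}
```

Because all workers write to the same store, it does not matter which one picks up a report. That is the property the article relies on when it says both controllers read consistent data.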


2.2 rabbitmq single-instance-mode calls


                               
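When a controller instance pushes configuration:
step1. Get the agents that are currently online.
step2. Make a single-instance RPC call to each agent, sending the configuration to all of them.
step3. The controller can tell exactly whether the push to any agent failed. If every push fails, the call as a whole has failed. If even one push fails, the command should be treated as needing a resend.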
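
The three steps can be sketched as follows. This is an illustration under assumptions, not the article's code: `sendFunc`, `pushConfig`, `needsResend` and `failFor` are hypothetical names, and the send function is a stand-in for the real RPC call. Unlike a loop that stops at the first error, this version tries every agent and collects all failures.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// sendFunc delivers a config to one agent; it stands in for the real RPC call.
type sendFunc func(agent, config string) error

// pushConfig sends config to every online agent and returns the names of the
// agents whose delivery failed, sorted for stable output.
func pushConfig(agents []string, config string, send sendFunc) []string {
	var failed []string
	for _, agent := range agents {
		if err := send(agent, config); err != nil {
			failed = append(failed, agent)
		}
	}
	sort.Strings(failed)
	return failed
}

// needsResend applies the rule from step3: any failure means the command
// should be sent again.
func needsResend(failed []string) bool {
	return len(failed) > 0
}

// failFor builds a sendFunc that fails for the named agents (simulation only).
func failFor(names ...string) sendFunc {
	bad := make(map[string]bool, len(names))
	for _, n := range names {
		bad[n] = true
	}
	return func(agent, config string) error {
		if bad[agent] {
			return errors.New("simulated delivery failure")
		}
		return nil
	}
}

func main() {
	agents := []string{"agent1", "agent2", "agent3"}         // step1: online agents
	failed := pushConfig(agents, "cfg-v1", failFor("agent2")) // step2: push to each
	fmt.Println(failed, needsResend(failed))                  // step3: prints "[agent2] true"
}
```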
3. Code implementation
3.1. rabbitmq RPC call: client implementation
package ingress

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/wzhliang/xing"

	"wise2c/wisecloud-ingress-agent/common"
	"wise2c/wisecloud-ingress-agent/communicate"
	"wise2c/wisecloud-ingress-agent/log"
)

// rpcTimeout bounds every RPC call made to the controller.
const rpcTimeout = 5000 * time.Millisecond

// errClientNotReady is returned when the RPC client could not be created.
var errClientNotReady = errors.New("controller client is not initialized")

// ControllerClient is the agent-side client used to call the controller.
type ControllerClient struct {
	Producer *xing.Client
	Client   communicate.ControllerHelperClient
}

// NewControllerClient connects to rabbitmq and builds the LB RPC client.
// If the connection fails, the error is logged and the returned client's
// methods return errClientNotReady instead of panicking on a nil Client.
func NewControllerClient() *ControllerClient {
	agent := &ControllerClient{}
	//amqpURL := "amqp://guest:guest@localhost:5672/"
	amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%d",
		common.MQUser,
		common.MQPassword,
		common.MQHost,
		common.MQPort,
	)
	var err error
	agent.Producer, err = xing.NewClient(
		globalRPCAgentName,
		amqpURL,
		xing.SetIdentifier(&xing.NoneIdentifier{}),
		xing.SetSerializer(&xing.JSONSerializer{}),
	)
	if err != nil {
		log.Error("xing.NewClient() failed. %s", err.Error())
		return agent
	}
	// LB RPC: all controller instances serve the same target name.
	target := "ingress.controller"
	agent.Client = communicate.NewControllerHelperClient(target, agent.Producer)
	return agent
}

// Close releases the rabbitmq connection, if one was established.
func (c *ControllerClient) Close() {
	if c.Producer == nil {
		return
	}
	c.Producer.Close()
}

// OnlineAgent reports to the controller that the named agent is online.
func (c *ControllerClient) OnlineAgent(name string) error {
	if c.Client == nil {
		return errClientNotReady
	}
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()
	log.Debug("OnlineAgent(%s)", name)
	_, err := c.Client.OnlineAgent(ctx, &communicate.OnlineAgentRequest{Name: name})
	return err
}

// GetIngressConfigs fetches the ingress configuration identified by uuid.
func (c *ControllerClient) GetIngressConfigs(uuid string) (string, error) {
	if c.Client == nil {
		return "", errClientNotReady
	}
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()
	response, err := c.Client.GetIngressConfigs(ctx,
		&communicate.GetIngressConfigsRequest{Uuid: uuid})
	if err != nil {
		return "", err
	}
	return response.Content, nil
}

// AgentHeartbeatToController reports the agent as online once every
// 2 seconds. It never returns, so run it in its own goroutine.
func AgentHeartbeatToController() {
	if globalControllerClient == nil {
		return
	}
	for {
		if err := globalControllerClient.OnlineAgent(globalRPCAgentName); err != nil {
			log.Error(err.Error())
		}
		time.Sleep(2000 * time.Millisecond)
	}
}
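
The heartbeat loop above runs forever once started. If the agent needs to stop it cleanly, for example on shutdown, a cancellable variant could look like the sketch below. It is only one possible shape: `heartbeat` and `demo` are hypothetical names, and the `report` argument stands in for the call to `globalControllerClient.OnlineAgent`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeat calls report once per interval until ctx is cancelled and returns
// how many reports were attempted. Errors are printed rather than treated as
// fatal, so a temporary broker outage does not stop the loop.
func heartbeat(ctx context.Context, interval time.Duration, report func() error) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	attempts := 0
	for {
		select {
		case <-ctx.Done():
			return attempts
		case <-ticker.C:
			attempts++
			if err := report(); err != nil {
				fmt.Println("heartbeat failed:", err)
			}
		}
	}
}

// demo runs the heartbeat for 200ms at a 10ms interval with a report
// function that always succeeds.
func demo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	return heartbeat(ctx, 10*time.Millisecond, func() error { return nil })
}

func main() {
	fmt.Println("reports attempted:", demo())
}
```

In a real agent the interval would be the 2 seconds used above, and the context would be cancelled by whatever handles shutdown.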
3.2. rabbitmq RPC call: server implementation
package ingress

import (
	"context"
	"fmt"

	"github.com/wzhliang/xing"

	"wise2c/wisecloud-ingress-controller/common"
	"wise2c/wisecloud-ingress-controller/communicate"
	"wise2c/wisecloud-ingress-controller/log"
)

// ControllerServerImp implements the RPC handlers served by the controller.
type ControllerServerImp struct{}

// OnlineAgent records that the named agent is online.
func (g *ControllerServerImp) OnlineAgent(ctx context.Context, req *communicate.OnlineAgentRequest, rsp *communicate.Void) error {
	log.Debug("OnlineAgent(%s)", req.Name)
	return globalAgentClient.manager.OnlineAgent(req.Name)
}

// GetIngressConfigs returns the configuration for req.Uuid.
// The content is a fixed placeholder in this example.
func (g *ControllerServerImp) GetIngressConfigs(ctx context.Context, req *communicate.GetIngressConfigsRequest, rsp *communicate.GetIngressConfigsResponse) error {
	log.Info("GetIngressConfigs(%s)", req.Uuid)
	rsp.Content = "ok"
	return nil
}

// RunRPCServer registers the handlers and serves RPC calls. svc.Run is
// called directly, so this function does not return while the server runs.
func RunRPCServer() {
	//globalRPCControllerName = fmt.Sprintf("host.controller.%s", common.GetGuid())
	// LB RPC: every controller instance uses the same service name.
	globalRPCControllerName = "ingress.controller"
	//amqpURL := "amqp://guest:guest@localhost:5672/"
	amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%d",
		common.MQUser,
		common.MQPassword,
		common.MQHost,
		common.MQPort,
	)
	svc, err := xing.NewService(
		globalRPCControllerName,
		amqpURL,
		xing.SetSerializer(&xing.JSONSerializer{}),
		xing.SetBrokerTimeout(15, 5),
	)
	if err != nil {
		log.Error(fmt.Sprintf("MQURL=%s NewService failed. %s", amqpURL, err.Error()))
		// Without a service there is nothing to register or run.
		return
	}
	communicate.RegisterControllerHelperHandler(svc, &ControllerServerImp{})
	log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqpURL)
	if err = svc.Run(); err != nil {
		log.Error(err.Error())
	}
}
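3.3. rabbitmq single-instance RPC call: client implementation

1. The LB RPC call reports in real time whether each agent is online, which gives us a service-discovery function similar to consul.

2. ClientManager uses the live data in redis to manage all RPC clients. When an agent goes offline, or has not reported its status within 3 seconds, the corresponding RPC client is removed.

3. This way, every time configuration is pushed, it can be sent in real time to each single-instance RPC server.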
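
The "no report within 3 seconds means offline" rule from point 2 can be illustrated with a small in-memory registry. This is a sketch under assumptions: in the article the state lives in redis, and the article does not say which mechanism enforces the timeout (key expiry is one common option). The names `presence`, `report`, `online` and `demo` are hypothetical, and the map is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// presence records the last heartbeat time of each agent. A plain map is
// used in place of redis to keep the sketch runnable.
type presence struct {
	ttl      time.Duration
	lastSeen map[string]time.Time
}

func newPresence(ttl time.Duration) *presence {
	return &presence{ttl: ttl, lastSeen: make(map[string]time.Time)}
}

// report marks an agent as seen at time now.
func (p *presence) report(agent string, now time.Time) {
	p.lastSeen[agent] = now
}

// online returns the agents that reported within ttl of now, sorted by name.
func (p *presence) online(now time.Time) []string {
	var names []string
	for agent, seen := range p.lastSeen {
		if now.Sub(seen) <= p.ttl {
			names = append(names, agent)
		}
	}
	sort.Strings(names)
	return names
}

// demo replays two heartbeats and asks who is online 4 seconds after start.
func demo() []string {
	start := time.Unix(0, 0)
	p := newPresence(3 * time.Second)
	p.report("agent1", start)                    // last seen at t=0s
	p.report("agent2", start.Add(2*time.Second)) // last seen at t=2s
	return p.online(start.Add(4 * time.Second))
}

func main() {
	fmt.Println(demo()) // prints "[agent2]": agent1 has been silent for 4s
}
```

Passing the current time in as an argument, rather than calling `time.Now()` inside, keeps the timeout rule easy to test.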

package ingress

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/astaxie/beego/utils"
	"github.com/wzhliang/xing"

	"wise2c/wisecloud-ingress-controller/common"
	"wise2c/wisecloud-ingress-controller/communicate"
	"wise2c/wisecloud-ingress-controller/log"
)

// rpcTimeout bounds every RPC call made to an agent.
const rpcTimeout = 5000 * time.Millisecond

// AgentHelper is the controller-side RPC client for one agent instance.
type AgentHelper struct {
	mutex    *sync.Mutex
	closed   bool
	Helper   communicate.AgentHelperClient
	Producer *xing.Client
}

// NewAgentHelper creates an RPC client that targets the agent named agentName.
// If the connection fails, the error is logged and the returned helper
// rejects calls instead of panicking on a nil Helper.
func NewAgentHelper(agentName string) *AgentHelper {
	var err error
	agent := &AgentHelper{
		mutex:  new(sync.Mutex),
		closed: false,
	}
	//amqpURL := "amqp://guest:guest@localhost:5672/"
	amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%d",
		common.MQUser,
		common.MQPassword,
		common.MQHost,
		common.MQPort,
	)
	agent.Producer, err = xing.NewClient(
		globalRPCControllerName,
		amqpURL,
		xing.SetIdentifier(&xing.NoneIdentifier{}),
		xing.SetSerializer(&xing.JSONSerializer{}),
	)
	if err != nil {
		log.Error(fmt.Sprintf("MQURL=%s NewClient failed. %s", amqpURL, err.Error()))
		return agent
	}
	// Single-instance RPC: the target is the agent's own name.
	agent.Helper = communicate.NewAgentHelperClient(agentName, agent.Producer)
	return agent
}

// Close releases the connection and marks the helper as unusable.
func (h *AgentHelper) Close() {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if h.Producer != nil {
		h.Producer.Close()
	}
	h.closed = true
}

// SetIngressConfig pushes a configuration to the agent.
func (h *AgentHelper) SetIngressConfig(content string) error {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if h.closed || h.Helper == nil {
		return errors.New("the client is closed")
	}
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()
	log.Info("SetIngressConfig(%s)", content)
	_, err := h.Helper.SetIngressConfig(ctx,
		&communicate.SetIngressConfigRequest{Content: content})
	return err
}

// DelIngressConfig asks the agent to delete the configuration with the given uuid.
func (h *AgentHelper) DelIngressConfig(uuid string) error {
	h.mutex.Lock()
	defer h.mutex.Unlock()
	if h.closed || h.Helper == nil {
		return errors.New("the client is closed")
	}
	ctx, cancel := context.WithTimeout(context.Background(), rpcTimeout)
	defer cancel()
	log.Info("DelIngressConfig(%s)", uuid)
	_, err := h.Helper.DelIngressConfig(ctx,
		&communicate.DelIngressConfigRequest{Uuid: uuid})
	return err
}

// ClientManager keeps one AgentHelper per online agent.
type ClientManager struct {
	mutex *sync.Mutex
	Pool  *utils.BeeMap
}

func NewClientManager() *ClientManager {
	return &ClientManager{
		mutex: new(sync.Mutex),
		Pool:  utils.NewBeeMap(),
	}
}

// Init starts the background loop that keeps the pool in sync with redis.
func (p *ClientManager) Init() {
	go p.RunConnect()
}

// RunConnect polls redis once per second, closes the helpers of agents that
// are no longer listed and creates helpers for newly listed agents.
func (p *ClientManager) RunConnect() {
	for {
		names, err := globalRedisClient.GetAgentNames()
		if err != nil {
			// Skip this round: treating a redis error as "no agents online"
			// would close every helper in the pool.
			log.Error(err.Error())
			time.Sleep(time.Second)
			continue
		}
		online := map[interface{}]struct{}{}
		for _, name := range names {
			online[name] = struct{}{}
		}
		// Hold the same lock as OnlineAgent/OfflineAgent so that a helper
		// is never created twice for one agent.
		p.mutex.Lock()
		for key, v := range p.Pool.Items() {
			if _, ok := online[key]; ok {
				continue
			}
			if v != nil {
				log.Warning("Close the AgentHelper %s", key)
				v.(*AgentHelper).Close()
			}
			log.Warning("Delete the Pool %s", key)
			p.Pool.Delete(key)
		}
		for _, name := range names {
			if !p.Pool.Check(name) {
				log.Warning("New the AgentHelper %s", name)
				p.Pool.Set(name, NewAgentHelper(name))
			}
		}
		p.mutex.Unlock()
		time.Sleep(time.Second)
	}
}

// GetClients returns the current agent-name to helper mapping.
func (p *ClientManager) GetClients() map[interface{}]interface{} {
	return p.Pool.Items()
}

// OnlineAgent marks the agent as online in redis and makes sure it has a helper.
func (p *ClientManager) OnlineAgent(agentName string) error {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	globalRedisClient.OnlineAgent(agentName, "1")
	if !p.Pool.Check(agentName) {
		p.Pool.Set(agentName, NewAgentHelper(agentName))
	}
	return nil
}

// OfflineAgent marks the agent as offline in redis and closes its helper.
func (p *ClientManager) OfflineAgent(agentName string) error {
	p.mutex.Lock()
	defer p.mutex.Unlock()
	globalRedisClient.OfflineAgent(agentName)
	helper := p.Pool.Get(agentName)
	if helper != nil {
		helper.(*AgentHelper).Close()
		p.Pool.Delete(agentName)
	}
	return nil
}

// AgentClient sends the same request to every online agent.
type AgentClient struct {
	manager *ClientManager
}

func NewAgentClient() *AgentClient {
	return &AgentClient{
		manager: NewClientManager(),
	}
}

func (c *AgentClient) Init() {
	c.manager.Init()
}

// AgentHandlerCallback performs one RPC call against one agent.
type AgentHandlerCallback func(request, response interface{}, helper *AgentHelper) error

// AgentHandler runs callback against every online agent. It stops at the
// first failure and returns that error prefixed with the agent name; the
// caller is expected to resend. It also fails if no agent is online.
func (c *AgentClient) AgentHandler(request, response interface{}, callback AgentHandlerCallback) error {
	if callback == nil {
		return errors.New("callback is nil")
	}
	count := 0
	for name, v := range c.manager.GetClients() {
		if v == nil {
			continue
		}
		helper := v.(*AgentHelper)
		if err := callback(request, response, helper); err != nil {
			return fmt.Errorf("%v %s", name, err.Error())
		}
		count++
	}
	if count > 0 {
		return nil
	}
	return errors.New("no agent online")
}

// SetIngressConfig pushes a configuration (request must be a string) to all agents.
func (c *AgentClient) SetIngressConfig(request, response interface{}) error {
	return c.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {
		content, ok := request.(string)
		if !ok {
			return errors.New("SetIngressConfig: request must be a string")
		}
		return helper.SetIngressConfig(content)
	})
}

// DelIngressConfig deletes a configuration by uuid (request must be a string) on all agents.
func (c *AgentClient) DelIngressConfig(request, response interface{}) error {
	return c.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {
		uuid, ok := request.(string)
		if !ok {
			return errors.New("DelIngressConfig: request must be a string")
		}
		return helper.DelIngressConfig(uuid)
	})
}
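
The core of `RunConnect` is a comparison between the names in the pool and the names reported as online. Pulled out as a pure function it is easy to test on its own. The sketch below is an illustration only; `diff` is a hypothetical name and plain string slices replace the BeeMap and the redis lookup.

```go
package main

import (
	"fmt"
	"sort"
)

// diff compares the names currently in the pool with the names reported as
// online and returns which clients to create and which to close.
// Both results are sorted; duplicate names in online are ignored.
func diff(pool, online []string) (toAdd, toRemove []string) {
	inPool := make(map[string]bool, len(pool))
	for _, name := range pool {
		inPool[name] = true
	}
	isOnline := make(map[string]bool, len(online))
	for _, name := range online {
		if !isOnline[name] && !inPool[name] {
			toAdd = append(toAdd, name)
		}
		isOnline[name] = true
	}
	for _, name := range pool {
		if !isOnline[name] {
			toRemove = append(toRemove, name)
		}
	}
	sort.Strings(toAdd)
	sort.Strings(toRemove)
	return toAdd, toRemove
}

func main() {
	add, remove := diff(
		[]string{"agent1", "agent2"}, // clients currently in the pool
		[]string{"agent2", "agent3"}, // agents currently online
	)
	fmt.Println(add, remove) // prints "[agent3] [agent1]"
}
```

The loop would then close the clients in `toRemove` and create clients for `toAdd`, which is what the two inner loops of `RunConnect` do in place.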

3.4. rabbitmq single-instance RPC call: server implementation
package ingress

import (
	"context"
	"fmt"

	"github.com/wzhliang/xing"

	"wise2c/wisecloud-ingress-agent/common"
	"wise2c/wisecloud-ingress-agent/communicate"
	"wise2c/wisecloud-ingress-agent/log"
)

// AgentServerImp implements the RPC handlers served by each agent.
type AgentServerImp struct{}

// SetIngressConfig parses the pushed configuration and applies it.
func (g *AgentServerImp) SetIngressConfig(ctx context.Context, req *communicate.SetIngressConfigRequest, rsp *communicate.Void) error {
	log.Info("SetIngressConfig(%s)", req.Content)
	config := &Wise2cIngressConfig{}
	if err := config.Parse([]byte(req.Content)); err != nil {
		log.Error(err.Error())
		return err
	}
	globalIngressProcess.SetIngressConfig(config)
	return nil
}

// DelIngressConfig removes the configuration identified by req.Uuid.
func (g *AgentServerImp) DelIngressConfig(ctx context.Context, req *communicate.DelIngressConfigRequest, rsp *communicate.Void) error {
	log.Info("DelIngressConfig(%s)", req.Uuid)
	globalIngressProcess.DelIngressConfig(req.Uuid)
	return nil
}

// RunRPCServer registers the handlers under this agent's own name
// (single-instance RPC) and serves calls in a background goroutine.
func RunRPCServer() {
	//amqpURL := "amqp://guest:guest@localhost:5672/"
	amqpURL := fmt.Sprintf("amqp://%s:%s@%s:%d",
		common.MQUser,
		common.MQPassword,
		common.MQHost,
		common.MQPort,
	)
	svc, err := xing.NewService(
		globalRPCAgentName,
		amqpURL,
		xing.SetSerializer(&xing.JSONSerializer{}),
	)
	if err != nil {
		log.Error(fmt.Sprintf("MQURL=%s, NewService() failed. %s", amqpURL, err.Error()))
		// Without a service there is nothing to register or run.
		return
	}
	communicate.RegisterAgentHelperHandler(svc, &AgentServerImp{})
	log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqpURL)
	go LoopRPC(svc)
}

// LoopRPC blocks while the service is running and logs the error when it stops.
func LoopRPC(svc *xing.Client) {
	if err := svc.Run(); err != nil {
		log.Error(err.Error())
	}
}
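4. Summary
● With rabbitmq LB calls, the agent side can report data to the controller side, or pull the data it needs from it.
● With rabbitmq single-instance calls, and given the agent status already reported over LB (or the service discovery of a third-party tool such as consul or etcd), the controller side can push configuration to every agent, and each agent instance then performs the same function.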
