gRPC源码/负载均衡

本文以roundRobin为例介绍gRPC负载均衡实现。

代码

https://github.com/messixukej...
在liangzhiyang/annotate-grpc-go基础上补充了部分注释

关键interface

负载均衡器:
type Balancer interface{
    //启动负载均衡器,dialing的时候调用。
    Start(target string, config BalancerConfig) error
  //通知负载均衡器由新地址连接ok
    Up(addr Address) (down func(error))
    //获取下一个有效的负载均衡地址
    Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
    //地址更新通知channel,这里返回的是全量地址
    Notify() <-chan[]Address
    Close() error
}

//地址解析器接口
type Resolver interface{
    //解析器Resolver 用于创建 服务地址watcher。
    Resolve(target string) (Watcher, error)
}

//服务地址发现器接口
//watcher用于观察服务地址的变化
type Watcher interface{
    //阻塞接口,知道有服务地址变化或者错误发生
    Next() ([]*Update, error)
    
    Close()
}

关键数据结构

type testWatcher struct{
//用于接收地址更新
update chan*naming.Update

//用于表示多少个更新发生
side chan int

//用于通知地址注入者更新读已结束
readDone chanint
}

实现流程(以RoundRobin为例)

1、注入负载均衡规则

使用Dial时,使用WithBalance注入负载均衡规则
func WithBalancer(b Balancer) DialOption {
    return func(o *dialOptions) {
        o.balancer = b
    }
}

例如,这里注入了RoundRobin 轮寻规则。
cc, err := Dial("foo.bar.com", WithBalancer(RoundRobin(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))

2、 启动负载均衡器

roundRobin.Start -> testNameResolver.Resolve 生成服务地址发现器 -> 建立服务地址发现任务 watchAddrUpdates(新的goroutine循环监控地址)

3、服务地址发现器(开启负载均衡后,不管是初始化还是过程中的变化,都是通过服务地址发现器来获取服务端地址)

roundRobin.watchAddrUpdates ->阻塞在watcher.Next获取地址变更动作 -> 更新roundRobin.addrs -> 写入roundRobin.addrCh

3.1、开工过程,根据balancer.Notify获取服务端地址 -> 根据地址列表创建连接resetAddrConn
地址注入方式:Resolve中将服务端初始地址注入

3.2、动态变更地址 ClientConn.lbWatcher,根据balancer.Notify获取服务端地址 -> 新增地址resetAddrConn,删除地址tearDown。
地址注入方式:testWatcher.inject

3.3 创建连接细化resetAddrConn->resetTransport->roundRobin.Up

3.4 删除连接细化tearDown->roundRobin.down

4、提交记录

Invoke->getTransport->balancer.Get获取有效地址(策略:根据connected状态,循环找到有效地址。如果找不到则阻塞在waitCh)

waitCh阻塞解除由3.3触发

相关推荐