自适应负载均衡算法原理及p2c+EWMA算法实现
在选择负载均衡算法时,希望满足以下要求:
- 各分区与机房调度亲和力
负载尽可能低
- 当节点出现故障时,负载均衡算法可以自动隔离该节点
- 故障节点恢复后,可以自动恢复该节点的流量分配
基于这些考虑go-zero选择了2gorc+EmEm来实现。
算法基本思想
p2c
p2c(二选一)选一:从多个节点中随机选择两个节点。
go-zero将被随机选择3次。如果所选节点之一的健康状况满足要求,则中止选择并使用这两个节点。
EWMA
EWMA(指数加权移动平均)指数移动加权平均法:表示每个值的权重系数随着时间的推移呈指数下降。该值越接近当前时刻,权重系数越大,反映了上一时期的平均值。
- 公式:

- 变量说明:
Vt:代表:代表t期望值EV♓❙›EV♓:代表It是t-1必需的ewma值β:这是EWMA算法的常数标准❙'''''ewmaeewma不需要存储所有过去的值,金额计算量显着减少,存储资源也将减少。- 传统平均值计算算法对网络耗时不敏感,而
EWMA可以通过频繁请求进行修改β快速监控网络平均值或更多。- 如果请求比较频繁,说明该节点的网络负载增加了。我们希望密切关注节点当前处理请求所需的时间(这反映了节点的负载),因此我们将进行相应的调整。
β。β越小,EWMA值越接近此时,可以快速检测网络故障; - 当请求频率较低时,我们会相对增加
β值。由此计算出的值EWMA接近于牛顿方程算法中通过计算阻尼函数❙模型计算出的平均值
- 如果请求比较频繁,说明该节点的网络负载增加了。我们希望密切关注节点当前处理请求所需的时间(这反映了节点的负载),因此我们将进行相应的调整。
- 中实现自定义负载均衡器,我们必须首先实现
gRPC 接口。.golang.org/grpc/balancer/base/base.go/PickerBuilder。当服务节点更新时会调用该接口。接口中的Buildtype PickerBuilder interface { // Build returns a picker that will be used by gRPC to pick a SubConn. Build(info PickerBuildInfo) balancer.Picker } - 方法还需要实现
google.golang.org/grpc/balancer/balancer.go/Picker
- 接口。该接口主要实现负载均衡器,选择请求的节点
type Picker interface { Pick(info PickInfo) (PickResult, error) } - ,最后将我们实现的负载均衡器注册到负载均衡器
map
β
go-zero❙❙的值β是 :
其中
Δte❙❙‶‶ k是常数gRPC 要在
❀ncinggo-zero的主要实现逻辑将每次更新节点时调用
gRPC方法Build。此时,所有节点信息都存储在Build中。 gRPC获取节点处理请求时,会调用Pick方法获取节点。go-zero在Pick方法PickEWMA 值中实现算法p2c计算负载情况和低负载返回节点使用gRPC。- 请求结束时,
gRPC调用方法PickResult.Done这个方法❙再次启动-Pu'实现-Pu'耗时搜索等信息存储和计算值EWMA并保存以供下次请求时的负载计算使用。
负载均衡代码分析
- 保存所有服务节点信息
我们需要节省节点处理这个请求所需的时间,
EWMA❙›❓‶‶go-zero对于每个节点,建议采用以下结构:type subConn struct { addr resolver.Address conn balancer.SubConn lag uint64 // 用来保存 ewma 值 inflight int64 // 用在保存当前节点正在处理的请求总数 success uint64 // 用来标识一段时间内此连接的健康状态 requests int64 // 用来保存请求总数 last int64 // 用来保存上一次请求耗时, 用于计算 ewma 值 pick int64 // 保存上一次被选中的时间点 } p2cPicker实现接口balancer.Picker❀、‶ 接口、❀ 服务节点信息type p2cPicker struct { conns []*subConn // 保存所有节点的信息 r *rand.Rand stamp *syncx.AtomicDuration lock sync.Mutex }gRPC 当节点更新时,会调用Build方法,传递节点的所有信息。在这里,我们使用结构subConn存储有关每个节点的信息。并将它们连接在一起,并使用p2cPicker结构来存储它们func (b *p2cPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { ...... var conns []*subConn for conn, connInfo := range readySCs { conns = append(conns, &subConn{ addr: connInfo.Address, conn: conn, success: initSuccess, }) } return &p2cPicker{ conns: conns, r: rand.New(rand.NewSource(time.Now().UnixNano())), stamp: syncx.NewAtomicDuration(), } }- 随机挑选节点信息。分为三种情况:
- 服务节点只有一个。此时直接返回供电
gRPC使用即可 - 有两个服务节点,通过
EWMA值和EWMA值和计算负载对于gRPC使用 - 有多个服务节点,此时使用算法
p2c选择两个条件节点,比较负载,比较gRPC使用
主要实现代码如下:
switch len(p.conns) { case 0:// 没有节点,返回错误 return emptyPickResult, balancer.ErrNoSubConnAvailable case 1:// 有一个节点,直接返回这个节点 chosen = p.choose(p.conns[0], nil) case 2:// 有两个节点,计算负载,返回负载低的节点 chosen = p.choose(p.conns[0], p.conns[1]) default:// 有多个节点,p2c 挑选两个节点,比较这两个节点的负载,返回负载低的节点 var node1, node2 *subConn // 3次随机选择两个节点 for i := 0; i < pickTimes; i++ { a := p.r.Intn(len(p.conns)) b := p.r.Intn(len(p.conns) - 1) if b >= a { b++ } node1 = p.conns[a] node2 = p.conns[b] // 如果这次选择的节点达到了健康要求, 就中断选择 if node1.healthy() && node2.healthy() { break } } // 比较两个节点的负载情况,选择负载低的 chosen = p.choose(node1, node2) } - 服务节点只有一个。此时直接返回供电
load计算节点上方的负载 ❙❙❙❙‸ ❙❙‸ 方法调用load 计算方法节点负载。负载计算公式为:
负载 = ewma * in-flight这里简单解释一下:
ewma'相当于平均请求时间 是当前节点乘以处理的请求数即可粗略计算出当前节点的网络负载。func (c *subConn) load() int64 { // 通过 EWMA 计算节点的负载情况; 加 1 是为了避免为 0 的情况 lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1))) load := lag * (atomic.LoadInt64(&c.inflight) + 1) if load == 0 { return penalty } return load }- 请求结束并更新
EWMA等节点信息- 将节点处理的请求总数减少1
- ,节省处理时间点。用于计算自节点处理最后一个请求以来的距离,并计算
β - 的
EWMA值 - 计算此请求所需的时间并计算 EWMA值并将其存储在节点
lag属性中 - 计算节点健康状态存储在节点‶‶‶‶属性中
func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) { start := int64(timex.Now()) return func(info balancer.DoneInfo) { // 正在处理的请求数减 1 atomic.AddInt64(&c.inflight, -1) now := timex.Now() // 保存本次请求结束时的时间点,并取出上次请求时的时间点 last := atomic.SwapInt64(&c.last, int64(now)) td := int64(now) - last if td < 0 { td = 0 } // 用牛顿冷却定律中的衰减函数模型计算EWMA算法中的β值 w := math.Exp(float64(-td) / float64(decayTime)) // 保存本次请求的耗时 lag := int64(now) - start if lag < 0 { lag = 0 } olag := atomic.LoadUint64(&c.lag) if olag == 0 { w = 0 } // 计算 EWMA 值 atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w))) success := initSuccess if info.Err != nil && !codes.Acceptable(info.Err) { success = 0 } osucc := atomic.LoadUint64(&c.success) atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w))) stamp := p.stamp.Load() if now-stamp >= logInterval { if p.stamp.CompareAndSwap(stamp, now) { p.logStats() } } } }
项目地址
https://github.com /tal-tech/go-zero
欢迎来到 go-zero 和 和 ‹
版权声明
本文仅代表作者观点,不代表Code前端网立场。
本文系作者Code前端网发表,如需转载,请注明页面地址。
code前端网