Problem description
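In our project we ran into a problem: every time a node changed, all of our gRPC connections were torn down and re-established.
I investigated the issue
and eventually traced it to a pitfall in how we were using the gRPC Resolver.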
Problematic code
func (r *xxResolver) update(nodes []*registry.Node) {
	state := resolver.State{
		Addresses: make([]resolver.Address, 0, 10),
	}

	var grpcNodes []*registry.Node // nodes that expose a "grpc" port
	for _, n := range nodes {
		if _, ok := n.ServicePorts["grpc"]; ok {
			grpcNodes = append(grpcNodes, n)
		}
	}

	for _, n := range grpcNodes {
		port := n.ServicePorts["grpc"]
		addr := resolver.Address{
			Addr: fmt.Sprintf("%s:%d", n.Host, port),
			// The problematic line is below: NodeNumber is len(grpcNodes).
			Attributes: attributes.New(registry.NodeKey{}, n, registry.NodeNumber{}, len(grpcNodes)),
		}
		state.Addresses = append(state.Addresses, addr)
	}

	if r.cc != nil {
		r.cc.UpdateState(state)
	}
}
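What this code does:
- xxResolver is an implementation of a gRPC name resolver.
- A name resolver collects the list of backend addresses (ip:port). If you haven't worked with the gRPC Resolver before, think of it as something like DNS name resolution.
- nodes is the set of nodes maintained by service discovery.
- update picks the nodes that expose a "grpc" port and builds the address list from them.
- Finally, it pushes the new state to the gRPC ClientConn (cc), which passes it on to the load balancer.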
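When building each resolver.Address, the code adds a NodeNumber attribute to the Attributes field. NodeNumber is len(grpcNodes), the total number of gRPC nodes, so its value changes every time a node is added or removed, even for addresses whose ip:port did not change.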
Root cause
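The project uses gRPC-Go v1.41.0, and that version itself contains contradictory code:
- different parts of the code define equality of resolver.Address objects inconsistently.
The two relevant pieces of code are: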
func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
	// TODO: handle s.ResolverState.ServiceConfig?
	if logger.V(2) {
		logger.Info("base.baseBalancer: got new ClientConn state: ", s)
	}
	// Successful resolution; clear resolver error and ensure we return nil.
	b.resolverErr = nil
	// addrsSet is the set converted from addrs, it's used for quick lookup of an address.
	addrsSet := make(map[resolver.Address]struct{})
	for _, a := range s.ResolverState.Addresses {
		// Strip attributes from addresses before using them as map keys. So
		// that when two addresses only differ in attributes pointers (but with
		// the same attribute content), they are considered the same address.
		//
		// Note that this doesn't handle the case where the attribute content is
		// different. So if users want to set different attributes to create
		// duplicate connections to the same backend, it doesn't work. This is
		// fine for now, because duplicate is done by setting Metadata today.
		//
		// TODO: read attributes to handle duplicate connections.
		aNoAttrs := a
		aNoAttrs.Attributes = nil
		addrsSet[aNoAttrs] = struct{}{}
		if scInfo, ok := b.subConns[aNoAttrs]; !ok {
			// a is a new address (not existing in b.subConns).
			//
			// When creating SubConn, the original address with attributes is
			// passed through. So that connection configurations in attributes
			// (like creds) will be used.
			sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
			if err != nil {
				logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
				continue
			}
			b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
			b.scStates[sc] = connectivity.Idle
			sc.Connect()
		} else {
			// Always update the subconn's address in case the attributes
			// changed.
			//
			// The SubConn does a reflect.DeepEqual of the new and old
			// addresses. So this is a noop if the current address is the same
			// as the old one (including attributes).
			scInfo.attrs = a.Attributes
			b.subConns[aNoAttrs] = scInfo
			b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
		}
	}
	for a, scInfo := range b.subConns {
		// a was removed by resolver.
		if _, ok := addrsSet[a]; !ok {
			b.cc.RemoveSubConn(scInfo.subConn)
			delete(b.subConns, a)
			// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
			// The entry will be deleted in UpdateSubConnState.
		}
	}
	// If resolver state contains no addresses, return an error so ClientConn
	// will trigger re-resolve. Also records this as an resolver error, so when
	// the overall state turns transient failure, the error message will have
	// the zero address information.
	if len(s.ResolverState.Addresses) == 0 {
		b.ResolverError(errors.New("produced zero addresses"))
		return balancer.ErrBadResolverState
	}
	return nil
}
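baseBalancer.UpdateClientConnState sets aNoAttrs.Attributes = nil before comparing Address values (it uses them as map keys).
In other words, it ignores the Attributes field: as long as ip:port (and the other non-attribute fields) are the same, the address maps to the same existing SubConn, and the balancer simply calls UpdateAddresses on it.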
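The key-stripping step can be sketched with the same kind of simplified stand-ins (hypothetical types, not the real grpc-go API). Because the attributes field is a pointer, two addresses would be different map keys even with identical attribute content; clearing it first is what lets the balancer treat both versions of 10.0.0.1:9000 as one SubConn.

```go
package main

import "fmt"

// attrs and address are hypothetical, simplified stand-ins for
// *attributes.Attributes and resolver.Address.
type attrs struct{ m map[string]interface{} }

type address struct {
	Addr       string
	Attributes *attrs
}

// stripAttrs mirrors what baseBalancer does before using an address as a
// map key: work on a copy and nil out Attributes.
func stripAttrs(a address) address {
	a.Attributes = nil
	return a
}

func main() {
	subConns := map[address]string{} // stripped key -> hypothetical SubConn name

	old := address{Addr: "10.0.0.1:9000", Attributes: &attrs{m: map[string]interface{}{"NodeNumber": 2}}}
	subConns[stripAttrs(old)] = "subconn-1"

	// Same ip:port, but NodeNumber changed because a node joined.
	updated := address{Addr: "10.0.0.1:9000", Attributes: &attrs{m: map[string]interface{}{"NodeNumber": 3}}}
	sc, ok := subConns[stripAttrs(updated)]
	fmt.Println(sc, ok) // subconn-1 true: treated as the existing SubConn

	_, found := subConns[updated]
	fmt.Println(found) // false: the unstripped key still carries a non-nil pointer
}
```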
func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
	ac.mu.Lock()
	defer ac.mu.Unlock()
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
	if ac.state == connectivity.Shutdown ||
		ac.state == connectivity.TransientFailure ||
		ac.state == connectivity.Idle {
		ac.addrs = addrs
		return true
	}

	if ac.state == connectivity.Connecting {
		return false
	}

	// ac.state is Ready, try to find the connected address.
	var curAddrFound bool
	for _, a := range addrs {
		if reflect.DeepEqual(ac.curAddr, a) {
			curAddrFound = true
			break
		}
	}
	channelz.Infof(logger, ac.channelzID, "addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
	if curAddrFound {
		ac.addrs = addrs
	}

	return curAddrFound
}
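addrConn.tryUpdateAddrs, however, uses reflect.DeepEqual, so here the Attributes field does matter: once NodeNumber changes, the currently connected address is no longer found in the new list, and tryUpdateAddrs returns false.
In that case the SubConn wrapper (acBalancerWrapper.UpdateAddresses) removes the old addrConn and creates a new one, so the connection is first closed and then rebuilt.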
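Putting the two rules side by side shows why they conflict. This is a simplification under stated assumptions: the types are hypothetical stand-ins, only a single Ready connection is modeled, and the SubConn side (tryUpdateAddrs plus the teardown that follows a false result) is collapsed into one outcome string. The balancer-side check ignores attributes and the SubConn-side check uses reflect.DeepEqual, so a changed NodeNumber passes the first check and fails the second.

```go
package main

import (
	"fmt"
	"reflect"
)

// Hypothetical, simplified stand-ins for *attributes.Attributes and resolver.Address.
type attrs struct{ m map[string]interface{} }

type address struct {
	Addr       string
	Attributes *attrs
}

// balancerSameKey mimics baseBalancer: attributes are ignored.
func balancerSameKey(a, b address) bool {
	a.Attributes, b.Attributes = nil, nil
	return a == b
}

// subConnFindsCurAddr mimics tryUpdateAddrs in the Ready state: the connected
// address must be reflect.DeepEqual to one of the new addresses.
func subConnFindsCurAddr(cur address, addrs []address) bool {
	for _, a := range addrs {
		if reflect.DeepEqual(cur, a) {
			return true
		}
	}
	return false
}

// outcome reports what happens to a Ready connection after an update (simplified).
func outcome(cur, next address) string {
	if !balancerSameKey(cur, next) {
		return "new SubConn"
	}
	if subConnFindsCurAddr(cur, []address{next}) {
		return "reuse connection"
	}
	return "tear down and reconnect"
}

func withNodeNumber(addr string, n int) address {
	return address{Addr: addr, Attributes: &attrs{m: map[string]interface{}{"NodeNumber": n}}}
}

func main() {
	cur := withNodeNumber("10.0.0.1:9000", 2)
	fmt.Println(outcome(cur, withNodeNumber("10.0.0.1:9000", 2))) // reuse connection
	fmt.Println(outcome(cur, withNodeNumber("10.0.0.1:9000", 3))) // tear down and reconnect
}
```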
Solution
Given this contradiction in gRPC-Go v1.41.0, I looked at the latest code:
// Equal returns whether a and o are identical. Metadata is compared directly,
// not with any recursive introspection.
//
// This method compares all fields of the address. When used to tell apart
// addresses during subchannel creation or connection establishment, it might be
// more appropriate for the caller to implement custom equality logic.
func (a Address) Equal(o Address) bool {
	return a.Addr == o.Addr && a.ServerName == o.ServerName &&
		a.Attributes.Equal(o.Attributes) &&
		a.BalancerAttributes.Equal(o.BalancerAttributes) &&
		a.Metadata == o.Metadata
}
A dedicated Address.Equal method was added, giving a single definition of equality for Address
(a sign that the maintainers noticed the problem too = =|).
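Unfortunately, under this definition the Attributes field must also be equal for two addresses to be considered equal.
So our project's code is still wrong: a value that changes with the cluster size, like NodeNumber, should not be stored in per-address Attributes.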
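One possible fix, sketched under assumptions: keep only data that is stable for a given node in the per-address attributes and drop the cluster-wide NodeNumber; if something downstream needs the node count, it can presumably derive it from the length of the address list instead. The node, nodeAttrs and address types are hypothetical stand-ins, and equal only models the spirit of Address.Equal (every field must match). Also note that in newer grpc-go, attribute values without an Equal method are compared with ==, so a pointer such as *registry.Node compares by identity; if service discovery rebuilds node objects on each change, a stable value such as a node ID is the safer choice.

```go
package main

import "fmt"

// node is a hypothetical stand-in for *registry.Node.
type node struct {
	ID   string
	Host string
	Port int
}

// nodeAttrs is a hypothetical stand-in for per-address attributes that only
// carries data which stays the same for a given node.
type nodeAttrs struct {
	NodeID string
}

type address struct {
	Addr       string
	Attributes nodeAttrs
}

// equal models the spirit of resolver.Address.Equal: all fields, attributes included, must match.
func equal(a, b address) bool {
	return a.Addr == b.Addr && a.Attributes == b.Attributes
}

// buildAddrs no longer stamps the node count onto each address.
func buildAddrs(nodes []node) []address {
	out := make([]address, 0, len(nodes))
	for _, n := range nodes {
		out = append(out, address{
			Addr:       fmt.Sprintf("%s:%d", n.Host, n.Port),
			Attributes: nodeAttrs{NodeID: n.ID},
		})
	}
	return out
}

func main() {
	a := node{ID: "a", Host: "10.0.0.1", Port: 9000}
	b := node{ID: "b", Host: "10.0.0.2", Port: 9000}
	c := node{ID: "c", Host: "10.0.0.3", Port: 9000}

	before := buildAddrs([]node{a, b})
	after := buildAddrs([]node{a, b, c})
	fmt.Println(equal(before[0], after[0])) // true: node a's address is unchanged
	fmt.Println(len(after))                 // 3: the count comes from the list itself
}
```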