idou老师教你学Istio 27:解读Mixer Report流程

1、概述

Mixer是Istio的核心组件,提供了遥测数据收集的功能,能够实时采集服务的请求状态等信息,以达到监控服务状态目的。

1.1 核心功能

•前置检查(Check):某服务接收并响应外部请求前,先通过Envoy向Mixer(Policy组件)发送Check请求,做一些access检查,同时确认adaptor所需cache字段,供之后Report接口使用;

•配额管理(Quota):通过配额管理机制,处理多请求时发生的资源竞争;

•遥测数据上报(Report):该服务请求处理结束后,将请求相关的日志,监控等数据,通过Envoy上报给Mixer(telemetry)

1.2 示例图

idou老师教你学Istio 27:解读Mixer Report流程

2、代码分析

2.1 Report代码分析

本节主要介绍Report的详细流程(基于Istio release1.0.0版本,commit id为3a136c90)。Report是mixer server的一个接口,供Envoy通过grpc调用。首先,我们从mixer server的启动入口main函数看起:

func main() {rootCmd := cmd.GetRootCmd(os.Args[1:], supportedTemplates(), supportedAdapters(), shared.Printf, shared.Fatalf)if err := rootCmd.Execute(); err != nil {os.Exit(-1)}
}

在rootCmd中,mixs通过server命令启动了mixer server,从而触发了runserver函数,在runserver中初始化(New)了一个server,我们一起看下newServer的函数,在这个函数中,与我们本节相关的内容就是Mixer初始化了一个grpc服务器NewGRPCServer。

rootCmd.AddCommand(serverCmd(info, adapters, printf, fatalf))
func serverCmd(info map[string]template.Info, adapters []adapter.InfoFn, printf, fatalf shared.FormatFn) *cobra.Command {sa := server.DefaultArgs()sa.Templates = infosa.Adapters = adaptersserverCmd := &cobra.Command{Use:   "server",Short: "Starts Mixer as a server",Run: func(cmd *cobra.Command, args []string) {runServer(sa, printf, fatalf)},}… …
}
func newServer(a *Args, p *patchTable) (*Server, error) {grpc.EnableTracing = a.EnableGRPCTracings.server = grpc.NewServer(grpcOptions...)mixerpb.RegisterMixerServer(s.server, api.NewGRPCServer(s.dispatcher, s.gp, s.checkCache))
}

在这个grpc的服务端中,定义了一个Report接口,这就是我们这节课主要关注的内容(可以看到Check接口也在此定义,我们下节再讲)

func (s *grpcServer) Report(ctx context.Context, req *mixerpb.ReportRequest) (*mixerpb.ReportResponse, error) {lg.Debugf("Report (Count: %d)", len(req.Attributes))// 校验attribute是否为空,空则直接returnif len(req.Attributes) == 0 {return reportResp, nil}// 若属性word为空,赋为默认值for i := 0; i < len(req.Attributes); i++ {iflen(req.Attributes[i].Words) == 0 {req.Attributes[i].Words = req.DefaultWords}}// 根据第一条attribute,生成proto包,proto包能跟踪一组attributesprotoBag := attribute.NewProtoBag(&req.Attributes[0], s.globalDict, s.globalWordList)// 初始化,开始跟踪attributes各个条目中属性accumBag := attribute.GetMutableBag(protoBag)// 保存accumBag的增量状态reportBag := attribute.GetMutableBag(accumBag)reportSpan, reportCtx := opentracing.StartSpanFromContext(ctx, "Report")reporter := s.dispatcher.GetReporter(reportCtx)var errors *multierror.Errorfor i := 0; i < len(req.Attributes); i++ {span, newctx := opentracing.StartSpanFromContext(reportCtx, fmt.Sprintf("attribute bag %d", i))// 第一个属性已经在创建proto包时创建,在此追踪所有attributesif i > 0 {if err := accumBag.UpdateBagFromProto(&req.Attributes[i], s.globalWordList); err != nil {err = fmt.Errorf("request could not be processed due to invalid attributes: %v", err)span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)break}}lg.Debug("Dispatching Preprocess")// 真正开始分发,预处理阶段if err := s.dispatcher.Preprocess(newctx, accumBag, reportBag); err != nil {err = fmt.Errorf("preprocessing attributes failed: %v", err)span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)continue}lg.Debug("Dispatching to main adapters after running preprocessors")lg.Debuga("Attribute Bag: \n", reportBag)lg.Debugf("Dispatching Report %d out of %d", i+1, len(req.Attributes))// 真正开始分发,将数据逐步加入到缓存中if err := reporter.Report(reportBag); err != nil {span.LogFields(otlog.String("error", err.Error()))span.Finish()errors = multierror.Append(errors, err)continue}span.Finish()// purge the effect of the Preprocess call so that the next time through everything is cleanreportBag.Reset()}reportBag.Done()accumBag.Done()protoBag.Done()// 真正的发送函数,从缓存中取出并发送到adaptorif err := reporter.Flush(); err != nil {errors = multierror.Append(errors, err)}reporter.Done()if errors != nil {reportSpan.LogFields(otlog.String("error", errors.Error()))}reportSpan.Finish()if errors != nil {lg.Errora("Report failed:", errors.Error())return nil, grpc.Errorf(codes.Unknown, errors.Error())}// 过程结束return reportResp, nil
}

通过上述代码解读,我们了解了Report接口的工作流程,但此时我们还并不知道一个请求的状态是如何报给adaptor的,下面我们通过简要的函数串接,把这部分流程串起来:

上述的预处理阶段Preprocess与上报阶段Report,最终都会调用到dispatch函数,仅通过不同的type来区分要做的事情;

func (d *Impl) Preprocess(ctx context.Context, bag attribute.Bag, responseBag *attribute.MutableBag) error {s := d.getSession(ctx, tpb.TEMPLATE_VARIETY_ATTRIBUTE_GENERATOR, bag)s.responseBag = responseBagerr := s.dispatch()if err == nil {err = s.err}… …
}
func (r *reporter) Report(bag attribute.Bag) error {s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, bag)s.reportStates = r.stateserr := s.dispatch()if err == nil {err = s.err}… …
}

而dispatch函数中做了真正的分发动作,包括:

1.遍历所有adaptor,调用adaptor中的函数,针对不同的adaptor生成不同的instance,并将instance缓存放入reportstates

var instance interface{}
if instance, err = input.Builder(s.bag); err != nil {log.Errorf("error creating instance: destination='%v', error='%v'", destination.FriendlyName, err)s.err = multierror.Append(s.err, err)continue
}
type NamedBuilder struct {InstanceShortName stringBuilder           template.InstanceBuilderFn
}
InstanceBuilderFn func(attrs attribute.Bag) (interface{}, error)
CreateInstanceBuilder: func(instanceName string, param proto.Message, expb *compiled.ExpressionBuilder) (template.InstanceBuilderFn, error)
builder.build(attr)
// For report templates, accumulate instances as much as possible before commencing dispatch.
if s.variety == tpb.TEMPLATE_VARIETY_REPORT {state.instances = append(state.instances, instance)continue
}

2.将instance分发到所有adaptor,最终调用并分发到adaptor的HandleMetric函数中

func (r *reporter) Flush() error {s := r.impl.getSession(r.ctx, tpb.TEMPLATE_VARIETY_REPORT, nil)s.reportStates = r.statess.dispatchBufferedReports()err := s.err… …
}
func (s *session) dispatchBufferedReports() {// Ensure that we can run dispatches to all destinations in parallel.s.ensureParallelism(len(s.reportStates))// dispatch the buffered dispatchStates we've gotfor k, v := range s.reportStates {s.dispatchToHandler(v)delete(s.reportStates, k)}s.waitForDispatched()
}
func (s *session) dispatchToHandler(ds *dispatchState) {s.activeDispatches++ds.session = ss.impl.gp.ScheduleWork(ds.invokeHandler, nil)
}
case tpb.TEMPLATE_VARIETY_REPORT:ds.err = ds.destination.Template.DispatchReport(ctx, ds.destination.Handler, ds.instances)
type TemplateInfo struct {Name             stringVariety          tpb.TemplateVarietyDispatchReport   template.DispatchReportFnDispatchCheck    template.DispatchCheckFnDispatchQuota    template.DispatchQuotaFnDispatchGenAttrs template.DispatchGenerateAttributesFn
}
DispatchReport: func(ctx context.Context, handler adapter.Handler, inst []interface{}) error {// Convert the instances from the generic []interface{}, to their specialized type.instances := make([]*metric.Instance, len(inst))for i, instance := range inst {instances[i] = instance.(*metric.Instance)}// Invoke the handler.if err := handler.(metric.Handler).HandleMetric(ctx, instances); err != nil {return fmt.Errorf("failed to report all values: %v", err)}return nil
}

2.2 相关结构体定义

Report接口请求体定义

// Used to report telemetry after performing one or more actions.
type ReportRequest struct {// 代表一个请求中的属性// 每个attribute代表一个请求动作,多个动作可汇总在一条message中以提高效率//虽然每个“属性”消息在语义上被视为与消息中的其他属性无关的独立独立实体,但此消息格式利用属性消息之间的增量编码,以便大幅减少请求大小并改进端到端 效率。 每组单独的属性用于修改前一组。 这消除了在单个请求中多次冗余地发送相同属性的需要。// 如果客户端上报时不想使用增量编码,可全量的发送所有属性.Attributes []CompressedAttributes `protobuf:"bytes,1,rep,name=attributes" json:"attributes"`// 所有属性的默认消息级字典.// 这使得可以为该请求中的所有属性共享相同的字典,这可以大大减少整体请求大小DefaultWords []string `protobuf:"bytes,2,rep,name=default_words,json=defaultWords" json:"default_words,omitempty"`// 全局字典的词条数,可检测客户端与服务端之间的全局字典是否同步GlobalWordCount uint32 `protobuf:"varint,3,opt,name=global_word_count,json=globalWordCount,proto3" json:"global_word_count,omitempty"`
}

3、总结

Mixer中涉及很多缓存命中等用于优化性能的设计,本文仅介绍了Mixer中Report接口发送到adaptor的过程,一些性能优化设计,如protobag,dispatch缓存等内容,将会在后续文章中解析。

相关服务请访问https://support.huaweicloud.com/cce/index.html?cce_helpcenter_2019

转载于:https://blog.51cto.com/14051317/2353294

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/536914.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql删除密码代码_mysql 用户新建、受权、删除、密码修改

mysql 用户新建、授权、删除、密码修改首先要声明一下&#xff1a;一般情况下&#xff0c;修改MySQL密码&#xff0c;授权&#xff0c;是需要有mysql里的root权限的。注&#xff1a;本操作是在WIN命令提示符下&#xff0c;phpMyAdmin同样适用。用户&#xff1a;phplamp 用户数…

KindEditor 上传漏洞致近百个党政机关网站遭植入

开发四年只会写业务代码&#xff0c;分布式高并发都不会还做程序员&#xff1f; >>> 2月21日消息&#xff0c;近日&#xff0c;安恒明鉴网站安全监测平台和应急响应中心监测发现近百起党政机关网站被植入色情广告页面&#xff0c;分析发现被植入色情广告页面的网站都…

php mysql登陆页面完整代码_求助:PHP实现登陆注册的代码是什么啊(主要是数据库那块)?...

思路&#xff1a;注册&#xff1a;获取前台表单数据->数据库连接->判断数据是否存在&#xff0c;存在输出提示&#xff0c;不存在则向数据库插入表单传来的值->如果sql执行失败输出错误&#xff0c;成功功输出注册成功登陆&#xff1a;获取前台表单数据->数据库连接…

Insql 1.8.2 发布,轻量级 .NET ORM 类库

开发四年只会写业务代码&#xff0c;分布式高并发都不会还做程序员&#xff1f; >>> Insql 是一个轻量级的.NET ORM 类库。对象映射基于 Dapper, Sql 配置灵感来自于 Mybatis。 TA 的追求&#xff1a;简洁、优雅、性能与质量 TA 的宗旨&#xff1a;让你用起来感觉到…

python 函数中所有print保存csv_python for循环print怎样才能输出csv呢

import csv,redef search(req,line):text re.search(req,line)if text:data text.group(1)else:data noreturn datacsvfile file(serp_html.csv,rb)reader csv.reader(csvfile)输出百度搜索结果数据&#xff1a;当前关键词&#xff0c;排名&#xff0c;排名网站&#xff0…

java中具有继承关系的类及其对象初始化顺序

先说结论对于具有继承关系的类&#xff0c;它们的类和对象构造顺序为&#xff1a;父类的类构造器() -> 子类的类构造器() -> 父类成员变量的赋值和实例代码块 -> 父类的构造函数 -> 子类成员变量的赋值和实例代码块 -> 子类的构造函数。 实验代码如下&#xff1…

mysql varchar 2000能存_mysql 数据库 varchar 到底可以存多少数据呢,长文慎入

一、关于UTF-8 UTF-8 Unicode Transformation Format-8bit。是用以解决国际上字符的一种多字节编码。 它对英文使用8位(即一个字节)&#xff0c;中文使用24位(三个字节)来编码。 UTF-8包含全世界所有国家需要用到的字符&#xff0c;是国际编码&#xff0c;通用性强。 UTF-8编码…

教程 | 如何利用C++搭建个人专属的TensorFlow

在开始之前&#xff0c;首先看一下最终成型的代码&#xff1a; 分支与特征后端&#xff08;https://github.com/OneRaynyDay/autodiff/tree/eigen&#xff09;仅支持标量的分支&#xff08;https://github.com/OneRaynyDay/autodiff/tree/master&#xff09;这个项目是我与 Min…

docker kali安装mysql_kali安装docker(有效详细的教程) ——vulhub漏洞复现 001

前记&#xff1a;博主有着多次安装docker的丰富经验&#xff0c;曾经为了在kali成功安装docker花费不少时间。在kali2016.3一直到最新的kali2019.4都通吃&#xff01;所以跟着下面的步骤走&#xff0c;绝对不会出错。(该机子此前没装过docker&#xff0c;并且配置好了kali更新源…

PDF文件如何转成markdown格式

百度上根据pdf转makrdown为关键字进行搜索&#xff0c;结果大多数是反过来的转换&#xff0c;即markdown文本转PDF格式。 但是PDF转markdown的解决方案很少。 正好我工作上有这个需求&#xff0c;所以自己实现了一个解决方案。 下图是一个用PDF XChange Editor打开的PDF文件&am…

kangle支不支持PHP_【转载】PHP调用kangle的API

摘要&#xff1a;根据管理的API公布写了一个类封装了一个操作集合&#xff0c;这是一个kangleAPI的一个封...根据管理的API公布写了一个类封装了一个操作集合&#xff0c;这是一个kangleAPI的一个封装吧&#xff0c;是在其他地方看到的&#xff0c;接口包含获取easypanel的信息…

ES6 学习笔记(一)let,const和解构赋值

let和const let和const是es6新增的两个变量声明关键字&#xff0c;与var的不同点在于&#xff1a; &#xff08;1&#xff09;let和const都是块级作用域&#xff0c;在{}内有效&#xff0c;这点在for循环中非常有用&#xff0c;只在循环体内有效。var为函数作用域。 &#xff0…

mysql数据库容量和性能_新品速递丨容量盘性能提升超 300%,数据库支持 MySQL 8.0...

2关系型数据库 MySQL Plus支持 MySQL 8.0 内核及 XtraBackup 物理在线迁移方式关系型数据库服务 MySQL Plus 发布新版本 1.0.6 &#xff0c; 新增多项功能&#xff0c;提升了集群自动化运维能力。主要升级有&#xff1a;- 支持 MySQL 8.0 内核&#xff1a;根据官方测试&#xf…

10. Python面向对象

Python从设计之初就已经是一门面向对象的语言&#xff0c;正因为如此&#xff0c;在Python中创建一个类和对象是很容易的。如果接触过java语言同学应该都知道&#xff0c;Java面向对象三大特征是&#xff1a;封装、继承、多态。Python面向对象也有一些特征&#xff0c;接下来我…

mysql聚簇索引 和主键的区别_[MySQL] innoDB引擎的主键与聚簇索引

MysqL的innodb引擎本身存储的形式就必须是聚簇索引的形式,在磁盘上树状存储的,但是不一定是根据主键聚簇的,有三种情形:1. 有主键的情况下,主键就是聚簇索引2. 没有主键的情况下,第一个非空null的唯一索引就是聚簇索引3. 如果上面都没有,那么就是有一个隐藏的row-id作为聚簇索引…

前端页面:一直报Cannot set property 'height' of undefined

1、出现错误的例子&#xff0c;只拷贝了项目中关键出现问题的部分 例子中明明写了styleheight:16px这个属性&#xff0c;但是为什么还说height未定义呢 通过打印发现&#xff1a;cks.each(function () { autoTextAreaHeight($(this)); });中的$(this)取出来…

mysql表在线转成分区表_11g普通表在线转换分区表

本帖最后由 灯和树 于 2016-5-4 14:58 编辑由于业务系统数据量增大&#xff0c;对其用户表在线完成分区表转换过程&#xff0c;记录如下&#xff0c;11g数据库支持。创建过渡分区表根据USER_ID创建分区表CREATE TABLE SDP_SMECD.TEST_T_USER_ID(USER_ID NUMBER(13) …

tiger4444/rabbit4444后缀勒索病毒怎么删除 能否百分百恢复

上海某客户中了tiger4444的勒索病毒&#xff0c;找到我们后&#xff0c;一天内全部恢复完成。说了很多关于勒索病毒的事情&#xff0c;也提醒过大家&#xff0c;可总是有人疏忽&#xff0c;致使中招后&#xff0c;丢钱丢面子&#xff0c;还丢工作。 那么要怎样预防呢与处理呢&a…

mysql远程一会不用卡住_连接远程MySQL数据库项目启动时,不报错但是卡住不继续启动的,...

连接远程MySQL数据库项目启动时&#xff0c;不报错但是卡住不继续启动的&#xff0c;2018-03-12 17:08:52.532DEBUG[localhost-startStop-1]o.s.beans.factory.support.DefaultListableBeanFactory.doGetBean():251 -Returning cached instance of singleton bean ‘org.spring…

GPT-5、开源、更强的ChatGPT!

年终岁尾&#xff0c;正值圣诞节热闹气氛的OpenAI写下了2024年的发展清单。 OpenAI联合创始人兼首席执行官Sam Altman在社交平台公布&#xff0c;AGI&#xff08;稍晚一些&#xff09;、GPT-5、更好的语音模型、更高的费率限制&#xff1b; 更好的GPTs&#xff1b;更好的推理…