基于Confluent+Flink的实时数据分析最佳实践

简介:在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案。

业务背景

在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案。

Confluent是基于Apache Kafka提供的企业级全托管流数据服务,由 Apache Kafka 的原始创建者构建,通过企业级功能扩展了 Kafka 的优势,同时消除了 Kafka管理或监控的负担。

实时计算Flink版是阿里云基于 Apache Flink 构建的企业级实时大数据计算商业产品。实时计算 Flink 由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,提供全系列产品矩阵,完全兼容开源 Flink API,并充分基于强大的阿里云平台提供云原生的 Flink 商业增值能力。

一、准备工作-创建Confluent集群和实时计算Flink版集群

  1. 登录Confluent管理控制台,创建Confluent集群,创建步骤参考 Confluent集群开通
  1. 登录实时计算Flink版管理控制台,创建vvp集群。请注意,创建vvp集群选择的vpc跟confluent集群的region和vpc使用同一个,这样可以在vvp内部访问confluent的内部域名。

二、最佳实践-实时统计玩家充值金额-Confluent+实时计算Flink+Hologres

2.1 新建Confluent消息队列

  1. 在confluent集群列表页,登录control center

  1. 在左侧选中Topics,点击Add a topic按钮,创建一个名为confluent-vvp-test的topic,将partition设置为3

2.2 配置结果表 Hologres

  1. 进入Hologres控制台,点击Hologres实例,在DB管理中新增数据库`mydb`

  1. 登录Hologres数据库,新建SQL

  1. Hologres中创建结果表 SQL语句
--用户累计消费结果表CREATE TABLE consume (appkey VARCHAR,serverid VARCHAR,servertime VARCHAR,roleid VARCHAR,amount FLOAT,dt VARCHAR,primary key(appkey,dt));

2.3 创建实时计算vvp作业

  1. 首先登录vvp控制台,选择集群所在region,点击控制台,进入开发界面

  1. 点击作业开发Tab,点击新建文件,文件名称:confluent-vvp-hologres,文件类型选择:流作业/SQL

  1. 在输入框写入以下代码:
create TEMPORARY table kafka_game_consume_source(  appkey STRING,servertime STRING,consumenum DOUBLE,roleid STRING,serverid STRING    
) with ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.xxx:9071[xxx可以找开发同学查看]','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '[your truststore password]','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="xxx[集群的用户]" password="xxx[相应的密码]";'
);
-- 创建累计消费hologres sink表
CREATE TEMPORARY TABLE consume(appkey STRING,serverid STRING,servertime STRING,roleid STRING,amount DOUBLE,dt STRING,PRIMARY KEY (appkey,dt) NOT ENFORCED)WITH ('connector' = 'hologres','dbname' = 'mydb','endpoint' = 'hgprecn-cn-tl32gkaet006-cn-beijing-vpc.hologres.aliyuncs.com:80','password' = '[your appkey secret]','tablename' = 'consume','username' = '[your app key]','mutateType' = 'insertorreplace');
--{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
--{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
--{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}-- 计算每个用户累积消费金额insert into consumeSELECTappkey,LAST_VALUE(serverid) as serverid,LAST_VALUE(servertime) as servertime,LAST_VALUE(roleid) as roleid,sum(consumenum) as amount,substring(servertime,1,10) as dtFROM kafka_game_consume_sourceGROUP BY appkey,substring(servertime,1,10)having sum(consumenum) > 0;
  1. 在高级配置里,增加依赖文件truststore.jks(访问内部域名得添加这个文件,访问公网域名可以不用),访问依赖文件的固定路径前缀都是/flink/usrlib/(这里就是/flink/usrlib/truststore.jks)

  1. 点击上线按钮,完成上线

  1. 在运维作用列表里找到刚上线的作用,点击启动按钮,等待状态更新为running,运行成功。

  1. 在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"appkey":"appkey1","servertime":"2020-09-30 14:10:36","consumenum":33.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:11:36","consumenum":30.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:13:36","consumenum":31.8,"roleid":"roleid1","serverid":"1"}
{"appkey":"appkey2","servertime":"2020-09-30 14:20:36","consumenum":33.8,"roleid":"roleid2","serverid":"2"}
{"appkey":"appkey1","servertime":"2020-09-30 14:30:36","consumenum":73.8,"roleid":"roleid1","serverid":"1"}

2.4 查看用户充值金额实时统计效果

三、最佳实践-电商实时PV和UV统计-Confluent+实时计算Flink+RDS

3.1 新建Confluent消息队列

  1. 在confluent集群列表页,登录control center

  1. 在左侧选中Topics,点击Add a topic按钮,创建一个名为pv-uv的topic,将partition设置为3

3.2 创建云数据库RDS结果表

  1. 登录 RDS 管理控制台页面,购买RDS。确保RDS与Flink全托管集群在相同region,相同VPC下

  1. 添加虚拟交换机网段(vswitch IP段)进入RDS白名单,详情参考:设置白名单文档

3.【vswitch IP段】可在 flink的工作空间详情中查询

  1. 在【账号管理】页面创建账号【高权限账号】

  1. 数据库实例下【数据库管理】新建数据库【conflufent_vvp】

  1. 使用系统自带的DMS服务登陆RDS,登录名和密码输入上面创建的高权限账户

  1. 双击【confluent_vvp】数据库,打开SQLConsole,将以下建表语句复制粘贴到 SQLConsole中,创建结果表
CREATE TABLE result_cps_total_summary_pvuv_min(summary_date date NOT NULL COMMENT '统计日期',summary_min varchar(255) COMMENT '统计分钟',pv bigint COMMENT 'pv',uv bigint COMMENT 'uv',currenttime timestamp COMMENT '当前时间',primary key(summary_date,summary_min)
)

3.3 创建实时计算VVP作业

1.【[VVP控制台】新建文件

  1. 在SQL区域输入以下代码:
--数据的订单源表
CREATE TABLE source_ods_fact_log_track_action (account_id VARCHAR,--用户IDclient_ip VARCHAR,--客户端IPclient_info VARCHAR,--设备机型信息platform VARCHAR,--系统版本信息imei VARCHAR,--设备唯一标识`version` VARCHAR,--版本号`action` VARCHAR,--页面跳转描述gpm VARCHAR,--埋点链路c_time VARCHAR,--请求时间target_type VARCHAR,--目标类型target_id VARCHAR,--目标IDudata VARCHAR,--扩展信息,JSON格式session_id VARCHAR,--会话IDproduct_id_chain VARCHAR,--商品ID串cart_product_id_chain VARCHAR,--加购商品IDtag VARCHAR,--特殊标记`position` VARCHAR,--位置信息network VARCHAR,--网络使用情况p_dt VARCHAR,--时间分区天p_platform VARCHAR --系统版本信息
) WITH ('connector' = 'kafka','topic' = 'game_consume_log','properties.bootstrap.servers' = 'kafka.confluent.svc.cluster.local.c79f69095bc5d4d98b01136fe43e31b93:9071','properties.group.id' = 'gamegroup','format' = 'json','properties.ssl.truststore.location' = '/flink/usrlib/truststore.jks','properties.ssl.truststore.password' = '【your password】','properties.security.protocol'='SASL_SSL','properties.sasl.mechanism'='PLAIN','properties.sasl.jaas.config'='org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="【your user name】" password="【your password】";'
);
--{"account_id":"id1","client_ip":"172.11.1.1","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:10:00"}
CREATE TABLE result_cps_total_summary_pvuv_min (summary_date date,--统计日期summary_min varchar,--统计分钟pv bigint,--点击量uv bigint,--一天内同个访客多次访问仅计算一个UVcurrenttime timestamp,--当前时间primary key (summary_date, summary_min)
) WITH (type = 'rds',url = 'url = 'jdbc:mysql://rm-【your rds clusterId】.mysql.rds.aliyuncs.com:3306/confluent_vvp',',tableName = 'result_cps_total_summary_pvuv_min',userName = 'flink_confluent_vip',password = '【your rds password】'
);
CREATE VIEW result_cps_total_summary_pvuv_min_01 AS
selectcast (p_dt as date) as summary_date --时间分区, count (client_ip) as pv --客户端的IP, count (distinct client_ip) as uv --客户端去重, cast (max (c_time) as TIMESTAMP) as c_time --请求的时间
fromsource_ods_fact_log_track_action
groupby p_dt;
INSERTinto result_cps_total_summary_pvuv_min
selecta.summary_date,--时间分区cast (DATE_FORMAT (c_time, 'HH:mm') as varchar) as summary_min,--取出小时分钟级别的时间a.pv,a.uv,CURRENT_TIMESTAMP as currenttime --当前时间
fromresult_cps_total_summary_pvuv_min_01 AS a;
  1. 点击【上线】之后,在作业运维页面点击启动按钮,直到状态更新为RUNNING状态。

  1. 在control center的【Topics->Messages】页面,逐条发送测试消息,格式为:
{"account_id":"id1","client_ip":"72.11.1.111","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:11:00"}
{"account_id":"id2","client_ip":"72.11.1.112","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:12:00"}
{"account_id":"id3","client_ip":"72.11.1.113","client_info":"mi10","p_dt":"2021-12-01","c_time":"2021-12-01 19:13:00"}

3.4 查看PV和UV效果

    可以看出rds数据表的pv和uv会随着发送的消息数据,动态的变化,同时还可以通过【数据可视化】来查看相应的图表信息。

pv图表展示:

uv图表展示:

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/511152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度解读「无影云电脑远程办公解决方案」

简介:疫情常态化,企业如何应对「远程」带来的挑战? 疫情之下,很多企业选择远程办公来保证业务的正常运营,而远程办公解决方案需要具备哪些技术能力来应对“远程”带来的挑战呢? 一,弹性伸缩能…

疯了?黑客公开“25美元入侵星链”法,SpaceX给他钱,还诚邀大家一起来“黑”?...

整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)近日,国外论坛 Reddit 上的一则热帖给人看“懵”了:SpaceX 诚邀大家来入侵自家的星链(Starlink),成功者有赏。这令人不由感慨:居然…

车脉科技:业内首创“车企体验式营销”

随着新能源汽车不断得到人们的广泛关注,车企在新车型、新市场、新认知下如何提升销量以及用户如何选购一款合心意的智能电动车成为新能源智能时代的汽车营销难题。 车脉科技创业初衷 车脉科技的创始人孙泽锋说道:“创立车脉的初衷,我们一端想…

专访香侬科技:致力于让世界听到中文NLP的声音

像所有的创业者一样,香侬科技的初创团队胸怀梦想,期待有一天当人们提起香侬的时候,除了“信息论之父”,还能想起来有一家用技术在链接大千世界的科技公司——香侬科技。 新生的香侬科技选择“长在云上” 香侬科技的CTO王思宽说起…

驭数有道,天翼云数据库 TeleDB 全新升级

8月16日,以“红云天翼 安全普惠”为主题的天翼云TeleDB系列产品升级发布会在线上顺利举办。此次发布的天翼云自主研发云原生数据库进行了全新升级,推出一站式HTAP融合数据库,以及TeleDB数据库容灾双活方案,同时也展示了TeleDB数据…

如何构建一个流量无损的在线应用架构 | 专题尾篇

简介:我们将这些年在每一个环节中的相应解决方案,以产品化的方式沉淀到企业级分布式应用服务(EDAS)中。EDAS 致力于解决在线应用的全流程流量无损,经过 6 年的精细打磨,已经在流量接入与流量服务两个关键位…

云原生微服务技术趋势解读

简介:随着微服务技术门槛大幅下降,随着企业数字化升级步伐加速,随着云计算的迅速发展,微服务将无处不在;随着行业成熟度逐步提升,随着开源和标准推进,微服务的标准逐步形成,标准形成…

中国信通院魏博锴:云原生混部标准解读

嘉宾 | 魏博锴出品 | CSDN云原生2022年7月28日,中国信通院、腾讯云、FinOps产业标准工作组联合发起的《原动力x云原生正发声 降本增效大讲堂》系列直播活动第4讲如期举行,中国信通院云大所云计算部云原生研究员魏博锴解读了云原生混部标准。本文整理自魏…

从平凡到非凡 阿里云李克的技术进阶之路

简介:人物简介:李克 阿里云边缘云计算领域技术负责人 2009年硕士毕业加入阿里至今,一直从事CDN及边缘云领域的技术研发工作,在CDN、边缘计算等方向上有丰富的行业经验,全程参与了阿里云CDN商业化转型,边缘云…

一文搞懂redis

简介:NoSQL泛指非关系型数据库,随着web2.0互联网的诞生,传统的关系型数据库很难对付web2.0大数据时代!尤其是超大规模的高并发的社区,暴露出来很多难以克服的问题,NoSQL在当今大数据环境下发展的十分迅速&a…

热搜!华为 30 岁以下员工仅占 28%,网友:35 岁危机呢?

整理 | 郑丽媛出品 | CSDN(ID:CSDNnews)一直以来,程序员的“35 岁”都是圈内的热议话题:35 岁是程序员的职业终结点、程序员到 35 岁就废了、超过 35 岁的程序员容易被裁……久而久之,35 岁逐渐变成了一个很…

阿里云发布业界首本《云计算公网质量白皮书》

随着互联网的发展,网络已经融入了整个社会发展的进程,如同血液贯通人类文明的五脏六腑。一旦网络出现故障,将给社会的政治、经济、文化造成重大损失。 2021年10月4日,Facebook在地球上”消失”了6小时,市值跌掉3000亿…

选轻量应用服务器or云服务器ECS?一图帮你彻底区分

简介:轻量应用服务器适合轻量级且访问量低的应用场景,更适合个人开发者、对新手小白更友好;而云服务器ECS可覆盖全业务场景(如大数据分析,深度学习等),要求用户有一定的开发技术能力。 本文首发…

宜搭小技巧|一招摆脱纸质表单,数据收集更便捷

简介:开启「应用公开访问」,组织外成员也可提交数据。 许多公司在前台都会准备一个访客登记表,供来访者填写。但如果来访者数量较多,就会出现这样的问题…… 提供纸质表单供访客填写信息,使用起来繁琐且费时&#xff…

如何用 Serverless 低成本打造个人专属网盘?

简介:想要做个网盘不知如何开始,不妨花3分钟读读这篇,看看如何借助 Serverless ,低成本的做一个“不限制网速、无限扩展、同时支持数百种文件格式在线预览、编辑、协作”的专属个人 & 家庭网盘~ 前言 随着全球大数据不断增长…

云之后,亚马逊云科技要为业界提供水和空气一样的安全防护

云巨头亚马逊云科技,正在持续加码云安全。 编辑 | 宋慧 出品 | CSDN云计算 提到亚马逊云科技,我们首先想到的是它在云领域的计算存储等技术和优势。不过亚马逊云科技却连续四年在举办它的全球安全大会 re:Inforce,刚刚 ,2022 re:…

即学即会 Serverless | 如何解决 Serverless 应用开发部署的难题?

简介:开发者在选择使用 Serverless 时,仍会有开发和部署困难、厂商锁定等诸多担忧,有没有一种支持 Serverless 应用全生命周期管理的开发者工具,能够简单快速上手并真正帮助我们提升研发、运维的效能的呢? 破局&#x…

NBF事件中心架构设计与实现

简介:NBF是阿里巴巴供应链中台的基础技术团队打造的一个技术PaaS平台,她提供了微服务FaaS框架,低代码平台和中台基础设施等一系列的PaaS产品,旨在帮助业务伙伴快速复用和扩展中台能力,提升研发效能和对外的商业化输出。…

关于“算力”,这篇文章值得一看

作者 | 小枣君来源 | 鲜枣课堂今天这篇文章,我们来聊聊算力。这两年,算力可以说是ICT行业的一个热门概念。在新闻报道和大咖演讲中,总会出现它的身影。那么,究竟到底什么是算力?算力包括哪些类别,分别有什么…

宜搭小技巧|找不到应用怎么办?群应用一键直达

简介:5步学会「一键添加群应用」! 上期钉多多将Excel一键转应用后,大大提高了同学们的工作效率,于是小伙伴们纷纷用钉钉宜搭创建了各种各样的应用,那么新的问题产生了...... 每次提交数据都要切换到工作台找到对应的…