简介: 本文重点介绍大数据产品集通用解决方案,即大数据在线计算+离线计算一体化解决方案,并通过真实案例模拟来说明此通用解决方案在具体项目中是如何落地的。
概述
本方案重点要落地的业务是中央网信办网络安全应急指挥中心相关业务,由于需要处理的网络数据流量巨大,而且对实时和离线大数据计算分析均有要求,所以提供此在线离线一体化解决方案。
混合云某项目主要业务简介如下:
- 流量采集
流量采集技术是监控网络流量的关键技术之一,为流量分析提供数据来源。为了能够在复杂企业网中有效的分析网络流量。
- 网络探针
互联网探针(NET probe),侦听网络数据包的网络探针称为互联网探针。数据包捕获、过滤、分析都能在“互联网探针”上实现。
本文主要针对流量采集业务来构建场景。
适用场景
- 既要数据实时分析又要数据离线分析的业务。
- 数据量计算较大且分析实时性要求较高的业务。
- 需要备份长期大量数据且能支持查询的业务。
- 数据来源多种多样需要进行大量同步和处理的业务。
技术架构
首先来看看业务架构:由于数据量比较庞大,涉及产品众多,数据链路也比较复杂。
本实践方案基于业务架构图抽象,得出如下图所示的技术架构和主要流程,并依据此编写操作步骤:
从抽象的业务流程图中,可以看出在线计算和离线计算两条主链路,因此可通过本文大数据在线离线一体化解决方案来实施。
方案优势
- 支持离线EB量级数据计算,2w以上并发作业,支持灵活调度多任务并发。
- 实时计算延迟到秒级乃至毫秒级,单个作业吞吐量可达到百万级别。
- 支持冷热数据备份,降低运营成本。
- 支持无缝对接各类数据源进行分析和查询,10亿+数据聚合亚秒级响应。
方案实施
数据建模
业务数据量比较庞大,为了便于处理和分析,首先进行数仓建模,并进行数据分层处理,方便实施多维分析并提升整个系统查询效率,降低查询穿透率。
雪花模型建模
根据对流量采集业务分析,比较适合数据仓库常用建模方法——雪花模型。依据业务特征和雪花模型建模原则,完成数仓建模,具体如下:
从以上模型看出,存在两个事实表,分别描述采集机流量信息和采集机规则事件信息,另外三个维度表分别记录运营商,采集机和地域维度,方便后续的业务分析。
数据分层
离线和在线引擎的数据量非常大,如果直接对接BI做数据分析,会导致查询效率很低。因此需要进行数据分层,将海量的数据来源经过ETL,清洗,根据数据域和应用域抽取到ADS层,交给BI做分析,通用方法如下图:
针对当前业务,可以进行数据分层,具体如下:
数据分层后,数据建模完成,后续重点用于离线链路实施中。
离线链路实施
本例中离线链路主要满足的应用域客户场景有以下两个:
- 各个省每日的采集机事件总和统计线图。
- 各运营商每日采集机字节速率平均值看板 (可通过下拉列表分别查看每个运营商的情况)。
离线链路主要针对大量数据进行批量处理和分析,并进行冷热数据存储,实时性要求不高。离线链路实施主要以base这个产品为核心来开发和串联,数据链路情况如下,本文通过自建数据来模拟datahub数据源。
具体实施步骤如下:
- 创建odps项目
首先新建odps任务云账号和配额组,并创建odps项目。
- 创建base项目
创建base项目,创建工作空间,并绑定刚刚创建的odps项目。
- 离线计算实施
- 根据数据建模部分的设计,完成相关数据表创建。
由于要创建的表比较多,这里贴一个ods层事实表的ddl:
CREATE TABLE IF NOT EXISTS ods_wa_collector_flow_mpp (c_pcg INT COMMENT '网络',c_pc INT COMMENT '省',c_isp STRING COMMENT 'isp',c_iao INT COMMENT '出入口',c_ch BIGINT COMMENT '采集机',c_pps BIGINT COMMENT '包速率',c_bps BIGINT COMMENT '字节速率',c_time TIMESTAMP COMMENT '时间' ) ;
接着用同样的方法依次创建各个数据层的数据表。
然后创建维度表,ddl示例如下:
CREATE TABLE IF NOT EXISTS dim_province (c_pc INT COMMENT '省代码',c_name STRING COMMENT '省名称' ) ;
接着用同样的方法依次创建其他维度表。
b.通过数据加工,完成数据建模和数据分层
首先通过数据清洗操作,对贴源层数据进行处理,代码示例如下:
INSERT OVERWRITE TABLE dwd_wa_collector_flow_mpp SELECT c_pcg,c_pc,c_isp,c_iao,c_ch,c_pps,c_bps,c_time FROM ods_wa_collector_flow_mpp WHERE c_ch >= 0 AND c_pps >= 0 AND c_bps >= 0 ;
接下来针对dwd层的数据,完成数据汇聚,代码示例如下:
INSERT OVERWRITE TABLE dws_wa_union SELECT a.c_ch,c_pcg,c_pc,c_isp,c_iao,c_pps,c_bps,c_rule_id,c_events,a.c_time FROM (SELECT c_pcg,c_pc,c_isp,c_iao,c_ch,c_pps,c_bps,c_time FROM dwd_wa_collector_flow_mpp WHERE c_time = cast(to_char(getdate(),'yyyy-mm-dd 00:00:00') as timestamp) )a FULL OUTER JOIN (SELECT c_ch,c_rule_id,c_events,c_time FROM dwd_wa_collector_rule_event_mpp WHERE c_time = cast(to_char(getdate(),'yyyy-mm-dd 00:00:00') as timestamp) )b ON a.c_ch = b.c_ch;
接下来构造应用域的数据表,用于应用域的分析和查询,示例为统计各个省采集机事件总数表:
INSERT OVERWRITE TABLE ads_province_rule_event SELECT c_ch,c_pc,c_rule_id,c_events,c_time FROM dws_wa_union; SELECT * FROM ads_province_rule_event;
最后,通过离线同步将应用域的数据同步到交互引擎adb3.0中,如下:
完成后,通过base的工作流任务图将各个节点串联起来,点击run按钮即可触发实例运行,并生成应用域数据,用于后续分析和查询,具体任务图如下:
可以看到运行成功了,之后将该任务图提交到生产环境中,就可以做到每日自动生产数据用于生产分析了。
4.结果分析实施
最终产出的应用域数据,一般会离线同步到交互式引擎中用于查询分析,这里选择的交互式引擎是adb3.0。
a.配置数据源和数据集
数据源配置
数据集配置
可以看到,我们在数据集里面配置了维表和事实表的关联。
b.生成仪表板图
基于配置的数据集,通过简单的配置便可以得出:各个省每日的采集机事件总和线图,各运营商每日采集机字节速率平均值看板。
客户可以通过仪表板的数据,分析采集机网络流量情况,以上就是离线链路总体实施情况。
实时链路实施
本例中实时链路主要满足的应用域客户场景如下:
每日实时采集机事件总和统计
区别于离线链路,实时链路重点满足客户对于信息处理分析的高时效性和可操作性的要求,例如客户就想看分钟级别的数据波动和数据大盘变化,方便及时做出决策,此时就需要实时计算来满足需求,实时计算链路大致如下图所示:
- 实时计算数据构造实时计算数据量相对离线计算要小一些,在本例中就不做复杂的数据建模了。
- datahub实时数据生成
由于是实时链路,本文采用组内ase工具来持续生产实时数据传入datahub,并让flink订阅datahub的数据,进行实时计算,ase会自动创建一个datahub的topic(ase_dr_datahub_topic01)用于传输数据。如下图所示,ase_dr_datahub_topic01在持续的接收实时数据。 接下来需要再创建一个topic(ase_dr_datahub_topic02),用于接收flink处理后的数据。 - datahub实时数据订阅
datahub的topic(ase_dr_datahub_topic01)建好并接收数据后,需要创建订阅来让其他应用实时获取datahub的数据。
- 实时计算实施接收到实时数据后,flink需要实时对数据进行计算处理,具体实施步骤如下:
- 创建flink作业
进入realtime compute flink产品,新建实时计算作业。 - 实时计算
通过flink对源数据进行计算处理,本例主要是完成数据清洗操作,相关代码如下:
create TEMPORARY TABLE datahub_source ( c_ch BIGINT COMMENT '采集机', c_rule_id BIGINT COMMENT '规则id', c_events BIGINT COMMENT '事件数', c_time BIGINT COMMENT '时间' ) with ( 'connector' = 'datahub', 'endpoint' = 'https://datahub.xxxx.com', 'project' = 'ase_datahub_pj_61232508463140', 'topic' = 'ase_dr_datahub_topic01', 'accessId' = 'xxxx', 'accessKey' = 'xxxx', 'subId' ='xxxx' ); CREATE TEMPORARY TABLE datahub_des ( c_ch BIGINT COMMENT '采集机', c_rule_id BIGINT COMMENT '规则id', c_events BIGINT COMMENT '事件数', c_time BIGINT COMMENT '时间' ) with ( 'connector' = 'datahub', 'endpoint' = 'https://datahub.xxxx.com', 'project' = 'ase_datahub_pj_61232508463140', 'topic' = 'ase_dr_datahub_topic02', 'accessId' = 'xxxx', 'accessKey' = 'xxxx' ); insert into odps_des select * from datahub_source where c_ch != 0 and c_events >= 0;
接下来在ase_dr_datahub_topic02中创建同步任务,将实时计算的结果数据同步到ADB3.0中。
看到对应的交互引擎ADB3.0相应的表中存在数据,即说明同步成功。
3.结果分析实施
实时计算链路采用datav进行实时大屏展示和分析,进入datav产品界面。
- 添加数据源
输入基本信息后,完成数据源的添加和数据集的创建。
数据源添加:
数据集创建: - 创建并配置应用大屏分析器
通过创建和配置应用大屏分析器,可以绑定实时数据源,并配置大屏展现内容和形式。 - 切换到普通大屏模式,观察计算结果的实时变化
在分析模式下完成数据源和展示内容配置后,切换到普通大屏模式进行展示配置,并设置实时刷新间隔。 接下来切换到预览模式下,就可以看到结果实时变化了,调试没问题后,点击发布即可。
以上就是大数据在线和离线两条最通用的计算链路,在实际客户场景下具体应用落地。
原文链接
本文为阿里云原创内容,未经允许不得转载。