数据采集项目之业务数据(三)

1. Maxwell框架

开发公司为Zendesk公司开源,用java编写的MySQL变更数据抓取软件。内部是通过监控MySQL的Binlog日志,并将变更数据以JSON格式发送到Kafka等流处理平台。

1.1 MySQL主从复制

主机每次变更数据都会生成对应的Binlog日志,从机可以通过IO流的方式将Binlog日志下载到本地,可以通过它创造和主机一样的环境或者作为热备。

1.2 安装Maxwell

  1. 解压改名
  2. 启动MySQL Binlog, vim /etc/my.cnf. 增加如下配置:
    • binlog_format 日志类型的三种类型:
      • 基于语句:主机执行了什么语句,在从机里同样执行一遍。如果使用了random语句,会导致主从不一致。但是量级比较低
      • 基于行:主机被改动后,从机同步一份。不会有主从不一致的问题,但是量价比较大,需要将每行修改的数据都拿一份。
      • 混合模式:一般基于语句,但是如果基于语句会导致前后结果产生差异,自动转成基于行。
#数据库id
server-id = 1
#启动binlog,该参数的值会作为binlog的文件名
log-bin=mysql-bin
#binlog类型,maxwell要求为row类型
binlog_format=row
#启用binlog的数据库,需根据实际情况作出修改
binlog-do-db=gmall
  1. 重启MySQL服务
  2. 创建Maxwell所需所需的数据库和用户,用来存储断点续传所需的数据。
CREATE DATABASE maxwell;
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'maxwell';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';//maxwell库的所有权限给maxwell
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';//其他库的查询、复制权限给maxwell
  1. 修改maxwell配置文件
    cp 配置文件,将会复制某个文件并且可以改名。
producer=kafka
# 目标Kafka集群地址
kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
#目标Kafka topic,可静态配置,例如:maxwell,也可动态配置,例如:%{database}_%{table}
kafka_topic=topic_db
# MySQL相关配置
host=hadoop102
user=maxwell
password=maxwell
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true# 过滤gmall中的z_log表数据,该表是日志数据的备份,无须采集
filter=exclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区,避免数据倾斜
producer_partition_by=primary_key

1.3 Maxwell的使用

  1. 启动zookeeper,kafka
  2. 启动maxwell, bin/maxwell --config config.properties --daemon
  3. 启动kafka消费者进程,用于消费maxwell添加到kafka的变更数据
  4. 启动数据生成jar包,查看消费者进程是否有新数据。
  5. 编写Maxwell启停脚本
#!/bin/bashMAXWELL_HOME=/opt/module/maxwellstatus_maxwell(){result=`ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -l`return $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho "启动Maxwell"$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho "Maxwell正在运行"fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho "停止Maxwell"ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk '{print $2}' | xargs kill -9elseecho "Maxwell未在运行"fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac

1.4 Bootstrap全量同步

Maxwell获取的数据都是后期变更的数据,但没有获取到数据库在开启Binlog日志之前的原始数据。

全量同步命令:/opt/module/maxwell/bin/maxwell-bootstrap --database gmall --table user_info --config /opt/module/maxwell/config.properties

2. 数仓数据同步策略

2.1 用户行为数据

数据源:Kafka
目的地:HDFS
传输方式采用Flume, 其中source为Kafka source, channel为Memmory channel, sink为HDFS sink。

根据官网查找相应参数:

  1. Kafka Source
    • type = Kafka Source全类名
    • kafka.bootstrap.servers 连接地址
    • kafka.topics = topic_log
    • batchSize: 批次大小
    • batchDurationMillis: 批次间隔2s
  2. File Channel
    • type: file
    • dataDirs: 存储路径
    • checkpointDir: 偏移量存储地址
    • keep-alive: 管道满了后,生产者间隔多少秒再放数据
  3. HDFS Sink
    • hdfs.rollInterval : 文件滚动,解决小文件问题,每隔多久滚动一次
    • rollSize: 文件大小
    • hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d, 文件存放路径
    • hdfs.round = false, 不采用系统本地时间
#定义组件
a1.sources=r1
a1.channels=c1
a1.sinks=k1#配置source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_log#配置channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6#配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log
a1.sinks.k1.hdfs.round = false # 是否获取本地时间a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0#控制输出文件类型
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = gzip#组装 
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2 零点漂移问题

在HDFS系统存放文件时是按照时间进行分区存放的,存放时查看的是header中的timestamp,但是由于数据传输过程中也需要一段时间,header中的时间并不是数据的实际产生时间,这个就是零点漂移问题。

解决办法:借助拦截器,修改header中的timestamp的值。编写拦截器代码,需要在IDEA中创建对应的项目并打包。

  1. 导入依赖,flume-ng-core和JSON解析依赖fastjson (1.2.62)
  2. 创建包gmall.interceptor
  3. 创建类TimeStampInterceptor, 继承Interceptor接口
  4. 实现intercept(Event event)和intercept(Event events)
  5. 使用fastjson来解析json文件,得到jsonObject对象,用来获取时间戳ts。将获取到的时间戳覆盖header中的timestamp, 如果数据格式错误会抛异常,使用try-catch来捕获它,并过滤掉该条数据。注意此处不能使用for循环来一边遍历,一边删除集合数据
@Overridepublic Event intercept(Event event) {//1、获取header和body的数据Map<String, String> headers = event.getHeaders();String log = new String(event.getBody(), StandardCharsets.UTF_8);try {//2、将body的数据类型转成jsonObject类型(方便获取数据)JSONObject jsonObject = JSONObject.parseObject(log);//3、header中timestamp时间字段替换成日志生成的时间戳(解决数据漂移问题)String ts = jsonObject.getString("ts");headers.put("timestamp", ts);return event;} catch (Exception e) {e.printStackTrace();return null;}
@Override
public List<Event> intercept(List<Event> list) {Iterator<Event> iterator = list.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();//必须使用迭代器删除}}return list;
}
  1. 打包时注意要带上fastjson依赖,需要在maven中添加配置打包插件。依赖中有flume和fastjson,但在虚拟机上有flume,没有fastjson,所以需要排除flume。可以使用provided标签来排除让打包时排除依赖。

    • compile:在单元测试、编译、运行三种方式都会使用compile表明的依赖;
    • test:在单元测试才会使用test表明的依赖;
    • provided:在编译才会使用test表明的依赖;
  2. Flume配置文件中添加拦截器

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.gmall.flume.interceptor.TimestampInterceptor$Builder # 全类名建议在IDEA中复制,Builder也需要根据自己的代码函数名修改
  1. 重新生成数据,查看是否根据数据本身的时间戳存放到对应的HDFS分区文件中。

3. 业务数据同步

3.1 同步策略

  1. 全量同步:每天将所有数据同步一份,业务数据量小,优先考虑全量同步。
  2. 增量同步:每天只将新增和变化进行同步,业务数据量大,优先考虑增量同步。

3.2 数据同步工具

全量:DataX、Sqoop
增量:Maxwell、Canal

3.3 DataX

是一个数据同步工具,致力于实现包括关系型数据库HDFS、Hive、ODPS、HBase、MySQL等等数据源之间的互传。

  1. 架构= reader + framework + writer
  2. 运行流程
    • job: 单个数据同步的作业,会启动一个进程。
    • Task: 根据不同数据源的切分策略,一个Job会切分为多个Task,Task是DataX作业的最小单元,每个Task负责一部分,由一个线程执行。
  3. 调度策略:会根据系统资源设置并发度,并发度为线程同时执行的个数,任务会按照并发度一组一组执行。

3.4 DataX安装

  1. 下载解压DataX安装包
  2. bin/datax.py job/job.json测试安装包是否完整
  3. MySQL Reader配置文件的书写
  4. HDFS Writer配置文件的书写
  5. 执行datax命令python /opt/module/datax/bin/datax.py -p"-Dtargetdir=/origin_data/gmall/db/activity_info_full/2022-06-08" /opt/module/datax/job/import/gmall.activity_info.json
  6. 执行完后可以使用hadoop fs cat 路径名 | zcat,来查看压缩文件是否正确

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/99384.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

多路彩灯控制器LED流水灯花型verilog仿真图视频、源代码

名称&#xff1a;多路彩灯控制器LED流水灯花型verilog 软件&#xff1a;Quartus 语言&#xff1a;Verilog 代码功能&#xff1a; 用quartus和modelism&#xff0c;设计一个多路彩灯控制器&#xff0c;能够使花型循环变化&#xff0c;具有复位清零功能&#xff0c;并可以选择…

Flutter 打包 windows桌面端可执行文件

简单一说 因为个人兴趣爱好&#xff0c;在写一个跨平台工具。为了省事没去官网看文档&#xff0c;直接翻阅各大博客网站&#xff0c;一个简单的命令&#xff0c;博客写的内容比较复杂。为了方便自己和有需要同学&#xff0c;简单做一个记录。 Flutter提供了一种方便命令行的方…

后台开发核心技术与应用实践看书笔记(一):C++编程常用技术

C编程常用技术 第一个C程序函数函数模板 数组字符数组 指针概念数组与指针字符串与指针函数与指针 引用引用作为参数常引用 结构体&#xff0c;公用体&#xff0c;枚举共用体枚举结构体&#xff0c;共用体在内存单元占用字节数的计算 预处理常用宏定义命令do while(0)的妙用条件…

【gitlab】从其他仓库创建项目

需求描述 解决方法 以renren-fast脚手架为例 第一步 第二步 第三步 第四步 参考文章

【面试】pc寄存器题

目录 1.使用pc寄存器存储字节码指令地址有什么作用&#xff1f;&#xff08;为什么使用pc寄存器记录当前线程的执行地址&#xff1f;&#xff09;2.pc寄存器为什么被设定为线程私有的&#xff1f; 1.使用pc寄存器存储字节码指令地址有什么作用&#xff1f;&#xff08;为什么使…

应用层协议 HTTP

一、应用层协议 我们已经学过 TCP/IP , 已然知道数据能从客户端进程经过路径选择跨网络传送到服务器端进程。 我们还需要知道的是&#xff0c;我们把数据从 A 端传送到 B 端&#xff0c; TCP/IP 解决的是顺丰的功能&#xff0c;而两端还要对数据进行加工处理或者使用&#xf…

算法-二叉树

1. 二叉树定义 二叉树是一种常见的树状数据结构&#xff0c;它由节点组成&#xff0c;每个节点最多有两个子节点&#xff0c;通常称为左子节点和右子节点。二叉树的定义如下&#xff1a; 节点&#xff08;Node&#xff09;&#xff1a;每个节点包含一个数据元素&#xff08;通…

python常用库之数据库orm框架之SQLAlchemy

文章目录 python常用库之数据库orm框架之SQLAlchemy一、什么是SQLAlchemySQLAlchemy 使用场景 二、SQLAlchemy使用SQLAlchemy根据模型查询SQLAlchemy SQL 格式化的方式db_session.query和 db_session.execute区别实测demo 总结&#xff1a;让我们留意一下SQLAlchemy 的 lazy lo…

SpringMVC的视图

文章目录 1. ThymeleafView2. 转发视图3. 重定向视图4. 视图控制器view-controller5. 总结6. 荐书 SpringMVC中的视图是View接口&#xff0c;视图的作用渲染数据&#xff0c;将模型Model中的数据展示给用户SpringMVC视图的种类很多&#xff0c;默认有转发视图和重定向视图 当工…

二维反射容斥:P9366

https://www.luogu.com.cn/problem/P9366 构造循环矩阵&#xff0c;考虑反射容斥和将军饮马 考虑二维不太好做&#xff0c;我们曼哈顿距离转切比雪夫距离&#xff0c;变成一维的情况。 由于棋盘是正方形的&#xff0c;所以循环长度为 2 n 4 2n4 2n4。用多项式快速幂预处理&…

el-cascader

场景&#xff1a; el-cascader lazy multiple 反显数据 非lazy的场景 selecetedOptions2: [[1, 2, 3],[1, 2, 4], ],可以正常回显&#xff1b;> ok lazy场景下&#xff1a; 是不可以回显的… 如果el-cascader是异步的单选 cascader默认会加载下个层级的&#xff08;子…

Unity设计模式——原型模式

原型模式&#xff08;Prototype&#xff09;用原型实例指定创建对象的种类&#xff0c;并且通过拷贝这些原型创建新的对象。原型模式其实就是从一个对象再创建另外一个可定制的对象&#xff0c;而且不需知道任何创建的细节 。 原型类 Prototype&#xff1a; abstract class P…

十一、2023.10.5.计算机网络(end).11

文章目录 17、说说 TCP 可靠性保证&#xff1f;18、简述 TCP 滑动窗口以及重传机制?19、说说滑动窗口过小怎么办?20、说说如果三次握手时候每次握手信息对方没收到会怎么样&#xff0c;分情况介绍&#xff1f;21、简述 TCP 的 TIME_WAIT&#xff0c;为什么需要有这个状态&…

虚拟机机初始化配置

虚拟机信息 镜像&#xff1a;Centos7 模板机配置&#xff1a;4U4G 100G存储 /boot 800MB /swap 物理内存如果低于2G&#xff0c;设定为虚拟内存设定为物理内存2倍&#xff0c;物理内存高于2G设定为4G。 / 95.2G 虚拟机安装完成后配置网卡 [rootlocalho…

ctfshow-web5(md5弱比较)

打开题目链接是html和php代码 html没啥有用信息&#xff0c;这里审一下php代码 &#xff1a; 要求使用get方式传入两个参数 v1&#xff0c;v2 ctype_alpha()函数&#xff1a;用于检查给定的字符串是否仅包含字母&#xff1b; is_numeric()函数&#xff1a;检测字符串是否只由…

聊聊分布式架构05——[NIO基础]BIO到NIO的演进

目录 I/O I/O模型 BIO示例 BIO与NIO比较 NIO的三大核心 NIO核心之缓冲区 Buffer常用子类&#xff1a; Buffer常用API Buffer中的重要概念 NIO核心之通道 FileChannel 类 FileChannel常用方法 NIO核心之选择器 概述 应用 NIO非阻塞原理分析 服务端流程 客户端…

请求的转发和重定向

RequestDispatcher接口实现转发&#xff1a; jsp1上链接到Servlet&#xff0c;Servlet再转发&#xff08;关键在这里怎么实现转发&#xff1f;&#xff1f;&#xff09; 演示index.html页面---->Servlet1(转发到)------>Servlet2 实现转发流程 1.用HttpServletReques…

如何解决openal32.dll丢失,有什么办法解决

你第一次知道openal32.dll文件是在什么情况下&#xff0c;你了解过openal32.dll文件吗&#xff1f;如果电脑中openal32.dll丢失有什么办法可以解决&#xff0c;今天就教大家如何解决openal32.dll丢失&#xff0c;都有哪些办法可以解决openal32.dll丢失。 一&#xff0e;openal3…

Mac下docker安装MySQL8.0.34

学习并记录一下如何用docker部署MySQL 在Docker中搜索并下载MySQL8.0.x的最新版本 下载好后&#xff0c;在Images中就可以看到MySQL的镜像了 通过下面的命令也可以查看docker images启动镜像&#xff0c;使用下面的命令就可以启动镜像了docker run -itd --name mysql8.0.34 -…

2021年03月 Python(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

Python编程&#xff08;1~6级&#xff09;全部真题・点这里 一、单选题&#xff08;共25题&#xff0c;每题2分&#xff0c;共50分&#xff09; 第1题 对于字典infor {“name”:“tom”, “age”:13, “sex”:“male”}&#xff0c;删除"age":13键值对的操作正确的…