Canal原理及其使用

1 什么是canal

  canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。(数据库同步需要阿里的otter中间件,基于canal)

2 canal使用场景

  (1)阿里otter(阿里用于进行异地数据库之间的同步框架)中间件的一部分,这是原始场景

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4rpvVYhg-1612757904181)(assets/1610588516142.png)]

  (2)更新缓存:如果有大量的请求发送到mysql的话,mysql查询速度慢,QPS上不去,光查mysql可能会瘫痪,那就可以在前面加个缓存,这个缓存有2个主要的问题。一是缓存没有怎么办,二是数据不一致怎么办。对于第一个问题查缓存没有就差mysql,mysql再往缓存中写一份。对于第二个问题,如果数据库修改了,那就采用异步的方式进行修改,启动一个canal服务,监控mysql,只要一有变化就同步缓存,这样mysql和缓存就能达到最终的一致性。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P4MRL3pv-1612757904184)(assets/1610588585694.png)]

  (3)抓取业务数据新增变化表,用于制作拉链表:做拉链表是需要有增加时间和修改时间的,需要数据今天新增和变化的数据,如果时间不全就没办法知道哪些是修改的。可以通过canal把变化的抽到自己的表里,以后数据就从这个表出。

  (4)取业务表的新增变化数据,用于制作实时统计

3 canal工作原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AYNMkES5-1612757904186)(assets/1610588660766.png)]

  首先了解一下mysql主备复制原理:

  (1)master主库将改变记录,发送到二进制文件(binary log)中

  (2)slave从库向mysql Master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

  (3)slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

  canal的工作原理:把自己伪装成slave,从master复制数据。读取binlog是需要master授权的,因为binlog是加密的,授权分用户名密码才能读。master授权后不知道读他的binlog的是从机还是canal,他的所有传输协议都符合从机的标准,所以master一直以为是从机读的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Dsi3tRlj-1612757904195)(assets/1610590410151.png)]

4 mysql的binlog

4.1 二进制日志

  mysql的二进制日志记录了所有的DDL和DML(除了数据查询语句),以事件的形式进行记录,包含语句执行消耗的时间,mysql的二进制日志是事务安全型的。

  开启二进制日志大概会有1%的性能损坏。二进制日志有2个主要的使用场景:①mysql的主备复制②数据恢复,通过使用mysqlbinlog工具来恢复数据(用这个做恢复是备选方案,主方案还是定期快照,定期执行脚本导数据,其实就是把当前所有数据导成insert,这个量少)

  二进制日志包括2类文件:①二进制日志索引文件(后缀为.index)用于记录所有的二进制文件②二进制日志文件(后缀为.00000*)记录数据库所有的DDL和DML(除了数据查询语句)

4.2 开启binlog

  修改mysql的配置文件my.cnf。

# vim /etc/my.cnfgG

  在[mysqld] 区块 添加

log-bin=mysql-bin

  mysql-bin表示binlog日志的前缀,以后生成的的日志文件就是 mysql-bin.000001 的文件后面的数字按顺序生成。 当mysql重启或到达单个文件大小的阈值时,新生一个文件,按顺序编号。

4.3 binlog分类

  binlog的格式有三种:STATEMENT,MIXED,ROW对比如下

格式描述优点
STATEMENT语句级别,记录每一次执行写操作的语句,相对于ROW模式节省了空间,但是可能产生数据不一致如update tt set create_date=now(),由于执行时间不同产生饿得数据就不同节省空间可能造成数据不一致
ROW行级,记录每次操作后每行记录的变化。假如一个update的sql执行结果是1万行statement只存一条,如果是row的话会把这个1000行的结果存这。持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录执行后的效果占用较大空间
MIXED是对statement的升级,如当函数中包含 UUID() 时,包含 AUTO_INCREMENT 字段的表被更新时,执行 INSERT DELAYED 语句时,用 UDF 时,会按照 ROW的方式进行处理节省空间,同时兼顾了一定的一致性还有些极个别情况依旧会造成不一致,另外statement和mixed对于需要对binlog的监控的情况都不方便

4.4 binlog格式选择

  如果只考虑主从复制的话可以用mixed,一般情况下使用statement,遇到几种特殊情况使用row,同步的话有SQL就行,因为手里有数据,前提是有数据才能执行这个SQL。在大数据场景下我们抽取数据是用于统计分析,分析的数据,如果用statement抽了SQL手里也没数据,不知道执行修改哪些,因为没有数据,所以没办法分析,所以适合用row,清清楚楚的表明了每一行是什么样。

4.5 修改配置文件

  修改my.cnf文件,在[mysqld]模块下添加如下内容

server-id= 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=bigdata

  binlog-do-db用于指定库,缩小监控的范围,server-id不能和mysql集群的其他节点重复

4.6 重启mysql

# service mysqld restart
Redirecting to /bin/systemctl restart mysqld.service

  到数据目录下查询是否生成binlog文件,这里我把数据目录自定义为了/data/mysql/

# cd /data/mysql/
# ll
total 188500
-rw-r----- 1 mysql mysql       56 Jul  1  2020 auto.cnf
-rw------- 1 mysql mysql     1676 Jul  1  2020 ca-key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 ca.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 client-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 client-key.pem
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 dataxweb
-rw-r----- 1 mysql mysql      526 Jan 14 11:03 ib_buffer_pool
-rw-r----- 1 mysql mysql 79691776 Jan 14 11:04 ibdata1
-rw-r----- 1 mysql mysql 50331648 Jan 14 11:04 ib_logfile0
-rw-r----- 1 mysql mysql 50331648 Aug  5 06:20 ib_logfile1
-rw-r----- 1 mysql mysql 12582912 Jan 14 11:04 ibtmp1
drwxr-x--- 2 mysql mysql      116 Jul  1  2020 iot
drwxr-x--- 2 mysql mysql     4096 Jul  1  2020 mysql
-rw-r----- 1 mysql mysql      154 Jan 14 11:03 mysql-bin.000001
-rw-r----- 1 mysql mysql       19 Jan 14 11:03 mysql-bin.index
srwxrwxrwx 1 mysql mysql        0 Jan 14 11:03 mysql.sock
-rw------- 1 mysql mysql        6 Jan 14 11:03 mysql.sock.lock
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 performance_schema
-rw------- 1 mysql mysql     1680 Jul  1  2020 private_key.pem
-rw-r--r-- 1 mysql mysql      452 Jul  1  2020 public_key.pem
-rw-r--r-- 1 mysql mysql     1112 Jul  1  2020 server-cert.pem
-rw------- 1 mysql mysql     1676 Jul  1  2020 server-key.pem
drwxr-x--- 2 mysql mysql     8192 Jul  1  2020 sys

  可以发现,这二进制日志索引文件和日志文件生成了。只要重启mysql,mysql-bin后面的序号就会往上涨,他的切分规则就是重启或者到一个大小的阈值,就会切一个

mysql-bin.000001
mysql-bin.index

5 安装canal

5.1 下载地址

https://github.com/alibaba/canal/releases

5.2 mysql为canal配置权限

  在mysql中给canal单独建一个用户,给全库全表的读,拷贝,复制的权限

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

  报错:ERROR 1819 (HY000): Your password does not satisfy the current policy requirements

原因是因为密码设置的过于简单会报错,MySQL有密码设置的规范,具体是与validate_password_policy的值有关,下图表明该值规则

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3xmcIHaU-1612757904199)(assets/1610595557211.png)]

  查看MySQL完整的初始密码规则,登陆后执行以下命令

mysql> SHOW VARIABLES LIKE 'validate_password%';
+--------------------------------------+--------+
| Variable_name                        | Value  |
+--------------------------------------+--------+
| validate_password_check_user_name    | OFF    |
| validate_password_dictionary_file    |        |
| validate_password_length             | 8      |
| validate_password_mixed_case_count   | 1      |
| validate_password_number_count       | 1      |
| validate_password_policy             | MEDIUM |
| validate_password_special_char_count | 1      |
+--------------------------------------+--------+

 密码的长度是由validate_password_length决定的,但是可以通过以下命令修改

set global validate_password_length=4;

  validate_password_policy决定密码的验证策略,默认等级为MEDIUM(中等),可通过以下命令修改为LOW(低)

set global validate_password_policy=0;

  重新执行

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' ;

5.3 解压及配置

$ tar -zxvf canal.deployer-1.1.4.tar.gz

  配置说明:canal server的conf下有几个配置文件

conf/
├── canal_local.properties
├── canal.properties
├── example
│   ├── h2.mv.db
│   ├── instance.properties
│   └── meta.dat
├── logback.xml
├── metrics
│   └── Canal_instances_tmpl.json
└── spring├── base-instance.xml├── default-instance.xml├── file-instance.xml├── group-instance.xml├── memory-instance.xml└── tsdb├── h2-tsdb.xml├── mysql-tsdb.xml├── sql│   └── create_table.sql└── sql-map├── sqlmap-config.xml├── sqlmap_history.xml└── sqlmap_snapshot.xml

canal.properties的common属性前四个配置项:

canal.id= 1             #canal的编号,在集群环境下,不同canal的id不同,注意它和mysql的server_id不同。
canal.ip=               # ip这里不指定,默认为本机
canal.port= 11111       # 端口号,是给tcp模式(netty)时候用的,如果用了kafka或者rocketmq,就不会去起这个端口了
canal.zkServers=         # zk用于canal cluster
canal.serverMode = tcp   # 用于指定什么模式拉取数据

destinations相关的配置:

#################################################
#########       destinations        ############# 
#################################################
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5canal.instance.global.mode = spring 
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

canal.destinations = example可以设置多个,比如example1,example2,则需要创建对应的两个文件夹,并且每个文件夹下都有一个instance.properties文件。

全局的canal实例管理用spring,这里的file-instance.xml最终会实例化所有的destinations instances:

        <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"><property name="destination" value="${canal.instance.destination}" /><property name="eventParser"><ref local="eventParser" /></property><property name="eventSink"><ref local="eventSink" /></property><property name="eventStore"><ref local="eventStore" /></property><property name="metaManager"><ref local="metaManager" /></property><property name="alarmHandler"><ref local="alarmHandler" /></property><property name="mqConfig"><ref local="mqConfig" /></property></bean>

如canal.instance.destination等于example,就会加载example/instance.properties配置文件

修改instance 配置文件

vi conf/example/instance.properties

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=10.0.165.1:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#################################################

6 canal的instance与消费方式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wQ7Won6T-1612757904202)(assets/1610607844057.png)]

  canal.properties这个配置文件负责的是canal服务的基础配置,每个canal可以起n多个实例instance,一个instance代表一个线程,每个instance都有一个独立的配置文件instance.properties,不同的instance可以采集不同的mysql数据库,也就是一个canal可以对应多个mysql数据库。

  在instance里面有一个小队列,可以理解为是jvm级的队列,instance抓取来的数据先放入到队列中,队列可以有很多出口:①一个是canal server自己主动把数据推送到kafka,这个比较简单,一行代码不用写,只需要配个kafka的地址,每个instance对应kafka的一个topic,数据是json串。这种方式虽然简单,但是他的缺点主要体现在2个方面,一个instance对应一个topic,所有表都在这一个topic,所以实时的时候要进行分流。另一方面,因为数据是json,并且携带了很多冗余信息,但是数据量大的时候传输效率比较低。②第二种方式是启动canal客户端主动去拉取数据,可以定义多长周期消费多少数据。他的缺点在于抓取出来的是序列化压缩的数据,所以需要反序列化,解压,比较麻烦。他的优点在于我们可以进行压缩,过滤掉没用的冗余信息,只保留我们需要的信息,提交传输效率。

$ ll
total 16
-rwxrwxr-x 1 canal canal  291 Sep  2  2019 canal_local.properties
-rwxrwxr-x 1 canal canal 5202 Jan 14 12:10 canal.properties
drwxrwxr-x 2 canal canal   33 Jan 14 12:15 example
-rwxrwxr-x 1 canal canal 3119 Sep  2  2019 logback.xml
drwxrwxr-x 2 canal canal   39 Jan 14 12:00 metrics
drwxrwxr-x 3 canal canal  149 Jan 14 12:00 spring

  一个example的目录就是一个instance,canal要配置多个实例采集多个数据源mysql的话如下配置,然后把conf目录下example复制多份,分别重命名。如下

#################################################
#########               destinations            #############
#################################################
canal.destinations = example1,example2,example3

7 canal server主动推送数据

7.1 配置

  修改配置vim conf/canal.properties:这个是总配置,端口号,服务器参数,kafka地址,zookeeper地址(高可用)等

  修改如下内容,这个zookeeper是配置高可用的,配置采用kafka方式,kafka的地址

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = kafka
canal.mq.servers = 10.0.165.8:9092,10.0.165.9:9092

  修改配置vim conf/example/instance.properties针对要追踪的mysql的实例配置:一个instance实例对应一个数据库(这个是指数据库服务器)服务器的binlog。所以一个instance具体采集几个数据库是binlog定的和canal没关系,canal不管,canal就把binlog里面有什么就采集,不管是一个数据库还是多个,只要在一个binlog都采集

  修改如下内容,配置用户名,密码,地址。canal.mq.partitionsNum这个是发送到第几个分区

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.mq.topic=ods_bigdata_mysql

7.2 启动canal

./bin/startup.sh

7.3 测试

  启动canal后,在kafka创建topic

bin/kafka-topics.sh --create --zookeeper 10.0.165.4:2181 --replication-factor 2 --partitions 12 --topic ods_bigdata_mysql

  到kafka目录下开销消费端查询是否有数据

bin/kafka-console-consumer.sh --bootstrap-server  10.0.165.8:9092,10.0.165.9:9092 --topic  ods_bigdata_mysql

  (1)往需要采集的库中的user_info表插入一条数据数据

  执行sql

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  可以看到kafka消费出了如下一条数据

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"test","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676724000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676724288,"type":"INSERT"}

  (2)更新前面插入的这条数据

  执行sql

UPDATE user_info SET name="update" WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610676928000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":[{"name":"test"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610676928644,"type":"UPDATE"}

  (3)删除前面插入的这条数据

  执行sql

DELETE FROM user_info WHERE id=10001

  kafka消费出的数据如下

{"data":[{"id":"10001","login_name":"test","nick_name":"test","passwd":null,"name":"update","phone_num":"11111111111","email":"111@gmail.com","head_img":null,"user_level":"3","birthday":"1999-09-09","gender":"F","create_time":"2020-02-02 02:02:02","operate_time":null}],"database":"bigdata","es":1610677003000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","login_name":"varchar(200)","nick_name":"varchar(200)","passwd":"varchar(200)","name":"varchar(200)","phone_num":"varchar(200)","email":"varchar(200)","head_img":"varchar(200)","user_level":"varchar(200)","birthday":"date","gender":"varchar(1)","create_time":"datetime","operate_time":"datetime"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"login_name":12,"nick_name":12,"passwd":12,"name":12,"phone_num":12,"email":12,"head_img":12,"user_level":12,"birthday":91,"gender":12,"create_time":93,"operate_time":93},"table":"user_info","ts":1610677003637,"type":"DELETE"}

8 canal主动拉取数据客户端

8.1 修改配置

  修改canal.properties,zookeeper配置高可用,配置采用tcp方式

canal.zkServers = 10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181
canal.serverMode = tcp

  注意:需要修改canal.proerties的canal.serverMode为tcp否则不会启动11111端口

  修改instance.properties,配置用户名,密码,地址。

canal.instance.master.address=10.0.165.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

  重新启动后查看11111端口是否被占用

[canal@fbi-local-02 bin]$ lsof -i:11111
COMMAND   PID  USER   FD   TYPE  DEVICE SIZE/OFF NODE NAME
java    38516 canal  108u  IPv4 4281763      0t0  TCP fbi-local-02:vce (LISTEN)

8.2 将binlog转换为ProtoBuf消息

  (1)编写proto描述文件CanalBinLog.proto

syntax = "proto3";
option java_package = "com.quinto.canal";
option java_outer_classname = "CanalBinLog";/* 行数据 */
message RowData {uint64 executeTime = 1;string schemaName = 2;string tableName = 3;string eventType = 4;/* 列数据 */map<string, string> columns = 5;uint64 logfileoffset = 14;string logfilename = 15;
}

  (2)canal客户端代码编写

  导入依赖

    <properties><protobuf.version>3.5.0</protobuf.version><kafka.client.version>1.0.0</kafka.client.version><kafka.version>0.11.0.2</kafka.version><canal.version>1.1.4</canal.version></properties><dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>${canal.version}</version></dependency><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>${protobuf.version}</version></dependency><!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.11</artifactId><version>${kafka.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka.client.version}</version></dependency></dependency>

  ①工具类

  读取配置文件工具类

package com.quinto.utils;import java.io.IOException;
import java.util.Properties;/*** 读取config.properties配置文件的工具类*/
public class ConfigUtil {// 定义一个properties对象public static Properties properties;// 定义一个静态代码块,只执行一次static {try {properties = new Properties();properties.load(ConfigUtil.class.getClassLoader().getResourceAsStream("config.properties"));} catch (IOException e) {e.printStackTrace();}}public static String canalServerIp() {return properties.getProperty("canal.server.ip");}public static int canalServerPort() {return Integer.parseInt(properties.getProperty("canal.server.port"));}public static String canalServerDestination() {return properties.getProperty("canal.server.destination");}public static String canalServerUsername() {return properties.getProperty("canal.server.username");}public static String canalServerPassword() {return properties.getProperty("canal.server.password");}public static String canalSubscribeFilter() {return properties.getProperty("canal.subscribe.filter");}public static String zookeeperServerIp() {return properties.getProperty("zookeeper.server.ip");}public static String kafkaBootstrap_servers_config() {return properties.getProperty("kafka.bootstrap_servers_config");}public static int kafkaBatch_size() {return Integer.parseInt(properties.getProperty("kafka.batch_size"));}public static String kafkaAcks() {return properties.getProperty("kafka.acks");}public static String kafkaRetries() {return properties.getProperty("kafka.retries");}public static String kafkaBatch() {return properties.getProperty("kafka.batch");}public static String kafkaClient_id_config() {return properties.getProperty("kafka.client_id_config");}public static String kafkaKey_serializer_class_config() {return properties.getProperty("kafka.key_serializer_class_config");}public static String kafkaValue_serializer_class_config() {return properties.getProperty("kafka.value_serializer_class_config");}public static String kafkaTopic() {return properties.getProperty("kafka.topic");}public static void main(String[] args) {System.out.println(kafkaTopic());}
}

  配置文件

# canal配置
canal.server.ip=10.0.165.2
canal.server.port=11111
canal.server.destination=example
canal.server.username=canal
canal.server.password=canal
canal.subscribe.filter=bigdata.*# zookeeper配置
zookeeper.server.ip=10.0.165.4:2181,10.0.165.5:2181,10.0.165.6:2181# kafka配置
# kafka集群地址
kafka.bootstrap_servers_config=10.0.165.8:9092,10.0.165.9:9092
# 配置批次发送数据的大小,满足批次大小才会发送数据
kafka.batch_size= 10240
# ack
kafka.acks=all
# 重试次数
kafka.retries=2
kafka.client_id_config=quinto_canal
# kafka的key序列化
kafka.key_serializer_class_config=org.apache.kafka.common.serialization.StringSerializer
# kafka的value序列化,自定义开发
kafka.value_serializer_class_config=com.quinto.protobuf.ProtoBufSerializer
# 数据写入到kafka的哪个topic中
kafka.topic=ods_canal_mysql

  kafka工具类

package com.quinto.utils;import com.quinto.bean.CanalRowData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;/*** kafka工具类*/
public class KafkaUtil {public static KafkaProducer getKafkaProducer(){// 定义一个properties对象接收参数Properties properties = new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, ConfigUtil.kafkaBootstrap_servers_config());properties.put(ProducerConfig.BATCH_SIZE_CONFIG, ConfigUtil.kafkaBatch_size());properties.put(ProducerConfig.ACKS_CONFIG, ConfigUtil.kafkaAcks());properties.put(ProducerConfig.RETRIES_CONFIG, ConfigUtil.kafkaRetries());properties.put(ProducerConfig.CLIENT_ID_CONFIG, ConfigUtil.kafkaClient_id_config());properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaKey_serializer_class_config());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ConfigUtil.kafkaValue_serializer_class_config());//实例化生产者对象并返回,key使用默认的String序列化范式,value采用自定义的序列化方式,这个序列化需要传递一个Protobufable的子类return new KafkaProducer<String, CanalRowData>(properties);}/*** 传递参数,将数据写入到kafka集群* @param rowData*/public static void send(KafkaProducer kafkaProducer,CanalRowData rowData){kafkaProducer.send(new ProducerRecord<>(ConfigUtil.kafkaTopic(), rowData));}
}

  ②自定义kafka序列化类

package com.quinto.protobuf;import org.apache.kafka.common.serialization.Serializer;import java.util.Map;/*** 实现kafka的value的自定义序列化对象* 要求传递的泛型必须是集成ProtoBufabl接口的实现列,才可以被序列化成功*/
public class ProtoBufSerializer implements Serializer<ProtoBufable> {@Overridepublic void configure(Map<String, ?> map, boolean b) {}@Overridepublic byte[] serialize(String s, ProtoBufable data) {return data.toBytes();}@Overridepublic void close() {}
}

  ③protobuf序列化接口,所有能够使用protobuf序列化的bean都需要集成这个接口

package com.quinto.protobuf;/*** 定义protobuf序列化接口,返回的是byte[]二进制对象,所有能够使用protobuf序列化的bean都需要集成这个接口*/
public interface ProtoBufable {/*** 将对象转换成二进制数组* @return*/byte[] toBytes();
}

  ④canal数据的Protobuf的实现类

package com.quinto.bean;import com.quinto.protobuf.CanalBinLog;
import com.quinto.protobuf.ProtoBufable;import java.util.Map;/*** canal数据的Protobuf的实现类,使用protobuf序列化成bean对象。* 用于将binlog解析后的map对象转换成protobuf序列化后的字节码数据,写入kafka集群*/
public class CanalRowData implements ProtoBufable {private String logfileName;private Long logfileOffset;private Long executeTime;private String schemaName;private String tableName;private String eventType;private Map<String, String> columns;public String getLogfileName() {return logfileName;}public void setLogfileName(String logfileName) {this.logfileName = logfileName;}public Long getLogfileOffset() {return logfileOffset;}public void setLogfileOffset(Long logfileOffset) {this.logfileOffset = logfileOffset;}public Long getExecuteTime() {return executeTime;}public void setExecuteTime(Long executeTime) {this.executeTime = executeTime;}public String getSchemaName() {return schemaName;}public void setSchemaName(String schemaName) {this.schemaName = schemaName;}public String getTableName() {return tableName;}public void setTableName(String tableName) {this.tableName = tableName;}public String getEventType() {return eventType;}public void setEventType(String eventType) {this.eventType = eventType;}public Map<String, String> getColumns() {return columns;}public void setColumns(Map<String, String> columns) {this.columns = columns;}/*** 构造方法中解析map对象的binlog日志*/public CanalRowData(Map map){//解析map对象中所有的参数if(map.size()>0){this.logfileName = map.get("logfileName").toString();this.logfileOffset = Long.parseLong(map.get("logfileOffset").toString());this.executeTime = Long.parseLong(map.get("executeTime").toString());this.schemaName = map.get("schemaName").toString();this.tableName = map.get("tableName").toString();this.eventType = map.get("eventType").toString();this.columns = (Map<String, String>)map.get("columns");}}/*** 将map对象解析出来的参数,赋值给protobuf对象,然后序列化后字节码返回* @return*/@Overridepublic byte[] toBytes() {CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();builder.setLogfileName(this.getLogfileName());builder.setLogfileOffset(this.getLogfileOffset());builder.setExecuteTime(this.getExecuteTime());builder.setSchemaName(this.getSchemaName());builder.setTableName(this.getTableName());builder.setEventType(this.getEventType());for (String key : this.getColumns().keySet()) {builder.putColumns(key, this.getColumns().get(key));}//将传递的binlog数据解析后序列化成字节码数据返回return builder.build().toByteArray();}
}

  ⑤canal客户端类

package com.quinto.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.quinto.bean.CanalRowData;
import com.quinto.utils.ConfigUtil;
import com.quinto.utils.KafkaUtil;
import org.apache.kafka.clients.producer.KafkaProducer;import java.util.HashMap;
import java.util.List;
import java.util.Map;public class CanalClient {// Canal客户端连接器private CanalConnector canalConnector;// kafka生产者工具类private KafkaProducer kafkaProducer;public CanalClient(){// 在构造方法中初始化连接与kafka工具类kafkaProducer = KafkaUtil.getKafkaProducer();}public void statrt() {// 1 创建连接并建立连接,连接的是高可用集群System.out.println(ConfigUtil.zookeeperServerIp()+ConfigUtil.canalServerDestination()+ConfigUtil.canalServerUsername()+ConfigUtil.canalServerPassword());canalConnector = CanalConnectors.newClusterConnector("10.0.165.4:2181","example", "canal", "canal");// 不停拉取的标识boolean isFetching = true;// 建立连接try {canalConnector.connect();// 回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有 ack 的地方开始拿canalConnector.rollback();// 2 订阅主题canalConnector.subscribe(ConfigUtil.canalSubscribeFilter());// 不停的拉取数据while (isFetching){// 3 获取数据,尝试拿batchSize条记录,有多少取多少,不会阻塞等待Message message = canalConnector.getWithoutAck(ConfigUtil.kafkaBatch_size());// 获取这个批次的idlong batchId = message.getId();// 获取拉取到的日志数据总数int size = message.getEntries().size();// 判断是否又获取到数据if (batchId == -1 | size == 0){System.out.println("没有抓取到数据");Thread.sleep(1000);}else {System.out.println("发送数据:"+ message);// 将binlog日志解析成Map对象Map map = binlogToMap(message);// 将map对象序列化成protobuf格式写入到kafka中CanalRowData canalRowData = new CanalRowData(map);// 有数据将数据发送到kafka集群if(map.size()>0){KafkaUtil.send(kafkaProducer,canalRowData);}}// 4 提交确认// 提交确认,进行batch id的确认,确认之后,小于等于此 batchId 的 Message 都会被确认。canalConnector.ack(batchId);}} catch (Exception e) {e.printStackTrace();} finally {// 5 关闭连接canalConnector.disconnect();}}private Map binlogToMap(Message message) throws InvalidProtocolBufferException {Map rowDataMap = new HashMap();// 构建CanalClient.RowData实体
//        CanalBinLog.RowData.Builder builder = CanalBinLog.RowData.newBuilder();// 遍历message中的所有binlog实体for (CanalEntry.Entry entry: message.getEntries()){// 只处理事务型的binlogif(entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND){continue;}// 获取文件名String logfileName = entry.getHeader().getLogfileName();// 获取logfile的偏移量long logfileOffset = entry.getHeader().getLogfileOffset();// 获取sql语句的执行时间戳long executeTime = entry.getHeader().getExecuteTime();// 获取数据库名称String schemaName = entry.getHeader().getSchemaName();// 获取表名String tableName = entry.getHeader().getTableName();// 获取事件类型 insert/update/deleteString eventType = entry.getEntryType().toString().toLowerCase();rowDataMap.put("logfileName", logfileName);rowDataMap.put("logfileOffset", logfileOffset);rowDataMap.put("executeTime", executeTime);rowDataMap.put("schemaName", schemaName);rowDataMap.put("tableName", tableName);rowDataMap.put("eventType", eventType);// 封装列数据HashMap<String, String> columnDataMap = new HashMap<>();// 获取所有行上的变更CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();for (CanalEntry.RowData rowData : rowDatasList){if(eventType.equals("insert") || eventType.equals("update")){for (CanalEntry.Column column : rowData.getAfterColumnsList()){columnDataMap.put(column.getName(), column.getValue());}}else if(eventType.equals("delete")) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {columnDataMap.put(column.getName(), column.getValue());}}}rowDataMap.put("columns",columnDataMap);}return rowDataMap;}
}

  ⑥入口类

package com.quinto;import com.quinto.canal.CanalClient;public class App {public static void main(String[] args) {// 实例化canal客户端对象,调用start方法拉取canalserver的binlog日志发送到kafka集群CanalClient canalClient = new CanalClient();canalClient.statrt();}
}

8.3 测试

  (1)往user_info表插入一条数据

insert  into user_info values (10001,'test','test',NULL,'test','11111111111','111@gmail.com',NULL,'3','1999-09-09','F','2020-02-02 02:02:02',NULL)

  canal client从canal server拉取到的数据如下

发送数据:Message[id=8,entries=[header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615211serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " `"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615374serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: "bigdata"tableName: "user_info"eventLength: 105eventType: INSERTprops {key: "rowsCount"value: "1"}
}
entryType: ROWDATA
storeValue: "\b\210\001\020\001P\000b\241\004\022*\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0010\000B\00510001R\nbigint(20)\022*\b\001\020\f\032\nlogin_name \000(\0010\000B\004testR\fvarchar(200)\022)\b\002\020\f\032\tnick_name \000(\0010\000B\004testR\fvarchar(200)\022 \b\003\020\f\032\006passwd \000(\0010\001R\fvarchar(200)\022$\b\004\020\f\032\004name \000(\0010\000B\004testR\fvarchar(200)\0220\b\005\020\f\032\tphone_num \000(\0010\000B\v11111111111R\fvarchar(200)\022.\b\006\020\f\032\005email \000(\0010\000B\r111@gmail.comR\fvarchar(200)\022\"\b\a\020\f\032\bhead_img \000(\0010\001R\fvarchar(200)\022\'\b\b\020\f\032\nuser_level \000(\0010\000B\0013R\fvarchar(200)\022&\b\t\020[\032\bbirthday \000(\0010\000B\n1999-09-09R\004date\022!\b\n\020\f\032\006gender \000(\0010\000B\001FR\nvarchar(1)\0226\b\v\020]\032\vcreate_time \000(\0010\000B\0232020-02-02 02:02:02R\bdatetime\022\"\b\f\020]\032\foperate_time \000(\0010\001R\bdatetime"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615479serverId: 1serverenCode: "UTF-8"executeTime: 1610943288000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00522379"
],raw=false,rawEntries=[]]

  查看kafka消费出来的数据

ϝ󬑢igdata	user_info"rowdatapϙzmysql-bin.000002

  删除刚才插入的数据

发送数据:Message[id=9,entries=[header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615575serverId: 1serverenCode: "UTF-8"executeTime: 1610943487000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " a"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615738serverId: 1serverenCode: "UTF-8"executeTime: 1610943487000sourceType: MYSQLschemaName: "bigdata"tableName: "user_info"eventLength: 105eventType: DELETEprops {key: "rowsCount"value: "1"}
}
entryType: ROWDATA
storeValue: "\b\210\001\020\003P\000b\241\004\n*\b\000\020\373\377\377\377\377\377\377\377\377\001\032\002id \001(\0000\000B\00510001R\nbigint(20)\n*\b\001\020\f\032\nlogin_name \000(\0000\000B\004testR\fvarchar(200)\n)\b\002\020\f\032\tnick_name \000(\0000\000B\004testR\fvarchar(200)\n \b\003\020\f\032\006passwd \000(\0000\001R\fvarchar(200)\n$\b\004\020\f\032\004name \000(\0000\000B\004testR\fvarchar(200)\n0\b\005\020\f\032\tphone_num \000(\0000\000B\v11111111111R\fvarchar(200)\n.\b\006\020\f\032\005email \000(\0000\000B\r111@gmail.comR\fvarchar(200)\n\"\b\a\020\f\032\bhead_img \000(\0000\001R\fvarchar(200)\n\'\b\b\020\f\032\nuser_level \000(\0000\000B\0013R\fvarchar(200)\n&\b\t\020[\032\bbirthday \000(\0000\000B\n1999-09-09R\004date\n!\b\n\020\f\032\006gender \000(\0000\000B\001FR\nvarchar(1)\n6\b\v\020]\032\vcreate_time \000(\0000\000B\0232020-02-02 02:02:02R\bdatetime\n\"\b\f\020]\032\foperate_time \000(\0000\001R\bdatetime"
, header {version: 1logfileName: "mysql-bin.000002"logfileOffset: 4615843serverId: 1serverenCode: "UTF-8"executeTime: 1610943487000sourceType: MYSQLschemaName: ""tableName: ""eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\00522390"
],raw=false,rawEntries=[]]

  查看kafka消费出来的数据

󬑢igdata	user_info"rowdatapºܙzmysql-bin.000002

7 canal高可用

  一个总的canal服务器进程,每一个instance就是一个线程,单独对应一个mysql服务器的binlog。再起一个canal服务的话,对于同一个mysql服务器不能做负载均衡,数据分片等。有两个canal服务器都监控一个或多个mysql服务器的binlog,这两个canal服务同时只能有一个提供服务,当提供服务的这个宕机时,zookeeper能知道,zookeeper就通知另一个canal服务器让他提供服务。当原来宕机的那个再启动起来时,是抢占模式的,谁抢到就谁上,没抢到就standy模式。canal本身就是一个工具不存数据,宕机了就宕机,只有还有另外一个能提供服务就行,所以没有什么同步问题(不像数据库有同步问题)。因为启动canal服务是需要消耗资源的,不想redis高可用占资源太少了。canal的standy资源也不能给少了,要双份资源,就看企业在意不在意,服务核心不核心。

  maxwell和canal非常像,maxwell连高可用机制都没提供,倒了就再起。其实很多软件都是不提供高可用方案的,如果怕他倒的话,可以用Keepalived,这个机制很简单就是做心跳监测,可以给任何进程做一个心跳检测,可以一直检测他在不在进程列表里,如果宕了进程没了他会有一系列触发操作,可以在他里面写一个shell,如还有一个备机,要是这个挂了就在备机启动。或者自己手工在restart,这是一种通用型方案。Keepalived和maxwell是完全没有耦合关系的,maxwell完全不知道Keepalived的存在,Keepalived是从外围的观察者观察这个进程,不像zookeeper,是需要向它注册的。

  把canal目录分发给其他节点

$ scp -r canal canal@10.0.165.10:/home/canal/.local

  注意:这里zookeeper为观察者监控的模式,只能实现高可用,而不是负载均衡,即同一时点只有一个canal-server节点能够监控某个数据源,只要这个节点能够正常工作,那么其他监控这个数据源的canal-server只能做stand-by,直到工作节点停掉,其他canal-server节点才能抢占。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/473508.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 1657. 确定两个字符串是否接近

文章目录1. 题目2. 解题1. 题目 如果可以使用以下操作从一个字符串得到另一个字符串&#xff0c;则认为两个字符串 接近 &#xff1a; 操作 1&#xff1a;交换任意两个 现有 字符。 例如&#xff0c;abcde -> aecdb操作 2&#xff1a;将一个 现有 字符的每次出现转换为另一…

机器学习简介及学习思维导图

什么是机器学习机器学习是人工智能的一个分支。人工智能的研究是从以“推理”为重点到以“知识”为重点&#xff0c;再到以“学习”为重点&#xff0c;一条自然、清晰的脉络。机器学习是实现人工智能的一个途径&#xff0c;即以机器学习为手段解决人工智能中的问题。机器学习算…

LeetCode 1658. 将 x 减到 0 的最小操作数(哈希)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums 和一个整数 x 。每一次操作时&#xff0c;你应当移除数组 nums 最左边或最右边的元素&#xff0c;然后从 x 中减去该元素的值。请注意&#xff0c;需要 修改 数组以供接下来的操作使用。 如果可以将 x 恰好 减到 0 &…

1057: [ZJOI2007]棋盘制作 - BZOJ

Description国际象棋是世界上最古老的博弈游戏之一&#xff0c;和中国的围棋、象棋以及日本的将棋同享盛名。据说国际象棋起源于易经的思想&#xff0c;棋盘是一个8*8大小的黑白相间的方阵&#xff0c;对应八八六十四卦&#xff0c;黑白对应阴阳。而我们的主人公小Q&#xff0c…

机器学习:Scikit-learn与特征工程

“数据决定了机器学习的上限&#xff0c;而算法只是尽可能逼近这个上限”&#xff0c;这句话很好的阐述了数据在机器学习中的重要性。大部分直接拿过来的数据都是特征不明显的、没有经过处理的或者说是存在很多无用的数据&#xff0c;那么需要进行一些特征处理&#xff0c;特征…

Mvc系统学习9——Areas学习

在Mvc2.0中&#xff0c;新增加了一个特性就是Areas。在没有有使用Areas的情况下&#xff0c;我们的Mvc项目组织是下面这样的。当项目庞大的时候&#xff0c;Controllers,Model,View文件下下面势必会有很多文件。项目将难以管理。 通过使用Areas使我们可以很好的组织项目&#x…

天池 在线编程 数组游戏

文章目录1. 题目2. 解题1. 题目 样例 1 输入: [3, 4, 6, 6, 3] 输出: 7 说明: [3, 4, 6, 6, 3] -> [4, 5, 7, 6, 4] -> [5, 6, 7, 7, 5] -> [6, 7, 8, 7, 6] -> [7, 8, 8, 8, 7] -> [8, 9, 9, 8, 8] -> [9, 9, 10, 9, 9] -> [10, 10, 10, 10, 10] 来源&a…

机器学习:sklearn数据集与机器学习组成

机器学习组成&#xff1a;模型、策略、优化 《统计机器学习》中指出&#xff1a;机器学习模型策略算法。其实机器学习可以表示为&#xff1a;Learning RepresentationEvalutionOptimization。我们就可以将这样的表示和李航老师的说法对应起来。机器学习主要是由三部分组成&…

天池 在线编程 分割数组

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164426199705086870/193936950952137407 2. 解题 class Solution { public:/*** param arr: an inter array * return: return the min sum*/int splitArray(vector<int> &arr) {// …

C++ 添加程序图标到我的电脑

&#xff23;&#xff0b;&#xff0b; 像我的电脑中 百度网盘的 那图标快捷方式。如何生成的呢&#xff1f;设置程序图标到我的电脑 请看下边代码 就ok了(*^__^*) 嘻嘻…… 类似下图&#xff1a; 大家如果看我下边的不是很清楚&#xff0c;可以下载这个具体工程&#xff1b…

LeetCode 1663. 具有给定数值的最小字符串(贪心)

文章目录1. 题目2. 解题1. 题目 小写字符 的 数值 是它在字母表中的位置&#xff08;从 1 开始&#xff09;&#xff0c;因此 a 的数值为 1 &#xff0c;b 的数值为 2 &#xff0c;c 的数值为 3 &#xff0c;以此类推。 字符串由若干小写字符组成&#xff0c;字符串的数值 为…

LeetCode 1664. 生成平衡数组的方案数(前缀和+后缀和)

文章目录1. 题目2. 解题1. 题目 给你一个整数数组 nums 。你需要选择 恰好 一个下标&#xff08;下标从 0 开始&#xff09;并删除对应的元素。请注意剩下元素的下标可能会因为删除操作而发生改变。 比方说&#xff0c;如果 nums [6,1,7,4,1] &#xff0c; 那么&#xff1a; …

迪美特TVZ8双核智能高清播放器 在电视上编程不是梦

迪美特TVZ8双核智能高清播放器 两步让普通电视变云电视 独家VST&#xff1a; 全网聚合&#xff0c;极致体验&#xff1a;独家自主设计&#xff0c;炫丽生动的Win8风格UI界面&#xff1a; 新版VST全聚合是华人用户数最多的聚合平台软件&#xff0c;集合视频点播、网络直播…

LeetCode 1665. 完成所有任务的最少初始能量(贪心)

文章目录1. 题目2. 解题1. 题目 给你一个任务数组 tasks &#xff0c;其中 tasks[i] [actuali, minimumi] &#xff1a; actuali 是完成第 i 个任务 需要耗费 的实际能量。minimumi 是开始第 i 个任务前需要达到的最低能量。 比方说&#xff0c;如果任务为 [10, 12] 且你当…

词云(WordCloud)制作

以《神雕侠侣》为例&#xff0c;我们制作词云&#xff0c;看看有哪些高频词汇。 1. 导入一些包 # -*- coding:utf-8 -*- # Python Version: 3.7 # Time: 2020/11/27 19:32 # Author: Michael Ming # Website: https://michael.blog.csdn.net/ # File: word_cloud.py # Refere…

天池 在线编程 求和查找

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164427478262600292/204998627646706400 2. 解题 暴力 哈希查找 class Solution { public:/*** param inputs: an integer array* param tests: an integer array* return: return true if s…

天池 在线编程 条件串(DP)

文章目录1. 题目2. 解题1. 题目 来源&#xff1a;https://tianchi.aliyun.com/oj/164427478262600292/204998627646706401 2. 解题 把字符串分成ace&#xff0c;bdf 两部分进行处理&#xff0c;求以某个字符结束时的最小删除次数 class Solution { public:/*** param s: wri…

用 Kaggle 经典案例教你用 CNN 做图像分类!

我们来看一个 Kaggle 上比较经典的一个图像分类的比赛 CIFAR( CIFAR-10 - Object Recognition in Images )&#xff0c;这个比赛现在已经关闭了&#xff0c;但不妨碍我们来去通过它学习一下卷积神经网络做图像识别的代码结构。相信很多学过深度学习的同学都尝试过这个比赛&…

Flask知识点回顾以及重点内容

1. HTTP通信与Web框架 1.1 流程 客户端将请求打包成HTTP的请求报文&#xff08;HTTP协议格式的请求数据&#xff09; 采用TCP传输发送给服务器端 服务器接收到请求报文后按照HTTP协议进行解析 服务器根据解析后获知的客户端请求进行逻辑执行 服务器将执行后的结果封装成HTTP的响…

机器学习回归算法—线性回归及案例分析

一、回归算法回归是统计学中最有力的工具之一。机器学习监督学习算法分为分类算法和回归算法两种&#xff0c;其实就是根据类别标签分布类型为离散型、连续性而定义的。回归算法用于连续型分布预测&#xff0c;针对的是数值型的样本&#xff0c;使用回归&#xff0c;可以在给定…