Logstash
-
测试数据准备
-- Test-data setup: build the `es` database and bulk-load 40000 rows into `book`.
DROP DATABASE IF EXISTS es;
CREATE DATABASE es DEFAULT CHARACTER SET utf8;
USE es;

-- Created as MyISAM so that DISABLE KEYS actually skips index maintenance
-- during the bulk load; converted to InnoDB after the import (last statement).
-- (DISABLE KEYS is a no-op on InnoDB, and the final ALTER implies this intent.)
CREATE TABLE book (
    id INT NOT NULL,
    title VARCHAR(20),
    author VARCHAR(20),
    price DECIMAL(6,2),
    PRIMARY KEY (id)
) ENGINE = MYISAM;

DROP PROCEDURE IF EXISTS batchInsertBook;

DELIMITER $$
-- Inserts `loops` batches of 4 rows; ids are consecutive starting at `seed`.
CREATE PROCEDURE batchInsertBook(IN seed INT, IN loops INT)
BEGIN
    DECLARE i INT;
    DECLARE id INT;
    SET i = 0;
    SET id = seed;
    WHILE i < loops DO
        INSERT INTO book(id, title, author, price) VALUES
            (id,   '雪山飞狐', '金庸',   50),
            (id+1, '神雕侠侣', '金庸',   60),
            (id+2, '三国演义', '罗贯中', 50),
            (id+3, '西游记',   '吴承恩', 40);
        SET id = id + 4;
        SET i = i + 1;
    END WHILE;
END $$
DELIMITER ;

-- Disable index maintenance to speed up the import.
ALTER TABLE book DISABLE KEYS;
-- Import the data: 10000 loops * 4 rows = 40000 rows, matching the
-- `select count(*)` transcript below (the original `100` would insert only 400).
CALL batchInsertBook(1, 10000);
-- Rebuild the indexes.
ALTER TABLE book ENABLE KEYS;
-- Switch the table engine to InnoDB now that the load is done.
ALTER TABLE book ENGINE = INNODB;
mysql> select count(*) from book; +----------+ | count(*) | +----------+ | 40000 | +----------+ 1 row in set (0.03 sec)
-
docker
安装logstash
# 拉取镜像
docker pull logstash:7.12.1
-
在宿主机配置目录
mkdir -p /root/logstash
-
在宿主机创建
/root/logstash/logstash.yml
,内容为空即可,该步骤不能省略 -
在宿主机创建
/root/logstash/logstash.conf
input {
  jdbc {
    # Must match the jar actually mounted into the container (8.0.11 — see the
    # uploaded jar and the docker run -v mount; the original said 8.0.27 and
    # the driver would fail to load).
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
    jdbc_user => "root"
    jdbc_password => "root"
    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"
    statement => "SELECT * FROM book"
    type => "jdbc"
  }
}
filter {
}
output {
  # Print each fetched row as one JSON line, to verify the pipeline works.
  stdout {
    codec => json_lines
  }
}
-
本次连接的是
windows
上的MySQL
,通过IpV4的IP地址连接
-
上传
maven
仓库中的jar
# \apache-maven-3.9.6\repository\mysql\mysql-connector-java\8.0.11\mysql-connector-java-8.0.11.jar [root@localhost logstash]# ls mysql-connector-java-8.0.11.jar [root@localhost logstash]# chmod 644 mysql-connector-java-8.0.11.jar
-
开启
windows
的root
远程登录
mysql -uroot -proot
use mysql;
update user set host = '%' where user = 'root';
FLUSH PRIVILEGES;
mysql> select host,user from user; +-----------+------------------+ | host | user | +-----------+------------------+ | % | root | | localhost | mysql.infoschema | | localhost | mysql.session | | localhost | mysql.sys | +-----------+------------------+ 4 rows in set (0.00 sec)
-
-
启动
logstash
容器
# Start the Logstash container, mounting the settings file, the pipeline
# config, and the MySQL JDBC driver jar into the image's expected paths.
docker run -d \
  --name logstash \
  -v ~/logstash/logstash.yml:/usr/share/logstash/config/logstash.yml \
  -v ~/logstash/logstash.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v ~/logstash/mysql-connector-java-8.0.11.jar:/usr/share/logstash/mysql-connector-java-8.0.11.jar \
  logstash:7.12.1
-
查看日志
# 查看数据同步
docker logs -f logstash
-
将数据输出到
ElasticSearch
{% note blue ‘fas fa-bullhorn’ modern %}
既然我们能从
mysql
中读取数据,并输出到stdout
,那么我们同样可以从mysql
中读取数据,然后将数据输出到ElasticSearch
,修改logstash.conf
,内容如下{% endnote %}
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
    jdbc_user => "root"
    jdbc_password => "root"
    # Cron-style schedule: run the statement every minute.
    schedule => "* * * * *"
    statement => "SELECT * FROM book"
    type => "jdbc"
  }
}
filter {
}
output {
  # Index every row into the `book` index; using the MySQL primary key as the
  # document id makes repeated full syncs idempotent (upsert, not duplicate).
  elasticsearch {
    hosts => ["192.168.32.128:9200"]
    index => "book"
    document_id => "%{id}"
  }
  # Keep stdout output as well, so the sync can be watched in docker logs.
  stdout {
    codec => json_lines
  }
}
-
确保
es
是启动的 -
重启
[root@192 logstash]# docker restart logstash
logstash
-
进入如下界面
-
查看是否同步到
es
-
增量同步
-
修改
logstash.conf
,内容如下
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.11.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.126.1:3306/es?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMT%2B8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    type => "jdbc"
    # Track a column value from the result set instead of the run timestamp.
    use_column_value => true
    # The `id` column drives the incremental sync.
    tracking_column => "id"
    # Type of the tracked column; must be a quoted string ("numeric" or
    # "timestamp") — the bareword `numeric` is a config parse error.
    tracking_column_type => "numeric"
    # Persist the last-seen value between runs.
    record_last_run => true
    # :sql_last_value is the largest tracking_column value from the previous
    # run, so each run only fetches rows added since then.
    statement => "SELECT * FROM book where id > :sql_last_value"
    last_run_metadata_path => "syncpoint_table"
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["192.168.32.128:9200"]
    index => "book"
    document_id => "%{id}"
  }
  stdout {
    codec => json_lines
  }
}
-