Docker 中使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

一、Mysql 的安装和配置

1.使用 docker 安装 mysql,并且映射端口和 root 账号的密码

# 获取镜像
docker pull mysql:8.0.40-debian# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run -d -p 3388:3306 --name super-mysql -e MYSQL_ROOT_PASSWORD=123456 mysql:8.0.40-debian

2.连接 mysql,检查是否正确安装

        使用 Navicat 连接

使用 linux 命令行连接,172.21.121.208 是我本地映射的ip地址,这里换成对应 ip 即可

3.修改 mysql 的配置文件

# 进入刚刚安装的 mysql 容器
docker exec -it super-mysql /bin/bash# 查找 mysql 的配置文件 my.cnf
find / -name my.cnf 2>/dev/null#显示:
# /var/lib/dpkg/alternatives/my.cnf
# /etc/alternatives/my.cnf
# /etc/mysql/my.cnf# 修改配置文件 my.cnf 
vim /etc/mysql/my.cnf# 开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复# 如果没有vim,则需 先安装vim,执行如下命令
# 更新源
apt update# 安装vim
apt install vim

4.授权 canal 链接 mysql 账号具有作为 mysql slave 的权限, 如果已有账户可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

运行截图如下

使用刚刚新建的账号: canal 密码: canal 连接 mysql,成功了才往下配置

二、Canal 的安装和配置

1.使用 docker 安装 canal

# 获取镜像
docker pull canal/canal-server:latest# 查看镜像是否下载成功
docker images# 运行msyql镜像
docker run --name super-canal -p 3399:11111 -d canal/canal-server:latest

2.修改 canal 配置,并且重启 canal

# 进入刚刚安装的 canal 容器
docker exec -it super-canal /bin/bash# 查找 canal 的配置文件 instance.properties
find / -name 'instance.properties' 2>/dev/null 
# 显示:
# /home/admin/canal-server/conf/example/instance.properties# 修改配置文件 instance.properties
vi /home/admin/canal-server/conf/example/instance.propertiescanal.instance.master.address=连接mysql的ip:port
canal.instance.dbUsername=连接mysql的账号
canal.instance.dbPassword=连接mysql的密码# 重启 canal 服务
find / -name 'stop.sh' 2>/dev/null # 查找停止 canal 命令
bash /home/admin/canal-server/bin/stop.sh # 停止 canalfind / -name 'startup.sh' 2>/dev/null # 查找重启 canal 命令
bash /home/admin/canal-server/bin/startup.sh # 重启 canal

主要修改连接 msyql 的 ip 和 port,使用的账号和密码,修改配置如下:

三、使用 PHP 测试 Canal 是否监听到了 Mysql 的变化

1.初始化项目

# 创建项目文件夹 canal-elasticsearch,并且进入到项目文件夹# composer 初始化项目
composer init 
# 一直按回车键 Enter# 安装 cannal 依赖
composer require xingwenge/canal_php

2.在 src 目录下新建文件 index.php ,写入一下代码

<?php
require __DIR__.'/../vendor/autoload.php';
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);# $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");# $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);}}sleep(1);}$client->disConnect();
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

3.命令行中启动脚本 php ./src/index.php

4.在 mysql 中新建表或者新增数据,就会在命令行中打印出来

# 创建数据表
CREATE TABLE `user` (`id` int unsigned NOT NULL AUTO_INCREMENT,`name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',`age` int unsigned NOT NULL DEFAULT '0',`created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,`updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=18 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

注意:如果新增数据表或者其它情况,导致 canal 突然连接不上 mysql 了,建议停止 canal,且删除当前的 canal 容器,重新使用 docker 安装 canal,这样解决起来比较迅速(坏笑)

四、ElasticSearch 的安装和配置

1.使用 docker 安装 elasticsearch

# 获取镜像
docker pull registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1# 查看镜像是否下载成功
docker images# 创建数据文件夹
mkdir /mnt/d/Work/Code/docker/elasticsearch/data# 运行 elasticsearch 镜像
docker run --name super-elasticsearch -d \-p 3320:9200 -p 3330:9300 \-e "ES_JAVA_OPTS=-Xms4g -Xmx16g" \-e "discovery.type=single-node" \-v /mnt/d/Work/Code/docker/elasticsearch/data:/usr/share/elasticsearch/data \registry.cn-hangzhou.aliyuncs.com/xka/es:7.11.2-210328-1

2.测试 elasticsearch 是否安装成功, 在浏览器地址栏输入:127.0.0.1:3320,出现如下画面表示安装成功了

五、使用 PHP 通过 Canal 同步 Mysql 数据到 ElasticSearch

1.在 php 中使用 composer 安装 elasticsearch 依赖包

# 使用 composer 安装 elasticsearch 依赖包
composer require elasticsearch/elasticsearch

2.使用 php 在 elasticsearch 中创建 mapping,在 src 目录下创建文件 creatElasticSearchMapping.php

<?php
require 'vendor/autoload.php';use Elasticsearch\ClientBuilder;
use Elasticsearch\Common\Exceptions\BadRequest400Exception;// 创建 Elasticsearch 客户端
$client = ClientBuilder::create()->setHosts(['localhost:3320']) // 设置 Elasticsearch 主机和端口->build();// 索引名称
$indexName = 'canal_user_index';// 索引设置和映射
$params = ['index' => $indexName,'body'  => ['settings' => ['number_of_shards' => 1, // 分片数量'number_of_replicas' => 0 // 副本数量],'mappings' => ['properties' => ['name' => ['type' => 'text'],'age' => ['type' => 'integer'],'email' => ['type' => 'keyword'],'created_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],'updated_at' => ['type' => 'date', 'format' => 'yyyy-MM-dd HH:mm:ss'],]]]
];try {// 创建索引$response = $client->indices()->create($params);echo "索引 '$indexName' 创建成功:\n";print_r($response);
} catch (BadRequest400Exception $e) {// 如果索引已经存在if (strpos($e->getMessage(), 'index_already_exists_exception') !== false) {echo "索引 '$indexName' 已经存在。\n";} else {echo "创建索引时发生错误: " . $e->getMessage() . "\n";}
} catch (\Exception $e) {echo "创建索引时发生错误: " . $e->getMessage() . "\n";
}

3.检查elasticsearch中的mapping是否创建成功

curl -XGET 'http://localhost:3320/canal_user_index/_mapping'

提示如图

4.在 php 中通过 canal 获取 msyql 的数据变化,且更新到 elasticsearch 中,数据的增删改代码都在下面了,在 src 目录下创建文件 canalToElasticSearch.php

<?php
require __DIR__.'/../vendor/autoload.php';
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
use Elasticsearch\ClientBuilder;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\Fmt;$clientES = ClientBuilder::create()->setHosts(['localhost:3320'])->setSSLVerification(false) // 禁用 SSL 验证(仅用于开发环境)->setRetries(3) // 设置重试次数->build();// 索引名称
$indexName = 'canal_user_index';try {$client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);$client->connect("172.21.121.208", 3399);$client->checkValid();$client->subscribe("1001", "example", ".*\\..*");echo "script start success!";while (true) {$message = $client->get(100);if ($entries = $message->getEntries()) {foreach ($entries as $entry) {Fmt::println($entry);$rowChange = new RowChange();$rowChange->mergeFromString($entry->getStoreValue());$evenType = $rowChange->getEventType();$header = $entry->getHeader();/** @var RowData $rowData */foreach ($rowChange->getRowDatas() as $rowData) {switch ($evenType) {/** 删除数据 */case EventType::DELETE:if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && $clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->delete(['index' => $indexName, 'type' => '_doc', 'id' => $id]);}}break;/** 新增数据 */case EventType::INSERT:$insertData = [];if ($rowData->getAfterColumns()) {foreach ($rowData->getAfterColumns() as $column) {$insertData = array_merge($insertData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($insertData)) {$params = ['index' => $indexName, 'body' => $insertData];if (!empty($id)) $params['id'] = $id;$response = $clientES->index($params);}}break;default:/** 更新数据 */if ($rowData->getAfterColumns()) {$updateData = [];foreach ($rowData->getAfterColumns() as $column) {$updateData = array_merge($updateData, [$column->getName() => $column->getValue()]);if ($column->getName() === 'id') $id = $column->getValue();}if (!empty($id) && !empty($updateData)) {$params = ['index' => $indexName,'id' => $id,'body' => ['doc' => $updateData],];if ($clientES->exists(['index' => $indexName, 'id' => $id])) {$response = $clientES->update($params);} else {$updateData['id'] = $id;$params['body'] = $updateData;$response = $clientES->index($params);}}}break;}}}}sleep(1);}
} catch (\Exception $e) {echo $e->getMessage(), PHP_EOL;
}

5.执行php脚本,监听数据变化

我这里在 mysql 中添加了一些数据,都同步到 elasticsearch 里面了

msyql 中的截图:

elasticsearch 中的截图

看到这里,辛苦了。

感觉自己今天又又又变得比昨天更强了

参考文档链接:

1.Mysql 数据库 主从数据库 (主从)(主主)-CSDN博客

2.https://github.com/xingwenge/canal-php

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/63870.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习量化交易6】优化改造基于miniQMT的量化交易软件,已开放下载~(已完成数据下载、数据清洗、可视化模块)

我是Mr.看海&#xff0c;我在尝试用信号处理的知识积累和思考方式做量化交易&#xff0c;应用深度学习和AI实现 股票自动交易&#xff0c;目的是实现财务自由~ 目前我正在开发基于miniQMT的量化交易软件。 之前写到&#xff0c;目前我已经完成了数据下载、数据清洗和数据可视化…

Ubuntu18安装后基本配置操作

1. 关掉自动更新 不关掉自动更新&#xff0c;会将你的ubuntu系统更新到更高版本&#xff0c;一些配置就不能用了&#xff0c;所以要关掉自动更新。在“软件和更新”中将“自动检查更新”设置为从不。 2. ubuntu换国内源 参考链接换源 按照这个换源这个换源好使 &#xff0c;…

Hadoop一课一得

Hadoop作为大数据时代的奠基技术之一&#xff0c;自问世以来就深刻改变了海量数据存储与处理的方式。本文将带您深入了解Hadoop&#xff0c;从其起源、核心架构、关键组件&#xff0c;到典型应用场景&#xff0c;并结合代码示例和图示&#xff0c;帮助您更好地掌握Hadoop的实战…

2024153读书笔记|《春烂漫:新平摄影作品选》——跳绳酷似人生路,起落平常,进退平常,莫惧征途万里长

2024153读书笔记|《春烂漫&#xff1a;新平摄影作品选》——跳绳酷似人生路&#xff0c;起落平常&#xff0c;进退平常&#xff0c;莫惧征途万里长 《春烂漫&#xff1a;新平摄影作品选》作者新平&#xff0c;2019.12.25年读完的小书&#xff0c;当时就觉得挺不错&#xff0c;今…

JAVA:建造者模式(Builder Pattern)的技术指南

1、简述 建造者模式(Builder Pattern)是一种创建型设计模式,它通过将对象的构造过程与表示分离,使得相同的构造过程可以创建不同的表示。建造者模式尤其适用于创建复杂对象的场景。 设计模式样例:https://gitee.com/lhdxhl/design-pattern-example.git 本文将详细介绍建…

GESP202412 四级【Recamán】题解(AC)

》》》点我查看「视频」详解》》》 [GESP202412 四级] Recamn 题目描述 小杨最近发现了有趣的 Recamn 数列&#xff0c;这个数列是这样生成的&#xff1a; 数列的第一项 a 1 a_1 a1​ 是 1 1 1&#xff1b;如果 a k − 1 − k a_{k-1}-k ak−1​−k 是正整数并且没有在数…

「数据结构详解·十五」树状数组

「数据结构详解一」树的初步「数据结构详解二」二叉树的初步「数据结构详解三」栈「数据结构详解四」队列「数据结构详解五」链表「数据结构详解六」哈希表「数据结构详解七」并查集的初步「数据结构详解八」带权并查集 & 扩展域并查集「数据结构详解九」图的初步「数据结构…

如何通过python实现一个web自动化测试框架?

一、首先你得知道什么是Selenium&#xff1f; Selenium是一个基于浏览器的自动化测试工具&#xff0c;它提供了一种跨平台、跨浏览器的端到端的web自动化解决方案。Selenium主要包括三部分&#xff1a;Selenium IDE、Selenium WebDriver和Selenium Grid。 Selenium IDE&#…

[Rust开发]actix_web::middleware 中间件

actix_web::middleware 在 Actix Web 框架中扮演着重要的角色&#xff0c;它允许开发者在处理 HTTP 请求和响应的过程中插入自定义的逻辑。中间件可以在请求到达处理函数之前或响应返回给客户端之前执行&#xff0c;从而实现日志记录、身份验证、数据验证、错误处理等功能。 为…

opencv——图片矫正

图像矫正 图像矫正的原理是透视变换&#xff0c;下面来介绍一下透视变换的概念。 听名字有点熟&#xff0c;我们在图像旋转里接触过仿射变换&#xff0c;知道仿射变换是把一个二维坐标系转换到另一个二维坐标系的过程&#xff0c;转换过程坐标点的相对位置和属性不发生变换&a…

记录:ubuntu24.04源码安装nginx

一. 下载Nginx源码 两个地址二选一即可 Nginx官网Nginx官网 Github eg&#xff1a;nginx-1.27.3.tar.gz 下载到 ubuntu24.04 的 Downloads &#xff0c;解压 cd Downloads tar -zxvf nginx-1.27.3.tar.gz二. 编译安装 Note: 编译最好用 root 权限&#xff0c; 使用下面命令…

CNCF云原生生态版图

CNCF云原生生态版图 概述什么是云原生生态版图如何使用生态版图 项目和产品&#xff08;Projects and products&#xff09;会员&#xff08;Members&#xff09;认证合作伙伴与提供商&#xff08;Certified partners and providers&#xff09;无服务&#xff08;Serverless&a…

wsl2子系统ubuntu发行版位置迁移步骤

默认的wsl2发行版是安装在windos的c盘&#xff0c;占用空间较大&#xff0c;有迁移需求&#xff0c;也可以迁移到其他电脑&#xff1b; 查看现有发行版信息 运行以下命令查看现有的 WSL 发行版及其状态&#xff1a; wsl --list --verbose# 输出示例NAME STATE …

SpringBoot基于Redis+WebSocket 实现账号单设备登录.

引言 在现代应用中&#xff0c;一个账号在多个设备上的同时登录可能带来安全隐患。为了解决这个问题&#xff0c;许多应用实现了单设备登录&#xff0c;确保同一个用户只能在一个设备上登录。当用户在新的设备上登录时&#xff0c;旧设备会被强制下线。 本文将介绍如何使用 Spr…

MVC配置文件及位置

配置文件位置 默认位置 WEB-INF目录下&#xff0c;文件名&#xff1a;<servlet-name>-servlet.xml <?xml version"1.0" encoding"UTF-8"?> <web-app xmlns"http://xmlns.jcp.org/xml/ns/javaee"xmlns:xsi"http://www.w3.…

【若依项目-RuoYi】掌握若依前端的基本流程

搞毕设项目&#xff0c;使用前后端分离技术&#xff0c;后端springBoot&#xff0c;前端vue3element plus。自己已经写好前端与后端代码&#xff0c;但想换一个前端界面所以使用到了若依&#xff0c;前前后后遇到许多坑&#xff0c;记录一下&#xff0c;方便之后能够快速回忆。…

图像边缘检测示例(综合利用阈值分割、数学形态学和边缘检测算子)

一、问题 读入一副灰度图像&#xff08;如果是彩色图像&#xff0c;可以先将其转化为灰度图像&#xff09;&#xff0c;然后提取比较理想的灰度图像边缘。这里以moon.tif为例。 二、算法 大家一开始容易想到直接利用MATLAB的内置函数edge并采用不同边缘提取算子进行边缘提取&a…

R语言的数据结构-向量

【图书推荐】《R语言医学数据分析实践》-CSDN博客 《R语言医学数据分析实践 李丹 宋立桓 蔡伟祺 清华大学出版社9787302673484》【摘要 书评 试读】- 京东图书 (jd.com) R语言编程_夏天又到了的博客-CSDN博客 在R语言中&#xff0c;数据结构是非常关键的部分&#xff0c;它提…

集成方案 | Docusign + 泛微,实现全流程电子化签署!

本文将详细介绍 Docusign 与泛微的集成步骤及其效果&#xff0c;并通过实际应用场景来展示 Docusign 的强大集成能力&#xff0c;以证明 Docusign 集成功能的高效性和实用性。 在现代企业运营中&#xff0c;效率和合规性是至关重要的。泛微作为企业级办公自动化和流程管理的解决…

Docker Compose应用实战

文章目录 1、使用Docker Compose必要性及定义2、Docker Compose应用参考资料3、Docker Compose应用最佳实践步骤1_概念2_步骤 4、Docker Compose安装5、Docker Compose应用案例1_网站文件准备2_Dockerfile文件准备3_Compose文件准备4_使用docker-compose up启动容器5_访问6_常见…