ElasticSearch 7.15.2 使用java canal 接入实现灵活化增量数据准实时同步

前言:
①canal.adapter-1.1.5 支持一对一单表的增量数据同步ElasticSearch 7;
②对于多表聚合场景的SQL满足不了我们的业务需求。
③采用java canal 接入,可以实现灵活化增量数据准实时同步

文章目录

          • 一、java canal 接入
            • 1. 依赖导入
            • 2. 增加配置
            • 3. canal 客户端
            • 4. 消息消费/处理模型
            • 5. 重建关联索引
          • 二、效果验证
            • 2.1. 关闭adapter
            • 2.2. 修改数据
            • 2.3. 数据监控
            • 2.4. 索引查询
            • 2.5. 关联数据修改
            • 2.6. 数据监控
            • 2.7. 索引查询

一、java canal 接入

前提:由于咱们是做增量数据同步ElasticSearch 7.15.2,因此项目中需要提前整合好ElasticSearch 7.15.2

1. 依赖导入
 <!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.client --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.5</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.protocol --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.protocol</artifactId><version>1.1.5</version></dependency><!-- https://mvnrepository.com/artifact/com.alibaba.otter/canal.common --><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.common</artifactId><version>1.1.5</version></dependency>
2. 增加配置

application.properties

# canal服务端ip
canal.alone-ip=192.168.159.134
# destination
canal.destination=example
canal.username=canal
canal.passwoed=canal
canal.port=11111
3. canal 客户端
package com.imooc.dianping.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.collect.Lists;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;/*** canal 客户端** @author gblfy* @date 2021-11-22*/
@Component
public class CanalClient implements DisposableBean {private CanalConnector canalConnector;@Value("${canal.alone-ip}")private String CANALIP;@Value("${canal.destination}")private String DESTINATION;@Value("${canal.username}")private String USERNAME;@Value("${canal.passwoed}")private String PASSWOED;@Value("${canal.port}")private int PORT;@Beanpublic CanalConnector getCanalConnector() {//canal实例化canalConnector = CanalConnectors.newClusterConnector(Lists.newArrayList(new InetSocketAddress(CANALIP, PORT)), DESTINATION, USERNAME, PASSWOED);//连接canalcanalConnector.connect();// 自定filter 格式{database}.{table}canalConnector.subscribe();// 回滚寻找上次中断的位置canalConnector.rollback();//返回连接return canalConnector;}@Overridepublic void destroy() throws Exception {if (canalConnector != null) {//防止canal泄露canalConnector.disconnect();}}
}

客户端有了,接下来,解决消息消费的问题?
消息消费的过程,就是轮训跑批的过程。可以简单理解为我们对应消息的消费,类似于canal客户端不断地从canal.deployer当中不断拉取mysql数据库中bin_log同步过来的消息,而消息消费完成之后,去告知canal.deployer,这条消息已经ack确认消费过了,之后,就不用推送给我了。使用这种方式,来完成消息消费的动作。

4. 消息消费/处理模型

接入消息消费模型CanalScheduling

package com.imooc.dianping.canal;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import com.imooc.dianping.dal.ShopModelMapper;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;/*** canal 消息消费模型** @author gblfy* @date 2021-11-22*/
@Component
public class CanalScheduling implements Runnable, ApplicationContextAware {//记录日志private final static Logger logger = LoggerFactory.getLogger(CanalScheduling.class);private ApplicationContext applicationContext;@Autowiredprivate ShopModelMapper shopModelMapper;@Resourceprivate CanalConnector canalConnector;@Autowiredprivate RestHighLevelClient restHighLevelClient;@Override//每个100毫秒 唤醒线程执行run方法@Scheduled(fixedDelay = 100)public void run() {//初始化批次IDlong batchId = -1;try {//批次/1000条int batchSize = 1000;Message message = canalConnector.getWithoutAck(batchSize);//1000条批次的ID  当获取的batchId=-1代表没有消息batchId = message.getId();List<CanalEntry.Entry> entries = message.getEntries();// batchId != -1 (内部消费有消息的)// entries.size() > 0有对应的内容//当batchId != -1 并且entries.size() > 0说明mysql bin_log发生了多少条数据的变化if (batchId != -1 && entries.size() > 0) {//逐条处理entries.forEach(entry -> {//处理类型: bin_log已ROW方式处理的消息if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {//消息解析处理publishCanalEvent(entry);}});}//消息解析完毕后,告知批次消息已经消费ack确认完成canalConnector.ack(batchId);} catch (Exception e) {e.printStackTrace();//将本次消息回滚,下次继续消息canalConnector.rollback(batchId);}}/*** 消息解析处理函数** @param entry*/private void publishCanalEvent(CanalEntry.Entry entry) {//事件类型 只关注INSERT、UPDATE、DELETECanalEntry.EventType eventType = entry.getHeader().getEventType();//获取发生变化的数据库String database = entry.getHeader().getSchemaName();//获取发生变化的数据库中的表String table = entry.getHeader().getTableName();CanalEntry.RowChange change = null;try {//记录这条消息发生了那些变化change = CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {e.printStackTrace();return;}change.getRowDatasList().forEach(rowData -> {List<CanalEntry.Column> columns = rowData.getAfterColumnsList();//主键String primaryKey = "id";CanalEntry.Column idColumn = columns.stream().filter(column -> column.getIsKey()&& primaryKey.equals(column.getName())).findFirst().orElse(null);//将Columns转换成mapMap<String, Object> dataMap = parseColumnsToMap(columns);try {indexES(dataMap, database, table);} catch (IOException e) {e.printStackTrace();}});}/*** 将Columns转换成map** @param columns* @return*/Map<String, Object> parseColumnsToMap(List<CanalEntry.Column> columns) {Map<String, Object> jsonMap = new HashMap<>();columns.forEach(column -> {if (column == null) {return;}jsonMap.put(column.getName(), column.getValue());});return jsonMap;}private void indexES(Map<String, Object> dataMap, String database, String table) throws IOException {logger.info("发生变化的行数据:{} ", dataMap);logger.info("发生变化的数据库:{}  ", database);logger.info("发生变化的表:{} ", table);// 限定处理出具库范围 支处理数据库名称为dianpingdb的消息if (!StringUtils.equals("dianpingdb", database)) {return;}/*** 我们要关注表数据范围* 当seller表、category表、shop 表发生变化,都buildESQuery* 当着3个参数中任意一个参数发生变化,我只需要将发生变化的ID传入,重建与此ID关联的索引*/List<Map<String, Object>> result = new ArrayList<>();if (StringUtils.equals("seller", table)) {result = shopModelMapper.buildESQuery(new Integer((String) dataMap.get("id")), null, null);} else if (StringUtils.equals("category", table)) {result = shopModelMapper.buildESQuery(null, new Integer((String) dataMap.get("id")), null);} else if (StringUtils.equals("shop", table)) {result = shopModelMapper.buildESQuery(null, null, new Integer((String) dataMap.get("id")));} else {//不关注其他的表return;}for (Map<String, Object> map : result) {IndexRequest indexRequest = new IndexRequest("shop");indexRequest.id(String.valueOf(map.get("id")));indexRequest.source(map);//更新索引restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}
5. 重建关联索引
package com.imooc.dianping.dal;import com.imooc.dianping.model.ShopModel;
import org.apache.ibatis.annotations.Param;import java.math.BigDecimal;
import java.util.List;
import java.util.Map;public interface ShopModelMapper {//当着3个参数中任意一个参数发生变化,我只需要将发生变化的ID传入,重建与此ID关联的索引List<Map<String,Object>> buildESQuery(@Param("sellerId")Integer sellerId,@Param("categoryId")Integer categoryId,@Param("shopId")Integer shopId);
}
    <select id="buildESQuery" resultType="java.util.Map">select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flagfrom shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id<if test="sellerId != null">and c.id = #{sellerId}</if><if test="categoryId != null">and b.id = #{categoryId}</if><if test="shopId != null">and a.id = #{shopId}</if></select>
二、效果验证
2.1. 关闭adapter
cd /app/canal/canal.adapterbin/stop.sh
2.2. 修改数据

修改dianpingdb数据库中shop表中ID=1的数据中name的值
陕西面馆(北京亦庄) 调整为gblfy.com陕西面馆(北京亦庄),提交事务!
在这里插入图片描述

2.3. 数据监控

在这里插入图片描述

2021-11-23 16:19:21.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/shopcover/xchg.jpg, address=船厂路36号, latitude=31.195341, end_time=22:00, created_at=2021-11-19 15:53:52, tags=新开业 人气爆棚, start_time=10:00, updated_at=2021-12-22 15:53:52, category_id=1, name=gblfy.com陕西面馆(北京亦庄), remark_score=4.9, price_per_man=156, id=1, seller_id=1, longitude=120.915855} 
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:19:21.356  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:shop 
2.4. 索引查询

# 查询shop索引
GET /shop/_search
{"query": {"match": {"name": "陕西面馆"}}
}

在这里插入图片描述

2.5. 关联数据修改

单表增量同步,官网本身就支持,关联表数据修改,增量准实时同步,官网是不支持的。因此,咱们需要继续测试修改关联表的数据,再次验证。

修改dianpingdb数据库中category表中ID=1的数据中name的值
美食5 调整为美食我的最爱666,提交事务!

修改前:
在这里插入图片描述
修改后:
在这里插入图片描述

2.6. 数据监控

在这里插入图片描述

2021-11-23 16:25:16.325  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的行数据:{icon_url=/static/image/firstpage/food_u.png, updated_at=2019-06-10 15:33:37, name=美食我的最爱666, created_at=2019-06-10 15:33:37, id=1, sort=99} 
2021-11-23 16:25:16.334  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的数据库:dianpingdb  
2021-11-23 16:25:16.338  INFO 3792 --- [   scheduling-1] c.imooc.dianping.canal.CanalScheduling   : 发生变化的表:category
2.7. 索引查询
GET /shop/_search
{"query": {"term": {"category_name": "美食我的最爱666"}}
}

在这里插入图片描述


# 查询shop索引
GET /shop/_search
{"query": {"match": {"name": "陕西面馆"}}
}

在这里插入图片描述
至此,我们完成了通过java代码的方式,灵活化的根据ID去接入我们对应es增量的准实时更新!小伙伴们,一起加油!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/516756.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2020 年最厉害的 10 门编程语言

作者 | 沉默王二来源 | 沉默王二对于很多初学编程的人来说&#xff0c;尤其是马上要入学的大一新生来说&#xff0c;选择哪门子编程语言实在是个痛苦的事。选择对了&#xff0c;毕业后顺利找到工作&#xff0c;完美走向职场&#xff1b;选择错了&#xff0c;毕业后受伤的才华无…

MaxCompute客户端在windows命令行下查询中文乱码怎么办?

MaxCompute客户端工具是阿里云大数据计算服务MaxCompue产品官方客户端工具&#xff0c;通过客户端工具可以连接MaxCompute项目&#xff0c;完成包括数据管理、数据上下传、作业执行、用户及授权管理等各项操作。 但有用户会碰到在Windows下的命令行中执行odpscmd后&#xff0c;…

SLS多云日志采集、处理及分析

场景描述 本文根据用户业务场景不同分别采用Logtail和Log producer写入阿里云日志服务&#xff0c;针对未使用其他日志采集服务的用户&#xff0c;推荐在第三方云平台或线下IDC服务器安装logtail采集并使用DCDN安全传输&#xff1b;针对已使用其他日志采集工具并且已有日志服务…

10分钟搭建完成人脸通行系统 百度『乘风』人脸智能化平台了解一下

目前人脸技术已在企业办公、智慧社区、金融保险等多领域多场景中落地应用&#xff0c;发展潜力巨大。8月21日&#xff0c;百度大脑开放日“乘风新基建&#xff0c;加速产业智能化升级”专场活动在乌镇召开。会上&#xff0c;基于百度大脑领先的人脸识别技术&#xff0c;百度智能…

聚焦数字化智慧安防的新型社区

云栖号案例库&#xff1a;【点击查看更多上云案例】 不知道怎么上云&#xff1f;看云栖号案例库&#xff0c;了解不同行业不同发展阶段的上云方案&#xff0c;助力你上云决策&#xff01; 引言 现如今&#xff0c;智慧城市建设已成为全球城市发展的必然趋势&#xff0c;全球仅…

Nexus 3.31.1-01搭建 maven 私服 windows

文章目录1. Nexus 3 下载2. 解压后目录3. 前台启动4. 浏览器访问5. 登录1. Nexus 3 下载 nexus3下载地址 2. 解压后目录 3. 前台启动 进行命令窗口&#xff0c;执行以下命令 nexus.exe /run等待出现这个信息 4. 浏览器访问 http://localhost:8081/ 点右上角登录 5. 登…

中邮智递通过数加和datav将系统和服务迁移到大数据平台

云栖号案例库&#xff1a;【点击查看更多上云案例】 不知道怎么上云&#xff1f;看云栖号案例库&#xff0c;了解不同行业不同发展阶段的上云方案&#xff0c;助力你上云决策&#xff01; 概述 中邮智递数据平台发展到现在经历了四个阶段: (1) 直接在业务生产系统开发生成报表…

百度大脑“乘风”新基建,“破浪”产业智能化落地

人脸识别技术在AI时代不仅可以带来便利&#xff0c;同时也提高了效率。8月21日&#xff0c;百度大脑开放日在乌镇举办“乘风新基建&#xff0c;加速产业智能化升级”专场&#xff0c;桐乡市经济和信息化局副局长陈再飞、乌镇镇科协秘书长钱永琪、桐乡市工业互联网企业联合会会长…

美柚上云 致力成为最懂女人的互联网企业

云栖号案例库&#xff1a;【点击查看更多上云案例】 不知道怎么上云&#xff1f;看云栖号案例库&#xff0c;了解不同行业不同发展阶段的上云方案&#xff0c;助力你上云决策&#xff01; 案例背景 美柚以让女人更美更健康为己任&#xff0c;致力成为最懂女人的互联网企业;美柚…

MaxCompute中如何使用OSS外部表读取JSON数据?

一、打开OSS&#xff0c;上传json文件 json文件内容展示&#xff1a; {"id":5644228109524316032,"sourceType":1} {"id":-736866360508848202,"sourceType":3} 二、登录DataWorks&#xff0c;建立外部表 建表语句&#xff1a; CRE…

阿里工程师用 8 张图告诉你如何存储、管理泛内容数据

作者| 阿里文娱高级开发工程师 至德责编 | 王晓曼头图 | CSDN 下载自东方 IC用户在优酷或者其它互联网App上看到的文字、图片、视频等&#xff0c;都可以被称为内容&#xff0c;那么这些内容是如何被生产、管理和组织的&#xff1f;本文将简单介绍阿里文娱是如何利用网状关系组…

nexus3 作为maven 私服配置国内加速以及企业管理内部jar IDEA 实战

文章目录一、nexus3 配置1. 新建仓库概述2. 阿里云代理仓库3. 自己的仓库4. 自己的仓库组二、maven配置2.1. 私服配置2.2. 替换后的配置三、IntelliJ IDEA3.1. 创建项目3.2. 指定配置3.3. 下载依赖四、nexus3 监控4.1. 查看依赖版本4.2. 版本对比4.3. aliyun 仓库地址一、nexus…

大数据上云第一课:MaxCompute授权和外表操作躲坑指南

一、子账号创建、AK信息绑定 如果您是第一次使用子账号登录数加平台和使用DataWorks&#xff0c;需要确认以下信息&#xff1a; • 该子账号所属主账号的企业别名。 • 该子账号的用户名和密码。 • 该子账号的AccessKey ID和AccessKey Secret。 • 确认主账号已经允许子账号启…

程序员的年龄越大编程能力越弱???原来我们都理解错了

大多数人都说程序员是低调多金的代表&#xff0c;但是一旦年龄突破30岁&#xff0c;就容易陷入各种被嫌弃中&#xff0c;无法逃脱三十五岁定律。这让许多已到中年的程序员感觉忽然头上悬了几把剑。一位74岁的数据科学家Gene DAngelo则恰恰相反。他曾在社区提出话题&#xff1a;…

唱吧基于 MaxCompute 弥补自建体系的不足

本文作者&#xff1a;马星显 唱吧大数据负责人 使用 MaxCompute之前&#xff0c;唱吧使用自建体系来存储处理各端收集来的日志数据&#xff0c;包括请求访问记录、埋点数据、服务器业务数据等。初期这套基于开源组件的体系有力支撑了数据统计、业务报表、风控等业务需求。但随…

1.倒排索引 2.逻辑斯提回归算法

1.倒排索引 https://help.aliyun.com/zh/open-search/retrieval-engine-edition/introduction-to-inverted-indexes 倒排索引&#xff08;Inverted Index&#xff09;是一种数据结构&#xff0c;用于快速查找包含某个特定词或词语的文档。它主要用于全文搜索引擎等应用&#…

塑云科技基于 KafKa+OTS+MaxCompute 完成物联网系统技术重构

塑云科技&#xff1a;性能突破&#xff0c;基于KafKaOTSMaxCompute 完成了一次物联网系统技术重构 背景&#xff1a;创业团队&#xff0c;专注于氢能燃料电池生态链的运营支撑&#xff0c;当前主要的业务组成为新能源车整车实时运营监控分析&#xff0c;加氢站实时运营监控分析…

什么是Docker?看这一篇文章就够了

作者 | 码农的荒岛求生来源 | 程序员小灰&#xff08;ID: chengxuyuanxiaohui&#xff09;程序员&#xff0c;应该怎样理解docker&#xff1f;容器技术的起源假设你们公司正在秘密研发下一个“今日头条”APP&#xff0c;我们姑且称为明日头条&#xff0c;程序员自己从头到尾搭建…

基于MaxCompute 衣二三帮助客户找到合适自己的衣服

摘要&#xff1a;本文由衣二三CTO程异丁为大家讲解了如何基于MaxCompute构建智能化运营工具。 衣二三作为亚洲最大的共享时装平台&#xff0c;MaxCompute是如何帮助它解决数据提取速度慢、数据口径差异等问题呢&#xff1f;程异丁通过衣二三数据体系架构&#xff0c;从用户运营…

Nexus 3.31.1 maven 私服 搭建篇 linux

文章目录1. Nexus 3 下载2. 解压3. 目录调整4. 重命名5. 创建用户6. 调整家目录7. 指定启动用户8. 环境变量配置9. 刷新环境变量10. 修改工作目录11. 指定jdk12. 修改权限13. nexus启动14. 状态验证15. 浏览器验证16. 登录17. 初始化设置软件版本JDK1.8.0_202Nexus3.31.1Disk s…