电商项目-数据同步解决方案(三)商品上架同步更新ES索引库

一、 需求分析和业务逻辑

主要应用技术有:Feign远程调用, 消息队列-RabbitMQ ,分布式搜索引擎-ElasticSearch,Eureka,Canal

商品上架将商品的sku列表导入或者更新索引库。
数据监控微服务需要定义canal监听器,监听商品表的改变,一旦发现商品表内容改变,需要将商品的SPUID发送到消息队列中。

实现思路:

(1)在数据监控微服务中监控tb_spu表的数据,当tb_spu发生更改且is_marketable为1时,表示商品上架,将spu的id发送到rabbitmq。

(2)在rabbitmq管理后台创建商品上架交换器(fanout)。使用分列模式的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。

(3)搜索微服务从rabbitmq的导入索引库的队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。
在这里插入图片描述

二、 搭建环境和代码实现

主要应用技术有:Feign远程调用, 消息队列-RabbitMQ ,分布式搜索引擎-ElasticSearch,Eureka,Canal
实现步骤:

2.1 发送消息到mq

(1)在rabbitmq后台创建交换器goods_up_exchange(类型为fanout),创建队列search_add_queue绑定交换器goods_up_exchange,更新rabbitmq配置类

首先定义交换机名称GOODS_UP_EXCHANGE,定义一个队列用来接收商品消息SEARCH_ADD_QUEUE,声明队列,声明交换机,绑定队列与交换机。

@Configuration
public class RabbitMQConfig {//交换机名称private static final String GOODS_UP_EXCHANGE="goods_up_exchange";//定义队列名称private static final String SEARCH_ADD_QUEUE="search_add_queue";//定义队列名称public static final String AD_UPDATE_QUEUE="ad_update_queue";//声明队列@Beanpublic Queue queue(){return new Queue(AD_UPDATE_QUEUE);}//声明队列@Bean(AD_UPDATE_QUEUE)public Queue AD_UPDATE_QUEUE(){return new Queue(AD_UPDATE_QUEUE);}//声明交换机@Bean(GOODS_UP_EXCHANGE)public Exchange GOODS_UP_EXCHANGE(){return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();}//队列绑定交换机@Beanpublic Binding AD_UPDATE_QUEUE_BINDING(@Qualifier(AD_UPDATE_QUEUE) Queue queue,@Qualifier(GOODS_UP_EXCHANGE) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("").noargs();}
}

(2)数据监控微服务新增SpuListener,添加以下代码:

@CanalEventListener
public class SpuListener {@Autowiredprivate RabbitTemplate rabbitTemplate;@ListenPoint(schema = "changgou_goods",table = "tb_spu")public void goodsUp(CanalEntry.EventType eventType,CanalEntry.RowData rowData){//获取改变之前的数据并将这部分数据转换为mapMap<String,String> oldData=new HashMap<>();rowData.getBeforeColumnsList().forEach((c)->oldData.put(c.getName(),c.getValue()));//获取改变之后的数据并这部分数据转换为mapMap<String,String> newData = new HashMap<>();rowData.getAfterColumnsList().forEach((c)->newData.put(c.getName(),c.getValue()));//获取最新上架的商品 0->1if ("0".equals(oldData.get("is_marketable")) && "1".equals(newData.get("is_marketable"))){//将商品的spuid发送到mqrabbitTemplate.convertAndSend(RabbitMQConfig.GOODS_UP_EXCHANGE,"",newData.get("id"));}}
}

2.2 ES索引库环境准备

Linux虚拟机镜像中包含elasticsearch的相关docker镜像

2.3 创建索引结构

新建shangcheng_service_search_api模块,并添加索引库实体类

(1) 添加依赖

<dependencies><dependency><groupId>com.shangcheng</groupId><artifactId>shangcheng_common</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency>
</dependencies>

(2) 创建实体类

@Document(indexName = "skuinfo", type = "docs")
public class SkuInfo implements Serializable {//商品id,同时也是商品编号@Id@Field(index = true, store = true, type = FieldType.Keyword)private Long id;//SKU名称@Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")private String name;//商品价格,单位为:元@Field(index = true, store = true, type = FieldType.Double)private Long price;//库存数量@Field(index = true, store = true, type = FieldType.Integer)private Integer num;//商品图片@Field(index = false, store = true, type = FieldType.Text)private String image;//商品状态,1-正常,2-下架,3-删除@Field(index = true, store = true, type = FieldType.Keyword)private String status;//创建时间private Date createTime;//更新时间private Date updateTime;//是否默认@Field(index = true, store = true, type = FieldType.Keyword)private String isDefault;//SPUID@Field(index = true, store = true, type = FieldType.Long)private Long spuId;//类目ID@Field(index = true, store = true, type = FieldType.Long)private Long categoryId;//类目名称@Field(index = true, store = true, type = FieldType.Keyword)private String categoryName;//品牌名称@Field(index = true, store = true, type = FieldType.Keyword)private String brandName;//规格private String spec;//规格参数private Map<String, Object> specMap;//getter & setter略
}

2.4 搜索微服务搭建

(1)创建shangcheng_service_search模块,pom.xml引入依赖

<dependencies><dependency><groupId>com.shangcheng</groupId><artifactId>shangcheng_common</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>com.shangcheng</groupId><artifactId>shangcheng_service_goods_api</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>com.shangcheng</groupId><artifactId>shangcheng_service_search_api</artifactId><version>1.0-SNAPSHOT</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
</dependencies>

(2)shagncheng_service_search的application.yml

server:port: 9009
spring:application:name: searchrabbitmq:host: 192.168.200.128redis:host: 192.168.200.128main:allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册data:elasticsearch:cluster-name: elasticsearchcluster-nodes: 192.168.200.128:9300thymeleaf:cache: false
eureka:client:service-url:defaultZone: http://127.0.0.1:6868/eurekainstance:prefer-ip-address: true
feign:hystrix:enabled: trueclient:config:default:   #配置全局的feign的调用超时时间  如果 有指定的服务配置 默认的配置不会生效connectTimeout: 600000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接  单位是毫秒readTimeout: 600000  # 指定的是调用服务提供者的 服务 的超时时间()  单位是毫秒
#hystrix 配置
hystrix:command:default:execution:timeout:#如果enabled设置为false,则请求超时交给ribbon控制enabled: falseisolation:strategy: SEMAPHORE

(3)创建com.shangcheng包,包下创建SearchApplication

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients(basePackages = {"com.shangcheng.goods.feign"})
public class SearchApplication {public static void main(String[] args) {SpringApplication.run(SearchApplication.class,args);}
}

(4) 将rabbitmq配置类放入该模块下

package com.shangcheng.search.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {//定义交换机名称public static final String GOODS_UP_EXCHANGE="goods_up_exchange";public static final String GOODS_DOWN_EXCHANGE="goods_down_exchange";//定义队列名称public static final String AD_UPDATE_QUEUE="ad_update_queue";public static final String SEARCH_ADD_QUEUE="search_add_queue";public static final String SEARCH_DEL_QUEUE="search_del_queue";//声明队列@Beanpublic Queue queue(){return new Queue(AD_UPDATE_QUEUE);}@Bean(SEARCH_ADD_QUEUE)public Queue SEARCH_ADD_QUEUE(){return new Queue(SEARCH_ADD_QUEUE);}@Bean(SEARCH_DEL_QUEUE)public Queue SEARCH_DEL_QUEUE(){return new Queue(SEARCH_DEL_QUEUE);}//声明交换机@Bean(GOODS_UP_EXCHANGE)public Exchange GOODS_UP_EXCHANGE(){return ExchangeBuilder.fanoutExchange(GOODS_UP_EXCHANGE).durable(true).build();}@Bean(GOODS_DOWN_EXCHANGE)public Exchange GOODS_DOWN_EXCHANGE(){return ExchangeBuilder.fanoutExchange(GOODS_DOWN_EXCHANGE).durable(true).build();}
​
​//队列与交换机的绑定@Beanpublic Binding GOODS_UP_EXCHANGE_BINDING(@Qualifier(SEARCH_ADD_QUEUE)Queue queue,@Qualifier(GOODS_UP_EXCHANGE)Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("").noargs();}@Beanpublic Binding GOODS_DOWN_EXCHANGE_BINDING(@Qualifier(SEARCH_DEL_QUEUE)Queue queue,@Qualifier(GOODS_DOWN_EXCHANGE)Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("").noargs();}}

2.5 商品服务查询商品信息的实现

(1) SkuController新增方法

@GetMapping("/spu/{spuId}")public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId){Map<String,Object> searchMap = new HashMap<>();if (!"all".equals(spuId)){searchMap.put("spuId",spuId);}searchMap.put("status","1");List<Sku> skuList = skuService.findList(searchMap);return skuList;}

(2) shangcheng_service_goods_api新增common依赖

<dependencies><dependency><groupId>com.changgou</groupId><artifactId>changgou_common</artifactId><version>1.0-SNAPSHOT</version></dependency>
</dependencies>

(3) 定义skuFegin接口

@FeignClient(name="goods")
@RequestMapping("/sku")
public interface SkuFeign {/**** 多条件搜索品牌数据* @param spuId* @return*/@GetMapping("/sku/spu/{spuId}")public List<Sku> findSkuListBySpuId(@PathVariable("spuId") String spuId);
}

2.6 搜索微服务批量导入数据逻辑

(1) 创建 com.shangcheng.search.dao包,并新增ESManagerMapper接口

public interface ESManagerMapper extends ElasticsearchRepository<SkuInfo,Long> {
}

(2)创建 com.shangcheng.search.service包,包下创建接口EsManagerService

public interface ESManagerService {/*** 创建索引库结构*/void createMappingAndIndex();/*** 导入全部数据到ES索引库*/void importAll();/*** 根据spuid导入数据到ES索引库* @param spuId 商品id*/void importDataBySpuId(String spuId);}

(2)创建com.shangcheng.search.service.impl包,包下创建服务实现类

@Service
public class ESManagerServiceImpl implements ESManagerService {@Autowiredprivate ElasticsearchTemplate elasticsearchTemplate;@Autowiredprivate SkuFeign skuFeign;@Autowiredprivate ESManagerMapper esManagerMapper;//创建索引库结构@Overridepublic void createMappingAndIndex() {//创建索引elasticsearchTemplate.createIndex(SkuInfo.class);//创建映射elasticsearchTemplate.putMapping(SkuInfo.class);}//导入全部sku集合进入到索引库@Overridepublic void importAll() {//查询sku集合List<Sku> skuList = skuFeign.findSkuListBySpuId("all");if (skuList == null || skuList.size()<=0){throw new RuntimeException("当前没有数据被查询到,无法导入索引库");}//skulist转换为jsonString jsonSkuList = JSON.toJSONString(skuList);//将json转换为skuinfoList<SkuInfo> skuInfoList = JSON.parseArray(jsonSkuList, SkuInfo.class);for (SkuInfo skuInfo : skuInfoList) {//将规格信息转换为mapMap specMap = JSON.parseObject(skuInfo.getSpec(), Map.class);skuInfo.setSpecMap(specMap);}//导入索引库esManagerMapper.saveAll(skuInfoList);}//根据spuid查询skuList,添加到索引库@Overridepublic void importDataBySpuId(String spuId) {List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);if (skuList == null || skuList.size()<=0){throw new RuntimeException("当前没有数据被查询到,无法导入索引库");}//将集合转换为jsonString jsonSkuList = JSON.toJSONString(skuList);List<SkuInfo> skuInfoList = JSON.parseArray(jsonSkuList, SkuInfo.class);for (SkuInfo skuInfo : skuInfoList) {//将规格信息进行转换Map specMap = JSON.parseObject(skuInfo.getSpec(), Map.class);skuInfo.setSpecMap(specMap);}//添加索引库esManagerMapper.saveAll(skuInfoList);}}

(3) 创建com.shangcheng.search.controller.定义ESManagerController

@RestController
@RequestMapping("/manager")
public class ESManagerController {@Autowiredprivate ESManagerService esManagerService;//创建索引库结构@GetMapping("/create")public Result create(){esManagerService.createMappingAndIndex();return new Result(true, StatusCode.OK,"创建索引库结构成功");}//导入全部数据@GetMapping("/importAll")public Result importAll(){esManagerService.importAll();return new Result(true, StatusCode.OK,"导入全部数据成功");}
}

2.7 接收mq消息执行导入

shangcheng_service_search工程创建com.shangcheng.search.listener包,包下创建类

@Component
public class GoodsUpListener {@Autowiredprivate ESManagerService esManagerService;@RabbitListener(queues = RabbitMQConfig.SEARCH_ADD_QUEUE)public void receiveMessage(String spuId){System.out.println("接收到的消息为:   "+spuId);//查询skulist,并导入到索引库esManagerService.importDataBySpuId(spuId);}
}

2.8 测试

(1)启动环境 eureka 、elasticsearch 、canal服务端、canal数据监控微服务、rabbitmq

(2)启动商品微服务、搜索微服务

(3)修改tb_spu某记录的is_marketable值为1,观察控制台输出,启动kibana查询记录是否导入成功

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/65496.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Unity游戏环境交互系统

概述 交互功能使用同一个按钮或按钮列表&#xff0c;在不同情况下显示不同的内容&#xff0c;按下执行不同的操作。 按选项个数分类 环境交互系统可分为两种&#xff0c;单选项交互&#xff0c;一般使用射线检测&#xff1b;多选项交互&#xff0c;一般使用范围检测。第一人…

前端正在被“锈”化

jeff Atwood 在 2007 年说&#xff1a;"any application that can be writen in JavaScript , willeventually be written in JavaScript"&#xff0c;翻译过来就是&#xff1a;“任何可以使用 JavaScript 来编写的应用&#xff0c;并最终也会由 JavaScript 编写”&a…

穷举vs暴搜vs深搜vs回溯vs剪枝_全排列_子集

46. 全排列 递归解决&#xff1a;一开始选一个数&#xff0c;递归进入下一层再选一个新的数&#xff0c;直到到最后一个数。反会上一层遍历其它数。 每次递归到叶子节点就找到了一种组合&#xff0c;思路有了具体怎么实现&#xff1f; 1.怎么记录每条路径&#xff1f; 定义一个…

【openGauss】正则表达式次数符号“{}“在ORACLE和openGauss中的差异

一、前言 正则作为一种常用的字符串处理方式&#xff0c;在各种开发语言&#xff0c;甚至数据库中&#xff0c;都有自带的正则函数。但是正则函数有很多标准&#xff0c;不同标准对正则表达式的解析方式不一样&#xff0c;本次在迁移一个ORACLE数据库到openGauss时发现了一个关…

【代码随想录|完全背包问题】

518.零钱兑换|| 题目链接&#xff1a;518. 零钱兑换 II - 力扣&#xff08;LeetCode&#xff09; 这里求的是组合数&#xff0c;就是不强调元素排列的顺序&#xff0c;211和121是同一个数那种&#xff0c;要先遍历物品&#xff0c;这样的话我算出来的每个值才是按顺序121&…

go语言的成神之路-筑基篇-gin常用功能

第一节-gin参数绑定 目录 第一节-?gin参数绑定 ShouldBind简要概述 功能&#xff1a; 使用场景&#xff1a; 可能的错误&#xff1a; 实例代码 效果展示 第二节-gin文件上传 选择要上传的文件 选择要上传的文件。 效果展示? 代码部分 第三节-gin请求重定向 第…

Qt 12.28 day3

作业&#xff1a; 1】 思维导图 2】 在登录界面的登录取消按钮进行以下设置&#xff1a; 使用手动连接&#xff0c;将登录框中的取消按钮使用qt4版本的连接到自定义的槽函数中&#xff0c;在自定义的槽函数中调用关闭函数 将登录按钮使用qt5版本的连接到自定义的槽函数中&a…

mybatis-plus 用法总结

MyBatis-Plus&#xff08;简称 MP&#xff09;是 MyBatis 的增强工具&#xff0c;旨在简化开发者的 CRUD 操作。它在 MyBatis 的基础上提供了更多的功能和便利性&#xff0c;如代码生成器、分页插件、性能分析插件等&#xff0c;使开发者能够更高效地进行数据库操作。MyBatis-P…

Rust: enum 和 i32 的区别和互换

在Rust编程语言中&#xff0c;enum&#xff08;枚举&#xff09;和i32是两种不同类型的数据结构&#xff0c;它们各自有不同的用途和特性。 i32 i32是一个32位的有符号整数类型。它用于存储整数值&#xff0c;范围从-2,147,483,648到2,147,483,647。i32是Rust中的基本数据类型…

迁移学习 详解及应用示例

简介&#xff1a; 迁移学习是一种机器学习技术&#xff0c;其核心思想是利用在一个任务上已经学到的知识&#xff08;源任务&#xff1a;任务已经有一个训练好的模型&#xff0c;然后我们将这个模型的某些部分或知识迁移到一个新的但相关的“目标任务”上。&#xff09;来帮助解…

【ETCD】【实操篇(十五)】etcd集群成员管理:如何高效地添加、删除与更新节点

etcd 是一个高可用的分布式键值存储&#xff0c;广泛应用于存储服务发现、配置管理等场景。为了确保集群的稳定性和可扩展性&#xff0c;管理成员节点的添加、删除和更新变得尤为重要。本文将指导您如何在etcd集群中处理成员管理&#xff0c;帮助您高效地维护集群节点。 目录 …

前端 学习

vue结构 package.json 作用&#xff1a;记录项目的元信息&#xff0c;包括依赖包、脚本命令、项目名称、版本号等。 常见字段&#xff1a; dependencies&#xff1a;运行时依赖的 npm 包。 devDependencies&#xff1a;开发时使用的依赖包。 scripts&#xff1a;定义 npm 脚本…

矩阵的因子分解1-奇异值分解

文章目录 矩阵的因子分解1-奇异值分解求法归纳例1. 对矩阵 A ( 0 1 − 1 0 0 2 1 0 ) A \begin{pmatrix} 0 & 1 \\ -1 & 0 \\ 0 & 2 \\ 1 & 0 \end{pmatrix} A ​0−101​1020​ ​ 进行奇异值分解1. 计算 A H A A^H A AHA 的特征值和特征向量2. 将奇异值按…

网易企业邮箱登陆:保障数据安全

网易企业邮箱是一款为企业提供安全可靠的电子邮件服务的工具。通过网易企业邮箱&#xff0c;企业可以实现员工之间的高效沟通和信息共享&#xff0c;同时保障数据的安全性。 企业邮箱的安全性是企业信息保护的重要组成部分。网易企业邮箱采用了多层加密技术&#xff0c;确保邮件…

王佩丰24节Excel学习笔记——第二十二讲:制作甘特图与动态甘特图

【以 Excel2010 系列学习&#xff0c;用 Office LTSC 专业增强版 2021 实践】 【本章技巧】 插入图表&#xff0c;针对每一个图表上的元素&#xff0c;都可以选中选右键进行修改数据&#xff1b;本章中的向两端延伸&#xff0c;设置数据的原理&#xff1b;数据格式的显示方式&…

LeetCode 1705.吃苹果的最大数目:贪心(优先队列) - 清晰题解

【LetMeFly】1705.吃苹果的最大数目&#xff1a;贪心(优先队列) - 清晰题解 力扣题目链接&#xff1a;https://leetcode.cn/problems/maximum-number-of-eaten-apples/ 有一棵特殊的苹果树&#xff0c;一连 n 天&#xff0c;每天都可以长出若干个苹果。在第 i 天&#xff0c;…

Docmatix:突破性的文档视觉问答数据集

Docmatix&#xff1a;突破性的文档视觉问答数据集 1. 数据集概述 1.1 规模与创新 数据规模&#xff1a; 240万张图像950万个问答对来自130万份PDF文档较现有数据集扩大240倍 应用领域&#xff1a;文档视觉问答(DocVQA)可访问性&#xff1a;通过HuggingFace平台开放获取 2. …

Doris使用注意点

自己学习过程中整理,非官方 dws等最后用于查询的表可以考虑使用row存储加快查询,即用空间换时间duplicate key的选择要考虑最常查询使用适当使用bloomfilter 加速查询适当使用aggregate 模式降低max,avg,min之类的计算并加快查询,比如加速明细和汇总的一体化查询使用ALTER…

Kubernetes之NodeSelector与NodeName实战

目录 目标 版本 官网 概述 实战 NodeName实战 NodeSelector实战 目标 通过配置NodeSelector与NodeName实现Pod运行&#xff08;或优先运行&#xff09;在我们期望的节点之上。了解这两种实现方法的区别。 版本 Kubernets v1.25.0 官网 将Pod分配给节点https://kubernet…

【docker系列】打造个人私有网盘zfile

1. 介绍 是一个适用于个人的在线网盘(列目录)程序&#xff0c;可以将你各个存储类型的存储源&#xff0c;统一到一个网页中查看、预览、维护&#xff0c;再也不用去登录各种各样的网页登录后管理文件 2. 需要环境 2.1 硬件需求 CPU&#xff1a;至少1核 内存&#xff1a;推荐…