ElasticSearch: 项目实战 (1)
需求:
新增文章审核通过后同步数据到es索引库
1、文章服务中添加消息发送方法
在service层文章新增成功后,将数据通过kafka消息同步发送到搜索服务
@Autowiredprivate KafkaTemplate<String,String> kafkaTemplate;/*** 送消息,创建索引* @param apArticle* @param content* @param path*/private void createArticleESIndex(ApArticle apArticle, String content, String path) {SearchArticleVo vo = new SearchArticleVo();BeanUtils.copyProperties(apArticle,vo);vo.setContent(content);vo.setStaticUrl(path);kafkaTemplate.send(ArticleConstants.ARTICLE_ES_SYNC_TOPIC, JSON.toJSONString(vo));}
2、搜索服务中实现消息接收
创建SyncArticleListener普通类来接收文章服务发送过来的数据
package com.heima.search.listener;import com.alibaba.fastjson.JSON;
import com.heima.common.constants.ArticleConstants;
import com.heima.model.common.search.SearchArticleVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@Slf4j
public class SyncArticleListener {@Autowiredprivate RestHighLevelClient restHighLevelClient;@KafkaListener(topics = ArticleConstants.ARTICLE_ES_SYNC_TOPIC)public void onMessage(String message){if(StringUtils.isNotBlank(message)){log.info("SyncArticleListener,message={}",message);/*添加数据到索引库*/SearchArticleVo searchArticleVo = JSON.parseObject(message, SearchArticleVo.class);IndexRequest indexRequest = new IndexRequest("app_info_article");indexRequest.id(searchArticleVo.getId().toString());indexRequest.source(message, XContentType.JSON);try {restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {e.printStackTrace();log.error("sync es error={}",e);}}}
}
3、分别配置文章服务和搜索服务的nacos
spring:kafka:bootstrap-servers: 192.168.200.130:9092consumer:group-id: ${spring.application.name}key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer