微服务技术栈SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式(五):分布式搜索 ES-下

文章目录

  • 一、数据聚合
    • 1.1 聚合种类
    • 1.2 DSL实现聚合
    • 1.3 RestAPI实现聚合
    • 1.4 演示:多条件聚合
  • 二、自动补全
    • 2.1 拼音分词器
    • 2.2 自定义分词器
    • 2.3 DSL自动补全查询
    • 2.5 实现酒店搜索框自动补全
      • 2.5.1 修改酒店索引库数据结构
      • 2.5.2 RestAPI实现自动补全查询
      • 2.5.3 实战
  • 三、数据同步
    • 3.1 实现数据同步的方法
    • 3.2 使用消息队列MQ实现数据同步
      • 3.2.1 导入hotel-admin
      • 3.2.2 声明交换机、队列、routingkey
  • 四、集群
    • 4.1 搭建ES集群
    • 4.2 集群职责和脑裂问题
    • 4.3 集群故障转移
    • 4.4 集群分布式存储与查询


一、数据聚合

1.1 聚合种类

聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:

  1. 桶(Bucket)聚合:用来对文档做分组
    TermAggregation:按照文档字段值分组
    Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
  2. 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
    Avg:求平均值
    Max:求最大值
    Min:求最小值
    Stats:同时求max、min、avg、sum等
  3. 管道(pipeline)聚合:其它聚合的结果为基础做聚合

注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。

1.2 DSL实现聚合

# 使用DSL实现聚合
# 1.bucket桶聚合 + 限定聚合范围
# 例:根据酒店品牌名做聚合(并且限定价格不高于200的),并按照结果的升序排序,显示前5个品牌
GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}},"size": 0, //设置size为0,结果中不包含文档,只包含聚合结果"aggs": {  // 定义聚合"brandAgg": { // 定义聚合名"terms": {  // 聚合类型,按照品牌名聚合,所以选择term"field": "brand", // 参与聚合字段"order": {"_count": "asc"  //指定排序规则 升序}, "size": 20 //希望获得聚合结果数}}}
}# 2.Metrics聚合
# 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序)GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"score_stats.avg": "desc"}},"aggs": { //子聚合"score_stats": { //子聚合名"stats": {  //聚合类型,stats可以计算min、max、avg等"field": "score"  //聚合字段}}}}}
}

1.3 RestAPI实现聚合

在这里插入图片描述
在这里插入图片描述

    /*** 桶bucket聚合*/@Testvoid testAgg() throws IOException {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.sizerequest.source().size(0);// 2.2.聚合request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析结果Aggregations aggregations = response.getAggregations();// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get("brandAgg");// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历for (Terms.Bucket bucket : buckets) {String brandName = bucket.getKeyAsString();System.out.println("brandName = " + brandName);long docCount = bucket.getDocCount();System.out.println("docCount = " + docCount);}}

1.4 演示:多条件聚合

在这里插入图片描述


@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;@Overridepublic Map<String, List<String>> filters() {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.sizerequest.source().size(0);// 2.2.聚合buildAggregation(request);// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1.根据品牌名称,获取聚合结果List<String> brandList = getAggByName(aggregations, "brandAgg");// 放入mapresult.put("品牌",brandList);// 4.2.根据城市名称,获取聚合结果List<String> cityList = getAggByName(aggregations, "cityAgg");// 放入mapresult.put("城市",cityList);// 4.3.根据星级名称,获取聚合结果List<String> starList = getAggByName(aggregations, "starAgg");// 放入mapresult.put("星级",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}public List<String> getAggByName(Aggregations aggregations, String aggName) {// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get(aggName);// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历List<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();brandList.add(key);}return brandList;}public void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}
}

测试

@SpringBootTest
public class HotelDemoApplicationTest {@Autowiredprivate IHotelService hotelService;@Testvoid contextLoads(){Map<String, List<String>> filters = hotelService.filters();System.out.println(filters);}
}

结果:
在这里插入图片描述

二、自动补全

自动补全如下图所示:
在这里插入图片描述

2.1 拼音分词器

要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:

  1. 解压
  2. 上传到虚拟机中,elasticsearch的plugin目录
  3. 重启elasticsearch
  4. 测试
    在这里插入图片描述

2.2 自定义分词器

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

演示:

# 自定义拼音分词器
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}

在这里插入图片描述

注意:拼音分词器通常在创建索引库时使用,搜索时使用普通分词器即可

2.3 DSL自动补全查询

在这里插入图片描述
查询语法如下

// 自动补全查询
POST /test/_search
{"suggest": {"title_suggest": {  // 自定义补全查询名称"text": "s", // 关键字"completion": {"field": "title", // 补全字段"skip_duplicates": true, // 跳过重复的"size": 10 // 获取前10条结果}}}
}

演示:

# 2.自动补全
# 2.1 创建一个 自动补全的索引库 属性有title
DELETE /test
PUT test
{"mappings": {"properties": {"title":{"type": "completion"}}}
}
# 2.2 插入示例数据
POST test/_doc
{"title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{"title": ["SK-II", "PITERA"]
}
POST test/_doc
{"title": ["Nintendo", "switch"]
}# 2.3 自动补全查询
# 例:输入一个关键字s,看自动补全的结果
# 结果:"SK-II""Sony""switch"
POST /test/_search
{"suggest": {"title_suggest": {"text": "s", "completion": {"field": "title","skip_duplicates": true, "size": 10 }}}
}

结果:
在这里插入图片描述

2.5 实现酒店搜索框自动补全

2.5.1 修改酒店索引库数据结构

在这里插入图片描述

1.修改索引库结构

# 酒店数据索引库
GET /hotel/_mapping
DELETE /hotel
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}# 自动补全查询
GET /hotel/_search
{"suggest": {"mySuggestion": {"text": "shang","completion": {"field": "suggestion","skip_duplicates": true, "size": 10 }}}
}

2.修改HotelDoc

@Data
@NoArgsConstructor
public class HotelDoc {private Long id;private String name;private String address;private Integer price;private Integer score;private String brand;private String city;private String starName;private String business;private String location;private String pic;private Object distance; //新加加字段"距离":酒店距你选择位置的距离private Boolean isAD; //新加加字段"标记":给你置顶的酒店添加一个标记private List<String> suggestion;//新加该字段用于自动补全public HotelDoc(Hotel hotel) {this.id = hotel.getId();this.name = hotel.getName();this.address = hotel.getAddress();this.price = hotel.getPrice();this.score = hotel.getScore();this.brand = hotel.getBrand();this.city = hotel.getCity();this.starName = hotel.getStarName();this.business = hotel.getBusiness();this.location = hotel.getLatitude() + ", " + hotel.getLongitude();this.pic = hotel.getPic();// 自动补全字段的处理this.suggestion = new ArrayList<>();// 添加品牌、城市this.suggestion.add(this.brand);this.suggestion.add(this.city);// 判断商圈是否包含/if (this.business.contains("/")) {// business有多个值,需要切割String[] arr = this.business.split("/");// business的每个值都要加入到suggestion中Collections.addAll(this.suggestion, arr);}else{this.suggestion.add(this.business);}}
}

3.【重新导入数据,不演示,参见之前的批量导入文档功能】查询结果
在这里插入图片描述

2.5.2 RestAPI实现自动补全查询

在这里插入图片描述

在这里插入图片描述

    /*** 自动补全查询*/@Testvoid testSuggest() throws IOException {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix("s")));// 3.发出请求SearchResponse response = client.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsfor (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();System.out.println(str);}}

2.5.3 实战

在这里插入图片描述
Mapper层

@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@PostMapping("list")public PageResult search(@RequestBody RequestParams params) {return hotelService.search(params);}@PostMapping("filters")public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {return hotelService.filters(params);}@GetMapping("suggestion")public List<String> getSuggestion(@RequestParam("key") String key) {return hotelService.getSuggestion(key);}
}

Service层

@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 自动补全查询*/@Overridepublic List<String> getSuggestion(String key)  {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix(key)));// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsList<String> result = new ArrayList<>();for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();result.add(str);}return result;} catch (IOException e) {throw new RuntimeException(e);}}
}

结果演示
在这里插入图片描述

三、数据同步

3.1 实现数据同步的方法

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.2 使用消息队列MQ实现数据同步

在这里插入图片描述

3.2.1 导入hotel-admin

3.2.2 声明交换机、队列、routingkey

在这里插入图片描述

由于增和改都相当于插入,所以共用一个队列;删除占用一个队列。

一、对消费者hotel-demo的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件
		<!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
server:port: 8089
spring:datasource:url: jdbc:mysql://mysql:3306/heima?useSSL=falseusername: rootpassword: 123driver-class-name: com.mysql.jdbc.Driverrabbitmq:host: 192.168.150.101port: 5672username: itcastpassword: 123321virtual-host: /
logging:level:cn.itcast: debugpattern:dateformat: HH:mm:ss:SSS
mybatis-plus:configuration:map-underscore-to-camel-case: truetype-aliases-package: cn.itcast.hotel.pojo
  1. 定义mq的一些常量
public class HotelMqConstants {// 交换机名称public static final String EXCHANGE_NAME = "hotel.topic";// 新增修改队列public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";// 删除队列public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";// 新增修改的RoutingKeypublic static final String INSERT_KEY = "hotel.insert";// 删除的RoutingKeypublic static final String DELETE_KEY = "hotel.delete";
}
  1. 声明交换机和队列,并监听MQ消息【注解方式】
@Component
public class HotelListener {@Autowiredprivate IHotelService hotelService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.INSERT_KEY))public void listenHotelInsert(Long hotelId){// 新增hotelService.saveById(hotelId);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),key = HotelMqConstants.DELETE_KEY))public void listenHotelDelete(Long hotelId){// 删除hotelService.deleteById(hotelId);}
}

【bean方式】

@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange(){return new TopicExchange(HotelMqConstants.EXCHANGE_NAME,true,false);}@Beanpublic Queue insertQueue(){return new Queue(HotelMqConstants.INSERT_QUEUE_NAME,true);}@Beanpublic Queue deleteQueue(){return new Queue(HotelMqConstants.DELETE_QUEUE_NAME,true);}@Beanpublic Binding insertQueueBinding(){return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);}@Beanpublic Binding deleteQueueBinding(){return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);}
}
  1. RestAPI实现删改
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {@Autowiredprivate RestHighLevelClient restHighLevelClient;/*** 搜索框查询*/@Overridepublic PageResult search(RequestParams params) {try {// 1.准备RequestSearchRequest request = new SearchRequest("hotel");// 2.准备请求参数// 2.1.多条件查询和过滤buildBasicQuery(params, request);// 2.2.分页int page = params.getPage();int size = params.getSize();request.source().from((page - 1) * size).size(size);/*** 2.3.距离排序*/String location = params.getLocation();if (StringUtils.isNotBlank(location)) {// 不为空则查询request.source().sort(SortBuilders.geoDistanceSort("location", new GeoPoint(location)).order(SortOrder.ASC).unit(DistanceUnit.KILOMETERS));}// 3.发送请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析响应return handleResponse(response);} catch (IOException e) {throw new RuntimeException("搜索数据失败", e);}}/*** 复合查询*/private void buildBasicQuery(RequestParams params, SearchRequest request) {// 1.准备Boolean复合查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();/*** 1.查询关键字* must参与 算分*/// 1.1.关键字搜索,match查询,放到must中String key = params.getKey();if (StringUtils.isNotBlank(key)) {// 不为空,根据关键字查询boolQuery.must(QueryBuilders.matchQuery("all", key));} else {// 为空,查询所有boolQuery.must(QueryBuilders.matchAllQuery());}/*** 2.条件过滤:多条件复合查询* 根据 “品牌 城市 星级 价格范围” 过滤数据* filter不参与 算分*/// 1.2.品牌String brand = params.getBrand();if (StringUtils.isNotBlank(brand)) { // 不为空则查询boolQuery.filter(QueryBuilders.termQuery("brand", brand));}// 1.3.城市String city = params.getCity();if (StringUtils.isNotBlank(city)) {// 不为空则查询boolQuery.filter(QueryBuilders.termQuery("city", city));}// 1.4.星级String starName = params.getStarName();if (StringUtils.isNotBlank(starName)) {// 不为空则查询boolQuery.filter(QueryBuilders.termQuery("starName", starName));}// 1.5.价格范围Integer minPrice = params.getMinPrice();Integer maxPrice = params.getMaxPrice();if (minPrice != null && maxPrice != null) {// 不为空则查询maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));}/*** 3.算分函数查询* 置顶功能:给你置顶的酒店添加一个标记,并按其算分*/FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(boolQuery, // 原始查询,boolQuerynew FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组new FunctionScoreQueryBuilder.FilterFunctionBuilder(QueryBuilders.termQuery("isAD", true), // 过滤条件ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数)});/*** 4.设置查询条件*/request.source().query(functionScoreQuery);}/*** 结果解析*/private PageResult handleResponse(SearchResponse response) {SearchHits searchHits = response.getHits();// 4.1.总条数long total = searchHits.getTotalHits().value;// 4.2.获取文档数组SearchHit[] hits = searchHits.getHits();// 4.3.遍历List<HotelDoc> hotels = new ArrayList<>(hits.length);for (SearchHit hit : hits) {// 4.4.获取sourceString json = hit.getSourceAsString();// 4.5.反序列化,非高亮的HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);// 4.6.处理高亮结果// 1)获取高亮mapMap<String, HighlightField> map = hit.getHighlightFields();if (map != null && !map.isEmpty()) {// 2)根据字段名,获取高亮结果HighlightField highlightField = map.get("name");if (highlightField != null) {// 3)获取高亮结果字符串数组中的第1个元素String hName = highlightField.getFragments()[0].toString();// 4)把高亮结果放到HotelDoc中hotelDoc.setName(hName);}}// 4.8.排序信息Object[] sortValues = hit.getSortValues(); // 获取排序结果if (sortValues.length > 0) {/*** 由于该程序是根据距离[酒店距你选择位置的距离]进行排序,所以排序结果为距离*/hotelDoc.setDistance(sortValues[0]);}// 4.9.放入集合hotels.add(hotelDoc);}return new PageResult(total, hotels);}/*** 多条件聚合*/@Overridepublic Map<String, List<String>> filters(RequestParams params) {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数// 2.1.query查询信息buildBasicQuery(params, request);// 2.2.sizerequest.source().size(0);// 2.3.聚合buildAggregation(request);// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Map<String, List<String>> result = new HashMap<>();Aggregations aggregations = response.getAggregations();// 4.1.根据品牌名称,获取聚合结果List<String> brandList = getAggByName(aggregations, "brandAgg");// 放入mapresult.put("品牌",brandList);// 4.2.根据城市名称,获取聚合结果List<String> cityList = getAggByName(aggregations, "cityAgg");// 放入mapresult.put("城市",cityList);// 4.3.根据星级名称,获取聚合结果List<String> starList = getAggByName(aggregations, "starAgg");// 放入mapresult.put("星级",starList);return result;} catch (IOException e) {throw new RuntimeException(e);}}public List<String> getAggByName(Aggregations aggregations, String aggName) {// 4.1.根据聚合名称,获取聚合结果Terms brandAgg = aggregations.get(aggName);// 4.2.获取bucketsList<? extends Terms.Bucket> buckets = brandAgg.getBuckets();// 4.3.遍历List<String> brandList = new ArrayList<>();for (Terms.Bucket bucket : buckets) {String key = bucket.getKeyAsString();brandList.add(key);}return brandList;}public void buildAggregation(SearchRequest request) {request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));}/*** 自动补全查询*/@Overridepublic List<String> getSuggestion(String key)  {try {// 1.准备请求SearchRequest request = new SearchRequest("hotel");// 2.请求参数request.source().suggest(new SuggestBuilder().addSuggestion("hotelSuggest",SuggestBuilders.completionSuggestion("suggestion").size(10).skipDuplicates(true).prefix(key)));// 3.发出请求SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);// 4.解析结果Suggest suggest = response.getSuggest();// 4.1.根据补全查询名称,获取补全结果CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");// 4.2.获取optionsList<String> result = new ArrayList<>();for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {// 4.3.获取补全的结果String str = option.getText().toString();result.add(str);}return result;} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long hotelId) {try {// 1.创建requestDeleteRequest request = new DeleteRequest("hotel", hotelId.toString());// 2.发送请求restHighLevelClient.delete(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("删除酒店数据失败", e);}}@Overridepublic void saveById(Long hotelId) {try {// 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)Hotel hotel = getById(hotelId);// 转换HotelDoc hotelDoc = new HotelDoc(hotel);// 1.创建RequestIndexRequest request = new IndexRequest("hotel").id(hotelId.toString());// 2.准备参数request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 3.发送请求restHighLevelClient.index(request, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException("新增酒店数据失败", e);}}
}

二、对发送者hotel-admin的操作

  1. 引入amqp依赖和配置rabbitmq的yml文件【同上】
  2. 定义mq的一些常量【同上】
  3. 当发送者对mysql数据库改动时,发送消息给MQ
@RestController
@RequestMapping("hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;// 注入发送消息的api@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 根据id查询*/@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id){return hotelService.getById(id);}/*** 查询当前页内容*/@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size){Page<Hotel> result = hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}/*** 新增,并发送给mq消息*/@PostMappingpublic void saveHotel(@RequestBody Hotel hotel){// 新增酒店hotelService.save(hotel);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 修改,并发送给mq消息*/@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());}/*** 删除,并发送给mq消息*/@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);// 发送MQ消息rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);}
}

四、集群

单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
>> 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
>> 单点故障问题:将分片数据在不同节点备份(replica )

4.1 搭建ES集群

我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间

  1. 创建es集群
    首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:es01:image: elasticsearch:7.12.1container_name: es01environment:- node.name=es01- cluster.name=es-docker-cluster- discovery.seed_hosts=es02,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data01:/usr/share/elasticsearch/dataports:- 9200:9200networks:- elastices02:image: elasticsearch:7.12.1container_name: es02environment:- node.name=es02- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es03- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data02:/usr/share/elasticsearch/dataports:- 9201:9200networks:- elastices03:image: elasticsearch:7.12.1container_name: es03environment:- node.name=es03- cluster.name=es-docker-cluster- discovery.seed_hosts=es01,es02- cluster.initial_master_nodes=es01,es02,es03- "ES_JAVA_OPTS=-Xms512m -Xmx512m"volumes:- data03:/usr/share/elasticsearch/datanetworks:- elasticports:- 9202:9200
volumes:data01:driver: localdata02:driver: localdata03:driver: localnetworks:elastic:driver: bridge

es运行需要修改一些linux系统权限,修改/etc/sysctl.conf文件

vi /etc/sysctl.conf

添加下面的内容:

vm.max_map_count=262144

然后执行命令,让配置生效:

sysctl -p

通过docker-compose启动集群:

docker-compose up -d
  1. 集群状态监控

kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。

这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro

课前资料已经提供了安装包:

解压即可使用,非常方便。

解压好的目录如下:

进入对应的bin目录:

双击其中的cerebro.bat文件即可启动服务。

访问http://localhost:9000 即可进入管理界面:

在这里插入图片描述

输入你的elasticsearch的任意节点的地址和端口,点击connect即可:

绿色的条,代表集群处于绿色(健康状态)。
在这里插入图片描述

  1. 创建索引库

1)利用kibana的DevTools创建索引库

在DevTools中输入指令:

PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量"number_of_replicas": 1 // 副本数量},"mappings": {"properties": {// mapping映射定义 ...}}
}

2)利用cerebro创建索引库

利用cerebro还可以创建索引库:
在这里插入图片描述

填写索引库信息:

点击右下角的create按钮:
在这里插入图片描述

  1. 查看分片效果

回到首页,即可查看索引库分片效果:
在这里插入图片描述

4.2 集群职责和脑裂问题

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

4.3 集群故障转移

在这里插入图片描述

4.4 集群分布式存储与查询

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/754293.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

CSS案例-2.简单版侧边栏练习

效果 知识点 标签显示模式 块级元素 block-level 常见元素:<h1>~<h6>、<p>、<div>、<ul>、<ol>、<li>等。 特点: 独占一行长度、宽度、边距都可以控制宽度默认是容器(父级宽度)的100%是一个容器及盒子,里面可以放行内或者…

Docker部署TeamCity来完成内部CI、CD流程

使用TeamCity来完成内部CI、CD流程 本篇教程主要讲解基于容器服务搭建TeamCity服务&#xff0c;并且完成内部项目的CI流程配置。至于完整的DevOps&#xff0c;我们后续独立探讨。 一个简单的CI、CD流程 以下分享一个简单的CI、CD流程&#xff08;仅供参考&#xff09;&#…

SCI一区 | Matlab实现RIME-TCN-BiGRU-Attention霜冰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测

SCI一区 | Matlab实现RIME-TCN-BiGRU-Attention霜冰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测 目录 SCI一区 | Matlab实现RIME-TCN-BiGRU-Attention霜冰算法优化时间卷积双向门控循环单元融合注意力机制多变量时间序列预测预测效果基本介绍模型描述程…

DEYOv2: Rank Feature with Greedy Matchingfor End-to-End Object Detection

摘要 与前代类似&#xff0c; DEYOv2 采用渐进式推理方法 来加速模型训练并提高性能。该研究深入探讨了一对一匹配在优化器中的局限性&#xff0c;并提出了有效解决该问题的解决方案&#xff0c;如Rank 特征和贪婪匹配 。这种方法使DEYOv2的第三阶段能够最大限度地从第一和第二…

Pytorch环境下基于Transformer模型的滚动轴承故障诊断

注意力机制是深度学习中的重要技术之一&#xff0c;正日益受到重视关注。注意力机制作为一种信息贡献筛选的方法被提出&#xff0c;它可以帮助神经网络更多地关注与任务相关的特征&#xff0c;从而减少对任务贡献较小信息的影响。因此&#xff0c;利用注意机制可以提高神经网络…

ArcGIS巧思制作3D景观地图

John Nelson 又制作了一个制图教程视频,我原以为只是一个简单的局部场景DEM夸张实现的3D地图。 不过细看以后…… 还就是比较简单的3D场景地图,操作不难,但是 John Nelson 就是天才。 为什么? 他使用 ArcGIS Pro,在普通的3D地图中,不仅仅是图层混合制作地形效果,还巧妙的…

51-32 CVPR’24 | 3DSFLabelling,通过伪自动标注增强 3D 场景流估计

24 年 2 月&#xff0c;鉴智机器人、剑桥大学和上海交通大学联合发布CVPR24工作&#xff0c;3DSFLabelling: Boosting 3D Scene Flow Estimation by Pseudo Auto-labelling。 提出 3D 场景自动标注新框架&#xff0c;将 3D 点云打包成具有不同运动属性的 Boxes&#xff0c;通过…

【光伏监控系统的相关产品有哪些】Acrel-1000DP分布式光伏监控系统

光伏发电系统是指无需通过热过程直接将光能转变为电能的发电系统。通常由光伏方阵、蓄电池组&#xff08;蓄电池控制器&#xff09;、逆变器、交流配电柜和太阳跟踪控制系统等设备组成。其特点是可靠性高、使用寿命长、不污染环境、能独立发电又能并网运行。 分布式光伏监控系…

高防服务器秒解是什么意思

高防服务器秒解是指高防服务器在遭受大规模的DDoS攻击时&#xff0c;能够迅速解决问题或应对攻击。DDoS攻击是指攻击者通过向目标服务器发送大量的请求&#xff0c;使服务器资源耗尽或无法正常响应其他合法用户的请求&#xff0c;从而导致服务不可用。高防服务器通过具备高性能…

upload-labs-pass01

1.安装好环境进入关卡&#xff08;记得打开小皮&#xff09; 2.可以看到第一关是要求上传图片&#xff0c;但是同时限制了图片类型&#xff0c;那么如果我们将木马写入图片&#xff0c;但是类型又不在白名单&#xff0c;就要想办法绕过 3.可以看到这里的要求是有check&#xff…

数据结构:栈「详解」

目录 一&#xff0c;栈的定义 二&#xff0c;栈的基本操作 1&#xff0c;顺序栈 1.1顺序栈的基本概念 1.2顺序栈的基本操作 2&#xff0c;链栈 2.1链栈的基本概念 2.2链栈的种类 2.3链栈的基本操作 三&#xff0c;栈的应用 1&#xff0c;函数递归调用 2&#xff0c;…

基于支持向量机(svm)的人脸识别

注意&#xff1a;本文引用自专业人工智能社区Venus AI 更多AI知识请参考原站 &#xff08;[www.aideeplearning.cn]&#xff09; 数据集加载与可视化 from sklearn.datasets import fetch_lfw_people faces fetch_lfw_people(min_faces_per_person60) # Check out sample…

REDHAWK——连接(续)

文章目录 前言一、突发 IO1、数据传输①、输入②、输出 2、突发信号相关信息 (SRI)3、多输出端口4、使用复数数据①、在 C 中转换复数数据 5、时间戳6、端口统计①、C 二、消息传递1、消息生产者①、创建一个消息生产者②、发送消息 2、消息消费者①、创建消息消费者②、注册接…

优化选址问题 | 基于节约算法求解考虑碳排放及带时间窗的物流选址问题附matlab代码

目录 问题代码问题 节约算法(Savings Algorithm)通常用于解决车辆路径问题(Vehicle Routing Problem, VRP),特别是当需要考虑如何有效地组织车辆的路线以最小化总行驶距离时。然而,当问题扩展到包括碳排放和带时间窗的物流选址问题时,算法需要相应的调整。 在这个扩展…

VSCode创建用户代码片段-案例demo

示例 - 在线生成代码片段 Vue3代码片段 {"vue3": {scope": "javascript,typescript,html,vue","prefix": "vue3","body": ["<template>","$1","</template>",""…

webpack5零基础入门-11处理html资源

1.目的 主要是为了自动引入打包后的js与css资源&#xff0c;避免手动引入 2.安装相关包 npm install --save-dev html-webpack-plugin 3.引入插件 const HtmlWebpackPlugin require(html-webpack-plugin); 4.添加插件&#xff08;通过new方法调用&#xff09; /**插件 *…

Coursera上Golang专项课程3:Concurrency in Go 学习笔记(完结)

Concurrency in Go 本文是 Concurrency in Go 这门课的学习笔记&#xff0c;如有侵权&#xff0c;请联系删除。 文章目录 Concurrency in GoMODULE 1: Why Use Concurrency?Learning Objectives M1.1.1 - Parallel ExecutionM1.1.2 - Von Neumann BottleneckM1.1.3 - Power W…

K8S POD 启动探针 startupProbe 的使用

当我们启动一个POD 时&#xff0c; 当k8s detect 里面的容器启动成功时&#xff0c; 就会认为这个POD 启动完成了&#xff0c; 通常就会在状态里表示 ready 1/1 … 例如 rootk8s-master:~# kubectl get pods NAME READY STATUS RESTARTS AGE bq-api-demo 1…

数字创新的引擎:探索Web3的前沿科技和商业模式

随着数字化时代的不断发展&#xff0c;Web3作为下一代互联网的重要组成部分&#xff0c;正逐渐成为数字创新的引擎。本文将深入探讨Web3的前沿科技和商业模式&#xff0c;揭示其在数字创新领域的重要作用和潜力。 1. 区块链技术的革命性 Web3的核心是区块链技术&#xff0c;它…

Memcached-分布式内存对象缓存系统

目录 一、NoSQL 介绍 二、Memcached 1、Memcached 介绍 1.1 Memcached 概念 1.2 Memcached 特性 1.3 Memcached 和 Redis 区别 1.4 Memcached 工作机制 1.4.1 内存分配机制 1.4.2 懒惰期 Lazy Expiration 1.4.3 LRU&#xff08;最近最少使用算法&#xff09; 1.4.4…