文章目录
- 一、数据聚合
- 1.1 聚合种类
- 1.2 DSL实现聚合
- 1.3 RestAPI实现聚合
- 1.4 演示:多条件聚合
- 二、自动补全
- 2.1 拼音分词器
- 2.2 自定义分词器
- 2.3 DSL自动补全查询
- 2.5 实现酒店搜索框自动补全
- 2.5.1 修改酒店索引库数据结构
- 2.5.2 RestAPI实现自动补全查询
- 2.5.3 实战
- 三、数据同步
- 3.1 实现数据同步的方法
- 3.2 使用消息队列MQ实现数据同步
- 3.2.1 导入hotel-admin
- 3.2.2 声明交换机、队列、routingkey
- 四、集群
- 4.1 搭建ES集群
- 4.2 集群职责和脑裂问题
- 4.3 集群故障转移
- 4.4 集群分布式存储与查询
一、数据聚合
1.1 聚合种类
聚合(aggregations)可以实现对文档数据的统计、分析、运算。聚合常见的有三类:
- 桶(Bucket)聚合:用来对文档做分组
  - TermAggregation:按照文档字段值分组
  - Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量(Metric)聚合:用以计算一些值,比如:最大值、最小值、平均值等
  - Avg:求平均值
  - Max:求最大值
  - Min:求最小值
  - Stats:同时求max、min、avg、sum等
- 管道(pipeline)聚合:以其它聚合的结果为基础再做聚合
注意:参与聚合的字段类型必须是:keyword、数值、日期、布尔,一定不能是可分词的类型。
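例如,下面这个对text类型的name字段做terms聚合的请求(仅为示意)通常会被elasticsearch拒绝,并提示text字段默认禁用了fielddata:

GET /hotel/_search
{
  "size": 0,
  "aggs": {
    "nameAgg": {
      "terms": {
        "field": "name",   // name是text类型,默认不能参与聚合
        "size": 10
      }
    }
  }
}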
1.2 DSL实现聚合
# 使用DSL实现聚合
# 1.bucket桶聚合 + 限定聚合范围
# 例:根据酒店品牌名做聚合(并限定价格不高于200),按照每个桶的文档数量升序排序,显示前20个品牌
GET /hotel/_search
{"query": {"range": {"price": {"lte": 200}}},"size": 0, //设置size为0,结果中不包含文档,只包含聚合结果"aggs": { // 定义聚合"brandAgg": { // 定义聚合名"terms": { // 聚合类型,按照品牌名聚合,所以选择term"field": "brand", // 参与聚合字段"order": {"_count": "asc" //指定排序规则 升序}, "size": 20 //希望获得聚合结果数}}}
}# 2.Metrics聚合
# 例:获得每个品牌的用户评分的min、max、avg,并且按照avg排序(降序)GET /hotel/_search
{"size": 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 20,"order": {"score_stats.avg": "desc"}},"aggs": { //子聚合"score_stats": { //子聚合名"stats": { //聚合类型,stats可以计算min、max、avg等"field": "score" //聚合字段}}}}}
}
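为了方便理解下一节RestAPI中的结果解析,这里给出terms聚合响应中aggregations部分的大致结构(仅为示意,桶的key和doc_count以实际返回为准):

{
  "aggregations": {
    "brandAgg": {                  // 与请求中定义的聚合名对应
      "buckets": [
        { "key": "品牌A", "doc_count": 30 },
        { "key": "品牌B", "doc_count": 17 }
      ]
    }
  }
}

解析时先根据聚合名取出brandAgg,再遍历buckets,从每个桶中读取key和doc_count即可。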
1.3 RestAPI实现聚合
/**
 * 桶bucket聚合
 */
@Test
void testAgg() throws IOException {
    // 1.准备请求
    SearchRequest request = new SearchRequest("hotel");
    // 2.请求参数
    // 2.1.size
    request.source().size(0);
    // 2.2.聚合
    request.source().aggregation(
            AggregationBuilders.terms("brandAgg").field("brand").size(20));
    // 3.发出请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4.解析结果
    Aggregations aggregations = response.getAggregations();
    // 4.1.根据聚合名称,获取聚合结果
    Terms brandAgg = aggregations.get("brandAgg");
    // 4.2.获取buckets
    List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
    // 4.3.遍历
    for (Terms.Bucket bucket : buckets) {
        String brandName = bucket.getKeyAsString();
        System.out.println("brandName = " + brandName);
        long docCount = bucket.getDocCount();
        System.out.println("docCount = " + docCount);
    }
}
1.4 演示:多条件聚合
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    @Override
    public Map<String, List<String>> filters() {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            // 2.1.size
            request.source().size(0);
            // 2.2.聚合
            buildAggregation(request);
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            // 4.1.根据品牌名称,获取聚合结果
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            // 放入map
            result.put("品牌", brandList);
            // 4.2.根据城市名称,获取聚合结果
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            // 放入map
            result.put("城市", cityList);
            // 4.3.根据星级名称,获取聚合结果
            List<String> starList = getAggByName(aggregations, "starAgg");
            // 放入map
            result.put("星级", starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<String> getAggByName(Aggregations aggregations, String aggName) {
        // 4.1.根据聚合名称,获取聚合结果
        Terms brandAgg = aggregations.get(aggName);
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        // 4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    public void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));
        request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));
        request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));
    }
}
测试
@SpringBootTest
public class HotelDemoApplicationTest {
    @Autowired
    private IHotelService hotelService;

    @Test
    void contextLoads() {
        Map<String, List<String>> filters = hotelService.filters();
        System.out.println(filters);
    }
}
结果:
二、自动补全
自动补全如下图所示:
2.1 拼音分词器
要实现根据字母做补全,就必须对文档按照拼音分词。在GitHub上恰好有elasticsearch的拼音分词插件。地址:https://github.com/medcl/elasticsearch-analysis-pinyin
安装方式与IK分词器一样,分三步:
- 解压
- 上传到虚拟机中elasticsearch的plugin目录
- 重启elasticsearch

安装完成后需要测试验证,见下面的示例。
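例如,可以在Kibana的DevTools中通过_analyze接口做一个简单验证(仅为示意,拼音分词器插件注册的分词器名为pinyin):

POST /_analyze
{
  "text": "如家酒店还不错",
  "analyzer": "pinyin"
}

如果返回的词条中包含每个字的拼音以及首字母缩写,说明插件安装成功。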
2.2 自定义分词器
演示:
# 自定义拼音分词器
PUT /test
{"settings": {"analysis": {"analyzer": { "my_analyzer": { "tokenizer": "ik_max_word","filter": "py"}},"filter": {"py": { "type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"name": {"type": "text","analyzer": "my_analyzer","search_analyzer": "ik_smart"}}}
}POST /test/_doc/1
{"id": 1,"name": "狮子"
}
POST /test/_doc/2
{"id": 2,"name": "虱子"
}GET /test/_search
{"query": {"match": {"name": "掉入狮子笼咋办"}}
}
注意:
拼音分词器适合在创建倒排索引(即导入文档)时使用,搜索时应使用普通分词器(如ik_smart)。否则,搜索词也会被转成拼音,同音字(如上例中的"狮子"和"虱子")会互相匹配,导致搜索结果不准确。
2.3 DSL自动补全查询
查询语法如下
// 自动补全查询
POST /test/_search
{"suggest": {"title_suggest": { // 自定义补全查询名称"text": "s", // 关键字"completion": {"field": "title", // 补全字段"skip_duplicates": true, // 跳过重复的"size": 10 // 获取前10条结果}}}
}
演示:
# 2.自动补全
# 2.1 创建一个自动补全的索引库,属性有title
DELETE /test
PUT test
{
  "mappings": {
    "properties": {
      "title": {
        "type": "completion"
      }
    }
  }
}

# 2.2 插入示例数据
POST test/_doc
{
  "title": ["Sony", "WH-1000XM3"]
}
POST test/_doc
{
  "title": ["SK-II", "PITERA"]
}
POST test/_doc
{
  "title": ["Nintendo", "switch"]
}

# 2.3 自动补全查询
# 例:输入一个关键字s,看自动补全的结果
# 结果:"SK-II"、"Sony"和"switch"
POST /test/_search
{
  "suggest": {
    "title_suggest": {
      "text": "s",
      "completion": {
        "field": "title",
        "skip_duplicates": true,
        "size": 10
      }
    }
  }
}
结果:
2.5 实现酒店搜索框自动补全
2.5.1 修改酒店索引库数据结构
1.修改索引库结构
# 酒店数据索引库
GET /hotel/_mapping
DELETE /hotel
PUT /hotel
{"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}}
}# 自动补全查询
GET /hotel/_search
{"suggest": {"mySuggestion": {"text": "shang","completion": {"field": "suggestion","skip_duplicates": true, "size": 10 }}}
}
2.修改HotelDoc
@Data
@NoArgsConstructor
public class HotelDoc {
    private Long id;
    private String name;
    private String address;
    private Integer price;
    private Integer score;
    private String brand;
    private String city;
    private String starName;
    private String business;
    private String location;
    private String pic;
    private Object distance;         // 新增字段"距离":酒店距你选择位置的距离
    private Boolean isAD;            // 新增字段"广告标记":被置顶的酒店带有该标记
    private List<String> suggestion; // 新增字段,用于自动补全

    public HotelDoc(Hotel hotel) {
        this.id = hotel.getId();
        this.name = hotel.getName();
        this.address = hotel.getAddress();
        this.price = hotel.getPrice();
        this.score = hotel.getScore();
        this.brand = hotel.getBrand();
        this.city = hotel.getCity();
        this.starName = hotel.getStarName();
        this.business = hotel.getBusiness();
        this.location = hotel.getLatitude() + ", " + hotel.getLongitude();
        this.pic = hotel.getPic();
        // 自动补全字段的处理
        this.suggestion = new ArrayList<>();
        // 添加品牌、城市
        this.suggestion.add(this.brand);
        this.suggestion.add(this.city);
        // 判断商圈是否包含分隔符"/"
        if (this.business.contains("/")) {
            // business有多个值,需要切割
            String[] arr = this.business.split("/");
            // business的每个值都要加入到suggestion中
            Collections.addAll(this.suggestion, arr);
        } else {
            this.suggestion.add(this.business);
        }
    }
}
3.重新导入数据(不演示,参见之前的批量导入文档功能),然后查询结果:
2.5.2 RestAPI实现自动补全查询
/**
 * 自动补全查询
 */
@Test
void testSuggest() throws IOException {
    // 1.准备请求
    SearchRequest request = new SearchRequest("hotel");
    // 2.请求参数
    request.source().suggest(new SuggestBuilder().addSuggestion(
            "hotelSuggest",
            SuggestBuilders.completionSuggestion("suggestion")
                    .size(10)
                    .skipDuplicates(true)
                    .prefix("s")
    ));
    // 3.发出请求
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    // 4.解析结果
    Suggest suggest = response.getSuggest();
    // 4.1.根据补全查询名称,获取补全结果
    CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
    // 4.2.获取options
    for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
        // 4.3.获取补全的结果
        String str = option.getText().toString();
        System.out.println(str);
    }
}
2.5.3 实战
Controller层
@RestController
@RequestMapping("hotel")
public class HotelController {
    @Autowired
    private IHotelService hotelService;

    @PostMapping("list")
    public PageResult search(@RequestBody RequestParams params) {
        return hotelService.search(params);
    }

    @PostMapping("filters")
    public Map<String, List<String>> getFilters(@RequestBody RequestParams params) {
        return hotelService.filters(params);
    }

    @GetMapping("suggestion")
    public List<String> getSuggestion(@RequestParam("key") String key) {
        return hotelService.getSuggestion(key);
    }
}
Service层
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 自动补全查询
     */
    @Override
    public List<String> getSuggestion(String key) {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "hotelSuggest",
                    SuggestBuilders.completionSuggestion("suggestion")
                            .size(10)
                            .skipDuplicates(true)
                            .prefix(key)
            ));
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Suggest suggest = response.getSuggest();
            // 4.1.根据补全查询名称,获取补全结果
            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
            // 4.2.获取options
            List<String> result = new ArrayList<>();
            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
                // 4.3.获取补全的结果
                String str = option.getText().toString();
                result.add(str);
            }
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
结果演示
三、数据同步
3.1 实现数据同步的方法
3.2 使用消息队列MQ实现数据同步
3.2.1 导入hotel-admin
3.2.2 声明交换机、队列、routingkey
由于新增和修改对elasticsearch而言都是重新写入同一条文档,所以共用一个队列;删除单独使用一个队列。
一、对消费者hotel-demo的操作
- 引入amqp依赖和配置rabbitmq的yml文件
<!--amqp-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server:
  port: 8089
spring:
  datasource:
    url: jdbc:mysql://mysql:3306/heima?useSSL=false
    username: root
    password: 123
    driver-class-name: com.mysql.jdbc.Driver
  rabbitmq:
    host: 192.168.150.101
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: HH:mm:ss:SSS
mybatis-plus:
  configuration:
    map-underscore-to-camel-case: true
  type-aliases-package: cn.itcast.hotel.pojo
- 定义mq的一些常量
public class HotelMqConstants {
    // 交换机名称
    public static final String EXCHANGE_NAME = "hotel.topic";
    // 新增、修改队列
    public static final String INSERT_QUEUE_NAME = "hotel.insert.queue";
    // 删除队列
    public static final String DELETE_QUEUE_NAME = "hotel.delete.queue";
    // 新增、修改的RoutingKey
    public static final String INSERT_KEY = "hotel.insert";
    // 删除的RoutingKey
    public static final String DELETE_KEY = "hotel.delete";
}
- 声明交换机和队列,并监听MQ消息【注解方式】
@Component
public class HotelListener {
    @Autowired
    private IHotelService hotelService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.INSERT_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.INSERT_KEY))
    public void listenHotelInsert(Long hotelId) {
        // 新增
        hotelService.saveById(hotelId);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = HotelMqConstants.DELETE_QUEUE_NAME),
            exchange = @Exchange(name = HotelMqConstants.EXCHANGE_NAME, type = ExchangeTypes.TOPIC),
            key = HotelMqConstants.DELETE_KEY))
    public void listenHotelDelete(Long hotelId) {
        // 删除
        hotelService.deleteById(hotelId);
    }
}
【bean方式】
@Configuration
public class MqConfig {
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(HotelMqConstants.EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue insertQueue() {
        return new Queue(HotelMqConstants.INSERT_QUEUE_NAME, true);
    }

    @Bean
    public Queue deleteQueue() {
        return new Queue(HotelMqConstants.DELETE_QUEUE_NAME, true);
    }

    @Bean
    public Binding insertQueueBinding() {
        return BindingBuilder.bind(insertQueue()).to(topicExchange()).with(HotelMqConstants.INSERT_KEY);
    }

    @Bean
    public Binding deleteQueueBinding() {
        return BindingBuilder.bind(deleteQueue()).to(topicExchange()).with(HotelMqConstants.DELETE_KEY);
    }
}
- RestAPI实现删改
@Slf4j
@Service
public class HotelService extends ServiceImpl<HotelMapper, Hotel> implements IHotelService {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * 搜索框查询
     */
    @Override
    public PageResult search(RequestParams params) {
        try {
            // 1.准备Request
            SearchRequest request = new SearchRequest("hotel");
            // 2.准备请求参数
            // 2.1.多条件查询和过滤
            buildBasicQuery(params, request);
            // 2.2.分页
            int page = params.getPage();
            int size = params.getSize();
            request.source().from((page - 1) * size).size(size);
            // 2.3.距离排序
            String location = params.getLocation();
            if (StringUtils.isNotBlank(location)) {
                // 不为空则查询
                request.source().sort(SortBuilders
                        .geoDistanceSort("location", new GeoPoint(location))
                        .order(SortOrder.ASC)
                        .unit(DistanceUnit.KILOMETERS));
            }
            // 3.发送请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析响应
            return handleResponse(response);
        } catch (IOException e) {
            throw new RuntimeException("搜索数据失败", e);
        }
    }

    /**
     * 复合查询
     */
    private void buildBasicQuery(RequestParams params, SearchRequest request) {
        // 1.准备Boolean复合查询
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        // 1.查询关键字:must参与算分
        // 1.1.关键字搜索,match查询,放到must中
        String key = params.getKey();
        if (StringUtils.isNotBlank(key)) {
            // 不为空,根据关键字查询
            boolQuery.must(QueryBuilders.matchQuery("all", key));
        } else {
            // 为空,查询所有
            boolQuery.must(QueryBuilders.matchAllQuery());
        }
        // 2.条件过滤:根据"品牌、城市、星级、价格范围"过滤数据,filter不参与算分
        // 1.2.品牌
        String brand = params.getBrand();
        if (StringUtils.isNotBlank(brand)) {
            // 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("brand", brand));
        }
        // 1.3.城市
        String city = params.getCity();
        if (StringUtils.isNotBlank(city)) {
            // 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("city", city));
        }
        // 1.4.星级
        String starName = params.getStarName();
        if (StringUtils.isNotBlank(starName)) {
            // 不为空则查询
            boolQuery.filter(QueryBuilders.termQuery("starName", starName));
        }
        // 1.5.价格范围
        Integer minPrice = params.getMinPrice();
        Integer maxPrice = params.getMaxPrice();
        if (minPrice != null && maxPrice != null) {
            // 不为空则查询
            maxPrice = maxPrice == 0 ? Integer.MAX_VALUE : maxPrice;
            boolQuery.filter(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice));
        }
        // 3.算分函数查询
        // 置顶功能:给置顶的酒店添加isAD标记,并为其加权算分
        FunctionScoreQueryBuilder functionScoreQuery = QueryBuilders.functionScoreQuery(
                boolQuery, // 原始查询,boolQuery
                new FunctionScoreQueryBuilder.FilterFunctionBuilder[]{ // function数组
                        new FunctionScoreQueryBuilder.FilterFunctionBuilder(
                                QueryBuilders.termQuery("isAD", true), // 过滤条件
                                ScoreFunctionBuilders.weightFactorFunction(10) // 算分函数
                        )
                });
        // 4.设置查询条件
        request.source().query(functionScoreQuery);
    }

    /**
     * 结果解析
     */
    private PageResult handleResponse(SearchResponse response) {
        SearchHits searchHits = response.getHits();
        // 4.1.总条数
        long total = searchHits.getTotalHits().value;
        // 4.2.获取文档数组
        SearchHit[] hits = searchHits.getHits();
        // 4.3.遍历
        List<HotelDoc> hotels = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            // 4.4.获取source
            String json = hit.getSourceAsString();
            // 4.5.反序列化,非高亮的
            HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);
            // 4.6.处理高亮结果
            // 1)获取高亮map
            Map<String, HighlightField> map = hit.getHighlightFields();
            if (map != null && !map.isEmpty()) {
                // 2)根据字段名,获取高亮结果
                HighlightField highlightField = map.get("name");
                if (highlightField != null) {
                    // 3)获取高亮结果字符串数组中的第1个元素
                    String hName = highlightField.getFragments()[0].toString();
                    // 4)把高亮结果放到HotelDoc中
                    hotelDoc.setName(hName);
                }
            }
            // 4.8.排序信息
            Object[] sortValues = hit.getSortValues(); // 获取排序结果
            if (sortValues.length > 0) {
                // 由于该查询按距离(酒店距你选择位置的距离)排序,所以排序值即为距离
                hotelDoc.setDistance(sortValues[0]);
            }
            // 4.9.放入集合
            hotels.add(hotelDoc);
        }
        return new PageResult(total, hotels);
    }

    /**
     * 多条件聚合
     */
    @Override
    public Map<String, List<String>> filters(RequestParams params) {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            // 2.1.query查询信息
            buildBasicQuery(params, request);
            // 2.2.size
            request.source().size(0);
            // 2.3.聚合
            buildAggregation(request);
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Map<String, List<String>> result = new HashMap<>();
            Aggregations aggregations = response.getAggregations();
            // 4.1.根据品牌名称,获取聚合结果
            List<String> brandList = getAggByName(aggregations, "brandAgg");
            // 放入map
            result.put("品牌", brandList);
            // 4.2.根据城市名称,获取聚合结果
            List<String> cityList = getAggByName(aggregations, "cityAgg");
            // 放入map
            result.put("城市", cityList);
            // 4.3.根据星级名称,获取聚合结果
            List<String> starList = getAggByName(aggregations, "starAgg");
            // 放入map
            result.put("星级", starList);
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public List<String> getAggByName(Aggregations aggregations, String aggName) {
        // 4.1.根据聚合名称,获取聚合结果
        Terms brandAgg = aggregations.get(aggName);
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();
        // 4.3.遍历
        List<String> brandList = new ArrayList<>();
        for (Terms.Bucket bucket : buckets) {
            String key = bucket.getKeyAsString();
            brandList.add(key);
        }
        return brandList;
    }

    public void buildAggregation(SearchRequest request) {
        request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(100));
        request.source().aggregation(AggregationBuilders.terms("cityAgg").field("city").size(100));
        request.source().aggregation(AggregationBuilders.terms("starAgg").field("starName").size(100));
    }

    /**
     * 自动补全查询
     */
    @Override
    public List<String> getSuggestion(String key) {
        try {
            // 1.准备请求
            SearchRequest request = new SearchRequest("hotel");
            // 2.请求参数
            request.source().suggest(new SuggestBuilder().addSuggestion(
                    "hotelSuggest",
                    SuggestBuilders.completionSuggestion("suggestion")
                            .size(10)
                            .skipDuplicates(true)
                            .prefix(key)
            ));
            // 3.发出请求
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // 4.解析结果
            Suggest suggest = response.getSuggest();
            // 4.1.根据补全查询名称,获取补全结果
            CompletionSuggestion suggestion = suggest.getSuggestion("hotelSuggest");
            // 4.2.获取options
            List<String> result = new ArrayList<>();
            for (CompletionSuggestion.Entry.Option option : suggestion.getOptions()) {
                // 4.3.获取补全的结果
                String str = option.getText().toString();
                result.add(str);
            }
            return result;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void deleteById(Long hotelId) {
        try {
            // 1.创建request
            DeleteRequest request = new DeleteRequest("hotel", hotelId.toString());
            // 2.发送请求
            restHighLevelClient.delete(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("删除酒店数据失败", e);
        }
    }

    @Override
    public void saveById(Long hotelId) {
        try {
            // 查询酒店数据,应该基于Feign远程调用hotel-admin,根据id查询酒店数据(现在直接去数据库查)
            Hotel hotel = getById(hotelId);
            // 转换
            HotelDoc hotelDoc = new HotelDoc(hotel);
            // 1.创建Request
            IndexRequest request = new IndexRequest("hotel").id(hotelId.toString());
            // 2.准备参数
            request.source(JSON.toJSONString(hotelDoc), XContentType.JSON);
            // 3.发送请求
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            throw new RuntimeException("新增酒店数据失败", e);
        }
    }
}
二、对发送者hotel-admin的操作
- 引入amqp依赖和配置rabbitmq的yml文件【同上】
- 定义mq的一些常量【同上】
- 当发送者对mysql数据库改动时,发送消息给MQ
@RestController
@RequestMapping("hotel")
public class HotelController {
    @Autowired
    private IHotelService hotelService;

    // 注入发送消息的api
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 根据id查询
     */
    @GetMapping("/{id}")
    public Hotel queryById(@PathVariable("id") Long id) {
        return hotelService.getById(id);
    }

    /**
     * 查询当前页内容
     */
    @GetMapping("/list")
    public PageResult hotelList(
            @RequestParam(value = "page", defaultValue = "1") Integer page,
            @RequestParam(value = "size", defaultValue = "1") Integer size) {
        Page<Hotel> result = hotelService.page(new Page<>(page, size));
        return new PageResult(result.getTotal(), result.getRecords());
    }

    /**
     * 新增,并发送MQ消息
     */
    @PostMapping
    public void saveHotel(@RequestBody Hotel hotel) {
        // 新增酒店
        hotelService.save(hotel);
        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    /**
     * 修改,并发送MQ消息
     */
    @PutMapping()
    public void updateById(@RequestBody Hotel hotel) {
        if (hotel.getId() == null) {
            throw new InvalidParameterException("id不能为空");
        }
        hotelService.updateById(hotel);
        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.INSERT_KEY, hotel.getId());
    }

    /**
     * 删除,并发送MQ消息
     */
    @DeleteMapping("/{id}")
    public void deleteById(@PathVariable("id") Long id) {
        hotelService.removeById(id);
        // 发送MQ消息
        rabbitTemplate.convertAndSend(HotelMqConstants.EXCHANGE_NAME, HotelMqConstants.DELETE_KEY, id);
    }
}
四、集群
单机的elasticsearch做数据存储,必然面临两个问题:海量数据存储问题、单点故障问题。
- 海量数据存储问题:将索引库从逻辑上拆分为N个分片(shard),存储到多个节点
- 单点故障问题:将分片数据在不同节点备份(replica)
4.1 搭建ES集群
我们会在单机上利用docker容器运行多个es实例来模拟es集群。不过生产环境推荐大家每一台服务节点仅部署一个es的实例。
部署es集群可以直接使用docker-compose来完成,但这要求你的Linux虚拟机至少有4G的内存空间
- 创建es集群
首先编写一个docker-compose文件,内容如下:
version: '2.2'
services:
  es01:
    image: elasticsearch:7.12.1
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data01:/usr/share/elasticsearch/data
    ports:
      - 9200:9200
    networks:
      - elastic
  es02:
    image: elasticsearch:7.12.1
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data02:/usr/share/elasticsearch/data
    ports:
      - 9201:9200
    networks:
      - elastic
  es03:
    image: elasticsearch:7.12.1
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-docker-cluster
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - data03:/usr/share/elasticsearch/data
    ports:
      - 9202:9200
    networks:
      - elastic

volumes:
  data01:
    driver: local
  data02:
    driver: local
  data03:
    driver: local

networks:
  elastic:
    driver: bridge
es运行需要修改一些Linux系统权限:修改 /etc/sysctl.conf 文件
vi /etc/sysctl.conf
添加下面的内容:
vm.max_map_count=262144
然后执行命令,让配置生效:
sysctl -p
通过docker-compose启动集群:
docker-compose up -d
- 集群状态监控
kibana可以监控es集群,不过新版本需要依赖es的x-pack 功能,配置比较复杂。
这里推荐使用cerebro来监控es集群状态,官方网址:https://github.com/lmenezes/cerebro
课前资料已经提供了安装包:
解压即可使用,非常方便。
解压好的目录如下:
进入对应的bin目录:
双击其中的cerebro.bat文件即可启动服务。
访问http://localhost:9000 即可进入管理界面:
输入你的elasticsearch的任意节点的地址和端口,点击connect即可:
绿色的状态条代表集群处于green(健康)状态。
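除了cerebro,也可以直接调用elasticsearch自带的接口查看集群健康状态和节点信息(示意):

GET /_cluster/health

GET /_cat/nodes?v

返回的status为green时,表示所有主分片和副本分片都已正常分配。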
- 创建索引库
1)利用kibana的DevTools创建索引库
在DevTools中输入指令:
PUT /itcast
{"settings": {"number_of_shards": 3, // 分片数量"number_of_replicas": 1 // 副本数量},"mappings": {"properties": {// mapping映射定义 ...}}
}
2)利用cerebro创建索引库
利用cerebro还可以创建索引库:
填写索引库信息:
点击右下角的create按钮:
- 查看分片效果
回到首页,即可查看索引库分片效果:
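如果不使用cerebro,也可以在DevTools中通过_cat接口查看分片的分布情况(示意,itcast为上面创建的索引库名):

GET /_cat/shards/itcast?v

返回结果会列出每个分片所在的节点,以及它是主分片(p)还是副本分片(r)。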
4.2 集群职责和脑裂问题
4.3 集群故障转移
4.4 集群分布式存储与查询