微服务-实用篇
- 一、微服务治理
- 1.微服务远程调用
- 2.Eureka注册中心
- Eureka的作用:
- 搭建EurekaServer服务
- Client服务注册
- 服务发现
- Ribbon负载均衡策略配置
- Ribbon配置饥饿加载
- 3.nacos注册中心
- 使用nacos注册中心服务
- nacos区域负载均衡
- nacos环境隔离-namespace
- Nacos和Eureka的对比
- nacos配置管理
- 配置管理步骤
- 配置热更新
- 多环境配置共享
- 4.http客户端Feign
- Feigin的使用步骤
- Feign的日志配置
- Feign的性能优化
- 5.统一网关Gateway
- 作用
- 搭建网关服务
- 路由的过滤器配置
- 全局过滤器
- 过滤器链执行顺序
- 二、异步通信
- 1.什么是AMQP?
- 2.部署RabbitMQ
- 3.SpingAMQP如何发送消息
- 4.SpingAMQP如何接收消息
- 5.WorkQueue模型
- 6.交换机
- 7.FanoutExchange
- 8.DirectExchange
- 9.TopicExchange
- 10.消息转换器
- 三、分布式搜索
- 1.初识elasticsearch
- 2.安装elastic
- 3.索引库的操作
- 4.文档操作
- 5.RestClient
- RestClient的初步使用
- RestClient增删改查
- 6.elasticsearch搜索功能
- DSL查询语法
- 7.RestClient查询文档
- 8.聚合
- 9.RestClient实现聚合
- 10.自动补全
- 11.RestClient实现自动补全
- 12.数据同步
- 13.ES集群
- 13.ES集群
一、微服务治理
1.微服务远程调用
-
springCloud提供了RestTemplate,可以发起远程http协议的调用
-
使用
-
注入bean
@Beanpublic RestTemplate restTemplate() {return new RestTemplate();}
-
restTemplate使用
@RestController @RequestMapping("order") public class OrderController {@Autowiredprivate OrderService orderService;@Autowiredprivate RestTemplate restTemplate;private static final String UserBaseApiURL = "http://localhost:8081/user/";@GetMapping("{orderId}")public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {// 根据id查询订单并返回Order order = orderService.queryOrderById(orderId);// 远程调用查询用户User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class);order.setUser(user);return order;} }
-
2.Eureka注册中心
-
Eureka的作用:
-
消费者该如何获取服务提供者具体信息?
- 服务提供者启动时向eureka注册自己的信息
- eureka保存这些信息
- 消费者根据服务名称向eureka拉取提供者信息
-
如果有多个服务提供者,消费者该如何选择?
- 服务消费者利用负载均衡算法,从服务列表中挑选一个
-
消费者如何感知服务提供者的健康状态?
- 服务者会每隔30秒向EurekaServer发送心跳请求,报告健康状态
- eureka会更新记录服务列表信息,心跳不正常会被剔除
- 消费者就可以拉取到最新的信息
-
-
搭建EurekaServer服务
-
创建项目引入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
-
编写application.yml配置文件
server:port: 10086 spring:application:name: eureka-server # 服务名 eureka:client:service-url: # 服务地址defaultZone: http://127.0.0.1:10086/eureka/
-
在启动类上开启自动装配
@EnableEurekaServer // 开启eureka自动装配 @SpringBootApplication public class EurekaApplication {public static void main(String[] args) {SpringApplication.run(EurekaApplication.class , args);} }
-
-
Client服务注册
-
引入eureka-client依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
-
在application.yml中配置eureka地址
spring:datasource:url: jdbc:mysql://localhost:3306/cloud_order?useSSL=falseusername: rootpassword: 123456driver-class-name: com.mysql.jdbc.Driverapplication:name: orderservice # 服务名 eureka:client:service-url: # 服务地址defaultZone: http://127.0.0.1:10086/eureka/
-
-
服务发现
-
给RestTemplate添加@LoadBalanced注解
@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}
-
用服务提供者的服务名称远程调用
@RestController @RequestMapping("order") public class OrderController {@Autowiredprivate OrderService orderService;@Autowiredprivate RestTemplate restTemplate;private static final String UserBaseApiURL = "http://userservice/user/";@GetMapping("{orderId}")public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {// 根据id查询订单并返回Order order = orderService.queryOrderById(orderId);// 远程调用查询用户User user = restTemplate.getForObject(UserBaseApiURL + order.getUserId(), User.class);order.setUser(user);return order;} }
-
-
Ribbon负载均衡策略配置
-
-
代码方式,注入IRule实例bean
@Beanpublic IRule randomIRule() {return new RandomRule();}
-
-
2.配置文件方式: 在application.yml文件中添加配置项
userservice:ribbon:NFLoadBalancerRuleClassName: com.netfix.loadbalancer.RandomRule
-
-
Ribbon配置饥饿加载
-
ribbon默认采用懒加载,即第一次访问时才会去创建LoadBalanceClient,请求时间会很长
-
在application.yml中开启饥饿加载
ribbon:eager-load:enabled: true # 开启饥饿加载clients: - userservice # 指定对userservice这个服务饥饿加载
-
3.nacos注册中心
-
使用nacos注册中心服务
-
引入依赖
<!-- nacos的管理依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.5.RELEASE</version><type>pom</type><scope>import</scope></dependency><!-- nacos客户端依赖包 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
-
修改application.yml文件nacos配置
spring:application:name: orderservice # 服务名cloud:nacos:server-addr: localhost:8848 # nacos端口号
-
-
nacos区域负载均衡
-
nacos默认负载均衡策略为服务轮询,一个服务往往会在多地部署多个实例,相同区域内的服务之间相互调用时长消耗更短,因此区域内优先调用比较合理,nacos提供了这样的负载均衡策略,不过区域内的策略为随机策略,当区域内没有可用服务时再访问其他区域的可用服务。
-
application.yml配置区域
spring:application:name: userservice # 服务名cloud:nacos:server-addr: localhost:8848discovery:cluster-name: HZ # 集群名称
-
为Ribbon配置区域优先策略
userservice:ribbon:NFLoadBalancerRuleClassName: com.alibaba.cloud.nacos.ribbon.NacosRule
-
-
nacos环境隔离-namespace
-
不同的时期有不同的环境,比如开发时需要开发环境,namespace就可以进行环境隔离,不同环境之间的服务无法调用。
-
application.yaml 配置namespace
spring:application:name: orderservice # 服务名cloud:nacos:server-addr: localhost:8848discovery:cluster-name: HZ # 集群名称namespace: fd3445b3-8e01-446a-91d5-03ab8b5a7205 # 环境id(从nacos控制台查看)
-
-
Nacos和Eureka的对比
-
共同点
- 都支持服务注册和服务拉取
- 都支持服务提供者心跳方式做健康检测
-
区别
-
Nacos支持服务端主动监测提供者状态:临时实例采用心跳模式,非临时实例采用主动健康监测。
spring:application:name: orderservice # 服务名cloud:nacos:server-addr: localhost:8848discovery:cluster-name: HZ # 集群名称ephemeral: false # 非临时实例
-
临时实例心跳不正常会被剔除,非临时实例则不会被剔除。
-
Nacos支持服务列表变更的消息推送模式,服务列表更新及时。
-
Nacos集群默认采用AP方式,当集群中存在非临时实例时,采用CP模式;Eureka采用AP模式。
-
-
-
nacos配置管理
-
配置管理步骤
-
在控制台添加配置文件
-
添加依赖
<!-- nacos配置管理依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId></dependency>
-
在bootstrap.yml文件中配置
spring:application:name: userserviceprofiles:active: dev # 开发环境cloud:nacos:server-addr: localhost:8848config:file-extension: yaml # 文件后缀
-
使用@Value注解注入
@Value("${pattern.format}")private String format;
-
-
配置热更新
-
通过@Value方式注入的配置属性需要在类上添加@RefreshScope注解即可实现配置热更新
@Slf4j @RestController @RequestMapping("/user") @RefreshScope // 配置热更新注解 public class UserController {@Autowiredprivate UserService userService;// 配置属性注入@Value("${pattern.format}")private String format; }
-
通过@ConfigurationProperties方式注入的属性自动热更新
@Component @ConfigurationProperties(prefix = "pattern") @Data public class PatternProperties {private String format; }
-
-
多环境配置共享
- 微服务会从nacos读取的配置文件:
- [服务名]-[spring.proflle.active].yaml
- [服务名].yaml,默认配置,多环境共享
- 优先级
- [服务名]-[环境].yaml > [服务名].yaml > 本地配置
- 微服务会从nacos读取的配置文件:
-
4.http客户端Feign
-
Feigin的使用步骤
-
引入依赖
<!-- fegin依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency>
-
启动类添加@EnableFeignClients注解
@MapperScan("cn.itcast.order.mapper") @SpringBootApplication @EnableFeignClients public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);} }
-
编写FeignClient接口
@FeignClient("userservice") public interface UserClient {@GetMapping("user/{id}")User findById(@PathVariable("id") Long id); }
-
使用FeignClient中定义的方法代替RestTemplate
@RestController @RequestMapping("order") public class OrderController {@Autowiredprivate OrderService orderService;@Autowiredprivate UserClient userClient;@GetMapping("{orderId}")public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {// 根据id查询订单并返回Order order = orderService.queryOrderById(orderId);// 远程调用查询用户User user = userClient.findById(order.getUserId());order.setUser(user);return order;} }
-
-
Feign的日志配置
-
方式一是配置文件
feign:client:config:default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置logger-level: FULL # 日志级别
-
方式二是JAVA代码配置类
public class FeignClientConfiguration {@Beanpublic Logger.Level feignLogLevel(){return Logger.Level.FULL;} }
@MapperScan("cn.itcast.order.mapper") @SpringBootApplication // 在启动类上添加的配置类属于全局配置@EnableFeignClients(defaultConfiguration = FeignAutoConfiguration.class) public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);} }
// 如果需要对某一服务进行配置在服务接口上添加即可 @FeignClient(value = "userservice" , configuration = FeignClientConfiguration.class) public interface UserClient {@GetMapping("user/{id}")User findById(@PathVariable("id") Long id); }
-
-
Feign的性能优化
-
引入依赖
<dependency><groupId>io.github.openfeign</groupId><artifactId>feign-httpclient</artifactId></dependency>
-
配置连接池
feign:client:config:default: # 这里是default就是全局配置,如果是写服务名,则针对某个微服务的配置logger-level: FULL # 日志级别httpclient:enabled: true # 开启feign对HttpClient的支持max-connections: 200 # 最大连接数max-connections-per-route: 50 # 每个路径的最大连接数
-
5.统一网关Gateway
-
作用
- 对用户请求做身份验证,权限认证
- 将用户请求路由到微服务,并实现负载均衡
- 将用户请求做限流
-
搭建网关服务
-
创建新的module,引入SpringCloudGateWay的依赖和服务发现依赖
<dependencies><!-- nacos服务发现依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!-- 网关gateway依赖 --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId></dependency></dependencies>
-
编写路由配置及nacos地址
server:port: 10010 spring:application:name: gatewaycloud:nacos:server-addr: localhost:80 # nacos地址gateway:routes: # 网关路由配置- id: user-service # 路由id 自定义uri: lb://userservice # 路由的目标地址 lb是负载均衡predicates: # 路由断言- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求- id: order-serviceuri: lb://orderservicepredicates: # 路由断言- Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求
-
-
路由的过滤器配置
-
为某个服务添加过滤器
spring:application:name: gatewaycloud:nacos:server-addr: localhost:8848 # nacos地址gateway:routes: # 网关路由配置- id: user-service # 路由id 自定义uri: lb://userservice # 路由的目标地址 lb是负载均衡predicates: # 路由断言- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求- id: order-serviceuri: lb://orderservicepredicates: # 路由断言- Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求filters:- AddRequestHeader=color, blue # 局部过滤器
-
添加默认过滤器(全局)
spring:application:name: gatewaycloud:nacos:server-addr: localhost:8848 # nacos地址gateway:routes: # 网关路由配置- id: user-service # 路由id 自定义uri: lb://userservice # 路由的目标地址 lb是负载均衡predicates: # 路由断言- Path=/user/** # 这个是按照路径匹配,只要以/user/开头就符合要求- id: order-serviceuri: lb://orderservicepredicates: # 路由断言- Path=/order/** # 这个是按照路径匹配,只要以/order/开头就符合要求default-filters:- AddRequestHeader=color, blue # 全局过滤器
-
-
全局过滤器
@Order(1) // 过滤器等级 越低优先值越高 @Component public class AuthorizeFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();HttpHeaders headers = request.getHeaders();String token = headers.getFirst("token");if (token != null && token.equals("abc")) {return chain.filter(exchange);}exchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);return exchange.getResponse().setComplete();} }
-
过滤器链执行顺序
- 每一个过滤器都必须指定一个int类型的order值,order值越小,优先级越高,执行顺序越靠前。
- GlobalFilter通过实现Ordered接口,或者添加@Order注解来指定Order值,由我们自己指定
- 路由过滤器和defaultFilter的order由Spring指定,默认是按照声明顺序从1递增
- 当过滤器的order值一样时,会按照default>路由过滤器>GlobalFilter的循序执行
二、异步通信
1.什么是AMQP?
- 应用层消息通信的一种协议,与语言和平台无关
2.部署RabbitMQ
RabbitMQ部署指南.md
3.SpingAMQP如何发送消息
-
引入AMQP的Starter依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>
-
配置RabbitMQ地址
spring:rabbitmq:addresses: 192.168.88.101 # 地址名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcastpassword: 123321
-
新建测试类
@RunWith(SpringRunner.class) @SpringBootTest public class SpringAMQPTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSimpleQueue() {String queueName = "simple.queue";String message = "hello , Spring amqp!";rabbitTemplate.convertAndSend(queueName , message);}}
4.SpingAMQP如何接收消息
-
配置地址
spring:rabbitmq:addresses: 192.168.88.101 # 地址名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcastpassword: 123321
-
新建类
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue") // 在启动前要确保该队列存在!public void listenSimpleQueue(String msg) {System.out.println("消费者1接收到消息 = " + msg);} }
5.WorkQueue模型
-
多个模型绑定到一个队列,用一个消息会被一个消费者处理
-
通过设置prefetch来控制消费者预取的消息数量
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS spring:rabbitmq:addresses: 192.168.88.101 # 地址名port: 5672 # 端口virtual-host: / # 虚拟主机username: itcastpassword: 123321listener:simple:prefetch: 1 # 修改消费者提前把握的最大数量
-
示例:
-
设置发布50条消息
@Testpublic void testSimpleWorkQueue() {String queueName = "simple.queue";for (int i = 0; i < 50; i++) {String message = "hello , Spring amqp! - " + (i + 1);rabbitTemplate.convertAndSend(queueName , message);}}
-
两个接收者
@Component public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者1接收到消息 = " + msg);}@RabbitListener(queues = "simple.queue")public void listenSimpleQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息 = " + msg);Thread.sleep(500);} }
-
6.交换机
- 交换机的作用是什么?
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
- FanoutExChange的会将消息路由到每个绑定的队列
7.FanoutExchange
-
特点:
- 会将交换机接收的消息转发给绑定的所有队列,所有与之监听的消费者全部都会收到消息
-
编写配置类
@Configuration public class FanoutConfig {// 声明FanoutExchange交换机@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("itcast.fanout");}// 声明第一个队列@Beanpublic Queue queue1() {return new Queue("fanout.queue1");}// 绑定队列1和交换机@Beanpublic Binding bindingQueue1(Queue queue1 , FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue1).to(fanoutExchange);}// 声明第二个队列@Beanpublic Queue queue2() {return new Queue("fanout.queue2");}// 绑定队列2和交换机@Beanpublic Binding bindingQueue2(Queue queue2 , FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue2).to(fanoutExchange);} }
-
编写消费者代码
@RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) {System.out.println("消费者1接收到消息 = " + msg);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息 = " + msg);}
8.DirectExchange
-
特点:
- 该交换机需要设置key值,当该交换机收到消息时,交换机会转发给指定key值的队列。存在多个相同的key值时,则群发
-
案例
-
编写接收者,绑定交换机(注解方式)
@RabbitListener(bindings = @QueueBinding(value = @Queue( name = "direct.queue1"),exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT),key = {"red" , "blue" }))public void listenDirectQueue1(String msg) {System.out.println("消费者1接收到消息 = " + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue( name = "direct.queue2"),exchange = @Exchange( name = "itcast.direct" , type = ExchangeTypes.DIRECT),key = {"red" , "pink" }))public void listenDirectQueue2(String msg) throws InterruptedException {System.err.println("消费者2接收到消息 = " + msg);}
-
编写发布者
@Testpublic void testDirectQueue() {String queueName = "simple.queue";String message = "hello , Spring amqp!";// 交换机nameString exchangeName = "itcast.direct";rabbitTemplate.convertAndSend(exchangeName , "red" , message);}
-
9.TopicExchange
-
特点
- TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。
- Queue与Exchange指定BindingKey时可以使用通配符:
- #:代指0个或多个单词
- *:代指一个单词
-
示例:
-
编写接收者
@RabbitListener(bindings = @QueueBinding(value = @Queue( name = "topic.queue1"),exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC),key = {"china.*" }))public void listenTopicQueue1(String msg) {System.out.println("消费者1接收到消息 = " + msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue( name = "topic.queue2"),exchange = @Exchange( name = "itcast.topic" , type = ExchangeTypes.TOPIC),key = {"china.*" }))public void listenTopicQueue2(String msg) {System.err.println("消费者2接收到消息 = " + msg);}
-
编写发送者
@Testpublic void testTopicQueue() {String queueName = "simple.queue";String message = "hello , Spring amqp!";// 交换机nameString exchangeName = "itcast.topic";rabbitTemplate.convertAndSend(exchangeName , "china.news" , message);}
-
10.消息转换器
-
RabbieMQ是可以传递java对象的,通过MessageConverter实现,但是默认是JDk的序列化,最好为其配置JSON对象转化器,注意发布与接收方必须使用相同的MessageConverter
-
案例
-
消息发布方
-
引入JackSon依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.16.1</version> </dependency>
-
注入Bean
@Bean // 注入json消息转化器public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
-
发布消息
@Testpublic void ObjectQueue() {String queueName = "object.queue";HashMap<String, String> hashMap = new HashMap<>();hashMap.put("name" , "小强");hashMap.put("age" , "18");rabbitTemplate.convertAndSend(queueName , hashMap);}
-
-
消息接收方
-
引入JackSon依赖
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.16.1</version> </dependency>
-
注入Bean
@Bean // 注入json消息转化器public MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}
-
定义队列
@Beanpublic Queue ObjectQueue() {return new Queue("object.queue");}
-
接收消息
@RabbitListener(queues = "object.queue")public void listenObjectQueue2(HashMap<String , Object> msg) {System.err.println("消费者接收到消息 = " + msg);}
-
-
三、分布式搜索
1.初识elasticsearch
-
什么是elasticsearch?
- 一个开源的分布式搜索引擎,可以用来实现搜索、日志统计、分析、系统监控等功能
-
什么是elastic stack (ELK)?
- 是指以elasticsearch为核心的技术栈,包括beats、logstash、kibana、elasticsearch
2.安装elastic
安装elasticsearch.md
3.索引库的操作
-
mapping属性
- type:字段数据类型,常见的简单类型有:
- 字符串:text(可分词文本)、keyword(精确值,例如:品牌、国家)
- 数值:long、integer、short、byte、double、float
- 布尔:boolean
- 日期:date
- 对象:object
- index:是否创建索引,默认为true
- analyzer:使用哪种分词器
- properties:该字段的子字段
- type:字段数据类型,常见的简单类型有:
-
索引库操作
-
创建索引库
PUT /索引库名
# 创建索引库 PUT /heima {"mappings": {"properties": {"info": {"type": "text","analyzer": "ik_smart"},"email" : {"type": "keyword","index": false},"name" : {"properties": {"firstName" : {"type" : "keyword","index" : false},"lastName" : {"type" : "keyword","index" : false}}}}} }
-
查看索引库
GET /索引库名
# 查询索引库 GET /heima
-
删除索引库
DELETE /索引库名
# 删除索引库 DELETE /黑马
-
添加字段
PUT /索引库名/_mapping
# 添加新字段 PUT /heima/_mapping {"properties": {"color" : {"type" : "keyword","index" : false}} }
-
4.文档操作
-
添加文档
-
模板
POST /索引库名/_doc/文档id {"字段1" : "值1","字段2" : "值2","字段3" : {"子属性1" : "值3","子属性2" : "值4",} }
-
示例
# 添加文档 POST /heima/_doc/1 {"info" : "尚硅谷,让天下没有学完的技术!","email" : "1482939313@qq.com","name" : {"firstName" : "尚","lastName" : "硅谷"} }
-
-
查看文档
-
模板
GET /索引库名/_doc/文档id
-
示例
GET /heima/_doc/1
-
-
删除文档
-
模板
DELETE /索引库名/_doc/文档id
-
示例
DELETE /heima/_doc/1
-
-
修改文档
-
方式一:全量修改
-
特点
id存在则修改,不存在则创建
-
模板
PUT /索引库名/_doc/文档id {"字段1" : "值1","字段2" : "值2", }
-
-
方式二:局部修改
-
模板
POST /索引库名/_update/文档id {"doc" : {"字段1" : "值1","字段2" : "值2",} }
-
-
5.RestClient
-
RestClient的初步使用
-
引入依赖
<dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.12.1</version> </dependency>
<!-- 注意版本控制 --><properties><java.version>1.8</java.version><elasticsearch.version>7.12.1</elasticsearch.version></properties>
-
初始化RestHighLevelClient
package cn.itcast.hotel;@SpringBootTest class HotelDemoApplicationTests {private RestHighLevelClient client;@BeforeEachvoid setUp() {this.client = new RestHighLevelClient(RestClient.builder(HttpHost.create("http://192.168.88.101:9200")));}@AfterEachvoid tearDown() throws IOException {this.client.close();}@Testvoid contextLoads() {System.out.println(this.client);} }
-
-
RestClient增删改查
-
操作索引库
-
创建索引库
// 新建索引@Testpublic void addIndex() throws IOException {// 创建Request对象CreateIndexRequest request = new CreateIndexRequest("hotel");// 准备请求参数 RestClientConstant.HOTELTEMPLATE 为新建索引的json结构request.source(RestClientConstant.HOTELTEMPLATE , XContentType.JSON);// 发送请求client.indices().create(request , RequestOptions.DEFAULT);}
-
删除和判断索引库
// 删除索引@Testpublic void deleteIndex() throws IOException {// 创建Request对象DeleteIndexRequest request = new DeleteIndexRequest("hotel");// 发送请求client.indices().delete(request , RequestOptions.DEFAULT);}// 判断索引库是否存在@Testpublic void existsIndex() throws IOException {// 创建Request对象GetIndexRequest request = new GetIndexRequest("hotel");// 发送请求boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);System.out.println( exists ? "索引库存在" : "索引库不存在");}
-
-
操作文档
-
新增文档
// 新增文档@Testpublic void addDocument() throws IOException {// 获取信息Hotel hotel = hotelService.getById(36934L);HotelDoc hotelDoc = new HotelDoc(hotel);// 创建Request对象 并设置idIndexRequest request = new IndexRequest("hotel").id(hotel.getId().toString());// 设置请求request.source(JSON.toJSONString(hotelDoc) , XContentType.JSON);// 发送请求client.index(request , RequestOptions.DEFAULT);}
-
查询文档
// 新增文档@Testpublic void findDocument() throws IOException {// 创建Request对象 并设置idGetRequest request = new GetRequest("hotel").id("36934");// 发送请求GetResponse response = client.get(request, RequestOptions.DEFAULT);String json = response.getSourceAsString();HotelDoc hotelDoc = JSON.parseObject(json, HotelDoc.class);System.out.println("hotelDoc = " + hotelDoc);}
-
更新文档
// 更新文档@Testpublic void updateDocument() throws IOException {// 创建Request对象 并设置idUpdateRequest request = new UpdateRequest("hotel", "36934");// 设置更新信息 每两个参数为一对 key,valuerequest.doc("city" , "美国");// 发送请求client.update(request , RequestOptions.DEFAULT);}
-
删除文档
// 删除文档@Testpublic void deleteDocument() throws IOException {// 创建Request对象 并设置idDeleteRequest req = new DeleteRequest("hotel", "36934");// 发送请求client.delete(req , RequestOptions.DEFAULT);}
-
批量操作
// 批量操作@Testpublic void bulkDocument() throws IOException {// 获取数据List<Hotel> hotelList = hotelService.list();// 创建Request对象BulkRequest bulkRequest = new BulkRequest();// 写入hotelList.stream().forEach( item -> {HotelDoc hotelDoc = new HotelDoc(item);bulkRequest.add(new IndexRequest("hotel").id(item.getId().toString()).source(JSON.toJSONString(hotelDoc) , XContentType.JSON));});// 发送请求client.bulk(bulkRequest , RequestOptions.DEFAULT);}
-
-
6.elasticsearch搜索功能
-
DSL查询语法
-
基本语法
GET /indexName/_search {"query" : {"查询类型" : {"查询条件" : "条件值"}} }
-
查询所有
GET /hotel/_search {"query" : {“match_all” : {}} }
-
全文检索查询
-
会对查询text进行分词,查询倒排索引
-
match和multi_match的区别:
match查询单字段,multi_match根据多个字段查询,参与查询字段越多,查询性能越差
# match查询 GET /hotel/_search {"query": {"match": {"all": "上海如家"}} }# multi_match查询 GET /hotel/_search {"query": {"multi_match": {"query": "上海如家","fields": ["brand" , "name" , "business"]}} }
-
-
精确查询
# term查询 GET /hotel/_search {"query": {"term": {"brand": {"value": "如家"} J}} }# range查询 范围 GET /hotel/_search {"query": {"range": {"price": {"gte": 400,"lte": 500}}} }
-
地理查询
-
geo_bounding_box:查询geo_point值落在某个矩形范围的所有文档
# geo_bounding_box 边界框查询 GET /hotel/_search {"query": {"geo_bounding_box" : {"location" : {"top_left" : {"lat" : 31.1,"lon" : 121.5},"bottom_right" : {"lat" : 30.9,"lon" : 121.7}}}} }
-
geo_distance:查询到指定中心点小于某个距离值的所有文档
# geo_distance 距离中心点查询 GET /hotel/_search {"query": {"geo_distance" : {"distance" : "15km","location" : "31.21,121.5"}} }
-
-
复合查询
-
function score query 可以修改文档的相关性算分(query socre),根据新得到的算分排序。
-
function score query定义的三要素
- 过滤条件:哪些文档要加分
- 算分函数:如何计算function score
- 加权方式:funcation score 与 query score如何运算
# function sorce GET /hotel/_search {"query": {"function_score": {"query": {"match": {"all": "上海"}},"functions": [{"filter": {"term": {"brand": "万豪"}},"weight": 10}],"boost_mode": "multiply"}} }
-
复合查询Boolean Query是一个或多个查询子句的组合。子查询的组合方式有:
- must:必须匹配每个子查询,类似“与”
- should:选择性匹配子查询,类似“或”
- must_not:必须不匹配,不参与算分,类似“非”
- filter:必须匹配,不参与算分
# 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 # 搜索名字包含“如家”,价格不高于400,在坐标31.21,121.5周围10km范围内的酒店 GET /hotel/_search {"query": {"bool": {"must": [{"match": {"name": "如家"}}],"must_not": [{"range": {"price": {"gt": 400}}}],"filter": [{"geo_distance": {"distance": "10km","location": {"lat": 31.21,"lon": 121.5}}}]}} }
-
-
排序
-
elasticsearch默认根据相关度算分,也可以指定
-
简单类型
GET /hotel/_search {"query": {"match": {"all": "北京"}},"sort": [{"price": {"order": "desc"}}] }
-
地理坐标
GET /hotel/_search {"query": {"match": {"all": "上海"}},"sort": [{"_geo_distance": {"location": "31.21,121.5","order": "asc","unit": "km"}} ] }
-
-
分页
-
elasticsearch默认只返回top10的数据
GET /hotel/_search {"query": {"match_all": {}},"from": 10, // 分页开始页"size": 10 // 分页数量 }
-
-
高亮
GET /hotel/_search {"query": {"match": {"all": "如家"}},"highlight": {"fields": {"name": {"require_field_match": "false"}}} }
-
7.RestClient查询文档
@Testpublic void query() throws IOException {// 创建request对象SearchRequest searchRequest = new SearchRequest("hotel");// 构造DSL语句
// searchRequest.source().query(QueryBuilders.matchAllQuery());searchRequest.source().query(QueryBuilders.matchQuery("all" , "上海")).sort("price" , SortOrder.ASC).highlighter(new HighlightBuilder().field("name").requireFieldMatch(false)).from(0).size(50);// 发送请求SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);System.out.println("查询结果总条数为:" + response.getHits().getTotalHits() + "条");System.out.println("高亮部分为:" + response.getHits().getHits()[0].getHighlightFields());List<HotelDoc> hotelDocList = parseRestClintResponse(response, HotelDoc.class);hotelDocList.stream().forEach(System.out :: println);}private <T> List<T> parseRestClintResponse(SearchResponse response , Class<T> clazz) {SearchHits hits = response.getHits();ArrayList<T> list = new ArrayList<>();SearchHit[] hitsArray = hits.getHits();Arrays.stream(hitsArray).forEach( item -> list.add(JSON.parseObject(item.getSourceAsString() , clazz)));return list;}
8.聚合
-
什么是聚合?
- 聚合可以实现对文档数据的统计、分析、运算
-
聚合的分类
- 桶聚合:用来对文档作分组
- TermAggregation:按照文档字段值分组
- Date Histogram:按照日期阶梯分组,例如一周为一组,或者一月为一组
- 度量聚合:用以计算一些值,比如:最大值、最小值、avg等
- Avg:平均值
- Max:最大值
- Min:最小值
- Stats:同时求Avg、Max、Min、sum等
- 管道聚合:其他聚合的结果为基础做聚合
- 桶聚合:用来对文档作分组
-
示例:
-
DSL实现Bucket聚合
GET /hotel/_search {"query": {"range": {"price": {"lte": 500}}},"size" : 0,"aggs": {"brandAgg": {"terms": {"field": "brand","size": 100,"order": {"_count": "asc"}}}} }
-
DSL实现Metrics
GET /hotel/_search {"size": 0,"aggs": {"brandAggs": {"terms": {"field": "brand","size": 30, "order": {"score_aggs.avg": "desc"}},"aggs": {"score_aggs": {"stats": {"field": "score"}}}}} }
-
9.RestClient实现聚合
/*** 聚合查询*/@Testpublic void queryAggregation() throws IOException {SearchRequest request = new SearchRequest("hotel");request.source().size(0);request.source().aggregation(AggregationBuilders.terms("brandAgg").field("brand").size(20).order(BucketOrder.aggregation("scoreStats.avg" , false)).subAggregation(AggregationBuilders.stats("scoreStats").field("score")));SearchResponse response = client.search(request, RequestOptions.DEFAULT);Aggregations aggregations = response.getAggregations();Terms brandAgg = aggregations.get("brandAgg");List<? extends Terms.Bucket> buckets = brandAgg.getBuckets();buckets.stream().forEach( item -> {System.out.print(item.getKeyAsString());Aggregations aggregations1 = item.getAggregations();List<Aggregation> aggregations2 = aggregations1.asList();Aggregation aggregation = aggregations2.get(0);// 解析出aggregation内的内容if (aggregation instanceof Stats) {Stats stats = (Stats) aggregation;double avgScore = stats.getAvg();double minScore = stats.getMin();double maxScore = stats.getMax();long count = stats.getCount();System.out.println("平均分:" + avgScore + ", 最低分:" + minScore + ", 最高分:" + maxScore + ", 总数:" + count);}});}
10.自动补全
-
自定义分词器
// 酒店数据索引库 PUT /hotel {"settings": {"analysis": {"analyzer": {"text_anlyzer": {"tokenizer": "ik_max_word","filter": "py"},"completion_analyzer": {"tokenizer": "keyword","filter": "py"}},"filter": {"py": {"type": "pinyin","keep_full_pinyin": false,"keep_joined_full_pinyin": true,"keep_original": true,"limit_first_letter_length": 16,"remove_duplicated_term": true,"none_chinese_pinyin_tokenize": false}}}},"mappings": {"properties": {"id":{"type": "keyword"},"name":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart","copy_to": "all"},"address":{"type": "keyword","index": false},"price":{"type": "integer"},"score":{"type": "integer"},"brand":{"type": "keyword","copy_to": "all"},"city":{"type": "keyword"},"starName":{"type": "keyword"},"business":{"type": "keyword","copy_to": "all"},"location":{"type": "geo_point"},"pic":{"type": "keyword","index": false},"all":{"type": "text","analyzer": "text_anlyzer","search_analyzer": "ik_smart"},"suggestion":{"type": "completion","analyzer": "completion_analyzer"}}} }
// 自动补全查询 POST /hotel/_search {"suggest": {"suggestions": {"text": "sd", "completion": {"field": "suggestion", "skip_duplicates": true, "size": 10 }}} }
11.RestClient实现自动补全
/*** 自动补全*/@Testpublic void autoComplete() throws IOException {SearchRequest request = new SearchRequest("hotel");request.source().suggest(new SuggestBuilder().addSuggestion("suggestions",SuggestBuilders.completionSuggestion("suggestion").size(10).prefix("sd")));SearchResponse response = client.search(request, RequestOptions.DEFAULT);Suggest suggest = response.getSuggest();CompletionSuggestion suggestion = suggest.getSuggestion("suggestions");List<CompletionSuggestion.Entry.Option> options = suggestion.getOptions();options.stream().forEach(item -> {System.out.println(item.getHit());});}
12.数据同步
-
数据同步的几种实现方式
- 同步调用:实现简单,粗暴、业务耦合度高
- 异步通知:低耦合,实现难度一般、依赖mp的可靠性
- 监听binlog:完全解除服务间耦合、开始binlog增加数据库负担、实现复杂度高
-
利用MQ实现mysql与elasticsearch数据同步
-
定义常量类
public class MqConstant {// 交换机public static final String HOTEL_EXCHANGE_TOPIC = "hotel.exchange.topic";// update队列public static final String HOTEL_UPDATE_QUEUE = "hotel.update";// delete队列public static final String HOTEL_DELETE_QUEUE = "hotel.delete";}
-
在消费者微服务中声明exchange、queue、RoutingKey(rabbitMQ懒加载,只有消费者在监听,不监听不会创建交换机队列等)
@Configuration public class MqConfig {// 交换机@Beanpublic TopicExchange topicExchange() {return new TopicExchange(MqConstant.HOTEL_EXCHANGE_TOPIC , true , false);}// 更新队列@Bean(name = "updateQueue")public Queue updateQueue(){return new Queue(MqConstant.HOTEL_UPDATE_QUEUE , true);}// 删除队列@Bean(name = "deleteQueue")public Queue deleteQueue(){return new Queue(MqConstant.HOTEL_DELETE_QUEUE , true);}// 绑定队列交换机@Beanpublic Binding bindingUpdateQueue(@Qualifier("updateQueue") Queue updateQueue , TopicExchange topicExchange){return BindingBuilder.bind(updateQueue).to(topicExchange).with(MqConstant.HOTEL_UPDATE_QUEUE);}// 绑定队列交换机@Beanpublic Binding bindingDeleteQueue(@Qualifier("deleteQueue") Queue deleteQueue , TopicExchange topicExchange){return BindingBuilder.bind(deleteQueue).to(topicExchange).with(MqConstant.HOTEL_DELETE_QUEUE);} }
-
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id);} }
-
消费者监听
@Component public class HotelListener {@Autowiredprivate IHotelService hotelService;// 更新监听@RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE)public void updateListen(Long id) {System.out.println("更新es文档...");hotelService.updateEs(id);}// 删除监听@RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)public void deleteListen(Long id) {System.out.println("删除es文档...");hotelService.deleteEs(id);}}
-
13.ES集群
- 各节点的职责
- master eligible节点的作用是什么?
- 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
- data节点的作用是什么?
- 数据CRUD
- coordinator节点的作用是什么?
- 路由请求到其他节点
- 合并查询到的结果,返回用户
- master eligible节点的作用是什么?
- 分布式新增如何确定分配分片?
- coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
- 分布式查询
- 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
- ES集群的故障转移
- 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移
nt.HOTEL_DELETE_QUEUE);
}
}
```
-
生产者发布
@RestController @RequestMapping("hotel") @CrossOrigin("*") public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@PutMapping()public void updateById(@RequestBody Hotel hotel){if (hotel.getId() == null) {throw new InvalidParameterException("id不能为空");}hotelService.updateById(hotel);rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_UPDATE_QUEUE , hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {hotelService.removeById(id);rabbitTemplate.convertAndSend(MqConstant.HOTEL_EXCHANGE_TOPIC , MqConstant.HOTEL_DELETE_QUEUE , id);} }
-
消费者监听
@Component public class HotelListener {@Autowiredprivate IHotelService hotelService;// 更新监听@RabbitListener(queues = MqConstant.HOTEL_UPDATE_QUEUE)public void updateListen(Long id) {System.out.println("更新es文档...");hotelService.updateEs(id);}// 删除监听@RabbitListener(queues = MqConstant.HOTEL_DELETE_QUEUE)public void deleteListen(Long id) {System.out.println("删除es文档...");hotelService.deleteEs(id);}}
13.ES集群
- 各节点的职责
- master eligible节点的作用是什么?
- 参与集群选主
- 主节点可以管理集群状态、管理分片信息、处理创建和删除索引库的请求
- data节点的作用是什么?
- 数据CRUD
- coordinator节点的作用是什么?
- 路由请求到其他节点
- 合并查询到的结果,返回用户
- master eligible节点的作用是什么?
- 分布式新增如何确定分配分片?
- coordinating node根据id做hash运算,得到结果对shard数量取余,余数就是对应的分片
- 分布式查询
- 分散阶段:coordinator node将查询请求分发给不同分片
- 收集阶段:将查询结果汇总到coordinator node,整理并返回给用户
- ES集群的故障转移
- 集群的master节点会监控集群中的节点状态,如果发现有节点宕机,会立即将宕机节点的分片数据迁移到其他节点,确保数据安全,这叫故障转移