【微服务学习笔记(二)】Docker、RabbitMQ、SpringAMQP、Elasticseach
- Docker
- 镜像和容器
- 安装
- 基础命令
- Dockerfile自定义镜像
- MQ(服务异步通讯)
- RabbitMQ
- 安装
- 使用
- 消息模型
- SpringAMQP
- 消息发送
- 消息接收
- Work Queue 工作队列
- 发布订阅
- Fanout Exchange
- Direct Exchange
- TopicExchange
- 消息转换器
- elasticsearch(分布式搜索)
- 倒排索引
- 索引库语法
- 操作文档
本篇内容为学习笔记,学习链接为SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud微服务技术栈课
课程资料链接可在视频下方找到,此处不粘贴,而以下的代码都是资料中有的,只不过做为记录单独粘贴,做为学习使用的参考步骤。
Docker
作用:解决开发部署时依赖、环境冲突的问题。
Docker如何解决依赖的兼容问题的?
- 将应用的Libs(函数库)、Deps(依赖)配置与应用一起打包。
- 将每个应用放到一个隔离容器去运行,避免互相干扰。
Docker如何解决不同系统环境的问题?
- Docker将用户程序与所需要调用的系统(比如Ubuntu)函数库一起打包。
- Docker运行到不同操作系统时,直接基于打包的库函数,借助于操作系统的Linux内核来运行。
Docker与虚拟机的比较
docker是一个系统进程,虚拟机是在操作系统中的操作系统。
特性 | Docker | 虚拟机 |
---|---|---|
性能 | 接近原生 | 性能较差 |
硬盘占用 | 一般为MB | 一般为GB |
启动 | 秒级 | 分钟级 |
镜像和容器
镜像(Image):Docker将应用程序及其所需的依赖、函数库、环境、配置等文件打包在一起,称为镜像。
**容器(Container)😗*镜像中的应用程序运行后形成的进程就是容器,只是Docker会给容器做隔离,对外不可见。
Docker和DockerHub
- DockerHub:DockerHub是一个Docker镜像的托管平台。这样的平台称为Docker Registry。
- 国内也有类似于DockerHub 的公开服务,比如网易云镜像服务、阿里云镜像库等。
架构:
Docker是一个CS架构的程序,由两部分组成:
- 服务端(server):Docker守护进程,负责处理Docker指令,管理镜像、容器等。
- 客户端(client):通过命令或RestAPI向Docker服务端发送指令。可以在本地或远程向服务端发送指令。
安装
可以参考官方文档:
Docker安装文档
还可以参考这篇博客:
Docker 安装 (完整详细版)
基础命令
可以通过 docker --help
命令查看帮助文档,学习使用。
拉取命令步骤:
1、进入docker官网搜索需要拉取的镜像
2、按照官网搜索出来的指令放入控制台
数据卷是一个虚拟目录,指向宿主机文件系统中的某个目录。
容器与数据耦合的问题:
- 不便于修改
当我们要修改Nginx的html内容时,需要进入容器内部修改,很不方便。 - 数据不可复用
在容器内的修改对外是不可见的。所有修改对新创建的容器是不可复用的。 - 升级维护困难
数据在容器内,如果要升级容器必然删除旧容器,所有数据都跟着删除了。
命令:
docker volume [选项]
选项:
- create 创建一个volume
- inspect 显示一个或多个volume的信息
- ls 列出所有的volume
- prune 删除未使用的volume
- rm 删除一个或多个指定的volume
在创建容器时,可以通过-v参数来挂载一个数据卷到某个容器目录:
Dockerfile自定义镜像
Dockerfile就是一个文本文件,其中包含一个个的指令(Instruction),用指令来说明要执行什么操作来构建镜像。每一个指令都会形成一层Layer。
搭建镜像仓库
Docker官方的Docker Registry是一个基础版本的Docker镜像仓库,具备仓库管理的完整功能,但是没有图形化界面。
搭建方式比较简单,命令如下:
docker run -d \--restart=always \--name registry \-p 5000:5000 \-v registry-data:/var/lib/registry \registry
命令中挂载了一个数据卷registry-data到容器内的/var/lib/registry 目录,这是私有镜像库存放数据的目录。
访问http://YourIp:5000/v2/_catalog 可以查看当前私有镜像服务中包含的镜像
使用DockerCompose部署带有图像界面的DockerRegistry,命令如下:
version: '3.0'
services:registry:image: registryvolumes:- ./registry-data:/var/lib/registryui:image: joxit/docker-registry-ui:staticports:- 8080:80environment:- REGISTRY_TITLE=传智教育私有仓库- REGISTRY_URL=http://registry:5000depends_on:- registry
我们的私服采用的是http协议,默认不被Docker信任,所以需要做一个配置:
# 打开要修改的文件
vi /etc/docker/daemon.json
# 添加内容:
"insecure-registries":["http://192.168.150.101:8080"]
# 重加载
systemctl daemon-reload
# 重启docker
systemctl restart docker
MQ(服务异步通讯)
MQ(MessageQueue),消息队列,事件驱动架构中的Broker。
同步调用:
调用方需要等待执行方的调用结果。(就像打电话一样,需要实时响应)
优点:时效性高
缺点:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
异步调用:
调用方无需等待执行方的执行结果 (就像发微信,不需要马上回复)。
常见实现为事件驱动模式:
优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点: - 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
RabbitMQ
安装
1、单机部署
在Centos7虚拟机中使用Docker来安装。
下载镜像:
docker pull rabbitmq:3-management
执行下面的命令来运行MQ容器:
docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management
2、集群部署
在RabbitMQ的官方文档中,讲述了两种集群的配置方式:
- 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
- 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
先来看普通模式集群。
首先,我们需要让3台MQ互相知道对方的存在。
分别在3台机器中,设置 /etc/hosts文件,添加如下内容:
192.168.150.101 mq1
192.168.150.102 mq2
192.168.150.103 mq3
并在每台机器上测试,是否可以ping通对方。
使用
开启后在浏览器中输入虚拟机的地址,以此进入RabbitMQ页面。
RabbitMQ页面介绍:
消息模型
官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
简单队列模型
public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.101");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("itcast");factory.setPassword("123321");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
SpringAMQP
AMQP:Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP:Spring AMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象spring-rabbit是底层的默认实现。
SpringAMQP特征:
- 侦听器容器,用于异步处理入站消息
- 用于发送和接收消息的RabbitTemplate
- RabbitAdmin用于自动声明队列,交换和绑定
消息发送
1、引入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2、在publisher服务中配置
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /
3、测试方法:
@RunWith(SpringRunner.class)
@SpringBootTest
//单元测试使用该两个注解
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}
消息接收
1、在consumer中
spring:rabbitmq:host: 192.168.150.101 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1
2、监听方法
@Component
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
}
Work Queue 工作队列
作用:提高消息处理的效率,避免消息堆积。
消息预取:预先取出队列中的消息。
消息预取限制:通过prefetch控制预期消息的上限,使得处理更快的消费者能取得更多消息,慢的消费者取得少的消息。
spring:rabbitmq:listener:simple:prefetch: 1 #每次只能获取一条消息,处理完成才能获取下一个消息
发布订阅
以上两个模型都针对一个消息发布一个消费者,哪怕是队列模型,也是对消息进行分配,不可能一个消息获得多次。
加入exchange(交换机),发布者不需要直接面对队列,交换机只负责发送,不负责存储。
常见exchange类型包括:
- Fanout:广播
- Direct:路由
- Topic:话题
Fanout Exchange
和以上类型不同的点在于,会将接收到的消息路由到每一个跟其绑定的queue。
1、声明队列、交换机,并将其绑定
@Configuration
public class FanoutConfig {// itcast.fanout@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}// fanout.queue1@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}// 绑定队列1到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}// fanout.queue2@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}// 绑定队列2到交换机@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
}
2、查看
3、发送测试SpringAmqpTest中
@Testpublic void testSendFanoutExchange() {// 交换机名称String exchangeName = "itcast.fanout";// 消息String message = "hello, every one!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}
Direct Exchange
会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
1、SpringRabbitListener中声明监听方法,分别监听两个队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");}
2、SpringAmqpTest类发送测试
@Testpublic void testSendDirectExchange() {// 交换机名称String exchangeName = "itcast.direct";// 消息String message = "hello, red!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);}
TopicExchange
与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.
分割。
1、SpringRabbitListener监听
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");}
2、SpringAmqpTest中测试
@Testpublic void testSendTopicExchange() {// 交换机名称String exchangeName = "itcast.topic";// 消息String message = "今天天气不错,我的心情好极了!";// 发送消息rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);}
消息转换器
RabbitMQ只支持字节注入,而SpringAMQP允许发对象,采用Java中jdk的序列化,将其转化。
Spring的对消息对象的处理是由org.springframework,amqp.suppor.converter.Messageconverter来处理的。默认实现是SimpleMessageConverter,基于JDK的ObjectOutputStream完成序列化。
如果要修改只需要定义一个MessageConverter 类型的Bean即可。
推荐用JSON方式序列化,步骤如下:
1、引入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>
2、PublisherApplication中:
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
3、发送转换为JSON完成,接下来是接收:
SpringRabbitListener:
@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> msg){System.out.println("接收到object.queue的消息:" + msg);}
ConsumerApplication:
@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
elasticsearch(分布式搜索)
elasticsearch是强大的开源搜索引擎,广泛应用于日志数据分析、实时监控等领域。
Lucene是一个]ava语言的搜索引擎类库,是Apache公司的顶级项目。
Lucene的优势:
- 易扩展
- 高性能(基于倒排索引)
Lucene的缺点:
- 只限于Java语言开发
- 学习曲线陡峭不支持水平扩展
倒排索引
与MySQL对比:
- Mysql:擅长事务类型操作,可以确保数据的安全和一致性
- Elasticsearch:擅长海量数据的搜索、分析、计算
索引库语法
查看索引库语法
GET /索引库名
示例:
GET /heima
删除索引库的语法
DELETE /索引库名
示例:
DELETE /heima
索引库和mapping一旦创建无法修改,但是可以添加新的字段,语法如下:
操作文档
新增文档
查看文档语法
GET /索引库名/-doc/文档id
示例:
GET /heima/_doc/1
删除文档的语法:
DELETE /索引库名/-doc/文档id
示例:
DELETE /heima/ doc/1
修改文档