文章目录
- 一、客户端部分涵盖3大核心类
- 1.1、ConnectionFactory 类
- 1.2、Connection 类
- 1.3、Channel 类
- 二、3级类结构优势
- 三、客户端实现逻辑
- 3.1、ConnectionFactory
- 3.1.1、ConnectionFactory类所含字段
- 3.1.2、ConnectionFactory类所含方法
- 3.2、Connection
- 3.2.1、Connection类所含字段
- 3.2.2、Connection类所含方法
- 3.3、Channel
- 3.3.1、Channel类所含字段
- 3.3.2、Channel类所含方法
一、客户端部分涵盖3大核心类
1.1、ConnectionFactory 类
该类称为 “连接工厂”,通过这个类来持有服务器的地址。(因为当客户端与服务器建立连接时,需要知道服务器的 ip 和 端口号)
该类的主要功能:能够创建出 TCP 连接。(即 Connection 对象)
1.2、Connection 类
1、Connection 表示一个 TCP 连接,那么该 Connection 里就会持有一个 Socket 对象,客户端通过 Socket 对象来与服务器进行网络通信。
2、Connection 还负责:读取响应、写入请求。(服务器处收到客户端的请求叫做: 读取请求,服务器返回响应给客户端叫做: 写入响应;客户端收到服务器的响应叫做: 读取响应,客户端发送请求给服务器叫做: 写入请求)
3、由于TCP的创建/销毁开销较大,因此决定进行 TCP 的复用:一个TCP连接里可以发送/接收多次请求/响应,即一个TCP连接中可以含有多个Channel子逻辑。因此 Connection 类还要管理多个 Channel 对象,每个 Channel 就是一个逻辑上的连接。
1.3、Channel 类
1、Channel 表示一个逻辑上的连接,一个 TCP 连接就是通过 channel 来进行复用的。
譬如说,一个客户端上有多个模块,这些模块都需要与服务器进行通信,那每个模块都可以拥有自己的 Channel 对象,但是这几个 Channel 对象,共用的是同一个 TCP 连接,但它们并不会互相干扰。Connection 类 和 Channel 的关系就像,一辆公车。同一辆公车上,可以搭载学生上学、工人上班、老板出门…
2、Channel 上还需要提供服务器上的一系列 核心API。
二、3级类结构优势
客户端部分采用了3级类结构:类ConnectionFactory、类Connection、类Channel
该结构优势:当客户端比较复杂时,譬如一个客户端含有多个不同的板块,这时候多个模块通过 Channel 各自与服务器进行通信,互不干扰,但是通过 Connection 复用同一个 TCP,这样的话,逻辑上解耦合了,也高内聚了,效率也提高了。
三、客户端实现逻辑
在 mqclient 包下 新建三个新类,分别是:ConnectionFactory、Connection、Channel。
3.1、ConnectionFactory
3.1.1、ConnectionFactory类所含字段
1、需持有服务器ip地址
2、需持有服务器端口号
3、(由于当前服务器还只是一个单机版,即只含有一个虚拟主机,因此此时不需要 虚拟主机名、用户名、密码 来确定访问服务器的哪个虚拟主机,但是后续项目扩展,这些字段都会用到)
3.1.2、ConnectionFactory类所含方法
1、getter、setter方法。
2、创建新的 TCP 连接的方法(newConnection(String host,int port)): 由于创建TCP连接时需要知道服务器的 ip地址 和 端口号,因此传参 host、port。
3.2、Connection
3.2.1、Connection类所含字段
1、TCP连接来进行网络通信,因此需含有 Socket 对象。
2、使用哈希表来管理 Connection 中的众多 channel。
3、Socket 对象后续需要杯基于,来进行一些读写数据的操作,读写操作就离不开 Socket 里面的流对象,因此IO流也是所需字段。
3、线程池,用来处理回调方法。
3.2.2、Connection类所含方法
1、Connection 的构造方法(携带ip地址和端口号俩参数):初始化Socket、初始化所有流对象、处理服务器返回的响应,尤其是服务器收到生产者的消息后,将消息推送给对应消费者消费的这部分响应。
1.1、设置一个扫描线程循环扫描服务器返回给客户端的响应,当连接断开时,扫描结束。
1.2、从 socket 里读到响应数据,然后对响应进行处理,但服务器返回的响应含有两种:(1)、普通响应。(2)、服务器推送给客户端(订阅者)的消息。针对不同的响应,使用方法 dispatchResponse() 实现。
2、客户端发送请求(public void writeRequest(Request request))
3、客户端读取/接收响应(public Response readResponse())
4、使用 createChannel() 方法 在Connection 中创建出 Channel
4.1、通过 UUID算法 生成唯一标识的的 channelId
4.2、构造 Channel 对象。
4.3、将新构造出的 Channel 对象加入到 Connection 类中定义用来管理 Channel 的 channelMap 中。
4.4、客户端这边的Connection类里创建了Channel对象,此时也需要告知服务器,服务器就将客户端新建的 Channel 对象加入到 服务器中定义的 sessions 哈希表中。那么我们需要在 Channel 类中创建一个createChannel()。根据createChannel()的返回值判断此次与服务器的通信是否顺利,不顺利,就把刚刚4.3步骤加入了的Channel对象从sessions表中删除。
4.5、返回channel
5、使用方法 close() 关闭 connection,释放资源。
6、期望使用方法 disPatchResponse(Response response) 来处理服务器返回的响应,到底是针对请求的响应,还是推送的消息。
6.1、首先判断响应类型是否是 服务器推送回来的消息,如果是:
6.1.1、首先将响应中的payload解析出来(反序列化)成 SubscribeReturns 类,然后根据响应中的channelId从channelMap中找到对应的 Channel 对象,判断 channel 是否存在,如果channel 不存在,说明该消息对应的channel在客户端中不存在,直接抛异常。如果channel存在,就执行channel里的回调方法,但是并不在此扫描线程中执行回调方法,因为不知道回调函数要执行多长时间,而扫描线程一边需要判断服务器返回的响应类型,一边处理回调方法的话,实在忙不过来,因此,将回调方法直接投入线程池中,让线程池中的线程执行回调方法即可。
6.2、如果响应是针对请求的响应,此时就将响应的payload解析出来,变成 BasicReturns ,根据 BasicReturns 里面的 channelId 在 channelMap里查询对应的 Channel 对象,如果 Channel 对象不存在,说明当前响应对应的客户端并不存在,直接抛异常,如果不为空,就将当前响应存入记录服务器返回响应的哈希表basicReturnsMap里,使用方法 putReturns(basicReturns)处理。
7、 期望使用方法 putReturns(basicReturns)来唤醒那些发送了请求,正在阻塞等待着服务器返回相应的响应的客户端线程。
7.1、将当前响应添加至basicReturnsMap表中。
7.2、使用 notofyAll() 唤醒阻塞等待的线程。由于不知道到底有多少个线程正在阻塞等待,因此使用 notifyAll() 全部唤醒。
3.3、Channel
3.3.1、Channel类所含字段
1、由于一个TCP复用,因此一个TCP里包含多个 Channel,此时就需要定义 Connection 对象作为成员变量之一。
2、channelId,每一个Channel的身份标识。
3、定义一个哈希表,用来存储服务器给服务器返回的响应。约定一个channel中只能有一个回调,如果队列想去订阅多个消息,就需要含有多个channel。
3.3.2、Channel类所含方法
1、构造方法,含有两个参数,channelId、Connection对象,参数1channelId表明当前channel的唯一身份标识,参数2Connection表明当前的channel对象属于哪个TCP连接里的。
2、public boolean createChannel(): 期望在这个方法里边,和服务器进行通信,告知服务器说,客户端这边创建了新的Channel,Connection 调用 Channel 里的 createChannel()来与服务器进行通信,通信完成后,服务器会返回true/false作为服务器的响应结果。
2.1、与服务器进行通信,势必需要发送请求/接收响应…因此构造一个 BasicArguments 对象,设置 BasicArguments 对象里的属性,channelId即当前的,rid通过 私有方法获取到。
2.2、构造请求对象,设置请求里的属性,payload 内容即 BasicArguments 对象序列化的结果。
2.3、使用 Connection 对象里的发送请求,将请求发给服务器。请求发出后,等待服务器处理该请求,并给我们返回一个响应结果,但是由于这些都是需要时间的,因此服务器什么时候返回响应,不知道,客户端此处就需要进行阻塞等待服务器的响应。
2.4、构造一个返回类 BasicReturns 接收服务器返回的响应。
2.5、使用方法 waitResult(basicReturns.getRid()) 来阻塞等待服务器的响应。
2.6、返回 服务器响应值。
3、public BasicReturns waitResult(basicReturns.getRid()):期待通过该方法阻塞等待获取到服务器的响应,或取的响应其 rid 必须是和 发送请求时的 rid 一致。构造一个 BasicReturns 对象,判断当前 从 basicReturnsMap 表中获取到的basicReturns 对象是否为空,为空,说明服务器未返回此响应,wait()阻塞等待,如果服务器的响应返回后,就会在 Connection类里收到,此时就会把消息投到对应channelMap中,同时调用 notify() 唤醒正在阻塞的线程。读取成功消息后,将此消息从 basicReturnsMap中删除。返回 basicReturns。
4、使用 UUID 算法生成到 rid:private String getRid()
5、关闭 channel(public boolean close()):期望该方法能够给服务器发送一个 type 为 0x2 的请求,进行关闭 channel 即可。
5.1、构造一个 BasicArguments 对象,设置其对象里的相应属性。
5.2、构造一个 Request 对象,设置 request 对象里的相应属性。
5.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
6、创建交换机(public boolean exchangeDeclare(String exchangeName,ExchangeType exchangeType,boolean durable,boolean autoDelete,Map<String , Object> arguments)):
6.1、构造一个 ExchangeDeclareArguments 对象,设置该对象里的相应属性。
6.2、构造一个 Request 对象,设置其对象里的相应属性。
6.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
7、销毁交换机(public boolean exchangeDelete(String exchangeName)):
7.1、构造一个 ExchangeDeleteArguments 对象,设置该对象里的相应属性。
7.2、构造一个 Request 对象,设置其对象里的相应属性。
7.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
8、创建队列(public boolean queueDeclare(String queueName,boolean durable,boolean autoDelete,boolean exclusive,Map<String , Object> arguments)):
8.1、构造一个 queueDeclareArguments 对象,设置该对象里的相应属性。
8.2、构造一个 Request 对象,设置其对象里的相应属性。
8 .3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
9、删除队列(public boolean queueDelete(String queueName)):
9.1、构造一个 queueDeleteArguments 对象,设置该对象里的相应属性。
9.2、构造一个 Request 对象,设置其对象里的相应属性。
9 .3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
10、创建绑定(public boolean queueBind(String exchangeName,String queueName,String bindingKey)):
10.1、构造一个 queueBindArguments 对象,设置该对象里的相应属性。
10.2、构造一个 Request 对象,设置其对象里的相应属性。
10.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
11、删除绑定(public boolean queueUnbind(String exchangeName,String queueName)):
11.1、构造一个 queueUnbindArguments 对象,设置该对象里的相应属性。
11.2、构造一个 Request 对象,设置其对象里的相应属性。
11.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
12:发布消息(public boolean basicPublish( String exchangeName,String routingKey,BasicProperties basicPropertoes,byte[] body)):
12.1、构造一个 basicPublishArguments 对象,设置该对象里的相应属性。
12.2、构造一个 Request 对象,设置其对象里的相应属性。
12.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
13、订阅消息(public boolean basicComsume(String queueName,boolean autoAck,Comsumer comsumer)):
13.1、先对当前 comsumer 进行判断,如果 comsumer != null ,说明 channel 已经设置过回调了,不再重复设置。
13.2、构造一个 basicComsumeArguments 对象,设置该对象里的相应属性。
13.3、构造一个 Request 对象,设置其对象里的相应属性。
13.4、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。
14、确认消息(public boolean basicAck(String queueName,String messageId)):
13.1、构造一个 basicAckArguments 对象,设置该对象里的相应属性。
13.2、构造一个 Request 对象,设置其对象里的相应属性。
13.3、通过 Connection 将请求发送到服务器里,同时阻塞等待服务器的响应,返回服务器响应值。