文章目录
- 1. maven依赖
- 2. MainConfig
- 3. application.properties
- 4. 发送字符串 生产者
- 5. 发送对象 生产者
- 6. 接收字符串客户端
- 7. 接收对象客户端
- 8.confirem 确认机制
- 9. return确认机制
- 10. MQ消息发送工具类封装
- 11. 分布式id
- 12. 时间工具类
- 13. 对象
1. maven依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2. MainConfig
package com.gblfy.springboot.config;import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;@Configuration
@ComponentScan({"com.gblfy.springboot.*"})
public class MainConfig { }
3. application.properties
#应用端口信息
server.port=80
#RabbitMQ 连接信息
#IP地址
spring.rabbitmq.addresses=127.0.0.1
#RabbitMQ 端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=admin
#虚拟主机
spring.rabbitmq.virtual-host=/admin
#连接超时时间
spring.rabbitmq.connection-timeout=15000spring.profiles.active=dev
application-dev.properties
#服务端 RabbitMQ 配置
#消息发送至交换机消息确认模式 是否确认回调
spring.rabbitmq.publisher-confirms=true
#消息发送至交换机消息确认模式 是否确认消息返回回调
spring.rabbitmq.publisher-returns=true
#消息手工签收
spring.rabbitmq.template.mandatory=true#消费端 RabbitMQ 配置
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=5
#指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#接收字符串类型MQ消息spring.rabbitmq.listener.str.queue.name=queue-1
spring.rabbitmq.listener.str.queue.durable=true
spring.rabbitmq.listener.str.exchange.name=exchange-1
spring.rabbitmq.listener.str.exchange.durable=true
spring.rabbitmq.listener.str.exchange.type=topic
spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.str.key=cus-str.##接收object类型MQ消息
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=cus-obj.#
YML
#----------------------------服务端(公有)配置----------------------------
spring:rabbitmq:addresses: 192.168.0.XXX #RabbitMQ服务端地址username: admin #用户名password: admin #密码port: 5672 #端口virtual-host: /admin #虚拟主机connection-timeout: 15000 #超时时间
#----------------------------生产端端配置----------------------------publisher-confirm-type: correlated #确认消息已发送至交换机,选择交换类型为交互publisher-returns: true #在消息没有被路由到指定的queue时将消息返回,而不是丢弃template:mandatory: true #是否手动签收listener:simple:acknowledge-mode: manual #手动签收concurrency: 5 #默认线程数max-concurrency: 10 #最大线程数
#----------------------------消费端配置----------------------------
#----------------------------对象类型监听----------------------------order:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-2 #交换机名称type: topic #消息类型key: cmiip-obj.# #消息路由key的路由规则queue:durable: true #是否持久化name: queue-2 #队列名称
#----------------------------字符串类型监听----------------------------str:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-1 #交换机名称type: topic #消息类型key: cmiip-str.# #消息路由key的路由规则queue:durable: true #是否持久化name: queue-1 #队列名称
4. 发送字符串 生产者
package com.gblfy.springboot.controller;import com.gblfy.springboot.utils.MQSendMsgUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MQSendStrMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 发送字符串类型消息** @param exchangeName 交换机名称* @param queueRouterKey 路由key* @param msg 报文* @return*/@GetMapping("/mQSendStrMsg")public String mQSendStrMsg(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey,@RequestParam(value = "msg") String msg) {mqSendMsgUtils.snedStrMQMsg(exchangeName, queueRouterKey, msg);return "发送字符串消息成功!";}//测试连接:http://localhost/mQSendStrMsg?exchangeName=exchange-1&queueRouterKey=cus&msg=测试2
}
5. 发送对象 生产者
package com.gblfy.springboot.controller;import com.gblfy.springboot.entity.Order;
import com.gblfy.springboot.utils.MQSendMsgUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MQSendObjMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;@GetMapping("/mQSendObjMsg")public String mQSendStrMsg2(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey) {//模拟发送order对象Order order = new Order().builder().reqXml("我是请求报文").serviceName("接口名称").resXml("我是响应报文").build();//模拟接口描述String serviceName = "TJHL";String queueDesc = "纽约理赔发送退单接口";//模拟接口类型String queueType = "WEBSERVICE";//调用MQ工具类发送消息mqSendMsgUtils.snedObjMqMsg(exchangeName, order, queueRouterKey, serviceName, queueDesc, queueType);return "发送对象消息成功!";}//测试连接:http://localhost/mQSendObjMsg?exchangeName=exchange-2&queueRouterKey=cus
}
6. 接收字符串客户端
package com.gblfy.springboot.conusmer;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;@Component
public class CusStrQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收字符串类型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str.queue.name}",durable = "${spring.rabbitmq.listener.str.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str.exchange.name}",durable = "${spring.rabbitmq.listener.str.exchange.durable}",type = "${spring.rabbitmq.listener.str.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 创建一个消费端轨迹表来存储消息的轨迹数据String jsonMsg = new String(message.getBody());log.info("响应报文 mResXml: {}", jsonMsg);// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 反馈消息的消费状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//TODO 保存数据到数据库}
}
7. 接收对象客户端
package com.gblfy.springboot.conusmer;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.springboot.entity.Order;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class CusObjQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收对象类型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable = "${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable = "${spring.rabbitmq.listener.order.exchange.durable}",type = "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 创建一个消费端轨迹表来存储消息的轨迹数据String jsonMsg = new String(message.getBody());// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 反馈消息的消费状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//通过 判断路由routingKey是否等于trace相同即可//fastjson解析MQ接收的json字符串 转换成RequestInfo对象JSONObject jsonObject = JSON.parseObject(jsonMsg);Order orderInfo = JSON.toJavaObject(jsonObject, Order.class);log.info("接口名称 serviceName: {}", orderInfo.getServiceName());log.info("请求报文 mReqXml: {}", orderInfo.getReqXml());log.info("响应报文 mResXml: {}", orderInfo.getResXml());MessageProperties messageProperties = message.getMessageProperties();log.info("交换机名称 : {}", messageProperties.getReceivedExchange());log.info("路由key名称 : {}", messageProperties.getReceivedRoutingKey());log.info("内容类型 : {}", messageProperties.getContentType());log.info("内容编码 : {}", messageProperties.getContentEncoding());log.info("标签 : {}", messageProperties.getDeliveryTag());// 2. 接收接口信息Map<String, Object> headers = message.getMessageProperties().getHeaders();log.info("队列唯一标识ID: {}", headers.get("QUEUE_MSG_ID"));log.info("队列名称: {}", headers.get("QUEUE_NAME"));log.info("队列类型: {}", headers.get("QUEUE_TYPE"));log.info("队列描述: {}", headers.get("QUEUE_DESC"));//TODO 保存数据到数据库}
}
8.confirem 确认机制
package com.gblfy.springboot.confirms;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;@Component("confirmCallback")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志输出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生产者消息发送成功与失败确认机制* <p>* 1. ack* true : 标志生产者将消息发出成功* false: 标志生产者将消息发出失败* 2. ack :true 意味着消息发送成功 有2种场景* 第一种:生产者将消息成功发送到指定队列中,等待消费者消费消息* 第两种:生产者将消息发送成功,但是,由于无法路由到指定的消息* 队列,这种场景的消息,会被return机制监听到,后续进行* 补偿机制,做消息补发处理* </p>** @param correlationData 队列消息的唯一标识ID,消息做补偿机制会用到* @param ack ack 消息是否发送成功的标识* @param cause 消息发送失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息队列标识ID: {}", correlationData.getId());log.info("发送消息状态: {}", ack);//TODO 消息发送交换机成功 保存轨迹记录if (!ack) {//TODO 消息发送交换机失败 保存轨迹记录log.info("异常处理....");}}
}
/*** !ack 场景结果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 异常处理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失败原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/
9. return确认机制
package com.gblfy.springboot.returns;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component("returnCallback")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息无法路由 触发消息 return机制* <p></p>* 1. 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除* 2. 会监听到生产者发送消息的关键信息* 3. 根据关键信息,后续进行补偿机制,做消息补发处理* </p>** @param message 消息实体* @param replyCode 应答码312* @param replyText NO_ROUTE* @param exchange 交换机* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息发送的指定交换机: {}", exchange);log.info("队列路由的routingKey: {}", routingKey);log.info("队列的响应码replyCode: {}", replyCode);log.info("队列的响应信息: {}", replyText);//TODO 消息发送交换机成功 路由失败 保存轨迹记录}
}
/*** 场景结果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已经被ack成功*/
10. MQ消息发送工具类封装
package com.gblfy.springboot.utils;import com.fasterxml.jackson.databind.ObjectMapper;
import com.gblfy.springboot.confirms.ConfirmCallBackListener;
import com.gblfy.springboot.consts.MQPrefixConst;
import com.gblfy.springboot.entity.Order;
import com.gblfy.springboot.returns.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** MQ发送 不同类型消息 公用工具类* <p>* MQ发送消息模式采用 订阅模式(topic)中的通配符模式* order.* 区配一个词* order.# 区配一个或者多个词* <p>** @author gblfy* @date 2020-04-16*/
@Component
public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);//引入json工具类private static final ObjectMapper MAPPER = new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitTemplate;//注入发送消息模板@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 发送MQ STRING类型消息 第1种** @param exchangeName 指定交换机名称* @param type 路由routingKey* @param msg MQ STRING类型消息*/public void snedStrMQMsg(String exchangeName, String type, String msg) {try {/*** CorrelationData 说明:* 1. correlationId 作为生产端和消息绑定消息队列全局唯一标识* 2. 当生产端发送的消息无法路由到指定的消息队列时,此种场* 景的消息会被生产端会return确认机制监听到,对消息做补* 偿机制处理*///通过雪花算法生成全局唯一ID,用于消息发送失败,后期做消息补偿处理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// Confirm 消息确认策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息确认策略rabbitTemplate.setReturnCallback(returnCallback);//发送消息到MQ的交换机,通知其他系统rabbitTemplate.convertAndSend(exchangeName, MQPrefixConst.CUS_MQ_STR_PRE + type, msg.getBytes(), correlationId);} catch (Exception e) {e.printStackTrace();}}public void snedObjMqMsg(String exchangeName, Order order, String queueRouteKey, String queueName, String queueDesc, String queueType) {try {/*** CorrelationData 说明:* 1. correlationId 作为生产端和消息绑定消息队列全局唯一标识* 2. 当生产端发送的消息无法路由到指定的消息队列时,此种场* 景的消息会被生产端会return确认机制监听到,对消息做补* 偿机制处理*/// Confirm 消息确认策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息确认策略rabbitTemplate.setReturnCallback(returnCallback);//1.对象处理String jsonStrObj = MAPPER.writeValueAsString(order);// 2. 通过雪花算法生成全局唯一ID,用于消息发送失败,后期做消息补偿处理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// MQ 添加额外参数设置 用于定位该消息属于什么接口Message message = addExtraParameters(jsonStrObj, correlationId, queueName, queueDesc, queueType);// 3.发送数据消息到指定的MQ交换机,通知其他系统rabbitTemplate.convertAndSend(exchangeName,MQPrefixConst.CUS_MQ_OBJ_PRE + queueRouteKey, message, correlationId);} catch (Exception e) {e.printStackTrace();}}/*** MQ 添加额外参数设置** @param jsonStrObj json处理前的数据对象* @param queueDesc 队列描述* @param queueType 队列类型* @return*/public Message addExtraParameters(String jsonStrObj, CorrelationData correlationId, String queueName, String queueDesc, String queueType) {MessageProperties messageProperties = new MessageProperties();//这里注意一定要修改contentType为 application/jsonmessageProperties.setContentType("application/json");messageProperties.setContentEncoding("UTF-8");messageProperties.getHeaders().put("QUEUE_NAME", queueName);messageProperties.getHeaders().put("QUEUE_DESC", queueDesc);messageProperties.getHeaders().put("QUEUE_TYPE", queueType);messageProperties.getHeaders().put("QUEUE_MSG_ID", correlationId.getId());messageProperties.getHeaders().put("SEND_DATE", MQTimeUtils.CURRENT_DATE_TIME);Message message = new Message(jsonStrObj.getBytes(), messageProperties);return message;}
}
11. 分布式id
package com.gblfy.springboot.utils;import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** Twitter_Snowflake<br>* SnowFlake的结构如下(每部分用-分开):<br>* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)* 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>* 加起来刚好64位,为一个Long型。<br>* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。*/
public class SnowflakeIdWorker {// ==============================Fields===========================================/*** 开始时间截 (2015-01-01)*/private final long twepoch = 1489111610226L;/*** 机器id所占的位数*/private final long workerIdBits = 5L;/*** 数据标识id所占的位数*/private final long dataCenterIdBits = 5L;/*** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)*/private final long maxWorkerId = -1L ^ (-1L << workerIdBits);/*** 支持的最大数据标识id,结果是31*/private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);/*** 序列在id中占的位数*/private final long sequenceBits = 12L;/*** 机器ID向左移12位*/private final long workerIdShift = sequenceBits;/*** 数据标识id向左移17位(12+5)*/private final long dataCenterIdShift = sequenceBits + workerIdBits;/*** 时间截向左移22位(5+5+12)*/private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)*/private final long sequenceMask = -1L ^ (-1L << sequenceBits);/*** 工作机器ID(0~31)*/private long workerId;/*** 数据中心ID(0~31)*/private long dataCenterId;/*** 毫秒内序列(0~4095)*/private long sequence = 0L;/*** 上次生成ID的时间截*/private long lastTimestamp = -1L;private static SnowflakeIdWorker idWorker;static {idWorker = new SnowflakeIdWorker(getWorkId(), getDataCenterId());}//==============================Constructors=====================================/*** 构造函数** @param workerId 工作ID (0~31)* @param dataCenterId 数据中心ID (0~31)*/public SnowflakeIdWorker(long workerId, long dataCenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));}if (dataCenterId > maxDataCenterId || dataCenterId < 0) {throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));}this.workerId = workerId;this.dataCenterId = dataCenterId;}// ==============================Methods==========================================/*** 获得下一个ID (该方法是线程安全的)** @return SnowflakeId*/public synchronized long nextId() {long timestamp = timeGen();//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards. Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}//如果是同一时间生成的,则进行毫秒内序列if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;//毫秒内序列溢出if (sequence == 0) {//阻塞到下一个毫秒,获得新的时间戳timestamp = tilNextMillis(lastTimestamp);}}//时间戳改变,毫秒内序列重置else {sequence = 0L;}//上次生成ID的时间截lastTimestamp = timestamp;//移位并通过或运算拼到一起组成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift)| (dataCenterId << dataCenterIdShift)| (workerId << workerIdShift)| sequence;}/*** 阻塞到下一个毫秒,直到获得新的时间戳** @param lastTimestamp 上次生成ID的时间截* @return 当前时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 返回以毫秒为单位的当前时间** @return 当前时间(毫秒)*/protected long timeGen() {return System.currentTimeMillis();}private static Long getWorkId() {try {String hostAddress = Inet4Address.getLocalHost().getHostAddress();int[] ints = StringUtils.toCodePoints(hostAddress);int sums = 0;for (int b : ints) {sums += b;}return (long) (sums % 32);} catch (UnknownHostException e) {// 如果获取失败,则使用随机数备用return RandomUtils.nextLong(0, 31);}}private static Long getDataCenterId() {int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());int sums = 0;for (int i : ints) {sums += i;}return (long) (sums % 32);}/*** 静态工具类** @return*/public static Long generateId() {long id = idWorker.nextId();return id;}//==============================Test=============================================/*** 测试*/public static void main(String[] args) {System.out.println(System.currentTimeMillis());long startTime = System.nanoTime();for (int i = 0; i < 50000; i++) {long id = SnowflakeIdWorker.generateId();System.out.println(id);}System.out.println((System.nanoTime() - startTime) / 1000000 + "ms");}
}
12. 时间工具类
package com.gblfy.springboot.utils;import org.springframework.stereotype.Component;import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class MQTimeUtils {//格式化时间 日期格式public static final DateFormat DATE_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss");// 日期格式//当前日期+时间 用于定位 消息队列服务端和和生产发送消息时间 确认什么类型的什么接口public static final String CURRENT_DATE_TIME = DATE_TIME_FORMAT.format(new Date());/*** 获取当前日期 类型Date*/public static Date getCurrentDate() {Date currentDate = null;try {currentDate = DATE_FORMAT.parse(DATE_FORMAT.format(new Date()));} catch (ParseException e) {e.printStackTrace();}return currentDate;}/*** 获取当前日期 类型String*/public static String getCurrentDateToStr() {String currentDateToStr = null;try {currentDateToStr = DATE_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentDateToStr;}/*** 获取当前时间 类型String*/public static String getCurrenTimeToStr() {String currentTimeToStr = null;try {currentTimeToStr = TIME_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentTimeToStr;}public static void main(String[] args) {System.out.println(MQTimeUtils.getCurrentDate());System.out.println(MQTimeUtils.getCurrentDateToStr());}
}
13. 对象
package com.gblfy.springboot.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order implements Serializable {private String serviceName;private String reqXml;private String resXml;
}