(需求实战_终章) SpringBoot2.x 整合RabbitMQ

文章目录

        • 1. maven依赖
        • 2. MainConfig
        • 3. application.properties
        • 4. 发送字符串 生产者
        • 5. 发送对象 生产者
        • 6. 接收字符串客户端
        • 7. 接收对象客户端
        • 8.confirem 确认机制
        • 9. return确认机制
        • 10. MQ消息发送工具类封装
        • 11. 分布式id
        • 12. 时间工具类
        • 13. 对象

1. maven依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2. MainConfig

package com.gblfy.springboot.config;import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;@Configuration
@ComponentScan({"com.gblfy.springboot.*"})
public class MainConfig { }

3. application.properties

#应用端口信息
server.port=80
#RabbitMQ 连接信息
#IP地址
spring.rabbitmq.addresses=127.0.0.1
#RabbitMQ 端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=admin
#虚拟主机
spring.rabbitmq.virtual-host=/admin
#连接超时时间
spring.rabbitmq.connection-timeout=15000spring.profiles.active=dev

application-dev.properties

#服务端 RabbitMQ 配置
#消息发送至交换机消息确认模式 是否确认回调
spring.rabbitmq.publisher-confirms=true
#消息发送至交换机消息确认模式 是否确认消息返回回调
spring.rabbitmq.publisher-returns=true
#消息手工签收
spring.rabbitmq.template.mandatory=true#消费端 RabbitMQ 配置
#手动签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#指定最小的消费者数量
spring.rabbitmq.listener.simple.concurrency=5
#指定最大的消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#接收字符串类型MQ消息spring.rabbitmq.listener.str.queue.name=queue-1
spring.rabbitmq.listener.str.queue.durable=true
spring.rabbitmq.listener.str.exchange.name=exchange-1
spring.rabbitmq.listener.str.exchange.durable=true
spring.rabbitmq.listener.str.exchange.type=topic
spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.str.key=cus-str.##接收object类型MQ消息
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=cus-obj.#

YML

#----------------------------服务端(公有)配置----------------------------
spring:rabbitmq:addresses: 192.168.0.XXX #RabbitMQ服务端地址username: admin #用户名password: admin #密码port: 5672 #端口virtual-host: /admin #虚拟主机connection-timeout: 15000 #超时时间
#----------------------------生产端端配置----------------------------publisher-confirm-type: correlated #确认消息已发送至交换机,选择交换类型为交互publisher-returns: true #在消息没有被路由到指定的queue时将消息返回,而不是丢弃template:mandatory: true #是否手动签收listener:simple:acknowledge-mode: manual #手动签收concurrency: 5 #默认线程数max-concurrency: 10 #最大线程数
#----------------------------消费端配置----------------------------
#----------------------------对象类型监听----------------------------order:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-2 #交换机名称type: topic #消息类型key: cmiip-obj.# #消息路由key的路由规则queue:durable: true #是否持久化name: queue-2 #队列名称
#----------------------------字符串类型监听----------------------------str:exchange:durable: true #是否持久化ignoreDeclarationExceptions: truename: exchange-1 #交换机名称type: topic #消息类型key: cmiip-str.# #消息路由key的路由规则queue:durable: true #是否持久化name: queue-1 #队列名称

4. 发送字符串 生产者

package com.gblfy.springboot.controller;import com.gblfy.springboot.utils.MQSendMsgUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MQSendStrMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;/*** 发送字符串类型消息** @param exchangeName   交换机名称* @param queueRouterKey 路由key* @param msg            报文* @return*/@GetMapping("/mQSendStrMsg")public String mQSendStrMsg(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey,@RequestParam(value = "msg") String msg) {mqSendMsgUtils.snedStrMQMsg(exchangeName, queueRouterKey, msg);return "发送字符串消息成功!";}//测试连接:http://localhost/mQSendStrMsg?exchangeName=exchange-1&queueRouterKey=cus&msg=测试2
}

5. 发送对象 生产者

package com.gblfy.springboot.controller;import com.gblfy.springboot.entity.Order;
import com.gblfy.springboot.utils.MQSendMsgUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MQSendObjMsgController {@Autowiredprivate MQSendMsgUtils mqSendMsgUtils;@GetMapping("/mQSendObjMsg")public String mQSendStrMsg2(@RequestParam(value = "exchangeName") String exchangeName,@RequestParam(value = "queueRouterKey") String queueRouterKey) {//模拟发送order对象Order order = new Order().builder().reqXml("我是请求报文").serviceName("接口名称").resXml("我是响应报文").build();//模拟接口描述String serviceName = "TJHL";String queueDesc = "纽约理赔发送退单接口";//模拟接口类型String queueType = "WEBSERVICE";//调用MQ工具类发送消息mqSendMsgUtils.snedObjMqMsg(exchangeName, order, queueRouterKey, serviceName, queueDesc, queueType);return "发送对象消息成功!";}//测试连接:http://localhost/mQSendObjMsg?exchangeName=exchange-2&queueRouterKey=cus
}

6. 接收字符串客户端

package com.gblfy.springboot.conusmer;import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;@Component
public class CusStrQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收字符串类型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.str.queue.name}",durable = "${spring.rabbitmq.listener.str.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.str.exchange.name}",durable = "${spring.rabbitmq.listener.str.exchange.durable}",type = "${spring.rabbitmq.listener.str.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.str.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.str.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 创建一个消费端轨迹表来存储消息的轨迹数据String jsonMsg = new String(message.getBody());log.info("响应报文 mResXml: {}", jsonMsg);// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 反馈消息的消费状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//TODO 保存数据到数据库}
}

7. 接收对象客户端

package com.gblfy.springboot.conusmer;import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.gblfy.springboot.entity.Order;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class CusObjQueueMsgHandler implements ChannelAwareMessageListener {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(CusObjQueueMsgHandler.class);/*** 接收对象类型MQ消息** @param message* @param channel* @throws Exception*/@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",durable = "${spring.rabbitmq.listener.order.queue.durable}"),exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",durable = "${spring.rabbitmq.listener.order.exchange.durable}",type = "${spring.rabbitmq.listener.order.exchange.type}",ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),key = "${spring.rabbitmq.listener.order.key}"))@RabbitHandler@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//TODO 接收消息成功 创建一个消费端轨迹表来存储消息的轨迹数据String jsonMsg = new String(message.getBody());// 同一时刻服务器只会发一条消息给消费者channel.basicQos(1);// 反馈消息的消费状态channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//通过 判断路由routingKey是否等于trace相同即可//fastjson解析MQ接收的json字符串 转换成RequestInfo对象JSONObject jsonObject = JSON.parseObject(jsonMsg);Order orderInfo = JSON.toJavaObject(jsonObject, Order.class);log.info("接口名称 serviceName: {}", orderInfo.getServiceName());log.info("请求报文 mReqXml: {}", orderInfo.getReqXml());log.info("响应报文 mResXml: {}", orderInfo.getResXml());MessageProperties messageProperties = message.getMessageProperties();log.info("交换机名称 : {}", messageProperties.getReceivedExchange());log.info("路由key名称 : {}", messageProperties.getReceivedRoutingKey());log.info("内容类型 : {}", messageProperties.getContentType());log.info("内容编码 : {}", messageProperties.getContentEncoding());log.info("标签 : {}", messageProperties.getDeliveryTag());// 2. 接收接口信息Map<String, Object> headers = message.getMessageProperties().getHeaders();log.info("队列唯一标识ID: {}", headers.get("QUEUE_MSG_ID"));log.info("队列名称: {}", headers.get("QUEUE_NAME"));log.info("队列类型: {}", headers.get("QUEUE_TYPE"));log.info("队列描述: {}", headers.get("QUEUE_DESC"));//TODO 保存数据到数据库}
}

8.confirem 确认机制

package com.gblfy.springboot.confirms;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.stereotype.Component;@Component("confirmCallback")
public class ConfirmCallBackListener implements RabbitTemplate.ConfirmCallback {//日志输出private final static Logger log = LoggerFactory.getLogger(ConfirmCallBackListener.class);/*** 生产者消息发送成功与失败确认机制* <p>* 1. ack* true : 标志生产者将消息发出成功* false: 标志生产者将消息发出失败* 2. ack :true 意味着消息发送成功 有2种场景*        第一种:生产者将消息成功发送到指定队列中,等待消费者消费消息*        第两种:生产者将消息发送成功,但是,由于无法路由到指定的消息*               队列,这种场景的消息,会被return机制监听到,后续进行*               补偿机制,做消息补发处理* </p>** @param correlationData 队列消息的唯一标识ID,消息做补偿机制会用到* @param ack             ack 消息是否发送成功的标识* @param cause           消息发送失败的原因*/@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息队列标识ID: {}", correlationData.getId());log.info("发送消息状态: {}", ack);//TODO 消息发送交换机成功 保存轨迹记录if (!ack) {//TODO 消息发送交换机失败 保存轨迹记录log.info("异常处理....");}}
}
/*** !ack 场景结果示例:* <p>* correlationData: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568]* ack: false* 异常处理....* 消息: CorrelationData [id=a37285dc-5dd6-4e22-8cc4-5c0fbf67b568],* nack,失败原因是:channel error;* protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'FIS-TRACE-COMMON-EXCHANGE' in vhost '/admin',* class-id=60, method-id=40)*/

9. return确认机制

package com.gblfy.springboot.returns;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component("returnCallback")
public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback {//打印日志 实时定位private final static Logger log = LoggerFactory.getLogger(ReturnCallBackListener.class);/*** 消息无法路由 触发消息 return机制* <p></p>* 1. 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除* 2. 会监听到生产者发送消息的关键信息* 3. 根据关键信息,后续进行补偿机制,做消息补发处理* </p>** @param message    消息实体* @param replyCode  应答码312* @param replyText  NO_ROUTE* @param exchange   交换机* @param routingKey 路由routingKey*/@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("DeliveryTag: {}", message.getMessageProperties().getDeliveryTag());log.info("ContentType: {}", message.getMessageProperties().getContentType());log.info("ContentEncoding: {}", message.getMessageProperties().getContentEncoding());log.info("消息发送的指定交换机: {}", exchange);log.info("队列路由的routingKey: {}", routingKey);log.info("队列的响应码replyCode: {}", replyCode);log.info("队列的响应信息: {}", replyText);//TODO 消息发送交换机成功 路由失败 保存轨迹记录}
}
/*** 场景结果示例:* return exchange: FIS-TRACE-COMMON-EXCHANGE, routingKey: fis-str.user, replyCode: 312, replyText: NO_ROUTE* correlationData: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a]* ack: true* 消息: CorrelationData [id=30d924db-77b4-41df-bbe6-9a8f0eb3fe7a],已经被ack成功*/

10. MQ消息发送工具类封装

package com.gblfy.springboot.utils;import com.fasterxml.jackson.databind.ObjectMapper;
import com.gblfy.springboot.confirms.ConfirmCallBackListener;
import com.gblfy.springboot.consts.MQPrefixConst;
import com.gblfy.springboot.entity.Order;
import com.gblfy.springboot.returns.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** MQ发送 不同类型消息 公用工具类* <p>* MQ发送消息模式采用 订阅模式(topic)中的通配符模式* order.* 区配一个词* order.# 区配一个或者多个词* <p>** @author gblfy* @date 2020-04-16*/
@Component
public class MQSendMsgUtils {private final static Logger log = LoggerFactory.getLogger(MQSendMsgUtils.class);//引入json工具类private static final ObjectMapper MAPPER = new ObjectMapper();@Autowiredprivate RabbitTemplate rabbitTemplate;//注入发送消息模板@Autowiredprivate ConfirmCallBackListener confirmCallback;@Autowiredprivate ReturnCallBackListener returnCallback;/*** 发送MQ STRING类型消息 第1种** @param exchangeName 指定交换机名称* @param type         路由routingKey* @param msg          MQ STRING类型消息*/public void snedStrMQMsg(String exchangeName, String type, String msg) {try {/*** CorrelationData 说明:* 1. correlationId 作为生产端和消息绑定消息队列全局唯一标识* 2. 当生产端发送的消息无法路由到指定的消息队列时,此种场*    景的消息会被生产端会return确认机制监听到,对消息做补*    偿机制处理*///通过雪花算法生成全局唯一ID,用于消息发送失败,后期做消息补偿处理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// Confirm 消息确认策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息确认策略rabbitTemplate.setReturnCallback(returnCallback);//发送消息到MQ的交换机,通知其他系统rabbitTemplate.convertAndSend(exchangeName, MQPrefixConst.CUS_MQ_STR_PRE + type, msg.getBytes(), correlationId);} catch (Exception e) {e.printStackTrace();}}public void snedObjMqMsg(String exchangeName, Order order, String queueRouteKey, String queueName, String queueDesc, String queueType) {try {/*** CorrelationData 说明:* 1. correlationId 作为生产端和消息绑定消息队列全局唯一标识* 2. 当生产端发送的消息无法路由到指定的消息队列时,此种场*    景的消息会被生产端会return确认机制监听到,对消息做补*    偿机制处理*/// Confirm 消息确认策略rabbitTemplate.setConfirmCallback(confirmCallback);// Return 消息确认策略rabbitTemplate.setReturnCallback(returnCallback);//1.对象处理String jsonStrObj = MAPPER.writeValueAsString(order);// 2. 通过雪花算法生成全局唯一ID,用于消息发送失败,后期做消息补偿处理CorrelationData correlationId = new CorrelationData(String.valueOf(SnowflakeIdWorker.generateId()));// MQ 添加额外参数设置  用于定位该消息属于什么接口Message message = addExtraParameters(jsonStrObj, correlationId, queueName, queueDesc, queueType);// 3.发送数据消息到指定的MQ交换机,通知其他系统rabbitTemplate.convertAndSend(exchangeName,MQPrefixConst.CUS_MQ_OBJ_PRE + queueRouteKey, message, correlationId);} catch (Exception e) {e.printStackTrace();}}/*** MQ 添加额外参数设置** @param jsonStrObj json处理前的数据对象* @param queueDesc  队列描述* @param queueType  队列类型* @return*/public Message addExtraParameters(String jsonStrObj, CorrelationData correlationId, String queueName, String queueDesc, String queueType) {MessageProperties messageProperties = new MessageProperties();//这里注意一定要修改contentType为 application/jsonmessageProperties.setContentType("application/json");messageProperties.setContentEncoding("UTF-8");messageProperties.getHeaders().put("QUEUE_NAME", queueName);messageProperties.getHeaders().put("QUEUE_DESC", queueDesc);messageProperties.getHeaders().put("QUEUE_TYPE", queueType);messageProperties.getHeaders().put("QUEUE_MSG_ID", correlationId.getId());messageProperties.getHeaders().put("SEND_DATE", MQTimeUtils.CURRENT_DATE_TIME);Message message = new Message(jsonStrObj.getBytes(), messageProperties);return message;}
}

11. 分布式id

package com.gblfy.springboot.utils;import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;import java.net.Inet4Address;
import java.net.UnknownHostException;/*** Twitter_Snowflake<br>* SnowFlake的结构如下(每部分用-分开):<br>* 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 <br>* 1位标识,由于long基本类型在Java中是带符号的,最高位是符号位,正数是0,负数是1,所以id一般是正数,最高位是0<br>* 41位时间截(毫秒级),注意,41位时间截不是存储当前时间的时间截,而是存储时间截的差值(当前时间截 - 开始时间截)* 得到的值),这里的的开始时间截,一般是我们的id生成器开始使用的时间,由我们程序来指定的(如下下面程序IdWorker类的startTime属性)。41位的时间截,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69<br>* 10位的数据机器位,可以部署在1024个节点,包括5位datacenterId和5位workerId<br>* 12位序列,毫秒内的计数,12位的计数顺序号支持每个节点每毫秒(同一机器,同一时间截)产生4096个ID序号<br>* 加起来刚好64位,为一个Long型。<br>* SnowFlake的优点是,整体上按照时间自增排序,并且整个分布式系统内不会产生ID碰撞(由数据中心ID和机器ID作区分),并且效率较高,经测试,SnowFlake每秒能够产生26万ID左右。*/
public class SnowflakeIdWorker {// ==============================Fields===========================================/*** 开始时间截 (2015-01-01)*/private final long twepoch = 1489111610226L;/*** 机器id所占的位数*/private final long workerIdBits = 5L;/*** 数据标识id所占的位数*/private final long dataCenterIdBits = 5L;/*** 支持的最大机器id,结果是31 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)*/private final long maxWorkerId = -1L ^ (-1L << workerIdBits);/*** 支持的最大数据标识id,结果是31*/private final long maxDataCenterId = -1L ^ (-1L << dataCenterIdBits);/*** 序列在id中占的位数*/private final long sequenceBits = 12L;/*** 机器ID向左移12位*/private final long workerIdShift = sequenceBits;/*** 数据标识id向左移17位(12+5)*/private final long dataCenterIdShift = sequenceBits + workerIdBits;/*** 时间截向左移22位(5+5+12)*/private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;/*** 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)*/private final long sequenceMask = -1L ^ (-1L << sequenceBits);/*** 工作机器ID(0~31)*/private long workerId;/*** 数据中心ID(0~31)*/private long dataCenterId;/*** 毫秒内序列(0~4095)*/private long sequence = 0L;/*** 上次生成ID的时间截*/private long lastTimestamp = -1L;private static SnowflakeIdWorker idWorker;static {idWorker = new SnowflakeIdWorker(getWorkId(), getDataCenterId());}//==============================Constructors=====================================/*** 构造函数** @param workerId     工作ID (0~31)* @param dataCenterId 数据中心ID (0~31)*/public SnowflakeIdWorker(long workerId, long dataCenterId) {if (workerId > maxWorkerId || workerId < 0) {throw new IllegalArgumentException(String.format("workerId can't be greater than %d or less than 0", maxWorkerId));}if (dataCenterId > maxDataCenterId || dataCenterId < 0) {throw new IllegalArgumentException(String.format("dataCenterId can't be greater than %d or less than 0", maxDataCenterId));}this.workerId = workerId;this.dataCenterId = dataCenterId;}// ==============================Methods==========================================/*** 获得下一个ID (该方法是线程安全的)** @return SnowflakeId*/public synchronized long nextId() {long timestamp = timeGen();//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常if (timestamp < lastTimestamp) {throw new RuntimeException(String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));}//如果是同一时间生成的,则进行毫秒内序列if (lastTimestamp == timestamp) {sequence = (sequence + 1) & sequenceMask;//毫秒内序列溢出if (sequence == 0) {//阻塞到下一个毫秒,获得新的时间戳timestamp = tilNextMillis(lastTimestamp);}}//时间戳改变,毫秒内序列重置else {sequence = 0L;}//上次生成ID的时间截lastTimestamp = timestamp;//移位并通过或运算拼到一起组成64位的IDreturn ((timestamp - twepoch) << timestampLeftShift)| (dataCenterId << dataCenterIdShift)| (workerId << workerIdShift)| sequence;}/*** 阻塞到下一个毫秒,直到获得新的时间戳** @param lastTimestamp 上次生成ID的时间截* @return 当前时间戳*/protected long tilNextMillis(long lastTimestamp) {long timestamp = timeGen();while (timestamp <= lastTimestamp) {timestamp = timeGen();}return timestamp;}/*** 返回以毫秒为单位的当前时间** @return 当前时间(毫秒)*/protected long timeGen() {return System.currentTimeMillis();}private static Long getWorkId() {try {String hostAddress = Inet4Address.getLocalHost().getHostAddress();int[] ints = StringUtils.toCodePoints(hostAddress);int sums = 0;for (int b : ints) {sums += b;}return (long) (sums % 32);} catch (UnknownHostException e) {// 如果获取失败,则使用随机数备用return RandomUtils.nextLong(0, 31);}}private static Long getDataCenterId() {int[] ints = StringUtils.toCodePoints(SystemUtils.getHostName());int sums = 0;for (int i : ints) {sums += i;}return (long) (sums % 32);}/*** 静态工具类** @return*/public static Long generateId() {long id = idWorker.nextId();return id;}//==============================Test=============================================/*** 测试*/public static void main(String[] args) {System.out.println(System.currentTimeMillis());long startTime = System.nanoTime();for (int i = 0; i < 50000; i++) {long id = SnowflakeIdWorker.generateId();System.out.println(id);}System.out.println((System.nanoTime() - startTime) / 1000000 + "ms");}
}

12. 时间工具类

package com.gblfy.springboot.utils;import org.springframework.stereotype.Component;import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;@Component
public class MQTimeUtils {//格式化时间  日期格式public static final DateFormat DATE_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd");// 日期格式public static final DateFormat TIME_FORMAT = new SimpleDateFormat("HH:mm:ss");// 日期格式//当前日期+时间 用于定位 消息队列服务端和和生产发送消息时间 确认什么类型的什么接口public static final String CURRENT_DATE_TIME = DATE_TIME_FORMAT.format(new Date());/*** 获取当前日期 类型Date*/public static Date getCurrentDate() {Date currentDate = null;try {currentDate = DATE_FORMAT.parse(DATE_FORMAT.format(new Date()));} catch (ParseException e) {e.printStackTrace();}return currentDate;}/*** 获取当前日期 类型String*/public static String getCurrentDateToStr() {String currentDateToStr = null;try {currentDateToStr = DATE_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentDateToStr;}/*** 获取当前时间 类型String*/public static String getCurrenTimeToStr() {String currentTimeToStr = null;try {currentTimeToStr = TIME_FORMAT.format(new Date());} catch (Exception e) {e.printStackTrace();}return currentTimeToStr;}public static void main(String[] args) {System.out.println(MQTimeUtils.getCurrentDate());System.out.println(MQTimeUtils.getCurrentDateToStr());}
}

13. 对象

package com.gblfy.springboot.entity;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.io.Serializable;@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Order implements Serializable {private String serviceName;private String reqXml;private String resXml;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/520332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

lnmp 修改mysql密码_Lnmp下修改mysql根密码

今天帮兄弟安装了一个新的lnmp环境&#xff0c;安装后才发现mysql的根密码不是兄弟想要的密码&#xff0c;如是在建站前把根密码快快改了。 首先登陆phpmyadmin&#xff0c;选择mysql数据库,之后选sql选项卡&#xff0c;输入如下代码到命令框&#xff0c;之后点右下角的执行按钮…

实战:基于 Spring 的应用配置如何迁移至阿里云应用配置管理 ACM

最近遇到一些开发者朋友&#xff0c;准备将原有的Java Spring的应用配置迁移到 阿里云应用配置管理 ACM 中。迁移过程中&#xff0c;遇到不少有趣的问题。本文将通过一个简单的样例来还原迁移过程中遇到的问题和相关解决思路&#xff0c;以期达到和读者交流的目的。 什么样的配…

华为豪投20亿!3年培养100万AI人才,网友不服!

近日&#xff0c;华为20亿奖励员工的新闻频频刷屏。其中20亿奖金不是面向所有的华为员工&#xff0c;20亿奖金包涉及到的是研发体系、造AI芯片和建设生态的员工。从5G开始部署以来&#xff0c;华为获得了来自全球各地运营商的订单&#xff0c;签订了40多个5G商用合同。另外华为…

mysql 5.7 hint_新特性解读 | MySQL 8.0 新增 HINT 模式

在开始演示之前&#xff0c;我们先介绍下两个概念。概念一&#xff0c;数据的可选择性基数&#xff0c;也就是常说的cardinality值。查询优化器在生成各种执行计划之前&#xff0c;得先从统计信息中取得相关数据&#xff0c;这样才能估算每步操作所涉及到的记录数&#xff0c;而…

RAM SSO功能重磅发布 —— 满足客户使用企业本地账号登录阿里云

阿里云RAM (Resource Access Management)为客户提供身份与访问控制管理服务。使用RAM&#xff0c;可以轻松创建并管理您的用户&#xff08;比如雇员、企业开发的应用程序&#xff09;&#xff0c;并控制用户对云资源的访问权限。 对云资源的信息安全保护与风险控制能力是企业成…

SpringBoot 整合Shiro Ehcache

文章目录依赖 <properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version><mybatis.sprin…

SQLServer AlwaysOn在阿里云的前世今生

缘起 早在2015年的时候&#xff0c;随着阿里云业务突飞猛进的发展&#xff0c;SQLServer业务也积累了大批忠实客户&#xff0c;其中一些体量较大的客户在类似大促的业务高峰时RDS的单机规格&#xff08;规格是按照 内存CPUIOPS 一定比例分配&#xff0c;根据底层资源不同都会有…

构建企业数字化转型协同力有多难?青云发布workly.ai誓要解决这些棘手的问题!...

戳蓝字“CSDN云计算”关注我们哦&#xff01;相信大部分人都经历过办公中的手忙脚乱与无所适从&#xff0c;每天面对无数的任务与工作本就是一项挑战&#xff0c;而在办公中面对不同终端协同工具&#xff0c;所带来的那些令人头疼的密码、来不及回复的信息与邮件、繁琐的办公流…

docker无法连接mysql镜像_关于Docker官方CentOS镜像无法启动mysqld的总结

很多童鞋反映&#xff0c;在Docker官方CentOS镜像中安装了Mysql server后&#xff0c;无法正常启动。无法正常启动表现为两种情况&#xff1a;1> 初始完数据库后&#xff0c;mysqld启动报错2> systemctl start mysqld或者service mysqld start报错首先重现一下现场。第一…

阿里云高级技术专家带你全面了解云主机性能评测

钱超&#xff0c;花名西邪&#xff0c;阿里云高级技术专家&#xff0c;超12年老阿里&#xff0c;是云主机性能领域的知名专家。 在目前的云计算测评领域&#xff0c;很多性能测评存在营销的包装&#xff0c;容易引起误导&#xff1a;比如用瞬时性能引导读者得出结论&#xff0…

RabbitMQ控制台详解

文章目录属性说明overview→Totals所有队列的阻塞情况Ready待消费的消息总数Unacked待应答的消息总数Total总数 ReadyUnacked 属性说明Publishproducter pub消息的速率Publisher confirmbroker确认pub消息的速率Deliver(manual ack)ustomer手动确认的速率Deliver( auto ack)cu…

阿里云HBase携X-Pack再进化,重新赋能轻量级大数据平台

一、八年双十一&#xff0c;造就国内最大最专业HBase技术团队 阿里巴巴集团早在2010开始研究并把HBase投入生产环境使用&#xff0c;从最初的淘宝历史交易记录&#xff0c;到蚂蚁安全风控数据存储。持续8年的投入&#xff0c;历经8年双十一锻炼。4个PMC&#xff0c;6个committ…

2018阿里云双12年终大促主会场全攻略

2018阿里云双12年终大促活动已经于12月7日正式开启&#xff0c;从已开放的活动页面来看&#xff0c;活动分为两个阶段&#xff1a; 12月7日-12月23日的拉新返现阶段和12月24日-12月28日的TOP100英雄榜PK阶段。 活动核心亮点&#xff1a; 老会员拉新可享25%返现最高2.5万奖金&a…

RabbitMQ集群原理介绍

文章目录一、RabbitMQ默认集群原理1. RabbitMQ集群元数据的同步2. 为何RabbitMQ集群仅采用元数据同步的方式3. RabbitMQ集群发送/订阅消息的基本原理4. 客户端直接连接队列所在节点5. 客户端连接的是非队列数据所在节点7. 集群节点类型磁盘节点内存节点8. 总结二、RabbitMQ镜像…

mysql统计每周每个学校新增学生数量_深入学习之mysql(四)聚合函数

聚合函数&#xff1a;COUNT统计记录的条数、SUM求和函数、AVG求平均值、MAX求最大值、MIN求最小值一、COUNT练习&#xff1a;1、统计学校一共有多少个学生&#xff1a;mysql> SELECT COUNT(*) AS Total FROM t_student;-------| Total |-------| 13 |-------1 row in set (0…

阿里云物联网平台体验(树莓派+Python篇)

阿里云物联网平台体验(树莓派Python篇) 虽然对阿里云物联网平台比较熟悉了&#xff0c;从一开始就有幸参与了飞凤平台&#xff08;Link Develop 一站式开发平台的前身&#xff09;的一些偏硬件接入的工作。但是同时也见证了阿里云物联网团队从几十人到数百人的迅速扩张&#x…

阿里云物联网边缘计算加载MQTT驱动

写在前面 本文在LinkEdge快速入门样例驱动的基础上&#xff0c;加载了MQTT订阅的客户端&#xff0c;使得边缘端容器可以通过MQTT获得外部数据。 1. 系统需求 物联网边缘计算平台&#xff0c;又名Link IoT Edge[1]。在物联网边缘计算帮助文档中的 “快速入门”描述了…

RabbitMQ镜像策略set_policy

添加vhosts rabbitmqctl add_vhost <vhost>rabbitmqctl delete_vhost <vhost>rabbitmqctl list_vhosts [<vhostinfoitem> ...]参数设置格式&#xff1a; #设置 rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <a…

IBM推出可加快响应跨云网络威胁的开放技术 业界首次实现跨安全工具和跨云的搜索威胁功能,无需移动数据

&#xff08;2019年11月20日&#xff0c;美国纽约州阿蒙克&#xff09;IBM发布了Cloud Pak for Security&#xff0c;创新性地实现了业界首次无需从原始数据源移动数据而能连接任意安全工具、云和本地部署的系统。该平台现已可用&#xff0c;包括了用于搜索威胁的开源技术&…

mysql做文本挖掘_4graphlab简单文本挖掘

爬虫Python基础、数据分析扩展包Numpy、pandas、matplotlib&#xff0c;Python读取MySQL数据&#xff0c;Python爬虫及Scrapy框架&#xff0c;无监督机器学习算法聚类分析等&#xff0c;以及案例&#xff1a;互联网金融行业客户价值分析等。机器学习机器学习是一门多领域交叉学…