涵盖从application.yml
配置,通过MQ订阅消息,将消息存放到Redis,最后通过HTTP接口提供消息查询的整个流程,我们将分步进行。
Step 1: application.yml
配置
spring:profiles:active: devredis:host: localhostport: 6379database: 0jedis:pool:max-active: 10max-idle: 5min-idle: 0max-wait: -1msmq:subscribe:enable: truetopics:- topic1- topic2tag: "*"consumerGroup: "yourConsumerGroup"
这个配置文件定义了Redis的连接信息和MQ订阅的相关配置,包括是否启动时自动订阅MQ和订阅的主题列表。
Step 2: 消息存储服务(MessageStorage)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;@Service
public class MessageStorage {private final StringRedisTemplate redisTemplate;@Autowiredpublic MessageStorage(StringRedisTemplate redisTemplate) {this.redisTemplate = redisTemplate;}public void storeMessage(String topic, String messageId, String message) {redisTemplate.opsForHash().put("mq:topic:" + topic, messageId, message);}public String getMessageByTopicAndId(String topic, String messageId) {Object message = redisTemplate.opsForHash().get("mq:topic:" + topic, messageId);return message != null ? message.toString() : null;}public Map<Object, Object> getAllMessagesByTopic(String topic) {return redisTemplate.opsForHash().entries("mq:topic:" + topic);}
}
Step 3: ConsumerManager
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Service;@Service
public class ConsumerManager implements CommandLineRunner {@Value("${mq.subscribe.enable:false}")private boolean autoSubscribe;@Value("${mq.subscribe.topics}")private List<String> topics;@Value("${mq.subscribe.tag}")private String tag;@Value("${mq.subscribe.consumerGroup}")private String consumerGroup;@Autowiredprivate MessageStorage messageStorage;// 示例方法,需根据实际MQ客户端进行实现public void startConsumer(String topic, String tag, String consumerGroup) {// 假设的MQ订阅逻辑// 省略实际的MQ订阅代码// 假设收到消息后:String messageId = "uniqueMessageId"; // 假设的消息IDString message = "Example Message"; // 假设的消息内容messageStorage.storeMessage(topic, messageId, message);}@Overridepublic void run(String... args) {if (autoSubscribe) {topics.forEach(topic -> {try {startConsumer(topic, tag, consumerGroup);} catch (Exception e) {e.printStackTrace();}});}}
}
Step 4: ConsumerController(HTTP接口)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;import java.util.Map;@RestController
@RequestMapping("/api/messages")
public class ConsumerController {@Autowiredprivate MessageStorage messageStorage;@GetMapping("/get")public ResponseEntity<?> getMessage(@RequestParam String topic, @RequestParam(required = false) String messageId) {if (messageId != null) {String message = messageStorage.getMessageByTopicAndId(topic, messageId);return message != null ? ResponseEntity.ok(message) : ResponseEntity.notFound().build();} else {Map<Object, Object> messages = messageStorage.getAllMessagesByTopic(topic);return ResponseEntity.ok(messages);}}
}
说明
- MQ订阅逻辑:
ConsumerManager
中的startConsumer
方法需要根据你使用的MQ客户端库具体实现,这里仅提供了一个概念性的示例。 - 消息存储与检索:
MessageStorage
类使用Redis的Hash数据结构
来存储和检索消息。每个topic
对应一个Hash,其中每个messageId
是键,消息内容是值。
- HTTP接口:
ConsumerController
提供了一个/api/messages/get
接口,允许根据topic
和可选的messageId
查询消息。
通过上述步骤,我们完成了一个简单的系统,它能够在项目启动时自动订阅配置中指定的MQ主题,并将接收到的消息存储在Redis中。同时,我们也提供了一个HTTP接口来查询存储的消息。这个示例提供了一个基本框架,你可以根据实际的需求和MQ客户端库进行调整和扩展。