发布/订阅消息传递是许多软件体系结构的重要组成部分。 某些软件系统要求消息传递解决方案提供高性能,可伸缩性,队列持久性和持久性,故障转移支持,事务以及许多其他令人敬畏的功能,在Java世界中,大多数情况下总是导致使用JMS实现之一提供者。 在我以前的项目中,我一直积极使用Apache ActiveMQ (现在转向Apache ActiveMQ Apollo )。 尽管这是一个很好的实现,但有时我只需要简单的排队支持,而Apache ActiveMQ看上去就显得过于复杂了。
备择方案? 请欢迎Redis pub / sub! 如果您已经在使用Redis作为键/值存储,那么很少的其他配置行将立即将发布/订阅消息传递到您的应用程序。
Spring Data Redis项目很好地抽象了Redis pub / sub API,并为使用Spring功能与JMS集成的每个人提供了如此熟悉的模型。
与往常一样,让我们从POM配置文件开始。 这是相当小的,简单的,包括必要的春天的依赖, 春天Redis的数据和Jedis ,伟大的Java客户端Redis的 。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemalocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelversion>4.0.0</modelversion><groupid>com.example.spring</groupid><artifactid>redis</artifactid><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><properties><project.build.sourceencoding>UTF-8</project.build.sourceencoding><spring.version>3.1.1.RELEASE</spring.version></properties><dependencies><dependency><groupid>org.springframework.data</groupid><artifactid>spring-data-redis</artifactid><version>1.0.1.RELEASE</version></dependency><dependency><groupid>cglib</groupid><artifactid>cglib-nodep</artifactid><version>2.2</version></dependency><dependency><groupid>log4j</groupid><artifactid>log4j</artifactid><version>1.2.16</version></dependency><dependency><groupid>redis.clients</groupid><artifactid>jedis</artifactid><version>2.0.0</version><type>jar</type></dependency><dependency><groupid>org.springframework</groupid><artifactid>spring-core</artifactid><version>${spring.version}</version></dependency><dependency><groupid>org.springframework</groupid><artifactid>spring-context</artifactid><version>${spring.version}</version></dependency></dependencies><build><plugins><plugin><groupid>org.apache.maven.plugins</groupid><artifactid>maven-compiler-plugin</artifactid><version>2.3.2</version><configuration><source>1.6<target>1.6</target></configuration></plugin></plugins></build>
</project>
继续配置Spring上下文,让我们了解为了使发布者发布一些消息并让消费者使用它们而需要的内容。 了解有关JMS的相应Spring抽象将对此有很大帮助。
- 我们需要连接工厂-> JedisConnectionFactory
- 我们需要一个模板供发布者发布消息-> RedisTemplate
- 我们需要一个消息监听器供使用者使用消息-> RedisMessageListenerContainer
使用Spring Java配置,让我们描述一下上下文:
package com.example.redis.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableScheduling;import com.example.redis.IRedisPublisher;
import com.example.redis.impl.RedisMessageListener;
import com.example.redis.impl.RedisPublisherImpl;@Configuration
@EnableScheduling
public class AppConfig {@BeanJedisConnectionFactory jedisConnectionFactory() {return new JedisConnectionFactory();}@BeanRedisTemplate< String, Object > redisTemplate() {final RedisTemplate< String, Object > template = new RedisTemplate< String, Object >();template.setConnectionFactory( jedisConnectionFactory() );template.setKeySerializer( new StringRedisSerializer() );template.setHashValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );template.setValueSerializer( new GenericToStringSerializer< Object >( Object.class ) );return template;}@BeanMessageListenerAdapter messageListener() {return new MessageListenerAdapter( new RedisMessageListener() );}@BeanRedisMessageListenerContainer redisContainer() {final RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory( jedisConnectionFactory() );container.addMessageListener( messageListener(), topic() );return container;}@BeanIRedisPublisher redisPublisher() {return new RedisPublisherImpl( redisTemplate(), topic() );}@BeanChannelTopic topic() {return new ChannelTopic( 'pubsub:queue' );}
}
非常简单明了。 @EnableScheduling批注不是必需的,并且仅对于我们的发布者实现是必需的:发布者将每100毫秒发布一次字符串消息。
package com.example.redis.impl;import java.util.concurrent.atomic.AtomicLong;import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.scheduling.annotation.Scheduled;import com.example.redis.IRedisPublisher;public class RedisPublisherImpl implements IRedisPublisher {private final RedisTemplate< String, Object > template;private final ChannelTopic topic; private final AtomicLong counter = new AtomicLong( 0 );public RedisPublisherImpl( final RedisTemplate< String, Object > template, final ChannelTopic topic ) {this.template = template;this.topic = topic;}@Scheduled( fixedDelay = 100 )public void publish() {template.convertAndSend( topic.getTopic(), 'Message ' + counter.incrementAndGet() + ', ' + Thread.currentThread().getName() );}
}
最后是我们的消息侦听器实现(仅在控制台上打印消息)。
package com.example.redis.impl;import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;public class RedisMessageListener implements MessageListener {@Overridepublic void onMessage( final Message message, final byte[] pattern ) {System.out.println( 'Message received: ' + message.toString() );}
}
太棒了,只有两个小类,一个配置用于将事物连接在一起,并且我们的应用程序中具有完整的发布/订阅消息支持! 让我们以独立方式运行应用程序…
package com.example.redis;import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;import com.example.redis.config.AppConfig;public class RedisPubSubStarter {public static void main(String[] args) {new AnnotationConfigApplicationContext( AppConfig.class );}
}
…并在控制台中查看以下输出:
...
Message received: Message 1, pool-1-thread-1
Message received: Message 2, pool-1-thread-1
Message received: Message 3, pool-1-thread-1
Message received: Message 4, pool-1-thread-1
Message received: Message 5, pool-1-thread-1
Message received: Message 6, pool-1-thread-1
Message received: Message 7, pool-1-thread-1
Message received: Message 8, pool-1-thread-1
Message received: Message 9, pool-1-thread-1
Message received: Message 10, pool-1-thread-1
Message received: Message 11, pool-1-thread-1
Message received: Message 12, pool-1-thread-1
Message received: Message 13, pool-1-thread-1
Message received: Message 14, pool-1-thread-1
Message received: Message 15, pool-1-thread-1
Message received: Message 16, pool-1-thread-1
...
大! Redis pub / sub可以做很多事情, Redis官方网站上提供了出色的文档 。
参考:我们的JCG合作伙伴 Andrey Redko在Andriy Redko {devmind}博客上使用Spring从Redis发布/ 订阅 。
翻译自: https://www.javacodegeeks.com/2012/10/redis-pubsub-using-spring.html