一 使用场景
在一些场景,项目已发布了一段时间了,只是需要完善或优化一些功能要用到队列,但不想改动太大(或者不想在安装第三方MQ组件框架)的情况下可以用redis实现队列 。
二 redis实现队列
1 redis有序队列
使用redis的list的数据类型轻松实现有序队列,该队列每次存储时放在左边第1个,从右边最后一个取出即先进去先出来,该队列大多数场景都适用,如果不知道使用什么队列好可以先尝试试用该队列。
/*** REDIS有序队列*/public void pushOrder(String json) {stringRedisTemplate.opsForList().leftPush(KEY, json);}/*** REDIS有序队列*/public String popOrder() {return stringRedisTemplate.opsForList().rightPop(KEY);}
假设分布式服务下的另外的springboot项目消费队列示例代码:
@Service
public class RedisOrderProcess {private static final Logger logger = LogManager.getLogger(RedisOrderProcess.class);@AutowiredRedisServiceImpl redisService;@PostConstructpublic void init() {int cpuCount = Runtime.getRuntime().availableProcessors();System.out.println("start RedisOrderProcess > cpus="+cpuCount);processOrderImport(cpuCount);}private void processOrderImport(int cpus) {ExecutorService executorService = new ThreadPoolExecutor(cpus, 1000,60L, TimeUnit.SECONDS,new LinkedBlockingQueue <Runnable>());executorService.execute(() -> {while (true) {String json=null;try{json=redisService.popOrder();if(json!=null){//订单业务处理代码}else{Thread.sleep(500);}}catch (Exception e){logger.error("err data> \n"+json,e);e.printStackTrace();}}});}}
2 redis无序队列
使用redis的hash的数据类型轻松实现无序队列,而且队列保存中的内容是唯一的,往往用到的一些场景很特殊,而且在某些场景运用比正规的队列组件还爽。
在使用springboot框架下轻松实现生产端:
/*** 推送第三方设备状态** @param id* @param status*/public void pushStatus(String id, Integer status) {if(status==null){return;}String value = id+ ":" + status;stringRedisTemplate.opsForSet().add("PUSH_STATUS", value);}
在使用springboot框架下轻松实现消费端:
@Service
public class RedisStatusProcess {private static final Logger logger = LogManager.getLogger(RedisStatusProcess.class);@AutowiredStringRedisTemplate stringRedisTemplate;@PostConstructpublic void init() {int cpuCount = Runtime.getRuntime().availableProcessors();System.out.println("start RedisStatusProcess > cpus="+cpuCount);processStatusImport(cpuCount);}private void procesStatusImport(int cpus) {ExecutorService executorService = new ThreadPoolExecutor(cpus, 1000,60L, TimeUnit.SECONDS,new LinkedBlockingQueue <Runnable>());executorService.execute(() -> {while (true) {String json=null;try{json=stringRedisTemplate.opsForSet().pop("PUSH_STATUS")if(json!=null){//状态业务处理代码}else{Thread.sleep(200);}}catch (Exception e){logger.error("err data> \n"+json,e);e.printStackTrace();}}});}}
以上代码在我们的线上场景:我们当前2000个设备,每个设备每秒上传1个状态,如果使用有序队列就要 1分钟时间就要处理2000*60个状态,如果队列有延迟还意味着从队列获取的状态不是最新的,而且没有消费时可能会撑破内存。而使用无序队列就不存在问题,因为在队列中每个设备只允许一个状态,如果未消费则直接用最新的状态覆盖,拿出的基本都是最新的,也不用一定要消费到每个队列状态。
安装与使用redis教程 -> http://t.csdnimg.cn/1Ltzm