java memcache 队列_基于memcache的java分布式队列实现。

主要有两个类,一个队列类和一个job的抽象类。

保证队列类中的key的唯一性,就可以用spring配置多个实例。水平有限,欢迎吐槽。

上代码:

1、队列类

import net.spy.memcached.MemcachedClient;

import net.spy.memcached.internal.OperationFuture;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.springframework.beans.BeansException;

import org.springframework.beans.factory.DisposableBean;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.context.ApplicationContext;

import org.springframework.context.ApplicationContextAware;

import com.izx.services.common.Constant;

/**

*

* @ClassName: MemCacheQueue

* @Description: 基于memcache的消息队列的实现

* @author hai.zhu

* @date 2016-3-31 下午3:29:00

*

*/

public class MemCacheQueue implements InitializingBean, DisposableBean,ApplicationContextAware {

private static final Log log = LogFactory.getLog(MemCacheQueue.class);

/**

* 队列名

*/

private String key;

/**

* 队列锁失效分钟

*/

private Integer lockExpireMinite = 3;

private MemcachedClient memcachedClient;

private ApplicationContext applicationContext;

ListenerThread listenerThread = new ListenerThread();

public void setKey(String key) {

this.key = key;

}

public void setMemcachedClient(MemcachedClient memcachedClient) {

this.memcachedClient = memcachedClient;

}

@Override

public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {

this.applicationContext = applicationContext;

}

@Override

public void destroy() throws Exception {

try {

this.sign = false;

listenerThread.interrupt();

} catch (Exception e) {

log.error(e);

}

}

@Override

public void afterPropertiesSet() throws Exception {

//初始化队列,用add防止重启覆盖

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 0, "0");

memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 0, "0");

//设置任务线程

listenerThread.setDaemon(true);

listenerThread.start();

}

/**

*

* @Title: push

* @Description: 唯一对外方法,放入要执行的任务

* @param @param value

* @param @throws Exception    设定文件

* @return void    返回类型

* @throws

*/

public synchronized void push(MemCacheQueueJobAdaptor value) throws Exception {

//分布加锁

queuelock();

//放入队列

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 1);

Object keyorder = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorder, 0, value);

//分布解锁

queueUnLock();

}

/**

*

* @Title: pop

* @Description: 取出要执行的任务

* @param @return

* @param @throws Exception    设定文件

* @return MemCacheQueueJobAdaptor    返回类型

* @throws

*/

private synchronized MemCacheQueueJobAdaptor pop() throws Exception {

Object keyorderstart = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key);

Object keyorderend = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);

if(keyorderstart.equals(keyorderend)){

return null;

}

MemCacheQueueJobAdaptor adaptor = (MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 1);

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);

return adaptor;

}

/**

*

* @Title: queuelock

* @Description: 加锁

* @param @throws InterruptedException    设定文件

* @return void    返回类型

* @throws

*/

private void queuelock() throws Exception {

do {

OperationFuture sign = memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key, lockExpireMinite * 60, key);

if(sign.get()){

return;

} else {

log.debug("key: " + key + " locked by another business");

}

Thread.sleep(300);

} while (true);

}

/**

*

* @Title: queueUnLock

* @Description: 解锁

* @param     设定文件

* @return void    返回类型

* @throws

*/

private void queueUnLock() {

memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key);

}

private boolean sign = true;

private long THREAD_SLEEP = 10;

class ListenerThread extends Thread {

@Override

public void run(){

log.error("队列["+key+"]开始执行");

while(sign){

try {

Thread.sleep(THREAD_SLEEP);

dojob();

} catch (Exception e) {

log.error(e);

}

}

}

private void dojob(){

try{

queuelock();

MemCacheQueueJobAdaptor adaptor = pop();

//逐个执行

if(adaptor != null){

THREAD_SLEEP = 10;

try {

adaptor.setApplicationContext(applicationContext);

adaptor.onMessage();

} catch (Exception e) {

log.error(e);

}

}else{

THREAD_SLEEP = 5000;

}

}catch(Exception e){

log.error(e);

}finally{

queueUnLock();

}

}

}

}[/code]

2、job抽象类

import org.springframework.context.ApplicationContext;

import java.io.Serializable;

/**

*

* @ClassName: MemCacheQueueJobAdaptor

* @Description: 基于memcache队列的任务适配器

* @author hai.zhu

* @date 2015-12-11 上午11:48:26

* @param 

*/

public abstract class MemCacheQueueJobAdaptor implements Serializable{

private static final long serialVersionUID = -5071415952097756327L;

private ApplicationContext applicationContext;

public ApplicationContext getApplicationContext() {

return applicationContext;

}

public void setApplicationContext(ApplicationContext applicationContext) {

this.applicationContext = applicationContext;

}

/**

*

* @Title: onMessage

* @Description: 异步执行任务接口

* @author hai.zhu

* @param @param value 设定文件

* @return void 返回类型

* @throws

*/

public abstract void onMessage();

}[/code]

3、部分放在constant的常量

/**

* 基于memcache的队列存放前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_VARIABLE = "MEMCACHE_GLOBAL_QUEUE_VARIABLE_";

/**

* 基于memcache的队列锁的前缀

*/

public static String MEMCACHE_GLOBAL_QUEUE_LOCK = "MEMCACHE_GLOBAL_QUEUE_LOCK_";

/**

* 基于memcache的队列锁的开始元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_STARTKEY = "MEMCACHE_GLOBAL_QUEUE_STARTKEY_";

/**

* 基于memcache的队列锁的结束元素

*/

public static String MEMCACHE_GLOBAL_QUEUE_ENDKEY = "MEMCACHE_GLOBAL_QUEUE_ENDKEY_";[/code]

4、spring配置,保证队列名的唯一性就可以配置多个队列

转载于:https://my.oschina.net/zhuxuan/blog/650935

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/367709.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【题解】 [ZJOI2009]假期的宿舍 (二分图匹配)

懒得复制题面,戳我 Solution: 处理出床位、要留校的人(注意来访问的人一定住校),和人与人的关系(连边)再接着就是二分图。注意的就是连向的人必须是有床位的还要注意的就是只用判断住校的同学二分图板子都打…

android-线程池-最顺手的写法

引子 关于线程池,在这里写出几种最顺手的写法,至于原理以及各种细节。放后面再填; 经过查证,凡是 以前new Thread()的地方,貌似都可以用线程池来执行,优化内存消耗。 代码 系统提供的…

使用Java和Google GSON解析ESPN API

在我的第一篇文章中,我将解释如何解析ESPN API。 可以在http://developer.espn.com/docs上找到API文档。 首先,您需要请求一个API密钥,然后可以开始查询REST API以检索JSON响应。 在下面的示例中,我将简单地查询在英格兰英超联赛…

java string对象创建对象_Java String 创建了几个对象

我们首先来看一段代码:Java代码String strnew String("abc");紧接着这段代码之后的往往是这个问题,那就是这行代码究竟创建了几个String对象呢?相信大家对这道题并不陌生,答案也是众所周知的,2个。接下来我们…

js 图片不保存的回显功能/图片尺寸验证/图片大小验证 /图片类型验证

大部分转自:http://blog.csdn.net/qq_39200924/article/details/79198766 在项目中经常用到input标签来上传文件,而这些文件通常是图片文件。图片有很多格式我们只需要其中的几种,就需要对用户上传的文件进行验证,在HTML5中有一个…

sqlx使用说明

sqlx使用指南 参考链接: http://jmoiron.github.io/sqlx/ sqlx是一个go语言包,在内置database/sql包之上增加了很多扩展,简化数据库操作代码的书写 资源 如果对go语言的sql用法不熟悉,可以到下面网站学习:http://go-database-sql.org/ 如果对…

填充一个池需要多少个线程?

近几个月来,我们一直看到一小部分但持续的操作失败,并带有一个奇怪的异常– org.springframework.jdbc.CannotGetJdbcConnectionException –“无法获得JDBC连接; 嵌套异常是java.sql.SQLException:客户端尝试检出Connection的尝试…

@font-face 用字体画图标

HTML 1 <body>2 <!-- ul.layout>li*5>a[href#]>i.icon -->3 <!-- Sublime Text 快捷拼写 -->4 <ul class"layout">5 <li><a href"#"><i class"icon">&#xe601;</…

java mapfile_基于文件的数据结构:关于MapFile

MapFile是已经排过序的SequenceFile&#xff0c;它有索引&#xff0c;所以可以按键查找1.MapFile的写操作MapFile的写操作类似于SequenceFile的写操作。新建一个MapFile.Writer实例&#xff0c;然后调用append()方法顺序写入文件内容。如果不按顺序写入&#xff0c;就抛出一个I…

linux scp 命令

scp 命令 scp 命令 意思是 secure copy 即安全拷贝&#xff0c;可以把它看做是 cp 命令的高级版&#xff0c;可以跨主机拷贝。 经常用来在局域网内不同主机之间分享文件&#xff0c;或者在本机与远程主机中分享文件。 在使用的时候就像使用cp 命令一样&#xff0c;第一个参数是…

工厂设计模式–一种有效的方法

如您所知&#xff0c;工厂方法模式或俗称工厂设计模式是“创意设计模式”类别下的一种设计模式。 模式背后的基本原理是&#xff0c;在运行时&#xff0c;我们根据传递的参数获得类似类型的对象。 关于这种模式有很多文章&#xff0c;开发人员可以通过各种方式来实现它。 在本文…

java 循环依赖_解决Java循环依赖的问题

最近看到一个问题&#xff1a;如果有两个类A和B&#xff0c;A类中有一个B的对象b&#xff0c;B类中有一个A的对象a&#xff0c;那么如何解决这两个类的相互依赖关系呢&#xff0c;几天就给大家说一下Java的循环依赖&#xff0c;raksmart服务器。举个栗子1&#xff1a;可以看到A…

CSS块元素水平垂直居中的实现技巧

针对之前遇到过的一些特殊样式的实现&#xff0c;我今天做个总结&#xff0c;目的有二&#xff1a;一是将这些方法记录下来&#xff0c;以便将来需要用到时查找使用。二为将这些大神们智慧的结晶发扬光大&#xff0c;让广大前端程序猿们能够少走弯路。此贴为更新帖&#xff0c;…

Intellij IDEA 将工程转换成maven工程 详解

1> 右键工程&#xff0c;点击 Add Framework Support2> 选中 Maven&#xff0c;再点击 OK3> 工程根目录自动生成 pom.xml 文件&#xff0c;这样 工程就支持 Maven版权声明&#xff1a;本文为博主原创文章&#xff0c;未经博主允许不得转载。 http://blog.csdn.net/che…

设置Apache Hadoop多节点集群

我们正在分享有关在基于Linux的机器&#xff08;多节点&#xff09;上安装Apache Hadoop的经验。 在这里&#xff0c;我们还将分享我们在各种故障排除方面的经验&#xff0c;并在将来进行更新。 用户创建和其他配置步骤– 我们首先在每个集群中添加一个专用的Hadoop系统用户。…

数据结构(七)排序---直接插入排序(稳定)

经典排序算法----直接插入排序算法及其改进&#xff08;稳定&#xff09; 定义&#xff1a; 直接插入排序的基本操作是将一个记录插入到已经排好序的有序表中&#xff0c;从而得到一个新的&#xff0c;记录数加一的有序表。 实现思想 我们预留了一个哨兵&#xff0c;这里我们将…

CSS3中的透明属性opacity的用法实例

实例设置 div 元素的不透明级别&#xff1a;div{opacity:0.5;}完整例子&#xff1a;<!DOCTYPE html><html><head><style> div{background-color:red;opacity:0.5;filter:Alpha(opacity50); /* IE8 以及更早的浏览器 */}</style></head><…

java嵌套类型 无法隐藏外层类型_java内部类深入详解 内部类的分类 特点 定义方式 使用...

java内部类 内部类的分类 特点 定义方式 使用 外部类调用内部类 多层嵌套内部类 内部类访问外部类属性 接口中的内部类 内部类的继承 内部类的覆盖 局部内部类 成员内部类 静态内部类 匿名内部类内部类定义将一个类定义放到另一个类的内部,这就是内部类内部类与组合是完…

java.io.FileNotFoundException: generatorConfig.xml (系统找不到指定的文件。)

在使用MyBatis的逆向工程生成代码时&#xff0c;一直报错java.io.FileNotFoundException: generatorConfig.xml (系统找不到指定的文件。)&#xff0c;如图 文件结构如下&#xff1a; 代码如下&#xff1a; import java.io.File; import java.util.ArrayList; import java.util…

单例设计模式–鸟瞰

几天前&#xff0c;当我回到家乡时&#xff0c;我的一位来自同事的大三学生参加了一家跨国公司的采访&#xff0c;在采访过程中受了重伤。 我的意思是&#xff0c;由于面试小组提出的难题&#xff0c;他无法使面试合格。 当我回到班加罗尔时&#xff0c;他分享了他在技术面试中…