文章目录
- 前言
- 一、异步队列实现思路?
- 二、实现步骤
- 1.加入监听器
- 2.实现监听器
- 3.实现转交处理对象和转交处理线程
- 4.自动转交异步处理
- 5.业务如何添加
前言
在某些场景下,操作比较耗时,给用户体验不是很好,这时候我们就会直接想到两种方案,一种是定时任务,一种就是异步队列,那些实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。
一、异步队列实现思路?
持久化=>插入队列=>出队,当程序突然停止,当程序启动的时候,从库里面拉出未执行的数据继续入队(补偿机制),下面是java的简单实现。
二、实现步骤
1.加入监听器
代码如下(示例):在web.xml 加入监听器
<!--转交异步处理监听器 --><listener><listener-class>context.ZjListener</listener-class></listener>
2.实现监听器
代码如下(示例):
public class ZjListener implements ServletContextListener {private static Logger log = LoggerFactory.getLogger(ZjListener.class);@Overridepublic void contextInitialized(ServletContextEvent servletContextEvent) {log.info("初始化转交异步处理线程...");ZjManager.getInstance().starup();log.info("初始化转交异步处理线程成功...");}@Overridepublic void contextDestroyed(ServletContextEvent servletContextEvent) {ZjManager.getInstance().shutdown();}
3.实现转交处理对象和转交处理线程
代码如下(示例):我就简单示例一些,具体实现看业务昂
public class ZjRequset {private String dm;private String xh;public String getDm() {return dm;}public void setDm(String dm) {this.dm = dm;}public String getXh() {return xh;}public void setXh(String xh) {this.xh = xh;}
}
public class ZjThread implements Runnable{private ZjRequset requset;private ZjService zjService = WebAppContext.getBeanEx("ZjService");/*** @description 实例化一个自动转交处理线程* @param requset*/public ZjThread(ZjRequset requset){this.requset = requset;}/*** @description 获取请求* @return ZjRequset*/public ZjRequset getRequest() {return requset;}@Overridepublic void run() {if (requset != null){zjService.saveZj(requset.getDm(), requset.getXh());}}
}
4.自动转交异步处理
代码如下(示例):这一块就是核心的代码了
public class ZjManager {private static final Log LOG = LogFactory.getLog(ZjManager.class);private static final ZjManager INSTANCE = new ZjManager();/*** 线程池维护线程的最少数量*/private final static int CORE_POOL_SIZE = 2;/*** 线程池维护线程的最大数量*/private final static int MAX_POOL_SIZE = 3;/*** 线程池维护线程所允许的空闲时间*/private final static int KEEP_ALIVE_TIME = 0;/*** 线程池所使用的缓冲队列大小*/private final static int WORK_QUEUE_SIZE = 200;/*** 是不是第一次启动程序*/private static boolean FIRST_QD = true;/*** 自动转交异步处理队列*/private final Queue<ZjRequset> requestQueue = new LinkedList<ZjRequset>();/*** 线程池*/private ThreadPoolExecutor threadPool = null;/*** 调度器*/private ScheduledExecutorService scheduler = null;/*** @description 获取异步处理管理器实例*/public static ZjManager getInstance(){return INSTANCE;}/*** @description 队列是否为空*/private boolean hasAcquire() {return !requestQueue.isEmpty();}/*** @description 启动工作线程*/public boolean starup(){LOG.info( Console.getNowStr() + " 正在启动异步处理管理器...");threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);scheduler = Executors.newScheduledThreadPool(1);scheduler.scheduleAtFixedRate(accessBufferThread, 0, 3, TimeUnit.SECONDS);LOG.info(Console.getNowStr() + " 启动异步处理管理器成功!");return true;}/*** @description 关闭工作线程*/public void shutdown(){if (scheduler != null) {scheduler.shutdown();}if (threadPool != null) {threadPool.shutdown();}}/*** @description 处理器*/final RejectedExecutionHandler handler = new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {synchronized (requestQueue) {try {requestQueue.offer(((ZjThread) r).getRequest());} catch (Exception e) {LOG.error("插入自动转交队列失败",e);}}}};/*** @description 访问消息缓存的调度线程,查看是否有待定请求,如果有,则创建一个新的,并添加到线程池*/final Runnable accessBufferThread = new Runnable() {@Overridepublic void run() {synchronized (requestQueue) {try {if (FIRST_QD){reloadRequest();}if (hasAcquire()) {ZjRequset request = requestQueue.poll();if (request != null) {Runnable task = new ZjThread(request);threadPool.execute(task);}}} catch (Exception e) {LOG.error("重新执行失败",e);}}}};/*** 增加一个数据库操作** @param request the request*/public void AddRequest(ZjRequset request) {try {if (request != null) {//持久化,写入库中wirteRequest(request);Runnable task = new ZjThread(request);threadPool.execute(task);}} catch (Exception e) {LOG.error(e.getMessage(),e);}}/*** 写入到表中** @param request the request*/void wirteRequest(ZjRequset request) {//将要请求写入库}/*** 将库中未执行的任务添加到队列中*/void reloadRequest() {FIRST_QD = false;Connection conn = null;PreparedStatement pst = null;ResultSet rs = null;//举个例子try {conn = getConn();pst = conn.prepareStatement(sql);rs = pst.executeQuery();while (rs.next()) {ZjRequset request = new ZjRequset();request.setDm(rs.getString("DM"));request.setXh(rs.getString("XH"));ZjManager.getInstance().AddRequest(request,"");}} catch (SQLException e) {LOG.error(e.getMessage(), e);}finally {DBUtils.closeResultSet(rs);DBUtils.closePStatement(pst);DBUtils.closeConnection(conn);}}/*** 获取队列待处理线程数量*/public int getQueueCount(){return requestQueue.size();}/*** 获取处理线程的状态*/public int getThreadZt(){return threadPool.getActiveCount();}}
5.业务如何添加
ZjRequset clRequest = new ZjRequset();clRequest.setDm(dm);clRequest.setXh(xh);ZjManager.getInstance().AddRequest(clRequest);