【优化】XXLJOB修改为使用虚拟线程

【优化】XXLJOB修改为使用虚拟线程

新建这几个目录 类, 去找项目对应的xxljob的源码

主要是将 new Thread  改为  虚拟线程

Thread.ofVirtual().name("VT").unstarted

以下代码是 xxljob   2.3.0版本  举一反三 去修改对应版本的代码

<!--        定时任务--><dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version></dependency>

XxlJobExecutor
package com.xxl.job.core.executor;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.client.AdminBizClient;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.server.EmbedServer;
import com.xxl.job.core.thread.JobLogFileCleanThread;
import com.xxl.job.core.thread.JobThread;
import com.xxl.job.core.thread.TriggerCallbackThread;
import com.xxl.job.core.util.IpUtil;
import com.xxl.job.core.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;/*** Created by xuxueli on 2016/3/2 21:14.*/
public class XxlJobExecutor  {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String accessToken;private String appname;private String address;private String ip;private int port;private String logPath;private int logRetentionDays;public void setAdminAddresses(String adminAddresses) {this.adminAddresses = adminAddresses;}public void setAccessToken(String accessToken) {this.accessToken = accessToken;}public void setAppname(String appname) {this.appname = appname;}public void setAddress(String address) {this.address = address;}public void setIp(String ip) {this.ip = ip;}public void setPort(int port) {this.port = port;}public void setLogPath(String logPath) {this.logPath = logPath;}public void setLogRetentionDays(int logRetentionDays) {this.logRetentionDays = logRetentionDays;}// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-serverinitEmbedServer(address, ip, port, appname, accessToken);}public void destroy(){// destory executor-serverstopEmbedServer();// destory jobThreadRepositoryif (jobThreadRepository.size() > 0) {for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");// wait for job thread push result to callback queueif (oldJobThread != null) {try {oldJobThread.join();} catch (InterruptedException e) {logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);}}}jobThreadRepository.clear();}jobHandlerRepository.clear();// destory JobLogFileCleanThreadJobLogFileCleanThread.getInstance().toStop();// destory TriggerCallbackThreadTriggerCallbackThread.getInstance().toStop();}// ---------------------- admin-client (rpc invoker) ----------------------private static List<AdminBiz> adminBizList;private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {if (adminAddresses!=null && adminAddresses.trim().length()>0) {for (String address: adminAddresses.trim().split(",")) {if (address!=null && address.trim().length()>0) {AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);if (adminBizList == null) {adminBizList = new ArrayList<AdminBiz>();}adminBizList.add(adminBiz);}}}}public static List<AdminBiz> getAdminBizList(){return adminBizList;}// ---------------------- executor-server (rpc provider) ----------------------private EmbedServer embedServer = null;private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}private void stopEmbedServer() {// stop provider factoryif (embedServer != null) {try {embedServer.stop();} catch (Exception e) {logger.error(e.getMessage(), e);}}}// ---------------------- job handler repository ----------------------private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();public static IJobHandler loadJobHandler(String name){return jobHandlerRepository.get(name);}public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}// ---------------------- job thread repository ----------------------private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){JobThread newJobThread = new JobThread(jobId, handler);Thread.ofVirtual().name("VT_XXLJOB").start(newJobThread);
//        newJobThread.start();
//        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();}return newJobThread;}public static JobThread removeJobThread(int jobId, String removeOldReason){JobThread oldJobThread = jobThreadRepository.remove(jobId);if (oldJobThread != null) {oldJobThread.toStop(removeOldReason);oldJobThread.interrupt();return oldJobThread;}return null;}public static JobThread loadJobThread(int jobId){JobThread jobThread = jobThreadRepository.get(jobId);return jobThread;}}

EmbedServer
package com.xxl.job.core.server;import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.biz.model.*;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.util.GsonTool;
import com.xxl.job.core.util.ThrowableUtil;
import com.xxl.job.core.util.XxlJobRemotingUtil;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.*;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;/*** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2020-04-11 21:25*/
public class EmbedServer {private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);private ExecutorBiz executorBiz;private Thread thread;public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = Thread.ofVirtual().name("VT_EmbedServer").unstarted(new Runnable() {@Overridepublic void run() {// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ExecutorService bizThreadPool = Executors.newVirtualThreadPerTaskExecutor();//                ThreadPoolExecutor bizThreadPool2 = new ThreadPoolExecutor(
//                        0,
//                        200,
//                        60L,
//                        TimeUnit.SECONDS,
//                        new LinkedBlockingQueue<Runnable>(2000),
//                        new ThreadFactory() {
//                            @Override
//                            public Thread newThread(Runnable r) {
//                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
//                            }
//                        },
//                        new RejectedExecutionHandler() {
//                            @Override
//                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
//                            }
//                        });try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registrystartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}public void stop() throws Exception {// destroy server threadif (thread!=null && thread.isAlive()) {thread.interrupt();}// stop registrystopRegistry();logger.info(">>>>>>>>>>> xxl-job remoting server destroy success.");}// ---------------------- registry ----------------------/*** netty_http** Copy from : https://github.com/xuxueli/xxl-rpc** @author xuxueli 2015-11-24 22:25:15*/public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);private ExecutorBiz executorBiz;private String accessToken;
//        private ThreadPoolExecutor bizThreadPool;private ExecutorService bizThreadPool;public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ExecutorService bizThreadPool) {this.executorBiz = executorBiz;this.accessToken = accessToken;this.bizThreadPool = bizThreadPool;}@Overrideprotected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {// request parse//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);String requestData = msg.content().toString(CharsetUtil.UTF_8);String uri = msg.uri();HttpMethod httpMethod = msg.method();boolean keepAlive = HttpUtil.isKeepAlive(msg);String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);// invokebizThreadPool.execute(new Runnable() {@Overridepublic void run() {// do invokeObject responseObj = process(httpMethod, uri, requestData, accessTokenReq);// to jsonString responseJson = GsonTool.toJson(responseObj);// write responsewriteResponse(ctx, keepAlive, responseJson);}});}private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {// validif (HttpMethod.POST != httpMethod) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");}if (uri==null || uri.trim().length()==0) {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");}if (accessToken!=null&& accessToken.trim().length()>0&& !accessToken.equals(accessTokenReq)) {return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");}// services mappingtry {if ("/beat".equals(uri)) {return executorBiz.beat();} else if ("/idleBeat".equals(uri)) {IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);return executorBiz.idleBeat(idleBeatParam);} else if ("/run".equals(uri)) {TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);return executorBiz.run(triggerParam);} else if ("/kill".equals(uri)) {KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);return executorBiz.kill(killParam);} else if ("/log".equals(uri)) {LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);return executorBiz.log(logParam);} else {return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");}} catch (Exception e) {logger.error(e.getMessage(), e);return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));}}/*** write response*/private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {// write responseFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());if (keepAlive) {response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);}ctx.writeAndFlush(response);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);ctx.close();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {ctx.channel().close();      // beat 3N, close if idlelogger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");} else {super.userEventTriggered(ctx, evt);}}}// ---------------------- registry ----------------------public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}public void stopRegistry() {// stop registryExecutorRegistryThread.getInstance().toStop();}}

ExecutorRegistryThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 17/3/2.*/
public class ExecutorRegistryThread {private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);private static ExecutorRegistryThread instance = new ExecutorRegistryThread();public static ExecutorRegistryThread getInstance(){return instance;}private Thread registryThread;private volatile boolean toStop = false;public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.setName("VT_xxl-job, executor ExecutorRegistryThread");registryThread.start();}public void toStop() {toStop = true;// interrupt and waitif (registryThread != null) {registryThread.interrupt();try {registryThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}}

JobLogFileCleanThread
package com.xxl.job.core.thread;import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;/*** job file clean thread** @author xuxueli 2017-12-29 16:23:43*/
public class JobLogFileCleanThread {private static Logger logger = LoggerFactory.getLogger(JobLogFileCleanThread.class);private static JobLogFileCleanThread instance = new JobLogFileCleanThread();public static JobLogFileCleanThread getInstance(){return instance;}private Thread localThread;private volatile boolean toStop = false;public void start(final long logRetentionDays){// limit min valueif (logRetentionDays < 3 ) {return;}localThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while (!toStop) {try {// clean log dir, over logRetentionDaysFile[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();if (childDirs!=null && childDirs.length>0) {// todayCalendar todayCal = Calendar.getInstance();todayCal.set(Calendar.HOUR_OF_DAY,0);todayCal.set(Calendar.MINUTE,0);todayCal.set(Calendar.SECOND,0);todayCal.set(Calendar.MILLISECOND,0);Date todayDate = todayCal.getTime();for (File childFile: childDirs) {// validif (!childFile.isDirectory()) {continue;}if (childFile.getName().indexOf("-") == -1) {continue;}// file create dateDate logFileCreateDate = null;try {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");logFileCreateDate = simpleDateFormat.parse(childFile.getName());} catch (ParseException e) {logger.error(e.getMessage(), e);}if (logFileCreateDate == null) {continue;}if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {FileUtil.deleteRecursively(childFile);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.DAYS.sleep(1);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");}});localThread.setDaemon(true);localThread.setName("VT_xxl-job, executor JobLogFileCleanThread");localThread.start();}public void toStop() {toStop = true;if (localThread == null) {return;}// interrupt and waitlocalThread.interrupt();try {localThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}

JobThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.log.XxlJobFileAppender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;/*** handler thread* @author xuxueli 2016-1-16 19:52:47*/
public class JobThread extends Thread{private static Logger logger = LoggerFactory.getLogger(JobThread.class);private int jobId;private IJobHandler handler;private LinkedBlockingQueue<TriggerParam> triggerQueue;private Set<Long> triggerLogIdSet;		// avoid repeat trigger for the same TRIGGER_LOG_IDprivate volatile boolean toStop = false;private String stopReason;private boolean running = false;    // if running jobprivate int idleTimes = 0;			// idel timespublic JobThread(int jobId, IJobHandler handler) {this.jobId = jobId;this.handler = handler;this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());}public IJobHandler getHandler() {return handler;}/*** new trigger to queue** @param triggerParam* @return*/public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {// avoid repeatif (triggerLogIdSet.contains(triggerParam.getLogId())) {logger.debug(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());}triggerLogIdSet.add(triggerParam.getLogId());triggerQueue.add(triggerParam);return ReturnT.SUCCESS;}/*** kill job thread** @param stopReason*/public void toStop(String stopReason) {/*** Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep),* 在阻塞出抛出InterruptedException异常,但是并不会终止运行的线程本身;* 所以需要注意,此处彻底销毁本线程,需要通过共享变量方式;*/this.toStop = true;this.stopReason = stopReason;}/*** is running job* @return*/public boolean isRunningOrHasQueue() {return running || triggerQueue.size()>0;}@Overridepublic void run() {// inittry {handler.init();} catch (Throwable e) {logger.error(e.getMessage(), e);}// executewhile(!toStop){running = false;idleTimes++;TriggerParam triggerParam = null;try {// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);if (triggerParam!=null) {running = true;idleTimes = 0;triggerLogIdSet.remove(triggerParam.getLogId());// log filename, like "logPath/yyyy-MM-dd/9999.log"String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());XxlJobContext xxlJobContext = new XxlJobContext(triggerParam.getJobId(),triggerParam.getExecutorParams(),logFileName,triggerParam.getBroadcastIndex(),triggerParam.getBroadcastTotal());// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);// executeXxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());if (triggerParam.getExecutorTimeout() > 0) {// limit timeoutThread futureThread = null;try {FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {@Overridepublic Boolean call() throws Exception {// init job contextXxlJobContext.setXxlJobContext(xxlJobContext);handler.execute();return true;}});futureThread = Thread.ofVirtual().name("VT_XXL").start(futureTask);
//                            futureThread.start();Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);} catch (TimeoutException e) {XxlJobHelper.log("<br>----------- xxl-job job execute timeout");XxlJobHelper.log(e);// handle resultXxlJobHelper.handleTimeout("job execute timeout ");} finally {futureThread.interrupt();}} else {// just executehandler.execute();}// valid execute handle dataif (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {XxlJobHelper.handleFail("job handle result lost.");} else {String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)?tempHandleMsg.substring(0, 50000).concat("..."):tempHandleMsg;XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);}XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="+ XxlJobContext.getXxlJobContext().getHandleCode()+ ", handleMsg = "+ XxlJobContext.getXxlJobContext().getHandleMsg());} else {if (idleTimes > 30) {if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lostXxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");}}}} catch (Throwable e) {if (toStop) {XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);}// handle resultStringWriter stringWriter = new StringWriter();e.printStackTrace(new PrintWriter(stringWriter));String errorMsg = stringWriter.toString();XxlJobHelper.handleFail(errorMsg);XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");} finally {if(triggerParam != null) {// callback handler infoif (!toStop) {// commonmTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.getXxlJobContext().getHandleCode(),XxlJobContext.getXxlJobContext().getHandleMsg() ));} else {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job running, killed]" ));}}}}// callback trigger request in queuewhile(triggerQueue !=null && triggerQueue.size()>0){TriggerParam triggerParam = triggerQueue.poll();if (triggerParam!=null) {// is killedTriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(),triggerParam.getLogDateTime(),XxlJobContext.HANDLE_COCE_FAIL,stopReason + " [job not executed, in the job queue, killed.]"));}}// destroytry {handler.destroy();} catch (Throwable e) {logger.error(e.getMessage(), e);}logger.debug(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());}
}

TriggerCallbackThread
package com.xxl.job.core.thread;import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobContext;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.log.XxlJobFileAppender;
import com.xxl.job.core.util.FileUtil;
import com.xxl.job.core.util.JdkSerializeTool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;/*** Created by xuxueli on 16/7/22.*/
public class TriggerCallbackThread {private static Logger logger = LoggerFactory.getLogger(TriggerCallbackThread.class);private static TriggerCallbackThread instance = new TriggerCallbackThread();public static TriggerCallbackThread getInstance(){return instance;}/*** job results callback queue*/private LinkedBlockingQueue<HandleCallbackParam> callBackQueue = new LinkedBlockingQueue<HandleCallbackParam>();public static void pushCallBack(HandleCallbackParam callback){getInstance().callBackQueue.add(callback);logger.debug(">>>>>>>>>>> xxl-job, push callback request, logId:{}", callback.getLogId());}/*** callback thread*/private Thread triggerCallbackThread;private Thread triggerRetryCallbackThread;private volatile boolean toStop = false;public void start() {// validif (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");return;}// callbacktriggerCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {// normal callbackwhile(!toStop){try {HandleCallbackParam callback = getInstance().callBackQueue.take();if (callback != null) {// callback list paramList<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);callbackParamList.add(callback);// callback, will retry if errorif (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}}// last callbacktry {List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);if (callbackParamList!=null && callbackParamList.size()>0) {doCallback(callbackParamList);}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");}});triggerCallbackThread.setDaemon(true);triggerCallbackThread.setName("VT_xxl-job, executor TriggerCallbackThread");triggerCallbackThread.start();// retrytriggerRetryCallbackThread = Thread.ofVirtual().unstarted(new Runnable() {@Overridepublic void run() {while(!toStop){try {retryFailCallbackFile();} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);} catch (InterruptedException e) {if (!toStop) {logger.error(e.getMessage(), e);}}}logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");}});triggerRetryCallbackThread.setDaemon(true);triggerRetryCallbackThread.start();}public void toStop(){toStop = true;// stop callback, interrupt and waitif (triggerCallbackThread != null) {    // support empty admin addresstriggerCallbackThread.interrupt();try {triggerCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}// stop retry, interrupt and waitif (triggerRetryCallbackThread != null) {triggerRetryCallbackThread.interrupt();try {triggerRetryCallbackThread.join();} catch (InterruptedException e) {logger.error(e.getMessage(), e);}}}/*** do callback, will retry if error* @param callbackParamList*/private void doCallback(List<HandleCallbackParam> callbackParamList){boolean callbackRet = false;// callback, will retry if errorfor (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> callbackResult = adminBiz.callback(callbackParamList);if (callbackResult!=null && ReturnT.SUCCESS_CODE == callbackResult.getCode()) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback finish.");callbackRet = true;break;} else {callbackLog(callbackParamList, "<br>----------- xxl-job job callback fail, callbackResult:" + callbackResult);}} catch (Exception e) {callbackLog(callbackParamList, "<br>----------- xxl-job job callback error, errorMsg:" + e.getMessage());}}if (!callbackRet) {appendFailCallbackFile(callbackParamList);}}/*** callback log*/private void callbackLog(List<HandleCallbackParam> callbackParamList, String logContent){for (HandleCallbackParam callbackParam: callbackParamList) {String logFileName = XxlJobFileAppender.makeLogFileName(new Date(callbackParam.getLogDateTim()), callbackParam.getLogId());XxlJobContext.setXxlJobContext(new XxlJobContext(-1,null,logFileName,-1,-1));XxlJobHelper.log(logContent);}}// ---------------------- fail-callback file ----------------------private static String failCallbackFilePath = XxlJobFileAppender.getLogPath().concat(File.separator).concat("callbacklog").concat(File.separator);private static String failCallbackFileName = failCallbackFilePath.concat("xxl-job-callback-{x}").concat(".log");private void appendFailCallbackFile(List<HandleCallbackParam> callbackParamList){// validif (callbackParamList==null || callbackParamList.size()==0) {return;}// append filebyte[] callbackParamList_bytes = JdkSerializeTool.serialize(callbackParamList);File callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis())));if (callbackLogFile.exists()) {for (int i = 0; i < 100; i++) {callbackLogFile = new File(failCallbackFileName.replace("{x}", String.valueOf(System.currentTimeMillis()).concat("-").concat(String.valueOf(i)) ));if (!callbackLogFile.exists()) {break;}}}FileUtil.writeFileContent(callbackLogFile, callbackParamList_bytes);}private void retryFailCallbackFile(){// validFile callbackLogPath = new File(failCallbackFilePath);if (!callbackLogPath.exists()) {return;}if (callbackLogPath.isFile()) {callbackLogPath.delete();}if (!(callbackLogPath.isDirectory() && callbackLogPath.list()!=null && callbackLogPath.list().length>0)) {return;}// load and clear file, retryfor (File callbaclLogFile: callbackLogPath.listFiles()) {byte[] callbackParamList_bytes = FileUtil.readFileContent(callbaclLogFile);// avoid empty fileif(callbackParamList_bytes == null || callbackParamList_bytes.length < 1){callbaclLogFile.delete();continue;}List<HandleCallbackParam> callbackParamList = (List<HandleCallbackParam>) JdkSerializeTool.deserialize(callbackParamList_bytes, List.class);callbaclLogFile.delete();doCallback(callbackParamList);}}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/235434.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机基础以及实施运维工程师的介绍

目录 什么是实施、运维工程师 实施工程师 实施工程师的职责 什么是运维工程师 运维功工程师的职责 需要的技术 计算机的介绍 CPU 存储器 IO 系统总线 主板 BIOS 什么是实施、运维工程师 实施工程师 纯实施工程师是指在工程项目实施阶段专门负责实施工作的工程师。与其他…

大模型赋能“AI+电商”,景联文科技提供高质量电商场景数据

据新闻报道&#xff0c;阿里巴巴旗下淘天集团和国际数字商业集团都已建立完整的AI团队。 淘天集团已经推出模特图智能生成、官方客服机器人、万相台无界版等AI工具&#xff0c;训练出了自己的大模型产品 “星辰”&#xff1b; 阿里国际商业集团已成立AI Business&#xff0c;…

Gazebo GUI模型编辑器

模型编辑器 现在我们将构建我们的简单机器人。我们将制作一个轮式车辆&#xff0c;并添加一个传感器&#xff0c;使我们能够让机器人跟随一个斑点&#xff08;人&#xff09;。 模型编辑器允许我们直接在图形用户界面 &#xff08;GUI&#xff09; 中构建简单的模型。对于更复…

在使用mapstruct,想忽略掉List<DTO>字段里面的,`data` 字段的映射, 如何写ignore: 使用@IterableMapping

在使用mapstruct,想忽略掉List字段里面的,data 字段的映射, 如何写ignore 代码如下: public interface AssigmentFileMapper {AssigmentFileDTO assigmentFileToAssigmentFileDTO(AssigmentFile assigmentFile);AssigmentFile assigmentFileDTOToAssigmentFile(Assigment…

用全志R128复刻自平衡赛车机器人,还实现了三种不同的操控方式

经常翻车的朋友们都知道&#xff0c;能在翻车后快速摆正车身的车才是好车。 就像动画《四驱兄弟》中展现的那样&#xff0c;在比赛中需要跟着赛车一起跑圈&#xff0c;而且赛车如果被撞翻还需要重新用手扶正&#xff0c;所浪费的时间非常影响比赛结果。 如果小豪和小烈可以拥有…

flutter自定义地图Marker完美展示图片

世人都说雪景美 寒风冻脚无人疼 只道是一身正气 结论 参考Flutter集成高德地图并添加自定义Maker先实现自定义Marker。如果自定义Marker中用到了图片&#xff0c;那么会碰到图片没有被绘制到Marker的问题&#xff0c;此时需要通过precacheImage来预加载图片&#xff0c;从而解…

智能化创作与艺术:发展、问题、未来趋势

导言 随着人工智能技术的不断进步&#xff0c;智能化创作在艺术领域逐渐崭露头角。本文将深入研究智能化创作与艺术的发展过程、遇到的问题、解决的过程&#xff0c;探讨未来的可用范围&#xff0c;并分析在各国的应用和未来的研究趋势。最后&#xff0c;探讨在哪些方面能取胜&…

一. 模块之间的依赖 ------ 详细解析官网购物应用优秀案例(鸿蒙开发)

一. 项目目录简介 ├──**common** // 公共能力层 │ ├──components │ │ ├──CommodityList.ets // 商品列表组件 │ │ ├──CounterProduct.ets // 数量加减组件 │ │ └──EmptyComponent.ets /…

使用阿里云性能测试工具 JMeter 场景压测 RocketMQ 最佳实践

作者&#xff1a;森元 需求背景 新业务上线前&#xff0c;我们通常需要对系统的不同中间件进行压测&#xff0c;找到当前配置下中间件承受流量的上限&#xff0c;从而确定上游链路的限流规则&#xff0c;保护系统不因突发流量而崩溃。阿里云 PTS 的 JMeter 压测可以支持用户上…

【C++初阶】第一站:C++入门基础(下)

前言&#xff1a; 紧接着上两篇文章&#xff0c;c入门基础(上)&#xff1a;C入门基础(上) c入门基础(中)&#xff1a;C入门基础(中) 继续补充完c初阶入门基础的知识点&#xff0c;本章知识点包括&#xff1a; 引用和指针的区别、内联函数、auto关键字(C11)、基于范围的for循环…

Android 13 - Media框架(24)- OMXNodeInstance(一)

为了了解 ACodec 是如何与 OpenMAX 组件进行 buffer 流转的&#xff0c;我们有必要先来学习 OMXNodeInstance&#xff0c;在前面的章节中&#xff0c;我们已经了解了 media.codec 进程包含的内容&#xff0c;以及 OpenMAX 框架中的一些内容。这一节我们将来学习 OMXNode 与 med…

快猫视频模板源码定制开发 苹果CMS 可打包成双端APP

苹果CMS快猫视频网站模板源码&#xff0c;可用于开发双端APP&#xff0c;后台支持自定义参数&#xff0c;包括会员升级页面、视频、演员、专题、收藏和会员系统等完整模块。还可以直接指定某个分类下的视频为免费专区&#xff0c;具备完善的卡密支付体系&#xff0c;无需人工管…

听GPT 讲Rust源代码--src/tools(17)

File: rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs 在Rust源代码中&#xff0c;rust/src/tools/rust-analyzer/crates/profile/src/hprof.rs文件是rust-analyzer中的性能分析模块&#xff0c;用于代码运行时的性能统计和分析。下面将详细介绍每个结构体的作用&a…

【操作系统】什么是进程?

文章目录 进程进程的属性进程的状态挂起 进程 进程是一个可并发执行的具有独立功能的程序关于某个数据集合的执行过程&#xff0c;也是操作系统进行资源分配和保护的基本单位。 进程的属性 结构性&#xff1a; 共享性&#xff1a;同一程序运行于不同数据集合上构成不同的进程…

Flink Table API 与 SQL 编程整理

Flink API总共分为4层这里主要整理Table API的使用 Table API是流处理和批处理通用的关系型API&#xff0c;Table API可以基于流输入或者批输入来运行而不需要进行任何修改。Table API是SQL语言的超集并专门为Apache Flink设计的&#xff0c;Table API是Scala和Java语言集成式…

【漏洞复现】奥威亚 教学视频应用服务平台任意文件上传漏洞

漏洞描述 AVA 教学视频应用服务平台是由广州市奥威亚电子科技有限公司基于当前教育视频资源建设的背景及用户需求的调研,开发出来能够适应时代发展和满足学校需求,具有实效性、多功能、特点鲜明的平台。 该平台存在任意文件上传漏洞,通过此漏洞攻击者可上传webshell木马,…

OpenCV-9颜色空间的转换

颜色转换API&#xff1a;cvtColor&#xff08;img&#xff0c;colorsapce&#xff09; cvt含义为转换 convesion(转换) 下面为示例代码&#xff1a; import cv2# callback中至少有一个参数 def callback(value):passcv2.namedWindow("color", cv2.WINDOW_NORMAL) …

C#上位机与欧姆龙PLC的通信03----创建项目工程

1、创建仿真PLC 这是一款CP1H-X40DR-A的PLC&#xff0c;呆会后面创建工程的时候需要与这个类型的PLC类型一致&#xff0c;否则程序下载不到PLC上。 2、创建虚拟串口 首先安装&#xff0c;这个用来创建虚拟串口来模拟真实的串口&#xff0c;也就是上位机上有那种COM口&#xf…

Java版直播商城免 费 搭 建:电商、小程序、三级分销及免 费 搭 建,平台规划与营销策略全掌握

随着互联网的快速发展&#xff0c;越来越多的企业开始注重数字化转型&#xff0c;以提升自身的竞争力和运营效率。在这个背景下&#xff0c;鸿鹄云商SAAS云产品应运而生&#xff0c;为企业提供了一种简单、高效、安全的数字化解决方案。 鸿鹄云商SAAS云产品是一种基于云计算的软…

神经科学与计算神经科学的蓬勃发展与未来趋势

导言 神经科学和计算神经科学是当前科学研究领域中备受关注的方向。本文将深入研究这两个领域的发展历程、遇到的问题、解决过程&#xff0c;以及未来的可用范围。我们还将关注在各国的应用现状以及未来的研究趋势&#xff0c;探讨如何在竞争中取胜&#xff0c;以及在哪些方面发…