前两天由于项目需要,一个windows上的批处理任务(kitchen.bat),需要接到mq的消息通知后执行,为了快速实现这里我们通过springboot写了一个jar程序,用于接收mq的消息,并调用bat文件。
本程序需要实现的功能
- 调用windows的批处理脚本bat,并支持传参
- 可根据配置设置并发,同时消费多个mq消息调用多个批处理脚本
- 确保java程序能一直正常运行(如果有假死或者宕机了可以自动重启)
- 批处理脚本执行失败了,则再将信息重新放回到mq的队列尾部,等待下次执行
需要用的技术
- Java的java.lang.Runtime类 用于调用windows服务器命令
- 通过环境变量配置程序运行的参数,如mq信息、和执行的批处理脚本命令路径、并发等
- 通过rabbitmq的手工ack来确定消息是否处理成功,及并发实现
- 通过
actuator
来判断java程序是否健康 - 通过windows定时任务来定时检查java程序是否正常提供服务,如果不正常则触发重启jar应用
- 通过maven+ant打包程序,将可执行程序jar及相关脚本打包成一个zip文件,方便发给使用方使用
主要实现逻辑
开发环境:jdk1.8 + maven3.x + rabbitmq
运行环境:windows + jre1.8
Java调用bat批处理文件
package cn.iccboy.kitchen.common;import lombok.extern.slf4j.Slf4j;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;/*** @author iccboy*/@Slf4j
public class CmdUtil {/*** 处理执行进程的流** @param inputStream* InputStream 执行进程的流* @param tag* int 标志:1--InputStream;2--ErrorStream*/private static void processStreamHandler(final InputStream inputStream, int tag) {// 处理流的线程new Thread(() -> {String line;try (InputStreamReader inputStreamReader = new InputStreamReader(inputStream);BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {while ((line = bufferedReader.readLine()) != null) {if(tag == 1) {log.info(line);} else {log.error(line);}}} catch (Exception e) {log.error("【异常】命令执行异常:{}", e.getMessage());}}).start();}public static int exec(String command, String... args) throws IOException {String cmd = StrUtil.splicingWithSpace(command, args);log.info("执行命令:{}", cmd);int ret = 99;Process process = Runtime.getRuntime().exec(cmd);processStreamHandler(process.getInputStream(), 1);processStreamHandler(process.getErrorStream(), 2);try {ret = process.waitFor();} catch (InterruptedException e) {log.error("【异常】process.waitFor:{}" , e.getMessage());}log.info("执行命令:{}, 返回状态码={}", cmd, ret);return ret;}
}
上面的程序中,一定要注意的是process.getErrorStream()
和 process.getInputStream()
一定要将命令行执行输出的信息(输出流)和错误信息(错误流)都从缓冲区读取出来,不然会导致程序执行阻塞。
process的阻塞: 在runtime执行大点的命令中,输入流和错误流会不断有流进入存储在JVM的缓冲区中,如果缓冲区的流不被读取被填满时,就会造成runtime的阻塞。所以在进行比如:大文件复制等的操作时,需要不断的去读取JVM中的缓冲区的流,防止Runtime的死锁阻塞。
程序健康检查
这里通过actuator
来实现,首先程序集成actuator
,由于是springboot项目,所以很方便。然后通过一个简单的java程序(CheckActuator)来访问actuator
的http地址,通过返回值来判断jar程序是否运行正常,然后通过windows的脚本(checkHealth.bat
)来调用CheckActuator
,根据返回值在进行java程序的重启等操作。
1. pom.xml增加actuator及prometheus的配置
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>io.micrometer</groupId><artifactId>micrometer-registry-prometheus</artifactId></dependency>
上的版本会根据springboot对应版本自动集成
2. 配置actuator
在application.yml中增加如下配置
management:health:rabbit:enabled: trueendpoints:web:exposure:include: ["prometheus","health"]endpoint:health:show-details: alwaysmetrics:export:prometheus:enabled: truejmx:enabled: true
3. 编写CheckActuator.java程序
当然也可以通过windows的批处理命令直接访问actuator的地址,来判断服务是否正常。
/*** 注意:该类不能删除!!!! 不能改名!!!!不能移动位置!!!!*/
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;/*** ===================================================* 注意:该类不能删除!!!! 不能改名!!!!不能移动位置!!!!** 该类用于检查程序是否健康(通过actuator进行判断是否健康)** 主要供脚本checkHealth.bat进行调用* ===================================================*/
public class CheckActuator {private static final String HEALTH_FLAG = "\"status\":\"UP\"";public static void main(String[] args) {String url = "http://127.0.0.1:8000/actuator/health";if(args != null && args.length != 0) {url = args[0];}testUrlWithTimeOut(url);}public static void testUrlWithTimeOut(String urlString){int timeOutMillSeconds = 2000;URL url;try {url = new URL(urlString);URLConnection conn = url.openConnection();conn.setConnectTimeout(timeOutMillSeconds);conn.connect();InputStream in = conn.getInputStream();BufferedReader reader = new BufferedReader( new InputStreamReader(in));String line;StringBuilder sb = new StringBuilder();while ((line = reader.readLine()) != null) {sb.append(line);}boolean healthFlag = sb.toString().contains(HEALTH_FLAG);if(healthFlag) {System.exit(0);} else {System.out.println("健康检查异常:" + sb);System.exit(1);}} catch (Exception e) {System.out.println("网络连接异常: e=" + e.getMessage());System.exit(1);}}
}
我将上面的CheckActuator.java文件放到maven项目的test/java/跟目录下,后面会通过ant命令将.class移动到指定位置
- 健康检测脚本checkHealth.bat
上面的springboot项目会通过http服务,其运行的端口是8000,下面脚本会通过8000端口来获取对应的进程pid
::存活监控!
@echo off
set strPath=%~dp0
echo %strPath%
mkdir %strPath%log
set "yMd=%date:~0,4%-%date:~5,2%-%date:~8,2% %time:~0,8%"
set strFile=%strPath%log/checkHealth-%date:~0,4%%date:~5,2%%date:~8,2%.log
java -classpath %strPath% CheckActuator
if ERRORLEVEL 1 (goto err) else (goto ok):err
echo %yMd% 程序连接失败,进行重启! >> %strFile%
set port=8000
for /f "tokens=1-5" %%i in ('netstat -ano^|findstr ":%port%"') do (echo kill the process %%m who use the port taskkill /pid %%m -t -f
)
goto start
exit :ok
echo %yMd% 程序运行正常 >> %strFile%
exit
:start
chcp 65001
setlocal enabledelayedexpansion
set filename=""
for /f %%a in ('dir strPath *.jar /o-d /tc /b ') do (set filename=%%~na%%~xaecho 文件名: !filename!, 最新创建时间: %%~ta >> %strFile%if not !filename! == "" (goto startjar)
)
:startjar
rem 注释:查找最新文件结束,最新文件名为:%filename%
java -jar %strPath%%filename%
windows定时任务配置
- 新增-健康检查定时任务.bat
@echo off
set strPath=%~dp0
set checkBat=%strPath%checkHealth.bat
schtasks /create /tn xxx-health-check /tr %checkBat% /sc minute /mo 2
pause
上面的xxx-health-check
是定时任务的名字; /sc minute /mo 2
表示每2分钟执行一次命令。上面是通过命令配置的定时任务,也可以通过windows的图形管理界面【计划任务】配置。
- 移除-健康检查定时任务.bat
@echo off
pause
schtasks /delete /tn xxx-health-check /f
pause
- 查看-健康检查定时任务.bat
@echo off
schtasks /query /V /FO LIST /tn xxx-health-check
pause
通过windows环境变量设置java程序的配置
application.yml 部分配置如下:
server:port: ${K_PORT:8000}servlet:context-path: /
spring:application:name: xxxrabbitmq:host: ${K_MQ_HOST:172.18.1.100}password: ${K_MQ_PASSWORD:123456}port: ${K_MQ_PORT:5672}username: ${K_MQ_USERNAME:mq}connection-timeout: 15000listener:simple:acknowledge-mode: manual #开启手动ACKconcurrency: ${K_WORKS:1} # 并发max-concurrency: ${K_WORKS:1} # 最大并发prefetch: 1 # 每个消费每次预去取几个消息jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8shell:paths: ${K_BAT_PATHS:C:\invoke.bat}
可通过设置系统的环境变量来改变配置,可设置的变量包含:
变量 | 说明 | 默认值 |
---|---|---|
K_PORT | 程序运行的http服务端口 | 8000 |
K_MQ_HOST | rabbitmq 服务ip | 172.18.1.100 |
K_MQ_PORT | rabbitmq 服务端口 | 5672 |
K_MQ_USERNAME | rabbitmq 用户名 | mq |
K_MQ_PASSWORD | rabbitmq 密码 | 123456 |
K_BAT_PATHS | bat脚本路径,可以配置多个,通过英文逗号分隔,配置多个就会启动多个消费者,如:C:\invoke_1.bat,C:\invoke_2.bat | C:\invoke.bat |
K_WORKS | 每个消费者的并发数。如:K_BAT_PATHS配置了3个命令,K_WORKS 配置了 2 ,这表示有3*2=6个消费者 | 1 |
消费mq消息并执行bat文件
package cn.iccboy.kitchen.mq;import cn.iccboy.kitchen.common.CmdUtil;
import cn.iccboy.kitchen.common.ThreadUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;import static cn.iccboy.kitchen.mq.TopicRabbitMqConfig.EXCHANGE_DATA;
import static cn.iccboy.kitchen.mq.TopicRabbitMqConfig.KEY_INDEX_PROCESS;/*** @author iccboy* @date 2023-08-05 15:35*/
@Slf4j
public class CmdMqReceive {@Setterprivate String batPath;@Setterprivate Integer seq;@RabbitListener(queues = TopicRabbitMqConfig.QUEUE_INDEX_PROCESS)public void receive(Message<String> message, @Headers Map<String,Object> headers, Channel channel) throws IOException {long deliveryTag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);try {log.info("[start]第{}执行器,消息内容:{}", seq, message.getPayload());int status = CmdUtil.exec(batPath, message.getPayload());if(status != 0) {log.info("[err_1]第{}执行器,消息内容:{}加工脚本执行异常,状态码={}",seq, message.getPayload(), status);throw new RuntimeException("脚本执行异常");}log.info("[end]第{}执行器执行完成:{}", seq, message.getPayload());} catch (Exception e) {ThreadUtil.sleep(1000);log.error("[err]第{}执行器,执行异常重新进入队列:{}", seq, message.getPayload(), e);//channel.basicNack(deliveryTag, false, true);// 将处理错误的消息放到重新队列尾部channel.basicPublish(EXCHANGE_DATA,KEY_INDEX_PROCESS, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getPayload().getBytes(StandardCharsets.UTF_8));} finally {// 确认已处理channel.basicAck(deliveryTag,false);}}}
通过批处理命令配置个数,动态生成对应个数消费者
package cn.iccboy.kitchen.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;import java.util.List;@Slf4j
@Configuration
@Import(DynamicBuildMqReceiveBean.ImportConfig.class)
public class DynamicBuildMqReceiveBean {public static class ImportConfig implements ImportBeanDefinitionRegistrar, EnvironmentAware {private List<String> batPaths;@Overridepublic void setEnvironment(Environment environment) {try {batPaths = environment.getProperty("shell.paths", List.class);} catch (Exception ex) {log.error("参数绑定", ex);}}@Overridepublic void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {int seq = 0;for (String batPath : batPaths) {seq++;// 注册beanRootBeanDefinition beanDefinition = new RootBeanDefinition();beanDefinition.setBeanClass(CmdMqReceive.class);MutablePropertyValues values = new MutablePropertyValues();values.addPropertyValue("batPath", batPath);values.addPropertyValue("seq", seq);beanDefinition.setPropertyValues(values);registry.registerBeanDefinition(CmdMqReceive.class.getName() + "#" + seq, beanDefinition);}}}
}
上面通过ImportBeanDefinitionRegistrar
的方式 实现了动态bean的生成
通过maven的ant插件实现打包
在项目的 pom.xml文件中增加如下配置
<build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-antrun-plugin</artifactId><version>1.8</version><executions><execution><id>clean</id><phase>clean</phase><configuration><target><delete file="${basedir}/shell/CheckActuator.class"/></target></configuration><goals><goal>run</goal></goals></execution><execution><id>test-compile</id><phase>test-compile</phase><configuration><target><copy overwrite="true" file="${project.build.directory}/test-classes/CheckActuator.class"todir="${basedir}/shell" /></target></configuration><goals><goal>run</goal></goals></execution><execution><id>package</id><phase>package</phase><configuration><target><delete dir="${project.build.directory}/kitchen-mq-bin"/><mkdir dir="${project.build.directory}/kitchen-mq-bin"/><copy todir="${project.build.directory}/kitchen-mq-bin" overwrite="true"><fileset dir="${basedir}/shell" erroronmissingdir="false"><include name="*"/></fileset></copy><copy overwrite="true" file="${project.build.directory}/${project.name}-${project.version}.jar" todir="${project.build.directory}/kitchen-mq-bin" /><zip destfile="${basedir}/kitchen-mq-bin.zip"><fileset dir="${project.build.directory}/kitchen-mq-bin"><include name="*"/></fileset></zip></target></configuration><goals><goal>run</goal></goals></execution></executions></plugin></plugins></build>
项目结构如下图:
获取执行包
- 执行打包命令
mvn clean package
- 上面命令执行完成后,在项目的跟目录会产生一个压缩包
kitchen-mq-bin.zip
,将压缩包直接拷贝到目标服务器,解压即可。 - 解压后,直接执行
新增-健康检查定时任务.bat
即可。2分钟后就会启动程序。
下图是执行命令后,多出的 zip文件包,以及包里面的文件