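1. Design
To ground task execution in a concrete scenario, we connect to devices and collect their parameters. Devices speak different protocols, such as Modbus RTU, Modbus TCP, OPC UA, and Siemens S7. Because the protocols vary, so do the parameters they need: the connection and task-execution parameters are stored in the link_spec column of the t_job table, and the task configuration is stored in job_spec. Both are stored as JSON.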
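1.1 Task flow
1.2 Class design
1.2.1 Job class design
JobAllotStrategy is an interface. For now it defines only one business method, allotJob (allocate jobs); more will be added as needed. Below it, AbstractJobAllotStrategy is an abstract class that implements JobAllotStrategy and holds the methods shared by the concrete implementations.
AverageJobAllotStrategy: the implementation that allocates jobs evenly
WeightJobAllotStrategy: the implementation that allocates jobs by weight
... many more can be added later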
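To make the weighted strategy concrete: with node weights 2:1:1 and 8 jobs, node 1 should receive 4 jobs and nodes 2 and 3 should receive 2 each. The following is only a rough sketch of that arithmetic. The class name WeightAllotSketch, the method allotByWeight, and passing the weights in as a plain map are assumptions for illustration; they are not taken from the dttask code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of the dttask code base.
class WeightAllotSketch {

    // Splits jobs across nodes in proportion to each node's weight.
    // weights maps nodeId -> weight; its iteration order decides who gets leftover jobs first.
    static <T> Map<Long, List<T>> allotByWeight(List<T> jobs, Map<Long, Integer> weights) {
        int totalWeight = 0;
        for (int weight : weights.values()) {
            if (weight < 0) {
                throw new IllegalArgumentException("weight must not be negative");
            }
            totalWeight += weight;
        }
        if (totalWeight == 0) {
            throw new IllegalArgumentException("at least one weight must be positive");
        }

        Map<Long, List<T>> result = new LinkedHashMap<>();
        int index = 0;
        for (Map.Entry<Long, Integer> entry : weights.entrySet()) {
            // Proportional share, rounded down
            int share = jobs.size() * entry.getValue() / totalWeight;
            // Copy the subList view so each bucket is an independent, growable list
            result.put(entry.getKey(), new ArrayList<>(jobs.subList(index, index + share)));
            index += share;
        }
        // Rounding down leaves fewer leftover jobs than there are positive-weight nodes,
        // so a single pass that gives one extra job per node is enough.
        for (Map.Entry<Long, Integer> entry : weights.entrySet()) {
            if (index >= jobs.size()) {
                break;
            }
            if (entry.getValue() > 0) {
                result.get(entry.getKey()).add(jobs.get(index++));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Integer> weights = new LinkedHashMap<>();
        weights.put(1L, 2);
        weights.put(2L, 1);
        weights.put(3L, 1);
        List<Integer> jobs = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        // prints {1=[1, 2, 3, 4], 2=[5, 6], 3=[7, 8]}
        System.out.println(allotByWeight(jobs, weights));
    }
}
```

A real WeightJobAllotStrategy would presumably read the weights from each node's configuration rather than from a hand-built map.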
1.2.2 Helper class design
ExecuteDttaskJobContext: wraps the parameters needed to execute jobs
JobAllotManager: manages the different job allocation strategies in one place
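1.2.3 Protocol class design
IProtocol: the top-level interface; defines the protocol type, the start method, and the config-handling method
AbstractProtocol: an abstract class that refines how the concrete protocols below are implemented and unifies their shared logic
ModbusRTUProtocol: handles the Modbus RTU protocol
ModbusTCPProtocol: handles the Modbus TCP protocol
VirtualProtocol: a virtual protocol handler for test environments
.... more can be added as needed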
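1.2.4 Protocol helper classes
CollectDataService: the top-level service class for data collection
ICollectDataWorker: the data collection interface; defining an interface here makes it easy to add other collection implementations later
CommonCollectDataWorker: the general-purpose implementation; it uses Java's built-in ScheduledExecutorService to run periodic collection tasks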
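2. Implementation
2.1 Job classes
2.1.1 JobAllotStrategy
This interface is currently very simple, with just two methods: one returns the allocation strategy type and the other allocates jobs. A rebalanceJob method will be implemented later.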
public interface JobAllotStrategy {

    JobAllotStrategyType getType();

    void allotJob();
}
2.1.2 AbstractJobAllotStrategy
The abstract implementation of JobAllotStrategy. Logic common to all later strategies, as well as compositions of more complex logic, can live here.
public abstract class AbstractJobAllotStrategy implements JobAllotStrategy {

    public List<Job> getAllJob() {
        return BeanUseHelper.entityHelpService().getAllJob();
    }

    public List<DttaskJob> getAllCrawlerJob() {
        return BeanUseHelper.entityHelpService().getAllDttaskJob();
    }

    public List<DttaskJob> getByCrawlerId(long dttaskId) {
        return BeanUseHelper.entityHelpService().queryDttaskJob(dttaskId);
    }
}
2.1.3 AverageJobAllotStrategy
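The even-allocation strategy. WeightJobAllotStrategy can be modeled on this class: its main logic would read the weight configured for each Dttask node. The default is 1:1:1.
With a configuration of 2:1:1 and 8 jobs, node 1 gets 4 jobs and nodes 2 and 3 get 2 each.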
@Component
@Slf4j
public class AverageJobAllotStrategy extends AbstractJobAllotStrategy {

    @Autowired
    private CollectDataService collectDataService;

    @Override
    public JobAllotStrategyType getType() {
        return JobAllotStrategyType.AVERAGE;
    }

    @Override
    public void allotJob() {
        EntityHelpService entityHelpService = BeanUseHelper.entityHelpService();
        Map<Long, List<Job>> allotJobMap = getAllotJobMap();
        log.info("allotJobMap={}", allotJobMap);
        // Invalidate the previous allocation, then persist the new one
        entityHelpService.invalidAllDttaskJob();
        entityHelpService.saveAllDttaskJob(allotJobMap);
        Map<Long, List<DttaskJob>> dttaskJobMap = entityHelpService.queryDttaskJob();
        executeDttaskJob(new ExecuteDttaskJobContext(dttaskJobMap, true));
    }

    private void executeDttaskJob(ExecuteDttaskJobContext executeDttaskJobContext) {
        Map<Long, List<DttaskJob>> dttaskJobMap = executeDttaskJobContext.getDttaskJobMap();
        boolean startFlag = executeDttaskJobContext.getStartFlag();
        dttaskJobMap.forEach((dttaskId, dttaskJobList) -> {
            if (!Objects.equals(ServerInfo.getServerId(), dttaskId)) {
                // Send a job-control command to the other node
                Set<Long> dttaskJobIdList = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                DttaskMessage controlCollectMessage = DttaskMessage.buildControlCollectMessage(dttaskJobIdList, startFlag, dttaskId);
                // The original call had two placeholders but only one argument
                log.info("Sending collect-control command to nodeId={}: {}", dttaskId, controlCollectMessage);
                ServerInfo.getChannelByServerId(dttaskId).writeAndFlush(controlCollectMessage);
            } else {
                log.info("{} collect jobs assigned to this node: {}", startFlag ? "Starting" : "Stopping", dttaskJobList);
                Set<Long> dttaskJobIds = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                if (startFlag) {
                    collectDataService.startCollectData(dttaskJobIds);
                } else {
                    collectDataService.stopCollectData(dttaskJobIds);
                }
            }
        });
    }

    private Map<Long, List<Job>> getAllotJobMap() {
        List<Job> allJob = getAllJob();
        return average(allJob);
    }

    private <T> Map<Long, List<T>> average(List<T> list) {
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        int nodeCount = nodeInfoList.size();
        Map<Long, List<T>> allotJobMap = new HashMap<>();
        if (nodeCount == 0) {
            // No nodes to allocate to; also avoids dividing by zero below
            return allotJobMap;
        }
        int averageJobCount = list.size() / nodeCount;
        int remainingJobCount = list.size() % nodeCount;
        int currentIndex = 0;
        for (NodeInfo nodeInfo : nodeInfoList) {
            // Copy the subList view: appending to a view would modify the backing list
            // and invalidate the other views
            allotJobMap.put(nodeInfo.getServerId(), new ArrayList<>(list.subList(currentIndex, currentIndex + averageJobCount)));
            currentIndex += averageJobCount;
        }
        // Hand out the leftover jobs one per node; the remainder is always smaller than nodeCount,
        // so stop as soon as it reaches zero instead of walking past the end of the list
        for (List<T> jobs : allotJobMap.values()) {
            if (remainingJobCount == 0) {
                break;
            }
            jobs.add(list.get(currentIndex));
            currentIndex++;
            remainingJobCount--;
        }
        return allotJobMap;
    }
}
2.1.4 ExecuteDttaskJobContext
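The context passed along when executing jobs. It is still simple: just a Map plus a flag saying whether to start or stop. It can be extended as the business requires, and it can also carry its own logic.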
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExecuteDttaskJobContext {

    private Map<Long, List<DttaskJob>> dttaskJobMap;

    private Boolean startFlag;
}
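As one possible reading of a context that carries its own logic, it could expose a helper that returns the job IDs assigned to a given node, so callers no longer repeat the stream mapping. This is only a sketch: JobContextSketch, JobRef (a stand-in for DttaskJob) and jobIdsOf are hypothetical names, not part of the project.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch, not part of the dttask code base.
class JobContextSketch {

    // Minimal stand-in for DttaskJob; only the id matters here
    static final class JobRef {
        private final long id;

        JobRef(long id) {
            this.id = id;
        }

        long getId() {
            return id;
        }
    }

    private final Map<Long, List<JobRef>> dttaskJobMap;
    private final boolean startFlag;

    JobContextSketch(Map<Long, List<JobRef>> dttaskJobMap, boolean startFlag) {
        this.dttaskJobMap = dttaskJobMap;
        this.startFlag = startFlag;
    }

    boolean isStart() {
        return startFlag;
    }

    // Returns the ids of the jobs assigned to the given node, or an empty set if it has none
    Set<Long> jobIdsOf(long dttaskId) {
        List<JobRef> jobs = dttaskJobMap.getOrDefault(dttaskId, Collections.emptyList());
        return jobs.stream().map(JobRef::getId).collect(Collectors.toSet());
    }
}
```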
2.1.5 JobAllotStrategyType
The enum of job allocation strategies
public enum JobAllotStrategyType {

    AVERAGE(0), WEIGHT(1), SPECIFIC(2);

    private final int code;

    JobAllotStrategyType(int code) {
        this.code = code;
    }

    // Looks up the enum constant for a configured code; unknown codes are rejected
    public static JobAllotStrategyType from(int code) {
        for (JobAllotStrategyType value : values()) {
            if (value.code == code) {
                return value;
            }
        }
        throw new BusinessException(CharSequenceUtil.format("code={} is not a valid JobAllotStrategyType", code));
    }
}
2.1.6 JobAllotManager
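As with the Netty task-handling strategies earlier, I use the Spring container to manage all JobAllotStrategy implementations and select the current one based on configuration.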
@Component
@Slf4j
public class JobAllotManager {

    @Autowired
    private List<JobAllotStrategy> jobAllotStrategies;

    private static final Map<JobAllotStrategyType, JobAllotStrategy> map = new EnumMap<>(JobAllotStrategyType.class);

    @PostConstruct
    public void init() {
        // Index every JobAllotStrategy bean that Spring injected by its type
        if (jobAllotStrategies != null) {
            for (JobAllotStrategy jobAllotStrategy : jobAllotStrategies) {
                map.put(jobAllotStrategy.getType(), jobAllotStrategy);
            }
        }
    }

    public static JobAllotStrategy getStrategy() {
        JobAllotStrategyType allotJobStrategyType = JobAllotStrategyType.from(BeanUseHelper.dttaskServerConfig().getAllotJobStrategyType());
        if (map.containsKey(allotJobStrategyType)) {
            return map.get(allotJobStrategyType);
        }
        throw new BusinessException(CharSequenceUtil.format("allotJobStrategyType={} is misconfigured", allotJobStrategyType));
    }
}
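2.2 Protocol classes
Protocols will change a lot going forward: different protocols need different parameters and different handling logic. For now they are only lightly encapsulated.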
2.2.1 IProtocol
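The protocol interface, whose main job is to start protocol handling. For now I have put the shutdown logic on the Job side; alternatively, a stop method could be added here so that each protocol handles its own shutdown.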
public interface IProtocol {

    int getType();

    void start(ProtocolContext protocolContext);

    void parseConfig(ProtocolContext protocolContext);
}
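If shutdown were moved to the protocol side, the interface could gain a stop method next to start. The sketch below only illustrates that shape: StoppableProtocol, DemoProtocol, and the simplified start(long) signature are assumptions made to keep the example self-contained, and real code would keep the ProtocolContext parameter.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not part of the dttask code base.
class ProtocolStopSketch {

    interface StoppableProtocol {
        int getType();

        void start(long dttaskJobId);

        // Hypothetical addition: returns true if the job was running and has been stopped
        boolean stop(long dttaskJobId);
    }

    static class DemoProtocol implements StoppableProtocol {

        // Ids of the jobs this protocol instance is currently serving
        private final Set<Long> activeJobs = ConcurrentHashMap.newKeySet();

        @Override
        public int getType() {
            return -1;
        }

        @Override
        public void start(long dttaskJobId) {
            activeJobs.add(dttaskJobId);
        }

        @Override
        public boolean stop(long dttaskJobId) {
            // A real protocol would also release its connection or serial port here
            return activeJobs.remove(dttaskJobId);
        }

        boolean isRunning(long dttaskJobId) {
            return activeJobs.contains(dttaskJobId);
        }
    }
}
```

The trade-off is that each protocol then owns its connection lifecycle, instead of the Job side cancelling the scheduled future on its behalf.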
2.2.2 AbstractProtocol
@Slf4j
public abstract class AbstractProtocol implements IProtocol {

    protected Long dttaskId;
    protected Long dttaskJobId;
    protected Long deviceId;
    protected ProtocolContext protocolContext;

    @Override
    public void parseConfig(ProtocolContext protocolContext) {
        // Capture the fields every protocol needs, then let the subclass parse its own spec
        this.dttaskId = protocolContext.getDttaskId();
        this.dttaskJobId = protocolContext.getDttaskJobId();
        this.deviceId = protocolContext.getDeviceId();
        this.protocolContext = protocolContext;
        doParseConfig(protocolContext);
    }

    protected abstract void doParseConfig(ProtocolContext protocolContext);

    public abstract void doStart();

    @Override
    public void start(ProtocolContext protocolContext) {
        log.info("Entering AbstractProtocol.start, protocolContext={}", protocolContext);
        parseConfig(protocolContext);
        doStart();
    }
}
2.2.3 Modbus RTU classes
2.2.3.1 ModbusRTUProtocol
@Data
@Slf4j
@Component
public class ModbusRTUProtocol extends AbstractProtocol {

    private ModbusRTUSpecModel modbusRTUSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.modbusRTUSpecModel = ModbusRTUSpecModel.getFromParam(jsonObject);
    }

    @Override
    public int getType() {
        return 0;
    }

    @Override
    public void doStart() {
        try {
            // Build the serial-port settings from the parsed link_spec
            SerialParameters serialParameters = new SerialParameters();
            serialParameters.setPortName(modbusRTUSpecModel.getPortName());
            serialParameters.setBaudRate(modbusRTUSpecModel.getBaudRate());
            serialParameters.setDatabits(modbusRTUSpecModel.getDatabits());
            serialParameters.setStopbits(modbusRTUSpecModel.getStopbits());
            serialParameters.setParity(modbusRTUSpecModel.getParity());
            serialParameters.setEncoding(modbusRTUSpecModel.getEncoding());
            serialParameters.setEcho(modbusRTUSpecModel.getEcho());
            ModbusSerialMaster modbusSerialMaster = new ModbusSerialMaster(serialParameters);
            modbusSerialMaster.connect();
            ModbusRTUCollectWorker modbusRTUCollectWorker = new ModbusRTUCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusSerialMaster, modbusRTUSpecModel);
            // First run after 5 seconds, then once per second
            collectDataWorker.addCollectTask(dttaskJobId, modbusRTUCollectWorker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} hit an exception while collecting with config={}", this.dttaskId, protocolContext, e);
        }
    }
}
2.2.3.2 ModbusRTUCollectWorker
@Slf4j
public class ModbusRTUCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusSerialMaster master;
    private ModbusRTUSpecModel modbusRTUSpecModel;
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusRTUCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusSerialMaster master, ModbusRTUSpecModel modbusRTUSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.master = master;
        this.modbusRTUSpecModel = modbusRTUSpecModel;
    }

    @Override
    public void run() {
        long current = System.currentTimeMillis();
        for (ModbusRTUSpecModel.PointDetail pointDetail : modbusRTUSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            Long lastRead = lastReadMap.get(key);
            // The original check required an entry in lastReadMap but never wrote one, so no point was ever read.
            // Assumed intent: read a point on its first pass, then again once its sampling interval has elapsed.
            if (lastRead != null && current - lastRead < pointDetail.getSamplingInterval()) {
                continue;
            }
            try {
                Register[] registers = master.readMultipleRegisters(modbusRTUSpecModel.getUnitId(), pointDetail.getOffset(), pointDetail.getNumOfRegisters());
                for (Register register : registers) {
                    log.info("Register value:{}", register.getValue());
                }
                lastReadMap.put(key, current);
            } catch (Exception e) {
                log.error("DttaskId={} hit an exception while collecting registerConfig={}", this.dttaskId, pointDetail, e);
            }
        }
    }
}
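The scheduler invokes the worker once per second, while each point carries its own samplingInterval (1000 to 3000 in the sample data, presumably milliseconds). One way to decide whether a point is due on a given pass is to track its last read time. The sketch below isolates that check so it can be exercised with explicit timestamps; SamplingGate and isDue are hypothetical names, not part of the project.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not part of the dttask code base.
// Not thread-safe: it assumes one worker thread per gate, as with a single scheduled task.
class SamplingGate {

    private final Map<String, Long> lastReadMap = new HashMap<>();

    // Returns true when the point has never been read or its interval has elapsed,
    // and in that case records nowMillis as the new last-read time.
    boolean isDue(String key, long nowMillis, long samplingIntervalMillis) {
        Long lastRead = lastReadMap.get(key);
        if (lastRead != null && nowMillis - lastRead < samplingIntervalMillis) {
            return false;
        }
        lastReadMap.put(key, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        SamplingGate gate = new SamplingGate();
        System.out.println(gate.isDue("2", 0L, 2000L));    // true: first pass
        System.out.println(gate.isDue("2", 1000L, 2000L)); // false: only 1000 ms elapsed
        System.out.println(gate.isDue("2", 2000L, 2000L)); // true: interval reached
    }
}
```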
2.2.3.3 ModbusRTUSpecModel
@Data
public class ModbusRTUSpecModel {

    private Integer mode;
    private Integer unitId;
    private String portName;
    private Integer baudRate;
    private Integer databits;
    private String parity;
    private Integer stopbits;
    private String encoding = "RTU";
    private Boolean echo = false;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusRTUSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusRTUSpecModel modbusRTUSpecModel = new ModbusRTUSpecModel();
        modbusRTUSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusRTUSpecModel.setUnitId(linkSpec.getInteger("unitId"));
        modbusRTUSpecModel.setPortName(linkSpec.getString("portName"));
        modbusRTUSpecModel.setBaudRate(linkSpec.getInteger("baudRate"));
        modbusRTUSpecModel.setDatabits(linkSpec.getInteger("databits"));
        modbusRTUSpecModel.setParity(linkSpec.getString("parity"));
        modbusRTUSpecModel.setStopbits(linkSpec.getInteger("stopbits"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> rtuPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail rtuPointDetail = new PointDetail();
            rtuPointDetail.setKey(pointDetail.getString("key"));
            rtuPointDetail.setOffset(pointDetail.getInteger("offset"));
            rtuPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            rtuPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            rtuPointDetailList.add(rtuPointDetail);
        }
        modbusRTUSpecModel.setPointDetailList(rtuPointDetailList);
        return modbusRTUSpecModel;
    }
}
2.2.4 Modbus TCP classes
2.2.4.1 ModbusTCPProtocol
@Component
@Slf4j
public class ModbusTCPProtocol extends AbstractProtocol {

    private ModbusTCPSpecModel modbusTCPSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return Constant.EntityConstants.LINK_TYPE_MODBUSTCP;
    }

    @Override
    protected void doParseConfig(ProtocolContext protocolContext) {
        JSONObject param = protocolContext.getParam();
        modbusTCPSpecModel = ModbusTCPSpecModel.getFromParam(param);
    }

    @Override
    public void doStart() {
        try {
            ModbusTCPMaster modbusTCPMaster = new ModbusTCPMaster(modbusTCPSpecModel.getIp(), modbusTCPSpecModel.getPort(), 5, true);
            modbusTCPMaster.connect();
            ModbusTCPCollectWorker worker = new ModbusTCPCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusTCPSpecModel, modbusTCPMaster);
            // First run after 5 seconds, then once per second
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} hit an exception while collecting with config={}", dttaskId, modbusTCPSpecModel, e);
        }
    }
}
2.2.4.2 ModbusTCPCollectWorker
@Slf4j
public class ModbusTCPCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusTCPSpecModel modbusTCPSpecModel;
    private ModbusTCPMaster master;
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusTCPCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusTCPSpecModel modbusTCPSpecModel, ModbusTCPMaster master) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.modbusTCPSpecModel = modbusTCPSpecModel;
        this.master = master;
    }

    @Override
    public void run() {
        long current = System.currentTimeMillis();
        for (ModbusTCPSpecModel.PointDetail pointDetail : modbusTCPSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            Long lastRead = lastReadMap.get(key);
            // The original check required an entry in lastReadMap but never wrote one, so no point was ever read.
            // Assumed intent: read a point on its first pass, then again once its sampling interval has elapsed.
            if (lastRead != null && current - lastRead < pointDetail.getSamplingInterval()) {
                continue;
            }
            try {
                Register[] registers = master.readMultipleRegisters(modbusTCPSpecModel.getSlaveId(), pointDetail.getOffset(), pointDetail.getNumOfRegisters());
                for (Register register : registers) {
                    log.info("Register value:{}", register.getValue());
                }
                lastReadMap.put(key, current);
            } catch (Exception e) {
                log.error("DttaskId={} hit an exception while collecting pointDetail={}", dttaskId, pointDetail, e);
            }
        }
    }
}
2.2.4.3 ModbusTCPSpecModel
@Data
public class ModbusTCPSpecModel {

    private Integer mode;
    private Long deviceId;
    private String ip;
    private Integer port;
    private Integer slaveId;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusTCPSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusTCPSpecModel modbusTCPSpecModel = new ModbusTCPSpecModel();
        modbusTCPSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusTCPSpecModel.setDeviceId(linkSpec.getLong("deviceId"));
        modbusTCPSpecModel.setIp(linkSpec.getString("ip"));
        modbusTCPSpecModel.setPort(linkSpec.getInteger("port"));
        modbusTCPSpecModel.setSlaveId(linkSpec.getInteger("slaveId"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> tcpPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail tcpPointDetail = new PointDetail();
            tcpPointDetail.setKey(pointDetail.getString("key"));
            tcpPointDetail.setOffset(pointDetail.getInteger("offset"));
            tcpPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            tcpPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            tcpPointDetailList.add(tcpPointDetail);
        }
        modbusTCPSpecModel.setPointDetailList(tcpPointDetailList);
        return modbusTCPSpecModel;
    }
}
2.2.5 Virtual classes
2.2.5.1 VirtualProtocol
@Data
@Slf4j
@Component
public class VirtualProtocol extends AbstractProtocol {

    private VirtualSpecModel virtualSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return -1;
    }

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.virtualSpecModel = VirtualSpecModel.getFromParam(jsonObject);
    }

    @Override
    public void doStart() {
        try {
            VirtualCollectWorker worker = new VirtualCollectWorker(dttaskId, dttaskJobId, deviceId, virtualSpecModel);
            // First run after 5 seconds, then once per second
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={} hit an exception while collecting with config={}", dttaskId, protocolContext, e);
        }
    }
}
2.2.5.2 VirtualCollectWorker
@Slf4j
public class VirtualCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private VirtualSpecModel virtualSpecModel;

    public VirtualCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, VirtualSpecModel virtualSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.virtualSpecModel = virtualSpecModel;
    }

    @Override
    public void run() {
        log.info("deviceId={},dttaskId={},dttaskJobId={},virtualSpecModel={}", deviceId, dttaskId, dttaskJobId, virtualSpecModel);
    }
}
2.2.5.3 VirtualSpecModel
@Data
@Slf4j
public class VirtualSpecModel {

    private VirtualSpecModel() {
    }

    public static VirtualSpecModel getFromParam(JSONObject jsonObject) {
        log.debug("VirtualSpecModel.getFromParam param={}", jsonObject);
        return new VirtualSpecModel();
    }
}
2.3 Classes that tie Job and Protocol together
2.3.1 ICollectDataWorker
public interface ICollectDataWorker {

    void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit);

    void removeCollectMonitor(long dttaskJobId);

    ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId);

    void doCollect(Set<Long> dttaskJobId);

    void stopCollect(Set<Long> dttaskJobId);
}
2.3.2 CommonCollectDataWorker
@Component
@Slf4j
public class CommonCollectDataWorker implements ICollectDataWorker {

    private ScheduledExecutorService collectDataExecutor =
            new InfiniteScheduledThreadPoolExecutor(10, new CustomThreadFactory("CommonCollectDataWorker-THREAD-"));

    private Map<Long, ScheduledFuture<Void>> monitorDttaskJobStateMap = new ConcurrentHashMap<>();

    @Override
    public void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit) {
        ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) collectDataExecutor.scheduleAtFixedRate(runnable, delay, period, timeUnit);
        // Keep the future so the job can be cancelled later
        monitorDttaskJobStateMap.put(dttaskJobId, scheduledFuture);
    }

    @Override
    public synchronized void removeCollectMonitor(long dttaskJobId) {
        monitorDttaskJobStateMap.remove(dttaskJobId);
    }

    @Override
    public ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId) {
        return monitorDttaskJobStateMap.get(dttaskJobId);
    }

    @Override
    public void doCollect(Set<Long> dttaskJobIds) {
        log.info("Entering CommonCollectDataWorker.doCollect, dttaskJobIds={}", dttaskJobIds);
        List<DttaskJob> dttaskJobs = BeanUseHelper.entityHelpService().queryDttaskJob(dttaskJobIds);
        for (DttaskJob dttaskJob : dttaskJobs) {
            // Assemble the parameters the protocol needs from the stored job
            ProtocolContext protocolContext = new ProtocolContext();
            protocolContext.setDttaskId(dttaskJob.getDttaskId());
            protocolContext.setDeviceId(dttaskJob.getDeviceId());
            protocolContext.setDttaskJobId(dttaskJob.getId());
            protocolContext.setLinkType(dttaskJob.getLinkType());
            JSONObject param = new JSONObject();
            param.put("linkSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getLinkSpec())));
            param.put("jobSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getJobSpec())));
            protocolContext.setParam(param);
            IProtocol protocol = ProtocolManager.getProtocol(protocolContext.getLinkType());
            protocol.start(protocolContext);
        }
    }

    @Override
    public void stopCollect(Set<Long> dttaskJobIds) {
        log.info("Entering CommonCollectDataWorker.stopCollect, dttaskJobIds={}", dttaskJobIds);
        for (Long dttaskJobId : dttaskJobIds) {
            ScheduledFuture<?> scheduledFuture = getCollectMonitorScheduledFuture(dttaskJobId);
            // A job that was never scheduled on this node has no future to cancel
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
            }
            removeCollectMonitor(dttaskJobId);
        }
    }
}
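The schedule-then-cancel pattern used here can be tried in isolation with only the JDK. The sketch below is a simplified stand-in, not the project's class: CollectSchedulerSketch, runDemo, and the use of Executors.newScheduledThreadPool in place of InfiniteScheduledThreadPoolExecutor are all assumptions for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not part of the dttask code base.
class CollectSchedulerSketch {

    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    private final Map<Long, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();

    // Schedules a periodic task for the job; a task already registered under the same id is cancelled
    void addCollectTask(long jobId, Runnable task, long delay, long period, TimeUnit unit) {
        ScheduledFuture<?> previous = futures.put(jobId, executor.scheduleAtFixedRate(task, delay, period, unit));
        if (previous != null) {
            previous.cancel(true);
        }
    }

    // Cancels the job's periodic task; returns false if the job was not scheduled here
    boolean stopCollect(long jobId) {
        ScheduledFuture<?> future = futures.remove(jobId);
        return future != null && future.cancel(true);
    }

    void shutdown() {
        executor.shutdownNow();
    }

    // Demo: lets a counter task fire at least three times, then stops it and returns the count
    static int runDemo() {
        CollectSchedulerSketch worker = new CollectSchedulerSketch();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(3);
        worker.addCollectTask(1L, () -> {
            runs.incrementAndGet();
            latch.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.stopCollect(1L);
            worker.shutdown();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs before stop: " + runDemo());
    }
}
```

One detail worth knowing about scheduleAtFixedRate: if a run of the task throws, subsequent runs are suppressed, which is why the collect workers catch exceptions inside run().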
2.3.3 CollectDataService
@Component
@Slf4j
public class CollectDataService {

    @Autowired
    private ICollectDataWorker collectDataWorker;

    private CollectDataService() {
    }

    /**
     * Start collecting data
     */
    public void startCollectData(Set<Long> dttaskJobId) {
        BeanUseHelper.entityHelpService().runDttaskJob(dttaskJobId, ServerInfo.getServerId());
        collectDataWorker.doCollect(dttaskJobId);
    }

    /**
     * Stop collecting data
     */
    public void stopCollectData(Set<Long> dttaskJobId) {
        collectDataWorker.stopCollect(dttaskJobId);
        BeanUseHelper.entityHelpService().stopDttaskJob(dttaskJobId, ServerInfo.getServerId());
    }
}
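Call CollectDataService as the business requires. As the business-level manager for data collection, it takes the request from the upper layer, organizes the logic, and hands the work to the ICollectDataWorker implementation.
The ICollectDataWorker implementation in turn connects to the Protocol implementations: it assembles the parameters a Protocol needs and calls the concrete Protocol to collect data over that protocol.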
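CommonCollectDataWorker looks up the protocol with ProtocolManager.getProtocol(linkType), a class this article does not show. Assuming it mirrors JobAllotManager, it could be a registry keyed by each protocol's getType(). The sketch below illustrates that idea in plain Java; ProtocolRegistrySketch, SimpleProtocol and the of helper are hypothetical names, and the real class is presumably a Spring bean.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of the dttask code base.
class ProtocolRegistrySketch {

    // Simplified stand-in for IProtocol
    interface SimpleProtocol {
        int getType();

        String describe();
    }

    private final Map<Integer, SimpleProtocol> byType = new HashMap<>();

    // Indexes the given protocols by type; in Spring the list would be injected
    ProtocolRegistrySketch(List<SimpleProtocol> protocols) {
        for (SimpleProtocol protocol : protocols) {
            if (byType.put(protocol.getType(), protocol) != null) {
                throw new IllegalStateException("duplicate protocol type " + protocol.getType());
            }
        }
    }

    // Returns the protocol registered for the link type, or fails loudly if there is none
    SimpleProtocol getProtocol(int linkType) {
        SimpleProtocol protocol = byType.get(linkType);
        if (protocol == null) {
            throw new IllegalArgumentException("no protocol registered for linkType=" + linkType);
        }
        return protocol;
    }

    // Small factory so the example does not need one class per protocol
    static SimpleProtocol of(int type, String name) {
        return new SimpleProtocol() {
            @Override
            public int getType() {
                return type;
            }

            @Override
            public String describe() {
                return name;
            }
        };
    }
}
```

With this shape, a link_type of -1 in t_job resolves to the virtual protocol and 0 to Modbus RTU, matching the getType() values shown earlier.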
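3. Verification
The code is available at GitHub - swsm/dttask: a distributed, plugin-based task execution framework. Stars are welcome ❤️
3.1 Preparing the data
Database script (every job here uses the Virtual protocol, to make testing easy)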
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (1, 1, 1, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:37', '2023-12-12 16:35:37');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (2, 2, 2, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:40', '2023-12-12 16:35:40');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (3, 3, 3, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:44', '2023-12-12 16:35:44');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (4, 4, 4, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (5, 5, 5, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (6, 6, 6, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:47', '2023-12-12 16:35:47');
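3.2 Starting three nodes
Here you will see the three nodes finish starting: node 1 is the Controller, and nodes 2 and 3 are Followers. This is logic from earlier; see the previous articles if it is unclear.
The Controller then allocates the jobs to all nodes (itself included) according to the allocation strategy, and each node executes the jobs assigned to it.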
- The t_dttask_job table in the database now holds the jobs assigned to each node
- Each node runs its jobs once per second
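With the six sample jobs and three nodes, an even split should leave two jobs on each node. That arithmetic can be checked with a small standalone sketch. AverageSplitSketch and split are illustrative names, node IDs 1 to 3 are assumed, and unlike AverageJobAllotStrategy this version gives the extra jobs to the first nodes while slicing, which yields the same bucket sizes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not part of the dttask code base.
class AverageSplitSketch {

    // Splits jobs as evenly as possible; node ids are assumed to be unique
    static <T> Map<Long, List<T>> split(List<T> jobs, List<Long> nodeIds) {
        Map<Long, List<T>> result = new LinkedHashMap<>();
        if (nodeIds.isEmpty()) {
            return result;
        }
        int base = jobs.size() / nodeIds.size();
        int remainder = jobs.size() % nodeIds.size();
        int index = 0;
        for (Long nodeId : nodeIds) {
            // The first `remainder` nodes take one extra job each
            int count = base + (remainder-- > 0 ? 1 : 0);
            result.put(nodeId, new ArrayList<>(jobs.subList(index, index + count)));
            index += count;
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> jobIds = Arrays.asList(1, 2, 3, 4, 5, 6);
        // prints {1=[1, 2], 2=[3, 4], 3=[5, 6]}
        System.out.println(split(jobIds, Arrays.asList(1L, 2L, 3L)));
    }
}
```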
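Because the doCollect and stopCollect methods are already encapsulated (and exposed through CollectDataService's startCollectData and stopCollectData), you can create your own Controller and call them to stop or start a specific job.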