6. Task Allocation and Execution: Overall Design and Implementation

1. Design

To ground task execution in a concrete scenario, we use device data collection: connecting to devices and sampling their parameters. Devices speak different protocols, such as Modbus RTU, Modbus TCP, OPC UA, and Siemens S7. Since the protocols vary, so do the parameters they require. The connection and execution parameters are stored in the link_spec column of the t_job table, and the job configuration is stored in job_spec; both are stored as JSON.

1.1 Task Flow

1.2 Class Design

1.2.1 Job Class Design

JobAllotStrategy is the interface. For now it only defines the allotJob (allocate jobs) business method; more will be added as needed. Below it, AbstractJobAllotStrategy is an abstract class that implements JobAllotStrategy and hosts the methods shared by the concrete implementations.

AverageJobAllotStrategy: strategy implementation that distributes jobs evenly

WeightJobAllotStrategy: strategy implementation that distributes jobs by weight

... more can follow later

1.2.2 Helper Class Design

ExecuteDttaskJobContext: encapsulates the parameters related to task execution

JobAllotManager: centrally manages the different job allocation strategies

1.2.3 Protocol Class Design

IProtocol: top-level interface defining the protocol type, the start method, and the config parsing method

AbstractProtocol: abstract class that refines how concrete protocols are implemented and unifies the shared logic

ModbusRTUProtocol: handles the Modbus RTU protocol

ModbusTCPProtocol: handles the Modbus TCP protocol

VirtualProtocol: a virtual protocol implementation for test environments

.... can be extended with custom protocols

1.2.4 Protocol Helper Classes

CollectDataService: the top-level service class for data collection

ICollectDataWorker: the data collection interface; defining an interface here makes it easy to add different collection implementations later

CommonCollectDataWorker: the general implementation of the collection interface, using Java's native ScheduledExecutorService to run periodic collection tasks

2. Implementation

2.1 Job-Related Classes

2.1.1 JobAllotStrategy

For now this interface is very simple, with only two methods: one returning the allocation strategy type and one allocating jobs. A rebalanceJob capability will be implemented later.

public interface JobAllotStrategy {

    JobAllotStrategyType getType();

    void allotJob();
}

2.1.2 AbstractJobAllotStrategy

The abstract implementation of JobAllotStrategy. Logic common to all strategies, as well as compositions of more complex logic, can live here.

public abstract class AbstractJobAllotStrategy implements JobAllotStrategy {

    public List<Job> getAllJob() {
        return BeanUseHelper.entityHelpService().getAllJob();
    }

    public List<DttaskJob> getAllCrawlerJob() {
        return BeanUseHelper.entityHelpService().getAllDttaskJob();
    }

    public List<DttaskJob> getByCrawlerId(long dttaskId) {
        return BeanUseHelper.entityHelpService().queryDttaskJob(dttaskId);
    }
}

2.1.3 AverageJobAllotStrategy

Strategy implementation that distributes jobs evenly across nodes. WeightJobAllotStrategy can be modeled on this class: its main logic is to read a different weight from each Dttask node's configuration (the default ratio is 1:1:1). With a 2:1:1 configuration and 8 jobs, node 1 would get 4 jobs and nodes 2 and 3 would get 2 each; a sketch of that split follows the AverageJobAllotStrategy code below.

@Component
@Slf4j
public class AverageJobAllotStrategy extends AbstractJobAllotStrategy {

    @Autowired
    private CollectDataService collectDataService;

    @Override
    public JobAllotStrategyType getType() {
        return JobAllotStrategyType.AVERAGE;
    }

    @Override
    public void allotJob() {
        EntityHelpService entityHelpService = BeanUseHelper.entityHelpService();
        Map<Long, List<Job>> allotJobMap = getAllotJobMap();
        log.info("allotJobMap={}", allotJobMap);
        entityHelpService.invalidAllDttaskJob();
        entityHelpService.saveAllDttaskJob(allotJobMap);
        Map<Long, List<DttaskJob>> dttaskJobMap = entityHelpService.queryDttaskJob();
        executeDttaskJob(new ExecuteDttaskJobContext(dttaskJobMap, true));
    }

    private void executeDttaskJob(ExecuteDttaskJobContext executeDttaskJobContext) {
        Map<Long, List<DttaskJob>> dttaskJobMap = executeDttaskJobContext.getDttaskJobMap();
        boolean startFlag = executeDttaskJobContext.getStartFlag();
        dttaskJobMap.forEach((dttaskId, dttaskJobList) -> {
            if (!Objects.equals(ServerInfo.getServerId(), dttaskId)) {
                // send a collect-control command to the other node
                Set<Long> dttaskJobIdList = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                DttaskMessage controlCollectMessage = DttaskMessage.buildControlCollectMessage(dttaskJobIdList, startFlag, dttaskId);
                log.info("向nodeId={}发送采集控制指令={}", dttaskId, controlCollectMessage);
                ServerInfo.getChannelByServerId(dttaskId).writeAndFlush(controlCollectMessage);
            } else {
                // this batch belongs to the current node: start or stop it locally
                log.info("{}分配给自己的采集任务={}", startFlag ? "执行" : "停止", dttaskJobList);
                Set<Long> dttaskJobIds = dttaskJobList.stream().map(DttaskJob::getId).collect(Collectors.toSet());
                if (startFlag) {
                    collectDataService.startCollectData(dttaskJobIds);
                } else {
                    collectDataService.stopCollectData(dttaskJobIds);
                }
            }
        });
    }

    private Map<Long, List<Job>> getAllotJobMap() {
        List<Job> allJob = getAllJob();
        return average(allJob);
    }

    private <T> Map<Long, List<T>> average(List<T> list) {
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        int nodeCount = nodeInfoList.size();
        Map<Long, List<T>> allotJobMap = new HashMap<>();
        int averageJobCount = list.size() / nodeCount;
        int remainingJobCount = list.size() % nodeCount;
        int currentIndex = 0;
        for (NodeInfo nodeInfo : nodeInfoList) {
            // copy the subList view so the remainder loop below can safely add to it
            allotJobMap.put(nodeInfo.getServerId(), new ArrayList<>(list.subList(currentIndex, currentIndex + averageJobCount)));
            currentIndex += averageJobCount;
        }
        // hand the leftover jobs out one by one until none remain
        for (Map.Entry<Long, List<T>> entry : allotJobMap.entrySet()) {
            if (remainingJobCount == 0) {
                break;
            }
            entry.getValue().add(list.get(currentIndex));
            currentIndex++;
            remainingJobCount--;
        }
        return allotJobMap;
    }
}
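
WeightJobAllotStrategy itself is not implemented in this article. The following is only a minimal sketch of the weighted split described above, assuming each NodeInfo exposes a hypothetical getWeight() accessor (defaulting to 1) read from the node configuration; it is not code from the repository.

@Component
@Slf4j
public class WeightJobAllotStrategy extends AbstractJobAllotStrategy {

    @Override
    public JobAllotStrategyType getType() {
        return JobAllotStrategyType.WEIGHT;
    }

    @Override
    public void allotJob() {
        // mirrors AverageJobAllotStrategy.allotJob(), but splits by weight instead of evenly
        Map<Long, List<Job>> allotJobMap = splitByWeight(getAllJob());
        // ... persist the map and dispatch an ExecuteDttaskJobContext exactly as the average strategy does
    }

    private <T> Map<Long, List<T>> splitByWeight(List<T> list) {
        List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
        // getWeight() is an assumed accessor for the configured node weight (default 1)
        int totalWeight = nodeInfoList.stream().mapToInt(NodeInfo::getWeight).sum();
        Map<Long, List<T>> allotJobMap = new HashMap<>();
        int currentIndex = 0;
        for (int i = 0; i < nodeInfoList.size(); i++) {
            NodeInfo nodeInfo = nodeInfoList.get(i);
            int count = (i == nodeInfoList.size() - 1)
                    ? list.size() - currentIndex                      // the last node takes the remainder, so rounding never loses a job
                    : list.size() * nodeInfo.getWeight() / totalWeight;
            allotJobMap.put(nodeInfo.getServerId(), new ArrayList<>(list.subList(currentIndex, currentIndex + count)));
            currentIndex += count;
        }
        return allotJobMap;
    }
}

With weights 2:1:1 and 8 jobs this yields a 4/2/2 split, matching the example above.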

2.1.4 ExecuteDttaskJobContext

The context passed around when executing tasks. It is still quite simple, just a map plus a flag for start versus stop, but it can grow with the business and can also carry its own logic later.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class ExecuteDttaskJobContext {

    private Map<Long, List<DttaskJob>> dttaskJobMap;

    private Boolean startFlag;
}

2.1.5 JobAllotStrategyType

The enum of job allocation strategies.

public enum JobAllotStrategyType {

    AVERAGE(0), WEIGHT(1), SPECIFIC(2);

    int code;

    JobAllotStrategyType(int code) {
        this.code = code;
    }

    public static JobAllotStrategyType from(int code) {
        for (JobAllotStrategyType value : values()) {
            if (value.code == code) {
                return value;
            }
        }
        throw new BusinessException(CharSequenceUtil.format("code={}不在JobAllotStrategyType中", code));
    }
}

2.1.6 JobAllotManager

As with the earlier Netty task-handling strategies, I let the Spring container manage all JobAllotStrategy beans and pick the current strategy according to configuration.

@Component
@Slf4j
public class JobAllotManager {

    @Autowired
    private List<JobAllotStrategy> jobAllotStrategies;

    private static final Map<JobAllotStrategyType, JobAllotStrategy> map = new EnumMap<>(JobAllotStrategyType.class);

    @PostConstruct
    public void init() {
        if (jobAllotStrategies != null) {
            for (JobAllotStrategy jobAllotStrategy : jobAllotStrategies) {
                map.put(jobAllotStrategy.getType(), jobAllotStrategy);
            }
        }
    }

    public static JobAllotStrategy getStrategy() {
        JobAllotStrategyType allotJobStrategyType = JobAllotStrategyType.from(BeanUseHelper.dttaskServerConfig().getAllotJobStrategyType());
        if (map.containsKey(allotJobStrategyType)) {
            return map.get(allotJobStrategyType);
        }
        throw new BusinessException(CharSequenceUtil.format("allotJobStrategyType={} 配置有误", allotJobStrategyType));
    }
}
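
Where the manager is invoked is not shown in this section; going by section 3.2 below, the Controller node presumably triggers allocation once all nodes have joined, along these lines (the trigger point is my assumption, not code from the repository):

// on the Controller node, whenever (re)allocation is needed
JobAllotManager.getStrategy().allotJob();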

2.2 Protocol-Related Classes

The Protocol side will change a lot over time: different protocols need different parameters and handling logic, so for now the encapsulation is kept fairly simple.

2.2.1 IProtocol

The protocol interface, mainly responsible for starting the protocol handling. For now I leave the shutdown logic on the Job side; a stop method could also be added here so each protocol closes itself.

public interface IProtocol {

    int getType();

    void start(ProtocolContext protocolContext);

    void parseConfig(ProtocolContext protocolContext);
}

2.2.2 AbstractProtocol

@Slf4j
public abstract class AbstractProtocol implements IProtocol {

    protected Long dttaskId;
    protected Long dttaskJobId;
    protected Long deviceId;
    protected ProtocolContext protocolContext;

    public void parseConfig(ProtocolContext protocolContext) {
        this.dttaskId = protocolContext.getDttaskId();
        this.dttaskJobId = protocolContext.getDttaskJobId();
        this.deviceId = protocolContext.getDeviceId();
        this.protocolContext = protocolContext;
        doParseConfig(protocolContext);
    }

    protected abstract void doParseConfig(ProtocolContext protocolContext);

    public abstract void doStart();

    public void start(ProtocolContext protocolContext) {
        log.info("进入 AbstractProtocol.start, protocolContext={}", protocolContext);
        parseConfig(protocolContext);
        doStart();
    }
}

2.2.3 Modbus RTU Classes

2.2.3.1 ModbusRTUProtocol

@Data
@Slf4j
@Component
public class ModbusRTUProtocol extends AbstractProtocol {

    private ModbusRTUSpecModel modbusRTUSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.modbusRTUSpecModel = ModbusRTUSpecModel.getFromParam(jsonObject);
    }

    @Override
    public int getType() {
        return 0;
    }

    @Override
    public void doStart() {
        try {
            SerialParameters serialParameters = new SerialParameters();
            serialParameters.setPortName(modbusRTUSpecModel.getPortName());
            serialParameters.setBaudRate(modbusRTUSpecModel.getBaudRate());
            serialParameters.setDatabits(modbusRTUSpecModel.getDatabits());
            serialParameters.setStopbits(modbusRTUSpecModel.getStopbits());
            serialParameters.setParity(modbusRTUSpecModel.getParity());
            serialParameters.setEncoding(modbusRTUSpecModel.getEncoding());
            serialParameters.setEcho(modbusRTUSpecModel.getEcho());
            ModbusSerialMaster modbusSerialMaster = new ModbusSerialMaster(serialParameters);
            modbusSerialMaster.connect();
            ModbusRTUCollectWorker modbusRTUCollectWorker = new ModbusRTUCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusSerialMaster, modbusRTUSpecModel);
            collectDataWorker.addCollectTask(dttaskJobId, modbusRTUCollectWorker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={}采集config={}出现异常", this.dttaskId, protocolContext, e);
        }
    }
}

2.2.3.2 ModbusRTUCollectWorker

@Slf4j
public class ModbusRTUCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusSerialMaster master;
    private ModbusRTUSpecModel modbusRTUSpecModel;
    // records the last read timestamp (ms) for each point key
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusRTUCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusSerialMaster master, ModbusRTUSpecModel modbusRTUSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.master = master;
        this.modbusRTUSpecModel = modbusRTUSpecModel;
    }

    @Override
    public void run() {
        long current = System.currentTimeMillis();
        for (ModbusRTUSpecModel.PointDetail pointDetail : modbusRTUSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            // read a point when it has never been read, or when its sampling interval has elapsed
            Long lastRead = lastReadMap.get(key);
            if (lastRead == null || current - lastRead >= pointDetail.getSamplingInterval()) {
                try {
                    Register[] registers = master.readMultipleRegisters(modbusRTUSpecModel.getUnitId(),
                            pointDetail.getOffset(),
                            pointDetail.getNumOfRegisters());
                    for (Register register : registers) {
                        log.info("Register value:{}", register.getValue());
                    }
                    lastReadMap.put(key, current);
                } catch (Exception e) {
                    log.error("DttaskId={}采集registerConfig={}出现异常", this.dttaskId, pointDetail, e);
                }
            }
        }
    }
}

2.2.3.3 ModbusRTUSpecModel

@Data
public class ModbusRTUSpecModel {

    private Integer mode;
    private Integer unitId;
    private String portName;
    private Integer baudRate;
    private Integer databits;
    private String parity;
    private Integer stopbits;
    private String encoding = "RTU";
    private Boolean echo = false;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusRTUSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusRTUSpecModel modbusRTUSpecModel = new ModbusRTUSpecModel();
        modbusRTUSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusRTUSpecModel.setUnitId(linkSpec.getInteger("unitId"));
        modbusRTUSpecModel.setPortName(linkSpec.getString("portName"));
        modbusRTUSpecModel.setBaudRate(linkSpec.getInteger("baudRate"));
        modbusRTUSpecModel.setDatabits(linkSpec.getInteger("databits"));
        modbusRTUSpecModel.setParity(linkSpec.getString("parity"));
        modbusRTUSpecModel.setStopbits(linkSpec.getInteger("stopbits"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> rtuPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail rtuPointDetail = new PointDetail();
            rtuPointDetail.setKey(pointDetail.getString("key"));
            rtuPointDetail.setOffset(pointDetail.getInteger("offset"));
            rtuPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            rtuPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            rtuPointDetailList.add(rtuPointDetail);
        }
        modbusRTUSpecModel.setPointDetailList(rtuPointDetailList);
        return modbusRTUSpecModel;
    }
}

2.2.4 Modbus TCP Classes

2.2.4.1 ModbusTCPProtocol

@Component
@Slf4j
public class ModbusTCPProtocol extends AbstractProtocol {

    private ModbusTCPSpecModel modbusTCPSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return Constant.EntityConstants.LINK_TYPE_MODBUSTCP;
    }

    @Override
    protected void doParseConfig(ProtocolContext protocolContext) {
        JSONObject param = protocolContext.getParam();
        modbusTCPSpecModel = ModbusTCPSpecModel.getFromParam(param);
    }

    @Override
    public void doStart() {
        try {
            ModbusTCPMaster modbusTCPMaster = new ModbusTCPMaster(modbusTCPSpecModel.getIp(), modbusTCPSpecModel.getPort(), 5, true);
            modbusTCPMaster.connect();
            ModbusTCPCollectWorker worker = new ModbusTCPCollectWorker(this.dttaskId, this.dttaskJobId, deviceId, modbusTCPSpecModel, modbusTCPMaster);
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={}采集config={}出现异常", dttaskId, modbusTCPSpecModel, e);
        }
    }
}

2.2.4.2 ModbusTCPCollectWorker

@Slf4j
public class ModbusTCPCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private ModbusTCPSpecModel modbusTCPSpecModel;
    private ModbusTCPMaster master;
    // records the last read timestamp (ms) for each point key
    private Map<String, Long> lastReadMap = new HashMap<>();

    public ModbusTCPCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, ModbusTCPSpecModel modbusTCPSpecModel, ModbusTCPMaster master) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.modbusTCPSpecModel = modbusTCPSpecModel;
        this.master = master;
    }

    @Override
    public void run() {
        long current = System.currentTimeMillis();
        for (ModbusTCPSpecModel.PointDetail pointDetail : modbusTCPSpecModel.getPointDetailList()) {
            String key = pointDetail.getKey();
            // read a point when it has never been read, or when its sampling interval has elapsed
            Long lastRead = lastReadMap.get(key);
            if (lastRead == null || current - lastRead >= pointDetail.getSamplingInterval()) {
                try {
                    Register[] registers = master.readMultipleRegisters(modbusTCPSpecModel.getSlaveId(),
                            pointDetail.getOffset(),
                            pointDetail.getNumOfRegisters());
                    for (Register register : registers) {
                        log.info("Register value:{}", register.getValue());
                    }
                    lastReadMap.put(key, current);
                } catch (Exception e) {
                    log.error("DttaskId={}采集pointDetail={}出现异常", dttaskId, pointDetail, e);
                }
            }
        }
    }
}

2.2.4.3 ModbusTCPSpecModel

@Data
public class ModbusTCPSpecModel {

    private Integer mode;
    private Long deviceId;
    private String ip;
    private Integer port;
    private Integer slaveId;
    private List<PointDetail> pointDetailList;

    @Data
    public static class PointDetail {
        private String key;
        private Integer offset;
        private Integer numOfRegisters;
        private Integer samplingInterval;
    }

    public static ModbusTCPSpecModel getFromParam(JSONObject param) {
        JSONObject linkSpec = param.getJSONObject("linkSpec");
        ModbusTCPSpecModel modbusTCPSpecModel = new ModbusTCPSpecModel();
        modbusTCPSpecModel.setMode(linkSpec.getInteger("mode"));
        modbusTCPSpecModel.setDeviceId(linkSpec.getLong("deviceId"));
        modbusTCPSpecModel.setIp(linkSpec.getString("ip"));
        modbusTCPSpecModel.setPort(linkSpec.getInteger("port"));
        modbusTCPSpecModel.setSlaveId(linkSpec.getInteger("slaveId"));
        JSONArray pointDetailJsonArray = linkSpec.getJSONArray("pointDetailList");
        List<PointDetail> tcpPointDetailList = new ArrayList<>();
        for (Object pointDetailObject : pointDetailJsonArray) {
            JSONObject pointDetail = (JSONObject) pointDetailObject;
            PointDetail tcpPointDetail = new PointDetail();
            tcpPointDetail.setKey(pointDetail.getString("key"));
            tcpPointDetail.setOffset(pointDetail.getInteger("offset"));
            tcpPointDetail.setNumOfRegisters(pointDetail.getInteger("numOfRegisters"));
            tcpPointDetail.setSamplingInterval(pointDetail.getInteger("samplingInterval"));
            tcpPointDetailList.add(tcpPointDetail);
        }
        modbusTCPSpecModel.setPointDetailList(tcpPointDetailList);
        return modbusTCPSpecModel;
    }
}

2.2.5 Virtual Classes

2.2.5.1 VirtualProtocol

@Data
@Slf4j
@Component
public class VirtualProtocol extends AbstractProtocol {

    private VirtualSpecModel virtualSpecModel;

    @Autowired
    private ICollectDataWorker collectDataWorker;

    @Override
    public int getType() {
        return -1;
    }

    @Override
    public void doParseConfig(ProtocolContext protocolContext) {
        JSONObject jsonObject = protocolContext.getParam();
        this.virtualSpecModel = VirtualSpecModel.getFromParam(jsonObject);
    }

    @Override
    public void doStart() {
        try {
            VirtualCollectWorker worker = new VirtualCollectWorker(dttaskId, dttaskJobId, deviceId, virtualSpecModel);
            collectDataWorker.addCollectTask(dttaskJobId, worker, 5, 1, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("DttaskId={}采集config={}出现异常", dttaskId, protocolContext, e);
        }
    }
}

2.2.5.2 VirtualCollectWorker

@Slf4j
public class VirtualCollectWorker implements Runnable {

    private Long dttaskId;
    private Long dttaskJobId;
    private Long deviceId;
    private VirtualSpecModel virtualSpecModel;

    public VirtualCollectWorker(Long dttaskId, Long dttaskJobId, Long deviceId, VirtualSpecModel virtualSpecModel) {
        this.dttaskId = dttaskId;
        this.dttaskJobId = dttaskJobId;
        this.deviceId = deviceId;
        this.virtualSpecModel = virtualSpecModel;
    }

    @Override
    public void run() {
        log.info("deviceId={},dttaskId={},dttaskJobId={},virtualSpecModel={}",
                deviceId, dttaskId, dttaskJobId, virtualSpecModel);
    }
}

2.2.5.3 VirtualSpecModel

@Data
@Slf4j
public class VirtualSpecModel {

    private VirtualSpecModel() {
    }

    public static VirtualSpecModel getFromParam(JSONObject jsonObject) {
        log.debug("VirtualSpecModel.getFromParam param={}", jsonObject);
        return new VirtualSpecModel();
    }
}

2.3 Classes That Tie Job and Protocol Together

2.3.1 ICollectDataWorker

public interface ICollectDataWorker {

    void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit);

    void removeCollectMonitor(long dttaskJobId);

    ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId);

    void doCollect(Set<Long> dttaskJobId);

    void stopCollect(Set<Long> dttaskJobId);
}

2.3.2 CommonCollectDataWorker

@Component
@Slf4j
public class CommonCollectDataWorker implements ICollectDataWorker {

    private ScheduledExecutorService collectDataExecutor
            = new InfiniteScheduledThreadPoolExecutor(10, new CustomThreadFactory("CommonCollectDataWorker-THREAD-"));

    private Map<Long, ScheduledFuture<Void>> monitorDttaskJobStateMap = new ConcurrentHashMap<>();

    public void addCollectTask(long dttaskJobId, Runnable runnable, int delay, int period, TimeUnit timeUnit) {
        ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) collectDataExecutor.scheduleAtFixedRate(runnable, delay, period, timeUnit);
        monitorDttaskJobStateMap.put(dttaskJobId, scheduledFuture);
    }

    public synchronized void removeCollectMonitor(long dttaskJobId) {
        monitorDttaskJobStateMap.remove(dttaskJobId);
    }

    public ScheduledFuture<Void> getCollectMonitorScheduledFuture(long dttaskJobId) {
        return monitorDttaskJobStateMap.get(dttaskJobId);
    }

    @Override
    public void doCollect(Set<Long> dttaskJobIds) {
        log.info("进入 CommonCollectDataWorker.doCollect, dttaskJobIds={}", dttaskJobIds);
        List<DttaskJob> dttaskJobs = BeanUseHelper.entityHelpService().queryDttaskJob(dttaskJobIds);
        for (DttaskJob dttaskJob : dttaskJobs) {
            // assemble the protocol context from the persisted dttask job
            ProtocolContext protocolContext = new ProtocolContext();
            protocolContext.setDttaskId(dttaskJob.getDttaskId());
            protocolContext.setDeviceId(dttaskJob.getDeviceId());
            protocolContext.setDttaskJobId(dttaskJob.getId());
            protocolContext.setLinkType(dttaskJob.getLinkType());
            JSONObject param = new JSONObject();
            param.put("linkSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getLinkSpec())));
            param.put("jobSpec", JSON.parseObject(JSON.toJSONString(dttaskJob.getJobSpec())));
            protocolContext.setParam(param);
            IProtocol protocol = ProtocolManager.getProtocol(protocolContext.getLinkType());
            protocol.start(protocolContext);
        }
    }

    @Override
    public void stopCollect(Set<Long> dttaskJobIds) {
        log.info("进入 CommonCollectDataWorker.stopCollect, dttaskJobIds={}", dttaskJobIds);
        for (Long dttaskJobId : dttaskJobIds) {
            ScheduledFuture<?> scheduledFuture = getCollectMonitorScheduledFuture(dttaskJobId);
            if (scheduledFuture != null) {
                // the job may never have been scheduled on this node
                scheduledFuture.cancel(true);
                removeCollectMonitor(dttaskJobId);
            }
        }
    }
}
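
doCollect looks the protocol up through ProtocolManager, which is not listed in this section. A minimal sketch of what it might look like, assuming it follows the same Spring-managed registry pattern as JobAllotManager (the field names and message text here are mine, not the repository's):

@Component
public class ProtocolManager {

    @Autowired
    private List<IProtocol> protocols;

    // registry of link type -> protocol implementation, filled from the Spring context
    private static final Map<Integer, IProtocol> PROTOCOL_MAP = new HashMap<>();

    @PostConstruct
    public void init() {
        if (protocols != null) {
            for (IProtocol protocol : protocols) {
                PROTOCOL_MAP.put(protocol.getType(), protocol);
            }
        }
    }

    public static IProtocol getProtocol(int linkType) {
        IProtocol protocol = PROTOCOL_MAP.get(linkType);
        if (protocol == null) {
            throw new BusinessException(CharSequenceUtil.format("no protocol implementation for linkType={}", linkType));
        }
        return protocol;
    }
}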

2.3.3 CollectDataService

@Component
@Slf4j
public class CollectDataService {

    @Autowired
    private ICollectDataWorker collectDataWorker;

    private CollectDataService() {
    }

    /**
     * start collecting data for the given dttask jobs
     */
    public void startCollectData(Set<Long> dttaskJobId) {
        BeanUseHelper.entityHelpService().runDttaskJob(dttaskJobId, ServerInfo.getServerId());
        collectDataWorker.doCollect(dttaskJobId);
    }

    /**
     * stop collecting data for the given dttask jobs
     */
    public void stopCollectData(Set<Long> dttaskJobId) {
        collectDataWorker.stopCollect(dttaskJobId);
        BeanUseHelper.entityHelpService().stopDttaskJob(dttaskJobId, ServerInfo.getServerId());
    }
}

CollectDataService is used according to the business needs: it is the business-facing management class for data collection, takes the upper-level intent, organizes the logic, and hands the work to an ICollectDataWorker implementation.

The ICollectDataWorker implementation talks to the Protocol implementations: it assembles the parameters each Protocol needs and calls the concrete Protocol to collect data over that specific protocol.

3. Verification

The code is available at GitHub - swsm/dttask (a distributed, plugin-based task execution framework). Stars are welcome ❤️

3.1 Prepare the Data

Database script (all jobs here use the Virtual protocol to make testing easy):

INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (1, 1, 1, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:37', '2023-12-12 16:35:37');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (2, 2, 2, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:40', '2023-12-12 16:35:40');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (3, 3, 3, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:44', '2023-12-12 16:35:44');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (4, 4, 4, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (5, 5, 5, -1, '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"mode\": \"0\", \"parity\": \"None\", \"unitId\": 1, \"baudRate\": 9600, \"databits\": 8, \"deviceId\": 1, \"portName\": \"COM4\", \"stopbits\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:45', '2023-12-12 16:35:45');
INSERT INTO `t_job` (`id`, `device_id`, `device_link_id`, `link_type`, `link_spec`, `job_spec`, `remark`, `delete_flag`, `created_at`, `updated_at`) VALUES (6, 6, 6, -1, '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', '{\"ip\": \"127.0.0.1\", \"mode\": \"1\", \"port\": 1234, \"slaveId\": 1, \"pointDetail\": [{\"key\": \"1\", \"offset\": 1, \"numOfRegisters\": 2, \"samplingInterval\": 1000}, {\"key\": \"2\", \"offset\": 2, \"numOfRegisters\": 1, \"samplingInterval\": 2000}, {\"key\": \"3\", \"offset\": 3, \"numOfRegisters\": 3, \"samplingInterval\": 1000}, {\"key\": \"4\", \"offset\": 4, \"numOfRegisters\": 5, \"samplingInterval\": 3000}]}', NULL, 0, '2023-12-12 16:35:47', '2023-12-12 16:35:47');

3.2 Start Three Nodes

You will see the three nodes finish starting: node 1 becomes the Controller and nodes 2 and 3 become Followers. This is the logic from the earlier articles; read them if it is unclear.

The Controller then allocates the jobs to all nodes (including itself) according to the allocation strategy, and each node executes its share.

  • The t_dttask_job table now holds each node's assigned jobs

  • Each node runs its tasks once per second

Since CollectDataService already wraps startCollectData and stopCollectData, you can write your own Controller that calls these methods to start or stop a specific job; see the sketch below.
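
A minimal sketch of such a controller, assuming a standard Spring Web setup (the class name and endpoint paths are hypothetical, not part of the repository):

@RestController
@RequestMapping("/dttask-job")
public class DttaskJobController {

    @Autowired
    private CollectDataService collectDataService;

    // start collection for the given dttask job ids
    @PostMapping("/start")
    public String start(@RequestBody Set<Long> dttaskJobIds) {
        collectDataService.startCollectData(dttaskJobIds);
        return "ok";
    }

    // stop collection for the given dttask job ids
    @PostMapping("/stop")
    public String stop(@RequestBody Set<Long> dttaskJobIds) {
        collectDataService.stopCollectData(dttaskJobIds);
        return "ok";
    }
}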
