背景:分布式部署 一个主节点往各个节点下发任务(调用第三方服务),目的是为了测试各节点与第三方的连通性
思路:
主节点实现
创建Spring Boot项目:作为主节点的后端服务。
集成Eureka客户端:在主节点的pom.xml中添加Eureka客户端依赖,并在配置文件中配置Eureka服务器地址。
服务发现逻辑:编写服务来获取Eureka中的子节点列表,可以通过Eureka的REST API或者直接使用Spring Cloud提供的DiscoveryClient。
任务下发:接收外部请求,解析参数(外部系统接口名、IP、端口)。遍历从Eureka获取的子节点列表,对每个子节点发起HTTP请求,传递上述参数。
结果收集:收集各子节点的响应,组织成最终的结果列表。
子节点实现
创建Spring Boot项目:作为子节点的后端服务,并注册到Eureka服务器。
处理主节点请求:定义一个API来接收主节点的HTTP请求,提取外部系统接口名、IP、端口参数。
Socket调用外部接口:根据接收到的参数,使用Java的Socket编程建立到外部系统的连接,发送请求并等待响应。记得设置超时时间为2秒。
响应主节点:根据Socket调用的结果,构建响应信息(成功/失败及原因、外部接口名、IP、端口),并通过HTTP响应返回给主节点。
注意事项
错误处理:确保在每个关键步骤都有恰当的错误处理逻辑,比如网络请求失败、超时、Socket异常等。
日志记录:在主节点和子节点上都应记录详细的日志,便于问题追踪。
安全性:考虑对HTTP请求和Socket通信进行适当的安全加固,如使用HTTPS、身份验证等
废话不多说,直接上代码:新建一个简单的 springboot 项目,引入Eureka 等所需简单启动依赖
server 端(主节点)
参数说明:
externalApiName 策略名
ip 第三方 ip
port 第三方端口号
controller(表单方式接收数据)
@RestController
public class TaskDistributorController {@Autowiredprivate TaskManagerService taskManagerService;@PostMapping("/distributeTask")public ResponseEntity<List<ResultDto>> distributeTask(TaskRequest taskRequest) {try {// 调用服务层逻辑处理任务分配List<ResultDto> results = taskManagerService.distributeTasks(taskRequest);return ResponseEntity.ok(results);} catch (Exception e) {return ResponseEntity.status(500).body(null);}}}
service
@Service
@Slf4j
public class TaskManagerService {@Autowiredprivate DiscoveryClient discoveryClient;public List<ResultDto> distributeTasks(TaskRequest taskRequest) {// 获取子节点列表List<ServiceInstance> instances = discoveryClient.getInstances("deployment-test-tool-node");String externalApiName = taskRequest.getExternalApiName();List<ResultDto> results = new ArrayList<>();for (ServiceInstance instance : instances) {String workerUrl = "http://" + instance.getHost() + ":" + instance.getPort() + "/executeTask";// 构建请求参数MultiValueMap<String, String> params = new LinkedMultiValueMap<>();params.put("externalApiName", Collections.singletonList(externalApiName));params.put("ip", Collections.singletonList(taskRequest.getIp()));params.put("port", Collections.singletonList(String.valueOf(taskRequest.getPort())));HttpHeaders headers = new HttpHeaders();headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);HttpEntity<MultiValueMap<String, String>> requestEntity = new HttpEntity<>(params, headers);RestTemplate restTemplate = new RestTemplate();ResponseEntity<ResultDto> response = restTemplate.postForEntity(workerUrl, requestEntity, ResultDto.class);log.info("Response from worker: " + response.getBody());response.getBody().setIp(instance.getHost());response.getBody().setPort(String.valueOf(instance.getPort()));response.getBody().setExternalApiName(externalApiName);response.getBody().setExternalIpPort(taskRequest.getIp()+":"+taskRequest.getPort());results.add(response.getBody());}return results;}}
node子节点(以 socket 方式发送请求)
controller
@RestController
public class WorkerController {@Autowiredprivate ExternalSystemClient externalSystemClient;@PostMapping("/executeTask")public ResultDto executeTask(TaskRequest request) {return externalSystemClient.callExternalApi(request.getExternalApiName(), request.getIp(), request.getPort());}
}
service
@Service
public class ExternalSystemClient {public ResultDto callExternalApi(String apiName, String ip, int port) {Socket socket = null;BufferedReader in = null;PrintWriter out = null;try {socket = new Socket();SocketAddress address = new InetSocketAddress(ip, port);socket.connect(address, 2000); // 这里的2000是创建链接超时时间,单位为毫秒socket.setSoTimeout(2000); // 设置超时时间2秒in = new BufferedReader(new InputStreamReader(socket.getInputStream()));out = new PrintWriter(socket.getOutputStream(), true);// 发送请求out.println(apiName);// 读取响应String response = in != null ? in.readLine() : "";if (response != null) {return new ResultDto("success", "连接成功");} else {return new ResultDto("failed", "未收到响应");}} catch (Exception e) {return new ResultDto("failed", e.getMessage());} finally {// 关闭资源if (in != null) try { in.close(); } catch (Exception ignored) {}if (out != null) try { out.close(); } catch (Exception ignored) {}if (socket != null) try { socket.close(); } catch (Exception ignored) {}}}
}
模拟第三方以socket方式接收
socketserver
@Component
@Slf4j
public class SocketServer {@Value("${server.port}")private int port;@PostConstructpublic void startServer() {try (ServerSocket serverSocket = new ServerSocket(port)) {log.info("Socket服务已启动,等待连接...");while (true) {try (Socket clientSocket = serverSocket.accept();PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {out.println("连接成功,测试联通性良好!");} catch (IOException e) {log.error("处理客户端连接时发生错误", e);}}} catch (IOException e) {log.error("启动Socket服务器时发生错误", e);}}
}