前言
最近在学习flink, 为了模仿一个持续的无界的数据源, 所以需要一个可以持续发送消息的socket服务端. 先上效果图
效果图
socket服务端可以持续的发送消息, flink端是一个统计单词出现总数的消费端,效果图如下
源代码
flink的消费端就不展示了, 需要引入一些依赖和版本, 此处只展示socket的服务端
import java.io.IOException;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Scanner;/*** @author <a href="mailto:liang.qin.work@foxmail.com">liang.qin</a>* @since 2024/7/19 21:50**/
public class ContinuousMessageServer {@SuppressWarnings("InfiniteLoopStatement")public static void main(String[] args) throws IOException {int port = 9879;try (ServerSocket serverSocket = new ServerSocket(port)) {System.out.println("服务器启动,监听端口:" + port);// 无限循环以接受多个客户端连接while (true) {Socket clientSocket = serverSocket.accept();System.out.println("客户端已连接");// 为每个客户端启动一个新的线程来处理发送消息new ClientHandler(clientSocket).start();}}}// 处理客户端连接的内部类static class ClientHandler extends Thread {private final Socket clientSocket;public ClientHandler(Socket clientSocket) {this.clientSocket = clientSocket;}@Overridepublic void run() {try (PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true);Scanner scanner = new Scanner(System.in)) {do {System.out.println();System.out.println("请输入消息:");String message = scanner.nextLine();out.println(message);System.out.println("成功发送消息:" + message);// 检查客户端是否已断开连接(可选)} while (!clientSocket.isClosed() && clientSocket.isConnected() && !clientSocket.isInputShutdown());} catch (IOException e) {e.printStackTrace();} finally {try {if (clientSocket != null && !clientSocket.isClosed()) {clientSocket.close();}} catch (IOException e) {e.printStackTrace();}}}}
}