支持持续接收数、可发送数据、可多端口连接。
废话少说,直接上代码!
如果写的可以,记得点个赞~
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ArrayBlockingQueue;@Slf4j
@AllArgsConstructor
public class TcpIpService {// 保存消息数据的容器,会单独开一个线程进行监听里面是否有数据然后进行业务处理//(我设置的最大存放个数,视需求而定)public static ArrayBlockingQueue<HashMap> messageQueue = new ArrayBlockingQueue<HashMap>(10);/*** 开始加载监听Tcp/Ip端口* @throws IOException*/public static void ServerSocketD(int portNumber) throws IOException {MQMonitor mqMonitor = new MQMonitor();mqMonitor.start();InterlinkageMonitorServer thread = new InterlinkageMonitorServer(portNumber);thread.start();}/*** 发送消息* @param data* @return* @throws IOException*/public static ReturnDataState giveOrder(String data) throws IOException, InterruptedException {//向客户端发送消息try {log.info("发:"+data);ServerThread.outputStream = ServerThread.socket.getOutputStream();ServerThread.outputStream.write(data.getBytes("GBK"));} catch (IOException e) {e.printStackTrace();}return new ReturnDataState(0, "");}
}/*** 监听用户链接*/
@Slf4j
class InterlinkageMonitorServer extends Thread {//监听端口private static int PORT = 0;public static Socket socket;/*** @param portNumber:端口号*/public InterlinkageMonitorServer(int portNumber){this.PORT = portNumber;}@SneakyThrowspublic void run(){log.info("TcpIp消息:>>>>>>>>>>>>>>> 开始监听用户链接 <<<<<<<<<<<<<");ServerSocket serverSocket = null;try {//建立服务器的 Socket,并设定一个监听的端口 PORTif (PORT<1024){log.error("TcpIp消息:监听的端口数值不能为0或小于1024");}else if (PORT > 65535){log.error("TcpIp消息:监听的端口数值不能大于65535");}serverSocket = new ServerSocket(PORT);//由于需要进行循环监听,因此获取消息的操作应放在一个 while 大循环中while(true){try {//建立跟客户端的连接socket = serverSocket.accept();} catch (Exception e) {log.info("TcpIp消息:建立与客户端的连接出现异常");e.printStackTrace();}if (socket != null){log.info("TcpIp消息:>>>>>>>>>>>>>>> 有客户端链接,开启新线程 <<<<<<<<<<<<<");//注:视需求做,我们的需求是就没什么人链接,就新开了一个线程做处理MessageServerThread thread = new MessageServerThread(socket);thread.start();socket = null;}// 不能让他跑的太快,需要在一定程度上让他跑慢点(视需求而定)Thread.sleep(1000);}} catch (Exception e) {e.printStackTrace();} finally {serverSocket.close();}}
}/*** 每一个链接上的,都会单独进行监听消息*/
@Slf4j
class MessageServerThread extends Thread {public static Socket socket ;InputStream inputStream;public MessageServerThread(Socket socket){this.socket=socket;}public void run(){try {//接收客户端的消息并打印inputStream = socket.getInputStream();byte[] bytes = new byte[5120];while (inputStream.read(bytes) != -1){//解决乱码的问题String string = new String(bytes, "GB2312");//解决 byte 数组为空或者填不满的问题HashMap originalData = JSON.parseObject(string.trim(), HashMap.class);log.info("收:"+originalData.toString());// 消息存入produce(originalData);//推送后清空数组bytes = new byte[5120];}} catch (Exception e) {log.error("WebSocket消息:客户端的主动断开连接了,关闭线程");}//操作结束,关闭sockettry{socket.close();}catch(IOException e){ log.error("WebSocket消息:关闭连接出现异常"); }}// 生产消息public static void produce(HashMap msg) {if (Server.messageQueue.offer(msg)) {log.info("MQ消息:成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());} else {log.info("MQ消息:消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}log.info("=======================");}
}/*** 收到数据后的业务处理*/
@Slf4j
class MQMonitor extends Thread {public MQMonitor() {}@SneakyThrowspublic void run(){while (true) {// 取出收到的值HashMap<String, Object> consume = consume();while (!ObjectUtils.isEmpty(consume)) {log.info("MQ消息:开始消化数据");// 业务内容。。。。。break;}// 不为空时会快速对数据进行处理if (TcpIpService.messageQueue.size() == 0)Thread.sleep( 1000 );}}// 消费消息public static HashMap consume() {HashMap msg = Server.messageQueue.poll();if (msg != null) {// 消费条件满足情况,从消息容器中取出一条消息log.info("MQ消息:已经消费消息:" + msg + ",当前暂存的消息数量是:" + Server.messageQueue.size());} else {log.info("MQ消息:消息处理中心内没有消息可供消费!");}log.info("=======================");return msg;}
}