研究一下python socket 实时通信,多对多,一对一,转发
C=>S 单独通信
server1
import socket
import threading# 在线客户端列表
online_clients = {}def broadcast(message, sender):"""向所有在线客户端广播消息,除了发送者"""for client_socket, addr in online_clients.items():if client_socket != sender:try:client_socket.send(message)except:# 客户端离线,从列表中移除online_clients.pop(client_socket)def handle_client(client_socket, addr):"""处理单个客户端的通信"""while True:try:data = client_socket.recv(1024)if not data:breakprint(f"从 {addr} 收到消息: {data.decode()}")broadcast(data, client_socket)except:break# 客户端离线,从列表中移除online_clients.pop(client_socket)client_socket.close()print(f"客户端 {addr} 已离线")# 启动服务器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 8000))
server_socket.listen(5)
print("服务器已启动,等待客户端连接...")while True:client_socket, addr = server_socket.accept()print(f"新客户端 {addr} 已连接")online_clients[client_socket] = addrthreading.Thread(target=handle_client, args=(client_socket, addr)).start()
client1
import socket# 创建 Socket 对象
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)# 连接服务器
server_address = ("localhost", 8000)
client_socket.connect(server_address)while True:message = input("输入消息: ")client_socket.sendall(message.encode())# 接收服务器响应data = client_socket.recv(1024)print(f"接收服务器响应收到消息: {data.decode()}")client_socket.close()
效果:
C<=>S 相互通信
server2
import socket
import select# 在线客户端列表
online_clients = {}def broadcast(message, sender):"""向所有在线客户端广播消息,除了发送者"""for client_socket, addr in online_clients.items():if client_socket != sender:try:client_socket.sendall(message)except:# 客户端离线,从列表中移除online_clients.pop(client_socket)def server_loop():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.bind(("localhost", 8000))server_socket.listen(5)print("服务器已启动,等待客户端连接...")# 使用 select 监听 socket 列表inputs = [server_socket]while True:# 阻塞等待至少一个socket准备好进行I/O操作readable, writeable, exceptional = select.select(inputs, [], [])for sock in readable:if sock is server_socket:# 新的连接请求client_socket, addr = server_socket.accept()print(f"新客户端 {addr} 已连接")online_clients[client_socket] = addrinputs.append(client_socket)else:# 已连接客户端发来消息try:data = sock.recv(1024)if data:message = f"客户端 {online_clients[sock]}: {data.decode()}"print(message)broadcast(message.encode(), sock)else:# 客户端离线,从列表中移除online_clients.pop(sock)inputs.remove(sock)sock.close()print(f"客户端 {online_clients[sock]} 已离线")except:# 客户端异常,从列表中移除online_clients.pop(sock)inputs.remove(sock)sock.close()print(f"客户端 {online_clients[sock]} 已离线")if __name__ == "__main__":server_loop()
client2
import socket
import threadingdef send_message(client_socket):while True:message = input()client_socket.sendall(message.encode())def receive_message(client_socket):while True:try:data = client_socket.recv(1024)if data:print(data.decode())else:# 服务器断开连接breakexcept:breakif __name__ == "__main__":client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_address = ("localhost", 8000)client_socket.connect(server_address)print("已连接到服务器")threading.Thread(target=send_message, args=(client_socket,)).start()threading.Thread(target=receive_message, args=(client_socket,)).start()
效果:
C<->S 固定端口一一对应
server3
import socket
import select
from collections import defaultdict# 客户端端口号映射
client_ports = {50106: "A",50107: "B",50108: "C"
}# 允许通信的端口号对
allowed_ports = {50106: [50108],50107: [50109],50108: [50106]
}# 在线客户端列表,{端口号: socket}
online_clients = {}
# 消息队列,{端口号: [(发送者端口号, 消息)]}
message_queues = defaultdict(list)def send_message(sender_port, target_port, message):"""向指定端口号的客户端发送消息"""target_socket = online_clients.get(target_port)if target_socket:try:target_socket.sendall(f"{sender_port}: {message}".encode())except:# 接收方离线,将消息加入队列message_queues[target_port].append((sender_port, message))else:# 目标端口号不存在,将消息加入队列message_queues[target_port].append((sender_port, message))def server_loop():server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_socket.bind(("localhost", 8000))server_socket.listen(5)print("服务器已启动,等待客户端连接...")# 使用 select 监听 socket 列表inputs = [server_socket]while True:# 阻塞等待至少一个socket准备好进行I/O操作readable, writeable, exceptional = select.select(inputs, [], [])for sock in readable:if sock is server_socket: # 新的连接请求client_socket, addr = server_socket.accept()print(f"新客户端 {addr} 已连接")# client_port = client_socket.getsockname()[1]client_port = addr[1] online_clients[client_port] = client_socketinputs.append(client_socket)# 将离线期间的消息队列发送给新上线客户端for sender_port, message in message_queues.pop(client_port, []):client_socket.sendall(f"{sender_port}: {message}".encode())else:# 已连接客户端发来消息try:data = sock.recv(1024)if data:print(f"接收到消息: {data} ")# 解析消息,格式为 "目标端口号:消息内容"parts = data.decode().split(':') # 确保拆分后的部分数量正确 if len(parts) == 3: # 转换第一个部分为整数,作为sender_port sender_port = int(parts[0])# 转换第二个部分为整数,作为target_port target_port = int(parts[1]) # 第三个部分作为messagemessage = parts[2] # 检查是否允许通信if target_port in allowed_ports.get(sender_port, []):send_message(sender_port, target_port, message)else:print(f"客户端 {client_ports.get(sender_port, sender_port)} 试图与未授权端口 {target_port} 通信")else:# 客户端离线,从列表中移除client_port = sock.getsockname()[1]online_clients.pop(client_port)inputs.remove(sock)sock.close()print(f"客户端 {client_ports.get(client_port, client_port)} 已离线")except: try: # 获取远程客户端的地址和端口号 client_address, client_port = sock.getpeername() print(f"客户端 {client_address}:{client_port} 报错离线离线")# 根据需要处理client_port,例如从列表中移除 online_clients.pop(client_port, None) # 使用pop的第二个参数避免KeyError inputs.remove(sock) sock.close() print(f"客户端 {client_address}:{client_port} 已离线")except Exception as e: # 处理可能出现的异常,例如socket已经关闭等 print(f"客户端强制离线Error handling client socket: {e}") if __name__ == "__main__":server_loop()
client3-1
import socket
import threadingdef send_message(client_socket,sender_port, target_port):while True:message = input()if message.strip() == "exit":breakclient_socket.sendall(f"{sender_port}:{target_port}:{message}".encode())def receive_message(client_socket):while True:try:data = client_socket.recv(1024)if data:print(data.decode())else:# 服务器断开连接breakexcept:breakif __name__ == "__main__":client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_address = ("localhost", 8000)sender_port = 50106client_socket.bind(("localhost", sender_port)) # 绑定固定端口号 50108client_socket.connect(server_address)client_port = client_socket.getsockname()[1]print(f"已连接到服务器,本地端口号为 {client_port}")# 客户端 A 只能与端口号 50106 的客户端 C通信target_port = 50108sender_thread = threading.Thread(target=send_message, args=(client_socket, sender_port,target_port))receiver_thread = threading.Thread(target=receive_message, args=(client_socket,))sender_thread.start()receiver_thread.start()sender_thread.join()client_socket.close()
client3-2
import socket
import threadingdef send_message(client_socket,sender_port, target_port):while True:message = input()if message.strip() == "exit":breakclient_socket.sendall(f"{sender_port}:{target_port}:{message}".encode())def receive_message(client_socket):while True:try:data = client_socket.recv(1024)if data:print(data.decode())else:# 服务器断开连接breakexcept:breakif __name__ == "__main__":client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_address = ("localhost", 8000)sender_port = 50107client_socket.bind(("localhost", sender_port)) # 绑定固定端口号 50108client_socket.connect(server_address)client_port = client_socket.getsockname()[1]print(f"已连接到服务器,本地端口号为 {client_port}")# 客户端 A 只能与端口号 50106 的客户端 C通信target_port = 50109sender_thread = threading.Thread(target=send_message, args=(client_socket, sender_port,target_port))receiver_thread = threading.Thread(target=receive_message, args=(client_socket,))sender_thread.start()receiver_thread.start()sender_thread.join()client_socket.close()
client3-3
import socket
import threadingdef send_message(client_socket,sender_port, target_port):while True:message = input()if message.strip() == "exit":breakclient_socket.sendall(f"{sender_port}:{target_port}:{message}".encode())def receive_message(client_socket):while True:try:data = client_socket.recv(1024)if data:print(data.decode())else:# 服务器断开连接breakexcept:breakif __name__ == "__main__":client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server_address = ("localhost", 8000)sender_port = 50108client_socket.bind(("localhost", sender_port)) # 绑定固定端口号 50108client_socket.connect(server_address)client_port = client_socket.getsockname()[1]print(f"已连接到服务器,本地端口号为 {client_port}")# 客户端 C 只能与端口号 50106 的客户端 A 通信target_port = 50106sender_thread = threading.Thread(target=send_message, args=(client_socket, sender_port,target_port))receiver_thread = threading.Thread(target=receive_message, args=(client_socket,))sender_thread.start()receiver_thread.start()sender_thread.join()client_socket.close()
C<-S-S->C 固定端口一一对应
local
import socket
import threading# TCP服务器地址和端口
SERVER1_ADDRESS = ('localhost', 8001)
SERVER2_ADDRESS = ('localhost', 8002)# 用于从一个socket接收数据并发送到另一个socket的函数
def forward_data(source_socket, dest_socket):while True:try:data = source_socket.recv(1024)if data:dest_socket.sendall(data)except:break# 连接到8001端口的函数
def connect_to_server1():while True:try:server1_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server1_socket.connect(SERVER1_ADDRESS)print("已连接到8001端口")return server1_socketexcept:print("连接8001端口失败,正在重试...")continue# 连接到8002端口的函数
def connect_to_server2():while True:try:server2_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)server2_socket.connect(SERVER2_ADDRESS)print("已连接到8002端口")return server2_socketexcept:print("连接8002端口失败,正在重试...")continue# 连接到8001端口
server1_socket = connect_to_server1()# 连接到8002端口
server2_socket = connect_to_server2()# 创建两个线程,分别用于数据转发
thread1 = threading.Thread(target=forward_data, args=(server1_socket, server2_socket))
thread2 = threading.Thread(target=forward_data, args=(server2_socket, server1_socket))# 启动线程
thread1.start()
thread2.start()# 等待线程结束
thread1.join()
thread2.join()
server
import socket
import threading
from collections import defaultdict# 存储每个端口连接的客户端,使用defaultdict初始化为列表
clients = defaultdict(list)def handle_client(client_socket, client_address, port):while True:try:data = client_socket.recv(1024)if data:# 将数据转发给另一个端口的所有客户端other_port = 8000 if port == 8001 else 8001for other_client in clients[other_port]:other_client.send(data)else:# 如果没有数据,关闭连接breakexcept Exception as e:print(f"Error handling data from {client_address}: {e}")breaktry:# 从客户端列表中移除断开连接的客户端clients[port].remove(client_socket)client_socket.close()print(f"Closed connection to {client_address}")except Exception as e:print(f"Closed connection to {client_address}:{port} => {e}")def accept_connections(port):with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server_socket.bind(('', port))server_socket.listen()print(f"Listening on port {port}")while True:try:client_socket, client_address = server_socket.accept()print(f"Accepted connection from {client_address} on port {port}")clients[port].append(client_socket)# 创建一个新线程来处理客户端threading.Thread(target=handle_client, args=(client_socket, client_address, port)).start()except RuntimeError as e:print(f"创建一个新线程来处理客户端 {port} => {e}")# 创建并启动两个线程,分别监听8000和8001端口
thread1 = threading.Thread(target=accept_connections, args=(8000,))
thread2 = threading.Thread(target=accept_connections, args=(8001,)) thread1.start()
thread2.start()thread1.join()
thread2.join()
local_server
import socket
import threading# 创建一个TCP/IP socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)# 绑定IP地址和端口
server_address = ('', 8002)
server_socket.bind(server_address)# 开始监听连接
server_socket.listen(5)
print("等待连接[8002]...")# 客户端列表
clients = []# 处理客户端连接的函数def handle_client(client_socket, address):print(f"已连接 {address}")clients.append(client_socket)while True:try:# 接收客户端消息data = client_socket.recv(1024)if not data:break# 打印接收到的消息print(f"来自 {address} 的消息: {data.decode()}")except:print(f"与 {address} 的连接已断开")clients.remove(client_socket)client_socket.close()breaktry:# 等待连接并创建新的线程来处理每个连接while True:client_socket, address = server_socket.accept()client_handler = threading.Thread(target=handle_client, args=(client_socket, address))client_handler.start()# 控制台输入消息并发送给所有客户端while True:message = input("请输入消息: ")for client in clients:client.sendall(message.encode())
except KeyboardInterrupt:print("收到 Ctrl+C,取消端口绑定...")server_socket.close()
bot
import socket
import threadingdef receive_data(sock):while True:try:data = sock.recv(1024)if not data:breakprint(f"Received: {data.decode('utf-8')}")except Exception as e:print(f"Error receiving data: {e}")breakdef main():with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client_socket:try:# 连接到服务器client_socket.connect(('127.0.0.1', 8000))print("Connected to server on port 8000.")# 创建一个线程用于接收数据threading.Thread(target=receive_data, args=(client_socket,)).start()while True:# 从控制台读取数据message = input()if message:client_socket.sendall(message.encode('utf-8'))except Exception as e:print(f"Connection error: {e}")if __name__ == "__main__":main()
效果: