python工具-udp-tcp-client-server-demo
- server
- tcp-server: python xxx.py -type tcp -ip "127.0.0.1" -port 1234
- udp-server: python xxx.py -type udp -ip "127.0.0.1" -port 1234
- client
- python xxx.py -type udp -ip "127.0.0.1" -port 1111
- python xxx.py -type tcp -ip "127.0.0.1" -port 1111
server-demo
import datetime
import socket
import select
import argparse


def udp_listen(local_ip, local_port):
    """Serve UDP datagrams forever, answering each one with a fixed greeting.

    Binds a datagram socket to ``(local_ip, local_port)``, then loops:
    receive up to 1024 bytes, print a timestamp plus the payload and sender
    address, and send ``b"Hello UDP Client"`` back to that sender.

    The loop never returns normally; it ends only when an exception
    (for example ``KeyboardInterrupt``) propagates to the caller.
    """
    buffer_size = 1024
    bytes_to_send = str.encode("Hello UDP Client")

    # The context manager closes the socket when the loop is left by an
    # exception.
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as sock:
        sock.bind((local_ip, local_port))
        print(f"UDP server up and listening, {local_ip}:{local_port}")

        while True:
            message, address = sock.recvfrom(buffer_size)
            print("==========================")
            print(datetime.datetime.now())
            print(f"Message from Client:{message}")
            print(f"Client IP Address:{address}")
            # Reply to whoever sent the datagram.
            sock.sendto(bytes_to_send, address)


def _drop_tcp_client(poller, clients, fd):
    """Unregister, close and forget the client connection stored under *fd*."""
    sub_sock, _peer = clients.pop(fd)
    poller.unregister(sub_sock)
    sub_sock.close()


def _serve_tcp_client(poller, clients, fd):
    """Read one chunk from the client under *fd*, reply, or drop it on EOF/error."""
    sub_sock, peer = clients[fd]
    try:
        raw_data = sub_sock.recv(1024)
        if raw_data:
            # Greet with this connection's own peer address, not the address
            # of whichever client happened to be accepted most recently.
            sub_sock.sendall(f"hello {peer}".encode())
    except OSError as e:
        # A reset or broken connection affects this client only.
        print(e)
        raw_data = b""

    if not raw_data:
        print("Closed connection to", peer)
        _drop_tcp_client(poller, clients, fd)
        return

    # errors="replace" keeps a non-UTF-8 payload from stopping the server.
    print("Reveived {} from {}".format(raw_data.decode(errors="replace"), peer))


def tcp_listen(ip, port):
    """Serve TCP clients on ``(ip, port)`` using a ``select.poll`` event loop.

    Every accepted connection is registered with the poller. Each chunk of
    up to 1024 bytes received from a client is answered with
    ``hello <peer address>`` and printed. A client that closes its side, or
    whose socket raises ``OSError``, is unregistered and closed while the
    server keeps running.

    The loop ends on ``KeyboardInterrupt`` (prints ``EXIT``) or on any other
    unexpected exception (printed); all remaining client sockets are closed
    before returning.

    NOTE: relies on ``select.poll``, which the Python documentation says is
    not supported on every operating system.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((ip, port))
        print("[+] Listening on {0}:{1}".format(ip, port))
        sock.listen(5)  # backlog of pending connections
        sock.setblocking(False)

        poller = select.poll()
        poller.register(sock, select.POLLIN)  # watch the listening socket
        clients = {}  # fd -> (connection socket, peer address)
        try:
            while True:
                ready = poller.poll()
                if not ready:
                    break
                for fd, _ in ready:
                    if fd != sock.fileno():
                        _serve_tcp_client(poller, clients, fd)
                        continue
                    try:
                        sub_sock, client_address = sock.accept()
                    except (BlockingIOError, ConnectionAbortedError):
                        # The pending connection vanished between poll() and
                        # accept(); nothing to serve.
                        continue
                    # With a non-blocking listener the mode of the accepted
                    # socket is OS-dependent; force blocking so sendall()
                    # behaves the same everywhere.
                    sub_sock.setblocking(True)
                    print("==========================")
                    print("Accepted from", client_address)
                    clients[sub_sock.fileno()] = (sub_sock, client_address)
                    poller.register(sub_sock, select.POLLIN)  # watch the new connection
        except KeyboardInterrupt:
            print("EXIT")
        except Exception as e:
            print(e)
        finally:
            # Release every connection that is still open, then the listener.
            for fd in list(clients):
                _drop_tcp_client(poller, clients, fd)
            poller.unregister(sock)


# python xxx.py -type udp -ip "127.0.0.1" -port 1234
# python xxx.py -type tcp -ip "127.0.0.1" -port 1234
if __name__ == "__main__":
    # Server entry point: choose the listener from the command-line flags.
    parser = argparse.ArgumentParser()
    parser.description = 'please enter correct para'
    # `choices` makes argparse reject anything other than tcp/udp with a usage
    # error instead of the script silently doing nothing.
    parser.add_argument("-type", "--type", help="tcp or udp", type=str,
                        choices=("tcp", "udp"), default="udp")
    parser.add_argument("-ip", "--ip", help="listen ip", type=str, default="127.0.0.1")
    parser.add_argument("-port", "--port", help="listen port", type=int, default=1111)

    try:
        args = parser.parse_args()
        if args.type == "udp":
            udp_listen(args.ip, args.port)
        elif args.type == "tcp":
            tcp_listen(args.ip, args.port)
    except Exception as e:
        # Top-level boundary: report the failure (e.g. bind error) and exit.
        print(e)
client-demo
import socket
import datetime
import argparse


def udp_send(server_ip, server_port, bytes_to_send):
    """Send one UDP datagram to the server and print the reply.

    Args:
        server_ip: Destination host.
        server_port: Destination port.
        bytes_to_send: Payload to send, as bytes.

    Waits up to 3 seconds for a reply of at most 1024 bytes. Any error while
    sending or receiving (including the timeout) is printed, not raised.
    Returns None.
    """
    server_address = (server_ip, server_port)
    recv_buf_size = 1024

    # The context manager guarantees the socket is closed on every path.
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM) as client:
        client.settimeout(3)
        try:
            print(datetime.datetime.now())
            print(f"send {bytes_to_send} to {server_ip}:{server_port}")
            # client.bind() could be called here to pin the local ip and port.
            client.sendto(bytes_to_send, server_address)
            print(f"local_addr: {client.getsockname()}")
            recv_data = client.recvfrom(recv_buf_size)
            print(f"message from server {recv_data[0]}")
        except Exception as e:
            print(f"connect {server_address} fail. reasion:{e}")


def tcp_send(server_ip, server_port, bytes_to_send):
    """Connect to the server over TCP, send the payload and print the reply.

    Args:
        server_ip: Destination host.
        server_port: Destination port.
        bytes_to_send: Payload to send, as bytes.

    Uses a 3 second timeout for connect, send and receive, and reads at most
    1024 bytes of reply. Any error (refused connection, timeout, ...) is
    printed, not raised. Returns None.
    """
    server_address = (server_ip, server_port)
    recv_buf_size = 1024

    # The context manager guarantees the TCP socket is closed on every path.
    with socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) as client:
        client.settimeout(3)
        try:
            print(datetime.datetime.now())
            print(f"send {bytes_to_send} to {server_ip}:{server_port}")
            # client.bind() could be called here to pin the local ip and port.
            client.connect(server_address)
            client.sendall(bytes_to_send)
            print(f"local_addr: {client.getsockname()}")
            recv_data = client.recv(recv_buf_size)
            print(f"message from server {recv_data}")
        except Exception as e:
            print(f"connect {server_address} fail. reasion:{e}")


# python xxx.py -type udp -ip "127.0.0.1" -port 1111
# python xxx.py -type tcp -ip "127.0.0.1" -port 1111
if __name__ == "__main__":
    # Client entry point: choose the sender from the command-line flags.
    parser = argparse.ArgumentParser()
    parser.description = 'please enter correct para'
    # `choices` makes argparse reject anything other than tcp/udp with a usage
    # error instead of the script silently doing nothing.
    parser.add_argument("-type", "--type", help="tcp or udp", type=str,
                        choices=("tcp", "udp"), default="udp")
    # The client connects to these; it does not listen on them.
    parser.add_argument("-ip", "--ip", help="server ip", type=str, default="127.0.0.1")
    parser.add_argument("-port", "--port", help="server port", type=int, default=1111)

    try:
        args = parser.parse_args()
        if args.type == "udp":
            udp_send(args.ip, args.port, "hello server".encode())
        elif args.type == "tcp":
            tcp_send(args.ip, args.port, "hello server".encode())
    except Exception as e:
        # Top-level boundary: report anything the senders did not handle.
        print(e)