# 服务端
import queue
import re
import socket
import time
import tkinter as tk
from threading import Threadclass ChatService():def __init__(self):# 聊天框左上角名称self.title = f"马里奇聊天室"self.tk = tk.Tk(className=self.title)# 隐藏窗口# self.tk.withdraw()# 设置Tkinter窗口始终位于其他窗口的顶部# self.tk.attributes("-topmost", 1)"""固定窗口大小800x600:表示窗口的宽度为800像素,高度为600像素。+500:表示窗口在X轴上距离屏幕左侧500像素的位置。+200:表示窗口在Y轴上距离屏幕顶部200像素的位置。"""# self.tk.resizable(width=False, height=False)self.tk.geometry("800x600+500+200") # 大小,位置# 基础信息ip_label = tk.Label(text=f"我的ip:{local_ip}")ip_label.grid(row=0, column=0, sticky=tk.W, padx=10, pady=10)# 标签top_label = tk.Label(text="聊天内容:")top_label.grid(row=1, column=0, sticky=tk.W, padx=10, pady=10)# send message,消息框self.text_message = tk.Text(height=20, width=60)# 配置标签用于右对齐self.text_message.tag_configure("right", justify='right')# 配置标签用于居中对齐self.text_message.tag_configure("left", justify='left')self.text_message.grid(row=2, column=0, padx=10, pady=10)center_label = tk.Label(text="发送内容:")center_label.grid(row=3, column=0, sticky=tk.W, padx=10, pady=10)# input message,消息框self.input_message = tk.Text(height=10, width=60)self.input_message.grid(row=4, column=0, padx=10, pady=10)# 按钮,发送消息self.bt1 = tk.Button(text='发送', command=self.retrieve_input) # 实例化self.bt1.grid(row=4, column=1, sticky='ws', padx=10, pady=10)# 绑定回车键到 retrieve_input 函数# self.tk.bind('<Return>', self.retrieve_input) # <Return> 是回车键的事件名称# self.input_message.focus_set() # 设置焦点到 text 小部件,使其可以接收键盘输入# 对端基础信息,Frame是块概念,块内再加东西,挪位置,gridf1 = tk.Frame()dip_label = tk.Label(f1, text=f"填入对端ip:")dip_label.grid(row=0, column=0)# 接收对端ipself.enstr1 = tk.StringVar()self.one_entry = tk.Entry(f1, textvariable=self.enstr1, highlightcolor="Fuchsia", highlightthickness=1, width=20)self.one_entry.grid(row=0, column=1)f1.grid(row=0, column=1, sticky=tk.W, padx=10, pady=10)# 错误提示信息error_label = tk.Label(text="错误信息:")error_label.grid(row=1, column=1, sticky=tk.W, padx=10, pady=10)self.error_message = tk.Text(height=20, width=45)self.error_message.grid(row=2, column=1, sticky=tk.W, padx=10, pady=10)# 开始线程self.start_udp_recv_thread()# 启动self.tk.mainloop()def retrieve_input(self):"""1. 获取输入的文本内容2. 将内容添加到内容框3. udp发送消息4. 清空输入框:return:"""# 1. 获取输入的内容, "1.0" 表示从第一行第一个字符开始获取,tk.END 表示获取到文本框的最后。input_value = self.input_message.get("1.0", tk.END)input_value = input_value.strip()print(input_value.strip()) # 打印或处理输入的内容if not input_value:self.append_error_message("不能发送空白消息!")return# 3. 发送udp消息send_state = self.udp_send(input_value)if send_state:# 2. append_text_messageself.append_text_message(input_value)# 4. 清空文本框内容self.input_message.delete("1.0", tk.END)def append_error_message(self, message):"""添加错误信息你:return:"""self.error_message.config(state=tk.NORMAL) # 临时设置为可编辑状态self.error_message.insert(tk.END, message + "\n" + "\n") # 添加消息self.error_message.config(state=tk.DISABLED) # 再次设置为不可编辑状态self.error_message.see(tk.END) # 滚动到最新消息def append_text_message(self, input_value, direction='right'):# 2. 将内容添加到内容框now_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))self.text_message.config(state=tk.NORMAL) # 临时设置为可编辑状态self.text_message.insert(tk.END, now_time + "\n", direction)self.text_message.insert(tk.END, input_value + "\n" + "\n", direction) # 添加消息self.text_message.config(state=tk.DISABLED) # 再次设置为不可编辑状态self.text_message.see(tk.END) # 滚动到最新消息def after_append_text_message(self):while not my_queue.empty():message = my_queue.get()self.flash_icon()self.append_text_message(message, direction='left')self.tk.after(100, self.after_append_text_message)def start_udp_recv_thread(self):print("开始启动线程")# 多线程接收消息t = Thread(target=udp_recv)t.daemon = Truet.start()self.tk.after(100, self.after_append_text_message)print("结束启动线程")def udp_send(self, message):"""发送udp消息:param message::return:"""host = self.enstr1.get()if not host or not is_str_ip_v4(host):self.append_error_message("请输入正确的接收方ip地址!")return Falsewith socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.sendto(message.encode(), (host, port))print("发送成功")return Truedef flash_icon(self):"""图标闪动:return:"""self.tk.bell() # 发出系统警告声,可能会使图标闪烁self.tk.focus_force() # 强制窗口获得焦点def is_str_ip_v4(ip_str):"""判断字符串是不是ipv4"""if ip_str is None:return Falsev4_regex = "^(1\d{2}|2[0-4]\d|25[0-5]|[1-9]?\d)" \"(\.(1\d{2}|2[0-4]\d|25[0-5]|[1-9]?\d)){3}$"return re.match(v4_regex, ip_str) is not Nonedef udp_recv():"""接收udp消息:return:"""with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:s.bind((local_ip, port))while True:data, addr = s.recvfrom(1024) # 接收数据和客户端地址print(f"Received message: {data.decode()} from {addr}")my_queue.put(data.decode())def get_local_ip():try:# 创建一个 socket 对象s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)# 不必真的连接,只是用来获取本机IP地址s.connect(("8.8.8.8", 80))ip = s.getsockname()[0]s.close()return ipexcept Exception as e:print("无法获取本机IP地址:", e)return Noneif __name__ == '__main__':local_ip = get_local_ip()port = 51314my_queue = queue.Queue()ChatService()