一、代码实现
import logging
import os
import threading
import timefrom watchdog.events import FileSystemEventHandler
from watchdog.observers import Observerlogger = logging.getLogger(__name__)class LogWatcher(FileSystemEventHandler):def __init__(self, log_file, on_modified_callback=None):""""初始化 LogWatcher 类的实例。参数:- log_file:日志文件的路径- on_modified_callback:可选的回调函数,在文件修改时调用属性:- log_file:日志文件的路径- file_object:日志文件对象- on_modified_callback:文件修改回调函数- last_line:最后一行文本- observer:观察者对象- match_string:需要匹配的字符串- stop_watching:停止监视的标志"""self.log_file = log_fileself.file_object = open(log_file, 'rb')self.on_modified_callback = on_modified_callbackself.last_line = self.get_last_line() # 初始化时获取最后一行文本self.observer = Observer()self.observer.schedule(self, ".", recursive=False)self.match_string = Noneself.stop_watching = Falsedef start(self):"""启动观察者对象,开始监视文件变化。"""self.observer.start()def stop(self):"""停止观察者对象,结束监视文件变化。"""self.observer.stop()self.observer.join()self.file_object.close()def get_last_line(self):"""获取日志文件的最后一行文本。它通过将文件指针移动到文件末尾,然后逐个字符向前搜索,直到找到换行符为止。返回值:- 最后一行文本,如果文件为空则返回None"""# 将文件指针移动到文件末尾self.file_object.seek(0, os.SEEK_END)# 获取当前文件指针的位置(此时指针在最后一行的末尾)position = self.file_object.tell()try:# 尝试向前移动两个字节new_position = max(position - 2, 0)self.file_object.seek(new_position, os.SEEK_SET)except OSError as e:# 如果发生错误,可能是文件太小,返回Nonereturn None# 逐个字符向前搜索,确保文件指针最终停在当前行的第一个字符处while True:# read(1)读取的是指针位置的下一个字符,每次调用read(1)都会读取一个字符,并将指针向后移动一个字符的位置。char = self.file_object.read(1).decode('utf-8', errors='ignore')if char == '\n':breakif new_position == 0:# 如果已经到达文件开头,跳出循环break# 尝试向前移动一个字节位置,确保不越界到文件开头new_position = max(new_position - 1, 0)# 将文件指针移动到新的位置self.file_object.seek(new_position, os.SEEK_SET)# last_line = self.file_object.readline().decode('utf-8', errors='ignore').strip()last_line = self.file_object.read(position - new_position).decode('utf-8', errors='ignore').strip()# 输出调试信息logger.debug(f'Reading line: {last_line}')return last_linedef on_modified(self, event):"""on_modified方法是FileSystemEventHandler的回调方法,当日志文件发生变化时,都会调用这个方法。参数:- event:文件变化事件对象"""# 注意,这里一个要用绝对路径比较,不能直接使用 event.src_path == self.log_file,# event.src_path == self.log_file 的值为false# if event.src_path == self.log_file:if os.path.abspath(event.src_path) == os.path.abspath(self.log_file):# 在文件发生变化时,实时获取最后一行文本self.last_line = self.get_last_line()# 用户可在外部传入一个回调方法,在文本发生变化时执行该事件if self.on_modified_callback:self.on_modified_callback()# 调用基类的同名方法,以便执行基类的默认行为super(LogWatcher, self).on_modified(event)def tail_last_line_and_match(self, match_string=None, max_match_seconds=10):"""实时监控日志文件的变化,并实时获取最后一行文本。如果匹配到指定的字符串,停止监视。参数:- match_string:需要匹配的字符串"""self.match_string = match_stringself.start()end_time = time.time() + max_match_secondstry:while not self.stop_watching and time.time() <= end_time:if self.match_string and self.match_string in self.last_line:self.stop_watching = Trueexcept KeyboardInterrupt:passself.stop_watching = True # 停止监视循环def write_logs(log_file):"""在新线程中写入日志"""for i in range(10):with open(log_file, 'a') as file:file.write(f'New log entry {i}\n')time.sleep(1) # 每秒写入一次日志if __name__ == '__main__':import logginglogging.basicConfig(level=logging.DEBUG)log_file = 'demo.log'# 创建日志文件并写入示例日志with open(log_file, 'w') as file:file.write('This is the first line of the log.\n')file.write('This is the second line of the log.\n')log_watcher = LogWatcher(log_file)# 启动新线程写入日志write_thread = threading.Thread(target=write_logs, args=(log_file,))write_thread.start()# 启动实时监控日志文件变化,并打印最后一行文本,直到匹配到指定字符串或超时才停止监视log_watcher.tail_last_line_and_match(match_string='New log entry 9', max_match_seconds=20)# 等待写入线程结束write_thread.join()
三、Demo验证
运行代码,控制台的输出结果:
DEBUG:__main__:Reading line: This is the second line of the log.
DEBUG:__main__:Reading line: New log entry 0
DEBUG:__main__:Reading line: New log entry 1
DEBUG:__main__:Reading line: New log entry 2
DEBUG:__main__:Reading line: New log entry 3
DEBUG:__main__:Reading line: New log entry 4
DEBUG:__main__:Reading line: New log entry 5
DEBUG:__main__:Reading line: New log entry 6
DEBUG:__main__:Reading line: New log entry 7
DEBUG:__main__:Reading line: New log entry 8
DEBUG:__main__:Reading line: New log entry 9Process finished with exit code 0
欢迎技术交流: