import os
import zipfile
from datetime import datetime
from logging.handlers import RotatingFileHandlerclass CompressedRotatingFileHandler(RotatingFileHandler):"""自定义的 RotatingFileHandler,支持在日志轮转时压缩旧日志文件,并管理压缩文件的最大数量。"""def __init__(self, filename, mode='a', maxBytes=10 * 1024 * 1024, backupCount=5, encoding=None, delay=False):"""初始化处理器。:param filename: 日志文件名。:param mode: 打开文件的模式,默认是追加模式 'a'。:param maxBytes: 日志文件的最大字节数,达到后进行轮转。:param backupCount: 保留的压缩日志文件的最大数量。:param encoding: 文件编码。:param delay: 如果为 True,则文件将在第一次写入时打开。"""super().__init__(filename, mode, maxBytes, backupCount, encoding, delay)self.backupCount = backupCountdef doRollover(self):"""执行日志轮转:关闭当前日志文件,重命名并压缩旧日志文件,然后删除多余的压缩日志文件。"""if self.stream:self.stream.close()self.stream = Nonetimestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")zip_filename = f"{os.path.splitext(self.baseFilename)[0]}_{timestamp}"file_name = os.path.basename(zip_filename)with zipfile.ZipFile(zip_filename + '.zip', 'w', zipfile.ZIP_DEFLATED) as zipf:zipf.write(self.baseFilename, arcname=file_name + '.log')with open(self.baseFilename, 'w'):passself._cleanup_old_logs()def _cleanup_old_logs(self):"""删除多余的压缩日志文件,保留最新的 backupCount 个文件。"""dir_name, base_name = os.path.split(self.baseFilename)prefix = os.path.splitext(base_name)[0] + "_"suffix = ".zip"compressed_files = [os.path.join(dir_name, f) for f in os.listdir(dir_name)if f.startswith(prefix) and f.endswith(suffix)]compressed_files.sort(key=lambda x: os.path.getmtime(x))while len(compressed_files) > self.backupCount:oldest = compressed_files.pop(0)os.remove(oldest)import loggingdef setup_logger(log_file, max_bytes=10 * 1024 * 1024, backup_count=5, log_level=logging.INFO):"""配置并返回一个日志记录器。:param log_file: 日志文件路径。:param max_bytes: 日志文件的最大字节数,超过后进行轮转和压缩。:param backup_count: 保留的压缩日志文件的最大数量。:param log_level: 日志级别。:return: 配置好的 Logger 对象。"""logger = logging.getLogger("CompressedLogger")logger.setLevel(log_level)logger.propagate = False if not logger.handlers:handler = CompressedRotatingFileHandler(filename=log_file,maxBytes=max_bytes,backupCount=backup_count,encoding='utf-8')formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')handler.setFormatter(formatter)logger.addHandler(handler)return logger
if __name__ == "__main__":logger = setup_logger("logs/application.log", max_bytes=0.5 * 1024 * 1024, backup_count=2000,log_level=logging.DEBUG)for i in range(100000):logger.info(f"这是第 {i} 条日志记录")