AI大模型趣味实战 第7集:多端适配 个人新闻头条 基于大模型和RSS聚合打造个人新闻电台(Flask WEB版) 1
摘要
在信息爆炸的时代,如何高效获取和筛选感兴趣的新闻内容成为一个现实问题。本文将带领读者通过Python和Flask框架,结合大模型的强大能力,构建一个个性化的新闻聚合平台,不仅能够自动收集整理各类RSS源的新闻,还能以语音播报的形式提供"新闻电台"功能。我们将重点探讨如何利用AI大模型优化新闻内容提取、自动生成标签分类,以及如何通过语音合成技术实现新闻播报功能,打造一个真正实用的个人新闻助手。
项目代码仓库:https://github.com/wyg5208/rss_news_flask
系统运行截图如下:
核心概念和知识点
1. RSS技术与信息聚合
RSS(Really Simple Syndication)是一种用于发布频繁更新的网站内容的XML格式,它允许用户订阅网站的更新内容。我们的项目利用RSS技术,从多个新闻源自动获取最新内容,无需手动访问各个网站。
主要涉及知识点:
- RSS格式解析与内容提取
- Web爬虫技术与内容清洗
- 增量式数据更新策略
2. Web应用开发与交互设计
采用Flask框架构建Web应用,实现用户友好的界面和交互体验。
主要涉及知识点:
- Flask应用结构设计
- 前后端交互与API设计
- 用户认证与会话管理
- 响应式界面设计
3. 大模型应用
项目中大模型的应用主要体现在两个方面:
- 新闻内容优化:使用大模型智能提取文章核心内容,去除广告等干扰元素
- 自动标签生成:分析文章内容,自动提取关键词作为标签
主要涉及知识点:
- 大模型API调用方法
- Prompt设计与优化
- 文本分析与关键信息提取
4. 语音合成技术
将文本转换为语音,实现新闻播报功能。
主要涉及知识点:
- 文本到语音(TTS)技术
- 音频文件处理与管理
- 浏览器语音API集成
5. 系统设计与优化
包括数据库设计、任务调度系统、资源管理等方面。
主要涉及知识点:
- SQLite数据库设计与优化
- 多线程任务处理
- 定时任务调度系统
- 日志系统设计与管理
实战案例
接下来,我们将通过详细的代码示例和实现步骤,展示如何从零开始构建这个新闻聚合平台。
1. 项目初始化与环境配置
首先,我们需要创建项目目录结构并安装必要的依赖包。
# 项目依赖
# requirements.txt
alembic==1.15.1
aniso8601==10.0.0
anyio==4.9.0
attrs==25.3.0
beautifulsoup4==4.13.3
blinker==1.9.0
bs4==0.0.2
certifi==2025.1.31
cffi==1.17.1
charset-normalizer==3.4.1
click==8.1.8
colorama==0.4.6
comtypes==1.4.10
feedparser==6.0.10
Flask==2.3.3
Flask-Login==0.6.3
Flask-Migrate==4.0.5
Flask-RESTful==0.3.10
Flask-SQLAlchemy==3.1.1
Flask-WTF==1.2.1
greenlet==3.1.1
gunicorn==21.2.0
h11==0.14.0
httpcore==1.0.7
httpx==0.25.2
idna==3.10
itsdangerous==2.2.0
Jinja2==3.1.2
lxml==4.9.3
Mako==1.3.9
MarkupSafe==3.0.2
ollama==0.1.5
outcome==1.3.0.post0
packaging==24.2
pycparser==2.22
pypiwin32==223
PySocks==1.7.1
python-dotenv==1.0.1
pyttsx3==2.98
pytz==2025.1
pywin32==310
requests==2.31.0
schedule==1.2.1
selenium==4.15.2
sgmllib3k==1.0.0
six==1.17.0
sniffio==1.3.1
sortedcontainers==2.4.0
soupsieve==2.6
SQLAlchemy==2.0.39
trio==0.29.0
trio-websocket==0.12.2
typing_extensions==4.12.2
urllib3==2.3.0
webdriver-manager==4.0.1
Werkzeug==2.3.7
wsproto==1.2.0
WTForms==3.2.1
使用如下命令安装依赖:
pip install -r requirements.txt
2. Flask应用框架搭建
创建基础的Flask应用结构:
# app.py (基础结构)
from flask import Flask, render_template, request, jsonify, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, logout_user, login_required, current_user
import osapp = Flask(__name__)
app.config['SECRET_KEY'] = os.urandom(24)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///rss_news.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Falsedb = SQLAlchemy(app)# 配置登录管理器
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'# 数据库模型定义
class User(UserMixin, db.Model):id = db.Column(db.Integer, primary_key=True)username = db.Column(db.String(100), unique=True)password = db.Column(db.String(100))def __repr__(self):return f'<User {self.username}>'# 路由定义
@app.route('/')
def index():return render_template('index.html')# 应用入口
if __name__ == '__main__':with app.app_context():db.create_all()app.run(debug=True)
3. 数据库模型设计
我们需要设计完整的数据库模型来存储新闻和相关信息:
# 数据库模型定义
class News(db.Model):id = db.Column(db.Integer, primary_key=True)title = db.Column(db.String(500))link = db.Column(db.String(500), unique=True, index=True)description = db.Column(db.Text)content = db.Column(db.Text)source = db.Column(db.String(100))pub_date = db.Column(db.DateTime)add_date = db.Column(db.DateTime, default=datetime.datetime.now)class Tag(db.Model):id = db.Column(db.Integer, primary_key=True)name = db.Column(db.String(100))news_id = db.Column(db.Integer, db.ForeignKey('news.id'))news = db.relationship('News', backref=db.backref('tags', lazy=True))class TagLibrary(db.Model):id = db.Column(db.Integer, primary_key=True)name = db.Column(db.String(100), unique=True)category = db.Column(db.String(50))frequency = db.Column(db.Integer, default=0)class ScheduledTask(db.Model):id = db.Column(db.Integer, primary_key=True)task_id = db.Column(db.String(100), unique=True)task_type = db.Column(db.String(20)) # fetch or broadcastschedule_type = db.Column(db.String(20)) # daily, weekly, monthlyvalue = db.Column(db.Integer) # day number for weekly/monthlytime_value = db.Column(db.String(10)) # HH:MMextra_params = db.Column(db.String(100)) # JSON string for additional params
4. RSS内容抓取与处理
RSS内容抓取是整个系统的核心功能之一:
def fetch_rss_task(use_selenium=False, use_llm=True):logger.info("开始执行RSS抓取任务...")# 创建一个应用上下文对象app_ctx = app.app_context()# 推送上下文app_ctx.push()try:# 获取RSS源with open('rss_list.txt', 'r', encoding='utf-8') as f:rss_urls = [line.strip() for line in f.readlines() if line.strip()]logger.info(f"读取到{len(rss_urls)}个RSS源")if not rss_urls:logger.warning("RSS列表为空,没有要抓取的源")returntotal_fetched = 0newly_added = 0# 初始化WebDriver(如果需要)driver = Noneif use_selenium:try:driver = WebDriverManager.get_instance().get_driver()except Exception as e:logger.error(f"初始化WebDriver时出错: {e}")use_selenium = Falsetry:for url in rss_urls:try:logger.info(f"开始处理RSS源: {url}")# 解析RSS Feedfeed = feedparser.parse(url)if not feed.entries:logger.warning(f"{url} 没有条目")continuesource = feed.feed.title if hasattr(feed.feed, 'title') else urlfor entry in feed.entries:title = entry.title if hasattr(entry, 'title') else "无标题"link = entry.link if hasattr(entry, 'link') else ""description = entry.description if hasattr(entry, 'description') else ""# 清理描述中的HTML标签clean_description = ""if description:soup = BeautifulSoup(description, 'html.parser')clean_description = soup.get_text(separator=' ', strip=True)if not link:logger.warning("跳过无链接的条目")continue# 检查链接是否已存在existing_news = News.query.filter_by(link=link).first()if existing_news:logger.warning(f"跳过已存在的新闻: {title}")total_fetched += 1continue# 获取正文内容content = ""if use_selenium:try:content = extract_content_with_selenium(link, driver)except Exception as e:logger.error(f"使用Selenium提取内容时出错: {e}")content = extract_content(link)else:content = extract_content(link)# 使用大模型优化内容(如果启用)if use_llm and content:try:content = optimize_content_with_llm(content)except Exception as e:logger.error(f"使用大模型优化内容时出错: {e}")# 创建新闻条目news = News(title=title,link=link,description=clean_description,content=content,source=source,pub_date=pub_date)db.session.add(news)db.session.commit()# 生成并保存标签if content:generate_tags_for_news(news)total_fetched += 1newly_added += 1logger.info(f"成功添加新闻: {title}")except Exception as e:logger.error(f"处理RSS源 {url} 时出错: {e}")continuefinally:# 确保资源被释放if use_selenium:logger.info("抓取任务完成,资源将在应用上下文关闭时释放")finally:# 弹出上下文 - 确保在所有情况下都释放上下文app_ctx.pop()
5. 大模型内容优化
使用大模型进行内容优化,提取核心新闻内容:
def optimize_content_with_llm(content):"""使用大模型优化内容"""try:prompt = f"""
你是一个智能的内容提取助手。请从以下HTML内容中提取出真正的新闻文章内容,
移除所有广告、导航、页脚、侧边栏等无关内容。
保留原始的段落结构,返回整洁的HTML格式。
只返回正文内容,不要添加任何解释。内容:
{content[:10000]} # 限制输入长度
"""# 调用Ollama APIresponse = ollama.chat(model='glm4', messages=[{'role': 'user','content': prompt}])extracted_content = response['message']['content']# 确保返回的是HTML格式if not extracted_content.strip().startswith('<'):extracted_content = f"<p>{extracted_content}</p>"return extracted_contentexcept Exception as e:logger.error(f"使用大模型优化内容时出错: {e}")return content # 出错时返回原始内容
6. 自动标签生成
使用大模型自动为新闻生成标签:
def generate_tags_for_news(news):"""为新闻生成标签"""try:# 使用大模型生成标签prompt = f"""
分析以下新闻文章,提取5个关键词作为标签。
标签应该是单个词或短语,不超过10个字符,用逗号分隔。
只返回标签列表,不要添加任何解释。标题: {news.title}
描述: {news.description or ""}
内容: {news.content[:5000] if news.content else ""}
"""try:# 调用Ollama APIresponse = ollama.chat(model='glm4', messages=[{'role': 'user','content': prompt}])tags_text = response['message']['content']# 解析返回的标签tags = [tag.strip() for tag in re.split(r'[,,、]', tags_text) if tag.strip()]# 过滤长度超过10个字符的标签tags = [tag for tag in tags if len(tag) <= 10]# 最多保留5个标签tags = tags[:5]except Exception as e:logger.error(f"使用大模型生成标签时出错: {e}")# 如果大模型失败,尝试使用简单的关键词提取words = re.findall(r'\b\w{3,15}\b', news.title + " " + (news.description or ""))word_count = {}for word in words:if word.lower() not in ['the', 'and', 'for', 'with', 'that', 'this']:word_count[word] = word_count.get(word, 0) + 1tags = [word for word, count in sorted(word_count.items(), key=lambda x: x[1], reverse=True) if len(word) <= 10][:5]# 保存标签for tag_name in tags:# 检查标签库是否有该标签tag_in_library = TagLibrary.query.filter_by(name=tag_name).first()if not tag_in_library:# 创建新标签库条目tag_in_library = TagLibrary(name=tag_name, frequency=1)db.session.add(tag_in_library)else:# 更新使用频率tag_in_library.frequency += 1# 创建新标签关联tag = Tag(name=tag_name, news_id=news.id)db.session.add(tag)db.session.commit()except Exception as e:logger.error(f"生成标签时出错: {e}")db.session.rollback()
7. 语音合成与新闻播报
实现新闻语音播报功能:
@app.route('/api/text_to_speech', methods=['POST'])
@login_required
def text_to_speech():"""将文本转换为语音文件并返回URL"""try:# 获取请求数据data = request.get_json()if not data or 'text' not in data:return jsonify({'status': 'error', 'message': '缺少文本参数'})text = data['text']if not text or len(text) == 0:return jsonify({'status': 'error', 'message': '文本内容为空'})# 限制文本长度,避免处理过长的文本if len(text) > 10000:text = text[:10000] + "..."# 确保存储目录存在audio_dir = os.path.join(app.static_folder, 'audio')if not os.path.exists(audio_dir):os.makedirs(audio_dir)# 生成唯一文件名filename = f"tts_{uuid.uuid4().hex}.mp3"filepath = os.path.join(audio_dir, filename)# 启动后台线程生成语音文件tts_thread = threading.Thread(target=generate_tts_file,args=(text, filepath))tts_thread.start()# 等待生成完成(最多30秒)tts_thread.join(timeout=30)# 检查文件是否生成成功if os.path.exists(filepath) and os.path.getsize(filepath) > 0:# 返回文件URLaudio_url = url_for('static', filename=f'audio/{filename}')return jsonify({'status': 'success','audio_url': audio_url})else:return jsonify({'status': 'error','message': '语音生成失败或超时'})except Exception as e:logger.error(f"文本转语音出错: {e}")return jsonify({'status': 'error','message': str(e)})def generate_tts_file(text, output_file):"""生成语音文件的后台任务"""try:# 初始化语音引擎engine = pyttsx3.init()# 设置语音属性engine.setProperty('rate', 160) # 语速engine.setProperty('volume', 1.0) # 音量# 选择中文语音(如果可用)voices = engine.getProperty('voices')for voice in voices:if 'chinese' in voice.id.lower() or 'zh' in voice.id.lower():engine.setProperty('voice', voice.id)break# 保存为音频文件engine.save_to_file(text, output_file)engine.runAndWait()logger.info(f"语音文件已生成: {output_file}")except Exception as e:logger.error(f"生成语音文件出错: {e}")
8. 定时任务调度系统
设计定时任务系统,自动执行新闻抓取和播报:
def init_scheduler():"""初始化调度器任务"""with app.app_context():# 清空现有任务schedule.clear()# 加载数据库中的任务tasks = ScheduledTask.query.all()for task in tasks:if task.task_type == 'fetch':add_fetch_task(task.task_id, task.schedule_type, task.value, task.time_value)elif task.task_type == 'broadcast':extra_params = json.loads(task.extra_params) if task.extra_params else {}count = extra_params.get('count', 5)add_broadcast_task(task.task_id, task.schedule_type, task.value, task.time_value, count)# 添加定期清理音频文件的任务schedule.every(1).hours.do(cleanup_audio_files).tag('cleanup_audio')logger.info("已添加音频文件清理任务,每小时执行一次")# 添加定期清理过期日志文件的任务schedule.every(12).hours.do(cleanup_log_files).tag('cleanup_logs')logger.info("已添加日志文件清理任务,每12小时执行一次")def add_broadcast_task(task_id, schedule_type, value, time_value, count=5):"""添加新闻播报任务到调度器"""def task_func():logger.info(f"执行新闻播报任务: {task_id}")# 创建应用上下文ctx = app.app_context()ctx.push()try:# 获取最新的新闻news_list = db.session.query(News).order_by(News.add_date.desc()).limit(count).all()if news_list:# 初始化语音引擎try:engine = pyttsx3.init()# 设置语音参数engine.setProperty('rate', 150)engine.setProperty('volume', 0.9)# 播报开始提示engine.say("开始播报最新新闻")engine.runAndWait()# 逐条播报新闻for i, news in enumerate(news_list):# 播报标题engine.say(f"第{i+1}条新闻:{news.title}")engine.runAndWait()# 播报简短描述if news.description and len(news.description) > 0:short_desc = news.description[:200] + "..." if len(news.description) > 200 else news.descriptionengine.say(short_desc)engine.runAndWait()# 短暂停顿,区分不同新闻time.sleep(1)# 播报结束提示engine.say("新闻播报结束")engine.runAndWait()except Exception as e:logger.error(f"语音引擎初始化或播报过程出错: {e}")finally:# 释放上下文ctx.pop()# 根据不同的调度类型添加任务if schedule_type == 'daily':schedule.every().day.at(time_value).do(task_func).tag(task_id)elif schedule_type == 'weekly':days = {1: schedule.every().monday,2: schedule.every().tuesday,3: schedule.every().wednesday,4: schedule.every().thursday,5: schedule.every().friday,6: schedule.every().saturday,7: schedule.every().sunday}days[value].at(time_value).do(task_func).tag(task_id)elif schedule_type == 'monthly':# 设置每月指定日期执行job = schedule.every().day.at(time_value).do(task_func).tag(task_id)# 自定义月度任务的执行条件def monthly_condition():return datetime.datetime.now().day == valuejob.do_run = lambda: monthly_condition() and task_func()
9. 日志系统设计
为应用添加完善的日志系统,便于监控和调试:
# 日志系统配置
LOG_DIR = 'logs'
if not os.path.exists(LOG_DIR):os.makedirs(LOG_DIR)# 内存缓冲区,用于在UI中显示最新日志
log_buffer = deque(maxlen=1000)# 创建自定义的日志记录器
class MemoryHandler(logging.Handler):"""将日志记录到内存缓冲区,用于Web界面显示"""def emit(self, record):log_entry = self.format(record)log_buffer.append({'time': datetime.datetime.fromtimestamp(record.created).strftime('%Y-%m-%d %H:%M:%S'),'level': record.levelname,'message': record.getMessage(),'formatted': log_entry})# 配置日志记录器
logger = logging.getLogger('rss_app')
logger.setLevel(logging.INFO)# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(console_format)
logger.addHandler(console_handler)# 内存处理器,用于UI显示
memory_handler = MemoryHandler()
memory_handler.setLevel(logging.INFO)
memory_handler.setFormatter(console_format)
logger.addHandler(memory_handler)# 小时文件处理器,每小时自动创建一个新文件
hourly_handler = TimedRotatingFileHandler(filename=os.path.join(LOG_DIR, 'rss_app.log'),when='H',interval=1,backupCount=72, # 保留3天的日志encoding='utf-8'
)
# 设置日志文件后缀格式为 年-月-日_小时
hourly_handler.suffix = "%Y-%m-%d_%H"
hourly_handler.setLevel(logging.INFO)
hourly_handler.setFormatter(console_format)
logger.addHandler(hourly_handler)@app.route('/system_logs')
@login_required
def system_logs():"""显示系统日志页面"""logger.info('访问系统日志页面')# 获取日志文件列表log_files = []try:# 获取所有日志文件并按修改时间排序log_pattern = os.path.join(LOG_DIR, 'rss_app.log*')all_log_files = glob.glob(log_pattern)all_log_files.sort(key=os.path.getmtime, reverse=True)for file_path in all_log_files:file_name = os.path.basename(file_path)# 获取文件大小和修改时间file_stats = os.stat(file_path)file_size = file_stats.st_size / 1024 # KBfile_time = datetime.datetime.fromtimestamp(file_stats.st_mtime).strftime('%Y-%m-%d %H:%M:%S')# 添加文件信息if file_name == 'rss_app.log':display_name = f"当前日志 ({file_size:.1f} KB) - {file_time}"log_files.append({'name': display_name,'path': file_path})else:# 格式化时间戳timestamp = file_name.replace('rss_app.log.', '')try:# 尝试解析时间戳parsed_time = datetime.datetime.strptime(timestamp, '%Y-%m-%d_%H')display_name = f"{parsed_time.strftime('%Y-%m-%d %H:00')} ({file_size:.1f} KB)"except:display_name = f"{file_name} ({file_size:.1f} KB) - {file_time}"log_files.append({'name': display_name,'path': file_path})except Exception as e:logger.error(f"获取日志文件列表出错: {str(e)}")flash(f"获取日志文件列表出错: {str(e)}", 'danger')# 统计信息stats = {'total': len(log_buffer),'error': sum(1 for log in log_buffer if log['level'] == 'ERROR'),'warning': sum(1 for log in log_buffer if log['level'] == 'WARNING'),'files': len(log_files)}return render_template('system_logs.html', log_files=log_files, stats=stats)