这算是之前一直想做的一件事,趁周末赶快做了。
业务意义:现实中有大量的舆情,这对我们的决策会有比较重要的作用
技术依赖:
- 1 模型基础能力
- 2 消息队列
- 3 异步获取消息
- 4 时间序列库
1 模型基础能力
大模型发展到现在,已经有了很大的变化。过去,在本地执行的大模型能力是完全不够的,deepseek14b出来之后我测了一下,就基础能力还是已经ok,但是推理过程我不知道是不是模型执行本身的代价,但其实我是不需要的。
本地模型能力够了之后,那么就很有意思。一方面,它可以比较节约成本。对于一些研究性质的任务应该是够的,比如我这次就打算用4060TI(16G)和Mac MiniM4(16G)来运行模型。另一方面,由于数据不出去,所以对数据隐私保护就比较好,这就可以尝试更多重要的任务。
另外就是模型的尺寸问题了,我粗略的感觉了一下
尺寸 | 能力 | 输入上限 |
---|---|---|
1.5b | 也许可以做一些特别特别简单的任务 | 2k |
8b | 可以做简单任务 | 上8k |
14b | 可以做中等复杂的任务 | 8k |
32b | 可以做较复杂任务 | |
100b + | 可以直接商业化 |
目前我觉得14b可能是性价比最高的模型尺寸。
小模型有一个明显的问题,就是可能无法严格按照指令执行任务。以生成json为例,可能会出现更高的失败率。当然尺寸模型也会碰到同样的问题。所以,对于大模型应用来说,应该用一些封装方式转为可靠节点。
小模型,很像以前集成学习中的弱分类器,它的使用并不会那么直接。需要有一套体系来进行校验和评估,有意义的是,这套校验和评估方法同样适合更大尺寸的模型,尤其是面对现在层出不同的大模型时。
标准化封装,然后使用体系进行归纳校验
2 消息队列
消息队列非常重要,缓冲和回放是最核心的功能,kafka这方面做的真挺不错。
像类似新闻的摘要数据到来后进入队列,一方面是缓冲,另一方面也是进行按时间的自然截断。如果觉得有必要,是可以让这些数据进入持久队列的,我给自己准备的存储空间非常多,可能未来还会更多(火力不足恐惧症)
由于处理不是唯一的,所以需要有多个出队列,这时候就要用回放了。例如,我先在有 input_topic1,然后我需要进行下一步etl,可能我先想到了方法1 :只提取新闻的标题和正文,满足了现在的需求,于是写入 tier2_topic_etl1。可能过一阵我又想到方法2:再提取评论连接。这时候就会写入tier2_topic_etl2。最后也许会放弃方法1,又或者放弃方法2,但这个就比较简单了。
类似的,或者不可避免的,我会采用多个不同的模型去处理相同的数据,这时候就又可以继续往后分叉了。可以非常方便的服务于多种探索,只要换一个消费组id就行了。这些不同的分支,可以在不同层级的部分进行合并,这方面mongo又非常合适。所以生成唯一主键是必要的。
当然,最后如果结论是采纳的,一定会在时间序列库中表现为事件。到了这个层级,才是(基本)可决策的。
3 异步获取消息
对于大量的数据获取和流转,一定要采用异步/协程的方式。
从去年开始,我才深深体会到这个问题。以前很多时候是批量传入,批量处理,并没有感觉。在大量实时性的任务中,一定会有数量庞大,但是数据又很小的任务,且会受到网络波动。没有异步和有异步效率可能差百倍。
在大模型时代,很多请求是独立到来的,比如用户的一个问题。我粗略估算过,大模型时代单条数据处理的成本可能是以前一百万倍。所以,必须要能够服务好单条的请求。
目前我的大部分服务已经转向了异步和微批次,在worker端重新搭建了fastapi+aps+celery,主要是发现其他的一些成熟异步框架也是这样的,所以还不如自己搭一下。搭好有一阵子了,之前还没想好怎么去进行统一化调度,所以也没有立即启用。我想接着这个周末,需要将这个推入试产状态。
4 时间序列库
技术和工具只是一个表象和约束,关键是背后的逻辑。我觉得line protocal挺好的,不仅约定了一条简洁的时间数据从技术上怎么定义的,同时也反过来促使业务人员思考:
- 1 什么时候要建立一个bucket (db level)
- 2 可能会有哪些measurement (collection level)
- 3 对于某条数据来说,哪些是tags,哪些是field
这种结构会对数据的最终处理带来影响,不是唯一的,但是是非常有用的。最近在看clickhouse的书,里面也是做了一些取舍(不支持update,delete, 不太支持单条精确查询等),然后利用一些新的有利特性(CPU的SIMD),从而达到令人惊叹的性能(极高效取数、极高效统计和极少磁盘占用)。
终于可以言归正传:现在的 point是我能每分钟拿到一些公开新闻摘要,我需要从中间提取事件,然后找出一类特定事件(比如打击证券行业犯罪等)。所以这里有两部分:①提取数据 ②判定事件
因为获取到的数据结构性非常好,所以我会先用简单的正则将数据提取出来,校验后就可以送到下一步。
然后在下一步对事件进行判断,然后给到类别的判定和置信度,这里用到deepseek-r1-14b。
数据的规律是以时间开头的固定元组。我简单写了一个清洗逻辑:
import re
def is_valid_time(text= None):pattern = r'^([01]?[0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$'return bool(re.match(pattern, text))import redef is_has_brackets(text = None):pattern = r'【[^】]*】'return bool(re.search(pattern, text))
some_msg.split('\n')
s = pd.Series(some_msg.split('\n'))
s_list = list(s)
# 1 标记
time_tag = s.apply(is_valid_time)
import numpy as np
time_tag_arr = time_tag.valuestime_tag_pos = np.argwhere(time_tag_arr ==True).ravel()
time_tag_pos1 = np.append(time_tag_pos, time_tag_pos[-1] + 6)
# time_tag_pos_s = pd.Series(time_tag_pos)
# time_tag_pos_s1 = time_tag_pos_s.diff(1)time_tag_pos_start = time_tag_pos1[:-1]
time_tag_pos_end = time_tag_pos1[1:]# 将起始点和结束点组合成一个二维数组
time_tag_pos2 = np.column_stack((time_tag_pos_start, time_tag_pos_end))candidate_tuple_list = []
for tem_tuple in time_tag_pos2:tem_list = s_list[tem_tuple[0]:tem_tuple[1]]if is_has_brackets(tem_list[1]):tem_dict = {}tem_dict['event_time'] = tem_list[0]tem_dict['title'] = tem_list[1].replace('*','')for j in range(2,len(tem_list)):tem_v = tem_list[j]if tem_v.startswith('阅'):tem_dict['read'] = tem_v elif tem_v.startswith('[评论'):tem_dict['comments'] = tem_v elif tem_v.startswith('分享'):tem_dict['share'] = tem_velse:pass candidate_tuple_list.append(tem_dict)
理解原文结构和写代码大约花了我1.5个小时,主要是在怎么通过下标快速从列表中取出子列表这个问题上纠结了很久。如果只是为了实现逻辑的话,不会超过0.5个小时。
然后随之而来是一个问题:我相信在这个case中,数据的格式一般不会有大变化 。但是未来肯定是会变的,所以这种固定解析逻辑比较不那么靠谱,得要加一个校验。怎么样才能有更好的效率和效果呢?
如果我能抽象出其中一定有的元素,或者我需要的元素,这个可以比较抽象;然后让大模型去完成这个解析显然会更具有通用性。
这种方案后续再试,很多工具都在变,而且我这种非结构化爬取的内容应该不会很多。原则上,应该还是原始数据- 初筛 - 精筛。初筛是一定需要的,先把相关的提出来,或者把不相关的过滤掉。
好了,这一步就算etl,我先做掉。
做之前,需要先订立一些元数据,我会同步写往rocks和mongo。平时主要用rocks,除非数据挂了才用mongo。
数据清洗完了之后就可以入库了,第一次可以将全量的数据写入。这次我没有写原始数据,之后可能需要备份一批,否则kafka只有7天 ,嗯,似乎也够了,新闻太老意义不大。
接下来是deepseek时间:对数据进行事件的判定
简单封装一下:
调用函数
import time
from openai import OpenAI
def call_local_deepseek(api_key ='ollama',base_url="http://172.17.0.1:11434/v1",model="deepseek-r1:14b",system_prompt = '', user_prompt = ''):tick1 = time.time()client = OpenAI(base_url=base_url, # 替换为远端服务器的 IP 或域名api_key=api_key, # 必填项,但会被忽略)response = client.chat.completions.create(model=model, # 替换为你下载的模型名称messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=0 # 设置 temperature 参数,这里设置为 0.7,你可以根据需要调整)print('call_local_deepseek takes %.2f' %(time.time() - tick1))# 输出结果return response.choices[0].message.content
# 测试
call_local_deepseek(user_prompt ='你是谁')
结果的解析函数
import re
import json
def extract_json_from_response(response):"""从大模型的回复中提取 JSON 部分。:param response: 大模型的回复内容:return: 解析后的 JSON 数据(字典形式),若未找到则返回空字典"""# 定义正则表达式模式,用于匹配 JSON 内容json_pattern = r'```json\s*([\s\S]*?)\s*```'# 查找匹配的 JSON 内容match = re.search(json_pattern, response)if match:# 提取 JSON 字符串json_str = match.group(1)try:# 解析 JSON 字符串为 Python 字典json_data = json.loads(json_str)return json_dataexcept json.JSONDecodeError:print("JSON 解析失败,请检查模型回复的 JSON 格式是否正确。")return None
# 以下是一个 Python 函数,用于提取大模型回复中指定标签内的内容。如果未找到指定标签,函数将返回空字符串。import re
def extract_tag_content(response, tag):"""从大模型的回复中提取指定标签内的内容。:param response: 大模型的回复内容:param tag: 要提取的标签,例如 'think':return: 标签内的内容,如果未找到则返回空字符串"""# 构建正则表达式模式,用于匹配指定标签内的内容pattern = fr'<{tag}>([\s\S]*?)</{tag}>'# 查找匹配的内容match = re.search(pattern, response)if match:# 提取标签内的内容return match.group(1)return None
看效果
用函数提取出内容:效果是不错的。不过r1废话多的特点导致了每条数据的处理时间太长了。4060ti有点顶不住啊,哈哈
In [11]: extract_json_from_response(res_content)
Out[11]:
{'market_punishment': 65,'market_down': 0,'explanation': '新闻标题明确提到了对两家私募机构的违规操作进行了处罚,这与A股市场的监管和处罚相关。阅读量较高(52.3万)和分享量(47次)也表明了该事件的关注度。因此,这条新闻的相关性评分较高,但考虑到涉及的是小规模私募,整体影响可能有限,所以评分为65分。'}
我试试并发,据说ollama允许4个,至少3个看起来是好的
In [16]: keyword_args_list = [{'system_prompt':system_prompt, 'user_prompt' : str(sample_data1)},...: {'system_prompt':system_prompt, 'user_prompt' : str(sample_data2)},...: {'system_prompt':system_prompt, 'user_prompt' : str(sample_data3)}...:...: ]...:...:...: tick1 = time.time()...: res_list = thread_concurrent_run(call_local_deepseek, keyword_args_list = keyword_args_list, max_workers= 3)...: tick2 = time.time()...:...:
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.80
2025-03-22 23:04:49 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 17.92
2025-03-22 23:04:52 - httpx - INFO - HTTP Request: POST http://172.17.0.1:11434/v1/chat/completions "HTTP/1.1 200 OK"
call_local_deepseek takes 21.00
解析出来
In [18]: res_list1 = [extract_json_from_response(x) for x in res_list]In [19]: res_list1
Out[19]:
[{'market_punishment': 0,'market_down': 10,'explanation': '新闻标题提到市场一片大好,没有涉及A股市场的处罚信息,因此市场处罚评分为0。虽然整体情绪积极,但并未明确提及市场下跌或投资者损失,故市场下跌评分较低为10分。'},{'market_punishment': 0,'market_down': 90,'explanation': '新闻标题明确提到A股市场大跌1.5%,属于A股市场整体下跌的情况,相关性很高。'},{'market_punishment': 60,'market_down': 10,'explanation': '新闻标题提到对两家小规模私募机构的违规操作进行了处罚,这直接关联到A股市场的监管和整治,因此属于A股市场处罚类别。虽然没有提到具体的市场下跌情况,但处罚通常可能会影响市场情绪,所以相关性较高。阅读量和分享量也显示了一定的关注度。'}]
然后很奇怪的是,单发和多发的情况有点不一样。对于第一条,是稳定这样的。重复跑3条也是稳定的。暂时只能理解量大质量就下滑了。有点搞笑。之前我发现在批量实体识别的时候也有类似的问题。
{'market_punishment': 0,'market_down': 0,'explanation': '新闻标题提到市场一片大好,没有涉及A股市场的处罚或下跌内容,因此两个类别的相关性评分为0。'}
然后我试了下8b,就这个基础问题好像也还行,速度快了一倍。
最后我试了下1.5b,就完全bbq
因为mac可能是需要设置一下网络开放,速度感觉和4060ti也差不多。
实际跑了一个一分钟的数据,哈哈,有点尴尬,赶不上数据的速度,就先科研一下吧。