来自工业界的知识库 RAG 服务(二),RagFlow 源码全流程深度解析

背景介绍

前面介绍过 有道 QAnything 源码解析,通过深入了解工业界的知识库 RAG 服务,得到了不少调优 RAG 服务的新想法。

因此本次趁热打铁,额外花费一点时间,深入研究了另一个火热的开源 RAG 服务 RagFlow 的完整实现流程,希望同样有所收获。

项目概述

框架设计

首先依旧可以先从框架图入手,与 常规的 RAG 架构 进行一些比较
请添加图片描述

可以看到右侧知识库被明显放大,同时最右侧详细介绍了文件解析的各种手段,比如 OCRDocument Layout Analyze 等,这些在常规的 RAG 中可能会作为一个不起眼的 Unstructured Loader 包含进去,可以猜到 RagFlow 的一个核心能力在于文件的解析环节。

在 官方文档 中也反复强调 Quality in, quality out, 反映出 RAGFlow 的独到之处在于细粒度文档解析。

另外 介绍文章 中提到其没有使用任何 RAG 中间件,而是完全重新研发了一套智能文档理解系统,并以此为依托构建 RAG 任务编排体系,也可以理解文档的解析是其 RagFlow 的核心亮点。

源码结构

首先可以看到 RagFlow 的源码结构:
请添加图片描述
对应模块的功能如下:

  • api 为后端的 API
  • web 对应的是前端页面
  • conf 为配置信息
  • deepdoc 对应的就是文件解析模块

从代码结构就能看出文件解析 deepdoc 在 RAGFlow 中一等公民角色

另外相关的技术栈如下:

  • Web 服务是基于 Flask 实现,这个在 2024 年来看稍微有一点点过时了
  • 业务数据库使用的是 MySQL
  • 向量数据库使用的是 ElasticSearch ,奇怪的是公司有自己的向量数据库 infinity 竟然默认没有用上
  • 文件存储使用的是 MinIO

正如前面介绍的因为没有使用 RAG 中间件,比如 langchainllamaIndex,因此实现上与常规的 RAG 系统会存在一些差异

源码解析

文件加载的支持

常规的 RAG 服务都是在上传时进行文件的加载和解析,但是 RAGFlow 的上传仅仅包含上传至 MinIO,需要手工点击触发文件的解析。
请添加图片描述
根据实际体验,以及网络上的反馈了解到 RAGFlow 的解析相当慢,估计资源开销也比较大,因此也能理解为什么采取二次手工确认的产品方案了。

实际的文件解析通过接口 /v1/document/run 进行触发的,实际的处理是在 api/db/services/task_service.py 中的 queue_tasks() 中完成的,此方法会根据文件创建一个或多个异步任务,方便异步执行。实现如下所示:

def queue_tasks(doc, bucket, name):def new_task():nonlocal docreturn {"id": get_uuid(),"doc_id": doc["id"]}tsks = []# pdf 文件的解析,根据不同的类型设置单个任务最多处理的页数# 默认单个任务处理 12 页 pdf,pager 类型的 pdf 一个任务处理 22 页,其他 pdf 不分页if doc["type"] == FileType.PDF.value:file_bin = MINIO.get(bucket, name)do_layout = doc["parser_config"].get("layout_recognize", True)pages = PdfParser.total_page_number(doc["name"], file_bin)page_size = doc["parser_config"].get("task_page_size", 12)if doc["parser_id"] == "paper":page_size = doc["parser_config"].get("task_page_size", 22)if doc["parser_id"] == "one":page_size = 1000000000if not do_layout:page_size = 1000000000page_ranges = doc["parser_config"].get("pages")if not page_ranges:page_ranges = [(1, 100000)]for s, e in page_ranges:s -= 1s = max(0, s)e = min(e - 1, pages)for p in range(s, e, page_size):task = new_task()task["from_page"] = ptask["to_page"] = min(p + page_size, e)tsks.append(task)# 表格数据单个任务处理 3000 行elif doc["parser_id"] == "table":file_bin = MINIO.get(bucket, name)rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)for i in range(0, rn, 3000):task = new_task()task["from_page"] = itask["to_page"] = min(i + 3000, rn)tsks.append(task)else:tsks.append(new_task())bulk_insert_into_db(Task, tsks, True)DocumentService.begin2parse(doc["id"])# 任务插入 Redis 消息队列,方便异步处理for t in tsks:assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=t), "Can't access Redis. Please check the Redis' status."

从上面的实现来看,文件的解析是根据内容拆分为多个任务,通过 Redis 消息队列进行暂存,之后就可以离线异步处理。

直接查看对应的消息队列的消费模块,对应在 rag/svr/task_executor.py 中的 main() 方法中。实现简化后如下所示:

def main():# 获取任务rows = collect()for _, r in rows.iterrows():embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])# 执行文件解析cks = build(r)# 执行向量化tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)init_kb(r)# 写入 ESes_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))

完整的处理流程如下所示:

  1. 调用 collect() 方法从消息队列中获取任务
  2. 接下来每个任务会依次调用 build() 进行文件的解析
  3. 调用 embedding() 方法进行向量化
  4. 最后调用 ELASTICSEARCH.bulk() 写入 ElasticSearch,从这里就可以看到向量库的技术选型

接下来主要关注 build() 方法深入 RAGFlow 核心的文件解析,具体的实现简化后如下所示:

def build(row):# 根据类型选择合适的解析器chunker = FACTORY[row["parser_id"].lower()]# 执行文档的解析和切片cks = chunker.chunk(row["name"],binary=binary,from_page=row["from_page"],to_page=row["to_page"],lang=row["language"],callback=callback,kb_id=row["kb_id"],parser_config=row["parser_config"],tenant_id=row["tenant_id"],)

实际是根据 parser_id 去选择合适的解析器组,注意这个应该是从业务层得到的一个类型,每个解析器组中都包含了 pdf, word 等支持格式的文件解析,可以理解为一个使用场景的属性。这个会不会导致后续使用场景较多情况下,出现 N (场景) * N (文件格式) 的组合情况可能值得考虑,后续可能会进行优化。

以默认的 naive 类型为例深入对应的 chunk() 实现,其对应的实现在 rag/app/naive.py 中。此方法中包含了目前主持的 docx, pdf, xlsx, md 等格式的解析,我们以 pdf 为例深入查看对应的实现。

可以看到解析器是继承自 deepdoc/parser/pdf_parser.py 中的 RAGFlowPdfParser 实现,终于进入深度文档解析环节了。

pdf 文件的打开是基于 PyPDF2 实现,并基于 pdfplumber 实现表格数据的提取,这个库相对 PyMuPDF 速度更慢,但是可以处理得更精细。

另外使用的 OCR 模型为 /InfiniFlow/deepdoc,在解析中额外加载了一个 XGB 模型 InfiniFlow/text_concat_xgb_v1.0 用于内容提取。

实际的解析过程写的也很复杂,无怪乎处理速度有点慢。不过预期处理效果相对其他 RAG 项目也会好一些。从实际前端的展示的 Demo 来看,RAGFlow 可以将解析后的文本块与原始文档中的原始位置关联起来,这个效果还是比较惊艳的,目前看起来只有 RagFlow 实现了类似的效果。

请添加图片描述

文件的预处理策略

在 RAGFlow 中的文件中包含了不少了数据的清理操作,比如在 deepdoc/vision/layout_recognizer.py 中的就包含着文档中无用内容的判断,示例如下:

def __is_garbage(b):patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$",r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}","(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}","\\(cid *: *[0-9]+ *\\)"]return any([re.search(p, b["text"]) for p in patt])

文档中版权内容,参考来源信息等内容会被清理。但是这样处理比较分散,而且不同的流程中也充斥着大量的特殊处理,导致从源码很难拆分出明确的预处理逻辑。

从常规的流程来看,RAGFlow 将提取的内容分为普通的文本内容 + 表格,分别对这两部分内容进行 tokenize, 方便进行检索。

文件检索的支持 (包含混合检索)

文件检索的支持可以查看实际的对话处理流程,对话的 API 为 /v1/conversation/completion,实际对话的处理是在 api/db/services/dialog_service.py 中的 chat() 方法中完成

深入跟踪对话处理流程,可以看到文件的检索是在 rag/nlp/search.py 中的 search() 方法中完成。

RAGFlow 的检索目前实现的是混合检索,实现的是文本检索 + 向量检索,混合检索完全依赖 ElasticSearch 实现,具体的实现如下所示:

def search(self, req, idxnm, emb_mdl=None):qst = req.get("question", "")bqry, keywords = self.qryr.question(qst)bqry = add_filters(bqry)bqry.boost = 0.05# 构造 ElasticSearch 文本查询的请求s = Search()pg = int(req.get("page", 1)) - 1ps = int(req.get("size", 1000))topk = int(req.get("topk", 1024))src = req.get("fields", ["docnm_kwd", "content_ltks", "kb_id", "img_id", "title_tks", "important_kwd","image_id", "doc_id", "q_512_vec", "q_768_vec", "position_int","q_1024_vec", "q_1536_vec", "available_int", "content_with_weight"])s = s.query(bqry)[pg * ps:(pg + 1) * ps]s = s.highlight("content_ltks")s = s.highlight("title_ltks")if not qst:if not req.get("sort"):s = s.sort({"create_time": {"order": "desc", "unmapped_type": "date"}},{"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}})else:s = s.sort({"page_num_int": {"order": "asc", "unmapped_type": "float","mode": "avg", "numeric_type": "double"}},{"top_int": {"order": "asc", "unmapped_type": "float","mode": "avg", "numeric_type": "double"}},{"create_time": {"order": "desc", "unmapped_type": "date"}},{"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}})if qst:s = s.highlight_options(fragment_size=120,number_of_fragments=5,boundary_scanner_locale="zh-CN",boundary_scanner="SENTENCE",boundary_chars=",./;:\\!(),。?:!……()——、")s = s.to_dict()# 补充向量查询的信息q_vec = []if req.get("vector"):assert emb_mdl, "No embedding model selected"s["knn"] = self._vector(qst, emb_mdl, req.get("similarity", 0.1), topk)s["knn"]["filter"] = bqry.to_dict()if "highlight" in s:del s["highlight"]q_vec = s["knn"]["query_vector"]# 将构造的完整查询提交给 ElasticSearch 进行查询res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)kwds = set([])for k in keywords:kwds.add(k)for kk in rag_tokenizer.fine_grained_tokenize(k).split(" "):if len(kk) < 2:continueif kk in kwds:continuekwds.add(kk)aggs = self.getAggregation(res, "docnm_kwd")return self.SearchResult(total=self.es.getTotal(res),ids=self.es.getDocIds(res),query_vector=q_vec,aggregation=aggs,highlight=self.getHighlight(res),field=self.getFields(res, src),keywords=list(kwds))

可以看到 RAGFlow 将混合检索需求转换为复杂的查询条件,利用 elasticsearch-dsl 进行复杂查询的构造,之后直接提交给 ElasticSearch 即可。

检索结果的重排

文件的重排是在 rag/nlp/search.py 中的 rerank() 中完成的,重排是基于文本匹配得分 + 向量匹配得分混合进行排序,默认文本匹配的权重为 0.3, 向量匹配的权重为 0.7,对应的实现如下所示:

# tkweight 为文本匹配权重,vtweight 为向量匹配权重def rerank(self, sres, query, tkweight=0.3,vtweight=0.7, cfield="content_ltks"):# 获取文本关键词_, keywords = self.qryr.question(query)# 获取文本向量ins_embd = [Dealer.trans2floats(sres.field[i].get("q_%d_vec" % len(sres.query_vector), "\t".join(["0"] * len(sres.query_vector)))) for i in sres.ids]if not ins_embd:return [], [], []for i in sres.ids:if isinstance(sres.field[i].get("important_kwd", []), str):sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]ins_tw = []for i in sres.ids:content_ltks = sres.field[i][cfield].split(" ")title_tks = [t for t in sres.field[i].get("title_tks", "").split(" ") if t]important_kwd = sres.field[i].get("important_kwd", [])tks = content_ltks + title_tks + important_kwdins_tw.append(tks)# 获取整体相似分,文本相似分,向量相似分sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector,ins_embd,keywords,ins_tw, tkweight, vtweight)return sim, tksim, vtsim

获取混合相似分之后,基于混合的相似分进行过滤和重排,默认混合得分低于 0.2 的会被过滤掉

大模型的处理

在进行上面的检索和重排阶段中,只是进行了必要的过滤,没有限制匹配文档的数量。

实际内容可能会超过大模型的输入 token 数量,因此在调用大模型前会调用 api/db/services/dialog_service.py 文件中 message_fit_in() 根据大模型可用的 token 数量进行过滤。这部分与有道的 QAnything 的实现大同小异,就不额外展开了。

将检索的内容,历史聊天记录以及问题构造为 prompt,即可作为大模型的输入了,默认的英文 prompt 如下所示:

"""
You are an intelligent assistant. Please summarize the content of the knowledge base to answer the question. Please list the data in the knowledge base and answer in detail. When all knowledge base content is irrelevant to the question, your answer must include the sentence "The answer you are looking for is not found in the knowledge base!" Answers need to consider chat history.Here is the knowledge base:{knowledge}The above is the knowledge base.
"""

对应的中文 prompt 如下所示:

"""
你是一个智能助手,请总结知识库的内容来回答问题,请列举知识库中的数据详细回答。当所有知识库内容都与问题无关时,你的回答必须包括“知识库中未找到您要的答案!”这句话。回答需要考虑聊天历史。以下是知识库:{knowledge}以上是知识库
"""

总结

通过上面的介绍,可以对开源的 RagFlow 有了一个大致的了解,与前面的 有道 QAnything 整体流程还是比较类似的。同样支持混合检索,文本检索的方案都是基于 ElasticSearch 实现,在检索后都实现了 Rerank 流程,并在进入大模型之前基于可用最大 token 进行动态过滤。

不过对比来看,来自互联网大厂有道的 QAnything 的代码质量高很多,实现功能封装与命名都相当简洁易懂,工程的鲁棒性也明显高出一大截。如果想要通过源码了解 RAG 服务推荐优先阅读 QAnything。

当然 RagFlow 也有一些独到之处,对于文件的细粒度处理带来更高质量的参考信息,从而更好的提升参考信息,如果对这部分感兴趣可以深入了解下 RagFlow 的相关代码实现细节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/18168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1688 API接口介绍:开启您的电商新篇章

什么是1688 API接口&#xff1f; API&#xff08;Application Programming Interface&#xff0c;应用程序编程接口&#xff09;是一组协议和工具&#xff0c;用于定义不同的软件应用程序如何彼此交互。1688 API接口则是1688平台为商家提供的一套接口规范&#xff0c;允许商家…

网络工程师---第四十三天

1、网络地址转换请简述DNS服务器迭代查询与递归的区别&#xff1f; 2、请从技术方面简述RAIDO、RAID1、RAID3、 RAID5的特点&#xff1f; 3、请从层次结构、部署设备和功能配置方面描述层次化的网络结构&#xff1f; 4、请简述IPSECVPN和AH和ESP的区别&#xff1f; 5、请简述ID…

Java网络编程之TCP协议核心机制(三)

题外话 最近学习内容很多嗷 正题 延时应答机制 当客户端发送数据到服务器时,服务器不会立即返回ACK,而是等待一会再返回ACK 这段等待时间应用程序可能会消化掉接收缓冲区中的数据,当服务器返回ACK时,就会携带此时接收缓冲区大小的信息 当客户端下次再发送数据的时候就可以…

SQL 语言:数据操作

文章目录 SELECT 基本结构简单查询连接查询子查询聚集函数和更名操作分组查询字符串操作集合操作UNION 运算INTERSECT 运算EXCEPT 运算 视图查询和更新WITH 子句其他语句总结 SQL 的数据操作包括 SELECT(查询)、INSERT(插入)、DELETE(删除)和 UPDATE(修改)四条语句。 SELECT 基…

钡铼PLC集成BL121PO协议网关优化电子制造产线的生产效率

PLC转OPC UA协议转换网关BL121PO在电子制造产线中的优化应用&#xff0c;可以显著提高生产效率&#xff0c;促进生产线的智能化和信息化发展。本文将从以下几个方面进行阐述&#xff1a; 提高设备间通信效率&#xff1a;PLC转OPC UA协议转换网关BL121PO通过高效的协议转换&…

SpringBoot基础篇

1&#xff1a;parent 目的&#xff1a;减少依赖配置 开发SpringBoot程序要继承spring-boot-starter-parentspring-boot-starter-parent中定义了若干个依赖管理继承parent模块可以避免多个依赖使用相同技术出现依赖版本冲突继承parent的形式也可以采用引入依赖的i形式实现效果…

项目构建工具maven

一、概述 1、maven是apache的一个开源项目&#xff0c;是一个优秀的项目构建/管理工具 2、apache(软件基金会、非盈利组织、管理维护一些开源项目) 二、功能 1、管理项目中jar包和jar包与jar包之间的依赖 2、完成项目编译、测试、打包 三、核心文件 pom.xml:在里面配置相…

信息学奥赛初赛天天练-14-阅读程序-字符数组、唯一分解定理应用

更多资源请关注纽扣编程微信公众号 1 2019 CSP-J 阅读程序1 (程序输入不超过数组或字符串定义的范围&#xff1b;判断题正确填√,错误填&#xff1b;除特殊说明外&#xff0c;判断题1.5分&#xff0c;选择题3分&#xff0c;共计40分) 1 输入的字符串只能由小写字母或大写字母组…

【大宗】第一期:大航海时代下的[集运欧线]

一、大航海时代 - 集运欧线前世今生 01 航运合约指数的诞生 ‍‍‍‍ 2023年8月18日&#xff0c;上海期货交易所的伙伴们搞了个大新闻——他们推出了一种新的期货品种&#xff0c;叫做“欧线集运”。这可不是什么普通的期货&#xff0c;它是基于一个叫做SCFIS的指数&#xf…

上海汇正财经官网怎么样?客户好评如潮,口碑赞誉之声不绝于耳

在财经服务领域&#xff0c;客户评价是衡量一家企业信誉和服务质量的重要标准。上海汇正财经作为业内知名的财经服务平台&#xff0c;以其优质的服务赢得了广大客户的认可和好评。大量正面用户评价和成功服务的案例&#xff0c;充分证明了上海汇正财经是一个值得信赖的正规企业…

每日两题 / 131. 分割回文串 42. 接雨水(LeetCode热题100)

131. 分割回文串 - 力扣&#xff08;LeetCode&#xff09; 数据量较小&#xff0c;考虑直接暴力&#xff0c;每次dfs&#xff1a;以bg作为左区间&#xff0c;往右遍历&#xff0c;找到一段回文串区间后&#xff0c;将回文串插入vector<string>&#xff0c;并以下一个下标…

段位在于面对人性之恶,一笑而过

这个小哥哥不知道是哪里不对劲了&#xff0c;突然给我留言说我在骗流量&#xff0c;骗关注。公众号是我的&#xff0c;文章是我写的&#xff0c;主要分享的就是我创业的一些接单案例&#xff0c;因为之前收到很多无效的留言&#xff0c;寻求合作就几个字我不想接收无效信息&…

Unity 权限 之 Android 【权限 动态申请】功能的简单封装

Unity 权限 之 Android 【权限 动态申请】功能的简单封装 目录 Unity 权限 之 Android 【权限 动态申请】功能的简单封装 一、简单介绍 二、Android 权限 动态申请 三、实现原理 四、注意事项 五、案例实现简单步骤 附录&#xff1a; 一、进一步优化 二、多个权限申请…

第三方软件检测机构要具备哪些资质要求?专业测试报告如何申请?

第三方软件检测机构是独立于软件开发商和用户之外的公正机构&#xff0c;负责对软件进行全面的检测和评估。其独立性保证了评测结果的客观性和公正性&#xff0c;有效避免了软件开发商对自身产品的主观偏见和误导。 要成为一家合格的第三方软件检测机构&#xff0c;需要具备一…

OKR 实践:来自一位信息技术部主管的成功秘诀

OKR 实践&#xff1a;来自一位信息技术部主管的成功秘诀 为什么选择OKR 公司信息技术部为38个各地分公司、12,000名员工的IT需求提供服务。庞大而多样的客户群常常使我们的团队分散&#xff0c;许多团队都在各自为政&#xff0c;以个案为基础解决问题&#xff0c;而不是采用企业…

LabVIEW高低温试验箱控制系统

要实现LabVIEW高低温试验箱控制系统&#xff0c;需要进行硬件配置、软件设计和系统集成&#xff0c;确保LabVIEW能够有效地监控和控制试验箱的温度。以下是详细说明&#xff1a; 硬件配置 选择合适的试验箱&#xff1a; 确定高低温试验箱的型号和品牌。 确认试验箱是否支持外…

摸鱼大数据——Hive表操作——文件数据的导入和导出

数据导入和导出 1、文件数据导入 1.1 直接上传文件 window页面上传 需求: 已知emp1.txt文件在windows/mac系统,要求使用hdfs保存此文件 并且使用hivesql建表关联数据 use day06; ​ -- 1- 创建Hive表 create table emp1 (id int,name string,salary int,dept string )row for…

基于51单片机的汽车智能灯光控制系统

一.硬件方案 本设计硬件部分&#xff0c;中央处理器采用了STC89C52RC单片机&#xff0c;另外使用两个灯珠代表远近光灯&#xff0c;感光部分采用了光敏电阻&#xff0c;因为光敏电阻输出的是电压模拟信号&#xff0c;单片机不能直接处理模拟信号&#xff0c;所以经过ADC0832进…

基于python flask +pyecharts实现的气象数据可视化分析大屏

背景 气象数据可视化分析大屏基于Python Flask和Pyecharts技术&#xff0c;旨在通过图表展示气象数据的分析结果&#xff0c;提供直观的数据展示和分析功能。在当今信息化时代&#xff0c;气象数据的准确性和实时性对各行业具有重要意义。通过搭建气象数据可视化分析大屏&…

【kubernetes】关于k8s集群如何将pod调度到指定node节点(亲和与反亲和等)

目录 一、调度约束 1.1K8S的 List-Watch 机制 ⭐⭐⭐⭐⭐ 1.1.1Pod 启动典型创建过程 二、调度过程 2.1Predicate&#xff08;预选策略&#xff09; 常见的算法 2.2priorities&#xff08;优选策略&#xff09;常见的算法 三、k8s将pod调度到指定node的方法 3.1指定…