来自工业界的知识库 RAG 服务(二),RagFlow 源码全流程深度解析

背景介绍

前面介绍过 有道 QAnything 源码解析,通过深入了解工业界的知识库 RAG 服务,得到了不少调优 RAG 服务的新想法。

因此本次趁热打铁,额外花费一点时间,深入研究了另一个火热的开源 RAG 服务 RagFlow 的完整实现流程,希望同样有所收获。

项目概述

框架设计

首先依旧可以先从框架图入手,与 常规的 RAG 架构 进行一些比较
请添加图片描述

可以看到右侧知识库被明显放大,同时最右侧详细介绍了文件解析的各种手段,比如 OCRDocument Layout Analyze 等,这些在常规的 RAG 中可能会作为一个不起眼的 Unstructured Loader 包含进去,可以猜到 RagFlow 的一个核心能力在于文件的解析环节。

在 官方文档 中也反复强调 Quality in, quality out, 反映出 RAGFlow 的独到之处在于细粒度文档解析。

另外 介绍文章 中提到其没有使用任何 RAG 中间件,而是完全重新研发了一套智能文档理解系统,并以此为依托构建 RAG 任务编排体系,也可以理解文档的解析是其 RagFlow 的核心亮点。

源码结构

首先可以看到 RagFlow 的源码结构:
请添加图片描述
对应模块的功能如下:

  • api 为后端的 API
  • web 对应的是前端页面
  • conf 为配置信息
  • deepdoc 对应的就是文件解析模块

从代码结构就能看出文件解析 deepdoc 在 RAGFlow 中一等公民角色

另外相关的技术栈如下:

  • Web 服务是基于 Flask 实现,这个在 2024 年来看稍微有一点点过时了
  • 业务数据库使用的是 MySQL
  • 向量数据库使用的是 ElasticSearch ,奇怪的是公司有自己的向量数据库 infinity 竟然默认没有用上
  • 文件存储使用的是 MinIO

正如前面介绍的因为没有使用 RAG 中间件,比如 langchainllamaIndex,因此实现上与常规的 RAG 系统会存在一些差异

源码解析

文件加载的支持

常规的 RAG 服务都是在上传时进行文件的加载和解析,但是 RAGFlow 的上传仅仅包含上传至 MinIO,需要手工点击触发文件的解析。
请添加图片描述
根据实际体验,以及网络上的反馈了解到 RAGFlow 的解析相当慢,估计资源开销也比较大,因此也能理解为什么采取二次手工确认的产品方案了。

实际的文件解析通过接口 /v1/document/run 进行触发的,实际的处理是在 api/db/services/task_service.py 中的 queue_tasks() 中完成的,此方法会根据文件创建一个或多个异步任务,方便异步执行。实现如下所示:

def queue_tasks(doc, bucket, name):def new_task():nonlocal docreturn {"id": get_uuid(),"doc_id": doc["id"]}tsks = []# pdf 文件的解析,根据不同的类型设置单个任务最多处理的页数# 默认单个任务处理 12 页 pdf,pager 类型的 pdf 一个任务处理 22 页,其他 pdf 不分页if doc["type"] == FileType.PDF.value:file_bin = MINIO.get(bucket, name)do_layout = doc["parser_config"].get("layout_recognize", True)pages = PdfParser.total_page_number(doc["name"], file_bin)page_size = doc["parser_config"].get("task_page_size", 12)if doc["parser_id"] == "paper":page_size = doc["parser_config"].get("task_page_size", 22)if doc["parser_id"] == "one":page_size = 1000000000if not do_layout:page_size = 1000000000page_ranges = doc["parser_config"].get("pages")if not page_ranges:page_ranges = [(1, 100000)]for s, e in page_ranges:s -= 1s = max(0, s)e = min(e - 1, pages)for p in range(s, e, page_size):task = new_task()task["from_page"] = ptask["to_page"] = min(p + page_size, e)tsks.append(task)# 表格数据单个任务处理 3000 行elif doc["parser_id"] == "table":file_bin = MINIO.get(bucket, name)rn = RAGFlowExcelParser.row_number(doc["name"], file_bin)for i in range(0, rn, 3000):task = new_task()task["from_page"] = itask["to_page"] = min(i + 3000, rn)tsks.append(task)else:tsks.append(new_task())bulk_insert_into_db(Task, tsks, True)DocumentService.begin2parse(doc["id"])# 任务插入 Redis 消息队列,方便异步处理for t in tsks:assert REDIS_CONN.queue_product(SVR_QUEUE_NAME, message=t), "Can't access Redis. Please check the Redis' status."

从上面的实现来看,文件的解析是根据内容拆分为多个任务,通过 Redis 消息队列进行暂存,之后就可以离线异步处理。

直接查看对应的消息队列的消费模块,对应在 rag/svr/task_executor.py 中的 main() 方法中。实现简化后如下所示:

def main():# 获取任务rows = collect()for _, r in rows.iterrows():embd_mdl = LLMBundle(r["tenant_id"], LLMType.EMBEDDING, llm_name=r["embd_id"], lang=r["language"])# 执行文件解析cks = build(r)# 执行向量化tk_count = embedding(cks, embd_mdl, r["parser_config"], callback)init_kb(r)# 写入 ESes_r = ELASTICSEARCH.bulk(cks, search.index_name(r["tenant_id"]))

完整的处理流程如下所示:

  1. 调用 collect() 方法从消息队列中获取任务
  2. 接下来每个任务会依次调用 build() 进行文件的解析
  3. 调用 embedding() 方法进行向量化
  4. 最后调用 ELASTICSEARCH.bulk() 写入 ElasticSearch,从这里就可以看到向量库的技术选型

接下来主要关注 build() 方法深入 RAGFlow 核心的文件解析,具体的实现简化后如下所示:

def build(row):# 根据类型选择合适的解析器chunker = FACTORY[row["parser_id"].lower()]# 执行文档的解析和切片cks = chunker.chunk(row["name"],binary=binary,from_page=row["from_page"],to_page=row["to_page"],lang=row["language"],callback=callback,kb_id=row["kb_id"],parser_config=row["parser_config"],tenant_id=row["tenant_id"],)

实际是根据 parser_id 去选择合适的解析器组,注意这个应该是从业务层得到的一个类型,每个解析器组中都包含了 pdf, word 等支持格式的文件解析,可以理解为一个使用场景的属性。这个会不会导致后续使用场景较多情况下,出现 N (场景) * N (文件格式) 的组合情况可能值得考虑,后续可能会进行优化。

以默认的 naive 类型为例深入对应的 chunk() 实现,其对应的实现在 rag/app/naive.py 中。此方法中包含了目前主持的 docx, pdf, xlsx, md 等格式的解析,我们以 pdf 为例深入查看对应的实现。

可以看到解析器是继承自 deepdoc/parser/pdf_parser.py 中的 RAGFlowPdfParser 实现,终于进入深度文档解析环节了。

pdf 文件的打开是基于 PyPDF2 实现,并基于 pdfplumber 实现表格数据的提取,这个库相对 PyMuPDF 速度更慢,但是可以处理得更精细。

另外使用的 OCR 模型为 /InfiniFlow/deepdoc,在解析中额外加载了一个 XGB 模型 InfiniFlow/text_concat_xgb_v1.0 用于内容提取。

实际的解析过程写的也很复杂,无怪乎处理速度有点慢。不过预期处理效果相对其他 RAG 项目也会好一些。从实际前端的展示的 Demo 来看,RAGFlow 可以将解析后的文本块与原始文档中的原始位置关联起来,这个效果还是比较惊艳的,目前看起来只有 RagFlow 实现了类似的效果。

请添加图片描述

文件的预处理策略

在 RAGFlow 中的文件中包含了不少了数据的清理操作,比如在 deepdoc/vision/layout_recognizer.py 中的就包含着文档中无用内容的判断,示例如下:

def __is_garbage(b):patt = [r"^•+$", r"(版权归©|免责条款|地址[::])", r"\.{3,}", "^[0-9]{1,2} / ?[0-9]{1,2}$",r"^[0-9]{1,2} of [0-9]{1,2}$", "^http://[^ ]{12,}","(资料|数据)来源[::]", "[0-9a-z._-]+@[a-z0-9-]+\\.[a-z]{2,3}","\\(cid *: *[0-9]+ *\\)"]return any([re.search(p, b["text"]) for p in patt])

文档中版权内容,参考来源信息等内容会被清理。但是这样处理比较分散,而且不同的流程中也充斥着大量的特殊处理,导致从源码很难拆分出明确的预处理逻辑。

从常规的流程来看,RAGFlow 将提取的内容分为普通的文本内容 + 表格,分别对这两部分内容进行 tokenize, 方便进行检索。

文件检索的支持 (包含混合检索)

文件检索的支持可以查看实际的对话处理流程,对话的 API 为 /v1/conversation/completion,实际对话的处理是在 api/db/services/dialog_service.py 中的 chat() 方法中完成

深入跟踪对话处理流程,可以看到文件的检索是在 rag/nlp/search.py 中的 search() 方法中完成。

RAGFlow 的检索目前实现的是混合检索,实现的是文本检索 + 向量检索,混合检索完全依赖 ElasticSearch 实现,具体的实现如下所示:

def search(self, req, idxnm, emb_mdl=None):qst = req.get("question", "")bqry, keywords = self.qryr.question(qst)bqry = add_filters(bqry)bqry.boost = 0.05# 构造 ElasticSearch 文本查询的请求s = Search()pg = int(req.get("page", 1)) - 1ps = int(req.get("size", 1000))topk = int(req.get("topk", 1024))src = req.get("fields", ["docnm_kwd", "content_ltks", "kb_id", "img_id", "title_tks", "important_kwd","image_id", "doc_id", "q_512_vec", "q_768_vec", "position_int","q_1024_vec", "q_1536_vec", "available_int", "content_with_weight"])s = s.query(bqry)[pg * ps:(pg + 1) * ps]s = s.highlight("content_ltks")s = s.highlight("title_ltks")if not qst:if not req.get("sort"):s = s.sort({"create_time": {"order": "desc", "unmapped_type": "date"}},{"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}})else:s = s.sort({"page_num_int": {"order": "asc", "unmapped_type": "float","mode": "avg", "numeric_type": "double"}},{"top_int": {"order": "asc", "unmapped_type": "float","mode": "avg", "numeric_type": "double"}},{"create_time": {"order": "desc", "unmapped_type": "date"}},{"create_timestamp_flt": {"order": "desc", "unmapped_type": "float"}})if qst:s = s.highlight_options(fragment_size=120,number_of_fragments=5,boundary_scanner_locale="zh-CN",boundary_scanner="SENTENCE",boundary_chars=",./;:\\!(),。?:!……()——、")s = s.to_dict()# 补充向量查询的信息q_vec = []if req.get("vector"):assert emb_mdl, "No embedding model selected"s["knn"] = self._vector(qst, emb_mdl, req.get("similarity", 0.1), topk)s["knn"]["filter"] = bqry.to_dict()if "highlight" in s:del s["highlight"]q_vec = s["knn"]["query_vector"]# 将构造的完整查询提交给 ElasticSearch 进行查询res = self.es.search(deepcopy(s), idxnm=idxnm, timeout="600s", src=src)kwds = set([])for k in keywords:kwds.add(k)for kk in rag_tokenizer.fine_grained_tokenize(k).split(" "):if len(kk) < 2:continueif kk in kwds:continuekwds.add(kk)aggs = self.getAggregation(res, "docnm_kwd")return self.SearchResult(total=self.es.getTotal(res),ids=self.es.getDocIds(res),query_vector=q_vec,aggregation=aggs,highlight=self.getHighlight(res),field=self.getFields(res, src),keywords=list(kwds))

可以看到 RAGFlow 将混合检索需求转换为复杂的查询条件,利用 elasticsearch-dsl 进行复杂查询的构造,之后直接提交给 ElasticSearch 即可。

检索结果的重排

文件的重排是在 rag/nlp/search.py 中的 rerank() 中完成的,重排是基于文本匹配得分 + 向量匹配得分混合进行排序,默认文本匹配的权重为 0.3, 向量匹配的权重为 0.7,对应的实现如下所示:

# tkweight 为文本匹配权重,vtweight 为向量匹配权重def rerank(self, sres, query, tkweight=0.3,vtweight=0.7, cfield="content_ltks"):# 获取文本关键词_, keywords = self.qryr.question(query)# 获取文本向量ins_embd = [Dealer.trans2floats(sres.field[i].get("q_%d_vec" % len(sres.query_vector), "\t".join(["0"] * len(sres.query_vector)))) for i in sres.ids]if not ins_embd:return [], [], []for i in sres.ids:if isinstance(sres.field[i].get("important_kwd", []), str):sres.field[i]["important_kwd"] = [sres.field[i]["important_kwd"]]ins_tw = []for i in sres.ids:content_ltks = sres.field[i][cfield].split(" ")title_tks = [t for t in sres.field[i].get("title_tks", "").split(" ") if t]important_kwd = sres.field[i].get("important_kwd", [])tks = content_ltks + title_tks + important_kwdins_tw.append(tks)# 获取整体相似分,文本相似分,向量相似分sim, tksim, vtsim = self.qryr.hybrid_similarity(sres.query_vector,ins_embd,keywords,ins_tw, tkweight, vtweight)return sim, tksim, vtsim

获取混合相似分之后,基于混合的相似分进行过滤和重排,默认混合得分低于 0.2 的会被过滤掉

大模型的处理

在进行上面的检索和重排阶段中,只是进行了必要的过滤,没有限制匹配文档的数量。

实际内容可能会超过大模型的输入 token 数量,因此在调用大模型前会调用 api/db/services/dialog_service.py 文件中 message_fit_in() 根据大模型可用的 token 数量进行过滤。这部分与有道的 QAnything 的实现大同小异,就不额外展开了。

将检索的内容,历史聊天记录以及问题构造为 prompt,即可作为大模型的输入了,默认的英文 prompt 如下所示:

"""
You are an intelligent assistant. Please summarize the content of the knowledge base to answer the question. Please list the data in the knowledge base and answer in detail. When all knowledge base content is irrelevant to the question, your answer must include the sentence "The answer you are looking for is not found in the knowledge base!" Answers need to consider chat history.Here is the knowledge base:{knowledge}The above is the knowledge base.
"""

对应的中文 prompt 如下所示:

"""
你是一个智能助手,请总结知识库的内容来回答问题,请列举知识库中的数据详细回答。当所有知识库内容都与问题无关时,你的回答必须包括“知识库中未找到您要的答案!”这句话。回答需要考虑聊天历史。以下是知识库:{knowledge}以上是知识库
"""

总结

通过上面的介绍,可以对开源的 RagFlow 有了一个大致的了解,与前面的 有道 QAnything 整体流程还是比较类似的。同样支持混合检索,文本检索的方案都是基于 ElasticSearch 实现,在检索后都实现了 Rerank 流程,并在进入大模型之前基于可用最大 token 进行动态过滤。

不过对比来看,来自互联网大厂有道的 QAnything 的代码质量高很多,实现功能封装与命名都相当简洁易懂,工程的鲁棒性也明显高出一大截。如果想要通过源码了解 RAG 服务推荐优先阅读 QAnything。

当然 RagFlow 也有一些独到之处,对于文件的细粒度处理带来更高质量的参考信息,从而更好的提升参考信息,如果对这部分感兴趣可以深入了解下 RagFlow 的相关代码实现细节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/18168.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

1688 API接口介绍:开启您的电商新篇章

什么是1688 API接口&#xff1f; API&#xff08;Application Programming Interface&#xff0c;应用程序编程接口&#xff09;是一组协议和工具&#xff0c;用于定义不同的软件应用程序如何彼此交互。1688 API接口则是1688平台为商家提供的一套接口规范&#xff0c;允许商家…

使用FastAPI同时搭建WebSocket服务端和HTTP服务端

使用FastAPI搭建WebSocket服务端和HTTP服务端 1. WebSocket协议 WebSocket协议是一种在单个TCP连接上进行全双工通信的网络协议。它提供了双向通信的能力&#xff0c;允许服务器和客户端之间进行实时数据传输。与HTTP不同&#xff0c;WebSocket在连接建立后保持打开状态&…

网络工程师---第四十三天

1、网络地址转换请简述DNS服务器迭代查询与递归的区别&#xff1f; 2、请从技术方面简述RAIDO、RAID1、RAID3、 RAID5的特点&#xff1f; 3、请从层次结构、部署设备和功能配置方面描述层次化的网络结构&#xff1f; 4、请简述IPSECVPN和AH和ESP的区别&#xff1f; 5、请简述ID…

[Android]Mac电脑ADB使用

在Android开发中&#xff0c;ADB&#xff08;Android Debug Bridge&#xff09;是一个非常重要的工具&#xff0c;它提供了开发者与Android设备之间进行通信的多种方式。安装ADB对于任何进行Android开发的人来说都是必不可少的&#xff0c;尤其是在Mac电脑上进行开发时。 1. 安…

Java网络编程之TCP协议核心机制(三)

题外话 最近学习内容很多嗷 正题 延时应答机制 当客户端发送数据到服务器时,服务器不会立即返回ACK,而是等待一会再返回ACK 这段等待时间应用程序可能会消化掉接收缓冲区中的数据,当服务器返回ACK时,就会携带此时接收缓冲区大小的信息 当客户端下次再发送数据的时候就可以…

SQL 语言:数据操作

文章目录 SELECT 基本结构简单查询连接查询子查询聚集函数和更名操作分组查询字符串操作集合操作UNION 运算INTERSECT 运算EXCEPT 运算 视图查询和更新WITH 子句其他语句总结 SQL 的数据操作包括 SELECT(查询)、INSERT(插入)、DELETE(删除)和 UPDATE(修改)四条语句。 SELECT 基…

深度学习——自适应图片大小选择线宽和字体大小

不知道大家在可视化检测结果时是否会有设置的线宽太小在图片上画出来的框看不清楚&#xff0c;写的字符串看不清楚的烦恼。我发现不论是什么大小的图片&#xff0c;使用yolov5可视化线宽和字符串都能很清晰的显示出来&#xff0c;我根据yolov5上面的可视化代码&#xff0c;改写…

钡铼PLC集成BL121PO协议网关优化电子制造产线的生产效率

PLC转OPC UA协议转换网关BL121PO在电子制造产线中的优化应用&#xff0c;可以显著提高生产效率&#xff0c;促进生产线的智能化和信息化发展。本文将从以下几个方面进行阐述&#xff1a; 提高设备间通信效率&#xff1a;PLC转OPC UA协议转换网关BL121PO通过高效的协议转换&…

Flutter 中的 PositionedTransition 小部件:全面指南

Flutter 中的 PositionedTransition 小部件&#xff1a;全面指南 在 Flutter 中&#xff0c;PositionedTransition 是一个动画 widget&#xff0c;它允许你创建一个动画&#xff0c;使得一个子 widget 从屏幕的一个位置平滑过渡到另一个位置。这在实现各种动态布局变化时非常有…

SpringBoot基础篇

1&#xff1a;parent 目的&#xff1a;减少依赖配置 开发SpringBoot程序要继承spring-boot-starter-parentspring-boot-starter-parent中定义了若干个依赖管理继承parent模块可以避免多个依赖使用相同技术出现依赖版本冲突继承parent的形式也可以采用引入依赖的i形式实现效果…

就说说开一家公司的流程和成本

本人在进互联网公司和外企前&#xff0c;也和一位老板合作做&#xff0c;在一家小微公司里做过技术负责人&#xff0c;所以也了解开办一家公司的流程以及公司运作的成本。 通过本文大家其实能看到创业的难度。具体来讲&#xff0c;开办并维持着一家公司&#xff0c;其实需要操…

使用python写一个程序,持续监控某个windows进程的活动信息,例如占用cpu比例、占用内存等

使用python写一个程序&#xff0c;持续监控某个windows进程的活动信息&#xff0c;例如占用cpu比例、占用内存等 要持续监控某个Windows进程的活动信息&#xff0c;如CPU和内存占用&#xff0c;你可以使用psutil库。如果你还没有安装这个库&#xff0c;你可以使用pip进行安装&…

Python开发 —— 变量、全局变量函数的参数传递

1. Python变量 1.1 变量的定义和使用 在Python中&#xff0c;变量不需要显式声明。通过赋值语句&#xff0c;变量会自动创建。例如&#xff1a; x 10 y "Hello, World!"在这段代码中&#xff0c;x 被赋值为整数10&#xff0c;而 y 被赋值为字符串 "Hello, W…

SpringCloudOpenFeign的详解

1. SpringCloud OpenFeign的特性 1. 概念 Feign是一个声明式web Rest服务客户端。它使编写web服务客户端更容易要使用Feign&#xff0c;请创建一个接口并对其使用注解进行标注它具有可插入注释支持&#xff0c;包括Feign注释和JAX-RS注释Feign还支持可插拔编码器和解码器Spri…

Day25

Day25 网络编程概念 计算机网络 网络编程&#xff1a;TCP协议的三次握手四次挥手 IP地址&#xff0c;端口号&#xff1a;取值范围&#xff1a;065535&#xff0c;保留端口号&#xff1a;01024。 网络协议&#xff1a;TCP协议&#xff08;类比于打电话&#xff0c;双方需要连接…

项目构建工具maven

一、概述 1、maven是apache的一个开源项目&#xff0c;是一个优秀的项目构建/管理工具 2、apache(软件基金会、非盈利组织、管理维护一些开源项目) 二、功能 1、管理项目中jar包和jar包与jar包之间的依赖 2、完成项目编译、测试、打包 三、核心文件 pom.xml:在里面配置相…

信息学奥赛初赛天天练-14-阅读程序-字符数组、唯一分解定理应用

更多资源请关注纽扣编程微信公众号 1 2019 CSP-J 阅读程序1 (程序输入不超过数组或字符串定义的范围&#xff1b;判断题正确填√,错误填&#xff1b;除特殊说明外&#xff0c;判断题1.5分&#xff0c;选择题3分&#xff0c;共计40分) 1 输入的字符串只能由小写字母或大写字母组…

Redis的非关系型数据库

第七天课堂笔记 今日目标 非关系型数据库&#xff08;nosql【not only SQL】数据库&#xff09; Redis的非关系型数据库 Redis的安装和配置 Redis常见数据类型 Redis特性 nosql数据库 not only sql数据库&#xff0c;非关系型数据库&#xff0c;往往采用类似于json来存储数…

【大宗】第一期:大航海时代下的[集运欧线]

一、大航海时代 - 集运欧线前世今生 01 航运合约指数的诞生 ‍‍‍‍ 2023年8月18日&#xff0c;上海期货交易所的伙伴们搞了个大新闻——他们推出了一种新的期货品种&#xff0c;叫做“欧线集运”。这可不是什么普通的期货&#xff0c;它是基于一个叫做SCFIS的指数&#xf…

上海汇正财经官网怎么样?客户好评如潮,口碑赞誉之声不绝于耳

在财经服务领域&#xff0c;客户评价是衡量一家企业信誉和服务质量的重要标准。上海汇正财经作为业内知名的财经服务平台&#xff0c;以其优质的服务赢得了广大客户的认可和好评。大量正面用户评价和成功服务的案例&#xff0c;充分证明了上海汇正财经是一个值得信赖的正规企业…