Dify中的经济索引模式实现过程

当索引模式为经济时,使用离线的向量引擎、关键词索引等方式,降低了准确度但无需花费 Token。

一.提取函数**_extract**

根据不同文档类型进行内容的提取:

def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \-> list[Document]:  # 提取# load fileif dataset_document.data_source_type not in ["upload_file", "notion_import"]:  # 数据源类型return []data_source_info = dataset_document.data_source_info_dict  # 数据源信息text_docs = []  # 文本文档if dataset_document.data_source_type == 'upload_file':if not data_source_info or 'upload_file_id' not in data_source_info:raise ValueError("no upload file found")file_detail = db.session.query(UploadFile). \filter(UploadFile.id == data_source_info['upload_file_id']). \one_or_none()if file_detail:extract_setting = ExtractSetting(datasource_type="upload_file",upload_file=file_detail,document_model=dataset_document.doc_form)text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])elif dataset_document.data_source_type == 'notion_import':if (not data_source_info or 'notion_workspace_id' not in data_source_infoor 'notion_page_id' not in data_source_info):raise ValueError("no notion import info found")extract_setting = ExtractSetting(datasource_type="notion_import",notion_info={"notion_workspace_id": data_source_info['notion_workspace_id'],"notion_obj_id": data_source_info['notion_page_id'],"notion_page_type": data_source_info['type'],"document": dataset_document,"tenant_id": dataset_document.tenant_id},document_model=dataset_document.doc_form)text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])# update document status to splittingself._update_document_index_status(document_id=dataset_document.id,after_indexing_status="splitting",extra_update_params={DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)})  # 更新文档状态为拆分# replace doc id to document model idtext_docs = cast(list[Document], text_docs)for text_doc in text_docs:text_doc.metadata['document_id'] = dataset_document.idtext_doc.metadata['dataset_id'] = dataset_document.dataset_idreturn text_docs

直接调用的是core.indexing_runner.IndexingRunner._extract()方法:

得到上传文件的详细信息:

接下来调用提取内容函数extract()方法:

根据不同的IndexType类型返回不同的索引处理器:

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的extract函数:

ExtractProcessor.extract()类方法根据文件类型进行解析:

因为文件类型为txt,所以执行TextExtractor()

TextExtractor()实际执行的位置为dify\api\core\rag\extractor\text_extractor.py中的extract()方法:

最终得到text_docs内容:

二.转换函数_transform

def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,text_docs: list[Document], doc_language: str, process_rule: dict) -> list[Document]:  # 转换# get embedding model instanceembedding_model_instance = Noneif dataset.indexing_technique == 'high_quality':if dataset.embedding_model_provider:embedding_model_instance = self.model_manager.get_model_instance(tenant_id=dataset.tenant_id,provider=dataset.embedding_model_provider,model_type=ModelType.TEXT_EMBEDDING,model=dataset.embedding_model)  # 获取嵌入模型实例else:embedding_model_instance = self.model_manager.get_default_model_instance(tenant_id=dataset.tenant_id,model_type=ModelType.TEXT_EMBEDDING,)  # 获取默认嵌入模型实例documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,process_rule=process_rule, tenant_id=dataset.tenant_id,doc_language=doc_language)  # 转换文档return documents

实际调用的是core.indexing_runner.IndexingRunner._transform

因为设置dataset.indexing_technique'economy'

因为这里index_processor类型为core.rag.index_processor.processor.paragraph_index_processor.ParagraphIndexProcessor,实际调用的是ParagraphIndexProcessor类中的transform函数:

这里splitter类型为core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter

core.splitter.fixed_text_splitter.FixedRecursiveCharacterTextSplitter类:

调用文档清理CleanProcessor.clean()类方法:

实际调用的是dify\api\core\rag\cleaner\clean_processor.py中的CleanProcessor.clean()类方法:

clean方法的处理过程如下:

(1)默认清理:首先,方法会执行一些默认的清理操作,包括删除无效的符号。这些操作主要是通过正则表达式来实现的,例如,替换<\|<,替换\|>>,以及删除一些特定的ASCII和Unicode字符。

(2)规则应用:接下来方法会根据传入的process_rule字典中定义的规则来进一步清理文本。process_rule字典包含了一系列的清理规则,这些规则在rules键下的pre_processing_rules列表中定义。

(3)删除额外的空格:如果启用了remove_extra_spaces规则,方法会删除文本中的额外空格。这包括将三个或更多连续的换行符替换为两个换行符,以及将两个或更多连续的空格(包括特定的Unicode空格字符)替换为单个空格。

(4)删除URL和电子邮件地址:如果启用了remove_urls_emails规则,方法会从文本中删除URL和电子邮件地址。这是通过匹配特定的正则表达式模式来实现的,分别用于识别和删除电子邮件地址和URL。

通过splitter.split_documents([document])分割文档为文档节点:

实际调用的是dify\api\core\splitter\fixed_text_splitter.py中的FixedRecursiveCharacterTextSplitter类的split_text()方法。split_text方法的目的是将传入的文本分割成多个块,并返回这些块组成的列表。这个方法的处理过程可以分为以下几个步骤:

(1)检查固定分隔符:首先,方法检查是否存在一个固定的分隔符(_fixed_separator)。如果存在,它将使用这个分隔符来直接分割文本。这意味着文本将在每个出现固定分隔符的地方被分割。

(2)分割文本:使用固定分隔符分割文本后,会得到一个初步的块列表。然后,对这些初步的块进行进一步的处理,以确保每个块的长度不超过设定的大小(_chunk_size)。

(3)递归分割:对于每个初步的块,如果其长度超过了设定的大小,将使用recursive_split_text方法递归地进行进一步分割。这个递归过程会继续,直到所有的块都不超过设定的大小。

(4)返回最终块列表:最后,将所有处理后符合长度要求的块组成一个列表返回。

通过固定分隔符'\n'分割:

document_nodes = splitter.split_documents([document])实际返回文档节点数量为12:

dify\api\core\rag\index_processor\processor\paragraph_index_processor.py中,transform()方法主要是对文档进行预处理和分割,以便后续的索引或其它处理步骤可以更有效地处理文档的各个部分。主要执行步骤如下:

(1)获取分割器:根据传入的处理规则(process_rule)和嵌入模型实例(embedding_model_instance),获取文档分割器(splitter)。

(2)遍历文档:对于每个传入的文档(documents列表中的每个Document对象),执行以下子步骤:

  • 文档清理:使用CleanProcessor.clean方法清理文档的内容(document.page_content),移除不需要的字符或格式。

  • 文档分割:使用步骤1中获取的分割器(splitter)将清理后的文档内容分割成多个节点(document_nodes),每个节点代表文档的一部分。

  • 节点处理:对于每个分割后的节点,如果节点内容非空,则生成一个唯一的文档ID(doc_id)和文本哈希值(hash),并将这些信息添加到节点的元数据中。如果节点内容以特定字符(如.)开头,则移除这些字符。

  • 收集文档节点:将处理后的节点添加到一个列表中(split_documents),以便进一步处理。

(3)返回结果:将所有处理后的文档节点合并到一个列表中(all_documents),并返回该列表作为transform方法的结果。

三.保存函数_load_segments

接下调用self._load_segments(dataset, dataset_document, documents)方法:

实际调用的是_load_segments(self, dataset, dataset_document, documents)方法:

_load_segments 方法的主要目的是将处理后的文档段(documents)保存到数据库中,并更新相关文档的状态。这个过程可以分为以下几个步骤:

(1)初始化数据集文档存储:创建一个 DatasetDocumentStore 实例,这个实例与特定的数据集、创建者和文档ID相关联。

(2)添加文档段:使用 DatasetDocumentStore.add_documents(documents) 方法将处理后的文档段添加到数据库中。这里的 documents 是一个包含多个文档段的列表,每个文档段都是一个 Document 实例。具体doc_store.add_documents()self.get_document_segment()方法就不逐行调试,感兴趣可自行调试。

(3)更新文档状态为索引中:在所有文档段都保存到数据库之后,更新原始文档的状态为“索引中”(indexing)。这是通过调用 _update_document_index_status 方法实现的,该方法还会更新文档的清理完成时间和拆分完成时间。self._update_document_index_status()方法就不逐行调试,感兴趣可自行调试。

(4)更新段状态为索引中:最后,更新所有相关文档段的状态为"索引中"。这是通过调用 _update_segments_by_document 方法实现的,该方法会更新所有相关文档段的状态和索引时间。self._update_segments_by_document()方法就不逐行调试,感兴趣可自行调试。

这个过程确保了文档段的正确保存和状态更新,为后续的索引和检索操作做好准备。

四.加载函数_load

接下来调用self._load()方法:

实际调用的是def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset, dataset_document: DatasetDocument, documents: list[Document]) -> None:_load 方法的主要目的是将处理后的文档数据加载到索引中,并更新相关文档和文档段的状态为完成,以支持后续的搜索和检索操作。这个过程可以分为以下几个步骤:

(1)检查索引技术:根据数据集的索引技术(例如,economy),如果是high_quality,那么需要获取嵌入模型实例来进行高质量的索引处理。

(2)分块处理:如果使用嵌入模型实例,将文档数据分块处理。这通常涉及到并发执行,以提高处理效率。

(3)创建关键字索引:在一个独立的线程中,对文档数据进行关键字索引的创建,以便于后续的搜索和检索。

(4)更新文档状态:在所有文档数据被成功加载到索引后,更新相关文档的状态为completed,表示索引过程已完成。

(5)异常处理:在处理过程中,如果遇到任何异常(如文档暂停、提供者令牌未初始化错误等),将会更新文档的索引状态为error,并记录错误信息。

重点解释的是创建关键字线程这部分:

# create keyword index  # 创建关键字索引
create_keyword_thread = threading.Thread(target=self._process_keyword_index,args=(current_app._get_current_object(),dataset.id, dataset_document.id, documents))
create_keyword_thread.start() # 启动线程
create_keyword_thread.join()  # 等待线程结束

create_keyword_thread是一个 Thread 对象,之前通过 threading.Thread 创建并启动。它代表了一个独立的线程,用于执行某些任务,这里假设是处理关键字索引的任务。

.join(): 这是 Thread 类的一个方法。调用这个方法会使得调用它的线程(如主线程)等待,直到 create_keyword_thread 线程完成执行。如果 create_keyword_thread 已经完成,join() 会立即返回。

create_keyword_thread.join() 的作用是阻塞调用它的线程(通常是主线程),直到 create_keyword_thread 线程执行完成。这样做的目的是确保 create_keyword_thread 线程中的任务完全执行完毕后,主线程才继续执行后面的代码。

这种做法在需要确保某个线程中的任务完全完成后才进行下一步操作时非常有用,比如在处理完所有数据后才关闭数据库连接,或者在继续执行依赖于线程任务结果的代码之前确保线程任务已完成。

这里面执行的任务是target=self._process_keyword_index。这个方法主要用于在后台线程中处理关键字索引的创建和相关文档段状态的更新,确保这些操作在正确的应用上下文中执行:

def _process_keyword_index(self, flask_app, dataset_id, document_id, documents):  # 处理关键字索引with flask_app.app_context():dataset = Dataset.query.filter_by(id=dataset_id).first()if not dataset:raise ValueError("no dataset found")keyword = Keyword(dataset)keyword.create(documents)if dataset.indexing_technique != 'high_quality':document_ids = [document.metadata['doc_id'] for document in documents]db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id,DocumentSegment.index_node_id.in_(document_ids),DocumentSegment.status == "indexing").update({DocumentSegment.status: "completed",DocumentSegment.enabled: True,DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)})db.session.commit()

_process_keyword_index 方法的目的是在 Flask 应用的上下文中处理关键字索引。这个方法接收四个参数:flask_app(Flask 应用实例),dataset_id(数据集的 ID),document_id(文档的 ID),以及documents(文档对象列表)。方法的执行流程如下:

(1)使用 Flask 应用的上下文:这是必要的步骤,因为在 Flask 应用之外的线程中执行数据库操作或者访问 Flask 应用的配置时,需要手动创建应用上下文。

(2)查询数据集:通过 dataset_id 从数据库中查询对应的数据集对象。如果没有找到对应的数据集,抛出一个值错误异常。

(3)创建关键字索引:使用查询到的数据集对象初始化 Keyword 类的实例,并调用其 create 方法,传入文档对象列表来创建关键字索引。

重点分析keyword.create(documents)的过程,由于这个过程比较长,用另外一篇文档进行详细分析,具体参考文献 [1]。

(4)更新文档段状态:如果数据集的索引技术不是 'high_quality',则获取所有文档对象中的 doc_id,并更新数据库中对应 document_idindex_node_id 的文档段对象的状态为 "completed",同时设置其为启用状态,并记录完成时间。最后,提交数据库会话以保存更改。

参考文献

[1] Dify中Jieba类的create()方法执行过程:https://z0yrmerhgi8.feishu.cn/wiki/RKIewMrY2iaC1wks20FcMqhcnae

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/46282.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FastAPI 学习之路(四十三)路径操作的高级配置

在实际开发中&#xff0c;可能我们有些接口不能在接口文档中与其他业务接口一样开放给前端或者其他对接人&#xff0c;那么我们肯定会想着在接口文档中对其进行屏蔽隐藏操作&#xff0c;那么可以实现吗&#xff1f; 接口文档中隐藏接口 当然&#xff0c;还很简单&#xff0c;…

【CSS in Depth 2 精译】2.6 CSS 自定义属性(即 CSS 变量)+ 2.7 本章小结

文章目录 2.6 自定义属性&#xff08;即 CSS 变量&#xff09;2.6.1 动态变更自定义属性 2.7 本章小结 当前内容所在位置 第一章 层叠、优先级与继承第二章 相对单位 2.1 相对单位的威力2.2 em 与 rem2.3 告别像素思维2.4 视口的相对单位2.5 无单位的数值与行高2.6 自定义属性 …

PGCCC|【PostgreSQL】PCA+PCP+PCM等IT类认证申报个税退税指南

小编特将PostgreSQL证书申报个税退税流程&#xff0c;编辑成文&#xff0c;供大家申报参考哦~ 1.申报专项附加扣除 第一步&#xff1a;打开个人所得税APP&#xff0c;选择“专项附加扣除填报”&#xff1a; 第二步&#xff1a;“扣除年度”选择您要申报的年度&#xff0c;并…

Windows 默认以管理员运行打开CMD

winr 默认以管理员打开运行CMD 需求&#xff1a;在运行页面输入cmd 希望是可以直接通过管理员方式打开的。 winr 打开运行 输入secpol.msc 打开本地安全策略&#xff08;注意家庭版是没有这个的&#xff09; 找到本地策略–安全选项–用户帐户控制: 以管理员批准模式运行所有管…

基于Python thinker GUI界面的股票评论数据及投资者情绪分析设计与实现

1.绪论 1.1背景介绍 Python 的 Tkinter 库提供了创建用户界面的工具&#xff0c;可以用来构建股票评论数据及投资者情绪分析的图形用户界面&#xff08;GUI&#xff09;。通过该界面&#xff0c;用户可以输入股票评论数据&#xff0c;然后通过情感分析等技术对评论进行情绪分析…

JavaScript 中 await 永远不会 resolve 的 Promise 会导致内存泄露吗?

前言 在 JavaScript 中&#xff0c;await 关键字用于等待一个 Promise 完成&#xff0c;它只能在异步函数&#xff08;async function&#xff09;内部使用。当 await 一个永远不会 resolve 的 Promise 时&#xff0c;它确实会阻塞异步函数的进一步执行&#xff0c;但不会直接…

C1W1.Assignment: Logistic Regression

理论课&#xff1a;C1W1.Sentiment Analysis with Logistic Regression 文章目录 前期准备导入包导入数据处理推文文本 Part 1: Logistic regressionPart 1.1: Sigmoid实现 sigmoid 函数Logistic regression: regression and a sigmoid Part 1.2 Cost function and GradientUp…

Python 使用proto 发送socket数据

import socket import binascii import struct from SensingMonitoring_pb2 import Command, CommandNamesif __name__ "__main__":client socket.socket(socket.AF_INET, socket.SOCK_STREAM)client.connect(("192.168.1.100", 22295))# 发送数据comman…

软件模块的初始化

什么是初始化&#xff1f; 软件的初始化&#xff08;Initialization&#xff09;是指软件启动或重新配置时执行的一系列步骤和过程&#xff0c;旨在准备软件运行环境、加载必要的配置信息、检查系统依赖项、分配资源&#xff08;如内存、文件句柄等&#xff09;&#xff0c;以及…

5、Hacker_Kid-v1.0.1

中等难度 目标root权限 先进行一波IP地址发现 netdiscover -i eth0 -r 192.168.1.1/24 发现存在的靶机ip 进行一波端口的探测 发现是一个apache的服务和一个tornado的网站 这里有个细节部分&#xff0c;53端口常见的情况都是走的udp协议做的域名解析&#xff0c;这里查询出来…

【ARM】CCI缓存一致性整理

目录 1.CCI500提供的功能 2.CCI500在SOC系统中所处的位置​编辑 3.CCI500内部结构​编辑 4.功能描述 1.CCI500提供的功能 2.CCI500在SOC系统中所处的位置 3.CCI500内部结构 Transaction Tracker&#xff08;TT&#xff09;是用来解决一致性和ordering问题的&#xff0c;它…

完成SSH连接与端口映射并运行hello_world.py以及创建conda环境

完成SSH连接与端口映射并运行hello_world.py 第一步&#xff1a; 使用vscode打开连接开发机如图 第二步&#xff1a; 端口映射 ssh -p 37367 rootssh.intern-ai.org.cn -CNg -L {本地机器_PORT}:127.0.0.1:{开发机_PORT} -o StrictHostKeyCheckingno 如&#xff1a; ssh -…

优化静止不动的GPS点(JS版)

1.理论依据: 连续的GPS点中&#xff0c;静止不动的一段或者多段这样的点序列。把这些点序列处理成一个点&#xff0c;也就是拿这些序列的第一个点即可。理论依据如下&#xff1a;从第二个点开始&#xff0c;每个点都和第一个点进行距离计算和比较。至少比较N个点。当百分之M的…

Win11任务栏当中对 STM32CubeMX 的堆叠问题

当打开多个 CubeMX 程序的时候&#xff0c;Win11 自动将其进行了堆叠&#xff0c;这时候就无法进行预览与打开。 问题分析&#xff1a;大部分ST的工具都是基于 JDK 来进行开发的&#xff0c;Win11 将其识别成了同一个 Binary 但是实际上他们并不是同一个&#xff0c;通过配置…

redis源码分析之底层数据结构(一)-动态字符串sds

1.绪论 我们知道redis是由c语言实现的&#xff0c;c语言中是自带字符串的&#xff0c;但是为什么redis还要再实现自己的动态字符串呢&#xff0c;这种动态字符串的底层数据结构是怎样的呢?接下来我们带着这些问题来看一看redis中的动态字符串sds。 2.sds的组成 struct __at…

pico+unity3d项目配置

重点&#xff1a;unity编辑器版本要和pico的sdk要求一致、比如&#xff1a; 对于 Unity 2022.1.14 及以上版本&#xff0c;若同时在项目中使用 URP、Linear 色彩空间、四倍抗锯齿和OpenGL&#xff0c;会出现崩溃。该问题待 Unity 引擎解决。对于 Unity 2022&#xff0c;若同时…

多个版本JAVA切换(学习笔记)

多个版本JAVA切换 很多时候&#xff0c;我们电脑上会安装多个版本的java版本&#xff0c;java8&#xff0c;java11&#xff0c;java17等等&#xff0c;这时候如果想要切换java的版本&#xff0c;可以按照以下方式进行 1.检查当前版本的JAVA 同时按下 win r 可以调出运行工具…

Pytorch张量

在conda的环境中安装Jupyter及其他软件包 Pytorch 建立在张量&#xff08;tensor&#xff09;之上&#xff0c;Pytorch张量是一个 n 维数组&#xff0c;类似于 NumPy 数组。专门针对GPU设计&#xff0c;可以运行在GPU上以加快计算效率。换句话说&#xff0c;Pytorch张量是可以运…

解决QT creator中文乱码问题

1.首先设置文本编辑器为UTF-8 先在工具-选项-文本编辑器-behavior部分选择文件编码为UTF-8&#xff0c;紧接着是选择“如果编码是UTF-8则添加”&#xff0c;如下图 2.设置ext code for tools 为system 具体解决办法是 工具-选项-环境-interfaces这一栏有一个“Text code for to…