Indexing
- 索引
LangChain Indexing API将数据从任何来源同步到向量存储中并保持同步,可以做到:
- 避免将重复内容写入矢量库
- 避免重写未更改的内容
- 避免在未更改的内容上重新计算嵌入
最重要的是,Indexing API 甚至可以处理相对于原始源文档经历了多个转换步骤(例如,通过文本分块)的文档。
How it works
LangChain Indexing利用记录管理器(RecordManager) 来跟踪文档写入向量存储的情况。
索引内容时,会计算每个文档的哈希值,并将以下信息存储在记录管理器中:
- the document hash (hash of both page content and metadata)
- write time
- the source id——每个文档应在其元数据中包含信息,以便我们确定该文档的最终来源
Deletion modes
将文档索引到向量存储时,向量存储中的一些现有文档可能会被删除。
在某些情况下,可能希望删除与正在编制索引的新文档源自相同来源的任何现有文档。
在其他情况下,您可能希望批量删除所有现有文档。索引 API 删除模式可让您选择所需的行为:
Cleanup Mode | De-Duplicates Content | Parallelizable | Cleans Up Deleted Source Docs | Cleans Up Mutations of Source Docs and/or Derived Docs | Clean Up Timing |
---|---|---|---|---|---|
None | √ | √ | × | × | |
Incremental | √ | √ | × | √ | 不断地 |
Full | √ | × | √ | √ | 索引结束时 |
- De-Duplicates Content:删除重复内容
- Parallelizable:可并行性
- Cleans Up Deleted Source Docs:清理已删除的源文档
- Cleans Up Mutations of Source Docs and/or Derived Docs:清理源文档和/或派生文档的突变
- Clean Up Timing:清理时间
None
不执行任何自动清理,允许用户手动清理旧内容。
incremental
和 full
提供以下自动清理功能:
- 如果源文档或派生文档的内容发生更改,
incremental
和full
都会清理(删除)内容的先前版本。 - 如果源文档已被删除(意味着它不包含在当前正在索引的文档中),则
full
会将其从向量存储中正确删除,但incremental
不会。
当内容发生变化(例如,源 PDF 文件被修改)时,在索引期间将有一段时间新旧版本都可能返回给用户。这种情况发生在新内容写入之后、旧版本被删除之前。
incremental
索引可以最大限度地减少这段时间,因为它能够在写入时连续进行清理。full
模式在所有批次写入后进行清理。
Requirements
- 不要与已预先填充了独立于索引 API 的内容的存储一起使用,因为记录管理器不会知道之前已插入记录。
- 仅适用于支持以下功能的 LangChain 矢量存储:
- 按 id 添加文档(带有
ids
参数的add_documents
方法) - 按 id 删除(带有
ids
参数的删除方法)
- 按 id 添加文档(带有
兼容的矢量存储:AnalyticDB
, AstraDB
, AwaDB
, Bagel
, Cassandra
, Chroma
, CouchbaseVectorStore
, DashVector
, DatabricksVectorSearch
, DeepLake
, Dingo
, ElasticVectorSearch
, ElasticsearchStore
, FAISS
, HanaDB
, Milvus
, MyScale
, OpenSearchVectorSearch
, PGVector
, Pinecone
, Qdrant
, Redis
, Rockset
, ScaNN
, SupabaseVectorStore
, SurrealDBStore
, TimescaleVector
, Vald
, Vearch
, VespaStore
, Weaviate
, ZepVectorStore
Caution
记录管理器依靠基于时间的机制来确定可以清理哪些内容(当使用incremental
和 full
清理模式时)。
如果两个任务连续运行,并且第一个任务在时钟时间更改之前完成,则第二个任务可能无法清理内容。
这在实际设置中不太可能成为问题,原因如下:
RecordManager
使用更高分辨率的时间戳。- 数据需要在第一个和第二个任务运行之间发生变化,如果任务之间的时间间隔很小时,这种情况就不太可能发生。
- 索引任务通常需要几毫秒以上的时间。
Quickstart
from langchain.indexes import SQLRecordManager, index
from langchain_core.documents import Document
from langchain_elasticsearch import ElasticsearchStore
from langchain_openai import OpenAIEmbeddings
初始化向量存储并设置嵌入:
collection_name = "test_index"embedding = OpenAIEmbeddings()vectorstore = ElasticsearchStore(es_url="http://localhost:9200", index_name="test_index", embedding=embedding
)
使用适当的命名空间初始化记录管理器。
建议:使用同时考虑向量存储和向量存储中的集合名称的命名空间;例如,
“redis/my_docs”、“chromadb/my_docs”或“postgres/my_docs”。
namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(namespace, db_url="sqlite:///record_manager_cache.sql"
)
在使用记录管理器之前创建架构
record_manager.create_schema()
索引一些测试文档:
doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
索引到空向量存储中:
def _clear():"""Hacky helper method to clear content. See the `full` mode section to to understand why it works."""index([], record_manager, vectorstore, cleanup="full", source_id_key="source")
None
deletion mode
该模式不会自动清理旧版本的内容;但是,它仍然负责内容重复删除。
_clear()
index([doc1, doc1, doc1, doc1, doc1],record_manager,vectorstore,cleanup=None,source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
_clear()
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
第二次将跳过所有内容:
index([doc1, doc2], record_manager, vectorstore, cleanup=None, source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
incremental
deletion mode
_clear()
index([doc1, doc2],record_manager,vectorstore,cleanup="incremental",source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
再次索引应该会导致两个文档都被跳过——同时也会跳过嵌入操作!
index([doc1, doc2],record_manager,vectorstore,cleanup="incremental",source_id_key="source",
)
{'num_added': 0, 'num_updated': 0, 'num_skipped': 2, 'num_deleted': 0}
如果我们不提供增量索引模式的文档,则什么都不会改变。
index([], record_manager, vectorstore, cleanup="incremental", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
如果我们改变一个文档,新版本将被写入,所有共享相同源的旧版本将被删除。
changed_doc_2 = Document(page_content="puppy", metadata={"source": "doggy.txt"})
index([changed_doc_2],record_manager,vectorstore,cleanup="incremental",source_id_key="source",
)
{'num_added': 1, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 1}
full
deletion mode
在full
模式下,用户应该传递将其索引到索引函数中的完整内容。
任何未传递到索引功能且存在于向量存储中的文档都将被删除!
此行为对于处理源文档的删除很有用。
_clear()
all_docs = [doc1, doc2]
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
假设有人删除了第一个文档:
del all_docs[0]
all_docs
[Document(page_content='doggy', metadata={'source': 'doggy.txt'})]
使用full
模式也会清除已删除的内容。
index(all_docs, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}
Source
元数据属性包含一个称为source
的字段。该来源应指出与给定文档相关的最终出处。
例如,如果这些文档代表某个父文档的块,则两个文档的source
应该相同并引用父文档。
一般来说,应始终指定source
。如果您从不打算使用incremental
模式,并且由于某种原因无法正确指定source
字段,则仅使用 None。
from langchain_text_splitters import CharacterTextSplitter
doc1 = Document(page_content="kitty kitty kitty kitty kitty", metadata={"source": "kitty.txt"}
)
doc2 = Document(page_content="doggy doggy the doggy", metadata={"source": "doggy.txt"})
new_docs = CharacterTextSplitter(separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2
).split_documents([doc1, doc2])
new_docs
[Document(page_content='kitty kit', metadata={'source': 'kitty.txt'}),Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),Document(page_content='doggy doggy', metadata={'source': 'doggy.txt'}),Document(page_content='the doggy', metadata={'source': 'doggy.txt'})]
_clear()
index(new_docs,record_manager,vectorstore,cleanup="incremental",source_id_key="source",
)
{'num_added': 5, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
changed_doggy_docs = [Document(page_content="woof woof", metadata={"source": "doggy.txt"}),Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),
]
这应该删除与doggy.txt
源关联的文档的旧版本,并将其替换为新版本。
index(changed_doggy_docs,record_manager,vectorstore,cleanup="incremental",source_id_key="source",
)
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 2}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'}),Document(page_content='tty kitty', metadata={'source': 'kitty.txt'}),Document(page_content='tty kitty ki', metadata={'source': 'kitty.txt'}),Document(page_content='kitty kit', metadata={'source': 'kitty.txt'})]
Using with loaders
索引可以接受文档的可迭代或任何加载器。
from langchain_community.document_loaders.base import BaseLoaderclass MyCustomLoader(BaseLoader):def lazy_load(self):text_splitter = CharacterTextSplitter(separator="t", keep_separator=True, chunk_size=12, chunk_overlap=2)docs = [Document(page_content="woof woof", metadata={"source": "doggy.txt"}),Document(page_content="woof woof woof", metadata={"source": "doggy.txt"}),]yield from text_splitter.split_documents(docs)def load(self):return list(self.lazy_load())
_clear()
loader = MyCustomLoader()
loader.load()
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]
index(loader, record_manager, vectorstore, cleanup="full", source_id_key="source")
{'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}
vectorstore.similarity_search("dog", k=30)
[Document(page_content='woof woof', metadata={'source': 'doggy.txt'}),Document(page_content='woof woof woof', metadata={'source': 'doggy.txt'})]