这里的源码主要来自于Langchain-ChatGLM中的向量库部分,做了一些代码上的修改和封装,以适用于基于问题和包含数据库表描述的txt文件(文件名为库表名,文件内容为库表中的字段及描述)对数据库表进行快速检索。
中文分词类
splitter.py
from langchain.text_splitter import CharacterTextSplitter
import re
from typing import Listclass ChineseTextSplitter(CharacterTextSplitter):def __init__(self, pdf: bool = False, sentence_size: int = 100, **kwargs):super().__init__(**kwargs)self.pdf = pdfself.sentence_size = sentence_sizedef split_text1(self, text: str) -> List[str]:if self.pdf:text = re.sub(r"\n{3,}", "\n", text)text = re.sub('\s', ' ', text)text = text.replace("\n\n", "")sent_sep_pattern = re.compile('([﹒﹔﹖﹗。!?]["’”」』]{0,2}|(?=["‘“「『]{1,2}|$))') # del :;sent_list = []for ele in sent_sep_pattern.split(text):if sent_sep_pattern.match(ele) and sent_list:sent_list[-1] += eleelif ele:sent_list.append(ele)return sent_listdef split_text(self, text: str) -> List[str]: ##此处需要进一步优化逻辑if self.pdf:text = re.sub(r"\n{3,}", r"\n", text)text = re.sub('\s', " ", text)text = re.sub("\n\n", "", text)text = re.sub(r'([;;!?。!?\?])([^”’])', r"\1\n\2", text) # 单字符断句符text = re.sub(r'(\.{6})([^"’”」』])', r"\1\n\2", text) # 英文省略号text = re.sub(r'(\…{2})([^"’”」』])', r"\1\n\2", text) # 中文省略号text = re.sub(r'([;;!?。!?\?]["’”」』]{0,2})([^;;!?,。!?\?])', r'\1\n\2', text)# 如果双引号前有终止符,那么双引号才是句子的终点,把分句符\n放到双引号后,注意前面的几句都小心保留了双引号text = text.rstrip() # 段尾如果有多余的\n就去掉它# 很多规则中会考虑分号;,但是这里我把它忽略不计,破折号、英文双引号等同样忽略,需要的再做些简单调整即可。ls = [i for i in text.split("\n") if i]for ele in ls:if len(ele) > self.sentence_size:ele1 = re.sub(r'([,,]["’”」』]{0,2})([^,,])', r'\1\n\2', ele)ele1_ls = ele1.split("\n")for ele_ele1 in ele1_ls:if len(ele_ele1) > self.sentence_size:ele_ele2 = re.sub(r'([\n]{1,}| {2,}["’”」』]{0,2})([^\s])', r'\1\n\2', ele_ele1)ele2_ls = ele_ele2.split("\n")for ele_ele2 in ele2_ls:if len(ele_ele2) > self.sentence_size:ele_ele3 = re.sub('( ["’”」』]{0,2})([^ ])', r'\1\n\2', ele_ele2)ele2_id = ele2_ls.index(ele_ele2)ele2_ls = ele2_ls[:ele2_id] + [i for i in ele_ele3.split("\n") if i] + ele2_ls[ele2_id + 1:]ele_id = ele1_ls.index(ele_ele1)ele1_ls = ele1_ls[:ele_id] + [i for i in ele2_ls if i] + ele1_ls[ele_id + 1:]id = ls.index(ele)ls = ls[:id] + [i for i in ele1_ls if i] + ls[id + 1:]return ls
faiss向量库类
myfaiss.py
from langchain.vectorstores import FAISS
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.faiss import dependable_faiss_import
from typing import Any, Callable, List, Dict
from langchain.docstore.base import Docstore
from langchain.docstore.document import Document
import numpy as np
import copy
import osclass MyFAISS(FAISS, VectorStore):def __init__(self,embedding_function: Callable,index: Any,docstore: Docstore,index_to_docstore_id: Dict[int, str],normalize_L2: bool = False,):super().__init__(embedding_function=embedding_function,index=index,docstore=docstore,index_to_docstore_id=index_to_docstore_id,normalize_L2=normalize_L2)def seperate_list(self, ls: List[int]) -> List[List[int]]:lists = []ls1 = [ls[0]]source1 = self.index_to_docstore_source(ls[0])for i in range(1, len(ls)):if ls[i - 1] + 1 == ls[i] and self.index_to_docstore_source(ls[i]) == source1:ls1.append(ls[i])else:lists.append(ls1)ls1 = [ls[i]]source1 = self.index_to_docstore_source(ls[i])lists.append(ls1)return listsdef similarity_search_with_score_by_vector(self, embedding: List[float], k: int = 4) -> List[Document]:faiss = dependable_faiss_import()# (1,1024)vector = np.array([embedding], dtype=np.float32)# 默认FALSEif self._normalize_L2:faiss.normalize_L2(vector)# shape均为(1, k)scores, indices = self.index.search(vector, k)docs = []id_set = set()# 存储关键句keysentences = []# 遍历找到的k个最近相关文档的索引# top-k是第一次的筛选条件,score是第二次的筛选条件for j, i in enumerate(indices[0]):if i in self.index_to_docstore_id:_id = self.index_to_docstore_id[i]# 执行接下来的操作else:continue# index→id→contentdoc = self.docstore.search(_id)doc.metadata["score"] = int(scores[0][j])docs.append(doc)# 其实存的都是indexid_set.add(i)docs.sort(key=lambda doc: doc.metadata['score'])return docs
嵌入检索类
embedder.py
from langchain.embeddings.huggingface import HuggingFaceEmbeddings
from langchain.document_loaders import TextLoader
from embeddings.splitter import ChineseTextSplitter
from embeddings.myfaiss import MyFAISS
import os
import torch
from config import *def torch_gc():if torch.cuda.is_available():# with torch.cuda.device(DEVICE):torch.cuda.empty_cache()torch.cuda.ipc_collect()elif torch.backends.mps.is_available():try:from torch.mps import empty_cacheempty_cache()except Exception as e:print(e)print("如果您使用的是 macOS 建议将 pytorch 版本升级至 2.0.0 或更高版本,以支持及时清理 torch 产生的内存占用。")class Embedder:def __init__(self, config):self.model = HuggingFaceEmbeddings(model_name="/home/df1500/NLP/LLM/pretrained_model/WordEmbeddings/"+config.emb_model,model_kwargs={'device': 'cuda'})self.config = configself.create_vector_score()self.vector_store = MyFAISS.load_local(self.config.db_vs_path, self.model)def load_file(self, filepath):# 对文件分词if filepath.lower().endswith(".txt"):loader = TextLoader(filepath, autodetect_encoding=True)textsplitter = ChineseTextSplitter(pdf=False, sentence_size=self.config.sentence_size)docs = loader.load_and_split(textsplitter)else:raise Exception("{}文件不是txt格式".format(filepath))return docsdef txt2vector_store(self, filepaths):# 批量建立知识库docs = []for filepath in filepaths:try:docs += self.load_file(filepath)except Exception as e:raise Exception("{}文件加载失败".format(filepath))print("文件加载完毕,正在生成向量库")vector_store = MyFAISS.from_documents(docs, self.model)torch_gc()vector_store.save_local(self.config.db_vs_path)def create_vector_score(self):if "index.faiss" not in os.listdir(self.config.db_vs_path):filepaths = os.listdir(self.config.db_doc_path)filepaths = [os.path.join(self.config.db_doc_path, filepath) for filepath in filepaths]self.txt2vector_store(filepaths)print("向量库已建立成功")def get_topk_db(self, query):related_dbs_with_score = self.vector_store.similarity_search_with_score(query, k=self.config.sim_k)topk_db = [{'匹配句': db_data.page_content, '数据库': os.path.basename(db_data.metadata['source'])[:-4], '得分': db_data.metadata['score']} for db_data in related_dbs_with_score]return topk_db
测试代码
Config是用来传参的类,这里略去定义
if __name__ == '__main__':Conf = Config()configs = Conf.get_config()embedder = Embedder(configs)query = "公司哪个月的出勤率是最高的?"topk_db = embedder.get_topk_db(query)print(topk_db)