RAG & web
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>ChatGPT 应用页面</title><style>body {font-family: Arial, sans-serif;margin: 20px;display: flex;justify-content: space-between;}.chat-container {width: 48%;}#chat-box-dev, #chat-box-test {width: 100%;height: 400px;border: 1px solid #ccc;padding: 10px;overflow-y: auto;white-space: pre-wrap; /* 保留换行和空格 */background-color: #f9f9f9;}#user-input-dev, #user-input-test {width: calc(100% - 20px);padding: 10px;margin-top: 10px;}#send-button-dev, #send-button-test {padding: 10px 20px;margin-top: 10px;}.message {margin: 5px 0;}.user {color: blue;}.assistant {color: green;}pre {background-color: #f0f0f0;padding: 10px;border-radius: 5px;overflow-x: auto;}</style>
</head>
<body><div class="chat-container"><h1>程序员对话</h1><div id="chat-box-dev"></div><input type="text" id="user-input-dev" placeholder="请输入你的问题"><button id="send-button-dev">发送</button></div><div class="chat-container"><h1>测试工程师对话</h1><div id="chat-box-test"></div><input type="text" id="user-input-test" placeholder="请输入你的问题"><button id="send-button-test">发送</button></div><script>// 程序员对话document.getElementById('send-button-dev').addEventListener('click', async () => {await sendMessage('dev');});document.getElementById('user-input-dev').addEventListener('keypress', async (e) => {if (e.key === 'Enter') {await sendMessage('dev');}});// 测试工程师对话document.getElementById('send-button-test').addEventListener('click', async () => {await sendMessage('test');});document.getElementById('user-input-test').addEventListener('keypress', async (e) => {if (e.key === 'Enter') {await sendMessage('test');}});async function sendMessage(role) {const userInput = document.getElementById(`user-input-${role}`).value;const chatBox = document.getElementById(`chat-box-${role}`);if (userInput.trim() === '') {alert('输入不能为空');return;}// 显示用户输入chatBox.innerHTML += `<p class="message user"><strong>用户:</strong> ${userInput}</p>`;document.getElementById(`user-input-${role}`).value = '';// 发送请求到后端const response = await fetch(`/chat_${role}`, {method: 'POST',headers: {'Content-Type': 'application/json'},body: JSON.stringify({ message: userInput })});const data = await response.json();const roleLabel = '';chatBox.innerHTML += `<p class="message assistant"><strong>${roleLabel}</strong> <pre>${data.reply}</pre></p>`;chatBox.scrollTop = chatBox.scrollHeight;}</script>
</body>
</html>
import os
import asyncio
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from pydantic import BaseModel
import openai
from fastapi.middleware.cors import CORSMiddleware
import jsonapp = FastAPI()# 配置 CORS
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# OpenAI API 访问密钥
openai.api_key = os.getenv("OPEN_API_KEY")class Message(BaseModel):message: str# 定义返回格式async def get_response(prompt):response = openai.chat.completions.create(model="gpt-3.5-turbo-1106",messages=prompt,top_p=0.2,response_format={"type": "json_object"},stream=False)result = response.choices[0].message.contentreturn result @app.post("/chat_dev")
async def chat_dev(message: Message):content = message.messageresponse_format = {"data":[{"id": "int型,唯一ID","level": "int型,层级","parent_id": "int型,父节点id","seq_index": "int型,如果是同一层级且同一父级,则该代表语句顺序index,从1开始","content": "内容",}]}response_format_str = json.dumps(response_format)messages = [{"role": "user","content": f'''# Role- 你是一个文档分层助手,你需要按照文档含义将文档分成父子结构## Attention- 只引用内容,不要改变文档的内容- 分层后的内容的合集对比原始文本,不要出现文本丢失## WorkFlow- 先按照语义将文档进行分段,每段内容语义要内聚- 在基于分段的内容进行从属关系的划分## Task- 分析 ""{content}""## JSON- """{response_format_str}"""## Init- 做为<Role>,严格遵守<Attention>,并依照<WorkFlow>去完成<Task>,并以<JSON>方式输出'''}]result = await get_response(messages)return {"reply": result}@app.post("/chat_test")
async def chat_test(message: Message):user_input = message.messagechange_input = {"role": "user", "content": user_input}messages_for_test.append(change_input)result = await get_response(messages_for_test)messages_for_test.append({"role": "assistant", "content": result})return {"reply": result}@app.get("/", response_class=HTMLResponse)
async def get():with open("index.html", "r", encoding="utf-8") as f:return f.read()test_prompt = '''
# role:
- 测试工程师
## Task
- 根据用户与程序员的沟通历史和开发的输出,对程序员的代码进行测试
- 测试结果包括是否可运行,是否有潜在影响稳定的问题
## 沟通方式
- 在跟程序员发送信息时请在对话前添加"测试对程序员说:"
## Init
作为角色<role>,严格遵守<Task><沟通方式>
'''
messages_for_test = [{"role": "system","content": test_prompt}
]
1. RAG
RAG,全称Retrieval-Augmented Generation,即:检索增强生成。
直白点就是给大模型递“小抄”,让大模型基于“小抄”内容答复用户。
1.1 RAG的应用场景
RAG 目前主要用于两大场景:智能客服、智能知识库问答。
1.2 RAG解决了什么问题
大模型并没有你的私有化数据,通过 RAG,可以将你的私有化数据给到大模型。在智能客服/智能知识库问答场景下就是利用了这一点。
RAG 可以减少大模型“幻觉”的产生,因为回答的内容被局限到“小抄”范围内了
1.3 RAG架构图
RAG 在实际落地中还是相对复杂的,我们来看一个RAG的架构全貌
1.4 向量数据库
上图中介绍了向量数据库,向量数据库,顾名思义就是存储向量的数据库。
向量数据库在 RAG 架构中有着非常重要的作用,它为 RAG 架构提供了语义搜索的能力。
向量数据库并不会对文本进行Embedding,而是存储Embedding后的结果
回忆下什么是向量、多维向量
向量之间可以计算相似度,向量数据库就是存储这些向量,并且提供向量相似度的计算,找到跟检索内容在语义上相似的内容。来看个图理解下
注意:两侧的Embedding模型以及维度一定要一致
下一节课我们会再详细的介绍下目前的主流向量数据库以及相关的特性。
2. RAG架构详解
接下来我们基于架构图逐个讲解下相关的功能的详细设计
2.1 语义分析
在多轮对话中,往往用户发送的某一条Prompt的语义是需要结合上下文来推理得出的。比如:
用户: 周杰伦是谁?
系统: 周杰伦是著名歌手
用户: 他多大了?
用户最后的问题"他多大了"的语义其实是"周杰伦多大了"。
在后续私有知识库检索的时候,如果使用"他多大了"来进行检索,会影响检索的准确率。当然如果你的业务场景就是仅仅针对"周杰伦"的问答,并且知识库的内容仅仅是"周杰伦"的内容,那这个语义分析的过程可以省略。
语义分析方法
语义分析借助大模型也可以轻松实现,来看段代码
from dotenv import load_dotenv, find_dotenv
import openai
import osload_dotenv()
client = openai.OpenAI(api_key=os.getenv("api_key"))# 模拟用户第二次的对话内容
user_prompt = "他多大了"# 模拟用户的历史对话内容
messages = [{"role": "user","content": "周杰伦是谁?"
}, {"role": "assistant","content": "周杰伦是著名歌手"
}]# 构建语义分析Prompt
messages.append({"role": "user","content": f'''请基于上下文分析''{user_prompt}''这句话的意图,并以json的格式返回,json格式:{{"purpose":""}}'''
})# -------------- Chat Request参数 详解 --------------
chat_completion = client.chat.completions.create(messages=messages,model='gpt-3.5-turbo-1106',top_p=0.2,response_format={"type": "json_object"}, #输出类型为jsonstream=False
)chat_completion.choices[0].message.content
3. 知识库构建与检索
知识库的构建是RAG的重中之重,构建的知识库质量直接决定了RAG的最终效果。而一般来说,知识库的构建方式与检索方式是需要相互配合的。
这里我们先来介绍下知识库的构建方式。
3.1 知识库构建
知识库构建分为几个步骤:数据采集、数据预处理、数据结构化 & 存储结构设计、数据增强
3.1.1 数据采集
一般是从现有的业务服务(比如wiki、订单服务、用户服务…)中获取数据,或者从现有的BRD(业务需求文档)、PRD(产品需求文档)或其他文档中获取数据。
数据格式多样:文本(普通文本、word、markdown、html)、语音、图片、视频、其他(excel、ppt… …)等。
3.1.2 数据预处理
数据处理会细分为很多维度,比如:格式转换、数据清洗(去除脏数据、无关的数据)、数据脱敏、数据增强
格式转换,为了后续可以对这些数据进行检索,所以需要将所有数据类型转换成文本,下面简要的给一些转换的方式
格式类型 处理方式
语音 可以通过OpenAI的Audio接口来进行语音转文本(ASR)
图片 可以通过OpenAI的ChatCompletion接口利用GPT-4V的能力对图片进行解析获取图片描述文本;
另外还有一种方式就是OCR可以尝试使用Google的"pytesseract"来完成
视频 视频目前没有太好的方式,可以尝试将视频中的音频提取出来再做ASR。
可以尝试使用python的"moviepy"来提取音频
Word文档 可以通过python的"docx"库来解决
PPT文档 可以通过python的"pptx"库来解决
Excel文档 可以通过python的"pandas"库来解决
Mardkown 可以通过python的"markdown"库来解决
HTML 可以通过python的"beautifulsoup4"库来解决
PS:如果上述python库不会使用,可以先尝试跟GPT对话获取方式,也可以群内讨论
数据清洗,为了保障数据不会误导后续的大模型答复,需要进行一定的数据清洗
去除重复的数据:去除重复的数据,减少相同的数据的多次无效存储,可以使用pandas库
纠正错误/误导的数据:业务相关,比如某些业务场景下订单金额不该超过5000元
以上这些内容都可以借助Python的pandas库来实现,来看个代码
import pandas as pddata = {'name': ["菠菜", "菠萝", "黄瓜", "菠菜"],'age': [18, -19, 19, 20],'pay_money': [-10, 3000, 8532, 10]
}df = pd.DataFrame(data)# 按照name去重
df = df.drop_duplicates(subset=['name'])print("去重后的:")
print(df)# 修正年龄
df.loc[df['age'] < 0, 'age'] = 0
# 修正支付金额
df.loc[df['pay_money'] < 0, 'pay_money'] = 0
df.loc[df['pay_money'] > 5000, 'pay_money'] = 5000df
数据清洗
去除不相关的数据:假设你的智能问答仅仅关于旅游的,就不要出现炒股相关的内容,可以利用GPT来实现请识别出下列信息中与旅游不相关的数据,并标识出行数
'''
1. 旅游产业今年GDP将大xxxx规模
2. 今天股票行情突变
3. 3月份适合去上海游览xxx
'''
将不相关的内容以json方式返回,格式为{"line_num":[<行数>]}
去除有害内容:黄赌毒、反人类、反社会的内容,可以利用GPT来实现请识别出下列信息中的有毒有害信息,包括:黄赌毒、反人类、反社会的内容。
'''
1. 今天天气真好
2. 我要毁灭地球
3. ....
'''
将有毒有害信息内容以json方式返回,格式为{"line_num":[<行数>]}
数据脱敏,为了避免敏感数据(密码、手机号、身份证等)的泄漏,需要做一定的数据脱敏
首先可以根据具体的业务场景划定敏感数据的范围,比如:身份证、手机号。
然后可以利用GPT来识别敏感信息并进行处理,比如可以使用下边的Prompt请识别出下列信息中的敏感数据,敏感数据包括:身份证、手机号。
'''xxxxxx'''
将命中的敏感信息以json方式返回,格式为{"card_no":"<身份证>", "phone": "<手机号>"}
大模型让数据预处理的过程变的简单,并且成本更低
3.1.3 数据结构化
数据经过预处理之后,我们得到了相对优质的数据,接下来就要对数据进行进一步的结构化的设计了。
结构化是按照一定策略将原始的文本处理成结构化数据,结构化的数据有利于文本的检索准确度
- 按段分割
按照固定长度去切分原始的文档
- 有重叠的按段分割
按照固定长度,有重叠的去切分原始的文档
3. 父子结构化(推荐)
基于语义,将文本切分成具有父子结构的数据。如下图:
怎么理解基于语义切分?其实可以想象一篇议论文,议论文有观点,以及支持观点的论证,那观点就是父节点,而针对这个观点的论证就是子节点
什么时候需要数据结构化 & 结构化的好处
如果文档内容有着严谨的上下文关系,即,你需要完整阅读整篇文章才能准确理解文章的意思,那这种文章不适合结构化
如果文档内部有着相对独立的段落语义,并且文档内容超过1000字,那可以结构化
结构化建议使用父子结构的方式,优点是:基于语义分割,更精准,后续如果做数据增强有针对性,缺点是:实现复杂
文档结构化后,每次检索并传递给大模型的只是一部分文档,这样做的好处:
节省Token
加快大模型的推理速度
避免了内容过多导致的超过大模型上下文限制
数据结构化的方法
我们着重介绍下父子结构的实现方式
from dotenv import load_dotenv, find_dotenv
import openai
import os
import jsonload_dotenv()
client = openai.OpenAI(api_key=os.getenv("api_key"))# 定义返回格式
response_format = {"data":[{"id": "int型,唯一ID","level": "int型,层级","parent_id": "int型,父节点id","seq_index": "int型,如果是同一层级且同一父级,则该代表语句顺序index,从1开始","content": "内容",}]
}
response_format_str = json.dumps(response_format) #类似把多行数据改成一行with open('chuneng.txt', 'r', encoding='utf-8') as file:content = file.read()# 模拟用户的历史对话内容
messages = [{"role": "user","content":
f'''
# Role
- 你是一个文档分层助手,你需要按照文档含义将文档分成父子结构
## Attention
- 只引用内容,不要改变文档的内容
- 分层后的内容的合集对比原始文本,不要出现文本丢失
## WorkFlow
- 先按照语义将文档进行分段,每段内容语义要内聚
- 在基于分段的内容进行从属关系的划分
## Task
- 分析 ""{content}"" #这里把把整个文件通过prompt传给chatgpt
## JSON
- """{response_format_str}""" #定义输出的格式
## Init
- 做为<Role>,严格遵守<Attention>,并依照<WorkFlow>去完成<Task>,并以<JSON>方式输出
'''
}]# -------------- Chat Request参数 详解 --------------
chat_completion = client.chat.completions.create(messages=messages,model='gpt-3.5-turbo-1106',top_p=0.2,response_format={"type": "json_object"}, #使用json 格式回复,本来就是一句话,前面加了一个前缀stream=False
)doc_content = json.loads(chat_completion.choices[0].message.content)
print(doc_content)
3.1.4 数据增强
数据增强是为了增加数据节点被检索的机会。实际场景中,如果我们发现某一个数据节点本该被检索,却没有被检索出来,就可以考虑针对该节点进行数据增强。
数据增强的过程也可以利用GPT来实现,来看下Prompt
你是一名数据增强的助手,请你将输入文本再度发挥想象,更换成不同角度的问问题的方式,在进行输出
针对以下数据做一下数据增强,基于给定数据,充分发挥你的想象力,更换不同角度来询问问题
'''公司提倡的企业文化:高效、协同、补位、数据驱动'''
将输出结果以json方式返回,格式为{'ask':[<询问的问题>]}
3.1.5 数据存储
完成了数据结构化且数据增强的数据,接下来就是将这些数据存储到 向量数据库
我们再来看下,如何来设计存储架构,来个存储架构图:
由于MySQL擅长存储这种结构化的数据,我们将原始的结构化数据存储到MySQL中。
为了具备语义检索的能力,我们再将每个节点数据存储到向量数据库中。
总结:向量数据库提供语义检索能力,MySQL提供结构关系的存储能力。
接下来我们看看如何设计MySQL:
CREATE TABLE `doc` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '文档id',`name` varchar(100) NOT NULL DEFAULT '' COMMENT '文档名称',`content` text NOT NULL COMMENT '文档内容',
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '文档表';CREATE TABLE `doc_nodes` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '节点id',`doc_id` bigint NOT NULL DEFAULT -1 COMMENT '文档表的ID',`parent_id` bigint NOT NULL DEFAULT -1 COMMENT '从属的父节点ID',`level` smallint NOT NULL DEFAULT 0 COMMENT '层级,父节点层级为0',`seq` smallint NOT NULL DEFAULT 0 COMMENT '同一父节点、同一层级下的,排序',`content` varchar(2000) NOT NULL DEFAULT '' COMMENT '数据节点的内容',
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '文档节点表';CREATE TABLE `doc_nodes_da` (`id` bigint NOT NULL AUTO_INCREMENT COMMENT '节点id',`doc_nodes_id` bigint NOT NULL DEFAULT -1 COMMENT '文档节点表的ID',`content` varchar(2000) NOT NULL DEFAULT '' COMMENT '数据增强的内容',
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT '文档节点数据增强表';
doc表用于存储原始文档(当然也可以只生成id跟nanme,不对内容进行存储)
doc_nodes表用于存储结构化的数据
doc_noes_da表用于存储数据节点的增强数据
为了演示方便,接下来我们将暂时使用SQLite来代替MySQL(SQLite可以快速构建本地环境),对应的SQL也会相应的进行调整。
- 数据库初始化
import sqlite3# 连接到 SQLite 数据库
# 如果文件不存在,会自动创建
conn = sqlite3.connect("doc.db")# 创建一个游标对象
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE doc (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, content TEXT NOT NULL
);
''')cursor.execute('''
CREATE TABLE doc_nodes (id INTEGER PRIMARY KEY AUTOINCREMENT, doc_id INTEGER NOT NULL, parent_id INTEGER NOT NULL, level INTEGER NOT NULL DEFAULT 0, seq INTEGER NOT NULL DEFAULT 0,content TEXT NOT NULL
);
''')cursor.execute('''
CREATE TABLE doc_nodes_da (id INTEGER PRIMARY KEY AUTOINCREMENT,doc_nodes_id INTEGER NOT NULL, content TEXT NOT NULL
);
''')# 保存(提交)更改
conn.commit()# 关闭游标和连接
cursor.close()
conn.close()
import sqlite3conn = sqlite3.connect("doc.db")
# 开启自动提交
conn.isolation_level = Nonecursor = conn.cursor()# 先清空下数据库
cursor.execute("delete from doc")
cursor.execute("delete from doc_nodes")
cursor.execute("delete from doc_nodes_da")# 插入文档到doc表
rs = cursor.execute("insert into doc(name, content) values(?, ?)", ("深地储能", content))
doc_id = cursor.lastrowid# id mapping,即id映射,key是结构化的id,value是插入doc_nodes表后的id
id_mapping = {}
for one in doc_content["data"]:# 将结构化的id映射成数据库idparent_id = -1if one["parent_id"] != 0:parent_id = id_mapping[one["parent_id"]]cursor.execute("insert into doc_nodes(doc_id, parent_id, level, seq, content) values(?, ?, ?, ?, ?)",(doc_id, parent_id, one["level"], one["seq_index"], one["content"]),)# 记录id映射关系id_mapping[one["id"]] = cursor.lastrowidrs = cursor.execute("select * from doc_nodes order by id asc")
print(rs.fetchall())cursor.close()
conn.close()
数据插入向量数据库
这里我们先暂时使用 ChromaDB 作为向量数据库。ChromaDB 底层也是基于SQLite的,可以快速的构建本地环境。
!pip install chromadbimport chromadb
import sqlite3
import openai
from dotenv import load_dotenv, find_dotenv
import osload_dotenv()# 获取embedding
def get_embeddings(embedding_input):client = openai.OpenAI(api_key=os.getenv("api_key"))embeddings = client.embeddings.create(model="text-embedding-ada-002",input=embedding_input,)embeddings_result = []for i in embeddings.data:embeddings_result.append(i.embedding)return embeddings_resultconn = sqlite3.connect("doc.db")
# 开启自动提交
conn.isolation_level = Nonecursor = conn.cursor()# 获取或者创建数据库
vector_db = chromadb.PersistentClient(path="./").get_or_create_collection(# 向量数据库的名称name='knowledge_db'
)# 从SQLite中获取数据,并embedding
ids = []
contents = []
metadatas = []
# 获取doc_nodes的数据
rs = cursor.execute("select * from doc_nodes order by id asc")
for one in rs.fetchall():ids.append("doc_nodes:" + str(one[0]))metadatas.append({"doc_nodes_id": one[0]})contents.append(one[5])# 获取doc_nodes_da的数据
rs = cursor.execute("select * from doc_nodes_da order by id asc")
for one in rs.fetchall():ids.append("doc_nodes_da:" + str(one[0]))metadatas.append({"doc_nodes_id": one[1]})contents.append(one[2])# 获取embeddings,这里做了简化,如果doc内容过多,可以分批去做embeddings
contents_embeddings = get_embeddings(contents)# 插入向量数据库
vector_db.upsert(ids=ids, embeddings=contents_embeddings, metadatas=metadatas)print(vector_db.get(include=["metadatas"]))cursor.close()
conn.close()
3.2 知识库检索
前面我们做大了大量的工作完成了数据的预处理、结构化以及存储结构的设计等,接下来就是如何进行检索了。
我们回忆下,目前具备了结构化的MySQL/SQLite存储,也具备了语义搜索的向量数据库存储,来看下如何检索:
首先去向量数据库进行语义搜索,找到与 “XXXXXX” 语义相似的内容 “节点1.2” 以及 “节点2.1”
基于 “节点1.2” 以及 “节点2.1” 去搜索他们周边的内容
3.2.1 为何要搜索节点周边的数据
检索的目的是为了给大模型 “递小抄”,而如果给大模型的小抄内容只有被检索出来的数据节点,那大模型很难推理出正确的结果,因为缺少数据节点的上下文。
就像给人递小抄,只是一句话,而没有这句话的上下文,那很难能理解这句话的真正意图,而只有知道这句话的前后背景,才能正确的理解。
上下文有三种类型:
子节点,子节点用于补充被检索节点的细节信息
父节点,父节点用于补充被检索节点的大的背景信息
同层节点,同层节点用于补充被检索节点的前后信息
3.2.2 检索策略
检索策略是重中之重,检索策略就是问自己三个问题:
是否向下融合子节点,子子节点… … 融合多少
是否融合同层节点,融合多少
是否融合父节点,父父节点,是否融合父节点的同层节点… … 融合多少
对于"融合多少"这个问题,可以以大小限制作为判断,比如融合子节点以及再向下2层的子节点,总共不超过500字。
那如何评价策略的效果?这个我们将会再下一节课来讲解。
接下来我们看看如何用代码来实现这个过程:
模拟用户输入,并在向量数据库中检索
text = "岩盐的特点是什么,为什么适合地下储能?"
rs = vector_db.query(query_embeddings=get_embeddings([text]), n_results=5, include=['distances', 'metadatas'])
print(rs)
定于DocTree,用于存储数据,提供上下文的裁剪能力
class DocTree:def __init__(self):self.nodes = {} self.masterNode = Noneself.rootNodes = {}# 添加树的节点def addNode(self, treeNode, isMaster=False):if treeNode.id in self.nodes:returnself.nodes[treeNode.id] = treeNodeif isMaster:treeNode.isMaster = TruetreeNode.enable = Trueself.masterNode = treeNode# 构建树def buildTree(self): # mock 缺失的父节点mockNodes = []for node in self.nodes.values():if node.parentID not in self.nodes and node.parentID > 0:mockNodes.append(DocTreeNode(node.parentID, 0, 1, '', True))for one in mockNodes:self.addNode(one)for node in self.nodes.values():if node.parentID <= 0:self.rootNodes[node.getUniqID()] = nodeelif not node.isMock:node.setParentNode(self.nodes[node.parentID])self.nodes[node.parentID].addChildNode(node) # 裁剪树def cropTree(self, sameLayerMaxSize=-1, parentLayerMaxSize=-1, childLayerMaxSize=-1):if sameLayerMaxSize > 0:self._cropSameLayer(sameLayerMaxSize)if parentLayerMaxSize > 0:self._cropParentLayer(parentLayerMaxSize)if childLayerMaxSize > 0:self._cropChildLayer(childLayerMaxSize)# 根据尺寸裁剪节点def _cropNode(self, maxSize, nodes, masterNode=None):if nodes is None or len(nodes) == 0 or maxSize == 0:return 0, 0nodeIndex = 0if masterNode is not None:try:nodeIndex = nodes.index(masterNode)except:passbeforeIndex = nodeIndexafterIndex = nodeIndex + 1if self._calSize(nodes[beforeIndex:afterIndex], masterNode) > maxSize:return 0, 0step = 0retainNum = 0beforeIndexCanMove = TrueafterIndexCanMove = Truewhile beforeIndexCanMove or afterIndexCanMove:step += 1if step % 2 == 1:if beforeIndex > 0 and self._calSize(nodes[beforeIndex-1:afterIndex], masterNode) <= maxSize:beforeIndex -= 1retainNum += 1else:beforeIndexCanMove = Falseelse:if afterIndex <= len(nodes) and self._calSize(nodes[beforeIndex:afterIndex+1], masterNode) <= maxSize:afterIndex += 1retainNum += 1else:afterIndexCanMove = Falsefor one in nodes[beforeIndex:afterIndex]:one.enable = Truereturn retainNum, self._calSize(nodes[beforeIndex:afterIndex], masterNode)def _cropSameLayer(self, maxSize):if self.masterNode.parentNode is None:self._cropNode(maxSize, None, self.masterNode)else:self._cropNode(maxSize, self.masterNode.parentNode.childs, self.masterNode)def _cropParentLayer(self, maxSize):if self.masterNode.parentNode is None:returnparentNode = self.masterNode.parentNodeleftSize = maxSizestep = 0while True:step += 1if step % 2 == 1:retainNum, usedSize = self._cropNode(leftSize, [parentNode])if retainNum == 1:leftSize -= usedSizeelse:breakelse:if parentNode.parentNode is not None:retainNum, usedSize = self._cropNode(leftSize, parentNode.parentNode.childs, parentNode)if retainNum == len(parentNode.parentNode.childs):parentNode = parentNode.parentNodeleftSize -= usedSizeelse:breakelse:breakdef _cropChildLayer(self, maxSize, node=None):childs = [self.masterNode]while True:childs_temp = []for one in childs:childs_temp = childs_temp + one.childsretainNum, usedSize = self._cropNode(maxSize, childs_temp)if retainNum == 0 or retainNum != len(childs_temp) or usedSize == maxSize:breakelse:maxSize -= usedSizechilds = childs_tempdef _calSize(self, nodes, masterNode):size = 0for one in nodes:if masterNode is None:size += one.sizeelif one.id != masterNode.id:size += one.sizereturn sizedef printTree(self, treeNode=None, layer=0, justPrintEnable=False):if treeNode is None:for key in sorted(self.rootNodes):one = self.rootNodes[key]one.printNode(layer, justPrintEnable)for oneChild in one.childs:self.printTree(oneChild, layer+1, justPrintEnable)else:treeNode.printNode(layer, justPrintEnable)for oneChild in treeNode.childs:self.printTree(oneChild, layer+1, justPrintEnable)def toStr(self, treeNode=None):rs = ""if treeNode is None:for key in sorted(self.rootNodes):one = self.rootNodes[key]if one.enable and not one.isMock:rs += one.contentfor oneChild in one.childs:rs += self.toStr(oneChild)else:if treeNode.enable and not treeNode.isMock:rs += treeNode.contentfor oneChild in treeNode.childs:rs += self.toStr(oneChild)return rsclass DocTreeNode:def __init__(self, id, parentID, seq, content, isMock=False):self.id = idself.parentID = parentIDself.seq = seqself.content = contentself.size = len(content)self.childs = []self.parentNode = Noneself.isMaster = Falseself.enable = Falseself.isMock = isMockdef getUniqID(self):return self.parentID * 1000000000000 + self.id * 10000 + self.seqdef setParentNode(self, node):self.parentNode = nodedef addChildNode(self, node):self.childs.append(node)self.childs.sort(key=lambda node: node.seq)def printNode(self, layer, justPrintEnable=True):if justPrintEnable and not self.enable:returnfor i in range(layer):print("--", end="")if self.isMaster:print("***", end="")print(self.id, self.parentID, self.seq, self.content)