- 大家好,我是同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览:
当大模型的返回文字非常多时,返回完整的结果会耗费比较长的时间。如果等待大模型形成完整的答案再展示给用户,明显会给用户不好的体验。所以,现在市面上大多数的AI应用,在给用户结果时,都是以流式输出的方式展示给用户的。所谓的流式输出,就是类似打字机式的方式,一个字或一个词的输出,给用户一种答案逐渐出现的动画效果。
今天我们来学习下如何流式输出大模型的返回结果。本文将涵盖 LangChain 的流式输出方式和 OpenAI 原生的流式输出方式。
文章目录
- 0. LangChain的流式输出 Streaming
- 0.1 实现流式输出
- 1. OpenAI 原生的流式输出
- 1.1 启动 OpenAI 的流式输出
- 1.2 流式输出结果组装
- 1.3 完整的流式输出测试程序
0. LangChain的流式输出 Streaming
0.1 实现流式输出
我们在 【AI大模型应用开发】【LangChain系列】实战案例4:再战RAG问答,提取在线网页数据,并返回生成答案的来源 代码的基础上,增加流式输出。
原代码:
import bs4
from langchain import hub
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter# Load, chunk and index the contents of the blog.
loader = WebBaseLoader(web_paths=("https://lilianweng.github.io/posts/2023-06-23-agent/",),bs_kwargs=dict(parse_only=bs4.SoupStrainer(class_=("post-content", "post-title", "post-header"))),
)
docs = loader.load()text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(docs)
vectorstore = Chroma.from_documents(documents=splits, embedding=OpenAIEmbeddings())# Retrieve and generate using the relevant snippets of the blog.
retriever = vectorstore.as_retriever()
prompt = hub.pull("rlm/rag-prompt")
llm = ChatOpenAI(model_name="gpt-3.5-turbo", temperature=0)def format_docs(docs):return "\n\n".join(doc.page_content for doc in docs)from langchain_core.runnables import RunnableParallelrag_chain_from_docs = (RunnablePassthrough.assign(context=(lambda x: format_docs(x["context"])))| prompt| llm| StrOutputParser()
)rag_chain_with_source = RunnableParallel({"context": retriever, "question": RunnablePassthrough()}
).assign(answer=rag_chain_from_docs)result = rag_chain_with_source.invoke("What is Task Decomposition")
print(result)
修改为流式输出:
# result = rag_chain_with_source.invoke("What is Task Decomposition")
# print(result)
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):print(chunk)
修改方式很简单,LangChain的Chain中已经帮我们封装好了 stream 接口,调用该接口获取的结果即为流式输出的结果。其输出的结果如下(每次输出一个词,词前面加一个Key,用来标识这是答案的哪一部分):
我们可以利用Key来组装答案:
output = {}
curr_key = None
for chunk in rag_chain_with_source.stream("What is Task Decomposition"):for key in chunk:if key not in output:output[key] = chunk[key]else:output[key] += chunk[key]if key != curr_key:print(f"\n\n{key}: {chunk[key]}", end="", flush=True)else:print(chunk[key], end="", flush=True)curr_key = key
这样我们看到的答案的打印过程就是一个词一个词的出现了。最后展示完跟非流式输出一样。
1. OpenAI 原生的流式输出
1.1 启动 OpenAI 的流式输出
只需要在OpenAI接口调用时,将stream参数置为True,就启用了流式输出。
response = client.chat.completions.create(model = model,messages = messages,temperature = temperature,stream=True, # 启动流式输出
)
1.2 流式输出结果组装
结果的组装过程如下,流式输出的结果在 msg.choices[0].delta
中存着:
text = ""
print("====Streaming====")
# 需要把 stream 里的 token 拼起来,才能得到完整的 call
for msg in response:delta = msg.choices[0].deltaif delta.content:text_delta = delta.contentprint(text_delta)text = text + text_delta
print("====done!====")
if text:print(text)
1.3 完整的流式输出测试程序
from openai import OpenAI
# 加载 .env 到环境变量
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())client = OpenAI()###### 这里封装成函数 #######
def get_openai_chat_completion(messages, temperature, model = "gpt-3.5-turbo-1106"):response = client.chat.completions.create(model = model,messages = messages,temperature = temperature,stream=True, # 启动流式输出)return responseSYSTEM_PROMPT = """
你是一名资深教师,你叫“同学小张”,用户会给你一个提示,你根据用户给的提示,来为用户设计关于此课程的学习大纲。
你必须遵循以下原则:
1. 你有足够的时间思考,确保在得出答案之前,你已经足够理解用户需求中的所有关键概念,并给出关键概念的解释。
2. 输出格式请使用Markdown格式, 并保证输出内容清晰易懂。
3. 至少输出10章的内容, 每章至少有5个小节不要回答任何与课程内容无关的问题。
"""if __name__ == "__main__":user_input = "大模型应用开发"messages = [{"role": "system","content": SYSTEM_PROMPT,},{"role": "user","content": user_input,} ]response = get_openai_chat_completion(messages, 0.5)text = ""print("====Streaming====")# 需要把 stream 里的 token 拼起来,才能得到完整的 callfor msg in response:delta = msg.choices[0].deltaif delta.content:text_delta = delta.contentprint(text_delta)text = text + text_deltaprint("====done!====")if text:print(text)
流式输出过程如下:
组装后结果如下:
如果觉得本文对你有帮助,麻烦点个赞和关注呗 ~~~
- 大家好,我是 同学小张,日常分享AI知识和实战案例
- 欢迎 点赞 + 关注 👏,持续学习,持续干货输出。
- +v: jasper_8017 一起交流💬,一起进步💪。
- 微信公众号也可搜【同学小张】 🙏
本站文章一览: