构建LangChain应用程序的示例代码:10、如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息教程

使用Apache Kafka路由消息

本示例向您展示了如何使用LangChain的标准聊天功能,并通过Apache Kafka来回传递聊天消息。

目标是模拟一个架构,其中聊天前端和LLM作为需要通过内部网络相互通信的独立服务运行。

这是一种替代通过REST API请求模型响应的典型模式(本文末尾有更多信息,解释了为什么您可能想要这样做)。

1. 安装主要依赖项

依赖项包括:

  • Quix Streams库,用于以"Pandas-like"的方式管理与Apache Kafka(或Kafka-like工具,如Redpanda)的交互。
  • LangChain库,用于管理与Llama-2的交互并存储对话状态。
!pip install quixstreams==2.1.2a langchain==0.0.340 huggingface_hub==0.19.4 langchain-experimental==0.0.42 python-dotenv

2. 构建并安装llama-cpp-python库(启用CUDA以利用Google Colab GPU)

llama-cpp-python库是一个Python包装器,围绕llama-cpp库,使您能够高效地仅使用CPU运行量化的LLM。

使用标准的pip安装命令llama-cpp-python默认不支持GPU。如果在Google Colab中仅依赖CPU,生成可能会非常慢,所以下面的命令添加了一个额外的选项来构建并安装带有GPU支持的llama-cpp-python(确保您在Google Colab中选择了GPU支持的运行环境)。

!CMAKE_ARGS="-DLLAMA_CUBLAS=on" FORCE_CMAKE=1 pip install llama-cpp-python

3. 下载并设置Kafka和Zookeeper实例

从Apache网站下载Kafka二进制文件,并以守护进程方式启动服务器。我们将使用Apache Kafka提供的默认配置来启动实例。

!curl -sSOL https://dlcdn.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz
!tar -xzf kafka_2.13-3.6.1.tgz!./kafka_2.13-3.6.1/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-3.6.1/config/zookeeper.properties
!./kafka_2.13-3.6.1/bin/kafka-server-start.sh -daemon ./kafka_2.13-3.6.1/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

4. 检查Kafka守护进程是否正在运行

显示正在运行的进程,并过滤Java进程(您应该看到两个——每个服务器一个)。

!ps aux | grep -E '[j]ava'

5. 导入所需的依赖项并初始化所需的变量

导入与Kafka交互的Quix Streams库,以及运行ConversationChain所需的LangChain组件。

# 导入实用程序库
import json
import random
import re
import time
import uuid
from os import environ
from pathlib import Path
from random import choice, randint, randomfrom dotenv import load_dotenv# 从Hugging Face hub直接下载模型的Hugging Face实用程序:
from huggingface_hub import hf_hub_download
from langchain.chains import ConversationChain# 导入Langchain模块以管理提示和对话链:
from langchain.llms import LlamaCpp
from langchain.memory import ConversationTokenBufferMemory
from langchain.prompts import PromptTemplate, load_prompt
from langchain_core.messages import SystemMessage
from langchain_experimental.chat_models import Llama2Chat
from quixstreams import Application, State, message_key# 导入Quix依赖项
from quixstreams.kafka import Producer# 初始化全局变量。
AGENT_ROLE = "AI"
chat_id = ""# 将当前角色设置为角色常量,并为补充客户元数据初始化变量:
role = AGENT_ROLE

6. 下载"llama-2-7b-chat.Q4_K_M.gguf"模型

从Hugging Face下载量化的Llama-2 7B模型,我们将使用它作为本地LLM(而不是依赖于外部服务的REST API调用)。

model_name = "llama-2-7b-chat.Q4_K_M.gguf"
model_path = f"./state/{model_name}"if not Path(model_path).exists():print("The model path does not exist in state. Downloading model...")hf_hub_download("TheBloke/Llama-2-7b-Chat-GGUF", model_name, local_dir="state")
else:print("Loading model from state...")

7. 加载模型并初始化对话记忆

加载Llama 2,并使用ConversationTokenBufferMemory将对话缓冲区设置为300个token。这个值用于在仅CPU容器中运行Llama,所以如果在Google Colab中运行,您可以提高它。它防止了托管模型的容器内存不足。

在这里,我们覆盖了默认的系统角色,以便聊天机器人具有《银河系漫游指南》中Marvin The Paranoid Android的个性。

# 使用适当的参数加载模型:llm = LlamaCpp(model_path=model_path,max_tokens=250,top_p=0.95,top_k=150,temperature=0.7,repeat_penalty=1.2,n_ctx=2048,streaming=False,n_gpu_layers=-1,
)model = Llama2Chat(llm=llm,system_message=SystemMessage(content="您是一个非常无聊的机器人,具有《银河系漫游指南》中Marvin the Paranoid Android的个性。"),
)# 定义在每次交流中给模型的对话历史量(300个token,或者略多于300个单词)# 该函数自动修剪超出token范围的对话历史中最旧的消息。memory = ConversationTokenBufferMemory(llm=llm,max_token_limit=300,ai_prefix="AGENT",human_prefix="HUMAN",return_messages=True,
)# 定义自定义提示prompt_template = PromptTemplate(input_variables=["history", "input"],template=""" 以下文本是您和需要您智慧的谦逊人类之间聊天的历史。请回复人类最近的消息。当前对话:{history}HUMAN: {input}\:nANDROID:""",
)chain = ConversationChain(llm=model, prompt=prompt_template, memory=memory)print("--------------------------------------------")
print(f"Prompt={chain.prompt}")
print("--------------------------------------------")

8. 使用聊天机器人初始化聊天对话

我们配置聊天机器人通过向"chat" Kafka主题发送固定问候来初始化对话。当我们发送第一条消息时,"chat"主题会自动创建。

def chat_init():chat_id = str(uuid.uuid4())  # 为有效的消息键控给对话一个IDprint("======================================")print(f"Generated CHAT_ID = {chat_id}")print("======================================")chat_init()

9. 初始化回复功能

这个函数定义了聊天机器人应该如何回复传入的消息。与之前的单元格不同,我们不是发送一个固定的消息,而是使用Llama-2生成一个回复,并将该回复发回"chat" Kafka主题。

def reply(row: dict, state: State):print("-------------------------------")print("Received:")print(row)print("-------------------------------")print(f"Thinking about the reply to: {row['text']}...")

10. 检查Kafka主题以获取新的人类消息,并让模型生成回复

如果您第一次运行这个单元格,请运行它并等待在控制台输出中看到Marvin的问候(“Hello my name is Marvin…”)。在收到LLM的回复后,手动停止这个单元格,并继续执行下一个单元格,在那里您将被提示输入您的回复。

一旦您输入了您的消息,请回到这个单元格。您的回复也发送到了同一个"chat"主题。Kafka消费者检查新消息,并过滤掉来自聊天机器人本身的消息,只留下最新的人类消息。

一旦检测到新的人类消息,就会触发回复功能。

在输出中收到LLM的回复后,手动停止这个单元格。

# 定义您的应用程序和设置app = Application(broker_address="127.0.0.1:9092",consumer_group="aichat",auto_offset_reset="earliest",consumer_extra_config={"allow.auto.create.topics": "true"},
)# 定义一个带有JSON反序列化的输入主题input_topic = app.topic("chat", value_deserializer="json")# 定义一个带有JSON序列化的输出主题output_topic = app.topic("chat", value_serializer="json")# 基于输入主题的消息流初始化一个流数据帧:sdf = app.dataframe(topic=input_topic)# 过滤SDF,只包括角色与机器人当前角色不匹配的传入行sdf = sdf.update(lambda val: print(f"Received update: {val}\n\nSTOP THIS CELL MANUALLY TO HAVE THE LLM REPLY OR ENTER YOUR OWN FOLLOWUP RESPONSE")
)# 以便它不会回复自己的消息sdf = sdf[sdf["role"] != role]# 为过滤后的SDF中检测到的任何新消息(行)触发回复功能sdf = sdf.apply(reply, stateful=True)# 再次检查SDF并过滤掉任何空行sdf = sdf[sdf.apply(lambda row: row is not None)]# 更新时间戳列到当前时间的纳秒sdf["Timestamp"] = sdf["Timestamp"].apply(lambda row: time.time_ns())# 将处理过的SDF发布到由output_topic对象指定的Kafka主题。sdf = sdf.to_topic(output_topic)app.run(sdf)

11. 输入人类消息

运行这个单元格以输入您想要发送给模型的消息。它使用另一个Kafka生产者将您的文本发送到"chat" Kafka主题,供模型获取(需要再次运行上一个单元格)。

chat_input = input("Please enter your reply: ")
myreply = chat_inputmsgvalue = {"uuid": chat_id,  # 现在留空"role": "human","text": myreply,"conversation_id": chat_id,"Timestamp": time.time_ns(),
}with Producer(broker_address="127.0.0.1:9092",extra_config={"allow.auto.create.topics": "true"},
) as producer:value = msgvalueproducer.produce(topic="chat",headers=[("uuid", str(uuid.uuid4()))],  # 这里也允许使用字典key=chat_id,  # 现在留空value=json.dumps(value),  # 需要是一个字符串)print("Replied to chatbot with message:")
print("--------------------------------------------")
print(value)
print("--------------------------------------------")
print("\n\nRUN THE PREVIOUS CELL TO HAVE THE CHATBOT GENERATE A REPLY")

为什么要通过Kafka路由聊天消息?

使用LangChain内置的对话管理功能直接与LLM交互更容易。此外,您还可以使用REST API从外部托管的模型生成响应。那么为什么要费心使用Apache Kafka呢?

有几个原因,例如:

  • 集成:许多企业希望运行自己的LLM,以便他们可以将数据保留在内部。这需要将LLM支持的组件集成到可能已经使用某种消息总线进行解耦的现有架构中。
  • 可扩展性:Apache Kafka旨在进行并行处理,因此许多团队更喜欢使用它来更有效地将工作分配给可用的工作者(在这种情况下,“工作者”是运行LLM的容器)。
  • 持久性:Kafka旨在允许服务在另一项服务遇到内存问题或下线时继续进行。这可以防止在多个系统相互通信的复杂分布式架构中发生数据丢失(LLM只是许多相互依赖的系统之一,还包括向量数据库和传统数据库)。

有关事件流为何适用于Gen AI应用程序架构的更多背景信息,请参阅Kai Waehner的文章“Apache Kafka + Vector Database + LLM = Real-Time GenAI”。


本文介绍了一种使用Apache Kafka进行消息路由以模拟独立聊天前端和LLM服务之间通信的架构。通过安装依赖库、设置Kafka环境、初始化模型和对话记忆,构建了一个聊天机器人。这种方法利用了Kafka的集成性、可扩展性和持久性优势,适合需要将LLM集成到现有解耦架构中的企业使用。此外,它还提供了一种比直接REST API调用更健壮和可靠的解决方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/847152.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mac 分享 WIFI 后,iPhone 连接 WIFI,但无法上网

0x00 分享WIFI 如何分享,可查看这篇: MacOS系统如何创建热点并共享Wi-Fi连接 0x01 iPhone 无法上网 打开设置,点击所连 WIFI 进入 配置 DNS 选择 手动 添加 服务器: 公用的有: 114.114.114.114 、180.76.76.76、1…

HTTP --tcp和keep-alive

TCP TCP连接 tcp/ip是全球计算机以及网络设备都在使用的一种常见的分组交换网络分层协议集,客户端可以打开一条tcp/ip连接,连接到可能运行在世界各地的服务器应用程序,一旦连接建立起来了,在客户端和服务器的计算机之间交换的报…

189.轮转数组

题目描述 给定一个整数数组 nums,将数组中的元素向右轮转 k 个位置,其中 k 是非负数。 示例 1: 输入: nums [1,2,3,4,5,6,7], k 3 输出: [5,6,7,1,2,3,4] 解释: 向右轮转 1 步: [7,1,2,3,4,5,6] 向右轮转 2 步: [6,7,1,2,3,4,5] 向右轮转 3 步: [5,…

ar地产沙盘互动体验提供更加丰富多彩的楼盘信息

AR增强现实技术作为其重要分支,正逐步在全球市场中崭露头角。国内的AR增强现实技术公司正致力于链接物理世界和虚拟世界,为用户带来沉浸式的AR体验。它们打造线上线下联动的一站式文旅景区数字化运营平台,让您在享受旅游的同时,也…

Python中的Pandas数据处理与分析

Pandas 是 Python 生态系统中用于数据处理与分析的核心库之一。它提供了高效且便捷的数据结构,能够处理各类数据操作,如数据清洗、转换、分析等。本文将深入探讨 Pandas 的基本功能和高级特性,涵盖数据导入与导出、数据预处理、数据操作、数据…

C语言学习日记————结构体

结构体是一种数据类型-----构造类型 struct 结构体名 { 成员列表 }; 1、struct //结构体 类型 关键字 2、结构体名 //表明这个结构体类型的名称 3、成员列表 …

Lua 基础 04 模块

Lua 基础相关知识 第四期 require 模块,通常是一个表,表里存储了一些字段和函数,单独写在一个 lua 文件。 例如,这是一个 tools.lua 文件,定义了一个局部 tools 表,包含一个 log 函数,可以传…

用容器构建wordpress项目

用容器构建wordpress项目 #准备两个镜像 #数据库和centos docker pull mysql:5.7 docker pull centos:7 #创建一个wordpress文件夹,在wordpress文件里面写一个Dockerfile文件 vim DockerfileFROM centos:7 #基于centos环境RUN yum -y install epel-release ;\ #安装…

笔记93:关于 C++ 中的 Eigen 库

注意1&#xff1a;Eigen 是一个基于 C 模板的线性代数库&#xff0c;以支持在 C 中进行矩阵运算&#xff1b; 注意2&#xff1a;要在 C 中使用 Eigen&#xff0c;需要在在程序开始前要包含所需头文件路径&#xff1b; #include <Eigen> a a 基础用法汇总 定义向量 E…

Shell编程实际应用

一、脚本编程步骤 1. 需求分析 2. 命令测试 3. 脚本编程 4. 测试调优 二、案例分析 1.MAC记录与端口扫描 实验要求&#xff1a; &#xff08;1&#xff09;统计网络中服务器MAC。 &#xff08;2&#xff09;检查哪些主机开启FTP。&#xff08;21&#xff09;&#xff08;wg…

vue3状态管理,pinia的使用

​​​​​​​状态管理 我们知道组件与组件之间可以传递信息&#xff0c;那么我们就可以将一个信息作为组件的独立状态&#xff08;例如&#xff0c;单个组件的颜色&#xff09;或者共有状态&#xff08;例如&#xff0c;多个组件是否显示&#xff09;在组件之传递&#xff0c…

顺序表的讲解与实现

顺序表的讲解与实现 一、顺序表的概念及结构二、顺序表分类顺序表和数组的区别顺序表分类静态顺序表动态顺序表 三、动态顺序表的实现(使用VS2022)1.初始化、销毁、打印内容2.检查扩容3.尾部插入、尾部删除、头部插入、头部删除尾部插入尾部删除头部插入头部删除 4.指定插入、指…

C++ stack类与queue类

目录 0.前言 1.容器适配器 1.1容器适配器的特点 1.2容器适配器的实现 1.3使用容器适配器的场景 2.stack的介绍与使用 2.1介绍 2.2使用 3.queue的介绍与使用 3.1介绍 3.2使用 4.stack和queue的模拟实现 4.1 stack的模拟实现 4.2 queue的模拟实现 5.结语 &#xf…

博客星球大冒险:用Spring Boot和JWT打造你的数字王国

揭秘如何在Spring Boot中无缝集成JWT&#xff0c;为你的应用打造一个高度可扩展且安全的认证系统。从添加依赖到创建JWT过滤器&#xff0c;再到实现令牌的有效性管理和刷新机制&#xff0c;每一步都精心设计&#xff0c;确保你的乐园能够迎接成千上万的游客&#xff01; 文章目…

Spark_SparkOnHive_海豚调度跑任务写入Hive表失败解决

背景 前段时间我在海豚上打包程序写hive出现了一个问题&#xff0c;spark程序向hive写数据时&#xff0c;报了如下bug&#xff0c; org.apache.spark.sql.AnalysisException: The format of the existing table test.xx is HiveFileFormat It doesnt match the specified for…

全球七家半导体工厂建设受阻:英特尔、三星、台积电等面临延期挑战

过去两年间&#xff0c;半导体行业经历了市场衰退、复苏慢于预期以及资金紧缩等问题&#xff0c;英特尔、台积电和三星等主要企业虽然继续推进扩张计划&#xff0c;但不断调整和放缓工厂建设的步伐与时间表&#xff0c;以更好地服务于长期发展目标。据统计&#xff0c;全球范围…

LLM主流开源代表模型

LLM主流开源大模型介绍 1 LLM主流大模型类别 随着ChatGPT迅速火爆&#xff0c;引发了大模型的时代变革&#xff0c;国内外各大公司也快速跟进生成式AI市场&#xff0c;近百款大模型发布及应用。 目前&#xff0c;市面上已经开源了各种类型的大语言模型&#xff0c;本章节我们…

【MATLAB源码-第220期】基于matlab的Massive-MIMO误码率随着接收天线变化仿真采用ZF均衡和QPSK调制。

操作环境&#xff1a; MATLAB 2022a 1、算法描述 1. 系统背景与目标 无线通信系统的发展极大地推动了现代通信技术的进步&#xff0c;从移动通信到无线局域网&#xff0c;甚至是物联网&#xff0c;均依赖于无线通信系统的高效和可靠性。在无线通信系统中&#xff0c;核心目…

【kafka】关于Kafka的入门介绍

为什么要使用kafka&#xff1f;kafka是什么东西&#xff1f; 案例场景 A服务向B服务发送消息&#xff0c;A服务传输数据很快&#xff0c;B服务处理数据很慢&#xff0c;这样B服务就会承受不住&#xff0c;怎么办&#xff1f;通过添加消息队列作为缓冲。kafka就是消息队列中的…

初识C++ · 模拟实现stack和Queue

目录 前言&#xff1a; 1 Stack 1.1 双端队列 2 Queue 前言&#xff1a; 经历了list三个自定义类型的洗礼&#xff0c;来个简单的放松放松&#xff0c;即栈和队列&#xff1a; 文档记录的&#xff0c;栈和队列是一种容器适配器&#xff0c;它们不属于stl&#xff0c;但是它…