4GB GPU的DeepSeek-Coder 1.3B模型,并且它已经被量化或优化过。以下是具体的步骤:
安装必要的依赖项:
pip install transformers torch grpcio googleapis-common-protos
创建一个新的ROS 2包:
cd ~/ros2_ws/src
ros2 pkg create --build-type ament_python llm_ros2_node --dependencies rclpy std_msgs grpcio googleapis-common-protos torch transformers
编辑setup.py文件以包含所需的依赖项:
from setuptools import setuppackage_name = 'llm_ros2_node'setup(name=package_name,version='0.0.0',packages=[package_name],data_files=[('share/ament_index/resource_index/packages', ['resource/' + package_name]),('share/' + package_name, ['package.xml']),],install_requires=['setuptools'],zip_safe=True,maintainer='your_name',maintainer_email='your_email@example.com',description='TODO: Package description',license='Apache License 2.0',tests_require=['pytest'],entry_points={'console_scripts': ['llm_node = llm_ros2_node.llm_node:main',],},
)
编写ROS 2节点代码:在这个节点中,我们将订阅一个话题并发送消息到本地的大语言模型,然后将结果发布到另一个话题。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from transformers import AutoModelForCausalLM, AutoTokenizer
import torchclass LLMNode(Node):def __init__(self):super().__init__('llm_node')self.subscription = self.create_subscription(String,'input_text',self.listener_callback,10)self.publisher_ = self.create_publisher(String, 'output_text', 10)# Load the DeepSeek-Coder model and tokenizerself.model_name_or_path = "path/to/deepseek-coder-1.3b-optimized"self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")self.tokenizer = AutoTokenizer.from_pretrained(self.model_name_or_path)self.model = AutoModelForCausalLM.from_pretrained(self.model_name_or_path).to(self.device)self.model.eval()def listener_callback(self, msg):self.get_logger().info(f'Received input text: {msg.data}')response = self.call_llm(msg.data)self.publisher_.publish(String(data=response))def call_llm(self, prompt):inputs = self.tokenizer.encode(prompt, return_tensors="pt").to(self.device)outputs = self.model.generate(inputs, max_length=50, num_return_sequences=1)reply = self.tokenizer.decode(outputs[0], skip_special_tokens=True)return replydef main(args=None):rclpy.init(args=args)llm_node = LLMNode()rclpy.spin(llm_node)llm_node.destroy_node()rclpy.shutdown()if __name__ == '__main__':main()