python中使用websocket调用、获取、保存大模型API

笔者最近在测试星火大模型的时候,他们是使用websocket 来建立对话,而且星火大模型开放的测试代码,质量上不咋地(20231030记录),还需要对websocket有一定的了解,才适合自己微调。

安装:

pip install websocket
pip install websocket-client

文章目录

  • 1 常见的websocket获取数据的方法
    • 1.1 第一种使用create_connection链接
    • 1.2 第二种:WebSocketApp + run_forever的方式
  • 2 针对`run_forever`内容保存
    • 2.1 通过定义global变量来保存内容
    • 2.2 通过`CallbackToIterator()`来返回


1 常见的websocket获取数据的方法

参考【python: websocket获取实时数据的几种常见链接方式】常见的两种。

1.1 第一种使用create_connection链接

需要pip install websocket-client (此方法不建议使用,链接不稳定,容易断,并且连接很耗时)

import time
from websocket import create_connectionurl = 'wss://i.cg.net/wi/ws'
while True:  # 一直链接,直到连接上就退出循环time.sleep(2)try:ws = create_connection(url)print(ws)breakexcept Exception as e:print('连接异常:', e)continue
while True:  # 连接上,退出第一个循环之后,此循环用于一直获取数据ws.send('{"event":"subscribe", "channel":"btc_usdt.ticker"}')response = ws.recv()print(response)

1.2 第二种:WebSocketApp + run_forever的方式

import websocketdef on_message(ws, message):  # 服务器有数据更新时,主动推送过来的数据print(message)def on_error(ws, error):  # 程序报错时,就会触发on_error事件print(error)def on_close(ws):print("Connection closed ……")def on_open(ws):  # 连接到服务器之后就会触发on_open事件,这里用于send数据req = '{"event":"subscribe", "channel":"btc_usdt.deep"}'print(req)ws.send(req)if __name__ == "__main__":websocket.enableTrace(True)ws = websocket.WebSocketApp("wss://i.cg.net/wi/ws",on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever(ping_timeout=30)

第二种方式里面,run_forever其实是流式返回内容,大概可以看,流式输出的样例:


{"code":0,"sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":0}
### error: 'content'{"code":0,"fileRefer":"{\"43816997a7a44a299d0bfb7c360c5838\":[2,0,1]}","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":99}
### error: 'content'{"code":0,"content":"橘","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}橘{"code":0,"content":"子。","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":1}子。{"code":0,"content":"","sid":"5ebc0d6833b54909b4a51fbe75a5051a","status":2}
### closed ###

那么run_forever流式输出,正常的内容如何保存呢,进入下一章


2 针对run_forever内容保存

2.1 通过定义global变量来保存内容

参考【将Websocket数据保存到Pandas】
来看一下,文中的案例:

import jsonimport pandas as pd
import websocketdf = pd.DataFrame(columns=['foreignNotional', 'grossValue', 'homeNotional', 'price', 'side','size', 'symbol', 'tickDirection', 'timestamp', 'trdMatchID'])def on_message(ws, message):msg = json.loads(message)print(msg)global df# `ignore_index=True` has to be provided, otherwise you'll get# "Can only append a Series if ignore_index=True or if the Series has a name" errorsdf = df.append(msg, ignore_index=True)def on_error(ws, error):print(error)def on_close(ws):print("### closed ###")def on_open(ws):returnif __name__ == "__main__":ws = websocket.WebSocketApp("wss://www.bitmex.com/realtime?subscribe=trade:XBTUSD",on_open=on_open, on_message=on_message, on_error=on_error, on_close=on_close)ws.run_forever()

其中global df是在定义全局变量df,可以在函数中把流式数据拿出来,还是很不错的

2.2 通过CallbackToIterator()来返回

在开源项目中ChuanhuChatGPT,看到了使用的方式spark.py,个人还没有尝试,只是贴在这里。

贴一下这个函数:

class CallbackToIterator:def __init__(self):self.queue = deque()self.cond = Condition()self.finished = Falsedef callback(self, result):with self.cond:self.queue.append(result)self.cond.notify()  # Wake up the generator.def __iter__(self):return selfdef __next__(self):with self.cond:# Wait for a value to be added to the queue.while not self.queue and not self.finished:self.cond.wait()if not self.queue:raise StopIteration()return self.queue.popleft()def finish(self):with self.cond:self.finished = Trueself.cond.notify()  # Wake up the generator if it's waiting.# 主函数截取
def get_answer_stream_iter(self):wsParam = Ws_Param(self.appid, self.api_key, self.api_secret, self.spark_url)websocket.enableTrace(False)wsUrl = wsParam.create_url()ws = websocket.WebSocketApp(wsUrl,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close,on_open=self.on_open,)ws.appid = self.appidws.domain = self.domain# Initialize the CallbackToIteratorws.iterator = CallbackToIterator()# Start the WebSocket connection in a separate threadthread.start_new_thread(ws.run_forever, (), {"sslopt": {"cert_reqs": ssl.CERT_NONE}})# Iterate over the CallbackToIterator instanceanswer = ""total_tokens = 0for message in ws.iterator:data = json.loads(message)code = data["header"]["code"]if code != 0:ws.close()raise Exception(f"请求错误: {code}, {data}")else:choices = data["payload"]["choices"]status = choices["status"]content = choices["text"][0]["content"]if "usage" in data["payload"]:total_tokens = data["payload"]["usage"]["text"]["total_tokens"]answer += contentif status == 2:ws.iterator.finish()  # Finish the iterator when the status is 2ws.close()yield answer, total_tokens

截取了部分代码,这里先是定义ws.iterator = CallbackToIterator()然后通过迭代从for message in ws.iterator:拿出数据,看上去也是可行的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/124785.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

explain查询sql执行计划返回的字段的详细说明

当使用EXPLAIN命令查看SQL语句的执行计划时,会返回一张表格,其中包含了该SQL语句的执行计划。下面是每个字段的详细分析: id:执行计划的唯一标识符。如果查询中有子查询,每个子查询都会有一个唯一的ID。在执行计划中&a…

JavaScript的分支结构,循环结构,以及函数的定义使用,数组

JS分支结构 if语句 同java if if…else if…else…if //案例&#xff1a;判断输入密码长度 let pwd prompt("请输入密码");let L pwd.length;if(L<6){alert("密码小于六位");}else if(L>6 && L<16){alert("密码大于6位小于1…

聊聊统一认证中的四种安全认证协议(干货分享)

大家好&#xff0c;我是陈哈哈。单点登录SSO的出现是为了解决众多企业面临的痛点&#xff0c;场景即用户需要登录N个程序或系统&#xff0c;每个程序与系统都有不同的用户名和密码。在企业发展初期&#xff0c;可能仅仅有几个程序时&#xff0c;管理账户和密码不是一件难事。但…

一文彻底理解python浅拷贝和深拷贝

目录 一、必备知识二、基本概念三、列表&#xff0c;元组&#xff0c;集合&#xff0c;字符串&#xff0c;字典浅拷贝3.1 列表3.2 元组3.3 集合3.4 字符串3.5 字典3.6 特别注意可视化展示浅拷贝总结 四、列表&#xff0c;元组&#xff0c;集合&#xff0c;字符串&#xff0c;字…

Python爬虫实战(六)——使用代理IP批量下载高清小姐姐图片(附上完整源码)

文章目录 一、爬取目标二、实现效果三、准备工作四、代理IP4.1 代理IP是什么&#xff1f;4.2 代理IP的好处&#xff1f;4.3 获取代理IP4.4 Python获取代理IP 五、代理实战5.1 导入模块5.2 设置翻页5.3 获取图片链接5.4 下载图片5.5 调用主函数5.6 完整源码5.7 免费代理不够用怎…

【UE 模型描边】UE5中给模型描边 数字孪生 智慧城市领域 提供资源下载

目录 0 引言1 Soft Outlines1.1 虚幻商城1.2 使用步骤 2 Auto Mesh Outlines2.1 虚幻商城2.2 使用步骤 3 Survivor Vision3.1 虚幻商城3.2 使用步骤 结尾 &#x1f64b;‍♂️ 作者&#xff1a;海码007&#x1f4dc; 专栏&#xff1a;UE虚幻引擎专栏&#x1f4a5; 标题&#xf…

多线程---JUC

文章目录 什么是JUC&#xff1f;Callable接口ReentrantLockReentrantLock VS synchronized 原子类线程池信号量SemaphoreCountDownLatch 什么是JUC&#xff1f; JUC是&#xff1a;java.util.concurrent这个包名的缩写。它里面包含了与并发相关&#xff0c;即与多线程相关的很多…

【NeurIPS 2023】多模态联合视频生成大模型CoDi

Diffusion Models视频生成-博客汇总 前言:目前视频生成的大部分工作都是只能生成无声音的视频,距离真正可用的视频还有不小的差距。CoDi提出了一种并行多模态生成的大模型,可以同时生成带有音频的视频,距离真正的视频生成更近了一步。相信在不远的将来,可以AI生成的模型可…

C++ 指针

*放在哪里&#xff1f; 如果声明一个变量&#xff1a;int* b; 如果声明多个变量&#xff1a;int a,*b,*c; nullptr c11中NULL的变形&#xff0c;是一个特殊值&#xff0c;可以赋给任意类型的指针&#xff0c;代表该指针指向为空。 this指针 this指针不是一个const Test*(…

关于ABB 机器人多任务的建立

关于ABB 机器人多任务的建立.需要实时监控某一区域&#xff0c;或者某一信号&#xff0c;或者计件到达某一数量机器人自动停止报警&#xff0c;显示到示教器上&#xff0c;多任务可以实现&#xff0c;类似发那科机器人后台逻辑指令 当软件选项漏选或者少选可以选择修改选项&…

新恶意软件使用 MSIX 软件包来感染 Windows

人们发现&#xff0c;一种新的网络攻击活动正在使用 MSIX&#xff08;一种 Windows 应用程序打包格式&#xff09;来感染 Windows PC&#xff0c;并通过将隐秘的恶意软件加载程序放入受害者的 PC 中来逃避检测。 Elastic Security Labs 的研究人员发现&#xff0c;开发人员通常…

pycharm使用ssh连接远程jupyter

1. 安装jupyter pip install jupyter2. 生成jupyter_notebook_config.py文件 jupyter notebook --generate-config3. 设置命令参数 jupyter notebook --no-browser --allow-root --port 8900配置Jupyter服务器 将上面的代码复制到命令行实参中&#xff1a;

【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割7(数据预处理)

在上一节&#xff1a;【3D 图像分割】基于 Pytorch 的 VNet 3D 图像分割6&#xff08;数据预处理&#xff09; 中&#xff0c;我们已经得到了与mhd图像同seriesUID名称的mask nrrd数据文件了&#xff0c;可以说是一一对应了。 并且&#xff0c;mask的文件&#xff0c;还根据结…

Qt使用QWebEngineView一些记录

1.关闭软件警告&#xff1a; Release of profile requested but WebEnginePage still not deleted. Expect troubles! 原因&#xff0c;系统退出关闭view&#xff0c;没有释放page。 解决办法&#xff1a;手动释放page 顺便把view也释放了。 Widget::~Widget() {updateIni…

设计模式_状态模式

状态模式 介绍 设计模式定义案例问题堆积在哪里解决办法状态模式一个对象 状态可以发生改变 不同的状态又有不同的行为逻辑游戏角色 加载不同的技能 每个技能有不同的&#xff1a;攻击逻辑 攻击范围 动作等等1 状态很多 2 每个状态有自己的属性和逻辑每种状态单独写一个类 角色…

力扣第509题 斐波那契数 新手动态规划(推荐参考) c++

题目 509. 斐波那契数 简单 相关标签 递归 记忆化搜索 数学 动态规划 斐波那契数 &#xff08;通常用 F(n) 表示&#xff09;形成的序列称为 斐波那契数列 。该数列由 0 和 1 开始&#xff0c;后面的每一项数字都是前面两项数字的和。也就是&#xff1a; F(0) 0&a…

git本地搭建服务器[Vmware虚拟机访问window的git服务器]

先按照https://zhuanlan.zhihu.com/p/494988089说明下载好Gitblit然后复制到tomcat的webapps目录下,如下: 双击"startup.bat"启动tomcat: 然后访问"http://127.0.0.1:8080/gitblit/"即可看到git的界面: 说明git服务器已经能够成功运行了! Vmware虚拟机…

k8s基本操作命令

目录 1、//查看资源对象简写 2、//查看集群信息 3、//配置kubectl自动补全 4、//node节点查看日志 5、//查看 master 节点状态 6、//查看命令空间 7、//查看default命名空间的所有资源 8、//创建命名空间app 9、//删除命名空间app 10、//在命名空间kube-public 创建…

Java电商平台 - API 接口设计之 token、timestamp、sign 具体架构与实现|电商API接口接入

一&#xff1a;token 简介 Token&#xff1a;访问令牌access token, 用于接口中, 用于标识接口调用者的身份、凭证&#xff0c;减少用户名和密码的传输次数。一般情况下客户端(接口调用方)需要先向服务器端申请一个接口调用的账号&#xff0c;服务器会给出一个appId和一个key, …

Android jetpack : Navigation 导航 路由 、 单个Activity嵌套多个Fragment的UI架构方式

Android Navigation 如何动态的更换StartDestination &&保存Fragment状态 Navigation(一)基础入门 google 官网 &#xff1a; Navigation 导航 路由 讨论了两年的 Navigation 保存 Fragment 状态问题居然被关闭了 Navigation是一种导航的概念&#xff0c;即把Activ…