大模型API调用初尝试二
- 科大讯飞—星火认知大模型
- 单轮会话调用
- 多轮会话调用
- 百度—千帆大模型
- 获取access_token
- 单轮会话
- 多轮会话
科大讯飞—星火认知大模型
星火认知大模型是科大讯飞开发的,直接使用可以点击星火认知大模型,要调用API的话在讯飞开发平台注册,之后点击进入控制台。
注册后完成实名认证,可以领取各种新人礼包,白嫖一些tokens! 之后就去找官方API调用文档,星火大模型的API是通过websocket建立连接然后实现调用的,和通义千问通过HTTP协议调用、智谱AI直接用封装好的python包调用不同。
单轮会话调用
单论会话就是用户提一个问题,大模型回答,这一轮就结束了。下一轮会话用户向模型发送的信息中不包括上一轮的信息。
官方给的python调用实例,修改一下on_close()函数(需要三个参数),以及将服务器端和用户端代码分离开:SparkAPI.py—服务器端口的代码👇
# coding: utf-8
import _thread as thread
import os
import time
import base64import base64
import datetime
import hashlib
import hmac
import json
from urllib.parse import urlparse
import ssl
from datetime import datetime
from time import mktime
from urllib.parse import urlencode
from wsgiref.handlers import format_date_timeimport websocket
import openpyxl
from concurrent.futures import ThreadPoolExecutor, as_completed
import osclass Ws_Param(object):# 初始化def __init__(self, APPID, APIKey, APISecret, gpt_url):self.APPID = APPIDself.APIKey = APIKeyself.APISecret = APISecretself.host = urlparse(gpt_url).netlocself.path = urlparse(gpt_url).pathself.gpt_url = gpt_url# 生成urldef create_url(self):# 生成RFC1123格式的时间戳now = datetime.now()date = format_date_time(mktime(now.timetuple()))# 拼接字符串signature_origin = "host: " + self.host + "\n"signature_origin += "date: " + date + "\n"signature_origin += "GET " + self.path + " HTTP/1.1"# 进行hmac-sha256进行加密signature_sha = hmac.new(self.APISecret.encode('utf-8'), signature_origin.encode('utf-8'),digestmod=hashlib.sha256).digest()signature_sha_base64 = base64.b64encode(signature_sha).decode(encoding='utf-8')authorization_origin = f'api_key="{self.APIKey}", algorithm="hmac-sha256", headers="host date request-line", signature="{signature_sha_base64}"'authorization = base64.b64encode(authorization_origin.encode('utf-8')).decode(encoding='utf-8')# 将请求的鉴权参数组合为字典v = {"authorization": authorization,"date": date,"host": self.host}# 拼接鉴权参数,生成urlurl = self.gpt_url + '?' + urlencode(v)# 此处打印出建立连接时候的url,参考本demo的时候可取消上方打印的注释,比对相同参数时生成的url与自己代码生成的url是否一致return url# 收到websocket错误的处理
def on_error(ws, error):print("### error:", error)# 收到websocket关闭的处理
def on_close(ws, close_status_code, close_msg): print("### closed ###")# 收到websocket连接建立的处理
def on_open(ws):thread.start_new_thread(run, (ws,))def run(ws, *args):data = json.dumps(gen_params(appid=ws.appid, messages=ws.messages, domain=ws.domain))ws.send(data)# 收到websocket消息的处理
def on_message(ws, message):# print(message)data = json.loads(message)code = data['header']['code']if code != 0:print(f'请求错误: {code}, {data}')ws.close()else:choices = data["payload"]["choices"]status = choices["status"]content = choices["text"][0]["content"]# 保存一轮对话的回答global answeranswer += contentprint(content,end='')if status == 2:print("\n#### 关闭会话")ws.close()def gen_params(appid, messages, domain):"""通过appid和用户的提问来生成请参数"""data = {"header": {"app_id": appid,"uid": "1234", # "patch_id": [] #接入微调模型,对应服务发布后的resourceid },"parameter": {"chat": {"domain": domain,"temperature": 0.5,"max_tokens": 4096,"auditing": "default",}},"payload": {"message": {"text": messages}}}return datadef main(appid, api_secret, api_key, gpt_url, domain, messages):wsParam = Ws_Param(appid, api_key, api_secret, gpt_url)websocket.enableTrace(False)wsUrl = wsParam.create_url()ws = websocket.WebSocketApp(wsUrl, on_message=on_message, on_error=on_error, on_close=on_close, on_open=on_open)ws.appid = appidws.messages = messagesws.domain = domainws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
客户端去调用API的代码:spark_wenda.py👇
import sparkAPI as LLM_APIif __name__ == "__main__":# 准备用户API信息 以及访问的模型urlappid="xxxx"api_secret="xxxx"api_key="xxxxx"gpt_url="wss://spark-api.xf-yun.com/v3.5/chat"# Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat" # v3.0环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址domain="generalv3.5"# domain = "generalv3" # v3.0版本# domain = "generalv2" # v2.0版本# domain = "general" # v2.0版本# 用户的问题query = input("\nUser:")question =[{'role':'user', 'content':query}]LLM_API.answer =""print("\nAssistant:",end = "")# 调用api得到输出LLM_API.main(appid, api_secret, api_key, gpt_url, domain, question)
运行结果如下,获取到了大模型的回答,并只输出了其content:
多轮会话调用
多轮会话需要注意前面几轮对话用户user的问题和大模型assistant的回答都要记录下来,通过websocket访问大模型时,将之前的问答信息一同发送,因此需要记录下每轮大模型的回答,即通过global answer记录每轮回答,并在每轮结束后将其附加在messages中。具体的Spark_wenda.py内容如下👇
import sparkAPI as LLM_APImessages =[]# length = 0
# 并入一条新的message,可以是用户的问题,也可以是大模型的回答
def getText(role,content):jsoncon = {}jsoncon["role"] = rolejsoncon["content"] = contentmessages.append(jsoncon)return messages# 获取messages的长度,向大模型发送的问题token数量是有限制的
def getlength(messages):length = 0for content in messages:temp = content["content"]leng = len(temp)length += lengreturn length
# 检查token数量
def checklen(messages):while (getlength(messages) > 8000):del messages[0]return messages if __name__ == "__main__":# 准备用户API信息 以及访问的模型urlappid="xxxxxx"api_secret="xxxxxxx"api_key="xxxxxxxx"gpt_url="wss://spark-api.xf-yun.com/v3.5/chat"# Spark_url = "ws://spark-api.xf-yun.com/v3.1/chat" # v3.0环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v2.1/chat" # v2.0环境的地址# Spark_url = "ws://spark-api.xf-yun.com/v1.1/chat" # v1.5环境的地址domain="generalv3.5"# domain = "generalv3" # v3.0版本# domain = "generalv2" # v2.0版本# domain = "general" # v2.0版本messages.clearwhile True: # 循环进行会话query = input("\nUser:")# 将用户新的问题加入历史问答messages中question = checklen(getText("user",query))LLM_API.answer =""print("\nAssistant:",end = "")LLM_API.main(appid, api_secret, api_key, gpt_url, domain, messages)# 将星火大模型的输出附加到历史问答messages中getText("assistant",LLM_API.answer)
运行观察大模型确实能够对每次的问题进行回答,且能够有效利用之前问答的信息。
messages记录的是多轮问答的内容以及最新一次会话用户的问题,messages其实是等于question的。在vscode中调试代码,观察messages和question中的信息,确实是有前面两轮问答的问题+答案,以及最新一轮用户的问题:
百度—千帆大模型
首先注册登录百度千帆大模型平台;然后在控制台添加应用,后续会用到API_KEY,SECRET_KEY;然后在文档中找到官方API调用文档,其中包括了不同模型的调用代码。
这里需要注意,后续要调用API大模型,需要在在线服务中开通服务,否则会报错{"error_code":17,"error_msg":"Open api daily request limit reached"}
。
千帆大模型平台中的大模型api是通过http的方式去请求-响应的,步骤包括1、通过api-key和secret-key获取access-token;2、利用access-token向大模型发起请求;3、获取大模型的响应。
获取access_token
import requests
import json
# 填入自己申请的api应用的信息
API_KEY = "xxxxxxx"
SECRET_KEY = "xxxxxxx"# 参考网址👉 https://cloud.baidu.com/doc/WENXINWORKSHOP/s/Ilkkrb0i5
# access_token
def get_access_token():"""使用 AK,SK 生成鉴权签名(Access Token):return: access_token,或是None(如果错误)"""url = "https://aip.baidubce.com/oauth/2.0/token"params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}return str(requests.post(url, params=params).json().get("access_token"))
单轮会话
# 单轮会话
def Single_Round_Session():# 访问的模型的url url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()# 用户输入问题query = input("\nUser:")payload = json.dumps({"disable_search": False,"enable_citation": False,# 添加问题 发送请求"messages": [{"role": "user","content": query,}]})headers = {'Content-Type': 'application/json'}# 获取大模型的响应response = requests.request("POST", url, headers=headers, data=payload)# 获取响应的内容print(response.text)
if __name__ == '__main__':Single_Round_Session()
例子:
多轮会话
多轮会话需要注意保存之前对话的问答,代码如下:
# 多轮会话
def Multi_Round_Session():# 访问的模型的url url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()# 用户的问题messages = []while True :# 构造访问的问题question = {"role": "user","content":input("\nUser:")}messages.append(question)payload = json.dumps({"disable_search": False,"enable_citation": False,# 添加问题 发送请求"messages": messages})headers = {'Content-Type': 'application/json'}# 获取大模型的响应response = requests.request("POST", url, headers=headers, data=payload)# 获取响应的内容print("\n",response.json()["result"] )# 将响应内容添加到会话历史中answer = {"role": "assistant","content":response.json()["result"]}messages.append(answer)if __name__ == '__main__':Multi_Round_Session()
结果: