现在大模型领域非常魔幻一件事,是调用友商开源的大模型构建自己的数据集,大家相互调用, 数据同源导致同样的问题回答内容也差不多,也难怪大家会质疑某些大模型是套壳gpt了,看来只有能积累原始数据的公司才能最终活下来。
这里就演示下如何用多进程调用商用大模型构建sft数据集
# -*- coding: utf-8 -*-
# @Time : 2024/6/19 上午10:33
# @Author : yblir
# @File : xiaohuangya_deepseek_json.py
# explain :
# =======================================================
import json
import re
import sysfrom openai import OpenAI
from loguru import logger
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
# from queue import Queue
# from multiprocessing import Queue, cpu_count, shared_memory, Processwork_nums = 100
# img_queue = Queue(1000)client = OpenAI(api_key="这里填入自己购买的key", base_url="https://api.deepseek.com/")def find_all_index(str_, pattern):patt = re.compile(pattern, re.IGNORECASE)pos = [item.start() for item in patt.finditer(str_)]return posdef run(question_, index):new_line_ = {}history_ = []msg1 = '结合TABLE信息,分析TABLE表结构,question的查询目标,查询条件和查询逻辑这4项内容。'messages = [{"role": "system", "content": "你是个数据库专家,帮忙解答用户提出的问题"},{"role": "user", "content": f"{question_}, {msg1}"},]response = client.chat.completions.create(model="deepseek-coder",messages=messages)result = response.choices[0].message.contenthistory_.append([f"{question_}, {msg1}", result])# right_sql = deepseek_sql_create(question)# print(result)msg2 = "根据你的流程分析,针对给出的TABLE,写出能表达当前question意思的sql语句"messages.append({'role': 'assistant', 'content': result})messages.append({'role': 'user', 'content': msg2})response = client.chat.completions.create(model="deepseek-coder",messages=messages)result = response.choices[0].message.contenthistory_.append([msg2, result])# print('--------------------------------------------')# print(result)msg3 = '分析下你写的sql语句是否正确,如果你认为有不合理的地方,指出并改正。最后,给出你认为正确的sql语句'messages.append({'role': 'assistant', 'content': result})messages.append({'role' : 'user','content': msg3})response = client.chat.completions.create(model="deepseek-coder",messages=messages)result = response.choices[0].message.content# print('--------------------------------------------')# print(result)new_line_['instruction'] = msg3new_line_['input'] = ''new_line_['output'] = resultnew_line_['history'] = history_logger.info(f'已完成数量:{index}')return new_line_if __name__ == '__main__':with open('tuning_sample.json', 'r', encoding='utf-8') as f:data = json.load(f)print(len(data))j = 6with ProcessPoolExecutor(max_workers=work_nums) as p:futures = [p.submit(run, line['instruction'], i + 1) for i, line in enumerate(data) if i + 1 > 1000]# 获取结果new_data_list = []k = 0for future in as_completed(futures):result = future.result()new_data_list.append(result)k += 1# 每抽取200条数据保存一次if k % 200 == 0:json_data = json.dumps(new_data_list, indent=4, ensure_ascii=False)with open(f'duck_sql_{j}.json', 'w', encoding='utf-8') as f:f.write(json_data)j += 1new_data_list = []json_data = json.dumps(new_data_list, indent=4, ensure_ascii=False)with open(f'duck_sql_extra.json', 'w', encoding='utf-8') as f:f.write(json_data)logger.success('json 写入成功')