python 如何将dataframe数据批量插入到clickhouse

1. 背景-问题描述

最近做一个资产数据清洗工作,结束后需要将数据批量插入到clickhouse;查找并尝试了好几种批量插入的方式均已失败告终;现就查到的dataframe类型数据批量插入到clickhouse记录和汇总于此。

2. 配置信息

python 3.12
clickhouse_driver 0.2.9
PyYAML 6.0.2
pandas 2.2.3
numpy 2.1.2

3. 单条数据插入

from clickhouse_driver import Client
client = Client(host='host',port='port',user='user',password='password',settings={'use_numpy': True}
)
# 创建表格
client.execute("CREATE TABLE IF NOT EXISTS db.test_table (id Int32, name String) ENGINE = MergeTree")
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1')"
client.execute(sql)

db.test_table: db表示数据库,test_table表示表名称

4. 多条数据插入

# 多条数据插入
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1'), (2, 'test_2')"
client.execute(sql)

5. dataframe数据全部插入

5.1 将需要插入的数据写入到sql中

# 取出dataframe列
import pandas as pd
import numpy as np
from clickhouse_driver import Clientclient = Client(host='host',port='port',user='user',password='password',settings={'use_numpy': True}
)# 创建一个示例DataFrame
data = {'id': [1, 2, 3],'name': ['name_1', 'name_2', 'name_3']
}
df = pd.DataFrame(data)
cols = df.columns.tolist()
sql0 = f"INSERT INTO db.test_table ({','.join(map(str, cols))}) VALUES"
# 将df下的values转化为tuple格式: [(1, 'name_1'), (2, 'name_2'),(3, 'name_3')]
tuples = [tuple(x) for x in df.to_numpy()]
sql = f"{sql0}" + ', '.join(map(str, tuples))
client.execute(sql)

5.2 使用insert_dataframe

cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
client.insert_dataframe(query, df)

6. dataframe数据分批插入

def get_insert_sql(df: pd.DataFrame)->str:cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "tuples = [tuple(x) for x in df.to_numpy()]sql = f"{query}" + ', '.join(map(str, tuples))return sql# 定义每批次插入的行数batch_size = 1000# 分批次插入数据for start in range(0, len(df), batch_size):end = min(start + batch_size, len(df))sql = get_insert_sql(df[start:end])client.insert(sql)

经实验:下列方式会失败

tuples = [(1, 'test1'), (2, 'test2')]
client.execute('INSERT INTO db.test (id, name) VALUES', tuples, settings={'max_insert_block_size': 10},columnar=True)

在这里插入图片描述
正确的使用方式是:

cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
# 分批插入,每次插入10条记录
client.insert_dataframe(query, df,settings={'max_insert_block_size': 10})

7. 完整代码

from clickhouse_driver import Client
import pandas as pd
import numpy as np
from datetime import datetime, timedeltaclass Error(Exception):pass
class Connection(Error):pass
class NotExist(Error):pass# ck client
class ClickHose:__slots__ = ("__ck",)__ck: Clientdef __init__(self):try:self.__ck = Client(host='ck_host',port=12345,user='ck_user',password='ck_password',settings={'use_numpy': True})except Exception as e:raise Connection(e)def get_conn(self) -> Client:return self.__ckdef query(self, sql: str):return self.__ck.execute(sql)def insert(self, sql: str):return self.__ck.execute(sql)def insert_dataframe(self, sql: str, df: pd.DataFrame, external_tables=None, query_id=None,settings=None):return self.__ck.insert_dataframe(sql, df, external_tables, query_id, settings)def close(self):self.__ck.disconnect()# ck表
class Test1CK:__slots__ = ("__ck","__table")__ck: ClickHose__table: strdef __init__(self, ck: ClickHose):self.__ck = ckself.__table = "db.test1"if not self.check_table():raise NotExist(f"not exist table: {self.__table}")def check_table(self)->bool:try:table_exists = self.__ck.query(f"EXISTS TABLE {self.__table}")[0][0]return table_existsexcept Exception as e:raise Error(e)def get_bill_by_cloud(self, cloud: str, start_time: int, end_time: int)->list:query = f"select * from {self.__table} where (cloud = '{cloud}') AND (time > {start_time}) AND (time < {end_time})"try:data = self.__ck.query(query)return dataexcept Exception as e:# logger.error(f"get_bill from table: {self.__table} error: {e}")def insert_batch(self, month: str, df: pd.DataFrame):if df is None or len(df) < 1:returntry:# 定义每批次插入的行数batch_size = 1000query = self.get_insert_sql(df)# 分批次插入数据for start in range(0, len(df), batch_size):end = min(start + batch_size, len(df))tuples = [tuple(x) for x in df[start:end].to_numpy()]sql = f"{query}" + ', '.join(map(str, tuples))self.__ck.insert(sql)#logger.info(f"insert month: {month} data success")except Exception as e:#logger.error(f"insert_batch from table: {self.__table}, sql: {sql} error: {e}")def insert_dataframe(self, month: str, df: pd.DataFrame):if df is None or len(df) < 1:returntry:# 定义每批次插入的行数batch_size = 1000cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "self.__ck.insert_dataframe(query, df, settings={'max_insert_block_size': batch_size})#logger.info(f"insert month: {month} data success")except Exception as e:#logger.error(f"insert_batch from table: {self.__table}, sql: {query} error: {e}")def get_insert_sql(self, df: pd.DataFrame) -> str:cols = df.columns.tolist()query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "#logger.debug(f"get_insert_sql: {query}")return queryif __name__ == '__main__':ck_db = ClickHose()try:test1_table = Test1CK(ck_db)# 创建一个示例DataFramedata = {'id': [1, 2, 3],'name': ['name_1', 'name_2', 'name_3']}df = pd.DataFrame(data)test1_table.insert_batch("3", df)except Exception as e:#logger.error(f"error {e}")

参考资料:
python如何将一个dataframe快速写入clickhouse
python批量插入clickhouse

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/59666.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

利用 TensorFlow 与 Docker 构建深度学习模型训练与部署流水线

在深度学习领域&#xff0c;构建、训练和部署模型是一个复杂且耗时的过程。本文将介绍如何利用 TensorFlow 构建深度学习模型&#xff0c;并通过 Docker 容器化技术实现模型的训练与部署&#xff0c;从而简化整个流水线&#xff0c;提高开发效率。我们将通过实战代码&#xff0…

DevOps业务价值流:架构设计最佳实践

系统设计阶段作为需求与研发之间的桥梁&#xff0c;在需求设计阶段的原型设计评审环节&#xff0c;尽管项目组人员可能未完全到齐&#xff0c;但关键角色必须到位&#xff0c;包括技术组长和测试组长。这一安排旨在同步推进两项核心任务&#xff1a;一是完成系统的架构设计&…

强化学习之课程学习法

作者名片 &#x1f935;‍♂️ 个人主页&#xff1a;抱抱宝 &#x1f604;微信公众号&#xff1a;宝宝数模AI ✍&#x1f3fb;作者简介&#xff1a;阿里云专家博主 | 持续分享机器学习、数学建模、数据分析、AI人工智能领域相关知识&#xff0c;和大家一起进步&#xff01; &am…

计算机网络:网络层 —— 网络地址转换 NAT

文章目录 网络地址转换 NAT 概述最基本的 NAT 方法NAT 转换表的作用 网络地址与端口号转换 NAPTNAT 和 NAPT 的缺陷 网络地址转换 NAT 概述 尽管因特网采用了无分类编址方法来减缓 IPv4 地址空间耗尽的速度&#xff0c;但由于因特网用户数量的急剧增长&#xff0c;特别是大量小…

leetcode 633. 平方数之和 中等

给定一个非负整数 c &#xff0c;你要判断是否存在两个整数 a 和 b&#xff0c;使得 a*ab*bc 。 示例 1&#xff1a; 输入&#xff1a;c 5 输出&#xff1a;true 解释&#xff1a;1 * 1 2 * 2 5示例 2&#xff1a; 输入&#xff1a;c 3 输出&#xff1a;false提示&#…

一、文心一言问答系统为什么要分对话,是否回学习上下文?二、文心一言是知识检索还是大模型检索?三、文心一言的词向量、词语种类及多头数量

目录 一、文心一言问答系统为什么要分对话,是否回学习上下文? 二、文心一言是知识检索还是大模型检索? 三、文心一言的词向量、词语种类及多头数量 一、文心一言问答系统为什么要分对话,是否回学习上下文? 文心一言问答系统分对话的原因在于其设计初衷就是提供一个交互…

Qt第三课 ----------容器类控件

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

nacos注册中心简介

当前现状与需求 随着微服务架构的普及&#xff0c;服务之间的通信和管理变得越来越复杂。传统的服务发现和配置管理方式已经无法满足现代应用的需求。在这种背景下&#xff0c;Nacos 应运而生&#xff0c;成为了一个强大的服务发现和配置管理工具。 1. 当前现状 服务发现&#…

Oracle Sql查询和性能优化(持续更新)

目录 索引优化 查询重写 EXISTS vs. IN 避免全表扫描 合理使用UNION操作符 优化子查询 执行计划分析 表设计优化 规范化与反规范化 分区表 存储参数调整 SGA配置 共享池 数据缓冲区 SGA自动管理 PGA配置 RAID配置 日志文件优化 性能诊断工具 AWR ASH 定期…

Java的基本语法

Java的基本语法是编写Java程序时必须遵循的规则和结构。以下是一些Java基本语法的关键点&#xff1a; 命名规范 在Java编程中&#xff0c;遵循一定的命名规范是非常重要的&#xff0c;它有助于代码的可读性和维护性&#xff0c;可参考&#xff08;https://blog.csdn.net/hwh2…

运放进阶篇-多种波形可调信号发生器-产生方波-三角波-正弦波

引言&#xff1a;前几节我们已经说到硬件相关基础的电路&#xff0c;以及对于运放也讲到了初步的理解&#xff0c;特别是比较器的部分&#xff0c;但是放大器的部分我们对此并没有阐述&#xff0c;在这里通过实例进行理论结合实践的学习。而运放真正的核心&#xff0c;其实就是…

蓝桥杯c++算法学习【1】之枚举与模拟(卡片、回文日期、赢球票:::非常典型的比刷例题!!!)

别忘了请点个赞收藏关注支持一下博主喵&#xff01;&#xff01;&#xff01; 关注博主&#xff0c;更多蓝桥杯nice题目静待更新:) 枚举与模拟 一、卡片&#xff1a; 【问题描述】 …

使用Netty实现一个简单的聊天服务器

✅作者简介&#xff1a;热爱Java后端开发的一名学习者&#xff0c;大家可以跟我一起讨论各种问题喔。 &#x1f34e;个人主页&#xff1a;Hhzzy99 &#x1f34a;个人信条&#xff1a;坚持就是胜利&#xff01; &#x1f49e;当前专栏&#xff1a;Netty &#x1f96d;本文内容&a…

用discuz开发一款网站自动秒收录源码功能更强悍更安全

用discuz开发一款网站自动秒收录源码功能更强悍更安全 在当今数字化时代&#xff0c;网站的曝光率和访问量是衡量其成功与否的重要指标。为了提升网站的曝光率&#xff0c;自动秒收录技术应运而生。而Discuz平台作为知名的社区管理系统&#xff0c;为开发者提供了丰富的功能和…

绿色能源发展关键:优化风电运维体系

根据QYResearch调研团队最新发布的《全球风电运维市场报告2023-2029》显示&#xff0c;预计到2029年&#xff0c;全球风电运维市场的规模将攀升至307.8亿美元&#xff0c;并且在接下来的几年里&#xff0c;其年复合增长率&#xff08;CAGR&#xff09;将达到12.5%。 上述图表及…

ER图的规范画法

ER图相较其他结构化分析方法和面向对象分析方法的图结构来说&#xff0c;还是相对容易不少&#xff0c;不过各位也要注重细节的严谨性。 entity&#xff0c;实体&#xff1b;relationship&#xff0c;关系&#xff1b;相信各位的英语水平不难理解ER图的功能&#xff0c;椭圆代表…

linux 下 signal() 函数的用法,信号类型在哪里定义的?

--------------------------------------------------- author: hjjdebug date: 2024年 11月 07日 星期四 14:47:33 CST description: linux 下 signal() 函数的用法 --------------------------------------------------- signal 是linux 下最基础的进程通讯机制…

【静态页面】尚品汇 1、设计稿分析及资源准备

目录 1. 准备工作2. 理解设计3. 规划项目结构 1. 准备工作 安装必要的工具&#xff1a;确保你的开发环境已经准备好&#xff0c;包括文本编辑器&#xff08;如 VSCode&#xff09;、浏览器等。获取设计文件&#xff1a;获取UI设计稿或者设计文件链接&#xff0c;并确保可以访问…

基于MPPT最大功率跟踪的光伏发电蓄电池控制系统simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 5.完整工程文件 1.课题概述 基于MPPT最大功率跟踪的光伏发电蓄电池控制系统simulink建模与仿真。本系统包括PV模块&#xff0c;电池模块&#xff0c;电池控制器模块&#xff0c;MPPT模块&#xff0c;PWM模…

nginx代理 proxy_pass

一、location 包含 location /api/ {proxy_pass http://127.0.0.1:85;} 二、location 不包含 location /api/ {proxy_pass http://127.0.0.1:85/;} 三、locaion 包含 location /api {proxy_pass http://127.0.0.1:85;}四、location 包含 location /api {proxy_pass http://127.…