1. Background and Problem Description
I recently finished an asset-data cleaning job and needed to bulk-insert the results into ClickHouse. Several batch-insert approaches I found and tried all ended in failure, so this post records and summarizes the ways I eventually got DataFrame data batch-inserted into ClickHouse.
2. Environment
python 3.12
clickhouse_driver 0.2.9
PyYAML 6.0.2
pandas 2.2.3
numpy 2.1.2
3. Inserting a Single Row
from clickhouse_driver import Client
client = Client(host='host', port='port', user='user', password='password',
                settings={'use_numpy': True})
# Create the table (MergeTree requires an ORDER BY clause)
client.execute("CREATE TABLE IF NOT EXISTS db.test_table (id Int32, name String) ENGINE = MergeTree ORDER BY id")
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1')"
client.execute(sql)
In db.test_table, db is the database name and test_table is the table name.
4. Inserting Multiple Rows
# insert multiple rows in one statement
sql = "INSERT INTO db.test_table (id, name) VALUES (1, 'test_1'), (2, 'test_2')"
client.execute(sql)
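Embedding literals in the SQL string works, but clickhouse_driver can also take the rows as a separate Python list, which sidesteps quoting by hand; a minimal sketch against the same table:
# Pass the rows as data; the driver serializes the values itself
rows = [(1, 'test_1'), (2, 'test_2')]
client.execute('INSERT INTO db.test_table (id, name) VALUES', rows)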
5. Inserting an Entire DataFrame
5.1 Writing the values into the SQL string
import pandas as pd
import numpy as np
from clickhouse_driver import Client

client = Client(host='host', port='port', user='user', password='password',
                settings={'use_numpy': True})

# Create a sample DataFrame
data = {'id': [1, 2, 3], 'name': ['name_1', 'name_2', 'name_3']}
df = pd.DataFrame(data)

# Extract the DataFrame columns
cols = df.columns.tolist()
sql0 = f"INSERT INTO db.test_table ({','.join(map(str, cols))}) VALUES"
# Convert the DataFrame values to tuples: [(1, 'name_1'), (2, 'name_2'), (3, 'name_3')]
tuples = [tuple(x) for x in df.to_numpy()]
sql = sql0 + ' ' + ', '.join(map(str, tuples))
client.execute(sql)
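One caveat with this approach: str(tuple) formatting breaks as soon as a value contains a single quote, and it offers no protection against SQL injection. A safer sketch, reusing sql0 and tuples from above, lets the driver serialize the rows:
# Hand the rows to the driver instead of str()-formatting them into the SQL
client.execute(sql0, tuples)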
5.2 Using insert_dataframe
table = 'db.test_table'
cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
client.insert_dataframe(query, df)
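Note that insert_dataframe only works when the Client was created with settings={'use_numpy': True}, as in the snippets above; without it the driver refuses the call. A quick way to verify the insert is the matching query_dataframe helper:
# Read the rows back into a DataFrame (also requires use_numpy)
check = client.query_dataframe(f'SELECT * FROM {table}')
print(check)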
6. Inserting a DataFrame in Batches
def get_insert_sql(df: pd.DataFrame) -> str:
    cols = df.columns.tolist()
    query = f"INSERT INTO db.test_table ({','.join(map(str, cols))}) VALUES "
    tuples = [tuple(x) for x in df.to_numpy()]
    return query + ', '.join(map(str, tuples))

# Number of rows per batch
batch_size = 1000

# Insert the data batch by batch
for start in range(0, len(df), batch_size):
    end = min(start + batch_size, len(df))
    sql = get_insert_sql(df[start:end])
    client.execute(sql)
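An alternative sketch for the same batching, using insert_dataframe with positional slices instead of hand-built SQL (assumes the client and df from section 5):
table = 'db.test_table'
cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
batch_size = 1000
for start in range(0, len(df), batch_size):
    # iloc yields a positional row slice for each batch
    client.insert_dataframe(query, df.iloc[start:start + batch_size])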
From my experiments, the following approach fails:
tuples = [(1, 'test1'), (2, 'test2')]
client.execute('INSERT INTO db.test (id, name) VALUES', tuples, settings={'max_insert_block_size': 10}, columnar=True)
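My guess at why this fails: with columnar=True the driver expects one sequence per column rather than one tuple per row, so the row-wise tuples above do not match the declared columns. A sketch of the column-wise layout (an assumption I have not re-tested):
# One sequence per column: all ids first, then all names
columns = [(1, 2), ('test1', 'test2')]
client.execute('INSERT INTO db.test (id, name) VALUES', columns, columnar=True)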
The usage that worked is:
cols = df.columns.tolist()
query = f"INSERT INTO {table} ({','.join(map(str, cols))}) VALUES "
# Batched insert, 10 rows per block
client.insert_dataframe(query, df, settings={'max_insert_block_size': 10})
7. Complete Code
from clickhouse_driver import Client
import pandas as pd
import numpy as np
from datetime import datetime, timedelta


class Error(Exception):
    pass


class Connection(Error):
    pass


class NotExist(Error):
    pass


# ClickHouse client wrapper
class ClickHose:
    __slots__ = ("__ck",)
    __ck: Client

    def __init__(self):
        try:
            self.__ck = Client(host='ck_host', port=12345, user='ck_user',
                               password='ck_password', settings={'use_numpy': True})
        except Exception as e:
            raise Connection(e)

    def get_conn(self) -> Client:
        return self.__ck

    def query(self, sql: str):
        return self.__ck.execute(sql)

    def insert(self, sql: str):
        return self.__ck.execute(sql)

    def insert_dataframe(self, sql: str, df: pd.DataFrame, external_tables=None, query_id=None, settings=None):
        return self.__ck.insert_dataframe(sql, df, external_tables, query_id, settings)

    def close(self):
        self.__ck.disconnect()


# ClickHouse table wrapper
class Test1CK:
    __slots__ = ("__ck", "__table")
    __ck: ClickHose
    __table: str

    def __init__(self, ck: ClickHose):
        self.__ck = ck
        self.__table = "db.test1"
        if not self.check_table():
            raise NotExist(f"not exist table: {self.__table}")

    def check_table(self) -> bool:
        try:
            table_exists = self.__ck.query(f"EXISTS TABLE {self.__table}")[0][0]
            return table_exists
        except Exception as e:
            raise Error(e)

    def get_bill_by_cloud(self, cloud: str, start_time: int, end_time: int) -> list:
        query = (f"select * from {self.__table} where (cloud = '{cloud}') "
                 f"AND (time > {start_time}) AND (time < {end_time})")
        try:
            data = self.__ck.query(query)
            return data
        except Exception as e:
            # logger.error(f"get_bill from table: {self.__table} error: {e}")
            return []

    def insert_batch(self, month: str, df: pd.DataFrame):
        if df is None or len(df) < 1:
            return
        sql = ""
        try:
            # Number of rows per batch
            batch_size = 1000
            query = self.get_insert_sql(df)
            # Insert the data batch by batch
            for start in range(0, len(df), batch_size):
                end = min(start + batch_size, len(df))
                tuples = [tuple(x) for x in df[start:end].to_numpy()]
                sql = query + ', '.join(map(str, tuples))
                self.__ck.insert(sql)
            # logger.info(f"insert month: {month} data success")
        except Exception as e:
            # logger.error(f"insert_batch from table: {self.__table}, sql: {sql} error: {e}")
            raise

    def insert_dataframe(self, month: str, df: pd.DataFrame):
        if df is None or len(df) < 1:
            return
        query = ""
        try:
            # Number of rows per batch
            batch_size = 1000
            cols = df.columns.tolist()
            query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "
            self.__ck.insert_dataframe(query, df, settings={'max_insert_block_size': batch_size})
            # logger.info(f"insert month: {month} data success")
        except Exception as e:
            # logger.error(f"insert_batch from table: {self.__table}, sql: {query} error: {e}")
            raise

    def get_insert_sql(self, df: pd.DataFrame) -> str:
        cols = df.columns.tolist()
        query = f"INSERT INTO {self.__table} ({','.join(map(str, cols))}) VALUES "
        # logger.debug(f"get_insert_sql: {query}")
        return query


if __name__ == '__main__':
    ck_db = ClickHose()
    try:
        test1_table = Test1CK(ck_db)
        # Create a sample DataFrame
        data = {'id': [1, 2, 3], 'name': ['name_1', 'name_2', 'name_3']}
        df = pd.DataFrame(data)
        test1_table.insert_batch("3", df)
    except Exception as e:
        # logger.error(f"error {e}")
        raise
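One gap in the main block above: the client is never disconnected. A sketch of the same entry point with cleanup, reusing the ClickHose.close helper:
if __name__ == '__main__':
    ck_db = ClickHose()
    try:
        test1_table = Test1CK(ck_db)
        df = pd.DataFrame({'id': [1, 2, 3], 'name': ['name_1', 'name_2', 'name_3']})
        test1_table.insert_batch("3", df)
    finally:
        # Always release the TCP connection, even if the insert raised
        ck_db.close()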