1. ClickHouse table creation
- Create the database
create database ad_report;
use ad_report;
- Create the table
drop table if exists dwd_ad_event_inc;
create table if not exists dwd_ad_event_inc
(
    event_time             Int64 comment 'event time',
    event_type             String comment 'event type',
    ad_id                  String comment 'ad id',
    ad_name                String comment 'ad name',
    ad_product_id          String comment 'ad product id',
    ad_product_name        String comment 'ad product name',
    ad_product_price       Decimal(16, 2) comment 'ad product price',
    ad_material_id         String comment 'ad material id',
    ad_material_url        String comment 'ad material url',
    ad_group_id            String comment 'ad group id',
    platform_id            String comment 'promotion platform id',
    platform_name_en       String comment 'promotion platform name (English)',
    platform_name_zh       String comment 'promotion platform name (Chinese)',
    client_country         String comment 'client country',
    client_area            String comment 'client area',
    client_province        String comment 'client province',
    client_city            String comment 'client city',
    client_ip              String comment 'client ip address',
    client_device_id       String comment 'client device id',
    client_os_type         String comment 'client OS type',
    client_os_version      String comment 'client OS version',
    client_browser_type    String comment 'client browser type',
    client_browser_version String comment 'client browser version',
    client_user_agent      String comment 'client user agent',
    is_invalid_traffic     UInt8 comment 'whether the traffic is invalid'
) ENGINE = MergeTree()
    ORDER BY (event_time, ad_name, event_type, client_province, client_city, client_os_type,
              client_browser_type, is_invalid_traffic);
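Before moving on, you can sanity-check the DDL from clickhouse-client (a minimal sketch; the database and table are the ones created above):
-- confirm the schema and the MergeTree sorting key
show create table ad_report.dwd_ad_event_inc;
-- list columns with their types and comments
describe table ad_report.dwd_ad_event_inc;
Note that in MergeTree the ORDER BY clause serves as the sorting key, so the leading columns (event_time, ad_name, event_type) should reflect the dimensions the reporting queries filter on most often.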
2. Export Hive data to ClickHouse
Use spark-sql to query the data, then write it to ClickHouse via JDBC.
- Create a Maven project; the pom.xml is as follows
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yt</groupId>
    <artifactId>hive-to-clickhouse</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- MySQL driver, needed to access Hive's metastore -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <!-- spark-hive module -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.12</artifactId>
            <version>3.3.1</version>
            <scope>provided</scope>
        </dependency>
        <!-- clickhouse-jdbc driver; exclude the two jackson artifacts to resolve a dependency conflict -->
        <dependency>
            <groupId>ru.yandex.clickhouse</groupId>
            <artifactId>clickhouse-jdbc</artifactId>
            <version>0.2.4</version>
            <exclusions>
                <exclusion>
                    <artifactId>jackson-databind</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jackson-core</artifactId>
                    <groupId>com.fasterxml.jackson.core</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- commons-cli, for convenient handling of program arguments -->
        <dependency>
            <groupId>commons-cli</groupId>
            <artifactId>commons-cli</artifactId>
            <version>1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <!-- bundle all dependencies into the jar -->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <!-- configure the execution -->
                    <execution>
                        <id>make-assembly</id>
                        <!-- bind to the package phase -->
                        <phase>package</phase>
                        <goals>
                            <!-- run only once -->
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
- Create the HiveToClickhouse class
package com.atguigu.ad.spark;

import org.apache.commons.cli.*;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class HiveToClickhouse {
    public static void main(String[] args) {
        // Parse the program arguments with commons-cli
        // 1. Define the options
        Options options = new Options();
        options.addOption(OptionBuilder.withLongOpt("hive_db").withDescription("hive database name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_table").withDescription("hive table name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("hive_partition").withDescription("hive partition (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_url").withDescription("clickhouse jdbc url (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("ck_table").withDescription("clickhouse table name (required)").hasArg(true).isRequired(true).create());
        options.addOption(OptionBuilder.withLongOpt("batch_size").withDescription("batch size for writes to clickhouse (required)").hasArg(true).isRequired(true).create());

        // 2. Parse the arguments
        CommandLineParser parser = new GnuParser();
        CommandLine cmd = null;
        try {
            cmd = parser.parse(options, args);
        } catch (ParseException e) {
            // On a parse failure (i.e. illegal arguments), print the help message and return
            System.out.println(e.getMessage());
            HelpFormatter helpFormatter = new HelpFormatter();
            helpFormatter.printHelp("--option argument", options);
            return;
        }

        // 3. Create the SparkConf
        SparkConf sparkConf = new SparkConf().setAppName("hive2clickhouse");

        // 4. Create the SparkSession with Hive support enabled
        SparkSession sparkSession = SparkSession.builder()
                .enableHiveSupport()
                .config(sparkConf)
                .getOrCreate();

        // 5. Enable regular-expression matching of column names in queries
        sparkSession.sql("set spark.sql.parser.quotedRegexColumnNames=true");

        // 6. Query all columns of the hive table except the dt partition column
        String sql = "select `(dt)?+.+` from " + cmd.getOptionValue("hive_db") + "." + cmd.getOptionValue("hive_table")
                + " where dt='" + cmd.getOptionValue("hive_partition") + "'";
        Dataset<Row> hive = sparkSession.sql(sql);

        // 7. Write the data to clickhouse via jdbc
        hive.write()
                .mode(SaveMode.Append)
                .format("jdbc")
                .option("url", cmd.getOptionValue("ck_url"))
                .option("dbtable", cmd.getOptionValue("ck_table"))
                .option("driver", "ru.yandex.clickhouse.ClickHouseDriver")
                .option("batchsize", cmd.getOptionValue("batch_size"))
                .save();

        // 8. Close the SparkSession
        sparkSession.close();
    }
}
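The backquoted regex in step 6 works because of the spark.sql.parser.quotedRegexColumnNames setting enabled in step 5. You can try the same statement interactively in the spark-sql shell (a sketch, using the ad.dwd_ad_event_inc table and the 2023-06-07 partition from the test command below):
-- when quotedRegexColumnNames is on, a backquoted identifier is treated as a regex over column names;
-- the possessive quantifier in `(dt)?+.+` prevents backtracking, so the bare column name dt never
-- matches and the partition column is excluded from the projection
set spark.sql.parser.quotedRegexColumnNames=true;
select `(dt)?+.+` from ad.dwd_ad_event_inc where dt='2023-06-07' limit 10;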
- Upload hive-site.xml, hdfs-site.xml, and core-site.xml to the project's resources directory
- Package the project and upload hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar to a Hadoop node
- Run the following command to test
spark-submit \
--class com.atguigu.ad.spark.HiveToClickhouse \
--master yarn \
hive-to-clickhouse-1.0-SNAPSHOT-jar-with-dependencies.jar \
--hive_db ad \
--hive_table dwd_ad_event_inc \
--hive_partition 2023-06-07 \
--ck_url jdbc:clickhouse://hadoop102:8123/ad_report \
--ck_table dwd_ad_event_inc \
--batch_size 1000
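After the job finishes, a quick check from clickhouse-client confirms the partition landed (a minimal sketch; the count should equal the row count of the Hive partition dt='2023-06-07'):
-- total rows loaded; compare against the source Hive partition
select count(*) from ad_report.dwd_ad_event_inc;
-- spot-check the distribution across event types and the invalid-traffic flag
select event_type, is_invalid_traffic, count(*) as cnt
from ad_report.dwd_ad_event_inc
group by event_type, is_invalid_traffic
order by cnt desc;
Also note that the job writes with SaveMode.Append and the MergeTree table does not deduplicate, so re-running the export for the same partition will double-count; truncate or re-create the table before reloading a day's data.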
PS:
- To ensure the job can be submitted to YARN, add the following to the $SPARK_HOME/conf/spark-env.sh file:
export HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop/