风不懂 不懂得 叶的梦
月不听 不听闻
窗里琴声意难穷
水不见 不曾见 绿消红
霜不知 不知晓
将别人怎道珍重
落叶有风才敢
做一个 会飞的梦
孤窗有月才敢
登高在 夜里从容
桃花有水才怕
身是客 身是客
此景不能久
🎵 Tie Yann (铁阳)、薄彩生《不知晓》
在大数据分析和处理领域,Apache Spark是一个广泛使用的高性能、通用的计算框架,而ClickHouse作为一个高性能的列式数据库,特别适合在线分析处理(OLAP)。结合Scala语言的强大功能和简洁语法,我们可以高效地开发Spark应用程序来执行复杂的数据分析任务。本博客将详细介绍如何使用Scala结合Spark连接ClickHouse,并进行一系列的数据处理操作。
环境准备
首先,请确保你已经安装了以下软件:
- Apache Spark:确保安装了适合你数据处理需求的版本。
- ClickHouse:安装并配置好ClickHouse数据库,包括网络访问权限等。
- JDK和Scala:因为我们使用Scala编写Spark应用程序,需要安装Java开发工具包和Scala。
创建SparkSession
SparkSession是Spark 2.0引入的一个新概念,是对之前版本中SparkContext、SQLContext等API的封装,它提供了一个统一的入口来进行各种数据操作。
val spark = SparkSession.builder().appName("myApp").master("yarn") // 这里使用yarn模式.config("spark.sql.catalogImplementation", "hive").getOrCreate()
连接ClickHouse
要连接ClickHouse,我们需要配置数据库的URL、用户名和密码等信息。以下是一个配置JDBC连接的示例:
def getCKJdbcProperties(batchSize: String = "100000",socketTimeout: String = "300000",numPartitions: String = "50",rewriteBatchedStatements: String = "true"): Properties = {val properties = new Propertiesproperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "default")properties.put("password", "t233")properties
}
读取数据
利用Spark的read.jdbc方法,我们可以轻松地从ClickHouse读取数据到DataFrame中。
val ckUrl = "jdbc:clickhouse://233.233.233.233:8123/test"
val ckTable = "test.testTable"
val ckProperties = getCKJdbcProperties()
val ckDF = spark.read.jdbc(ckUrl, ckTable, ckProperties)
数据处理
数据读取到Spark后,你可以使用Scala编写的各种数据处理逻辑。例如,我们可以提取特定字段、进行过滤、聚合等操作。
var dipList = ckDF.select("ip_dst").distinct().where("tpart='" + today + "'").collect()
写回ClickHouse或HDFS
处理完数据后,你可能需要将结果保存回ClickHouse或写入HDFS。这可以通过DataFrameWriter
完成,它支持多种数据写入模式和格式。
// 示例:将处理后的数据写入HDFS
retDipDF.coalesce(1).write.mode(SaveMode.Overwrite).csv("/tmp/url_test/dip/" + today)
完整案例
package com.hzx.demoimport scala.collection.mutable.ArrayBuffer
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import com.hzx.sec.util.isRealAttack.getURLInfoobject MainDemo {def getCKJdbcProperties(batchSize: String = "100000",socketTimeout: String = "300000",numPartitions: String = "50",rewriteBatchedStatements: String = "true"): Properties = {val properties = new Propertiesproperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "default")properties.put("password", "t233")properties.put("batchsize", batchSize)properties.put("socket_timeout", socketTimeout)properties.put("numPartitions", numPartitions)properties.put("rewriteBatchedStatements", rewriteBatchedStatements)properties}def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("myApp").master("yarn").config("spark.sql.catalogImplementation", "hive").config("spark.default.parallelism", "1000").config("spark.driver.maxResultSize", "20g").config("spark.debug.maxToStringFields", "100").config("spark.executor.memory", "16g").config("spark.driver.memory", "20g").config("spark.executor.cores", "8").config("spark.executor.instances", "10").config("spark.yarn.queue", "testdb").config("spark.driver.extraClassPath", "$LIBJARS").config("spark.executor.extraClassPath", "$LIBJARS").getOrCreate()spark.sparkContext.setLogLevel("ERROR")spark.sql("use testdb")// 打印任务开始时间println("任务开始时间:" + java.time.LocalDateTime.now())// val today = java.time.LocalDate.now().toStringval today = "2023-05-30"val todayStr = today.replace("-", "")// 连接clickhouseval ckProperties = getCKJdbcProperties()val ckUrl = "jdbc:clickhouse://233.233.233.233:8123/test"val ckTable = "testdb.testtable"var ckDF = spark.read.jdbc(ckUrl, ckTable, ckProperties)var dipList = ckDF.select("ip_dst").distinct().where("tpart='" + today + "'").collect()}}
总结
通过Scala和Spark结合ClickHouse进行数据处理,我们可以利用Spark的强大计算能力和ClickHouse的高效存储能力,来实现高性能的大数据分析和处理。这种技术组合特别适合处理日志数据、用户行为分析、实时数据处理等场景。