【Hadoop】使用Scala与Spark连接ClickHouse进行数据处理


风不懂 不懂得 叶的梦
月不听 不听闻
窗里琴声意难穷
水不见 不曾见 绿消红
霜不知 不知晓
将别人怎道珍重
落叶有风才敢
做一个 会飞的梦
孤窗有月才敢
登高在 夜里从容
桃花有水才怕
身是客 身是客
此景不能久
                     🎵 Tie Yann (铁阳)、薄彩生《不知晓》


在大数据分析和处理领域,Apache Spark是一个广泛使用的高性能、通用的计算框架,而ClickHouse作为一个高性能的列式数据库,特别适合在线分析处理(OLAP)。结合Scala语言的强大功能和简洁语法,我们可以高效地开发Spark应用程序来执行复杂的数据分析任务。本博客将详细介绍如何使用Scala结合Spark连接ClickHouse,并进行一系列的数据处理操作。

环境准备

首先,请确保你已经安装了以下软件:

  1. Apache Spark:确保安装了适合你数据处理需求的版本。
  2. ClickHouse:安装并配置好ClickHouse数据库,包括网络访问权限等。
  3. JDK和Scala:因为我们使用Scala编写Spark应用程序,需要安装Java开发工具包和Scala。

创建SparkSession

SparkSession是Spark 2.0引入的一个新概念,是对之前版本中SparkContext、SQLContext等API的封装,它提供了一个统一的入口来进行各种数据操作。

val spark = SparkSession.builder().appName("myApp").master("yarn") // 这里使用yarn模式.config("spark.sql.catalogImplementation", "hive").getOrCreate()

连接ClickHouse

要连接ClickHouse,我们需要配置数据库的URL、用户名和密码等信息。以下是一个配置JDBC连接的示例:

def getCKJdbcProperties(batchSize: String = "100000",socketTimeout: String = "300000",numPartitions: String = "50",rewriteBatchedStatements: String = "true"): Properties = {val properties = new Propertiesproperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "default")properties.put("password", "t233")properties
}

读取数据

利用Spark的read.jdbc方法,我们可以轻松地从ClickHouse读取数据到DataFrame中。

val ckUrl = "jdbc:clickhouse://233.233.233.233:8123/test"
val ckTable = "test.testTable"
val ckProperties = getCKJdbcProperties()
val ckDF = spark.read.jdbc(ckUrl, ckTable, ckProperties)

数据处理

数据读取到Spark后,你可以使用Scala编写的各种数据处理逻辑。例如,我们可以提取特定字段、进行过滤、聚合等操作。

var dipList = ckDF.select("ip_dst").distinct().where("tpart='" + today + "'").collect()

写回ClickHouse或HDFS

处理完数据后,你可能需要将结果保存回ClickHouse或写入HDFS。这可以通过DataFrameWriter完成,它支持多种数据写入模式和格式。

// 示例:将处理后的数据写入HDFS
retDipDF.coalesce(1).write.mode(SaveMode.Overwrite).csv("/tmp/url_test/dip/" + today)

完整案例

package com.hzx.demoimport scala.collection.mutable.ArrayBuffer
import java.util.Properties
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import com.hzx.sec.util.isRealAttack.getURLInfoobject MainDemo {def getCKJdbcProperties(batchSize: String = "100000",socketTimeout: String = "300000",numPartitions: String = "50",rewriteBatchedStatements: String = "true"): Properties = {val properties = new Propertiesproperties.put("driver", "ru.yandex.clickhouse.ClickHouseDriver")properties.put("user", "default")properties.put("password", "t233")properties.put("batchsize", batchSize)properties.put("socket_timeout", socketTimeout)properties.put("numPartitions", numPartitions)properties.put("rewriteBatchedStatements", rewriteBatchedStatements)properties}def main(args: Array[String]): Unit = {val spark = SparkSession.builder().appName("myApp").master("yarn").config("spark.sql.catalogImplementation", "hive").config("spark.default.parallelism", "1000").config("spark.driver.maxResultSize", "20g").config("spark.debug.maxToStringFields", "100").config("spark.executor.memory", "16g").config("spark.driver.memory", "20g").config("spark.executor.cores", "8").config("spark.executor.instances", "10").config("spark.yarn.queue", "testdb").config("spark.driver.extraClassPath", "$LIBJARS").config("spark.executor.extraClassPath", "$LIBJARS").getOrCreate()spark.sparkContext.setLogLevel("ERROR")spark.sql("use testdb")// 打印任务开始时间println("任务开始时间:" + java.time.LocalDateTime.now())//    val today = java.time.LocalDate.now().toStringval today = "2023-05-30"val todayStr = today.replace("-", "")// 连接clickhouseval ckProperties = getCKJdbcProperties()val ckUrl = "jdbc:clickhouse://233.233.233.233:8123/test"val ckTable = "testdb.testtable"var ckDF = spark.read.jdbc(ckUrl, ckTable, ckProperties)var dipList = ckDF.select("ip_dst").distinct().where("tpart='" + today + "'").collect()}}

总结

通过Scala和Spark结合ClickHouse进行数据处理,我们可以利用Spark的强大计算能力和ClickHouse的高效存储能力,来实现高性能的大数据分析和处理。这种技术组合特别适合处理日志数据、用户行为分析、实时数据处理等场景。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/764041.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

gofly框架api接口请求合法性验证

验证说明 api接口请求头带合法性参数,用来验证客户端请求是否来自自己授权的应用发起,这个防止api接口被第三方盗用,影响系统性能及数据安全。 服务端配置 配置文件在:resource/config.yaml,配置参数为app:下的apis…

zookeeper分布式锁原理剖析

在ZooKeeper的CLI中,create命令用于在指定路径上创建一个新的节点。以下是create命令的参数解释: -s:顺序节点标志。如果指定了该选项,则创建的节点将是顺序节点。顺序节点的名称将以“path”后跟一个连字符和递增的数字序列结尾…

如何暴露一些方法在线上使用呢?瞧瞧本文适合胃口不。

1. 介绍 逻辑介绍:通过时间对齐方式来控制是否可以访问某些方法。 2. 实现 实现代码如下 (() > {const aes { l: {}, decrypt: () > { console.log(520m) } }const limitLogin () > {const time new Date();const week time.getDay();const hours …

二叉树|257.二叉树的所有路径

力扣题目链接 class Solution { private:void traversal(TreeNode* cur, vector<int>& path, vector<string>& result) {path.push_back(cur->val); // 中&#xff0c;中为什么写在这里&#xff0c;因为最后一个节点也要加入到path中 // 这才到了叶子节…

架构之伸缩性维度

伸缩性的主要场景&#xff1a;电商的秒杀和抢购 热点业务&#xff1a;支付、下单、添加购物车、商品详情页、搜索 热点数据&#xff1a;秒杀产品、动态数据、静态数据 伸缩性实现方案&#xff1a; 无状态应用弹性伸缩&#xff1a;无服务器化Serverless serverless实现&#xf…

Java面试相关问题

一.MySql篇 1优化相关问题 1.1.MySql中如何定位慢查询&#xff1f; 慢查询的概念&#xff1a;在MySQL中&#xff0c;慢查询是指执行时间超过一定阈值的SQL语句。这个阈值是由long_query_time参数设定的&#xff0c;它的默认值是10秒1。也就是说&#xff0c;如果一条SQL语句的执…

axios前端参数的传递几种方法

直接拼接url const axios require(axios);// 假设有两个参数&#xff1a;id 和 category const id 123;// 使用模板字符串将参数拼接在 URL 上 axios.get(https://api.xxx.com/data?id${id}).then(response > {console.log(response.data);}).catch(error > {console.…

复习Day1_

1205. 买不到的数目 - AcWing题库 结论法&#xff1a;两个数a,b不能组合出的最大的数是(a-1)*(b-1)-1 #include <iostream> #include <algorithm> using namespace std; int n, m, minn, maxx, ans; bool dp[1000000];//记录每一个数是否能被凑出来 int main() {…

算法打卡Day14

今日任务&#xff1a; 1&#xff09;104.二叉树的最大深度 2&#xff09;559.n叉树的最大深度 3&#xff09;111.二叉树的最小深度 4&#xff09;222.完全二叉树的节点个数 104.二叉树的最大深度 题目链接&#xff1a;104. 二叉树的最大深度 - 力扣&#xff08;LeetCode&#…

Leetcode 70.爬楼梯

心路历程&#xff1a; 这道题是之前学院的一道复试题&#xff0c;大家都没怎么刷过算法题&#xff0c;只记得当年凭借几次试错自己把这道题做出来了&#xff0c;当时也不知道动态规划之类的。 正常来讲&#xff0c;这种找不到循环结构的题一般都是递归解决。 注意的点&#x…

C语言经典面试题目(二十八)

1、解释一下C语言中的do-while循环语句。 do-while循环语句是C语言中的一种循环结构&#xff0c;它与while循环相似&#xff0c;不同之处在于条件判断的时机。do-while循环先执行循环体&#xff0c;然后再判断条件是否成立&#xff0c;如果条件成立&#xff0c;则继续执行循环…

Linux学习笔记16 - 系统命令

1. Linux 常见系统管理命令 命令含义格式su切换用户su [选项] [用户名]ps显示系统由该用户运行的进程列表ps [选项]top动态显示系统中运行的程序&#xff08;一般为每隔 5s&#xff09;topkill输出特定的信号给指定 PID&#xff08;进程号&#xff09;的进程&#xff0c;并根据…

鸿蒙开发实例【使用高德地图鸿蒙SDK】(一)

概述 地图SDK适配鸿蒙NEXT特性介绍 赋能开发者-提供地图鸿蒙原生ArkTS开发接口 开发者可以使用鸿蒙NEXT推荐的ArkTS接口开发应用集成地图功能&#xff0c;组件使用ArkUI原生组件&#xff0c;兼容方舟UI框架代码全面适配鸿蒙NEXTSDK&#xff0c;所有系统接口均使用鸿蒙NEXTAP…

Kerberos 5安装与使用

目录 简介 实验准备 实验步骤 一、安装Kerberos相关服务并建立集群所有节点互信(node1、2、3) 二、修改配置文件 三、 修改管理员权限配置文件(仅node1) 四、初始化KDC数据库(仅node1) 五、启动Kerberos相关服务(仅node1) 六、创建Kerberos管理员用户(仅node1…

编程题:最长的顺子(Java)

题目描述 斗地主起源于湖北十堰房县&#xff0c;据说是一位叫吴修全的年轻人根据当地流行的扑克玩法“跑得快”改编的&#xff0c;如今已风靡整个中国&#xff0c;并流行于互联网上。 牌型&#xff1a; 单顺&#xff0c;又称顺子&#xff0c;最少5张牌&#xff0c;最多12张牌…

在面对一个大型的代码,需要分文件编写的时候,应该怎么办呢;以及在编写出一个功能时,有人想要买这个功能,怎么在不给出源代码的情况下让买家可以使用这个代码功能呢?

我们一点点来&#xff0c;首先&#xff0c;假设我们要写一个加法功能的实现&#xff0c; 这里是在单个文件里调用函数&#xff0c;实现一个加法的功能&#xff0c; 下面我们把自定义函数放在下面&#xff0c;上面对自定义函数进行一个声明&#xff0c; 下面我们把代码放到多个…

【分享】3种方法保护Excel文件不能随意打开

做好的Excel表格不想被他人随意打开&#xff1f;那就一定要看看下面小编分享的3种方法&#xff0c;看看如何禁止打开Excel表格。 方法一&#xff1a;设置密码保护 我们可以给Excel表格设置“打开密码”&#xff0c;这样只有输入正确的密码&#xff0c;才能打开表格。 设置步骤…

java动态规划学习笔记

学习笔记目录&#xff0c;这里记录个大纲&#xff0c;详情点链接 背包问题 01背包问题综述 01背包问题&#xff08;二维数组&#xff09;https://blog.csdn.net/m0_73065928/article/details/136794406?spm1001.2014.3001.5501 01背包问题&#xff08;滚动数组&#xff09…

S32 Design Studio 中断

中断向量表 中断数量可以在规格书里面看&#xff0c;也可以在SDK\platform\devices\S32K146\include\S32Kxxx.h里面看。 譬如我这个146芯片就有151个中断号 /** Interrupt Number Definitions */ #define NUMBER_OF_INT_VECTORS 151u /**< Number of inter…

Java基础知识总结(13)

数据结构 链表 优点&#xff1a;随机增删元素效率高&#xff08;因为增删元素不涉及到大量元素的位移&#xff09; 缺点&#xff1a;查询效率较低&#xff0c;每一次查找某个元素的时候都需要从头结点开始往下遍历 LinkedList集合 /* 链表的优点&#xff1a; 由于链表的元…