Spark踩坑记——数据库(Hbase+Mysql)转

转自:http://www.cnblogs.com/xlturing/p/spark.html

前言

在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值。最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录。

Spark Streaming持久化设计模式

DStreams输出操作

  • print:打印driver结点上每个Dstream中的前10个batch元素,常用于开发和调试
  • saveAsTextFiles(prefix, [suffix]):将当前Dstream保存为文件,每个interval batch的文件名命名规则基于prefix和suffix:"prefix-TIME_IN_MS[.suffix]".
  • saveAsObjectFiles(prefix, [suffix]):将当前的Dstream内容作为Java可序列化对象的序列化文件进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • saveAsHadoopFiles(prefix, [suffix]):将Dstream以hadoop文件的形式进行保存,每个interval batch的文件命名规则基于prefix和suffix:: "prefix-TIME_IN_MS[.suffix]".
  • foreachRDD(func):最通用的输出操作,可以对从数据流中产生的每一个RDD应用函数_fun_。通常_fun_会将每个RDD中的数据保存到外部系统,如:将RDD保存到文件,或者通过网络连接保存到数据库。值得注意的是:_fun_执行在跑应用的driver进程中,并且通常会包含RDD action以促使数据流RDD开始计算。

使用foreachRDD的设计模式

dstream.foreachRDD对于开发而言提供了很大的灵活性,但在使用时也要避免很多常见的坑。我们通常将数据保存到外部系统中的流程是:建立远程连接->通过连接传输数据到远程系统->关闭连接。针对这个流程我们很直接的想到了下面的程序代码:

dstream.foreachRDD { rdd =>val connection = createNewConnection()  // executed at the driver rdd.foreach { record => connection.send(record) // executed at the worker } }

在spark踩坑记——初试中,对spark的worker和driver进行了整理,我们知道在集群模式下,上述代码中的connection需要通过序列化对象的形式从driver发送到worker,但是connection是无法在机器之间传递的,即connection是无法序列化的,这样可能会引起_serialization errors (connection object not serializable)_的错误。为了避免这种错误,我们将conenction在worker当中建立,代码如下:

dstream.foreachRDD { rdd =>rdd.foreach { record =>val connection = createNewConnection()connection.send(record)connection.close() } }

似乎这样问题解决了?但是细想下,我们在每个rdd的每条记录当中都进行了connection的建立和关闭,这会导致不必要的高负荷并且降低整个系统的吞吐量。所以一个更好的方式是使用_rdd.foreachPartition_即对于每一个rdd的partition建立唯一的连接(注:每个partition是内的rdd是运行在同一worker之上的),代码如下:

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()}
}

这样我们降低了频繁建立连接的负载,通常我们在连接数据库时会使用连接池,把连接池的概念引入,代码优化如下:

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>// ConnectionPool is a static, lazily initialized pool of connectionsval connection = ConnectionPool.getConnection()partitionOfRecords.foreach(record => connection.send(record))ConnectionPool.returnConnection(connection)  // return to the pool for future reuse}
}

通过持有一个静态连接池对象,我们可以重复利用connection而进一步优化了连接建立的开销,从而降低了负载。另外值得注意的是,同数据库的连接池类似,我们这里所说的连接池同样应该是lazy的按需建立连接,并且及时的收回超时的连接。
另外值得注意的是:

  • 如果在spark streaming中使用了多次foreachRDD,它们之间是按照程序顺序向下执行的
  • Dstream对于输出操作的执行策略是lazy的,所以如果我们在foreachRDD中不添加任何RDD action,那么系统仅仅会接收数据然后将数据丢弃。

Spark访问Hbase

上面我们阐述了将spark streaming的Dstream输出到外部系统的基本设计模式,这里我们阐述如何将Dstream输出到Hbase集群。

Hbase通用连接类

Scala连接Hbase是通过zookeeper获取信息,所以在配置时需要提供zookeeper的相关信息,如下:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Connection
import org.apache.hadoop.hbase.HConstants import org.apache.hadoop.hbase.client.ConnectionFactory object HbaseUtil extends Serializable { private val conf = HBaseConfiguration.create() private val para = Conf.hbaseConfig // Conf为配置类,获取hbase的配置 conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, para.get("port").getOrElse("2181")) conf.set(HConstants.ZOOKEEPER_QUORUM, para.get("quorum").getOrElse("127-0-0-1")) // hosts private val connection = ConnectionFactory.createConnection(conf) def getHbaseConn: Connection = connection }

根据网上资料,Hbase的连接的特殊性我们并没有使用连接池

Hbase输出操作

我们以put操作为例,演示将上述设计模式应用到Hbase输出操作当中:

dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {val connection = HbaseUtil.getHbaseConn // 获取Hbase连接partitionRecords.foreach(data => {val tableName = TableName.valueOf("tableName")val t = connection.getTable(tableName)try { val put = new Put(Bytes.toBytes(_rowKey_)) // row key // column, qualifier, value put.addColumn(_column_.getBytes, _qualifier_.getBytes, _value_.getBytes) Try(t.put(put)).getOrElse(t.close()) // do some log(显示在worker上) } catch { case e: Exception => // log error e.printStackTrace() } finally { t.close() } }) }) // do some log(显示在driver上) } })

关于Hbase的其他操作可以参考Spark 下操作 HBase(1.0.0 新 API)

填坑记录

重点记录在连接Hbase过程中配置HConstants.ZOOKEEPER_QUORUM的问题:

  • 由于Hbase的连接不能直接使用ip地址进行访问,往往需要配置hosts,例如我在上述代码段中127-0-0-1(任意),我们在hosts中需要配置

    127-0-0-1 127.0.0.1
  • 在单机情况下,我们只需要配置一台zookeeper所在Hbase的hosts即可,但是当切换到Hbase集群是遇到一个诡异的bug
    问题描述:在foreachRDD中将Dstream保存到Hbase时会卡住,并且没有任何错误信息爆出(没错!它就是卡住,没反应)
    问题分析:由于Hbase集群有多台机器,而我们只配置了一台Hbase机器的hosts,这样导致Spark集群在访问Hbase时不断的去寻找但却找不到就卡在那里
    解决方式:对每个worker上的hosts配置了所有hbase的节点ip,问题解决

Spark访问Mysql

同访问Hbase类似,我们也需要有一个可序列化的类来建立Mysql连接,这里我们利用了Mysql的C3P0连接池

MySQL通用连接类

import java.sql.Connection
import java.util.Propertiesimport com.mchange.v2.c3p0.ComboPooledDataSource class MysqlPool extends Serializable { private val cpds: ComboPooledDataSource = new ComboPooledDataSource(true) private val conf = Conf.mysqlConfig try { cpds.setJdbcUrl(conf.get("url").getOrElse("jdbc:mysql://127.0.0.1:3306/test_bee?useUnicode=true&characterEncoding=UTF-8")); cpds.setDriverClass("com.mysql.jdbc.Driver"); cpds.setUser(conf.get("username").getOrElse("root")); cpds.setPassword(conf.get("password").getOrElse("")) cpds.setMaxPoolSize(200) cpds.setMinPoolSize(20) cpds.setAcquireIncrement(5) cpds.setMaxStatements(180) } catch { case e: Exception => e.printStackTrace() } def getConnection: Connection = { try { return cpds.getConnection(); } catch { case ex: Exception => ex.printStackTrace() null } } } object MysqlManager { var mysqlManager: MysqlPool = _ def getMysqlManager: MysqlPool = { synchronized { if (mysqlManager == null) { mysqlManager = new MysqlPool } } mysqlManager } }

我们利用c3p0建立Mysql连接池,然后访问的时候每次从连接池中取出连接用于数据传输。

Mysql输出操作

同样利用之前的foreachRDD设计模式,将Dstream输出到mysql的代码如下:

dstream.foreachRDD(rdd => {if (!rdd.isEmpty) {rdd.foreachPartition(partitionRecords => {//从连接池中获取一个连接val conn = MysqlManager.getMysqlManager.getConnectionval statement = conn.createStatementtry {conn.setAutoCommit(false)partitionRecords.foreach(record => { val sql = "insert into table..." // 需要执行的sql操作 statement.addBatch(sql) }) statement.executeBatch conn.commit } catch { case e: Exception => // do some log } finally { statement.close() conn.close() } }) } })

值得注意的是:

  • 我们在提交Mysql的操作的时候,并不是每条记录提交一次,而是采用了批量提交的形式,所以需要将conn.setAutoCommit(false),这样可以进一步提高mysql的效率。
  • 如果我们更新Mysql中带索引的字段时,会导致更新速度较慢,这种情况应想办法避免,如果不可避免,那就硬上吧(T^T)

部署

提供一下Spark连接Mysql和Hbase所需要的jar包的maven配置:

<dependency><!-- Hbase --><groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.0</version> </dependency> <dependency><!-- Mysql --> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.31</version> </dependency> <dependency> <groupId>c3p0</groupId> <artifactId>c3p0</artifactId> <version>0.9.1.2</version> </dependency>

参考文献:

  1. Spark Streaming Programming Guide
  2. HBase介绍
  3. Spark 下操作 HBase(1.0.0 新 API)
  4. Spark开发快速入门
  5. kafka->spark->streaming->mysql(scala)实时数据处理示例
  6. Spark Streaming 中使用c3p0连接池操作mysql数据库

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/397747.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

解决Failed to connect session for conifg 故障

服务器升级openssh之后jenkins构建报错了&#xff0c;报错信息如下&#xff1a; Failed to connet or change directory jenkins.plugins.publish_over.BapPublisherException:Failed to connect session for config.....Message [Algorithm negotiation fail] 升级前ssh版本&a…

78oa mysql_78oa系统版本升级方法

可升级版本预览升级方法&#xff1a;1、备份数据库、附件目录、二次开发程序打开开始菜单——控制面板——管理工具——服务&#xff0c;右键点击停止 78oa mysql service 服务&#xff0c;完整复制【D:\78OA\server\modules\storage\data\78oa】(数据库)文件夹至备份区域。完整…

Excel导出显示服务器意外,C# 调用Excel 出现服务器出现意外状况. (异常来自 HRESULT:0x80010105 (RPC_E_SERVERFAULT)...

C# 调用Excel 出现服务器出现意外状况. (异常来自 HRESULT:0x80010105 (RPC_E_SERVERFAULT)htmlprivate Microsoft.Office.Interop.Excel.Application xApp;private Microsoft.Office.Interop.Excel.Workbook xBook;服务器//变量xApp new Microsoft.Office.Interop.Excel.Appl…

列表、元组、字典、集合的定义、操作与综合练习

l[A,B,C] t{A,B,C}l.append(B)print(l)scores[66,77,88]d{A:66,B:77,C:88} d[B]99 d[D]111 d.pop(C) print(d)s1{A,B,C} s2{A,C,D} print(s1&s2) print(s1|s2) 转载于:https://www.cnblogs.com/chenjunyu666/p/9147417.html

xargs

find /tmp/ -name "*.log" -mtime 4 | xargs -i -t mv {} /home/ find /tmp/ -name "*.log" -mtime 4 -print0 | xargs -0 rm -f xargs(1) xargs是给命令传递参数的一个过滤器&#xff0c;也是组合多个命令的一个工具。它把一个数据流分割为一些足够小的块…

export mysql home_mysql的Linux下安装笔记

注&#xff1a;在5.7之后MySQL不在生成my-default.cnf配置。tar -xzvf mysql-5.7.28-linux-glibc2.12-x86_64.tar.gzmv mysql-5.7.28-linux-glibc2.12-x86_64.tar.gz/ /usr/local/mysql新建 useradd mysql新建文件夹mkdir /usr/local/mysql/data生成配置&#xff1a;./mysqld -…

[转]DevExpress GridControl 关于使用CardView的一点小结

最近项目里需要显示商品的一系列图片&#xff0c;打算用CardView来显示&#xff0c;由于第一次使用&#xff0c;遇到许多问题&#xff0c;发现网上这方面的资源很少&#xff0c;所以把自己的一点点实际经验小结一下&#xff0c;供自己和大家以后参考。 1、选择CardView&#xf…

thinkphp5 ajax搜索+分页

<center> <table > <tr> 水果名称<input type"text" name"f_name" class"f_name"> 水果分类 &…

EventBus学习

EventBus是android 下高效的发布/订阅事件总线机制&#xff0c;可以代替传统的Intent&#xff0c;Handler&#xff0c;BroadCast 或者Fragment&#xff0c;Activity&#xff0c;Service&#xff0c;线程之间传递数据&#xff0c;是一种发布订阅设计模式&#xff08;观察者模式&…

Uediter的引用和取值

页面应用Uediter控件&#xff0c;代码如下&#xff1a; <tr><td align"center" class"xwnr_j"><asp: TextBox ID "txtContent" TextMode "MultiLine" Height "274px" Width "95%" runat"serv…

java程序 构建mycircle类_Java语言程序设计(十九)对象和类的应用实例

1.我们定义一个Circle类并使用该类创建对象&#xff0c;我们创建三个圆对象&#xff0c;1.0&#xff0c;25和125&#xff0c;然后显示这三个圆的半径和面积&#xff0c;将第二个对象的半径改为100&#xff0c;然后显示它的新半径和面积。程序清单如下&#xff1a;package testc…

Django抛错不存在(DoesNotExist)

from django.core.exceptions import ObjectDoesNotExist try:disabledusers.objects.get(sAMAccountNameliu) except ObjectDoesNotExist:print a except modelname.DoesNotExist:转载于:https://www.cnblogs.com/dreamer-fish/p/5835465.html

mysql ddl dql_mysql DDL、DML、DCL、DQL区分

mysql [Structure Query Language] 的组成分4个部分&#xff1a;DDL [Data Mefinition Language] 数据定义语言DML [Data Manipulation Language]  数据操纵语言DCL [Data Control Language] 数据控制语言DQL [Data Query Language ] 数据查询语言1、…

hiho图的联通性(自留)

无向图割边割点算法 而当(u,v)为树边且low[v]>dfn[u]时&#xff0c;表示v节点只能通过该边(u,v)与u连通&#xff0c;那么(u,v)即为割边。 1 void dfs(int u) {2 //记录dfs遍历次序3 static int counter 0; 4 5 //记录节点u的子树数6 int children …

《Git权威指南》笔记2

2019独角兽企业重金招聘Python工程师标准>>> ###Git克隆 Git使用git clone命令实现版本库克隆&#xff0c;主要有如下3种用法&#xff1a; 1&#xff09;git clone <repository> <direcctory> 将repository指向的版本库创建一个克隆岛directory目录。目…

SQL数据库挂起 SQL数据库附加报错 SQL数据库824错误修复

SQL数据库挂起 SQL数据库附加报错 SQL数据库824错误修复 数据类型 MSSQL 2012数据大小 4.5 GB故障检测 附加数据库提示824错误 一般是由于断电非法关机导致页面损坏。客户要求 恢复数据库数据 ERP可直接使用。修复结果 文件传来后 检测发现页面没有及时正常关闭导致SQL认为页不…

查找算法

a. 线性查找&#xff1a;从数据中&#xff0c;第一个元素开始查找&#xff0c;将其与查找的值进行比对&#xff0c;如果相同&#xff0c;就停止查找&#xff0c;如果不相同&#xff0c;则继续下一个元素的比对。直到查找到匹配的值&#xff0c;或者是有数据遍历完毕&#xff0c…

mysql测试数据图表_mysql测试数据表

1.截取至后盾人用于mysql数据测试请在navicat中执行一下命令生成测试数据表/*Navicat Premium Data TransferSource Server : 我的本地连接Source Server Type : MySQLSource Server Version : 50726Source Host : localhost:3306Source Schema : laravelTarget Server Type : …

常用归档压缩命令

1. 打包tar打包表示把一堆文件变成一个tar ####打包工具-f ####指定生成包的名字-c ####创建包-v ####显示创建过程-t ####查看包中内容-x ####解包-r ####添加文件到包中--delete filename ##删除包中指定文件--get filename ##取出包中指定文件cffrcvf 等组合使用2. 压缩…

spring集合的注入

<bean id"date" class"java.util.Date"></bean> <bean id"test" class"test.Test"> <!--注入list-->   <property name"list">     <list>       <value>1</valu…