java上传kafka的方法_哪种方法是将所有数据从Kafka主题复制到接收器(文件或Hive表)的最佳方法?...

我正在使用Kafka Consumer API将所有数据从Kafka主题复制到Hive表 . 为此,我使用HDFS作为中间步骤 . 我使用唯一的组ID并将偏移重置为“最早”,以便从头开始获取所有数据,并在执行后忽略提交 . 然后我遍历Kafka主题中的记录,并将每条记录保存到HDFS中的临时文件中 . 然后我使用Spark从HDFS读取数据,然后使用日期作为文件名将其保存到Parquet文件中 . 然后,我在Hive表中创建一个带日期的分区,最后在Parquet中将文件作为分区加载到Hive中 .

正如您在下面的代码中看到的,我使用了几个中间步骤,这使得我的代码远非最佳 . 这是从Kafka主题复制所有数据的最佳推荐方法吗?我做了一些研究,到目前为止,这是我设法开始工作的变通方法,但是,随着记录数量每天增加,我的执行时间达到了可容忍的极限(从2分钟变为6分钟到6分钟)周) .

代码在这里:

def start( lowerDate: String, upperDate: String )={

// Configurations for kafka consumer

val conf = ConfigFactory.parseResources("properties.conf")

val brokersip = conf.getString("enrichment.brokers.value")

val topics_in = conf.getString("enrichment.topics_in.value")

// Crea la sesion de Spark

val spark = SparkSession

.builder()

.master("yarn")

.appName("ParaTiUserXY")

.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

import spark.implicits._

val properties = new Properties

properties.put("key.deserializer", classOf[StringDeserializer])

properties.put("value.deserializer", classOf[StringDeserializer])

properties.put("bootstrap.servers", brokersip)

properties.put("auto.offset.reset", "earliest")

properties.put("group.id", "ParaTiUserXYZZ12345")

//Schema para transformar los valores del topico de Kafka a JSON

val my_schema = new StructType()

.add("longitudCliente", StringType)

.add("latitudCliente", StringType)

.add("dni", StringType)

.add("alias", StringType)

.add("segmentoCliente", StringType)

.add("timestampCliente", StringType)

.add("dateCliente", StringType)

.add("timeCliente", StringType)

.add("tokenCliente", StringType)

.add("telefonoCliente", StringType)

val consumer = new KafkaConsumer[String, String](properties)

consumer.subscribe( util.Collections.singletonList("parati_rt_geoevents") )

val fs = {

val conf = new Configuration()

FileSystem.get(conf)

}

val temp_path:Path = new Path("hdfs:///tmp/s70956/tmpstgtopics")

if( fs.exists(temp_path)){

fs.delete(temp_path, true)

}

while(true)

{

val records=consumer.poll(100)

for (record

val data = record.value.toString

//println(data)

val dataos: FSDataOutputStream = fs.create(temp_path)

val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))

bw.append(data)

bw.close

val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/s70956/tmpstgtopics")

val fechaCliente = data_schema.select("dateCliente").first.getString(0)

if( fechaCliente < upperDate && fechaCliente >= lowerDate){

data_schema.select("longitudCliente", "latitudCliente","dni", "alias",

"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",

"tokenCliente", "telefonoCliente")

.coalesce(1).write.mode(SaveMode.Append).parquet("/desa/landing/parati/xyuser/" + fechaCliente)

}

else if( fechaCliente < lowerDate){

//

}

else if( fechaCliente >= upperDate){

break;

}

}

}

consumer.close()

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/542596.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openstack nova-network 的小bug的排错经历

环境是 nova-network vmwareflatdhcp错误表现为 开出来的虚拟机有一定几率获取不到dhcp地址&#xff0c;手工赋予ip则正常&#xff0c;用flat模式注入的ip正常&#xff0c;下面是排错过程1首先找网络防火墙已经把 dnsmasq对应的端口已经打开抓包结果&#xff1a;可以看到虚拟机…

anaconda base环境_anaconda中安装packages:pip还是conda install?

conda install我就不说了&#xff0c;这都不会别学了就。Using command:$ which -a pip, the terminal will return:This indicates two different pip path to install packages[1].在tf23环境中pip install在base环境中pip install在windows下powershell内&#xff0c;进入到…

Java ClassLoader setDefaultAssertionStatus()方法与示例

ClassLoader类setDefaultAssertionStatus()方法 (ClassLoader Class setDefaultAssertionStatus() method) setDefaultAssertionStatus() method is available in java.lang package. setDefaultAssertionStatus()方法在java.lang包中可用。 setDefaultAssertionStatus() metho…

【风马一族_xml】xmlp之dtd1

什么是XML约束&#xff1f;在xml技术里&#xff0c;可以编写一个文档来约束一个xml文档的写法&#xff0c;这称之为xml约束 2. 为什么要使用xml约束&#xff1f; 参看提示栏 3. xml约束的作用&#xff1f; 约束xml的写法对xml进行校验4. 常见的xml约束技术 xml dtdxml Schema…

java ssm框架 缓存_SSM框架之MyBatis3专题4:查询缓存

查询缓存的使用&#xff0c;主要是为了提高查询访问速度。将用户对同一数据的重复查询过程简化&#xff0c;不再每次均从数据库中查询获取结果数据&#xff0c;从而提高访问速度。MyBatis的查询缓存机制&#xff0c;根据缓存区的作用域(声明周期)可划分为两种&#xff1a;一级查…

matplotlib画图_漂亮,超详细的matplotlib画图基础

来自 | 逐梦erhttps://zhumenger.blog.csdn.net/article/details/106530281本文仅作技术交流&#xff0c;如有侵权&#xff0c;请联系后台删除。数据可视化非常重要&#xff0c;因为错误或不充分的数据表示方法可能会毁掉原本很出色的数据分析工作。matplotlib 库是专门用于开发…

c# 2维数组 取一维_C#| 不同类型的一维数组声明

c# 2维数组 取一维In the below example, we are declaring an integer array (one dimensional) with following styles: 在下面的示例中&#xff0c;我们声明具有以下样式的整数数组(一维) &#xff1a; 1) One dimensional Array declaration with initialization (without…

Java编程经典10道_Java经典编程题50道之十二

企业发放的奖金根据利润提成&#xff1a;利润(I)低于或等于10万元时&#xff0c;奖金可提10%&#xff1b;利润高于10万元&#xff0c;低于20万元时&#xff0c;低于10万元的部分按10%提成&#xff0c; 高于10万元的部分 &#xff0c;可提成7.5%&#xff1b;20万到40万之间时&am…

RHEL7 单独安装图形 X11

RHEL7 默认是最小化安装&#xff08;Minimal Install&#xff09;&#xff0c;没有图形界面&#xff0c; 我们应该选择Server with GUI。若已错过此步骤&#xff0c;我们采用以下方式补充安装GUI界面。 先配置yum源可以参考我的这篇文章http://blog.itpub.net/27771627/viewspa…

android recycleview长按多选_UI设计中Android和IOS设计差异总结

由于设计师、产品经理使用的移动设备大部分是iPhone&#xff0c;所以在做设计时&#xff0c;容易忽略Android和iOS的差异&#xff0c;按照iOS的规范进行设计&#xff0c;两端只做一套。只做一套的会存在两个问题&#xff1a;1、安卓用户的使用习惯不太适应iOS的设计&#xff0c…

Kotlin程序用于打印JVM版本的Kotlin(打印Java属性)

Here, we will create a Kotlin program to print Kotlin, JVM version (printing Java properties). As Kotlin can be seen as an upgrade of Java, so we will get all versions of java (JVM) using Kotlin also. 在这里&#xff0c;我们将创建一个Kotlin程序以打印JVM版本…

自定义动画属性java_创建酷炫动画效果的10个JavaScript库

原标题&#xff1a;创建酷炫动画效果的10个JavaScript库1) Dynamics.jsDynamics.js是设计基于物理规律的动画的重要Java库。它可以赋予生命给所有包含CSS 和SVG属性的DOM(文本对象模型)元素&#xff0c;换句话说&#xff0c;Dynamics.js适用于所有Java对象以及一系列其它的元素…

php xlsx里插入图片_常见的 PHP 面试题和答案分享

如何直接将输出显示给浏览器&#xff1f;将输出直接显示给浏览器&#xff0c;我们必须使用特殊标记 <&#xff1f;and&#xff1f;>。PHP 是否支持多重继承&#xff1f;PHP 只支持单继承。PHP 的类使用关键字 extends 继承另一个类获取图片属性&#xff08;size, width, …

java调用构造函数中某一个值_Java如何在枚举的构造函数中调用另一个枚举值

Java中的枚举(enum)是一种存储一组常量值的数据类型。您可以使用枚举来存储固定值&#xff0c;例如一周中的天&#xff0c;一年中的月等。您可以使用关键字 enum定义枚举&#xff0c;后跟枚举的名称为-enum Days {SUNDAY, MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATUR…

python 示例_Python日历类| yeardatescalendar()方法与示例

python 示例Python Calendar.yeardatescalendar()方法 (Python Calendar.yeardatescalendar() Method) Calendar.yeardatescalendar() method is an inbuilt method of the Calendar class of calendar module in Python. It uses an instance of this class and returns a lis…

js:插入节点appendChild insertBefore使用方法

首先 从定义来理解 这两个方法&#xff1a; appendChild() 方法&#xff1a;可向节点的子节点列表的末尾添加新的子节点。语法&#xff1a;appendChild(newchild) insertBefore() 方法&#xff1a;可在已有的子节点前插入一个新的子节点。语法 &#xff1a;insertBefore(newchi…

pandas concat_pandas-数据合并-concat(最全参数解释,含代码和实例)

pandas中的concat的功能&#xff1a;假设你现在需要将多个数据合并&#xff0c;前提是&#xff1a;这几个文件列名都一致&#xff0c;也就是说这几个文件格式完全一样&#xff0c;只是数据不太一样&#xff0c;类似于合并多个文件这种&#xff0c;实际数据分析中也会遇到这种情…

java中的de是什么_【转】java中main函数解析

源地址&#xff1a;http://www.cnblogs.com/xwdreamer/archive/2012/04/09/2438845.html从写java至今&#xff0c;写的最多的可能就是主函数public static void main(String[] args) {}但是以前一直都没有问自己&#xff0c;为什么要这么写&#xff0c;因为在c语言中就没有这样…

JAVA多线程(一)线程安全问题产生的原因

JAVA线程内存与主存间映射示意图Java内存模型中规定了所有的变量都存储在主内存中&#xff0c;每条线程还有自己的工作内存&#xff0c;线程的工作内存中保存了该线程使用的变量到主内存副本拷贝&#xff0c;线程对变量的所有操作&#xff08;读取、赋值&#xff09;都必须在工…