kafka集成spark

1.新建Scala项目

具体教程可见在idea中创建Scala项目教程-CSDN博客

1.1右键项目名-添加框架支持-勾选scala

1.2main目录下新建scala目录-右键Scala目录-将目录标记为-勾选源代码根目录

1.3创建包com.ljr.spark

1.4引入依赖(pox.xml)

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version></dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency></dependencies>

1.5把spark conf/目录下的log4j.properties 复制到项目的resources目录

2.集成spark生产者

新建SparkKafkaProducer (注意选择的是object而不是class)

package com.ljr.spark
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializerimport java.util.Propertiesobject SparkKafkaProducer {def main(args: Array[String]): Unit = {//1 属性配置val pros = new Properties()pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092")pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])//2 创建生产者val producer = new KafkaProducer[String, String](pros)//3 发送数据for (i <- 1 to 5) {producer.send(new ProducerRecord[String,String]("customers","Lili" + i))}//4 关闭资源producer.close()}
}

运行,开启Kafka 消费者消费数据

kafka-console-consumer.sh --bootstrap-server node1:9092 --topic customers

能接收到信息,可见spark作为生产者集成Kafka成功

3.集成spark消费者

package com.ljr.sparkimport org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkKafkaConsumer {def main(args: Array[String]): Unit = {//1 初始化上下文环境val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")val sc = new StreamingContext(conf, Seconds(3))//2 消费数据val kafkapara = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"node1:9092,node2:9092",ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],ConsumerConfig.GROUP_ID_CONFIG->"KFK-SP")val kafkaDstream = KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("customers"), kafkapara))val valueDstream = kafkaDstream.map(record => record.value())valueDstream.print()//3 执行代码并阻塞sc.start()sc.awaitTermination()}
}

运行,

开启Kafka 生产者生产数据

kafka-console-producer.sh.sh --bootstrap-server node1:9092 --topic customers

控制台可以消费到数据,可见spark作为消费者集成Kafka成功。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/25990.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[FreeRTOS 基础知识] 保存现场与恢复现场

文章目录 什么是现场&#xff1f;保存现场的数据存放在哪里&#xff1f;保护现场的场景 什么是现场&#xff1f; 在[FreeRTOS 基础知识] 栈 与 汇编语言文章中解析了fun_c汇编函数&#xff0c;假设在执行fun_c函数的过程中产生高优先级的中断。如下图所示。 此时刚从RAM的SP栈…

秋招突击——6/10——复习{(树形DP)树的最长路径、}——新作{电话号码的字母组合}

文章目录 引言复习树形DP——树的最长路径思路分析参考思路求图的最长的直径的通用方法证明 树形DP分析方法问题 参考代码使用一维数组模拟邻接表存储树形结构或者稀疏图 新作电话号码的组合思路分析参考实现 总结 引言 中间面试了两天&#xff0c;去上海呆了一天&#xff0c;…

Linux 安装ab测试工具

yum -y install httpd-tools ab -help #10个并发连接&#xff0c;100个请求 ab -n 200 -c 100 http://www.baidu.com/

基于51单片机的车辆动态称重系统设计

一 动态称重 所谓动态称重是指通过分析和测量车胎运动中的力,来计算该运动车辆的总重量、轴重、轮重和部分重量数据的过程。动态称重系统按经过车辆行驶的速度划分,可分为低速动态称重系统与高速动态称重系统。因为我国高速公路的限速最高是120,所以高速动态称重系统在理论…

奇数求和【菜蛋题解】

计算非负整数m到n(包括m和n&#xff09;之间的所有奇数的和&#xff0c;其中&#xff0c;m不大于n,且n不大于300.例如m3&#xff0c;n12,其和则为&#xff1a;35791135 输入&#xff1a;两个数m和n(0<m<n<300),两个数以一个空格分开 输出&#xff1a;一行&#xff0…

【Rd-03E】使用CH340给Rd03_E雷达模块烧录固件

Rd03_E 指导手册 安信可新品雷达模组Rd-03搭配STM32制作简易人体感应雷达灯教程 http://t.csdnimg.cn/mqhkE 测距指导手册网址&#xff1a; https://docs.ai-thinker.com/_media/rd-03e%E7%B2%BE%E5%87%86%E6%B5%8B%E8%B7%9D%E7%94%A8%E6%88%B7%E6%89%8B%E5%86%8C%E4%B8%AD%…

万能表单与AI的完美融合,打造个性化AI小程序

在人工智能技术日益成熟的今天&#xff0c;如何将AI智能与用户界面无缝结合&#xff0c;已成为软件开发领域的新挑战。MyCms 以其创新的“万能表单结合AI”功能&#xff0c;为开发者提供了一个全新的解决方案&#xff0c;让个性化AI小程序的开发变得前所未有的简单和高效。 一、…

【解读】小提琴图

ref&#xff1a;解读文献中的箱线图&#xff08;Box-plot&#xff09;和小提琴图&#xff08;Violin-plot)&#xff09;_小提琴图和箱线图的区别-CSDN博客小提琴图展示了每个变量的数据分布情况&#xff0c;通过图中的“小提琴”形状可以看出数据的密度和分布情况。 在图中&…

2024-6-10-Model-Agnostic Meta-Learning (MAML)

摘自&#xff1a;Meta-Transfer Learning for Zero-Shot Super-Resolution 近年来&#xff0c;提出了各种元学习算法。它们可以分为三类&#xff1a; 基于度量的方法&#xff1a;这些方法通过学习度量空间&#xff0c;使得在少量样本内进行高效的学习。例如[35, 38, 39]。基于…

JS实现移动端的轮播图滑动事件

在移动端实现轮播图滑动事件&#xff0c;我们通常使用 touchstart、touchmove 和 touchend 这三个事件。下面是一个基本的示例&#xff0c;展示了如何使用原生JavaScript来创建一个简单的移动端轮播图滑动效果&#xff1a; HTML结构&#xff1a; <div id"carousel&qu…

微信小程序录音机

微信小程序是一种非常流行的移动应用&#xff0c;它具有强大的功能和便捷的使用体验。其中&#xff0c;录音功能是其重要的一部分&#xff0c;用户可以通过微信小程序进行录音&#xff0c;这为用户提供了一个便捷的交流和沟通方式。 一、录音功能的应用场景 录音功能可以应用…

ElasticSearch聚合方式

聚合方式 ES支持灵活的聚合方式,它不仅支持聚合和查询相结合,而且还可以使聚合的过滤条件不影响搜索条件,并且还支持在聚合后的结果中进行过滤筛选。 1.1 直接聚合 直接聚合指的是聚合时的DSL中国没有query子句,是直接对索引内的所有文档进行聚合。 比如下面的DSL: 1.2 先…

入门级的卷积神经网络训练识别手写数字-小白轻松上手-含数据集+pyqt界面

代码下载地址&#xff1a; https://download.csdn.net/download/qq_34904125/89374845 本代码是基于python pytorch环境安装的。 下载本代码后&#xff0c;有个requirement.txt文本&#xff0c;里面介绍了如何安装环境&#xff0c;环境需要自行配置。 或可直接参考下面博文…

苹果WWDC大会AI亮点:大揭晓

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Java网络通信实现

UDP UDPServer import java.io.IOException; import java.net.DatagramPacket; import java.net.DatagramSocket;public class UDPServer {public static void main(String[] args) throws IOException {System.out.println("UdpServer启动");// 创建upd套接字Data…

两台电脑通过网线直连共享数据(超详细)- 我的实践记录

原文链接 按照原文的操作&#xff0c;成功通过直连网线连接了两台windows电脑并共享传输数据。 ping不通可能是防火墙没关闭导致的&#xff0c;但是完全关闭防火墙又不安全。 那么有没有不关闭防火墙&#xff0c;能够上网&#xff0c;又能直连另一台电脑呢&#xff1f; 我们…

拓扑排序-java

主要通过宽度优先搜索&#xff08;BFS&#xff09;来实现有向无环图的拓扑序列&#xff0c;邻接表存储图。数组模拟单链表、队列&#xff0c;实现BFS基本操作。 文章目录 前言 一、有向图的拓扑序列 二、算法思路 1.拓扑序列 2.算法思路 三、使用步骤 1.代码如下&#xff08;示…

大模型实战-【Langchain4J中Using AI Services in Spring Boot Application②】

Using AI Services in Spring Boot Application LangChain4j Spring Boot starter greatly simplifies using AI Services in Spring Boot applications. @SystemMessage Now, let’s look at a more complicated example. We’ll force the LLM reply using slang 😉 Th…

QT 使用资源文件的注意点

不要存放没有使用的资源文件 即使在代码中没有使用到的资源文件&#xff0c;也会编译到执行文件或者DLL里面去这样会增大它的体积。如下 在代码没有使用这个资源文件(10.4M的2k图片)&#xff0c;但是编译出来的程序有 12M左右的大小 1 假设我们有一个比较复杂的项目&#…

Web前端学习之路:深入探索学习时长与技能进阶的奥秘

Web前端学习之路&#xff1a;深入探索学习时长与技能进阶的奥秘 在数字化时代&#xff0c;Web前端技术成为了连接用户与互联网世界的桥梁。对于初学者来说&#xff0c;学习Web前端究竟需要多久&#xff0c;以及如何高效掌握相关技能&#xff0c;一直是困扰他们的难题。本文将从…