Spark-streaming核心编程

1.导入依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>

<version>3.0.0</version>

</dependency>

2.编写代码

创建SparkConfStreamingContext

定义Kafka相关参数,如bootstrap serversgroup idkeyvaluedeserializer

使用KafkaUtils.createDirectStream方法创建DStream,该方法接受StreamingContext、位置策略、消费者策略等参数。

提取数据中的value部分,并进行word count计算。

启动StreamingContext并等待其终止。

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}

import org.apache.spark.SparkConf

import org.apache.spark.streaming.{Seconds, StreamingContext}

import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object DirectAPI {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf,Seconds(3))

    //定义kafka相关参数

    val kafkaPara :Map[String,Object] = Map[String,Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG

      ->"node01:9092,node02:9092,node03:9092",

      ConsumerConfig.GROUP_ID_CONFIG->"kafka",

      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",

      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"

    )

    //通过读取kafka数据,创建DStream

    val kafkaDStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream[String,String](

      ssc,LocationStrategies.PreferConsistent,

      ConsumerStrategies.Subscribe[String,String](Set("kafka"),kafkaPara)

    )

    //提取出数据中的value部分

    val valueDStream :DStream[String] = kafkaDStream.map(record=>record.value())

    //wordCount计算逻辑

    valueDStream.flatMap(_.split(" "))

      .map((_,1))

      .reduceByKey(_+_)

      .print()

    ssc.start()

    ssc.awaitTermination()

  }

  }

3.运行程序

开启Kafka集群。

4.使用Kafka生产者产生数据。

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

​5运行Spark Streaming程序,接收Kafka生产的数据并进行处理。

6.查看消费进度

使用Kafka提供的kafka-consumer-groups.sh脚本查看消费组的消费进度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/76975.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Kafka的ISR机制是什么?如何保证数据一致性?

一、Kafka ISR机制深度解析 1. ISR机制定义 ISR&#xff08;In-Sync Replicas&#xff09;是Kafka保证数据一致性的核心机制&#xff0c;由Leader副本&#xff08;复杂读写&#xff09;和Follower副本(负责备份)组成。当Follower副本的延迟超过replica.lag.time.max.ms&#…

Docker 基本概念与安装指南

Docker 基本概念与安装指南 一、Docker 核心概念 1. 容器&#xff08;Container&#xff09; 容器是 Docker 的核心运行单元&#xff0c;本质是一个轻量级的沙盒环境。它基于镜像创建&#xff0c;包含应用程序及其运行所需的依赖&#xff08;如代码、库、环境变量等&#xf…

数据库监控 | MongoDB监控全解析

PART 01 MongoDB&#xff1a;灵活、可扩展的文档数据库 MongoDB作为一款开源的NoSQL数据库&#xff0c;凭借其灵活的数据模型&#xff08;基于BSON的文档存储&#xff09;、水平扩展能力&#xff08;分片集群&#xff09;和高可用性&#xff08;副本集架构&#xff09;&#x…

OpenFeign和Gateway

OpenFeign和Gateway 一.OpenFeign介绍二.快速上手1.引入依赖2.开启openfeign的功能3.编写客户端4.修改远程调用代码5.测试 三.OpenFeign参数传递1.传递单个参数2.多个参数、传递对象和传递JSON字符串3.最佳方式写代码继承的方式抽取的方式 四.部署OpenFeign五.统一服务入口-Gat…

spark-streaming(二)

DStream创建&#xff08;kafka数据源&#xff09; 1.在idea中的 pom.xml 中添加依赖 <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version> </…

JAVA聚焦OutOfMemoryError 异常

个人主页 文章专栏 在正文开始前&#xff0c;我想多说几句&#xff0c;也就是吐苦水吧…最近这段时间一直想写点东西&#xff0c;停下来反思思考一下。 心中万言&#xff0c;真正执笔时又不知先写些什么。通常这个时候&#xff0c;我都会随便写写&#xff0c;文风极像散文&…

如何在Spring Boot中配置自定义端口运行应用程序

Spring Boot 应用程序默认在端口 8080 上运行嵌入式 Web 服务器&#xff08;如 Tomcat、Jetty 或 Undertow&#xff09;。然而&#xff0c;在开发、测试或生产环境中&#xff0c;开发者可能需要将应用程序配置为在自定义端口上运行&#xff0c;例如避免端口冲突、适配微服务架构…

linux嵌入式(进程与线程1)

Linux进程 进程介绍 1. 进程的基本概念 定义&#xff1a;进程是程序的一次执行过程&#xff0c;拥有独立的地址空间、资源&#xff08;如内存、文件描述符&#xff09;和唯一的进程 ID&#xff08;PID&#xff09;。 组成&#xff1a; 代码段&#xff1a;程序的指令。 数据…

智驭未来:NVIDIA自动驾驶安全白皮书与实验室创新实践深度解析

一、引言&#xff1a;自动驾驶安全的范式革新 在当今数字化浪潮的推动下&#xff0c;全球自动驾驶技术正大步迈入商业化的深水区。随着越来越多的自动驾驶车辆走上道路&#xff0c;其安全性已成为整个行业乃至社会关注的核心命题。在这个关键的转折点上&#xff0c;NVIDIA 凭借…

多模态大模型 Qwen2.5-VL 的学习之旅

Qwen-VL 是阿里云研发的大规模视觉语言模型&#xff08;Large Vision Language Model, LVLM&#xff09;。Qwen-VL 可以以图像、文本、检测框作为输入&#xff0c;并以文本和检测框作为输出。Qwen-VL 系列模型性能强大&#xff0c;具备多语言对话、多图交错对话等能力&#xff…

Redis 与 Memcache 全面对比:功能、性能与应用场景解析

Redis 和 Memcache 都是常用的内存数据库&#xff0c;以下是它们在多个方面的能力比较&#xff1a; 一、数据类型 Redis&#xff1a;支持丰富的数据类型&#xff0c;如字符串&#xff08;String&#xff09;、哈希&#xff08;Hash&#xff09;、列表&#xff08;List&#x…

Oracle--PL/SQL编程

前言&#xff1a;本博客仅作记录学习使用&#xff0c;部分图片出自网络&#xff0c;如有侵犯您的权益&#xff0c;请联系删除 PL/SQL&#xff08;Procedural Language/SQL&#xff09;是Oracle数据库中的一种过程化编程语言&#xff0c;构建于SQL之上&#xff0c;允许编写包含S…

新增优惠券

文章目录 概要整体架构流程技术细节小结 概要 接口分析 一个基本的新增接口&#xff0c;按照Restful风格设计即可&#xff0c;关键是请求参数。之前表分析时已经详细介绍过这个页面及其中的字段&#xff0c;这里不再赘述。 需要特别注意的是&#xff0c;如果优惠券限定了使…

力扣面试经典150题(第二十三题)- KMP算法

问题 给你两个字符串 haystack 和 needle &#xff0c;请你在 haystack 字符串中找出 needle 字符串的第一个匹配项的下标&#xff08;下标从 0 开始&#xff09;。如果 needle 不是 haystack 的一部分&#xff0c;则返回 -1 。 示例 1&#xff1a; 输入&#xff1a;haysta…

PostgreSQL 的 MVCC 机制了解

PostgreSQL 的 MVCC 机制了解 PostgreSQL 使用多版本并发控制(MVCC)作为其核心并发控制机制&#xff0c;这是它与许多其他数据库系统的关键区别之一。MVCC 允许读操作不阻塞写操作&#xff0c;写操作也不阻塞读操作&#xff0c;从而提供高度并发性。 一 MVCC 基本原理 1.1 M…

互联网大厂Java面试:RocketMQ、RabbitMQ与Kafka的深度解析

互联网大厂Java面试&#xff1a;RocketMQ、RabbitMQ与Kafka的深度解析 面试场景 面试官&#xff1a;马架构&#xff0c;您好&#xff01;欢迎参加我们的面试。今天我们将围绕消息中间件展开讨论&#xff0c;尤其是RocketMQ、RabbitMQ和Kafka。您有十年的Java研发和架构设计经…

《巧用DeepSeek快速搞定数据分析》书籍分享

文章目录 前言内容简介作者简介购书链接书籍目录 前言 随着大数据时代的到来&#xff0c;数据分析和人工智能技术正迅速改变着各行各业的运作方式。DeepSeek作为先进的人工智能模型&#xff0c;不仅在自然语言处理领域具有广泛应用&#xff0c;还在数据分析、图像识别、推荐系…

4.Three.js 中 Camera 摄像机详解

一、什么是 Camera&#xff1f; 在 Three.js 中&#xff0c;Camera&#xff08;摄像机&#xff09;决定了我们如何观察三维场景。 你可以把它理解为我们“眼睛”的位置和方向&#xff0c;场景中的物体再复杂&#xff0c;如果没有摄像机&#xff0c;就没有“观察角度”&#x…

gem5-gpu教程03 当前的gem5-gpu软件架构(因为涉及太多专业名词所以用英语表达)

Current gem5-gpu Software Architecture 这是当前gem5-gpu软件架构的示意图。 Ruby是在gem5-gpu上下文中用于处理CPU和GPU之间内存访问的高度可配置的内存系统 CudaCore (src/gpu/gpgpu-sim/cuda_core.*, src/gpu/gpgpu-sim/CudaCore.py) Wrapper for GPGPU-Sim shader_cor…

负载均衡的实现方式有哪些?

负载均衡实现方式常见的有: 软件负载均衡、硬件负载均衡、DNS负载均衡 扩展 二层负载均衡&#xff1a;在数据链路层&#xff0c;基于MAC地址进行流量分发&#xff0c;较少见于实际应用中 三层负载均衡&#xff1a;在网络层&#xff0c;基于IP地址来分配流量&#xff0c;例如某…