Kafka命令行的使用/Spark-Streaming核心编程(二)

Kafka命令行的使用

创建topic

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic test1 --partitions 3 --replication-factor 3

分区数量,副本数量,都是必须的。

数据的形式:

主题名称-分区编号。

在Kafka的数据目录下查看。

设定副本数量,不能大于broker的数量。

2.2查看所有的topic

kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181

2.3查看某个topic的详细信息

kafka-topics.sh --describe --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

ISR: In-Sync Replicas   可以提供服务的副本。

AR = ISR + OSR

2.4删除topic

kafka-topics.sh --delete --zookeeper node01:2181,node02:2181,node03:2181 --topic test1

2.5生产数据

kafka-console-producer.sh:

指定broker

指定topic

写数据的命令:

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test1

Spark-Streaming核心编程(二)

  1. 需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
  2. 导入依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.0.0</version>
</dependency>

  1. 编写代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object DirectAPI {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("direct")

    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // 定义 Kafka 相关参数
    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:9092,node02:9092,node03:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )

    // 通过读取 Kafka 数据,创建 DStream
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara)
    )

    // 提取出数据中的 value 部分
    val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())

    // wordCount 计算逻辑
    valueDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  1. 开启Kafka集群

  1. 开启Kafka生产者,产生数据

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafka

  1. 运行程序,接收Kafka生产的数据并进行相应处理

8)查看消费进度

kafka-consumer-groups.sh --describe --bootstrap-server node01:9092,node02:9092,node03:9092 --group kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/78640.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python3:Jupyterlab 安装和配置

Python3:Jupyterlab 安装和配置 Jupyter源于Ipython Notebook项目&#xff0c;是使用Python&#xff08;也有R、Julia、Node等其他语言的内核&#xff09;进行代码演示、数据分析、机器学习、可视化、教学的非常好的工具。 最新的基于web的交互式开发环境&#xff0c;适用于n…

快速排序及其在Unity游戏开发中的应用

一、快速排序(Quick Sort) 快速排序是一种**分治法(Divide and Conquer)**思想的排序算法,它的基本步骤是: 选一个基准元素(pivot):通常选第一个元素、最后一个元素,或者随机一个。分区(Partition):把数组分成两部分,小于等于 pivot 的放左边,大于 pivot 的放右…

【硬核干货】SonarQube安全功能

原文链接&#xff1a;【硬核干货】SonarQube安全功能 关于晓数神州 晓数神州坚持以“客户为中心”的宗旨&#xff0c;为客户提供专业的解决方案和技术服务&#xff0c;构建多引擎数字化体系。 核心业务1&#xff1a;聚焦DevOps全栈产品&#xff0c;打造需求管理、项目管理、开…

修改el-select背景颜色

修改el-select背景颜色 /* 修改el-select样式--直接覆盖默认样式&#xff08;推荐&#xff09; */ ::v-deep .el-select .el-input__inner {background-color: #1d2b72 !important; /* 修改输入框背景色 */color: #fff; } ::v-deep .el-select .el-input__wrapper {background-…

Unity-粒子系统:萤火虫粒子特效效果及参数

萤火虫特效由两部分组成。萤火虫粒子底色粒子面片。萤火虫的旋转飞动主要由 Noise参数和Color over Lifetime模块控制。 贴图&#xff1a;中间实周边虚的圆&#xff0c;可随意自行制作 Shader&#xff1a;Universal Render Pipeline/2D/Sprite-Lit-Default 以下是粒子详细参…

K8S Service 原理、图例——深度好文

一、理论介绍 1.1、3W 法则 1、是什么&#xff1f; Service 是一种为一组功能相同的 pod 提供单一不变的接入点的资源。当 Service 存在时&#xff0c;它的IP地址和端口不会改变。客户端通过IP地址和端口号与 Service 建立连接&#xff0c;这些连接会被路由到提供该 Service 的…

Alibaba Cloud Linux 3.2104 LTS 64位 容器优化版安装docker docker compose记录

整个安装过程耗时4小时。&#xff08;包含以下检查内容:&#xff09; 检查该linux版本信息&#xff08;并通过监控指标检查运行状态/cpu占用/内存占用/磁盘读取写入IOPS /同时连接数&#xff09; 1&#xff1a;根据当前的系统进行yum与dnf的升级&#xff0c;保持稳定修复的版本…

STM32N6570-DK ISP调试

STM32N6570-DK之ISP调试应用 准备工作-下载安装软件包:一、使用STM32CubeProgrammer给板子烧入STM32N6_ISP_IQTune_App_revC01-v1.1.0-trusted.bin。二、打开STM32 ISP IQTune.exe ,出现可连接端口:三、根据教程进行相应调试:准备工作-下载安装软件包: https://www.st.co…

12.thinkphp验证

一&#xff0e;验证器定义 1. 验证器的使用&#xff0c;我们必须先定义它&#xff0c;系统提供了一条命令直接生成想要的类&#xff1b; php think make:validate User 2. 这条命令会自动在应用目录下生成一个validate文件夹&#xff0c;并生成User.php类&#xff1b; class…

OpenWrt 与 Docker:打造轻量级容器化应用平台技术分享

文章目录 前言一、OpenWrt 与 Docker 的集成前提1.1 硬件与内核要求1.2 软件依赖 二、Docker 环境部署与验证2.1 基础服务配置2.2 存储驱动适配 三、容器化应用部署实践3.1 资源限制策略3.2 Docker Compose 适配 四、性能优化与监控4.1 容器资源监控4.2 镜像精简策略 五、典型问…

EasyRTC音视频实时通话嵌入式SDK,打造社交娱乐低延迟实时互动的新体验

一、方案背景 在数字化时代&#xff0c;社交娱乐已经成为人们生活中不可或缺的一部分。随着移动互联网和智能设备的普及&#xff0c;用户对实时互动的需求越来越高。EasyRTC作为一款基于WebRTC技术的实时音视频通信解决方案&#xff0c;凭借其低延迟、高稳定性和跨平台兼容性&…

软件编程命名规范

编程命名规范是保证代码可读性、可维护性和团队协作效率的重要基础。以下是涵盖主流编程语言的通用命名规范&#xff0c;结合行业最佳实践和常见规范&#xff08;如Google、Microsoft、Airbnb等风格指南&#xff09;&#xff1a; 一、通用命名原则 清晰优先&#xff1a;名称应…

换张电话卡能改变IP属地吗?一文解读

在互联网时代&#xff0c;IP属地&#xff08;即网络定位信息&#xff09;的显示引发了许多用户的关注。有人好奇&#xff1a;更换电话卡&#xff08;SIM卡&#xff09;是否能改变自己的IP属地&#xff1f;本文将解析IP属地的定义、电话卡的作用&#xff0c;并深入探讨两者之间的…

前端:纯HTML、CSS和JS菜单样式

实现了一个多级折叠菜单系统,使用纯HTML、CSS和JavaScript(无任何框架) 一、二级菜单展开 1、实现效果 初始状态-展示全部一级菜单 选中共状态,一级标题选中共为蓝色背景色,二级标题选中共为蓝色文字,展开右侧图标为-,后缩状态右侧图标为+ 2、实现 ​​HTML结构​​ …

Centos8 安装 Docker

yum 更换国内源 1. 备份原 yum 配置 cd /etc/yum.repos.d/ mkdir backup mv *.repo backup/2. 下载新 yum 配置&#xff08;阿里源&#xff09; wget -O /etc/yum.repos.d/CentOS-Base.repo https://mirrors.aliyun.com/repo/Centos-8.repo3. 替换源中的系统版本变量 sed -…

AI测试工具Testim——告别自动化测试维护难题

随着人工智能技术的快速发展&#xff0c;AI测试工具正在成为提升软件研发效能的关键。每款AI的特性各有差异&#xff0c;今天&#xff0c;我们就给大家介绍一款专注于Web和移动应用的端到端的AI测试工具--Testim。 Testim的简介 官网地址&#xff1a;https://www.testim.io/ 简…

【默子AI】万字长文:MCP与A2A协议详解

【默子AI】万字长文&#xff1a;MCP与A2A协议详解 引言&#xff1a; 让一个大模型凭空解决所有问题&#xff0c;就像让一个书呆子不借助工具就去修汽车 即便他脑子里装满了理论知识&#xff0c;也缺少实践的“手脚”。 长期以来&#xff0c;AI助手&#xff08;尤其是LLM&#x…

LeNet5 神经网络的参数解析和图片尺寸解析

1.LeNet-5 神经网络 以下是针对 LeNet-5 神经网络的详细参数解析和图片尺寸变化分析&#xff0c;和原始论文设计&#xff0c;通过分步计算说明各层的张量变换过程。 经典的 LeNet-5架构简化版&#xff08;原始论文输入为 32x32&#xff0c;MNIST 常用 28x28 需调整&#xff09…

第二节:文件系统

理论知识 文件系统的基本概念&#xff1a;文件系统是操作系统中负责管理持久数据的子系统&#xff0c;它将数据组织成文件和目录的形式&#xff0c;方便用户存储和访问数据。Linux文件系统的类型&#xff1a;常见的 Linux 文件系统类型有 Ext2、Ext3、Ext4、XFS、Btrfs 等。Ex…

Python数据结构与算法(5)——动态规划

Python数据结构与算法(5)——动态规划 0. 学习目标1. 动态规划的基本概念1.1 什么是动态规划1.2 动态规划的核心思想1.3 动态规划的适用条件2. 动态规划的实现思路2.1 自顶向下:备忘录法 (Memoization)2.2 自底向上:表格法(Tabulation)3. 0/1 背包问题4. 最长公共子序列5…