Spark-Streaming+Kafka+mysql实战示例

文章目录

  • 前言
  • 一、简介
    • 1. Spark-Streaming简介
    • 2. Kafka简介
  • 二、实战演练
    • 1. MySQL数据库部分
    • 2. 导入依赖
    • 3. 编写实体类代码
    • 4. 编写kafka主题管理代码
    • 5. 编写kafka生产者代码
    • 6. 编写Spark-Streaming代码
  • 总结


前言

本文将介绍一个使用Spark Streaming和Kafka进行实时数据处理的示例。通过该示例,读者将了解到如何使用Spark Streaming和Kafka处理实时数据流,以及如何将处理后的数据保存到MySQL数据库中。示例涵盖了从环境搭建到代码实现的全过程,帮助读者快速上手实时数据处理的开发。


一、简介

1. Spark-Streaming简介

Spark Streaming是Apache Spark的一个组件,用于实时流数据处理。它提供了高级别的API,可以使用类似于批处理的方式处理实时数据流。Spark Streaming可以与各种消息队列系统集成,包括Kafka、RabbitMQ等。

2. Kafka简介

Kafka是一个分布式流处理平台,具有高吞吐量、可扩展性和可靠性。它提供了一种可持久化、分布式、分区的日志服务,用于处理实时数据流。Kafka使用发布-订阅模型,消息被发布到一个或多个主题,然后由订阅该主题的消费者进行消费。

二、实战演练

1. MySQL数据库部分

这部分代码用于创建MySQL数据库和数据表,以及将从Kafka获取的数据保存到数据库中。

create database kafkademo;

创建数据表:

CREATE TABLE kafka_tb
(`txid`      varchar(255) PRIMARY KEY,`version`   varchar(255),`connector` varchar(255),`name`      varchar(255),`ts_ms`     varchar(255),`snapshot`  varchar(255),`db`        varchar(255),`sequence`  varchar(255),`schema`    varchar(255),`table`     varchar(255),`lsn`       varchar(255),`xmin`      varchar(255)
);

2. 导入依赖

这部分代码是Maven的依赖配置,用于引入所需的Spark、Kafka和MySQL相关的库。

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>2.4.0</version>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.8.0</version>
</dependency>
<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><scope>compile</scope>
</dependency>

3. 编写实体类代码

这部分代码定义了一个Java类EntityMessage,用于将从Kafka获取的JSON数据转换为Java对象。

import lombok.Data;import java.io.Serializable;@Data
public class EntityMessage implements Serializable {private String op;private String ts_ms;private String transaction;private DataItem dataItem;@Datapublic static class DataItem {private String version;private String connector;private String name;private String ts_ms;private String snapshot;private String db;private String[] sequence;private String schema;private String table;private String txId;private String lsn;private String xmin;}
}

4. 编写kafka主题管理代码

这部分代码用于创建、删除和修改Kafka主题的一些操作。

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;import java.util.*;
import java.util.concurrent.ExecutionException;public class KafkaTopicManager {private static final String BOOTSTRAP_SERVERS = "192.168.145.103:9092,192.168.145.104:9092,192.168.145.105:9092";public void createTopic(String topicName, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {Properties properties = new Properties();properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/211572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

实战1-python爬取安全客新闻

一般步骤&#xff1a;确定网站--搭建关系--发送请求--接受响应--筛选数据--保存本地 1.拿到网站首先要查看我们要爬取的目录是否被允许 一般网站都会议/robots.txt目录&#xff0c;告诉你哪些地址可爬&#xff0c;哪些不可爬&#xff0c;以安全客为例子 2. 首先测试在不登录的…

Docker Network(网络)——8

目录&#xff1a; Docker 为什么需要网络管理Docker 网络架构简介 CNMLibnetwork驱动常见网络类型 bridge 网络host 网络container 网络none 网络overlay 网络docker 网络管理命令 docker network createdocker network inspectdocker network connectdocker network disconne…

class072 最长递增子序列问题与扩展【算法】

class072 最长递增子序列问题与扩展【算法】 code1 300. 最长递增子序列 // 最长递增子序列和最长不下降子序列 // 给定一个整数数组nums // 找到其中最长严格递增子序列长度、最长不下降子序列长度 // 测试链接 : https://leetcode.cn/problems/longest-increasing-subsequen…

830. 单调栈

​​​​​​ ​​​​​​830. 单调栈 - AcWing题库 给定一个长度为 N 的整数数列&#xff0c;输出每个数左边第一个比它小的数&#xff0c;如果不存在则输出 −1−1。 输入格式 第一行包含整数 N&#xff0c;表示数列长度。 第二行包含 N个整数&#xff0c;表示整数数列…

你知道MySQL中 group by 怎么优化吗

更好的阅读体验&#xff0c;请点击 YinKai s Blog。 ​ 在 MySQL 中 group by 用于按照一个或多个列对结果集进行分组。在讨论 group by 怎么优化之前&#xff0c;我们先来看看 group by 的执行流程&#xff0c;这样我们才能对症下药。 group by 执行流程 ​ 我们先用下面的 …

Ubuntu 18.04使用Qemu和GDB搭建运行内核的环境

安装busybox 参考博客&#xff1a; 使用GDBQEMU调试Linux内核环境搭建 一文教你如何使用GDBQemu调试Linux内核 ubuntu22.04搭建qemu环境测试内核 交叉编译busybox 编译busybox出现Library m is needed, can’t exclude it (yet)的解释 S3C2440 制作最新busybox文件系统 https:…

block-recurrent-transformer-pytorch 学习笔记

目录 有依赖项1&#xff1a; 没有依赖项&#xff0c;没有使用例子 没有依赖项2&#xff1a; 有依赖项1&#xff1a; GitHub - dashstander/block-recurrent-transformer: Pytorch implementation of "Block Recurrent Transformers" (Hutchins & Schlag et a…

gd32和stm32的区别

gd32和stm32的区别 现在的市场上有很多种不同类型的微控制器&#xff0c;其中比较常见的有两种&#xff0c;即gd32和stm32。两种微控制器都是中国和欧洲的两个公司分别推出的&#xff0c;但是它们之间有很多区别&#xff0c;本文将会深入探讨这些区别。 1.起源和历史 gd32是…

2024年网络安全竞赛-Web安全应用

Web安全应用 (一)拓扑图 任务环境说明: 1.获取PHP的版本号作为Flag值提交;(例如:5.2.14) 2.获取MySQL数据库的版本号作为Flag值提交;(例如:5.0.22) 3.获取系统的内核版本号作为Flag值提交;(例如:2.6.18) 4.获取网站后台管理员admin用户的密码作为Flag值提交…

udp多播组播

import socket ,struct,time# 组播地址和端口号 MCAST_GRP 239.0.0.1 MCAST_PORT 8888 # 创建UDP socket对象 sock socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # 绑定socket对象到本地端口号 # sock.bind((MCAST_GRP, MCAST_PORT)) …

【4】PyQt输入框

1. 单行文本输入框 QLineEdit控件可以输入单行文本 from PyQt5.QtWidgets import QApplication, QWidget, QLineEdit, QVBoxLayout from PyQt5.QtCore import * from PyQt5.QtGui import QIcon import sysdef init_widget(w: QWidget):# 修改窗口标题w.setWindowTitle(单行输…

前端面试——CSS面经(持续更新)

1. CSS选择器及其优先级 !important > 行内样式 > id选择器 > 类/伪类/属性选择器 > 标签/伪元素选择器 > 子/后台选择器 > *通配符 2. 重排和重绘是什么&#xff1f;浏览器的渲染机制是什么&#xff1f; 重排(回流)&#xff1a;当增加或删除dom节点&…

【面试经典150 | 二叉树】从中序与后序遍历序列构造二叉树

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;递归 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对于本题涉及到的数据结构等内容…

Android : Room 数据库的基本用法 —简单应用

1.Room介绍&#xff1a; Android Room 是 Android 官方提供的一个持久性库&#xff0c;用于在 Android 应用程序中管理数据库。它提供了一个简单的 API 层&#xff0c;使得使用 SQLite 数据库变得更加容易和方便。 以下是 Android Room 的主要特点&#xff1a; 对象关系映射…

9.MySQL 索引

目录 ​​​​​​​概述 概念&#xff1a; 单列索引 普通索引 创建索引 查看索引 删除索引 唯一索引 创建唯一索引 删除唯一索引 主键索引 组合索引 创建索引 全文索引 概述 使用全文索引 空间索引 内部原理 相关算法&#xff1a; hash算法 二叉树算法 …

Spring基于XML文件配置AOP

AOP AOP&#xff0c;面向切面编程&#xff0c;是对面向对象编程OOP的升华。OOP是纵向对一个事物的抽象&#xff0c;一个对象包括静态的属性信息&#xff0c;包括动态的方法信息等。而AOP是横向的对不同事物的抽象&#xff0c;属性与属性、方法与方法、对象与对象都可以组成一个…

12.10多种编码方式,编码方案选择策略(递归级联),PDE,RLE代码

作者如何选择和设计编码方案&#xff0c;以实现高效的解压缩和高压缩比&#xff1f;BtrBlocks是否适用于所有类型的数据&#xff1f; 选择和设计编码方案&#xff1a; 结合多种高效编码方案&#xff1a;BtrBlocks 通过选择一组针对不同数据分布的高效编码方案&#xff0c;实现…

js判断是否对象自身为空

文章目录 一、前言二、JSON.stringify三、for in 配合 hasOwnProperty四、Object.keys五、Object.getOwnPropertyNames六、Object.getOwnPropertyNames 结合 Object.getOwnPropertySymbols七、Reflect.ownKeys八、最后 一、前言 如何判断一个对象为空&#xff1f; 先上结论&a…

MySql复习笔记03(小滴课堂) 事务,视图,触发器,存储过程

mysql 必备核心知识之事务的详细解析&#xff1a; 创建一个数据库表&#xff1a; 添加数据并开启事务。 添加数据并查询。 登录另一台服务器发现查不到这个表中的数据。 这是因为事务开启了&#xff0c;但是没有提交&#xff0c;只是把数据存到了内存中&#xff0c;还没有写入…