Flink订阅Kafka消息队列实战案例

1、Kafka介绍

Kafka是一款开源的分布式消息系统,最初由LinkedIn公司开发并开源。它被设计用于处理海量的实时数据流,可以支持高吞吐量和低延迟的数据传输。

Kafka的设计主要目标是提供一个持久化的、高吞吐量的、可扩展的、分布式发布/订阅消息系统,以解决实时数据处理的需求。它基于发布/订阅模型,通过将消息发布到主题(Topic)并让订阅者订阅相关主题,实现了消息的生产者和消费者之间的解耦。

Kafka的架构和设计思想非常灵活,主要由以下几个核心组件组成:

Producer(生产者): 负责将消息发布到Kafka集群中的指定主题。

Consumer(消费者): 订阅并消费特定主题的消息。

Topic(主题): 消息的类别或者主题分类,消息被发布到特定的主题,消费者可以订阅感兴趣的主题。

Broker(代理): Kafka集群中的服务器节点,负责消息的存储和转发。

Partition(分区): 每个主题可以分为多个分区,每个分区都是有序且持久化的消息记录队列。

Producer API(生产者API)和Consumer API(消费者API): 提供了丰富的编程接口,方便开发者在应用程序中集成Kafka。

Kafka的特点包括:

高性能:Kafka可以支持每秒数十万条消息的高吞吐量处理。

可扩展性:Kafka的持久化消息存储和分区机制可以方便地进行水平扩展。

容错性:Kafka具备良好的容错机制,即使在某个节点故障的情况下,仍然可以保证消息的可靠传输。

消息保留:Kafka可以根据配置保留消息的时间或大小限制。

多语言支持:Kafka提供了多种编程语言的客户端,方便开发者使用不同语言来集成和使用Kafka。

Kafka广泛应用于各种领域,特别是大数据处理、实时流处理和日志收集等场景。

2、Flink介绍

Flink(Apache Flink)是一种开源的流处理和批处理框架,它提供了可靠、高性能、可伸缩的大数据处理能力。Flink最初由德国柏林工业大学(Berlin TU)的一个研究小组开发,并于2014年成为Apache软件基金会的顶级项目。

Flink的设计目标是实时流处理和批处理的无缝融合,它提供了统一的数据处理模型,使得开发人员可以方便地编写和运行具有低延迟和高吞吐量需求的大规模数据处理应用。

Flink的核心概念是流(Stream)和转换(Transformation)。应用程序通过定义数据流(DataStream)来描述输入数据和计算过程,并且可以应用各种转换操作(如过滤、转换、合并等)对数据流进行操作和处理。Flink提供了丰富的转换函数和算子,可以轻松地实现各种复杂的数据处理逻辑。

Flink具有以下特点:

低延迟和高吞吐量:Flink采用了基于内存的流式计算模型,能够实现毫秒级的实时响应。

Exactly-Once语义:Flink可以确保数据处理的精确一次性,即数据不会丢失也不会重复处理。

可容错性:Flink通过在集群中保存数据的一致性检查点(Checkpoint)来提供故障恢复和容错处理能力。

状态管理:Flink能够在处理过程中维护和管理状态,这对于处理窗口操作和流-流连接等场景非常重要。

可伸缩性:Flink可以方便地进行水平扩展,支持集群模式和分布式部署。

支持大规模数据处理:Flink可以处理海量数据,适用于大数据和实时流处理等场景。

Flink在实时流处理、批处理、连续查询、机器学习和图分析等领域得到了广泛应用。它提供了易于使用的API和丰富的生态系统,可以与主流的大数据存储和计算平台(如Hadoop、Kafka、Cassandra等)进行无缝集成,为用户提供了强大的数据处理能力和灵活性。

3、Flink订阅Kafka消息实战代码

import java.util.Properties// 配置Kafka的属性
val properties: Properties = new Properties
// 设置服务
properties.setProperty("bootstrap.servers", "bigdata_server1:9092,bigdata_server2:9092,bigdata_server3:9092")
// 设置消费者组
properties.setProperty("group.id", "test_group")
// kafka反序列化消息是在消费端
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// 由于网络传输过来的是byte[],只有反序列化后才能得到生产者发送的真实的消息内容。
// 属性key.deserializer和value.deserializer就是key和value指定的反序列化方式
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
// 指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下改如何处理。
properties.setProperty("auto.offset.reset", "latest")
import main.flink.com.bg.Config.Config
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchemaval env: StreamExecutionEnvironment    // Flink执行环境
val topic : String                     // 话题// 创建一个消费者,读取指定topic话题的消息
val consumer = new FlinkKafkaConsumer(topic,                                         // Kafka话题new JSONKeyValueDeserializationSchema(true),Config.getKafkaProperties()                   // Kafka配置,类型
)// 将Kafka消费者添加到输入源
val stream = env.addSource(consumer)  // 返回DataStream[ObjectNode]类型
// 打印并执行消息
stream.print()
env.execute("flink kafka demo")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/4234.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PHP客服系统-PhpWorkmanChat客服系统修改管理员密码

作为一款流行的开源PHP客服系统,基于thinkphp和workman,跨平台轻量级客服系统源码 管理员表是v2_admin 账户是admin,如果密码忘记了怎么办。可以直接修改数据库表v2_admin ,密码规则是md5(密码 加密盐) , 加密盐可以在…

开发工具篇第二十六讲:使用IDEA进行本地调试和远程调试

开发工具篇第二十六讲:使用IDEA进行本地调试和远程调试 Debug用来追踪代码的运行流程,通常在程序运行过程中出现异常,启用Debug模式可以分析定位异常发生的位置,以及在运行过程中参数的变化;并且在实际的排错过程中&am…

Ceph分布式存储系统

Ceph分布式存储系统 一、存储基础1、单机存储设备2、单机存储的问题 二、分布式存储(软件定义的存储SDS)1、分布式存储介绍2、分布式存储的类型3、Ceph简介4、Ceph优势5、Ceph架构6、Ceph核心组件7、OSD存储后端8、Ceph 数据的存储过程9、Ceph 版本发行生…

移动App安全检测的重要性,好用的App安全测试工具分享

一、移动App安全检测的重要性 在移动互联网时代,移动App成为人们生活不可或缺的一部分,人们使用App处理各种个人和敏感信息,因此保护用户的隐私和数据安全至关重要。而移动App安全检测是保障用户隐私和数据安全的重要环节。通过安全检测&…

【AT89C52单片机项目】数字密码锁设计

实验目的 使用单片机设计数字密码锁。 实验仪器 一套STC89C52RC开发板套件,包括STC89C52RC开发板,以及USB烧录线。 设计要求 1、有设置密码、开锁工作模式; 2、可以每次都设置密码,也可以设置一次密码多次使用。 实验原理 …

java电子病历系统源码

电子病历系统采取结构化与自由式录入的新模式,自由书写,轻松录入。化实现病人医疗记录(包含有首页、病程记录、检查检验结果、医嘱、手术记录、护理记录等等。)的保存、管理、传输和重现,取代手写纸张病历。不仅实现了…

WordPress删除长时间未登录的垃圾用户

有一段时间没看网站,昨天来翻了一下,没想到飞龙出海已经有一万多注册用户了,这些用户中会员只有1000多号人,99%是注册不付费的白嫖党,可能你们觉得一万注册用户很牛逼,但是对于我来说,任何不付费…

存储服务的演化与MySQL分库分表

文章目录 一、存储服务的演化1.单体结构2.单表单库的数据量膨胀 -> 分库分表3.单个MySQL的读写压力过大 -> MySQL索引优化4.进一步缓解MySQL读写压力 -> 读写分离5.冷热数据分离 -> 使用Redis缓存 二、MySQL分库分表1.策略2.需要注意的问题 一、存储服务的演化 1.…

3D全景虚拟旅游在旅游行业中具备哪些应用价值?

在网络强国战略指引下,我们的网络基础设施建设步伐正在加快,与此同时,虚拟技术也在不断的更新迭代,虚拟旅游也逐渐崭露头角,将真实世界中的景点、文化以及历史场景等数字化,让游客身临其境地感受这些景点和…

HBase(一)HBase v2.2 高可用多节点搭建

最近刚刚完成了HBase相关的一个项目,作为项目的技术负责人,完成了大部分的项目部署,特性调研工作,以此系列文章作为上一阶段工作的总结. 前言 其实目前就大多数做应用的情况来讲,我们并不需要去自己搭建一套HBase的集群,现有的很多云厂商提供的服务已经极大的方便日常的应用使…

javaweb使用Thymeleaf 最凝练的CRUD项目-中

javaweb使用Thymeleaf 最凝练的CRUD项目-中 6、显示首页 ①目标 浏览器访问index.html&#xff0c;通过首页Servlet&#xff0c;渲染视图&#xff0c;显示首页。 ②思路 ③代码 [1]创建PortalServlet <servlet><servlet-name>PortalServlet</servlet-name…

怎么用Midjourney制作表情包

要使用Midjourney制作表情包&#xff0c;可以按照以下步骤进行操作&#xff1a; 1. 打开Midjourney的官方网站或下载Midjourney应用程序&#xff0c;并登录你的账户。 2. 在Midjourney中&#xff0c;选择创建新项目或表情包。 3. 在项目中&#xff0c;你可以选择使用预设的模…

spring带bean和config,通过main启动测试

main方法&#xff1a; package com.xxx.tmp; import org.springframework.context.annotation.AnnotationConfigApplicationContext; public class Main {public static void main(final String[] args) {final AnnotationConfigApplicationContext applicationContext new An…

为什么hive表不经常用索引

Hive 表不经常使用索引的主要原因是由于其设计初衷和使用场景的特点。下面是一些可能的解释&#xff1a; 1. 批处理性能为主 Hive 主要用于处理大规模数据集的批量分析任务&#xff0c;而不是对单个记录的实时查询。对于批处理任务&#xff0c;全表扫描通常是更为高效的方式&…

类 和 对象

目录 1、面向对象编程 2、面向对象编程 2.1面向对象编程特征 3、类和对象的概念 3.1类的定义 3.11属性 3.12方法 3.13重载 3.14递归 3.13返回值return 3.2对象 3.2.1对象组合 4、jvm内主要三块内存空间 5、参数传值 1、面向对象编程 面向过程&#xff1a;关注的是步骤…

spring boot 上传文件的大小限制

spring boot 上传文件的大小限制 根据spring boot 版本不同在application.properties文件添加不同的配置 Spring Boot 1.3 或之前的版本&#xff0c;配置: multipart.maxFileSize 500Mb multipart.maxRequestSize500MbSpring Boot 1.4 版本后配置更改为: spring.http.multi…

将大模型集成到语音识别系统中的例子

概述 本文旨在探索将大型语言模型&#xff08;LLMs&#xff09;集成到自动语音识别&#xff08;ASR&#xff09;系统中以提高转录准确性的潜力。 文章介绍了目前的ASR方法及其存在的问题&#xff0c;并对使用LLMs的上下文学习能力来改进ASR系统的性能进行了合理的动机论证。 本…

深度学习标量、向量、矩阵、张量之间的区别与联系

文章目录 前言1、张量**注意**&#xff1a; 2、**标量** (scalar)&#xff1a;0阶的张量&#xff0c;0个轴&#xff0c;一个单独的数(整数或实数)&#xff1b;3、**向量**(vector)&#xff1a;1阶的张量&#xff0c;也叫矢量&#xff0c;1个轴&#xff0c;一个数组&#xff1b;…

那些你必须知道的4个matlab小技巧(附最新安装包)

文末福利&#xff1a;MATLAB R2022b软件安装包 MATLAB 简介 01 一、MATLAB简介 数学类科技应用软件包括数值计算&#xff08;Number Crunching&#xff09;型软件和数学分析&#xff08;Math Analysis&#xff09;型软件 数值计算型软件 它们对大批数据具有较强的管理、计…

微信小程序下拉刷新获取数据和触底事件刷新实现

一、下拉刷新 1.json文件 说明&#xff1a;开启下拉刷新&#xff0c;然后设置窗口的背景色&#xff0c;方便观看。 "enablePullDownRefresh": true,"backgroundColor":"#FFC0CB" 2. js文件 说明&#xff1a;重新发起请求&#xff0c;并显示加…