Flink mongo Kafka

Apache Flink 是一个流处理和批处理的开源平台,用于在分布式环境中处理无界和有界数据流。它提供了用于数据处理的数据流 API(DataStream API)和表 API(Table API),并可以与各种外部数据源和存储系统进行交互。
MongoDB 是一个基于文档的 NoSQL 数据库,它提供了高性能、可扩展和灵活的数据存储。而 Apache Kafka 是一个流处理平台,它允许发布和订阅记录流,类似于消息队列或企业消息系统。
当 Flink 与 MongoDB 和 Kafka 结合使用时,可以构建强大的数据处理管道,用于实时数据流分析和批处理任务。以下是这些组件结合使用时可能的一些用途:

  1. Flink 与 Kafka:
  • Flink 可以作为 Kafka 的消费者(Consumer),从 Kafka 主题(Topics)中读取数据流,并对其进行实时处理。
  • Flink 也可以将数据写入 Kafka,使其成为一个中间存储或数据传递的桥梁。
  • 通过 Flink 的时间窗口和状态管理等特性,可以对 Kafka 中的数据流进行复杂的实时分析。
  1. Flink 与 MongoDB:
  • Flink 可以从 MongoDB 中读取数据,用于批处理或实时分析。
  • Flink 也可以将处理后的数据写入 MongoDB,用于持久化存储或进一步的数据分析。
    使用 Flink 的表 API(Table API)和 SQL 支持,可以方便地对 MongoDB 中的数据进行查询和分析。
  1. Kafka、Flink 和 MongoDB 结合使用:
  • Kafka 可以作为数据源,提供实时数据流给 Flink 进行处理。
  • Flink 对 Kafka 中的数据流进行实时分析,并可能将结果写入 MongoDB 进行存储。
  • MongoDB 中的数据也可以作为 Flink 批处理任务的输入,用于历史数据分析或与其他数据源进行联合分析。

MONGO 2 KAFKA

下面例子是从mongo获取数据插入到kafka:
代码:

public class MongoDBToKafka {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置MongoDB源MongoSource<String> mongoSource = MongoSource.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection("wellCastingInfo")
//                .setProjectedFields("_id", "f0", "f1").setFetchSize(2048).setLimit(10000).setNoCursorTimeout(true).setPartitionStrategy(PartitionStrategy.SAMPLE).setPartitionSize(MemorySize.ofMebiBytes(64)).setSamplesPerPartition(10).setDeserializationSchema(new MongoDeserializationSchema<String>() {@Overridepublic String deserialize(BsonDocument document) {return document.toJson();}@Overridepublic TypeInformation<String> getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}).build();// 创建MongoDB数据流DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");
//        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDB-Source")
//                .setParallelism(2)
//                .print()
//                .setParallelism(1);// 配置KafkasinkKafkaSink<String> kafkaSink = KafkaSink.<String>builder().setBootstrapServers("localhost:9092")
//                .setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
//                .setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")// 如果你使用String类型的键.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")// 如果你使用byte[]类型的值.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(Constants.TOPIC_NAME).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 将数据流写入KafkasourceStream.sinkTo(kafkaSink);// 执行任务env.execute("MongoDB to Kafka");}
}

pom.xml

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-mongodb</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId></dependency>

KAFKA 2 FILE

从kafka获取数据写入到本地文件
代码:

public class KafkaToWriteText {public static void main(String[] args) throws Exception {// 1. 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();String brokers = "localhost:9092";KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers).setTopics(TOPIC_NAME).setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 创建RollingFileSinkString outputPath = "sink.csv";FileSink<String> sink = FileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMinutes(15)).withInactivityInterval(Duration.ofMinutes(5)).withMaxPartSize(MemorySize.ofMebiBytes(1024)).build()).build();rs.sinkTo(sink);// 6. 执行 Flink 作业env.execute("Kafka Flink Job");}
}

pom.xml

		<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId></dependency>

KAFKA 部署

  1. 下载地址:
    https://downloads.apache.org/kafka/3.7.0/kafka_2.12-3.7.0.tgz
  2. 运行zookeeper
# Start the ZooKeeper service
$ bin/zookeeper-server-start.sh config/zookeeper.properties
  1. 运行kafka
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
  1. 验证
# 接受信息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo --from-beginning
# 发送信息
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic TOPIC_WellCastingInfo

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/24466.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Diffusers代码学习: IP-Adapter

从操作的角度来看&#xff0c;IP-Adapter和图生图是很相似的&#xff0c;都是有一个原始的图片&#xff0c;加上提示词&#xff0c;生成目标图片。但它们的底层实现方式是完全不一样的&#xff0c;我们通过源码解读来看一下。以下是ip adapter的实现方式 # 以下代码为程序运行…

51单片机通过键盘输入数值,控制流水灯的方向和速度。

1、功能描述 通过键盘输入数值&#xff0c;控制流水灯的方向和速度。 2、实验原理 键盘输入原理&#xff1a; 键盘通常通过矩阵形式连接到单片机的I/O端口。当用户按下某个按键时&#xff0c;会改变键盘矩阵中对应行和列的电平&#xff0c;单片机通过检测这些变化来确定哪个按…

Python opencv读取深度图,网格化显示深度

效果图&#xff1a; 代码&#xff1a; import cv2 import osimg_path "./outdir/180m_norm_depth.png" depth_img cv2.imread(img_path, cv2.IMREAD_ANYDEPTH) filename os.path.basename(img_path) img_hig, img_wid depth_img.shape # (1080, 1920) print(de…

C# MemoryCache 缓存应用

摘要 缓存是一种非常常见的性能优化技术&#xff0c;在开发过程中经常会用到。.NET提供了内置的内存缓存类 MemoryCache&#xff0c;它可以很方便地存储数据并在后续的请求中快速读取&#xff0c;从而提高应用程序的响应速度。 正文 通过使用 Microsoft.Extensions.Caching.Me…

mqtt-emqx:设置遗嘱消息

【pom.xml】 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><version>2.3.12.RELEASE</version> </dependency> <dependency><groupId>org.eclipse…

OpenAI新成果揭秘语言模型神经活动:稀疏自编码器的前沿探索

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

手机自动化测试:6.某团文字的提取

我们要进行的操作重点就是文字的提取&#xff0c;然后循环&#xff0c;提取不是吗&#xff1f; try:# 使用XPath定位带有index属性的FrameLayout元素frame_layout_elements WebDriverWait(driver, timeout, poll_frequency).until(EC.presence_of_all_elements_located((By.X…

搜索之道:信息素养与终身学习的新引擎

&#x1f4d1;前言 在这个信息如同潮水般涌来的时代&#xff0c;我们每天都在与海量的数据和信息打交道。无论是学习、工作还是生活&#xff0c;我们都渴望能够迅速、准确地找到我们所需的信息。然而&#xff0c;面对如此繁杂的信息海洋&#xff0c;如何高效、精准地搜索到我们…

【C语言训练题库】扫雷->简单小游戏!

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 题目 2. 解析 3. 代码 4. 小结 1. 题目 小sun上课的时候非常喜欢玩扫雷。他现小sun有一个初始的雷矩阵&#xff0c;他希望你帮他生成一个扫雷矩阵。 扫雷…

Matplotlib常见图汇总

Matplotlib是python的一个画图库&#xff0c;便于数据可视化。 安装命令 pip install matplotlib 常用命令&#xff1a; 绘制直线&#xff0c;连接两个点 import matplotlib.pyplot as plt plt.plot([0,5],[2,4]) plt.show() 运行结果如下&#xff1a; 多条线&#xff1a;…

速盾:服务器cdn加速超时如何解决?

在当今互联网时代&#xff0c;网站内容加载速度成为用户体验的重要指标之一。然而&#xff0c;由于网络环境的复杂性和服务器的负载压力&#xff0c;服务器CDN加速超时问题经常会出现。在这篇文章中&#xff0c;我们将讨论服务器CDN加速超时的原因和解决方法。 首先&#xff0…

巨擘之舞:探索AI大模型的发展历程与特性比较

巨擘之舞&#xff1a;探索AI大模型的发展历程与特性比较 文章目录 巨擘之舞&#xff1a;探索AI大模型的发展历程与特性比较引言1. GPT系列&#xff08;Generative Pre-trained Transformer&#xff09;发展历程优点缺点 2. BERT&#xff08;Bidirectional Encoder Representati…

学习笔记——路由网络基础——汇总静态路由

4、汇总静态路由 (1)定义 静态路由汇总&#xff1a;多条静态路由都使用相同的送出接口或下一跳 IP 地址。(将多条路由汇总成一条路由表示) (2)目的 1.减少路由条目数量&#xff0c;减小路由表&#xff0c;加快查表速度 2.增加网络稳定性 (3)路由黑洞以及路由环路的产生…

京准科技 | PTP时钟服务器(卫星主时钟)在预审系统应用

京准科技 | PTP时钟服务器&#xff08;卫星主时钟&#xff09;在预审系统应用 京准科技 | PTP时钟服务器&#xff08;卫星主时钟&#xff09;在预审系统应用 某某省公安厅以科技强警建设的重要目标&#xff0c;决定建立全国第一个全省联网的信息化公安预审系统&#xff0c;本文…

LeetCode题练习与总结:三角形最小路径和--120

一、题目描述 给定一个三角形 triangle &#xff0c;找出自顶向下的最小路径和。 每一步只能移动到下一行中相邻的结点上。相邻的结点 在这里指的是 下标 与 上一层结点下标 相同或者等于 上一层结点下标 1 的两个结点。也就是说&#xff0c;如果正位于当前行的下标 i &…

一次改SQLMAP的操作

前言 sqlmap这个工具&#xff0c;相信各位大佬们都不陌生&#xff0c;但sqlmap虽好&#xff0c;也时常会有些实际存在但无法注入的地方&#xff0c;这时候就需要我们改它的配置了&#xff0c;今天就以本人遇到的事件进行阐述。 正文 确认注入点 通过一系列测试最终确定这里…

循环语句大揭秘:while、do-while、for、foreach你都掌握了吗?

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

招募来袭 | 与热爱技术的谷歌开发者一起创造精彩

写在前面 技术的进步在不断推动着世界发展。从 Android、Flutter 等产品的稳步更新迭代&#xff0c;到秉承着负责任的态度对 AI 进行探索&#xff0c;我们通过每一次的技术跃进&#xff0c;帮助大家打开新的视野&#xff0c;激发更多的灵感&#xff0c;将我们的工具和平台打造成…

css3 都有哪些新属性

1. css3 都有哪些新属性 1.1. 圆角边框 (border-radius)1.2. 盒子阴影 (box-shadow)1.3. 文本阴影 (text-shadow)1.4. 响应式设计相关属性1.5. 渐变背景 (gradient backgrounds)1.6. 透明度 (opacity 和 rgba/hsla)1.7. 多列布局 (column-count, column-gap, etc.)1.8. 变换 (t…

Android --- MVVM+DataBinding+Fragment+Retrofit+Adapter 简单示例

首先&#xff0c;我们将使用 Retrofit 进行网络请求&#xff0c;所以我们需要添加 Retrofit 的依赖。在你的 build.gradle 文件中添加以下依赖&#xff1a; implementation com.squareup.retrofit2:retrofit:2.9.0 implementation com.squareup.retrofit2:converter-gson:2.9.…