Flink Kafka获取数据写入到MongoDB中 样例

简述

Apache Flink 是一个流处理和批处理的开源框架,它允许从各种数据源(如 Kafka)读取数据,处理数据,然后将数据写入到不同的目标系统(如 MongoDB)。以下是一个简化的流程,描述如何使用 Flink 从 Kafka 读取数据并保存到 MongoDB:

1、环境准备

  • 安装并配置 Apache Flink。
  • 安装并配置 Apache Kafka。
  • 安装并配置 MongoDB。
  • 创建一个 Kafka 主题,并发送一些测试数据。
  • 确保 Flink 可以连接到 Kafka 和 MongoDB。

部署参考:
1、flink:Flink 部署执行模式
2、kafka:Flink mongo & Kafka
3、mongoDb:mongo副本集本地部署

2. 添加依赖

在Flink 项目中,需要添加 Kafka 和 MongoDB 的连接器依赖。对于 Maven 项目,可以在 pom.xml 文件中添加相应的依赖。
对于 Kafka,需要添加 Flink Kafka Connector 的依赖。
对于 MongoDB,需要添加 Flink MongoDB Sink 的依赖。

3. 编写 Flink 作业

* 创建一个 Flink 作业,使用 Flink 的 `FlinkKafkaConsumer` 从 Kafka 主题中读取数据。  
* 对读取的数据进行必要的转换或处理。  
* 使用 MongoDB 的 Java 驱动程序或第三方库将处理后的数据写入 MongoDB。

4. 运行 Flink 作业

使用 Flink 的命令行工具或 IDE 运行 Flink 作业。确保 Kafka 和 MongoDB 正在运行,并且 Flink 可以访问它们。

参考:Flink 命令行提交、展示和取消作业

5. 监控和调试

使用 Flink 的 Web UI 或其他监控工具来监控作业。如果出现问题,检查日志并进行调试。

6. 优化和扩展

根据需求和数据量,优化 Flink 作业的性能和可扩展性。这可能包括调整并行度、增加资源、优化数据处理逻辑等。

代码

package com.wfg.flink.connector.kafka;import com.mongodb.client.model.InsertOneModel;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.bson.BsonDocument;import static com.wfg.flink.connector.constants.Constants.KAFKA_BROKERS;
import static com.wfg.flink.connector.constants.Constants.TEST_TOPIC_PV;/*** @author wfg*/
public class KafkaToWriteMongo {public static void main(String[] args) throws Exception {// 1. 设置 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(KAFKA_BROKERS).setTopics(TEST_TOPIC_PV).setGroupId("my-test-topic-pv").setStartingOffsets(OffsetsInitializer.latest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> rs = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");// 创建RollingFileSinkMongoSink<String> sink = MongoSink.<String>builder().setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin").setDatabase("sjzz").setCollection("TestMongoPv").setMaxRetries(3)
//                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setSerializationSchema((input, context) -> {System.out.println(input);return new InsertOneModel<>(BsonDocument.parse(input));}).build();rs.sinkTo(sink);// 6. 执行 Flink 作业env.execute("Kafka Flink Job");}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/29646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Vue61-消息订阅与发布-任意组件之间的通信

一、原理图 原生的JS不能实现订阅与发布&#xff0c;要借助第三方库&#xff1a;pubsub-js&#xff08;任何一个框架都能用&#xff01;&#xff09; 二、案例实现 school组件&#xff0c;需要数据&#xff08;订阅消息&#xff09;&#xff0c;student组件提供数据&#xff0…

Linux中的EINTR和EAGAIN错误码

Linux中的EINTR和EAGAIN错误码 在Linux系统中&#xff0c;进行系统调用时经常会遇到各种错误码。其中&#xff0c;EINTR&#xff08;Interrupted system call&#xff09;和EAGAIN&#xff08;Resource temporarily unavailable&#xff09;是两个较为常见的错误码&#xff0c…

MySQL 高级 - 第十二章 | 数据库的设计规范

目录 第十二章 数据库的设计规范12.1 为什么需要数据库设计12.2 范式12.2.1 范式简介12.2.2 范式都包括哪些12.2.3 键和相关属性的概念12.2.4 第一范式&#xff08;1st NF&#xff09;12.2.5 第二范式&#xff08;2nd NF&#xff09;12.2.6 第三范式&#xff08;3rd NF&#xf…

JWT详解、JWTUtil工具类的构建方法

一、前言 使用一些用户不友好的项目时&#xff0c;会发现&#xff0c;每一次进入网站&#xff0c;我们都要重新登录。 这是为什么呢&#xff1f; 现代多采用前后端分离的项目架构&#xff0c;这种架构&#xff0c;前后端使用不同的服务器&#xff0c;两个服务器上存储的信息不…

onnx进阶算子优化

一、定义 如何保证pytorch 模型顺利转为onnx. 前言pytorch 算子是如何与onnx 算子对齐的&#xff1f;Asinh 算子出现于第 9 个 ONNX 算子集。PyTorch 在 9 号版本的符号表文件中是怎样支持这个算子的&#xff1f;BitShift 算子出现于第11个 ONNX 算子集。PyTorch 在 11 号版本…

事务AOP

事物管理 事务管理是指对一系列数据库操作进行管理&#xff0c;确保这些操作要么全部成功执行&#xff0c;要么在遇到错误时全部回滚&#xff0c;以维护数据的一致性和完整性。在多用户并发操作和大数据处理的现代软件开发领域中&#xff0c;事务管理已成为确保数据一致性和完…

链表相对于数组的优势,以及栈和队列的基本操作

链表&#xff08;Linked List&#xff09;和数组&#xff08;Array&#xff09;是两种常见的数据结构&#xff0c;它们各自在不同的场景下有其优势和劣势。链表相对于数组的优势主要体现在以下几个方面&#xff1a; 动态大小&#xff1a; 链表在插入和删除元素时&#xff0c;不…

4M-21:霸气侧漏高效的20+多模态AI模型

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调或者LLM背后的基础模型重新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则…

软件设计不是CRUD(22):在流式数据处理系统中进行业务抽象落地——设计思考

(接上文《软件设计不是CRUD(21):在流式数据处理系统中进行业务抽象落地——需求分析》) 那么思考到这里我们就能做一些关于设计思路的总结: 每一个独立的数据处理流,就是数据采集系统中的一个功能。这个功能具备一个静态的控制逻辑(当然控制逻辑也可以是动态的,本文不…

嵌入式技术学习——c51单片机——蜂鸣器

一、蜂鸣器介绍 蜂鸣器时一种将电信号转化成声音信号的器件&#xff0c;常用来产生设备的按键音&#xff0c;报警音等提示信号。 蜂鸣器分为有源蜂鸣器&#xff0c;无源蜂鸣器 。 有源蜂鸣器&#xff1a;内部自带震荡源&#xff0c;将正负极街上直流电压即可持续发声&#x…

通过摄像头检测步频

通过摄像头识别运动频率&#xff0c;比如步频。 打开摄像头 循环读取视频帧 灰度转换 统计中间一行数值和 人在摄像头前运动&#xff0c;该数值会呈现周期变化 通过快速傅里叶转换&#xff0c;将和步频相似频率显示出来。 &#xff08;尝试人脸检测&#xff0c;跟着人脸位置变…

深度学习(十)——神经网络:非线性激活

一、Padding Layers简介 nn.ZeroPad2d&#xff1a;在输入的tensor数据类型周围用0进行填充 nn.ConstantPad2d&#xff1a;在输入的tensor数据类型周围用常数进行填充 这个函数的主要作用是对输入的图像进行填充&#xff0c;但里面所有功能都能用nn.Conv2d实现。 二、Non-li…

一文读懂OpenGVLab带来的最新视觉预训练框架

大模型技术论文不断&#xff0c;每个月总会新增上千篇。本专栏精选论文重点解读&#xff0c;主题还是围绕着行业实践和工程量产。若在某个环节出现卡点&#xff0c;可以回到大模型必备腔调或者LLM背后的基础模型重新阅读。而最新科技&#xff08;Mamba,xLSTM,KAN&#xff09;则…

为什么直接用 cv2.imwrite 保存 PIL 的图片会导致奇怪的颜色?

在处理图像保存时&#xff0c;使用不同的库可能会导致颜色显示上的差异。特别是 Image.fromarray(synthesis).save 和 cv2.imwrite(save_dir, image) 两种方法之间的区别&#xff0c;会导致保存的图像颜色不同。这篇博客将解释这些方法的区别&#xff0c;以及具体导致颜色差异的…

.NET周刊【6月第3期 2024-06-18】

国内文章 记一次 .NET某游戏币自助机后端 内存暴涨分析 https://www.cnblogs.com/huangxincheng/p/18243233 文章讨论了程序中非托管内存暴涨的问题。作者描述了友人发现内存问题并请他帮助分析的背景&#xff0c;利用WinDbg工具分析Linux平台上的内存泄漏情况。文章介绍了如…

Maven POM:掌握项目对象模型的艺术

Maven POM&#xff1a;掌握项目对象模型的艺术 1. 引言 Maven&#xff0c;作为一个强大的项目管理和构建自动化工具&#xff0c;已经成为了Java社区中不可或缺的一部分。在Maven的世界里&#xff0c;POM&#xff08;Project Object Model&#xff0c;项目对象模型&#xff09…

N32G031 DMA

目录 N32G031 DMA概述 DMA主要特点 DMA总线架构 DMA使用场景 DMA配置和使用 优点&#xff1a; 缺点&#xff1a; N32G031 DMA概述 N32G031系列芯片基于32位ARM Cortex-M0微控制器&#xff0c;其内置了DMA&#xff08;直接内存访问&#xff09;控制器。DMA控制器允许数据…

潮玩宇宙大逃杀APP系统开发成品案例分享指南

这是一款多人游戏&#xff0c;玩家需要选择一个房间躲避杀手。满足人数后&#xff0c;杀手会随机挑选一个房间杀掉里面所有的参与者&#xff0c;其他房间的幸存者将平均瓜分被杀房间的元宝。玩家在选中房间后&#xff0c;倒计时结束前可以自由切换不同房间。 软件项目开发成品…

LabVIEW开发为什么沟通需求非常重要

在LabVIEW开发项目中&#xff0c;需求沟通是项目成功的基石。以下是需求沟通的重要性及其原因&#xff1a; 明确项目目标&#xff1a; 定义清晰的目标&#xff1a;通过与用户的沟通&#xff0c;可以明确项目的目标和范围&#xff0c;确保开发团队理解用户的实际需求&#xff0c…

【Android-Compose】流式布局FlowRow 不能居中对齐的一种解决办法

问题描述&#xff1a; 在安卓Compose 开发中使用LazyColumn 流式布局 FlowRow 有时候比延迟网格布局更灵活&#xff0c;但是也可能出现自动流向下一行之后&#xff0c;末尾处留下一些小空白。如图&#xff1a; 问题解决&#xff1a; 为了尽可能居中对齐&#xff0c;我们可…