Kafka与Flink的整合 -- sink、source

1、首先导入依赖:
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.15.2</version></dependency>
2、 source:Flink从Kafka中读取数据
public class Demo01KafkaSource {public static void main(String[] args) throws Exception{//构建环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//构建kafka source 环境KafkaSource<String> source = KafkaSource.<String>builder()//指定broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定topic.setTopics("bigdata")//指定消费组.setGroupId("my-group")//指定数据的读取的位置,earliest指的是读取最早的数据,latest:指定的读取的是最新的数据.setStartingOffsets(OffsetsInitializer.earliest())//读取数据格式:.setValueOnlyDeserializer(new SimpleStringSchema()).build();//使用kafka数据源DataStreamSource<String> kafkaSourceDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");kafkaSourceDS.print();//启动flinkenv.execute();}
}
        启动生产kafka:
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic bigdata
3、sink:Flink向Kafka中写入数据
public class Demo02KafkaSink {public static void main(String[] args) throws Exception{//构建flink的环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//读取数据文件:DataStreamSource<String> studentDS = env.readTextFile("flink/data/students.txt");//创建kafka sinkKafkaSink<String> sink = KafkaSink.<String>builder()//指定flink broker列表.setBootstrapServers("master:9092,node1:9092,node2:9092")//指定数据的格式:.setRecordSerializer(KafkaRecordSerializationSchema.builder()//指定topic,如果topic不存在就会自动的创建一个分区是1个副本是1个的topic.setTopic("student")//指定数据的格式.setValueSerializationSchema(new SimpleStringSchema()).build())//指定数据处理的语义:.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();//执行flinkstudentDS.sinkTo(sink);//构建flink环境env.execute();}
}
        启动消费kafka:
kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic student

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/136781.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记录:Unity脚本的编写5.0

目录 前言创建动画Unity Animation、Animator常用类关于两者的区别Animator 编写脚本 大型连续剧之在untiy中&#xff08;或者别的什么活动&#xff09; 前言 之前在场景中添加了背景音乐&#xff0c;而在我们的日常的体验中&#xff0c;可以发现游戏或者场景中有很多有趣的动…

java通过FTP跨服务器动态监听读取指定目录下文件数据

背景&#xff1a; 1、文件数据在A服务器&#xff08;windows&#xff09;&#xff08;不定期在指定目录下生成&#xff09;&#xff0c;项目应用部署在B服务器&#xff08;Linux&#xff09;&#xff1b; 2、项目应用在B服务器&#xff0c;监听A服务器指定目录&#xff0c;有新…

网络安全之CSRF漏洞原理和实战,以及CSRF漏洞防护方法

一、引言 总体来说CSRF属于一种欺骗行为&#xff0c;是一种针对网站的恶意利用&#xff0c;尽管听起来像跨站脚本&#xff08;XSS&#xff09;&#xff0c;但是与XSS非常不同&#xff0c;并且攻击方式几乎向佐。XSS利用站点内的信任用户&#xff0c;而CSRF则通过伪装来自受信任…

【Linux】补充:进程管理之手动控制进程,以及计划任务

目录 一、手动启动进程 1、理解前台启动与后台启动 2、如何完成前台启动后台启动的切换 3、完成并行执行多个任务 4、结束进程 1、kill 2、killall 2、pkill 二、计划任务 1、at一次性计划任务 2、实操 2、周期性计划任务 1、关于设置周期性任务的配置文件以及格式…

MySQL数据库的表操作

1.创建表 1.1一般格式 create table table_name( Field1 datatype, Field2 datatype, Field3 datatype ) charset 字符集名 collate 校验规则 engine 存储引擎; 解释&#xff1a; Field &#xff1a; 表示列名datatype&#xff1a; 表示列的类型 charset 字符集&#xff1a;…

EOCR-3E420,3EZ,3DE电机保护器与变频器配合使用的方法

上海韩施电气自动化设备有限公司提供 在工业现场中&#xff0c;电动机的起动与运行很多时候需要变频器参与其中&#xff0c;以达到降低电机维护成本、增加电机寿命的目的。采用变频器运转时&#xff0c;随着电机的加速相应提高频率和电压&#xff0c;起动电流被限制在 150%额定…

小程序 打开方式 页面效果 表单页面 点击跳到详情页 图标 获取后台数据 进行页面渲染

请求地址&#xff1a;geecg-uniapp 同源策略 数据请求 获取后台数据 ui库安装 冲突解决&#xff08;3&#xff09;-CSDN博客 一.uniapp转小程序 (1) 运行微信开发工具 &#xff08;2&#xff09; 配置id 然后运行 打开小程序 路径 E:\通\uniapp-jeecg\unpackage\dist\d…

linux之进程控制

进程创建&fork函数 fork函数之前就已经提到,它从已存在进程中创建一个新进程,新进程为子进程,而原进程为父进程。 调用接口&#xff1a;fork() 头文件&#xff1a;unistd.h 功能&#xff1a;创建一个子进程&#xff0c;给子进程返回0&#xff0c;父进程返回子进程pid …

聊聊logback的StatusManager

序 本文主要研究一下logback的StatusManager StatusManager ch/qos/logback/core/status/StatusManager.java public interface StatusManager {/*** Add a new status message.* * param status*/void add(Status status);/*** Obtain a copy of the status list maintain…

ChatGPT付费创作系统V2.4.9独立版 +WEB端+ H5端 + 小程序端系统测试安装教程

播资源提供的GPT付费体验系统最新版系统是一款基于ThinkPHP框架开发的AI问答小程序&#xff0c;是基于国外很火的ChatGPT进行开发的Ai智能问答小程序。当前全民热议ChatGPT&#xff0c;流量超级大&#xff0c;引流不要太简单&#xff01;一键下单即可拥有自己的GPT&#xff01;…

【计算机网络笔记】网络层服务模型——数据报网络

系列文章目录 什么是计算机网络&#xff1f; 什么是网络协议&#xff1f; 计算机网络的结构 数据交换之电路交换 数据交换之报文交换和分组交换 分组交换 vs 电路交换 计算机网络性能&#xff08;1&#xff09;——速率、带宽、延迟 计算机网络性能&#xff08;2&#xff09;…

掌动智能:UI自动化测试工具的五大功能

在现代软件开发中&#xff0c;保证应用程序的质量和性能至关重要。UI自动化测试工具是一种关键的资源&#xff0c;它们能够有效地检查应用程序的用户界面&#xff0c;确保它们在各种情况下都能正常运行。本文将探讨UI自动化测试工具的功能有哪些! UI自动化测试工具的五大功能&a…

性价比高的照明品牌,五款经济实惠的照明品牌推荐

很多家长有时候会说孩子觉得家里的台灯灯光刺眼&#xff0c;看书看久了就不舒服。这不仅要看光线亮度是否柔和&#xff0c;还要考虑台灯是不是有做遮光式设计。没有遮光式设计的台灯&#xff0c;光源外露&#xff0c;灯光会直射孩子头部&#xff0c;孩子视线较低&#xff0c;很…

基于鱼鹰算法的无人机航迹规划-附代码

基于鱼鹰算法的无人机航迹规划 文章目录 基于鱼鹰算法的无人机航迹规划1.鱼鹰搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用鱼鹰算法来优化无人机航迹规划。 1.鱼鹰搜索算法 …

ubuntu20.04 MYNTEYE S 相机运行与标定记录

ubuntu20.04 MYNTEYE S 相机运行与标定记录 环境 ubuntu20.04 opencv3.3.1 硬件 mynteye S1030 OpenCV 3.4.3 安装 Jetson Nano小觅相机(MYNT EYE S)开发调试指南 mkdir -p ~/tools/opencv cd ~/tools/opencvgit clone https://github.com/opencv/opencv.git cd opencv/…

Easypoi map方式导入数据 ,List<Map<String, String>> 日期项数据为空(null)解决办法

目录 前言解决办法 前言 在使用easypoi map的方式解析excel文件&#xff0c;若文件中的某列数据格式是日期类型&#xff0c;那么它这个工具是读取不到&#xff0c;因为它的源码读取到某列为日期格式&#xff0c;数据必须为字符串类型&#xff0c;它才会处理 switch (cell.get…

Linux - 实现一个简单的 shell

前言 之前我们对进程的替换&#xff0c;进程地址空间等等的概念进行了说明&#xff0c;本篇博客会基于这些知识点来 实现一个简单的 shell &#xff0c;如有疑问&#xff0c;可以参考下述博客&#xff1a;Linux - 进程程序替换 - C/C 如何实现与各个语言之间的相互调用 - 替换…

flutter实践:慎用Expanded

问题&#xff1a;在一个Android原生的弹框里显示flutter view,由于使用了Expanded导致组件未显示出来 最神奇的地方在于debug调试模式显示正常&#xff0c;然后用release版本发布时怎么都显示不出来&#xff0c;还导致点击后无响应ANR 问题代码&#xff1a; child: Stateful…

【服务器学习】 iomanager IO协程调度模块

iomanager IO协程调度模块 以下是从sylar服务器中学的&#xff0c;对其的复习&#xff1b; 参考资料 继承自协程调度器&#xff0c;封装了epoll&#xff0c;支持为socket fd注册读写事件回调函数 IO协程调度还解决了调度器在idle状态下忙等待导致CPU占用率高的问题。IO协程调…

后台管理系统解决方案-中大型-Vben Admin

后台管理系统解决方案-中大型-Vben Admin 官网 Vben Admin 在线演示 Vben Admin 为什么选择它 github现有20K星&#xff0c;并且它有个可视化生成表单&#xff0c;我很喜欢 快速开始 # 拉取代码 git clone https://github.com/vbenjs/vue-vben-admin-doc# 安装依赖 yarn#…