kafka集成flink api编写教程

1.引入依赖(pox.xml)

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.6</version></dependency></dependencies>

2.创建日志配置文件

把$FLINK_HOME/conf/log4j.properties 内容复制粘贴过来

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

3.flink生产者api

package com.ljr.flink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList;
import java.util.Properties;public class MyFlinkKafkaProducer {//输入main tab 键 即创建入main 方法public static void main(String[] args) throws Exception {//1.获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置的槽数与分区相等env.setParallelism(3);//2.准备数据源ArrayList<String> wordlist = new ArrayList<>();wordlist.add("zhangsan");wordlist.add("lisi");DataStreamSource<String> stream = env.fromCollection(wordlist);//创建kafka生产者Properties pros = new Properties();pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer("customers", new SimpleStringSchema(), pros);//3.添加数据源stream.addSink(kafkaProducer);//4.执行代码env.execute();}
}

运行;kafka消费者消费结果

4.flink消费者api

package com.ljr.flink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;public class MyFlinkKafkaConsumer {public static void main(String[] args) throws Exception {//1 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//2 创建消费者Properties pros = new Properties();pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//pros.put(ConsumerConfig.GROUP_ID_CONFIG,"hh")FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);//3 关联消费者和flink流env.addSource(flinkKafkaConsumer).print();//4 执行env.execute();}
}

运行,用3中的生产者生产数据,消费结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25257.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++ | 拷贝赋值运算符函数】一文了解C++的 拷贝赋值运算符函数

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; ⏰发布时间⏰&#xff1a;2024-06-09 1…

深度网络及经典网络简介

深度网络及经典网络简介 导语加深网络一个更深的CNN提高识别精度Data Augmentation 层的加深 经典网络VGGGoogLeNetResNet 高速学习迁移学习GPU分布式学习计算位缩减 强化学习总结参考文献 导语 深度学习简单来说&#xff0c;就是加深了层数的神经网络&#xff0c;前面已经提到…

Java:110-SpringMVC的底层原理(上篇)

SpringMVC的底层原理 在前面我们学习了SpringMVC的使用&#xff08;67章博客开始&#xff09;&#xff0c;现在开始说明他的原理&#xff08;实际上更多的细节只存在67章博客中&#xff0c;这篇博客只是讲一点深度&#xff0c;重复的东西尽量少说明点&#xff09; MVC 体系结…

深入理解指针(三)

一、指针运算 1.1指针-整数 下面我们来看一个指针加整数的例子&#xff1a; #include<stdio.h> int main() { int arr[10] { 1,2,3,4,5,6,7,8,9,10 }; int* p &arr[0]; int i 0; int sz sizeof(arr) / sizeof(arr[0]); for (i 0; i < …

Netty原理与实战

1.为什么选择Netty&#xff1f; 高性能低延迟 事件分发器&#xff1a; reactor采用同步IO&#xff0c;Proactor采用异步IO 网络框架选型&#xff1a; 2.Netty整体架构设计&#xff08;4.X&#xff09; 三个模块&#xff1a;Core核心层、Protocal Support协议支持层、…

leetcode:不同的二叉树

class Solution { public:int numTrees(int n) {vector<int> dp(n1);dp[0] 1;dp[1] 1;for(int i 2;i < n;i){for(int j 1;j < i;j) // 当根节点为j时{dp[i] dp[j-1] * dp[i-j];}}return dp[n];} }; /* dp[i] i个不同的数组成的二叉搜索数的个数假设 i 5当根…

IDEA 连接GitHub仓库并上传项目(同时解决SSH问题)

目录 1 确认自己电脑上已经安装好Git 2 添加GitHub账号 2.1 Setting -> 搜索GitHub-> ‘’ -> Log In with Token 2.2 点击Generate 去GitHub生成Token 2.3 勾选SSH后其他不变直接生成token 2.4 然后复制token添加登录账号即可 3 点击导航栏中VCS -> Create…

Python Flask实现蓝图Blueprint配置和模块渲染

Python基础学习&#xff1a; Pyhton 语法基础Python 变量Python控制流Python 函数与类Python Exception处理Python 文件操作Python 日期与时间Python Socket的使用Python 模块Python 魔法方法与属性 Flask基础学习&#xff1a; Python中如何选择Web开发框架&#xff1f;Pyth…

(Proteus仿真设计)基于51单片机的电梯程序控制系统

&#xff08;Proteus仿真设计&#xff09;基于51单片机的电梯程序控制系统 一.项目介绍 本设计模拟的是一个五层的&#xff0c;各楼层间隔为4.5m的电梯程序控制系统&#xff0c;能够完成各楼层乘客的接送任务。形象地说&#xff0c;就是要对不同楼层乘客的不同需求&#xff0…

学习Canvas过程中2D的方法、注释及感悟一(通俗易懂)

1.了解Canvas&#xff1a; Canvas是前端一个很重要的知识点&#xff0c;<canvas>标签用于创建画布绘制图形&#xff0c;通过JavaScript进行操作。它为开发者提供一个动态绘制图形的区域&#xff0c;用于创建图标、游戏动画、图像处理等。 对于能够熟练使用Canvas的开发者…

星舰四飞成功!SpaceX 今年还要飞 4 次?星舰未来 10 年规划展望

SpaceX 的星舰&#xff08;Starship&#xff09;项目一直备受瞩目&#xff0c;最近的第四次试飞再次引发了全球关注。本文将详细回顾星舰第四次发射的成功经验&#xff0c;并探讨其未来的十年规划。 一、引言 星舰是 SpaceX 研制的下一代重型运载火箭系统&#xff0c;旨在实现…

苍穹外卖笔记-06-菜品管理-菜品分类,公共字段填充

菜品分类 1 菜品分类模块1.1 需求分析与设计1.1.1 产品原型1.1.2 接口设计1.1.3 表设计 1.3 代码实现1.4 测试分类分页查询启用禁用分类修改分类信息新增菜品分类删除菜品分类 2 公共字段自动填充2.1 问题分析2.2 实现思路自定义注解AutoFill自定义切面AutoFillAspectMapper接口…

LeetCode338比特位计数

题目描述 给你一个整数 n &#xff0c;对于 0 < i < n 中的每个 i &#xff0c;计算其二进制表示中 1 的个数 &#xff0c;返回一个长度为 n 1 的数组 ans 作为答案。 解析 动态规划&#xff0c;将当前的数的最后一位去掉&#xff0c;然后判断去掉的最后一位是0还是1。…

Qwen2来了

Qwen2整体介绍 Qwen2开源模型下载 Demo使用 Git 官方使用文档 变化 1、增大了上下文长度支持&#xff0c;Qwen2-72B-Instruct支持128K tokens&#xff0c;并且处理完美 2、代码和数学能力显著提升 3、多个评测基准上的领先表现 4、中英之外增加了27种语言相关的高质量…

CNCF项目全景图介绍

本文首发在个人博客上&#xff0c;欢迎来踩&#xff01; 云原生计算基金会&#xff08;CNCF&#xff09;介绍 CNCF(Cloud Native Computing Foundation)官网链接&#xff1a;https://www.cncf.io/ 官方的介绍如下&#xff1a; 云原生技术有利于各组织在公有云、私有云和混合…

Transformer论文精读

Transformer&#xff1a;Attention is all you need Abstract&#xff1a; 在主流的序列转录模型&#xff08;sequence transduction models&#xff1a;给一个序列&#xff0c;生成另一个序列&#xff09;&#xff0c;主要依赖循环或者卷积神经网络&#xff0c;一般是用enco…

Buildroot和Debian文件系统修改方法

本文档主要介绍在没有编译环境的情况下&#xff0c;如何修改buildroot和debian文件系统方法&#xff0c;如在buildroot文件系统中添加文件、修改目录等文件操作&#xff0c;在debian文件系统中&#xff0c;安装软件库、工具、扩大文件系统空间等等操作。 1.Debian文件系统 …

算法 | hbut期末复习笔记

贪心选择策略&#xff1a;所求问题的整体最优解可以通过一系列局部最优的选择&#xff08;贪心选择&#xff09;得到 最优子结构&#xff1a;问题的最优解包括了其子问题的最优解 回溯法&#xff1a;具有限界函数的深度优先搜索法 回溯法的解空间&#xff1a;子集树&排列…

全新抖音快手小红书视频解析去水印系统网站源码

这个系统支持几十种平台&#xff0c;包括抖音、快手小红书以及其他热门社交媒体平台。它可以帮助轻松地下载这些平台上的任何视频&#xff0c;并去除其中的水印&#xff0c;让你可以自由地保存和分享这些视频。 使用方法&#xff1a; 上传压缩包解压&#xff0c;网站信息在inc…

【JAVASE】面向对象编程综合案例--------模仿电影信息系统

需求&#xff1a; &#xff08;1&#xff09;展示系统中的全部电影&#xff08;每部电影展示&#xff1a;名称、价格&#xff09; &#xff08;2&#xff09;允许用户根据电影编号&#xff08;ID&#xff09;查询出某个电影的详细信息。 目标&#xff1a;使用所学的面向对象…