kafka集成flink api编写教程

1.引入依赖(pox.xml)

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.13.6</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.13.6</version></dependency></dependencies>

2.创建日志配置文件

把$FLINK_HOME/conf/log4j.properties 内容复制粘贴过来

# This affects logging for both user code and Flink
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender# Uncomment this if you want to _only_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
logger.shaded_zookeeper.name = org.apache.flink.shaded.zookeeper3
logger.shaded_zookeeper.level = INFO# Log all infos in the given file
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.append = true
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.layout.type = PatternLayout
appender.main.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.policies.startup.type = OnStartupTriggeringPolicy
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = ${env:MAX_LOG_FILE_NUMBER:-10}# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF

3.flink生产者api

package com.ljr.flink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;import java.util.ArrayList;
import java.util.Properties;public class MyFlinkKafkaProducer {//输入main tab 键 即创建入main 方法public static void main(String[] args) throws Exception {//1.获取环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//设置的槽数与分区相等env.setParallelism(3);//2.准备数据源ArrayList<String> wordlist = new ArrayList<>();wordlist.add("zhangsan");wordlist.add("lisi");DataStreamSource<String> stream = env.fromCollection(wordlist);//创建kafka生产者Properties pros = new Properties();pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");FlinkKafkaProducer kafkaProducer = new FlinkKafkaProducer("customers", new SimpleStringSchema(), pros);//3.添加数据源stream.addSink(kafkaProducer);//4.执行代码env.execute();}
}

运行;kafka消费者消费结果

4.flink消费者api

package com.ljr.flink;import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;import java.util.Properties;public class MyFlinkKafkaConsumer {public static void main(String[] args) throws Exception {//1 初始化flink环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(3);//2 创建消费者Properties pros = new Properties();pros.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//pros.put(ConsumerConfig.GROUP_ID_CONFIG,"hh")FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("customers", new SimpleStringSchema(), pros);//3 关联消费者和flink流env.addSource(flinkKafkaConsumer).print();//4 执行env.execute();}
}

运行,用3中的生产者生产数据,消费结果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/25257.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++ | 拷贝赋值运算符函数】一文了解C++的 拷贝赋值运算符函数

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; ⏰发布时间⏰&#xff1a;2024-06-09 1…

对WEB标准以及W3C的理解与认识

Web标准简单来说可以分为结构&#xff0c;表现&#xff0c;行为&#xff1a; 结构&#xff08;HTML&#xff09;: HTML&#xff08;HyperText Markup Language&#xff09;定义了网页的结构和内容。它通过各种标签来组织信息&#xff0c;如标题、段落、图像、链接等。HTML 提供…

antd DatePicker 日期 与 时间 分开选择

自定义组件 import { DatePicker } from "antd"; import dayjs from "dayjs"; import { FC, useRef } from "react";/*** 日期 与 时间 分开选择** 版本号: * "antd": "^5.17.4",* "dayjs": "^1.11.11"…

树莓派debain 12更换apt-get源到阿里源

1、备份 总共需要备份两个文件 a、/etc/apt/sources.list.d/raspi.list b、/etc/apt/sources.list 2、删除上述两个文件内到所有内容&#xff0c;然后添加如下内容 /etc/apt/sources.list.d/raspi.list deb https://mirrors.aliyun.com/debian/ bookworm main non-free non…

给gRPC增加负载均衡功能

在现代的分布式系统中&#xff0c;负载均衡是确保服务高可用性和性能的关键技术之一。而gRPC作为一种高性能的RPC框架&#xff0c;自然也支持负载均衡功能。本文将探讨如何为gRPC服务增加负载均衡功能&#xff0c;从而提高系统的性能和可扩展性。 什么是负载均衡&#xff1f; …

域名的端口号范围

域名的端口号范围是从0到65535。这些端口可以大致分为两类&#xff1a; 知名端口&#xff08;Well-Known Ports&#xff09;&#xff1a;范围从0到1023。这些端口号一般固定分配给一些服务&#xff0c;如21端口分配给FTP服务&#xff0c;25端口分配给SMTP&#xff08;简单邮件…

新手如何学习编程!

选择编程语言&#xff1a;根据你的兴趣和目标选择一门编程语言。例如&#xff0c;Python 适合初学者和数据科学&#xff0c;JavaScript 适合网页开发&#xff0c;Java 和 C# 适合企业级应用。 理解基本概念&#xff1a;学习编程的基本概念&#xff0c;如变量、数据类型、控制结…

Ansible——stat模块

目录 参数总结 返回值 基础语法 常见的命令行示例 示例1&#xff1a;检查文件是否存在 示例2&#xff1a;获取文件详细信息 示例3&#xff1a;检查目录是否存在 示例4&#xff1a;获取文件的 MD5 校验和 示例5&#xff1a;获取文件的 MIME 类型 高级使用 示例6&…

[leetcode]longest-common-prefix 最长公共前缀

. - 力扣&#xff08;LeetCode&#xff09; 编写一个函数来查找字符串数组中的最长公共前缀。 如果不存在公共前缀&#xff0c;返回空字符串 ""。 示例 1&#xff1a; 输入&#xff1a;strs ["flower","flow","flight"] 输出&…

第52集《摄大乘论》

请大家打开《讲义》第一七二页&#xff0c;戊七、辨修圆满。 前一科我们讲到观照力。这观照力&#xff0c;六波罗蜜多里面的观照力&#xff0c;是观照我空、法空的真如理&#xff0c;使令内心能够得到安住&#xff1b;另外在六波罗蜜多以外&#xff0c;又开出四种波罗蜜多&…

03 Linux 内核数据结构

Linux kernel 有四种重要的数据结构:链表、队列、映射、二叉树。普通驱动开发者只需要掌握链表和队列即可。 链表和队列 Linux 内核都有完整的实现,我们不需要深究其实现原理,只需要会使用 API 接口即可。 1、链表 链表是 Linux 内核中最简单、最普通的数据结构。链表是一…

19082 中位特征值

【2022】贝壳找房秋招测试开发工程师笔试卷2 给你一棵以T为根&#xff0c;有n个节点的树。&#xff08;n为奇数&#xff09;每个点有一个价值V&#xff0c;并且每个点有一个特征值P。 每个点的特征值P为&#xff1a;以这个点为根的子树的所有点&#xff08;包括根&#xff09;…

C#面:应⽤程序池集成模式和经典模式的区别

C# 应用程序池是用于托管和执行应用程序的进程。在 IIS&#xff08;Internet Information Services&#xff09;中&#xff0c;C# 应用程序池有两种集成模式&#xff1a;集成模式和经典模式。 集成模式&#xff08;Integrated Mode&#xff09;&#xff1a; 集成模式是 IIS 7…

深度网络及经典网络简介

深度网络及经典网络简介 导语加深网络一个更深的CNN提高识别精度Data Augmentation 层的加深 经典网络VGGGoogLeNetResNet 高速学习迁移学习GPU分布式学习计算位缩减 强化学习总结参考文献 导语 深度学习简单来说&#xff0c;就是加深了层数的神经网络&#xff0c;前面已经提到…

Java:110-SpringMVC的底层原理(上篇)

SpringMVC的底层原理 在前面我们学习了SpringMVC的使用&#xff08;67章博客开始&#xff09;&#xff0c;现在开始说明他的原理&#xff08;实际上更多的细节只存在67章博客中&#xff0c;这篇博客只是讲一点深度&#xff0c;重复的东西尽量少说明点&#xff09; MVC 体系结…

深入理解指针(三)

一、指针运算 1.1指针-整数 下面我们来看一个指针加整数的例子&#xff1a; #include<stdio.h> int main() { int arr[10] { 1,2,3,4,5,6,7,8,9,10 }; int* p &arr[0]; int i 0; int sz sizeof(arr) / sizeof(arr[0]); for (i 0; i < …

Netty原理与实战

1.为什么选择Netty&#xff1f; 高性能低延迟 事件分发器&#xff1a; reactor采用同步IO&#xff0c;Proactor采用异步IO 网络框架选型&#xff1a; 2.Netty整体架构设计&#xff08;4.X&#xff09; 三个模块&#xff1a;Core核心层、Protocal Support协议支持层、…

leetcode:不同的二叉树

class Solution { public:int numTrees(int n) {vector<int> dp(n1);dp[0] 1;dp[1] 1;for(int i 2;i < n;i){for(int j 1;j < i;j) // 当根节点为j时{dp[i] dp[j-1] * dp[i-j];}}return dp[n];} }; /* dp[i] i个不同的数组成的二叉搜索数的个数假设 i 5当根…

IDEA 连接GitHub仓库并上传项目(同时解决SSH问题)

目录 1 确认自己电脑上已经安装好Git 2 添加GitHub账号 2.1 Setting -> 搜索GitHub-> ‘’ -> Log In with Token 2.2 点击Generate 去GitHub生成Token 2.3 勾选SSH后其他不变直接生成token 2.4 然后复制token添加登录账号即可 3 点击导航栏中VCS -> Create…

低压电工参考资料题

一. 单选题 1.电路一般都是由电源.负载.中间环节( C)基本部分组成的。 答:电路一般都是由电源.负载.开关.导线组成。 A二个 B