centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka

c94060866227a5cb0c61925fd714c285.png

前言

之前文章 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用到了 Flink 自带的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一种情况,那么如果我们有多个地方需要这份通过 Flink 转换后的数据,是不是又要我们继续写个 sink 的插件呢?确实,所以 Flink 里面就默认支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么这篇文章我们就讲讲如何将数据写入到 Kafka。

08dde176b41ce7f88193fd3cea4bcd98.png

准备

添加依赖

Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有时间可以分析下源码的实现。

71709041651c345e7154f58a55db3ec8.png

这里我们需要安装下 Kafka,请对应添加对应的 Flink Kafka connector 依赖的版本,这里我们使用的是 0.11 版本:

<dependency>

Kafka 安装

这里就不写这块内容了,可以参考我以前的文章 Kafka 安装及快速入门。

这里我们演示把其他 Kafka 集群中 topic 数据原样写入到自己本地起的 Kafka 中去。

配置文件

kafka.brokers=xxx:9092,xxx:9092,xxx:9092
kafka.group.id=metrics-group-test
kafka.zookeeper.connect=xxx:2181
metrics.topic=xxx
stream.parallelism=5
kafka.sink.brokers=localhost:9092
kafka.sink.topic=metric-test
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
stream.sink.parallelism=5

目前我们先看下本地 Kafka 是否有这个 metric-test topic 呢?需要执行下这个命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

087a6f41b8b3d6ef2015736d2cc39290.png

可以看到本地的 Kafka 是没有任何 topic 的,如果等下我们的程序运行起来后,再次执行这个命令出现 metric-test topic,那么证明我的程序确实起作用了,已经将其他集群的 Kafka 数据写入到本地 Kafka 了。

程序代码

Main.java

public 

运行结果

启动程序,查看运行结果,不段执行上面命令,查看是否有新的 topic 出来:

763326b7b0d13d98c80655cfa1de6093.png

执行命令可以查看该 topic 的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

724c4a3a6d4750c1c45839fcfe498157.png

分析

上面代码我们使用 Flink Kafka Producer 只传了三个参数:brokerList、topicId、serializationSchema(序列化)

a6cbc50413ed49fd666dc4daa6bbefed.png

其实也可以传入多个参数进去,现在有的参数用的是默认参数,因为这个内容比较多,后面可以抽出一篇文章单独来讲。

总结

本篇文章写了 Flink 读取其他 Kafka 集群的数据,然后写入到本地的 Kafka 上。我在 Flink 这层没做什么数据转换,只是原样的将数据转发了下,如果你们有什么其他的需求,是可以在 Flink 这层将数据进行各种转换操作,比如这篇文章中的一些转换:《从0到1学习Flink》—— Flink Data transformation(转换),然后将转换后的数据发到 Kafka 上去。

本文原创地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未经允许禁止转载。

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

6276b6bf14f626de1ab6611a8e8c733b.png

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt中的私有信号

一、什么是Qt私有信号&#xff1f; 直接引用Qt文档中的描述&#xff1a; 二、私有信号的作用 私有信号只能被响应&#xff0c;不能被用户代码来发射&#xff08;emit&#xff09;。这是一种对某些信号的权限控制&#xff0c;也就是用户代码没有权力“发号施令”&#xff0c;只…

opencv获取图片像素坐标_利用OpenCV从图片中提取矩形并标注坐标(室内平面地图)(一)

​某城市会展中心室内地图背景一名室内设计师的日常工作从设计一张会展地图开始。常常有这样的场景&#xff1a;划分除规范的展位后&#xff0c;进入销售阶段&#xff0c;频繁的需要修改这张地图&#xff0c;如展示拆分、合并、换位置、标记已交易。问题从上图中标记色块的是有…

【转】C#中ToString()格式详解

以下内容均摘自博客园&#xff0c;仅供资料查询。 ToString格式化 在很多对象显示为字符串的时候都会使用到ToString中的格式化&#xff0c;由于以前没怎么注意到这个问题&#xff0c;想总结一下各个基础结构对象的格式化&#xff0c;以便后备之用&#xff01;&#xff01;&am…

【编译原理】入门总结

教程资源 入门教程在&#xff1a;手把手教你做一个 C 语言编译器 学习过程 19年尝试学了一下&#xff0c;中途看不懂放弃了。20年底从头再看一遍&#xff0c;经过一年的知识积累&#xff0c;在仔细研读之下&#xff0c;终于算是学懂了。此文中记录了我在最初学习时遇到的问题…

Qt添加翻译文件

以在Qt Creator中开发为例&#xff1a; 第一步 在.pro文件中添加一行 TRANSLATIONS projectName_zh.ts 保存&#xff0c;执行一次qmake。 注&#xff1a;ts文件是xml文件保存了需要翻译的信息。 第二步 选择菜单&#xff1a;工具->外部->Qt预言家->更新翻译。 可…

应用程序标准输入输出、Shell、程序界面的关系

发展史 计算机在发展初期&#xff0c;电脑上的软件是没有窗口的&#xff0c;只有一个命令行&#xff0c;这个软件叫Shell&#xff0c;中文的意思是外壳。Shell是一个统一的叫法&#xff0c;实际在不同的系统中&#xff0c;又有很多种Shell软件&#xff0c;如下表所示&#xff…

安卓简单天气预报app源码_七个个小众但实用的APP,效率翻倍~

推荐7个小众但实用的APP1、PDF处理助手下面就是软件的启动图&#xff0c;没有任何广告。并且直接标明了这个软件的三大特点&#xff1a;简单、免费、快捷下面就是软件的启动图&#xff0c;没有任何广告。而且免注册登录即可使用&#xff0c;简直是一款良心软件了。2、菜鸟教程菜…

【转】Jenkins详细教程

最近花了一段时间研究jenkins这个工具。所以写下这篇文章&#xff0c;算是当做记录吧&#xff01; 一、jenkins是什么&#xff1f; Jenkins是一个开源的、提供友好操作界面的持续集成(CI)工具&#xff0c;起源于Hudson&#xff08;Hudson是商用的&#xff09;&#xff0c;主要…

拼接符 防注入正则校验_Apache Kylin 命令注入漏洞调试分析(CVE-2020-1956)

1、前言Apache Kylin是一个开源的、分布式的分析型数据仓库&#xff0c;提供Hadoop/Spark 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据。近日&#xff0c;百度云安全团队监测到Apache官方发出了一个漏洞通告&#xff0c;披露了Apache kylin多版本存在命令注入漏…

can使能上拉 gpio_IMX6ULL 的 GPIO 操作方法

来源&#xff1a;百问网作者&#xff1a;韦东山本文字数&#xff1a;1652&#xff0c;阅读时长&#xff1a;4分钟CCM: Clock Controller Module (时钟控制模块) IOMUXC : IOMUX Controller&#xff0c;IO 复用控制器 GPIO: General-purpose input/output&#xff0c;通用的输入…

【转】.NET 的 WebSocket 开发包比较

转载于http://www.oschina.net/translate/websocket-libraries-comparison-2 编者按 本文出现在第三方产品评论部分中。在这一部分的文章只提供给会员&#xff0c;不允许工具供应商用来以任何方式和形式来促销或宣传产品。请会员报告任何垃圾信息或广告。 Web项目常常需要将数…

vb checkbox选中和不选中_UE4 4.23 RetainerBox 选中框位置不正确

Bug表现在UE4 4.23版本&#xff0c;实现自定义控件时&#xff0c;继承了RetainerBox&#xff0c;发现当内部包裹子控件时&#xff0c;先编辑器面板选中子物体&#xff0c;发现选中位置有偏移。位置偏移当窗口缩放时&#xff0c;选中框位置发现当窗口最小化时&#xff0c;偏移位…

去哪查阅ISO国际标准?

ISO官网 访问ISO官网&#xff0c;不需要翻墙&#xff0c;只要懂点英文就行。 ISO官网地址&#xff1a;https://www.iso.org/&#xff08;可进行全面了解、购买付费内容&#xff09; ISO部分公开标准下载地址&#xff1a;https://standards.iso.org/ittf/PubliclyAvailableSt…

【转】开源的C# websocket-sharp组件解析

下面我们介绍一款WebSocket组件websocket-sharp的相关内容。 一.websocket-sharp组件概述 websocket-sharp是一个C#实现websocket协议客户端和服务端&#xff0c;websocket-sharp支持RFC 6455&#xff1b;WebSocket客户端和服务器&#xff1b;消息压缩扩展&#xff1b;安全连接…

java异或_JAVA面试必备之HashMap必会点

今天我们就面试会问到关于HashMap的问题进行一个汇总&#xff0c;以及对这些问题进行解答。1、HashMap的数据结构是什么&#xff1f;2、为啥是线程不安全的&#xff1f;3、Hash算法是怎样实现的&#xff1f;4、HashMap是如何处理Hash碰撞的&#xff1f;5、增加元素的方法是怎么…

对分查找的最多次数_「剑指offer题解」数组中出现次数超过一半的数字

关注我——个人公众号&#xff1a;后端技术漫谈我目前是一名后端开发工程师。主要关注后端开发&#xff0c;数据安全&#xff0c;网络爬虫&#xff0c;物联网&#xff0c;边缘计算等方向。原创博客主要内容Java知识点复习全手册Leetcode算法题解析剑指offer算法题解析SpringClo…

rabbitmq导出队列_消息队列BCMQ在大云运维管理平台BCDeepWatch中的应用

友情提示&#xff1a;全文约2600字&#xff0c;预计阅读时间12分钟摘要消息队列作为重要的中间件&#xff0c;广泛用于分布式系统中各子系统间的异步解耦&#xff1b;本文主要介绍了大云消息队列中间件BC-MQ在BC-DeepWatch中的应用案例。一、消息队列应用场景简介消息队列是分布…

【编译原理】如何编写BNF?

此篇文章承接上一篇&#xff1a;【编译原理】理解BNF 前言 理解了BNF&#xff0c;就能实现代码解析了吗&#xff1f;还有点早&#xff0c;因为理解了BNF&#xff0c;还要会写BNF。实际上&#xff0c;BNF实现有固定的模式&#xff0c;也有现成的工具&#xff0c;比如可以使用ya…

python 当前时间减一个月_python排序了解一下

排序是每个开发人员都需要掌握的技能。排序是对程序本身有一个全面的理解。不同的排序算法很好地展示了算法设计上如何强烈的影响程序的复杂度、运行速度和效率。今天的文章和谈谈大家都熟悉的各种排序使用 Python 如何实现&#xff0c;废话就不多说啦&#xff0c;开干&#xf…

【转】4.1触碰jQuery:AJAX异步详解

传送门&#xff1a;异步编程系列目录…… 示例源码&#xff1a;触碰jQuery&#xff1a;AJAX异步详解.rar AJAX 全称 Asynchronous JavaScript and XML&#xff08;异步的 JavaScript 和 XML&#xff09;。它并非一种新的技术&#xff0c;而是以下几种原有技术的结合体。 1) 使…