centos7安装flink集群_《从0到1学习Flink》—— Flink 写入数据到 Kafka

c94060866227a5cb0c61925fd714c285.png

前言

之前文章 《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch 写了如何将 Kafka 中的数据存储到 ElasticSearch 中,里面其实就已经用到了 Flink 自带的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一种情况,那么如果我们有多个地方需要这份通过 Flink 转换后的数据,是不是又要我们继续写个 sink 的插件呢?确实,所以 Flink 里面就默认支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么这篇文章我们就讲讲如何将数据写入到 Kafka。

08dde176b41ce7f88193fd3cea4bcd98.png

准备

添加依赖

Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有时间可以分析下源码的实现。

71709041651c345e7154f58a55db3ec8.png

这里我们需要安装下 Kafka,请对应添加对应的 Flink Kafka connector 依赖的版本,这里我们使用的是 0.11 版本:

<dependency>

Kafka 安装

这里就不写这块内容了,可以参考我以前的文章 Kafka 安装及快速入门。

这里我们演示把其他 Kafka 集群中 topic 数据原样写入到自己本地起的 Kafka 中去。

配置文件

kafka.brokers=xxx:9092,xxx:9092,xxx:9092
kafka.group.id=metrics-group-test
kafka.zookeeper.connect=xxx:2181
metrics.topic=xxx
stream.parallelism=5
kafka.sink.brokers=localhost:9092
kafka.sink.topic=metric-test
stream.checkpoint.interval=1000
stream.checkpoint.enable=false
stream.sink.parallelism=5

目前我们先看下本地 Kafka 是否有这个 metric-test topic 呢?需要执行下这个命令:

bin/kafka-topics.sh --list --zookeeper localhost:2181

087a6f41b8b3d6ef2015736d2cc39290.png

可以看到本地的 Kafka 是没有任何 topic 的,如果等下我们的程序运行起来后,再次执行这个命令出现 metric-test topic,那么证明我的程序确实起作用了,已经将其他集群的 Kafka 数据写入到本地 Kafka 了。

程序代码

Main.java

public 

运行结果

启动程序,查看运行结果,不段执行上面命令,查看是否有新的 topic 出来:

763326b7b0d13d98c80655cfa1de6093.png

执行命令可以查看该 topic 的信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test

724c4a3a6d4750c1c45839fcfe498157.png

分析

上面代码我们使用 Flink Kafka Producer 只传了三个参数:brokerList、topicId、serializationSchema(序列化)

a6cbc50413ed49fd666dc4daa6bbefed.png

其实也可以传入多个参数进去,现在有的参数用的是默认参数,因为这个内容比较多,后面可以抽出一篇文章单独来讲。

总结

本篇文章写了 Flink 读取其他 Kafka 集群的数据,然后写入到本地的 Kafka 上。我在 Flink 这层没做什么数据转换,只是原样的将数据转发了下,如果你们有什么其他的需求,是可以在 Flink 这层将数据进行各种转换操作,比如这篇文章中的一些转换:《从0到1学习Flink》—— Flink Data transformation(转换),然后将转换后的数据发到 Kafka 上去。

本文原创地址是: http://www.54tianzhisheng.cn/2019/01/06/Flink-Kafka-sink/ , 未经允许禁止转载。

关注我

微信公众号:zhisheng

另外我自己整理了些 Flink 的学习资料,目前已经全部放到微信公众号了。你可以加我的微信:zhisheng_tian,然后回复关键字:Flink 即可无条件获取到。

6276b6bf14f626de1ab6611a8e8c733b.png

Github 代码仓库

https://github.com/zhisheng17/flink-learning/

以后这个项目的所有代码都将放在这个仓库里,包含了自己学习 flink 的一些 demo 和博客

相关文章

1、《从0到1学习Flink》—— Apache Flink 介绍

2、《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

3、《从0到1学习Flink》—— Flink 配置文件详解

4、《从0到1学习Flink》—— Data Source 介绍

5、《从0到1学习Flink》—— 如何自定义 Data Source ?

6、《从0到1学习Flink》—— Data Sink 介绍

7、《从0到1学习Flink》—— 如何自定义 Data Sink ?

8、《从0到1学习Flink》—— Flink Data transformation(转换)

9、《从0到1学习Flink》—— 介绍Flink中的Stream Windows

10、《从0到1学习Flink》—— Flink 中的几种 Time 详解

11、《从0到1学习Flink》—— Flink 写入数据到 ElasticSearch

12、《从0到1学习Flink》—— Flink 项目如何运行?

13、《从0到1学习Flink》—— Flink 写入数据到 Kafka

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/437189.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【转】C#字节数组_字符串相互转换

https://www.cnblogs.com/Asa-Zhu/archive/2012/11/08/2761137.html 如果还想从 System.String 类中找到方法进行字符串和字节数组之间的转换&#xff0c;恐怕你会失望了。为了进行这样的转换&#xff0c;我们不得不借助另一个类&#xff1a;System.Text.Encoding。该类提供了…

Qt中的私有信号

一、什么是Qt私有信号&#xff1f; 直接引用Qt文档中的描述&#xff1a; 二、私有信号的作用 私有信号只能被响应&#xff0c;不能被用户代码来发射&#xff08;emit&#xff09;。这是一种对某些信号的权限控制&#xff0c;也就是用户代码没有权力“发号施令”&#xff0c;只…

opencv获取图片像素坐标_利用OpenCV从图片中提取矩形并标注坐标(室内平面地图)(一)

​某城市会展中心室内地图背景一名室内设计师的日常工作从设计一张会展地图开始。常常有这样的场景&#xff1a;划分除规范的展位后&#xff0c;进入销售阶段&#xff0c;频繁的需要修改这张地图&#xff0c;如展示拆分、合并、换位置、标记已交易。问题从上图中标记色块的是有…

【转】C#中ToString()格式详解

以下内容均摘自博客园&#xff0c;仅供资料查询。 ToString格式化 在很多对象显示为字符串的时候都会使用到ToString中的格式化&#xff0c;由于以前没怎么注意到这个问题&#xff0c;想总结一下各个基础结构对象的格式化&#xff0c;以便后备之用&#xff01;&#xff01;&am…

【编译原理】入门总结

教程资源 入门教程在&#xff1a;手把手教你做一个 C 语言编译器 学习过程 19年尝试学了一下&#xff0c;中途看不懂放弃了。20年底从头再看一遍&#xff0c;经过一年的知识积累&#xff0c;在仔细研读之下&#xff0c;终于算是学懂了。此文中记录了我在最初学习时遇到的问题…

python数据可视化从入门到实战_《Python编程从入门到实践》json数据可视化练习详解...

《Python编程从入门到实践》16.2中&#xff0c;计算收盘价均值的程序有些不易看懂&#xff0c;结合我自己的理解进行一些说明。使用的数据集&#xff1a;join格式的数据&#xff0c;数据集是由多个字典为元素组成的列表。每个字典包含如下信息[{"date": "2017-0…

【转】C# WebAPI中为自定义模型设置JSonConverter

我的WebAPI应用程序中有一个模型,用.NET 4.0编写,含有一个System.Net.Mime.ContentType类型的属性,如下所示&#xff1a; [Serializable] public class FileData {private ContentType contentType;private long size;private string name;public ContentType ContentType{get …

Qt添加翻译文件

以在Qt Creator中开发为例&#xff1a; 第一步 在.pro文件中添加一行 TRANSLATIONS projectName_zh.ts 保存&#xff0c;执行一次qmake。 注&#xff1a;ts文件是xml文件保存了需要翻译的信息。 第二步 选择菜单&#xff1a;工具->外部->Qt预言家->更新翻译。 可…

python爬虫淘宝手机_【Python3 爬虫】14_爬取淘宝上的手机图片

现在我们想要使用爬虫爬取淘宝上的手机图片&#xff0c;那么该如何爬取呢&#xff1f;该做些什么准备工作呢&#xff1f;首先&#xff0c;我们需要分析网页&#xff0c;先看看网页有哪些规律我们可以看到左侧是主题市场&#xff0c;将鼠标移动到【女装/男装/内衣】这一栏目&…

【转】WebSocket初探

定义&#xff1a; 遵循RFC6544协议的通信协议。Webcoket协议和http协议属于并行关系&#xff0c;但是websocket是以http协议为基础开发出来的&#xff08;微软用IhttpHandler接口中同时处理这两种协议&#xff09;&#xff0c;同时他们都是以TCP协议为基础。可以进行双向通信、…

应用程序标准输入输出、Shell、程序界面的关系

发展史 计算机在发展初期&#xff0c;电脑上的软件是没有窗口的&#xff0c;只有一个命令行&#xff0c;这个软件叫Shell&#xff0c;中文的意思是外壳。Shell是一个统一的叫法&#xff0c;实际在不同的系统中&#xff0c;又有很多种Shell软件&#xff0c;如下表所示&#xff…

安卓简单天气预报app源码_七个个小众但实用的APP,效率翻倍~

推荐7个小众但实用的APP1、PDF处理助手下面就是软件的启动图&#xff0c;没有任何广告。并且直接标明了这个软件的三大特点&#xff1a;简单、免费、快捷下面就是软件的启动图&#xff0c;没有任何广告。而且免注册登录即可使用&#xff0c;简直是一款良心软件了。2、菜鸟教程菜…

【转】Jenkins详细教程

最近花了一段时间研究jenkins这个工具。所以写下这篇文章&#xff0c;算是当做记录吧&#xff01; 一、jenkins是什么&#xff1f; Jenkins是一个开源的、提供友好操作界面的持续集成(CI)工具&#xff0c;起源于Hudson&#xff08;Hudson是商用的&#xff09;&#xff0c;主要…

【Github】怎么在README.md中添加图片?

原理是将图片作为文档上传&#xff0c;在README.md中引用即可。 参考博客&#xff1a;【GitHub】给GitHub上的ReadMe.md文件中添加图片怎么做 、 gitHub创建文件夹

拼接符 防注入正则校验_Apache Kylin 命令注入漏洞调试分析(CVE-2020-1956)

1、前言Apache Kylin是一个开源的、分布式的分析型数据仓库&#xff0c;提供Hadoop/Spark 之上的 SQL 查询接口及多维分析(OLAP)能力以支持超大规模数据。近日&#xff0c;百度云安全团队监测到Apache官方发出了一个漏洞通告&#xff0c;披露了Apache kylin多版本存在命令注入漏…

【转】一个ASP.NET MVC中ajax调用WebApi返回500 Internal Server Error的调错方法。

ASP.NET MVC 引入的WebApi自然且较好地满足了ajax的交互需求&#xff0c;但使用jQuery ajax调用WebApi返回500 Internal Server Error时却不太好查找错误。在一个实际项目中&#xff0c;WebApi方法全部使用了try-catch捕获异常&#xff0c;并返回定制的错误消息&#xff0c;想当…

【Github】开源项目xterm.js

项目github地址&#xff1a; xterm.js on github 学习笔记 从项目介绍中进行知识拓展 名称资料npmnpm 是干什么的&#xff1f;&#xff08;非教程&#xff09;semversemver&#xff1a;语义化版本规范在 Node.js 中的实现Hyper软件官网 >Theia官网 >Electron官网 >…

can使能上拉 gpio_IMX6ULL 的 GPIO 操作方法

来源&#xff1a;百问网作者&#xff1a;韦东山本文字数&#xff1a;1652&#xff0c;阅读时长&#xff1a;4分钟CCM: Clock Controller Module (时钟控制模块) IOMUXC : IOMUX Controller&#xff0c;IO 复用控制器 GPIO: General-purpose input/output&#xff0c;通用的输入…

【转】.NET 的 WebSocket 开发包比较

转载于http://www.oschina.net/translate/websocket-libraries-comparison-2 编者按 本文出现在第三方产品评论部分中。在这一部分的文章只提供给会员&#xff0c;不允许工具供应商用来以任何方式和形式来促销或宣传产品。请会员报告任何垃圾信息或广告。 Web项目常常需要将数…

【编译原理】为什么编程语言中,标识符不能以数字开头?

标识符不能以数字为开头, 是为了简化词法解析器设计和实现&#xff0c;规避词法解析中以数字开头的变量与数字解析冲突的问题。 如果两种类型的词&#xff0c;如果起始符号不同&#xff0c;那么可以很容易把二者区分开&#xff1b;如果起始符号相同&#xff0c;那么以下符号&a…