Flink-Sink_将结果输出到Kafka_Redis_ES_Mysql中

Sink

将计算好结果输出到外部系统, 调用 addSink()传入指定的SinkFunction()

  1. 将结果输出到 Kafka 中
  2. 将结果输出到 Redis 中
  3. 将结果输出到 ES 中
  4. 将结果输出到 Mysql 中: 事先创建好表结构

pom.xml 事先导入对应的 connector:

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!--依赖的一些组件需要 Scala 环境--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency><!--kafka依赖--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka-0.11_2.12</artifactId><version>1.10.1</version></dependency><!--rabbit依赖--><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-rabbitmq --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.12</artifactId><version>1.10.1</version></dependency><!--flink 数据存入 redis--><!-- https://mvnrepository.com/artifact/org.apache.bahir/flink-connector-redis --><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch-base --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.1</version></dependency><!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.44</version></dependency></dependencies>

实操代码如下:

import com.regotto.entity.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.HashMap;/*** @author regotto*/
public class SinkTest {private static void saveToRedis(DataStream<SensorReading> dataStream) {FlinkJedisPoolConfig.Builder builder = new FlinkJedisPoolConfig.Builder();builder.setHost("localhost");// 顶级接口 SinkFunction, 核心方法 invokedataStream.addSink(new RedisSink<>(builder.build(), new RedisMapper<SensorReading>() {/*** 将温度数据保存为 id-temperature hash 形式到 redis* @return*/@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.HSET, "sensor");}@Overridepublic String getKeyFromData(SensorReading sensorReading) {return sensorReading.getId();}@Overridepublic String getValueFromData(SensorReading sensorReading) {return sensorReading.getTemperature().toString();}}));}private static void saveToKafka(DataStream<SensorReading> dataStream) {// 将数据输出到 Kafka 中dataStream.map((MapFunction<SensorReading, String>) value -> value.toString()).addSink(new FlinkKafkaProducer011<String>("localhost:9092", "test", new SimpleStringSchema()));}private static void saveToEs(DataStream<SensorReading> dataStream) {// 将数据输出到 ElasticSearchArrayList<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(new HttpHost("localhost", 9200));//真正的 SinkFunction 是 ElasticsearchSink(使用构建者构建), ElasticsearchSinkFunction 只是负责处理以哪种方式存入dataStream.addSink(new ElasticsearchSink.Builder<>(httpHosts, (ElasticsearchSinkFunction<SensorReading>) (sensorReading, runtimeContext, requestIndexer) -> {HashMap<String, String> source = new HashMap<>();source.put("id", sensorReading.getId());source.put("temp", sensorReading.getTemperature().toString());source.put("time", sensorReading.getTimestamp().toString());IndexRequest indexRequest = Requests.indexRequest().index("sensor").type("readingData").source(source);requestIndexer.add(indexRequest);}).build());}private static void saveToMysql(DataStream<SensorReading> dataStream) {/*由于性能问题, 官方未提供 mysqlSink, 将数据存入 mysql, 自定义 sinkjdbc 要连接处理, 使用 RichSinkFunction, 利用 open, close 方法*/dataStream.addSink(new RichSinkFunction<SensorReading>() {Connection connection = null;PreparedStatement insertStatement = null;@Overridepublic void open(Configuration parameters) throws Exception {Class.forName("com.mysql.jdbc.Driver");connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");insertStatement = connection.prepareStatement("insert into sensorreading (id, timestamp, temperature)values(?,?,?)");}@Overridepublic void invoke(SensorReading value, Context context) throws Exception {insertStatement.setString(1, value.getId());insertStatement.setLong(2, value.getTimestamp());insertStatement.setDouble(3, value.getTemperature());insertStatement.execute();}@Overridepublic void close() throws Exception {insertStatement.close();connection.close();}});}public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<String> input = env.readTextFile("sensor.txt");DataStream<SensorReading> dataStream = input.map((MapFunction<String, SensorReading>) value -> {String[] split = value.split(",");return new SensorReading(split[0], Long.valueOf(split[1]), Double.valueOf(split[2]));});saveToMysql(dataStream);env.execute();}
}

总结

进行数据存储时, 指定对应 SinkFunction 即可.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/468520.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ado 字符串变量

这次变量主要针对 Mfc 的 Cstring 类型的变量&#xff08;前面VC 链接Access 数据库 插入变量到表&#xff09; 思路; 1 把cstring 类型 转为 string 2 string 转 char 数组 3 sprintf 写入数组 string 转 char 数组函数[cpp]view plaincopyprint?char* zhuanhuan(std::strin…

周立功先生和他的AWorks团队招聘

我之前写的一篇文章&#xff0c;介绍了周立功先生&#xff0c;我记得那篇文章的阅读量非常多&#xff0c;也让我迎来一段小高潮&#xff0c;随着时间的推移&#xff0c;慢慢的增加了我对周立功先生的了解&#xff0c;我们很多人&#xff0c;像我吧&#xff0c;工作的时候&#…

mongodb python 大于_Python中使用MongoDB详解

作者&#xff1a;Zarten知乎专栏&#xff1a;Python爬虫深入详解知乎ID&#xff1a; Zarten简介&#xff1a; 互联网一线工作者&#xff0c;尊重原创并欢迎评论留言指出不足之处&#xff0c;也希望多些关注和点赞是给作者最好的鼓励 &#xff01;介绍MongoDB是一种面向文档型的…

这不是商业互吹,是学习的宝藏

学习如逆水行舟&#xff0c;不进则退&#xff1b;只有坚持不断的学习,才能保持进步。今天给大家精心挑选的这几个优质的公众号&#xff0c;在行业深耕已久&#xff0c;相信大家一定会有所收获&#xff0c;感兴趣的可以关注一下。互联网架构师 号主985计算机硕士毕业&#xff…

【Ubuntu】ubuntu系统下python3和python2环境自由切换

shell里执行&#xff1a;sudo update-alternatives --install /usr/bin/python python /usr/local/lib/python2.7 100sudo update-alternatives --install /usr/bin/python python /usr/local/lib/python3.2 150此时你会发现如果要切换到Python2&#xff0c;执行&#xff1a;su…

打印机更换感光鼓单元k_干货,激光打印机常见故障维修方法总结

激光打印机是日常生活和办公中运用较多的打印机类型下面&#xff0c;我们来总结一下激光打印机常见故障维修方法。硒鼓组件常见故障&#xff0c;维修方法。激光打印机硒鼓的常见故障包括硒鼓漏粉&#xff0c;打印出黑横线&#xff0c;打印文件颜色不正常打印的图像及文字变形&a…

关于这些那些

关于篮球先说下&#xff0c;我刚才已经写完文章了&#xff0c;但是因为没有保存&#xff0c;浏览器想着周末早点回去休息就闪退了&#xff0c;把写好的文章给闪退没有了&#xff0c;这个真是拿起自己的坑砸死了自己&#xff0c;那种赶脚只有自己能够明白&#xff0c;真的是太难…

mysqldump 定时备份数据(全量)

MYSQL 数据库备份有很多种(cp、tar、lvm2、mysqldump、xtarbackup)等等&#xff0c;具体使用哪一个还要看你的数据规模。下面给出一个表 #摘自《学会用各种姿态备份Mysql数据库》 备份方法备份速度恢复速度便捷性功能一般用于cp快快一般、灵活性低很弱少量数据备份mysqldump慢慢…

第3章 Linux内核调试手段之内核打印

开始前面说的话在我写代码的生涯里&#xff0c;我看到过很多大神炫耀自己的调试手段&#xff0c;也看到很多大神写过非常厉害的代码&#xff0c;我认为&#xff0c;相比于写代码&#xff0c;调试更加重要&#xff0c;而那些能在写代码的时候就加入了自己的调试信息的&#xff0…

电源管理 解析_智能电源控制箱

智能电源控制箱?智能电源控制箱又被称之为&#xff1a;智能监控箱、智能设备箱、智能运维箱&#xff0c;智能电源控制箱的作用主要就是为视频监控打造良好的运行环境&#xff0c;保障视频监控系统稳定的运行。说到视频监控&#xff0c;大家都知道视频监控的故障率是比较高的&a…

centos7 开机后进去了命令行_Linux系统管理:开机启动流程(二)

CentOS71.BIOS(开机自检)2.MBR ( Master Boot Record 主引导记录)3.GRUB2 Bootloader&#xff08;引导菜单&#xff09;4.Kernel&#xff08;内核引导&#xff09;5.Systemd &#xff08;不再使用init&#xff0c;改成了systemd&#xff09;6.Runlevel-Target &#xff08;运行…

一点小思考

我记得12年的时候&#xff0c;我就申请了微信公众号&#xff0c;那时候我的号主是TCL&#xff0c;是公司的同事用我的微信号申请公司的主体号&#xff0c;那时候我也有一点想法自己做个公众号写点文章&#xff0c;但是一直没有下决心&#xff0c;后来离职了&#xff0c;原来用我…

DetachedCriteria和Criteria的使用方法

DetachedCriteria和Criteria的使用方法 /* * 下载统计 * return */ public String downloadStatistics(){ logger.info("正在执行目录内容统计下载"); String type getServletRequest().getParameter("type"); DetachedCriteria dc DetachedCrit…

第3章 Linux内核调试手段之二

gdb 和 addr2line 调试内核模块内核模块插入内核链表的时候&#xff0c;会调用 init 里面的程序&#xff0c;我们上面给的那个例程的程序因为是经过多年风吹雨打的&#xff0c;但是如果你是一个萌新的码农&#xff0c;你能保证自己写的内核模块没有问题吗&#xff1f;所以就需要…

儿童手表怎么删除联系人_华为儿童手表4X体验:与你一起守护孩子的成长,带娃不再辛苦...

带娃是一件很辛苦的事情&#xff0c;从身体到精神的辛苦&#xff0c;相信各位家长都懂。对于照看正在成长期的孩子&#xff0c;更是让很多家长亲身感受"成长的烦恼"。孩子活泼好动的天性让很多家长都不放心&#xff0c;同时大部分家长又没有能力随时在身边守护&#…

Jmeter分布式测试过程中遇到的问题及摘抄前辈问题汇总

遇到的常见问题&#xff1a; 1、在Controller端上控制某台机器Run&#xff0c;提示"Bad call to remote host"。 解决方法&#xff1a;检查被控制机器上的jmeter-server有没有启动&#xff0c;或者JMeter.properties中remote_hosts的配置错误。2、Agent机器启动Jmete…

介绍一个我创业的朋友

大家好&#xff0c;今天给大家介绍一位跟我一样正在创业路上的朋友&#xff0c;不知道大家对我之前的文章还有没有印象&#xff0c;最近我在做一件特别有意思的事情&#xff0c;这件有意思的事情一直催促着我起床上班&#xff0c;每天都充满能量和动力&#xff0c;又累又充实的…

微信小程序装修解决方案ppt_装修公司微信小程序都有哪些功能?

传统装修行业存在收费不透明、消费者装修服务过程体验差等问题&#xff0c;传统装修已无法满足消费者的实际需求&#xff0c;面临无客量、无签约的困境。然而&#xff0c;随着移动互联网的发展&#xff0c;许多装饰企业利用微信小程序来帮助其疏导和推广。一个装修公司小程序能…

JavaScript一个简易枚举类型实现扑克牌

<script type"text/javascript"> /*** 这个函数创建一个新的枚举类型&#xff0c;实参对象表示类的每个实例的名字和值* 返回值是一个构造函数&#xff0c;它标识这个新类* 注意&#xff0c;这个构造函数也会抛出异常&#xff0c;不能使用它来创建该类型的新实…

要用什么态度去面对生活?

最近&#xff0c;张扣扣的新闻铺天盖地&#xff0c;因为我非常喜欢逛知乎&#xff0c;刚好张扣扣的新闻这几天上了知乎热搜&#xff0c;所以我就关注上了&#xff0c;说实话&#xff0c;有点痛心&#xff0c;外人看起来很爽&#xff0c;忍辱负重好多年&#xff0c;终于把自己的…