flink入门

1.安装flink,启动flink

文档地址:Apache Flink 1.3-SNAPSHOT 中文文档: Apache Flink 中文文档

代码:GitHub - apache/flink: Apache Flink

2. 打开端口  端口号, 启动jar

### 切换到flink 目录bin下
[root@localhost ~]# cd /home/flink/flink-1.14.4/bin/
### 运行
[root@localhost bin]# ./start-cluster.sh###开启端口9000
nc -l  9000
#### 运行jar./bin/flink  run  /home/flink/flink-1.14.4/examples/streaming/SocketWindowWordCount.jar --port 9000

3.测试jar,输入字符

注:1. 部署启动遇到的jar缺失

注释:jar可以下载源码查看,方法如图所示,也可以根据错误信息搜索对应的包

附: mysql+mq+mybatis +spring 需要的包

2,.代码

package com.javaland.flink.mq;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;public class Mq2Flink {/*** 实时监控mq数据,插入到mysql数据库*/public static void mq2mysql() throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.noRestart());RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}}
package com.javaland.flink.mq;import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.javaland.flink.mapper.MessageMapper;
import com.javaland.flink.po.MessagePO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import java.util.List;public class SinkToMySQL extends RichSinkFunction<String> {static MybatisSqlSessionFactoryBean sqlSessionFactory;static SqlSessionFactory sessionFactory;static SqlSession sqlSession;static {sqlSessionFactory = new MybatisSqlSessionFactoryBean();// 配置多数据源PooledDataSource pooledDataSource = new PooledDataSource();pooledDataSource.setDriver("com.mysql.cj.jdbc.Driver");pooledDataSource.setUsername("root");pooledDataSource.setPassword("root");pooledDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai");sqlSessionFactory.setDataSource(pooledDataSource);try {sqlSessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));sqlSessionFactory.setTypeAliasesPackage("com.javaland.flink.po");sessionFactory = sqlSessionFactory.getObject();} catch (Exception e) {throw new RuntimeException(e);}}/*** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);sqlSession = sessionFactory.openSession();}/*** @throws Exception*/@Overridepublic void close() throws Exception {super.close();}/*** @param value* @param context*/@Overridepublic void invoke(String value, Context context) {MessageMapper messageMapper = sqlSession.getMapper(MessageMapper.class);MessagePO messagePO=new MessagePO();messagePO.setUsername(value);messageMapper.insert(messagePO);List<MessagePO> all = messageMapper.selectList(null);if(all!=null){for (int i = all.size() - 1; i >= 0; i--) {System.out.println("查询的message:"+all.get(i));}}}}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.javaland</groupId><artifactId>javaland</artifactId><version>0.0.1</version></parent><packaging>jar</packaging><groupId>org.javaland</groupId><artifactId>javaland-flink</artifactId><properties><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-force-shading</artifactId><version>14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.11.1</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.18</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><executions><execution><id>mq2flink</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Mq2Flink</classifier><archive><manifestEntries><program-class>com.javaland.flink.Mq2Flink</program-class></manifestEntries></archive></configuration></execution><execution><id>Flink2Mq</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Flink2Mq</classifier><archive><manifestEntries><program-class>com.javaland.flink.Flink2Mq</program-class></manifestEntries></archive></configuration></execution></executions></plugin></plugins></build></project>

3. 最后打包的好处就是可以部署多个job

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148701.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

参考文献格式

目录 期刊会议预印本&#xff08;如arxiv&#xff09; 期刊 找不到页码可以在文献中查看bibtex格式&#xff0c;其中有 外文期刊可在web of science中查找卷号、期号和所在页数&#xff1a; [1] ZHANG F, HU Z Q, FU Y K, et al. A New Identification Method for Surface …

【0到1学习Unity脚本编程】第一人称视角的角色控制器

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;【0…

自动驾驶学习笔记(九)——车辆控制

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo Beta宣讲和线下沙龙》免费报名—>传送门 文章目录 前言 控制器设计 比例积分微分控制 线性…

Kotlin 知识体系

Kotlin 知识体系 1、Kotlin 文档2、Kotlin 基础3、桌面应用程序4、Android 与 iOS 应用程序 1、Kotlin 文档 Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复…

『亚马逊云科技产品测评』活动征文|借助AWS EC2搭建服务器群组运维系统Zabbix+spug

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道。 本文基于以下软硬件工具&#xff1a; aws ec2 frp-0.52.3 zabbix 6…

LRU最近最少使用算法

LRU(LeastRecentlyUsed)“最近最少使用”算法&#xff1a; 1.当缓存空间已满耗用时&#xff0c;淘汰最近最少使用数据的缓存对象以释放更多的缓存空间(用于历史缓存对象的维护)。 2. 哈希表:快速查找缓存对象&#xff1b;双向链表:维护 历史数据所在的节点顺序。 步骤&#xff…

掌握深度学习利器——TensorFlow 2.x实战应用与进阶

掌握深度学习利器——TensorFlow 2.x实战应用与进阶 摘要&#xff1a;随着人工智能技术的飞速发展&#xff0c;深度学习已成为当下最热门的领域之一。作为深度学习领域的重要工具&#xff0c;TensorFlow 2.x 备受关注。本文将通过介绍TensorFlow 2.x的基本概念和特性&#xff…

在 Linux 上搭建 Java Web 项目环境(最简单的进行搭建)

要在 Linux 上安装的程序有 1.JDK (要想运行 java 程序 JDK 是必不可少的) 2.Tomcat &#xff08;HTTP 服务器&#xff0c;是管理 Web 项目的常用工具&#xff09; 3. mysql &#xff08;数据库&#xff09; 一.安装 JDK 博主使用的 Linux 发行版是 centos &#xff0c;cen…

母婴服务预约小程序的效果如何

二胎家庭增速明显&#xff0c;占比较大&#xff0c;成为市场各母婴品牌的目标&#xff0c;而随着行业发展及市场变化&#xff0c;线上互联网深入人们生活&#xff0c;各家母婴品牌开始向“数字化”靠拢。 目前母婴门店商家主要面临服务/产品线上曝光不足、宣传度不够或扩圈无门…

git rebase 和 git merge的区别?以及你对它们的理解?

文章目录 前言是什么分析区别后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;git操作相关 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#xff0c;感谢…

【论文解读】FFHQ-UV:用于3D面部重建的归一化面部UV纹理数据集

【论文解读】FFHQ-UV 论文地址&#xff1a;https://arxiv.org/pdf/2211.13874.pdf 0. 摘要 我们提出了一个大规模的面部UV纹理数据集&#xff0c;其中包含超过50,000张高质量的纹理UV贴图&#xff0c;这些贴图具有均匀的照明、中性的表情和清洁的面部区域&#xff0c;这些都是…

基于深度学习的恶意软件检测

恶意软件是指恶意软件犯罪者用来感染个人计算机或整个组织的网络的软件。 它利用目标系统漏洞&#xff0c;例如可以被劫持的合法软件&#xff08;例如浏览器或 Web 应用程序插件&#xff09;中的错误。 恶意软件渗透可能会造成灾难性的后果&#xff0c;包括数据被盗、勒索或网…

sqli-labs关卡18(基于http头部报错盲注)通关思路

文章目录 前言一、靶场通关需要了解的知识点1、什么是http请求头2、为什么http头部可以进行注入 二、靶场第十八关通关思路1、判断注入点2、爆数据库名3、爆数据库表4、爆数据库列5、爆数据库关键信息 总结 前言 此文章只用于学习和反思巩固sql注入知识&#xff0c;禁止用于做…

LV.12 D18 中断处理 学习笔记

一、ARM的异常处理机制及工程代码结构 1.1异常概念 处理器在正常执行程序的过程中可能会遇到一些不正常的事件发生 这时处理器就要将当前的程序暂停下来转而去处理这个异常的事件 异常事件处理完成之后再返回到被异常打断的点继续执行程序。 1.2异常处理机制 不同的处…

【Python】解析CPP类定义代码,获取UML类图信息

参考 & 鸣谢 CppHeaderParser - 官方文档Python解析C头文件win10直接获得文件绝对路径的方法总结 目的 解析CPP头文件中的类定义&#xff0c;获取UML中的属性。用于画UML类图。如下所示格式&#xff0c;图片来源-链接 即获取&#xff0c;类名&#xff0c;成员函数&#x…

H110主板搭配魔改QNCW升级小记

最近搬家完毕&#xff0c;翻出来一块闲置已久的qncw&#xff0c;隐约记得是买的主板套装&#xff0c;现在主板早已不知踪影&#xff0c;剩下孤零零一个CPU&#xff0c;一起翻出来一个G3900T亮机CPU&#xff0c;应该是同时代的产物。 qncw百度上一搜&#xff0c;发现参数还行&am…

【ES6标准入门】JavaScript中的模块Module语法的使用细节:export命令和imprt命令详细使用,超级详细!!!

&#x1f601; 作者简介&#xff1a;一名大四的学生&#xff0c;致力学习前端开发技术 ⭐️个人主页&#xff1a;夜宵饽饽的主页 ❔ 系列专栏&#xff1a;JavaScript进阶指南 &#x1f450;学习格言&#xff1a;成功不是终点&#xff0c;失败也并非末日&#xff0c;最重要的是继…

如何将vscode和Linux远程链接:

如何将vscode和Linux远程链接&#xff1a; Remote - SSH - 远程登录Linux 安装Remote - SSH 我们下载完后&#xff0c;就会出现这些图标 这里点一下号 查看一下我们的主机名&#xff0c;并复制 输入ssh 用户名主机名 这里是要将ssh这个文件要放在主机下的哪个路径下&#xff…

Android 10.0 系统修改usb连接电脑mtp和PTP的显示名称

1.前言 在10.0的产品定制化开发中,在usb模块otg连接电脑,调整为mtp文件传输模式的时候,这时可以在电脑看到手机的内部存储 显示在电脑的盘符中,会有一个mtp名称做盘符,所以为了统一这个名称,就需要修改这个名称,接下来分析下处理的 方法来解决这个问题 2.系统修改usb连…

gRPC 四模式之 双向流RPC模式

双向流RPC模式 在双向流 RPC 模式中&#xff0c;客户端以消息流的形式发送请求到服务器端&#xff0c;服务器端也以消息流的形式进行响应。调用必须由客户端发起&#xff0c;但在此之后&#xff0c;通信完全基于 gRPC 客户端和服务器端的应用程序逻辑。 为什么有了双向流模式…