flink入门

1.安装flink,启动flink

文档地址:Apache Flink 1.3-SNAPSHOT 中文文档: Apache Flink 中文文档

代码:GitHub - apache/flink: Apache Flink

2. 打开端口  端口号, 启动jar

### 切换到flink 目录bin下
[root@localhost ~]# cd /home/flink/flink-1.14.4/bin/
### 运行
[root@localhost bin]# ./start-cluster.sh###开启端口9000
nc -l  9000
#### 运行jar./bin/flink  run  /home/flink/flink-1.14.4/examples/streaming/SocketWindowWordCount.jar --port 9000

3.测试jar,输入字符

注:1. 部署启动遇到的jar缺失

注释:jar可以下载源码查看,方法如图所示,也可以根据错误信息搜索对应的包

附: mysql+mq+mybatis +spring 需要的包

2,.代码

package com.javaland.flink.mq;import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;public class Mq2Flink {/*** 实时监控mq数据,插入到mysql数据库*/public static void mq2mysql() throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRestartStrategy(RestartStrategies.noRestart());RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder().setHost("localhost").setPort(5672).setUserName("test").setPassword("test").setVirtualHost("/").build();final DataStream<String> stream = env.addSource(new RMQSource<String>( connectionConfig, "task_queue", true, new SimpleStringSchema())).setParallelism(1);stream.addSink(new SinkToMySQL());stream.print();env.execute("mq数据插入到mysql");}}
package com.javaland.flink.mq;import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import com.javaland.flink.mapper.MessageMapper;
import com.javaland.flink.po.MessagePO;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.ibatis.datasource.pooled.PooledDataSource;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import java.util.List;public class SinkToMySQL extends RichSinkFunction<String> {static MybatisSqlSessionFactoryBean sqlSessionFactory;static SqlSessionFactory sessionFactory;static SqlSession sqlSession;static {sqlSessionFactory = new MybatisSqlSessionFactoryBean();// 配置多数据源PooledDataSource pooledDataSource = new PooledDataSource();pooledDataSource.setDriver("com.mysql.cj.jdbc.Driver");pooledDataSource.setUsername("root");pooledDataSource.setPassword("root");pooledDataSource.setUrl("jdbc:mysql://localhost:3306/test?serverTimezone=Asia/Shanghai");sqlSessionFactory.setDataSource(pooledDataSource);try {sqlSessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:mapper/*.xml"));sqlSessionFactory.setTypeAliasesPackage("com.javaland.flink.po");sessionFactory = sqlSessionFactory.getObject();} catch (Exception e) {throw new RuntimeException(e);}}/*** @param parameters* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);sqlSession = sessionFactory.openSession();}/*** @throws Exception*/@Overridepublic void close() throws Exception {super.close();}/*** @param value* @param context*/@Overridepublic void invoke(String value, Context context) {MessageMapper messageMapper = sqlSession.getMapper(MessageMapper.class);MessagePO messagePO=new MessagePO();messagePO.setUsername(value);messageMapper.insert(messagePO);List<MessagePO> all = messageMapper.selectList(null);if(all!=null){for (int i = all.size() - 1; i >= 0; i--) {System.out.println("查询的message:"+all.get(i));}}}}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.javaland</groupId><artifactId>javaland</artifactId><version>0.0.1</version></parent><packaging>jar</packaging><groupId>org.javaland</groupId><artifactId>javaland-flink</artifactId><properties><flink.version>1.14.4</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.11</artifactId><version>1.11.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-jdbc_2.11</artifactId><version>1.10.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-rabbitmq_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-force-shading</artifactId><version>14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.11</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.11.1</version></dependency><dependency><groupId>commons-logging</groupId><artifactId>commons-logging</artifactId><version>1.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.31</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.3.2</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.18</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-jar-plugin</artifactId><version>2.4</version><executions><execution><id>mq2flink</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Mq2Flink</classifier><archive><manifestEntries><program-class>com.javaland.flink.Mq2Flink</program-class></manifestEntries></archive></configuration></execution><execution><id>Flink2Mq</id><phase>package</phase><goals><goal>jar</goal></goals><configuration><classifier>Flink2Mq</classifier><archive><manifestEntries><program-class>com.javaland.flink.Flink2Mq</program-class></manifestEntries></archive></configuration></execution></executions></plugin></plugins></build></project>

3. 最后打包的好处就是可以部署多个job

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/148701.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

参考文献格式

目录 期刊会议预印本&#xff08;如arxiv&#xff09; 期刊 找不到页码可以在文献中查看bibtex格式&#xff0c;其中有 外文期刊可在web of science中查找卷号、期号和所在页数&#xff1a; [1] ZHANG F, HU Z Q, FU Y K, et al. A New Identification Method for Surface …

python 词云 wordcloud使用paddle模式 庆余年人物分析--不是特别准,可以看着玩一玩

看完之后你也可以生成自己的词云 提供一个过滤人名的英中词性分析对应&#xff0c;更多的可以去我的码云上看看 https://gitee.com/billion_lines_of_code/learn-wordcloud # 只过滤人名 En2Cn_name {nr: 名词-人名,nr1: 名词-汉语姓氏,nr2: 名词-汉语名字,nrf: 名词-音译人…

【0到1学习Unity脚本编程】第一人称视角的角色控制器

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;【0…

图像滤波处理

滤波处理是图像处理中常用的技术之一&#xff0c;用于去除图像中的噪声、平滑图像、边缘检测等。以下是几种常见的滤波处理方法&#xff1a; 1. 均值滤波 (Mean Filtering) 原理&#xff1a; 均值滤波使用一个固定大小的滤波器&#xff0c;在图像上滑动并取周围像素的平均值来…

ClickHouse SQL 查询优化

1 单表查询 1.1 Prewhere替代where Prewhere和where语句的作用相同&#xff0c;用来过滤数据。不同之处在于prewhere只支持 *MergeTree 族系列引擎的表&#xff0c;首先会读取指定的列数据&#xff0c;来判断数据过滤&#xff0c;等待数据过滤之后再读取select 声明的列字段来补…

自动驾驶学习笔记(九)——车辆控制

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo Beta宣讲和线下沙龙》免费报名—>传送门 文章目录 前言 控制器设计 比例积分微分控制 线性…

在 Linux 环境下的简单调试技巧

在 Linux 环境下的简单调试技巧 GDB&#xff08;GNU调试器&#xff09;是一个强大的命令行调试工具&#xff0c;用于调试C、C等程序。下面是使用GDB的一些基本步骤&#xff1a; 编译程序时包含调试信息 确保在编译程序时使用 -g 选项来包含调试信息。例如&#xff1a; gcc -g …

Kotlin 知识体系

Kotlin 知识体系 1、Kotlin 文档2、Kotlin 基础3、桌面应用程序4、Android 与 iOS 应用程序 1、Kotlin 文档 Kotlin 是一门现代但已成熟的编程语言&#xff0c;旨在让开发人员更幸福快乐。 它简洁、安全、可与 Java 及其他语言互操作&#xff0c;并提供了多种方式在多个平台间复…

『亚马逊云科技产品测评』活动征文|借助AWS EC2搭建服务器群组运维系统Zabbix+spug

授权声明&#xff1a;本篇文章授权活动官方亚马逊云科技文章转发、改写权&#xff0c;包括不限于在 Developer Centre, 知乎&#xff0c;自媒体平台&#xff0c;第三方开发者媒体等亚马逊云科技官方渠道。 本文基于以下软硬件工具&#xff1a; aws ec2 frp-0.52.3 zabbix 6…

LRU最近最少使用算法

LRU(LeastRecentlyUsed)“最近最少使用”算法&#xff1a; 1.当缓存空间已满耗用时&#xff0c;淘汰最近最少使用数据的缓存对象以释放更多的缓存空间(用于历史缓存对象的维护)。 2. 哈希表:快速查找缓存对象&#xff1b;双向链表:维护 历史数据所在的节点顺序。 步骤&#xff…

掌握深度学习利器——TensorFlow 2.x实战应用与进阶

掌握深度学习利器——TensorFlow 2.x实战应用与进阶 摘要&#xff1a;随着人工智能技术的飞速发展&#xff0c;深度学习已成为当下最热门的领域之一。作为深度学习领域的重要工具&#xff0c;TensorFlow 2.x 备受关注。本文将通过介绍TensorFlow 2.x的基本概念和特性&#xff…

在 Linux 上搭建 Java Web 项目环境(最简单的进行搭建)

要在 Linux 上安装的程序有 1.JDK (要想运行 java 程序 JDK 是必不可少的) 2.Tomcat &#xff08;HTTP 服务器&#xff0c;是管理 Web 项目的常用工具&#xff09; 3. mysql &#xff08;数据库&#xff09; 一.安装 JDK 博主使用的 Linux 发行版是 centos &#xff0c;cen…

母婴服务预约小程序的效果如何

二胎家庭增速明显&#xff0c;占比较大&#xff0c;成为市场各母婴品牌的目标&#xff0c;而随着行业发展及市场变化&#xff0c;线上互联网深入人们生活&#xff0c;各家母婴品牌开始向“数字化”靠拢。 目前母婴门店商家主要面临服务/产品线上曝光不足、宣传度不够或扩圈无门…

【Kingbase FlySync】命令模式:安装部署同步软件,实现KES到KES实现同步

【Kingbase FlySync】命令模式:安装部署同步软件&#xff0c;实现KES到KES实现同步迁移 概述准备环境目标资源1.测试虚拟机下载地址包含node1,node22.同步工具下载地址3.临时授权下载地址4.ruby工具下载地址5.EXAMv0.11.sql下载地址 实操&#xff1a;同步软件安装部署1.node1准…

git rebase 和 git merge的区别?以及你对它们的理解?

文章目录 前言是什么分析区别后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;git操作相关 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌握&#xff0c;正在不断努力填补技术短板。(如果出现错误&#xff0c;感谢…

【论文解读】FFHQ-UV:用于3D面部重建的归一化面部UV纹理数据集

【论文解读】FFHQ-UV 论文地址&#xff1a;https://arxiv.org/pdf/2211.13874.pdf 0. 摘要 我们提出了一个大规模的面部UV纹理数据集&#xff0c;其中包含超过50,000张高质量的纹理UV贴图&#xff0c;这些贴图具有均匀的照明、中性的表情和清洁的面部区域&#xff0c;这些都是…

mybatis动态sql语法

<?xml version"1.0" encoding"UTF-8" ?> <!DOCTYPE mapperPUBLIC "-//mybatis.org//DTD Mapper 3.0//EN""http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace"com.qvfan.mybatistest.mapper.Emp…

基于深度学习的恶意软件检测

恶意软件是指恶意软件犯罪者用来感染个人计算机或整个组织的网络的软件。 它利用目标系统漏洞&#xff0c;例如可以被劫持的合法软件&#xff08;例如浏览器或 Web 应用程序插件&#xff09;中的错误。 恶意软件渗透可能会造成灾难性的后果&#xff0c;包括数据被盗、勒索或网…

sqli-labs关卡18(基于http头部报错盲注)通关思路

文章目录 前言一、靶场通关需要了解的知识点1、什么是http请求头2、为什么http头部可以进行注入 二、靶场第十八关通关思路1、判断注入点2、爆数据库名3、爆数据库表4、爆数据库列5、爆数据库关键信息 总结 前言 此文章只用于学习和反思巩固sql注入知识&#xff0c;禁止用于做…

LV.12 D18 中断处理 学习笔记

一、ARM的异常处理机制及工程代码结构 1.1异常概念 处理器在正常执行程序的过程中可能会遇到一些不正常的事件发生 这时处理器就要将当前的程序暂停下来转而去处理这个异常的事件 异常事件处理完成之后再返回到被异常打断的点继续执行程序。 1.2异常处理机制 不同的处…