【极数系列】Flink集成DataSource读取Socket请求数据(09)

文章目录

  • 01 引言
  • 02 简介概述
  • 03 基于socket套接字读取数据
    • 3.1 从套接字读取。元素可以由分隔符分隔。
    • 3.2 windows安装netcat工具
      • (1)下载netcat工具
      • (2)安装部署
      • (3)启动socket端口监听
  • 04 源码实战demo
    • 4.1 pom.xm依赖
    • 4.2创建socket数据流作业
    • 4.3实时cmd窗口输入数据

01 引言

源码地址,一键下载可用:https://gitee.com/shawsongyue/aurora.git
模块:aurora_flink
主类:FlinkSocketSourceJob(socket请求)

02 简介概述

1.Source 是Flink程序从中读取其输入数据的地方。你可以用 StreamExecutionEnvironment.addSource(sourceFunction) 将一个 source 关联到你的程序。2.Flink 自带了许多预先实现的 source functions,不过你仍然可以通过实现 SourceFunction 接口编写自定义的非并行 source。3.也可以通过实现 ParallelSourceFunction 接口或者继承 RichParallelSourceFunction 类编写自定义的并行 sources。

03 基于socket套接字读取数据

3.1 从套接字读取。元素可以由分隔符分隔。

3.2 windows安装netcat工具

(1)下载netcat工具

下载地址:https://eternallybored.org/misc/netcat/

(2)安装部署

注意:不是拷贝整个文件夹,而是文件夹里面的全部文件。

将解压后的单个文件全部拷贝到C:\Windows\System32的文件夹下。

(3)启动socket端口监听

注意:该端口需要跟代码中监听的端口一致,否则获取不到数据

nc -l -p 8081

04 源码实战demo

4.1 pom.xm依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.xsy</groupId><artifactId>aurora_flink</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>11</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.18.0</flink.version><!--scala版本--><scala.binary.version>2.11</scala.binary.version><!--log4j依赖--><log4j.version>2.17.1</log4j.version></properties><!--通用依赖--><dependencies><!-- json --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!--================================集成外部依赖==========================================--><!--集成日志框架 start--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!--集成日志框架 end--></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build><!--配置Maven项目中需要使用的远程仓库--><repositories><repository><id>aliyun-repos</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></repository></repositories><!--用来配置maven插件的远程仓库--><pluginRepositories><pluginRepository><id>aliyun-plugin</id><url>https://maven.aliyun.com/nexus/content/groups/public/</url><snapshots><enabled>false</enabled></snapshots></pluginRepository></pluginRepositories></project>

4.2创建socket数据流作业

package com.aurora.source;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.ArrayList;/*** @description flink的socket请求的source应用* @author 浅夏的猫* @datetime 23:03 2024/1/28
*/
public class FlinkSocketSourceJob {private static final Logger logger = LoggerFactory.getLogger(FlinkSocketSourceJob.class);public static void main(String[] args) throws Exception {//1.创建Flink运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//2.设置Flink运行模式://STREAMING-流模式,BATCH-批模式,AUTOMATIC-自动模式(根据数据源的边界性来决定使用哪种模式)env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//3.基于socket请求的source使用DataStreamSource<String> dataStreamSource = env.socketTextStream("localhost",8081);//4.输出打印dataStreamSource.print();//5.启动运行env.execute();}}

4.3实时cmd窗口输入数据

注意:先启动cmd窗口监听再启动程序,否则会报端口连接失败

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/658498.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

前端实现token的无感刷新--VUE

token刷新的方案 方案一&#xff1a;后端返回过期时间&#xff0c;前端判断token过期时间&#xff0c;去调用刷新token的接口 缺点&#xff1a;需要后端提供一个token过期时间的字段&#xff1b;使用本地时间判断&#xff0c;若本地时间被修改&#xff0c;本地时间比服务…

【解决】Unity 工程无法正常打开而崩溃问题

开发平台&#xff1a;Unity 2022.3.17f1c1 一、问题描述 访问 Unity 工程等待 Open Projet&#xff08;busy for 时间&#xff09;&#xff0c;出现崩溃、闪退等情况&#xff0c;导致无法正常进入Unity编辑页面。 二、问题分析 笔者在 URP 渲染管线下处理 Obi Fluid 流体插件 D…

Docker本地部署Firefox浏览器并结合内网穿透公网访问

文章目录 1. 部署Firefox2. 本地访问Firefox3. Linux安装Cpolar4. 配置Firefox公网地址5. 远程访问Firefox6. 固定Firefox公网地址7. 固定地址访问Firefox Firefox是一款免费开源的网页浏览器&#xff0c;由Mozilla基金会开发和维护。它是第一个成功挑战微软Internet Explorer浏…

尝试搭建域

使用window 7 作为dmz 主机 server_2008 作为 预控 前提两台主机都在同一个LAN 下&#xff08;设置一个LAN 区域&#xff09;&#xff0c;同样防火墙关闭状态 首先搞定server 2008 中的域 1. 在计算机属性中选择更改设置中将主机名修改一下&#xff1a; 2. 在server 2008 中…

echarts 柱状图数据过多时自动滚动

当我们柱状图中X轴数据太多的时候&#xff0c;会自动把柱形的宽度挤的很细&#xff0c;带来的交互非常不好&#xff0c;我们可以用dataZoom属性来解决 简易的版本&#xff0c;横向滚动。 option.dataZoom [{type: "slider",show: true,startValue: 0, //数据窗口范…

【刷题】牛客网 NC132 环形链表的约瑟夫问题

NC132 环形链表的约瑟夫问题 题目描述思路一&#xff08;链表直通版&#xff09;思路二&#xff08;数组巧解版&#xff09;思路三&#xff08;变态秒杀版&#xff09;Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读下一篇文章见&#xff01;&#xff01;&#xff…

Oracle 的闪回技术是什么

什么是闪回 Oracle 数据库闪回技术是一组独特而丰富的数据恢复解决方案&#xff0c;能够有选择性地高效撤销一个错误的影响&#xff0c;从人为错误中恢复。闪回是一种数据恢复技术&#xff0c;它使得数据库可以回到过去的某个状态&#xff0c;可以满足用户的逻辑错误的快速恢复…

SpringCloudAlibaba组件总结笔记(如Nacos、SpringCloudGateway、OpenFeign,Ribbon,RabbitMQ)

这目录 1.Ribbon负载均衡1负载均衡原理2.负载均衡策略1.负载均衡策略2.自定义负载均衡策略 3.饥饿加载 2.Nacos注册中心与Eureka的区别3.Nacos配置中心1.从微服务拉取配置2.配置热更新1.2.1.方式一1.2.2.方式二 3.配置共享1.配置共享的优先级 4.Feign1.Feign使用优化2.配置连接…

使用Promethues+Grafana监控Elasticsearch

PromethuesGrafana监控Elasticsearch 监控选用说明指标上报流程说明实现监控的步骤搭建elasticsearch-exporter服务搭建promethues和grafana服务 监控选用说明 虽然用Kibana来监控ES&#xff0c;能展示一些关键指标&#xff0c;但ES本身收集的指标并不全面&#xff0c;还需要在…

torchvision.models._utils.IntermediateLayerGetter()使用

torchvision.models._utils.IntermediateLayerGetter&#xff08;&#xff09;使用 源码如下&#xff1a; from collections import OrderedDictimport torch from torch import nnclass IntermediateLayerGetter(nn.ModuleDict):"""Module wrapper that ret…

Python采集学习笔记-读取excel数据

表格格式 方法一:使用xlrd import xlrd 1.读取Excel文件 workbook xlrd.open_workbook(plc.xlsx) 2.读取第一个表 sheet workbook.sheet_by_index(0) 3.获取表格总行数 total_rows sheet.nrows 4.创建列表,存储表格一行中每一列信息 plc_info [] for row in range(1…

【JAVA】Long类型返回到前端,精度丢失

一. 问题阐述 20位long类型的数字&#xff0c;从后端接口返回到前端后【四舍五入】 MYSQL端 &#xff08;1&#xff09;bigint (20) &#xff08;2&#xff09;具体某一条数据 JAVA端 &#xff08;1&#xff09;实体类 &#xff08;2&#xff09;服务类 &#xff08;3&…

docker可视化操作

docker可视化操作&#xff0c;ui界面操作 1.检查docker服务状态 docker status2.安装服务 部署 Portainer 1.从镜像仓库中拉取 Portainer&#xff1a;docker pull portainer/portainer2.创建数据卷&#xff1a; docker volume create portainer_db 3.启动 Portainer命令&…

86.网游逆向分析与插件开发-物品使用-物品丢弃的逆向分析与C++代码的封装

内容参考于&#xff1a;易道云信息技术研究院VIP课 上一个内容&#xff1a;物品使用的逆向分析与C代码的封装-CSDN博客 码云地址&#xff08;ui显示角色数据 分支&#xff09;&#xff1a;https://gitee.com/dye_your_fingers/sro_-ex.git 码云版本号&#xff1a;7563f86877c…

NFTScan 与 Merlin Protocol 共同推出 BRC20 Indexer Oracle,于今日正式上线!

近日&#xff0c;NFT 数据基础设施 NFTScan 与 Merlin Protocol 进行战略合作&#xff0c;联合推出了比特币网络原生资产 Indexer Oracle 服务&#xff0c;现在该服务已在 NFTScan 开发者平台上线&#xff0c;任何开发者都可以注册使用&#xff01; Merlin Protocol 是一个专用…

linux 磁盘标签类型MBR转换为GPT

[rootlocalhost /]# fdisk -l 磁盘 /dev/sda&#xff1a;299.4 GB, 299439751168 字节&#xff0c;584843264 个扇区 Units 扇区 of 1 * 512 512 bytes 扇区大小(逻辑/物理)&#xff1a;512 字节 / 512 字节 I/O 大小(最小/最佳)&#xff1a;512 字节 / 512 字节 磁盘标签类…

解读 HTTP 和 HTTPS:有何异同?

超文本传输安全协议&#xff08;HTTPS&#xff09;是建立在超文本传输协议&#xff08;HTTP&#xff09;之上的一种安全网络传输协议。在计算机网络上传输时&#xff0c;HTTPS 通过传输层安全性&#xff08;TLS&#xff09;或它的前身安全套接字层&#xff08;SSL&#xff09;为…

谷歌上架防关联VPS开到和原来一样的IP造成关联?应该怎么选?

随着Google paly的发展&#xff0c;竞争越来越激烈&#xff0c;开发者们也面临的越来越多的挑战。其中&#xff0c;如何降低关联风险是开发者们重点关注的问题。 为了防止开发者账号的滥用或欺诈&#xff0c;谷歌会通过判断账号之间是否存在关联&#xff0c;并对违规账号进行处…

【Vue】二、Vue 组件展示控制的优雅解决方案

vue项目中展示的组件&#xff0c;我平常都是通过v-show进行展示控制&#xff0c;类似这样 通常情况下&#xff0c;一个正常展示组件的流程&#xff0c;是通过前端用户点击触发函数&#xff0c;在函数中对data数据进行操作&#xff0c;从而展示不同的页面 showWork: false, sho…

首次接触共享办公室,有哪些问题需要注意?

随着互联网和创业的发展&#xff0c;越来越多的企业和个人选择共享办公空间作为他们的办公场所。共享办公空间是一种提供灵活的办公模式和配套的设施和服务的空间&#xff0c;可以帮助企业和个人节省成本和空间&#xff0c;提高效率和创新&#xff0c;拓展人脉和资源。但是&…