【实战-08】flink 消费kafka自定义序列化

目的

让从kafka消费出来的数据,直接就转换成我们的对象

mvn pom

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License athttp://www.apache.org/licenses/LICENSE-2.0Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.boke</groupId><artifactId>Flink1.7.1</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><!-- Apache Flink dependencies --><!-- These dependencies are provided, because they should not be packaged into the JAR file. --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version>
<!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version>
<!--			<scope>provided</scope>--></dependency><!-- table 环境依赖【connectors 和 formats 和driver】 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/		--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version>
<!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge</artifactId><version>${flink.version}</version>
<!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.18</version></dependency><!--idea 运行比西甲这个否则报错:【 Make sure a planner module is on the classpath】--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-loader</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime</artifactId><version>${flink.version}</version><!--			<scope>provided</scope>--></dependency><!--第三方的包--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.83</version></dependency><!-- Add connector dependencies here. They must be in the default scope (compile). --><!-- Example:<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency>--><!-- Add logging framework, to produce console output when running in the IDE. --><!-- These dependencies are excluded from the application JAR by default. --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency></dependencies><build><plugins><!-- Java Compiler --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. --><!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. --><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.boke.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

核心代码

package com.boke.kafka;

import com.alibaba.fastjson.JSONObject;

public class Student {
public String name;
public Integer age;

public Student(String name, Integer age) {this.name = name;this.age = age;
}public static Student fromJson(String s){JSONObject jsonObject = JSONObject.parseObject(s);String name = jsonObject.getString("name");Integer age = jsonObject.getInteger("age");return new Student(name,age);
}public String getName() {return name;
}public void setName(String name) {this.name = name;
}public Integer getAge() {return age;
}public void setAge(Integer age) {this.age = age;
}

}

//下面是main主函数
package com.boke.kafka;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class kafkaSource {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource source = KafkaSource.builder()
.setBootstrapServers(“brokers”)
.setTopics(“input-topic”)
.setGroupId(“my-group”)
.setStartingOffsets(OffsetsInitializer.earliest())//【无论如何都从最早开始消费】
// .setStartingOffsets(OffsetsInitializer.latest())//【无论如何都从最新开始消费】
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))//【groupid 存在offset 则从offset消费,否则从最早开始消费】
// .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))//【groupid 存在offset 则从offset消费,否则从最新开始消费】

// .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema())))
// .setDeserializer(KafkaRecordDeserializationSchema.of(new SimpleStringSchema());
.setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserializationSchema()))
// .setDeserializer(KafkaRecordDeserializationSchema.valueOnly())
.build();

    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
}

}
class MyKafkaDeserializationSchema implements KafkaDeserializationSchema{

@Override
public boolean isEndOfStream(Student nextElement) {return false;
}

//Deserializes the Kafka record.
//Params:
//record – Kafka record to be deserialized.
//Returns:
//The deserialized message as an object (null if the message cannot be deserialized).
@Override
public Student deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
/*
*自定义kafka反序列化
*如果数据异常,可以直接返回nulll即可,源码中有一句英文:null if the message cannot be deserialized
* */
String topic = record.topic();
long KafkaTimeStamp = record.timestamp();
int partitionNum = record.partition();
String value = new String(record.value(), StandardCharsets.UTF_8);
return Student.fromJson(value);
}

@Override
public TypeInformation<Student> getProducedType() {return TypeInformation.of(new TypeHint<Student>() {});
}

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/132648.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

mysql 问题解答 2

3 存储 3.1 存储引擎 3、InnoDB 的四大特性? InnoDB 是 MySQL 数据库中最常用的存储引擎之一,它的四大特性通常指的是: ACID 兼容性: 原子性 (Atomicity): 保证事务内的操作要么全部成功,要么全部失败,不会出现中间状态。例如,银行转账操作,从一个账户向另一个账户转…

前端uniapp提交表单调用接口方法最新

目录 源码1源码2最后 源码1 <template><view class"my-add-bank-card"><!-- name"bank_name" form表单提交的input里面一定要加name绑定要传的参数 name"bank_name" type"text" v-model"address.bank_name"…

flink的起源、概念、特点、应用

flink的起源 Flink的起源可以追溯到2010年&#xff0c;当时它作为一个研究项目开始。该项目最初由德国柏林工业大学&#xff08;Berlin Institute of Technology&#xff09;的一群研究人员发起&#xff0c;包括Matei Zaharia、Kostas Tzoumas和Stephan Ewen等。 项目最初被称为…

【系统集成项目管理工程师】——6.习题一

1.&#xff08;&#xff09; 通过独立的、对网络行为和主机操作提供全面与忠实的记录&#xff0c;方便用户分析与审查事故原因&#xff0c;很像飞机上的黑匣子 A、防火墙&#xff1b;B、扫描器&#xff1b;C、防毒软件&#xff1b;D、安全审计系统 答案&#xff1a;D 2.《GB/…

【数据结构】排序算法复杂度 及 稳定性分析 【图文详解】

排序算法总结 前言[ 一 ] 小数据基本排序算法&#xff08;1&#xff09;冒泡排序&#xff08;2&#xff09;直接插入排序 [ 二 ] &#xff08;由基本排序衍生的用作&#xff09;处理大数据处理排序&#xff08;1&#xff09;堆排序&#xff08;2&#xff09;希尔排序 [ 三 ] 大…

unity中移动方案--物理渲染分层

一、三种基本移动方案 unity中的移动分为Transform和Rigidbody以及CharacterController&#xff0c;其中CharacterController功能完善&#xff0c;已经可以避免了穿墙&#xff0c;并实现了贴墙走等情况&#xff0c;需要结合性能考虑选择不同的方式。 1.使用transform,直接修改…

鳄鱼指标的3颜色线都代表什么?澳福官网一段话明白了

投资者一直在使用鳄鱼指标进行交易&#xff0c;但是对指标上面的3种颜色的K线都代表什么不明白&#xff1f;直到看到澳福官网一段话才明白&#xff0c;原来这么简单&#xff01; 鳄鱼指标&#xff0c;这一工具是由三条移动平均线组合而成。具体来说&#xff0c;蓝线&#xff0…

mybatis动态表名

1.基于mybatis官方文档 Configuration public class MybatisPlusConfig {Beanpublic MybatisPlusInterceptor mybatisPlusInterceptor() {MybatisPlusInterceptor interceptor new MybatisPlusInterceptor();DynamicTableNameInnerInterceptor dynamicTableNameInnerIntercep…

(算法题)最多约数问题

最多约数问题 正整数x的约数是能整除x的正整数。正整数x 的约数个数记为div(x)。例如&#xff0c;1&#xff0c;2&#xff0c;5&#xff0c;10 都是正整数10 的约数&#xff0c;且div(10)4。设a 和b 是2 个正整数&#xff0c;a≤b&#xff0c;找出a和b之间约数个数最多的数x及其…

Spring Boot 3系列之-启动类详解

Spring Boot是一个功能强大、灵活且易于使用的框架&#xff0c;它极大地简化了Spring应用程序的开发和部署流程&#xff0c;使得开发人员能够更专注于业务逻辑的实现。在我们的Spring Boot 3系列之一&#xff08;初始化项目&#xff09;文章中&#xff0c;我们使用了Spring官方…

高效开发之:封装抛异常工具类

一、首先自定义个错误码枚举类 public enum ErrorCode {SUCCESS(0, "ok"),PARAMS_ERROR(40000, "请求参数错误"),NOT_LOGIN_ERROR(40100, "未登录"),NO_AUTH_ERROR(40101, "无权限"),NOT_FOUND_ERROR(40400, "请求数据不存在&qu…

MVC、MVP、MVI、MVVM、MVVM-C和VIPER等

以下是最重要的架构模式列表&#xff1a; 1、MVC(Model-View-Controller)&#xff1a; 它是最早被采用的设计模式之一。其主要目标是将应用程序的数据、用户界面和控制逻辑分离成三个相互关联的组件。 在这里&#xff0c;模型Model管理数据和逻辑&#xff0c;视图View显示信息…

桶装水订水系统水厂送水小程序开发;

桶装水小程序正式上线&#xff0c;支持多种商品展示形式&#xff0c;会员卡、积分、分销等功能&#xff1b; 开发订水送水小程序系统&#xff0c;基于用户、员工、商品、订单、配送站和售后管理模块&#xff0c;对每个模块进行统计分析&#xff0c;简化了分配过程&#xff0c;提…

C语言-------------#与##运算符使用分析

#运算符 1、#运算符用于在预编译期将宏参数转换为字符串 #include <stdio.h>#define message(X) #X#define CALL(f,p) (printf("This is fuction %s\n",#f),f(p))int square (int n) {return n*n; }int main() {printf("%s\r\n",message(hello…

【教3妹学编程-算法题】最大单词长度乘积

3妹&#xff1a;哇&#xff0c;今天好冷啊&#xff0c; 不想上班。 2哥&#xff1a;今天气温比昨天低8度&#xff0c;3妹要空厚一点啊。 3妹 : 嗯&#xff0c; 赶紧把我的羽绒服找出来穿上&#xff01; 2哥&#xff1a;哈哈&#xff0c;那倒还不至于&#xff0c; 不过气温骤降&…

【简朴PlainApp】通过浏览器就能管理手机文件的开源利器

简朴PlainApp 简朴是一个开源应用&#xff0c;允许您通过网络浏览器管理您的手机。您可以通过安全、易于使用的网络界面从桌面访问文件、视频、音乐、联系人、短信、通话记录等等&#xff01; 这对于手机上文件较多的朋友来说&#xff0c;从任意电脑上管理手机文件的需求还是挺…

详解IPD需求分析工具$APPEALS

够让企业生存下去的是客户&#xff0c;所以&#xff0c;众多企业提出要“以客户为中心”&#xff0c;那如何做到以客户为中心&#xff1f;IPD中给出的答案是需求管理。 需求管理流程&#xff0c;是IPD&#xff08;集成管理开发&#xff09;体系中的四大支撑流程之一&#xff0…

【源码解析】Spring Bean定义常见错误

案例1 隐式扫描不到Bean的定义 RestController public class HelloWorldController {RequestMapping(path "/hiii",method RequestMethod.GET)public String hi() {return "hi hellowrd";}}SpringBootApplication RestController public class Applicati…

android 升级后包依赖更新

1、将Android Studio 升级到3.4&#xff08;3.2以上即可&#xff09;&#xff0c;并且将gradle升级到3.4.1&#xff08;3.2.0以上即可&#xff09;&#xff0c;设置targetSdkVersion 28&#xff0c;Project的build.gradle中classpath ‘com.android.tools.build:gradle:3.4.1’…

基于AOSP源码Android-10.0.0_r41分支编译,framework开发,修改系统默认字体大小

文章目录 基于AOSP源码Android-10.0.0_r41分支编译&#xff0c;framework开发&#xff0c;修改系统默认字体大小 基于AOSP源码Android-10.0.0_r41分支编译&#xff0c;framework开发&#xff0c;修改系统默认字体大小 主要修改一个地方就行 代码源码路径 frameworks/base/co…