Flink解析kafka canal未压平数据为message报错

canal使用非flatmessage方式获取mysql bin log日志发至kafka比直接发送json效率要高很多,数据发到kafka后需要实时解析为json,这里可以使用strom或者flink,公司本来就是使用strom解析,但是在吞吐量上有瓶颈,优化空间不大。所以试一试通过flink来做。

非flatmessage需要使用特定的反序列化方式来处理为Message对象,所以这里需要自定义一个类

 1 /**
 2  * 反序列化canal binlog
 3  *
 4  * @author   @ 2019-02-20
 5  * @version 1.0.0
 6  */
 7 @PublicEvolving
 8 public class MessageDeserializationSchema implements KeyedDeserializationSchema<Message> {
 9 
10     private static final long serialVersionUID = -678988040385271953L;
11     private MessageDeserializer mesDesc;
12 
13     @Override
14     public Message deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
15         try {
16             if (mesDesc == null) {
17                 mesDesc = new MessageDeserializer();
18             }
19             Message result = mesDesc.deserialize(topic, message);
20             //result.setMetaData(topic, partition, offset);
21             return result;
22         } catch (Exception e) {
23             System.out.println(e);
24         }
25         return null;
26     }
27 
28     @Override
29     public boolean isEndOfStream(Message nextElement) {
30         return false;
31     }
32 
33     @Override
34     public TypeInformation<Message> getProducedType() {
35         return getForClass(Message.class);
36     }
37 }

 

然后就可以获取到DataStream[Message],但是在做算子操作的时候就报错了,意思是不支持kryo序列化

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
props_ (com.alibaba.otter.canal.protocol.CanalEntry$Header)
header_ (com.alibaba.otter.canal.protocol.CanalEntry$Entry)
entries (com.alibaba.otter.canal.protocol.Message)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:577)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:89)at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:154)at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationExceptionat java.util.Collections$UnmodifiableCollection.add(Collections.java:1055)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)... 29 more

 

参考官方文档,需要注册类的序列化方式:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/custom_serializers.html

  //message 不支持kryo序列化 不然在map flatmap的时候报错

  env.getConfig.addDefaultKryoSerializer(classOf[Message], classOf[StringSerializer])

如果在算子之间会有其他对象传输的话,也同样需要注册。最后通过测试,flink解析的量大概在单个solt 1W+/s 左右。

 

转载于:https://www.cnblogs.com/createweb/p/10580281.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/264466.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

初识python之 APP store排行榜 蜘蛛抓取(一)

直接上干货&#xff01;&#xff01; 采用python 2.7.5-windows 打开 http://www.apple.com/cn/itunes/charts/free-apps/ 如上图可以见采用的是utf-8 编码 经过一番思想斗争 编码如下 &#xff08;拍砖别打脸&#xff09; #codingutf-8 import urllib2 import urllib …

什么叫做matlab的信号频谱分析,基于MATLAB的信号频谱分析及实现.doc

基于MATLAB的信号频谱分析及实现数 字 信 号 处 理课程设计题目&#xff1a; 基于MATLAB的信号频谱分析及实现学院&#xff1a; 信息工程专业&#xff1a; 通信工程班级&#xff1a; 1001学号&#xff1a; 2010013448&#xff0c; 2010013466姓名&#xff1a; 常珍珍 &#xff…

PP团队圣经巨著《Application Architecture Guide2.0》14章-数据访问层

第十四章 数据访问层指导 概览 这一章主要描述设计数据访问层时要注意的主要原则。它们覆盖了设计数据访问层遇到的通常问题及错误。下面的图表展示了数据层怎样嵌入一个通用的应用架构。 (cnblog我的图片一直上传不了&#xff0c;报Remote server Error,只能使用网络图片了) 数…

20个Flutter实例视频教程-第03节: 不规则底部工具栏制作-1

第03节: 不规则底部工具栏制作-1 博客地址&#xff1a; https://jspang.com/post/flutterDemo.html#toc-973 视频地址&#xff1a; https://www.bilibili.com/video/av39709290?p3 视频里面的评论&#xff1a;动态组件就是可以setState的组件 flutter create demo02的项目 这里…

javascript保留两位

//保留两位小数 //功能&#xff1a;将浮点数四舍五入&#xff0c;取小数点后2位 function toDecimal(x) { var f parseFloat(x); if (isNaN(f)) { return; } f Math.round(x*100)/100; return f; } //制保留2位小数&#xff0c;如&#xff1…

mysql内置的变量,MySQL服务器模式及相关内置变量

本章我们主要包含两部分的内容&#xff1a;MySQL服务器模式MySQL内置变量1. MySQL服务器模式不同的MySQL客户端可以通过不同的模式操作MySQL Server。DBA可以设置一个全局模式&#xff0c;而每个应用程序可以根据需要为相应的会话设置不同的模式。MySQL操作模式会影响到SQL的语…

python模块之smtplib: 用python发送SSL/TLS安全邮件

转载请注明原文出自 http://blog.csdn.net/zhaoweikid/ python的smtplib提供了一种很方便的途径发送电子邮件。它对smtp协议进行了简单的封装。smtp协议的基本命令包括&#xff1a; HELO 向服务器标识用户身份 MAIL 初始化邮件传输 mail from: RCPT 标识单个的邮件…

B-树

6.7 B-树★4◎3 1&#xff0e;B-树的定义  B-树是一种平衡的多路查找树&#xff0c;它在文件系统中很有用。  定义&#xff1a;一棵m阶的B-树&#xff0c;或者为空树&#xff0c;或为满足下列特性的m叉树&#xff1a;  &#xff08;1&#xff09;树中每个结点至多有m棵子…

dom4j和jaxp解析工具的

dom4j解析中的几个对象node --branch --document --element --commment --attribute --text branch --document --element jaxp解析中的几个对象 node --document --element --commment --attr --text dom4j解析 中全部对象都是node对象的子节点&#xff0c;也就是说它把说有…

mysql数据库交叉连接,MySQL数据库联合查询与连接查询

联合查询基本概念联合查询是可合并多个相似的选择查询的结果集。等同于将一个表追加到另一个表&#xff0c;从而实现将两个表的查询组合在一起&#xff0c;使用为此为UNINO或UNION ALL联合查询&#xff1a;将多个查询的结果合并到一起(纵向合并)&#xff1a;字段数不变&#xf…

C#-委托和事件

要为类构造一个事件&#xff0c;必须用 event 来声明一个 delegate 型的字段&#xff0c;如&#xff1a; puclic calss Test{ public delegate EventHandler(object sender, EventArgs e); //声明为delegate 型的事件; } 然后要指定一个事件的名称&#xff0c;并写出处…

性能测试四十五:性能测试策略

1、项目具体需求,及业务场景&#xff1a;关注真实用户会是怎样的一个业务场景&#xff0c;确定用户的用户习惯。 2、指标&#xff1a;响应时间在多少以内&#xff0c;并发数多少&#xff0c;tps多少&#xff0c;总tps多少&#xff0c;稳定性交易总量多少&#xff0c;事务成功率…

原创:MD5 32位加密软件

网站后台数据库切勿使用明文保存密码&#xff0c;否则一旦黑客拿下你的Webshell&#xff0c;后果不堪设想。网站后台密码加密大多数采用的就是MD5算法加密。今天给大家送一个本人用c#简单编写的MD5 32位加密程序&#xff0c;虽然没有什么技术含量&#xff0c;但保证没有后门。 …

(教学思路 c#之类一)声明类和对象、定义类成员及其引用

上一节&#xff08;教学思路 c#之面向对象二&#xff09;初步理解面向对象的基本概念中&#xff0c;我没有提到任何的代码&#xff0c;只是用语言和实例来说明什么是类和对象以及面向对象的特性等基本概念&#xff0c;类是c#程序语言的重要核心&#xff0c;也是构建应用程序最主…

matlab数值过小为0,MATLAB数值计算——0

MATLAB数值计算MATLAB中文论坛基础板块常见问题归纳(出处: MATLAB中文论坛)登录http:www.mathworks.com/moler 获取NCM文件&#xff0c;注册账户&#xff0c;下载Toolbox格式&#xff0c;点击安装——在MATLAB中输入ncmgui出现图案即为success&#xff01;(pathtool添加默认路径…

【springboot】之自动配置原理

使用springboot开发web应用是很方便&#xff0c;只需要引入相对应的GAV就可以使用对应的功能&#xff0c;springboot默认会帮我们配置好一些常用配置。那么springboot是怎么做到的呢?这篇文章将一步步跟踪源码&#xff0c;查看springboot到底是如何帮我们做自动化配置。 sprin…

阴雨连绵潮湿加剧 车辆防潮提升保值

近日来&#xff0c;申城阴雨绵绵&#xff0c;不但增加了行车的难度&#xff0c;也使爱车潮气严重&#xff0c;开上一会就会发现前车窗布满水汽&#xff0c;需要开空调吹干才能保证良好视野。此外潮气也容易对人体和车辆本身造成影响&#xff0c;首当其冲的是车内电器&#xff0…

php nsdata,iOS开发之数据存储之NSData

1、概述使用archiveRootObject:toFile:方法可以将一个对象直接写入到一个文件中&#xff0c;但有时候可能想将多个对象写入到同一个文件中&#xff0c;那么就要使用NSData来进行归档对象。NSData可以为一些数据提供临时存储空间&#xff0c;以便随后写入文件&#xff0c;或者存…

php:兄弟连之面向对象版图形计算器1

曾经看细说PHP的时候就想做这个&#xff0c;可是一直没什么时间&#xff0c;这次总算忙里偷闲搞了代码量比較多的project。 首先&#xff0c;文档结构&#xff0c;都在一个文件夹下就好了&#xff0c;我的就例如以下。 一開始&#xff0c;进入index.php文件。 <html><…

asp.net控件开发基础(19)

上两篇讨论了基本数据绑定控件的实现步骤&#xff0c;基本上我们按着步骤来就可以做出简单的数据绑定控件了。过年前在看DataGrid的实现&#xff0c;本来想写这个的&#xff0c;但2.0出了GridView了&#xff0c;再说表格控件实现比较复杂&#xff0c;所以先放着。我们一起打开M…