3.阿里云flinkselectdb-py作业

1.概述

Python API中文文档
本文介绍在阿里云实时计算flink中使用python作业,把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发;
先说结论:
在实际开发中遇到了很多问题,导致python作业基本基本无法运行。最后放弃了;

  • python作业中的标量函数的错误没有日志,永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator,定位不到具体问题;
  • python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数,提交到flink中就报错。怀疑是环境中没有flink-python.jar,自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样);
  • 如果各位遇到些问题并且有解决方案,麻烦也告知我,非常感谢;

2.目标

把阿里云sls日志中的数据准实时同步到云服务selectdb;

源表flink结果表
阿里云sls实时计算flink云服务selectdb

3.步骤

3.1.搭建环境

#**创建虚拟环境essa-flink,pyhton版本为3.11.9
conda create -n essa-flink python=3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大,指定国内的pip源
pip install apache-flink==1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

3.2.创建作业

作业代码本身很简单,逐行读取sls的日志,进行转换后保存到selectdb中。转换函数为do_active_log,在本地测试过程中遇到了第一个问题后,很轻松愉快就通过了。部署在flink中出现了其它问题;

  • 首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用,报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码,确实没有定义此类。提工单后,建设使用低版本解决;
  • 然后报错缺少flink-python,不能执行python函数。于是把flink-python上传,并在作业中引用依赖;
  • 最后报错ExceptionInChainedOperatorException: Could not forward element to next,无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃,改用jar作业;
def do_active_log(row: Row) -> Row:'''用户登录日志处理'''logging.info('执行do_active_log函数...')params = json.loads(row[2])occurred = datetime.fromtimestamp(float(row[1]))user_id = params['userId']platform = params['platform']last_active_time = occurredcreate_time = occurredid = occurred.strftime("%Y%m%d") + str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):'''创建用户登录日志结果表'''sql = '''create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with ('connector' = 'doris','fenodes' = '{}','table.identifier' = '{}','username' = '{}','password' = '{}','sink.properties.format' = 'json')'''.format(sink_table, sink_config['fenodes'], sink_config['table.identifier'], sink_config['username'], sink_config['password'])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):'''创建datastream'''times = {'start_time': '', 'stop_time': ''}sql = '''create temporary table essa_ubc(ip string,`time` string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with ('connector' = 'sls','endpoint' = '{}','accessId' = '{}','accessKey' = '{}','project' = '{}','logstore' ='essa-ubc','startTime' = '{}','stopTime' = '{}','exitAfterFinish' = 'true')'''.format(source_config['sls_endpoint'], source_config['access_id'], source_config['access_secret'],source_config['sls_project'], times['start_time'], times['stop_time'])table_env.execute_sql(sql)source_table = table_env.from_path('essa_ubc')return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ == '__main__':env = StreamExecutionEnvironment.get_execution_environment()t_env = StreamTableEnvironment.create(stream_execution_environment=env)#**加载依赖的jar包t_env.get_config().set("pipeline.jars", "依赖包.jar")#**创建sls源ds = get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据,然后使用自定义标量函数处理数据ds = ds.filter(lambda d: d[3] == 'activeLog').map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table = t_env.from_data_stream(ds)active_log_sink_table = 'user_active_log'create_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/web/64198.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

互联网视频云平台EasyDSS无人机推流直播技术如何助力野生动植物保护工作?

在当今社会,随着科技的飞速发展,无人机技术已经广泛应用于各个领域,为我们的生活带来了诸多便利。而在动植物保护工作中,无人机的应用更是为这一领域注入了新的活力。EasyDSS,作为一款集视频处理、分发、存储于一体的综…

51c视觉~YOLO~合集8

我自己的原文哦~ https://blog.51cto.com/whaosoft/12897680 1、Yolo9 1.1、YOLOv9SAM实现动态目标检测和分割 主要介绍基于YOLOv9SAM实现动态目标检测和分割 背景介绍 在本文中,我们使用YOLOv9SAM在RF100 Construction-Safety-2 数据集上实现自定义对象检测模…

Docker Container 可观测性最佳实践

Docker Container 介绍 Docker Container( Docker 容器)是一种轻量级、可移植的、自给自足的软件运行环境,它在 Docker 引擎的宿主机上运行。容器在许多方面类似于虚拟机,但它们更轻量,因为它们不需要模拟整个操作系统…

气相色谱-质谱联用分析方法中的常用部件,分流平板更换

分流平板,是气相色谱-质谱联用分析方法中的一个常用部件,它可以实现气相色谱柱流与MS检测器流的分离和分流。常见的气质联用仪分流平板有很多种,如单层T型分流平板、双层T型分流平板、螺旋分流平板等等。 操作视频http://www.spcctech.com/v…

易基因: BS+ChIP-seq揭示DNA甲基化调控非编码RNA(VIM-AS1)抑制肿瘤侵袭性|Exp Mol Med

大家好,这里是专注表观组学十余年,领跑多组学科研服务的易基因。 肝细胞癌(hepatocellular carcinoma,HCC)早期复发仍然是一个具有挑战性的领域,其中涉及的机制尚未完全被理解。尽管微血管侵犯&#xff08…

鸿蒙系统文件管理基础服务的设计背景和设计目标

有一定经验的开发者通常对文件管理相关的api应用或者底层逻辑都比较熟悉,但是关于文件管理服务的设计背景和设计目标可能了解得不那么清楚,本文旨在分享文件管理服务的设计背景及目标,方便广大开发者更好地理解鸿蒙系统文件管理服务。 1 鸿蒙…

如何配置 Java 环境变量:设置 JAVA_HOME 和 PATH

目录 一、什么是 Java 环境变量? 二、配置 Java 环境变量 1. 下载并安装 JDK 2. 配置 JAVA_HOME Windows 系统 Linux / macOS 系统 3. 配置 PATH Windows 系统 Linux / macOS 系统 4. 验证配置 三、常见问题与解决方案 1. 无法识别 java 或 javac 命令 …

Doris 数据库外部表-JDBC 外表,Oracle to Doris

简介 提供了 Doris 通过数据库访问的标准接口 (JDBC) 来访问外部表,外部表省去了繁琐的数据导入工作,让 Doris 可以具有了访问各式数据库的能力,并借助 Doris 本身的 OLAP 的能力来解决外部表的数据分析问题: 支持各种数据源接入…

分布式 IO 模块助力冲压机械臂产线实现智能控制

在当今制造业蓬勃发展的浪潮中,冲压机械臂产线的智能化控制已然成为提升生产效率、保障产品质量以及增强企业竞争力的关键所在。而分布式 IO 模块的应用,正如同为这条产线注入了一股强大的智能动力,开启了全新的高效生产篇章。 传统挑战 冲压…

CSS系列(37)-- Overscroll Behavior详解

前端技术探索系列:CSS Overscroll Behavior详解 📱 致读者:探索滚动交互的艺术 👋 前端开发者们, 今天我们将深入探讨 CSS Overscroll Behavior,这个强大的滚动行为控制特性。 基础概念 🚀 …

深度学习中的并行策略概述:4 Tensor Parallelism

深度学习中的并行策略概述:4 Tensor Parallelism 使用 PyTorch 实现 Tensor Parallelism 。首先定义了一个简单的模型 SimpleModel,它包含两个全连接层。然后,本文使用 torch.distributed.device_mesh 初始化了一个设备网格,这代…

企业销售人员培训系统|Java|SSM|VUE| 前后端分离

【技术栈】 1⃣️:架构: B/S、MVC 2⃣️:系统环境:Windowsh/Mac 3⃣️:开发环境:IDEA、JDK1.8、Maven、Mysql5.7 4⃣️:技术栈:Java、Mysql、SSM、Mybatis-Plus、VUE、jquery,html 5⃣️数据库…

vue 本地自测iframe通讯

使用 postMessage API 来实现跨窗口(跨域)的消息传递。postMessage 允许你安全地发送消息到其他窗口,包括嵌套的 iframe,而不需要担心同源策略的问题。 发送消息(父应用) 1. 父应用:发送消息给…

Linux:code:network:devinet_sysctl_forward;IN_DEV_FORWARD

文章目录 简介sysctl 设置使用,arp_process间接使用IN_DEV_RX_REDIRECTSdev_disable_lro简介 最近在看Linux里的forwarding的功能。顺便在这里总结一下。有些详细代码逻辑,如果可以记录一下,会好一点。 sysctl 设置 这个函数在查看的时候需要注意的问题:变量名起的有点简…

自然语言处理与知识图谱的融合与应用

目录 前言1. 知识图谱与自然语言处理的关系1.1 知识图谱的定义与特点1.2 自然语言处理的核心任务1.3 二者的互补性 2. NLP在知识图谱构建中的应用2.1 信息抽取2.1.1 实体识别2.1.2 关系抽取2.1.3 属性抽取 2.2 知识融合2.3 知识推理 3. NLP与知识图谱融合的实际应用3.1 智能问答…

PHP 数组

PHP 数组 PHP 是一种流行的服务器端编程语言,它提供了强大的数组处理能力。PHP 数组是一种数据结构,用于存储相同类型或不同类型的多个值。在 PHP 中,数组可以分为一维数组、二维数组和多维数组。本文将详细介绍 PHP 数组的各种操作&#xf…

CSS(三)盒子模型

目录 Content Padding Border Margin 盒子模型计算方式 使用 box-sizing 属性控制盒子模型的计算 所有的HTML元素都可以看作像下图这样一个矩形盒子: 这个模型包括了四个区域:content(内容区域)、padding(内边距…

基于NodeMCU的物联网窗帘控制系统设计

最终效果 基于NodeMCU的物联网窗帘控制系统设计 项目介绍 该项目是“物联网实验室监测控制系统设计(仿智能家居)”项目中的“家电控制设计”中的“窗帘控制”子项目,最前者还包括“物联网设计”、“环境监测设计”、“门禁系统设计计”和“小…

有没有免费提取音频的软件?音频编辑软件介绍!

出于工作和生活娱乐等原因,有时候我们需要把音频单独提取出来(比如歌曲伴奏、人声清唱等、乐器独奏等)。要提取音频必须借助音频处理软件,那么有没有免费提取音频的软件呢?下面我们将为大家介绍几款免费软件&#xff0…

WPF自定义窗口 输入验证不生效

WPF自定义窗口 输入验证不生效 WPF ValidationRule 不生效 WPF ValidationRule 不生效 解决方案&#xff1a;在WindowStyle的Template中添加AdornerDecorator标签。 <Style x:Key"WindowStyle1" TargetType"{x:Type Window}"><Setter Property&…