Google Cloud dataflow streaming job简介

简单介绍

首先 gcp 的dataflow 是1个ETL 组件, 它是基于Apache beam的

Apache beam 是1个较新的开源ETL 框架。
对于我们常用的ETL tool Spring batch 有下面的区别

  1. spring batch 更偏向batch (后台处理)的ETL, 而apache beam 同时支持batch 和 streaming 的ETL, 对streaming 的ETL 有更好的支持
  2. spring batch 基于java,apache beam SDK 支持 java, python 和 GO
  3. spring batch 更加轻量级, 但是依赖于1个关系数据存储ETL job(配置, 历史记录)数据. 而且不需要开发人员去设置. 而 Apache Beam本身并没有内置的作业执行历史记录功能 , 这些数据需要自己去记录处理(在runner上)
  4. spring batch 的运行环境需要自己搭建, 而apache beam 这是1个SDK,它只定义 ETL pipeline的流程, 它需要额外的runner去执行

apache beam 暂时支持下面的runner
GCP dataflow
Apache Flink
Apache Spark
AWS data analysic
Java direct runner(调试用)

所以对于日志记录更多地去交给runner去实现。

所以讲, dataflow 只是1个runner , 核心还是apache beam SDK





什么是streaming

在计算领域中,“streaming”(流式处理)是一种数据处理模式,它允许实时处理连续流入的数据,而不是一次性处理静态数据集。
传统的批处理模式是将数据分成固定大小的块(batch),然后对每个批次进行处理。这种方式适用于静态数据集,但对于实时数据流,它可能无法满足实时性和低延迟的要求。
流式处理模式通过连续接收和处理数据流,实现了实时性和低延迟。数据流可以是连续的数据记录、事件流、传感器数据等。流式处理系统会持续地接收数据流,并立即对其进行处理和分析,以产生实时的结果。
流式处理通常具有以下特点:
连续性:数据流是连续不断的,没有明确的开始和结束。处理系统需要实时接收和处理数据流,而不是等待所有数据到达后再进行处理。
实时性:流式处理系统需要尽可能快地处理数据,并产生实时的结果。这对于需要实时决策、监控和反馈的应用程序非常重要。
有限状态:流式处理系统通常使用有限的内存和状态来处理数据流。它们需要在有限的资源下有效地处理无限的数据流。
流式处理可以应用于各种场景,如实时分析、实时监控、实时推荐、欺诈检测等。流式处理框架(如Apache Flink、Apache Kafka Streams、Apache Beam等)提供了方便的工具和API来开发和部署流式处理应用程序。





1个例子

 public void process() {log.info("processing3...");DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs("").as(DataflowPipelineOptions.class);options.setJobName("dataflow-exam3");options.setProject(this.projectId);options.setRegion("europe-west1");options.setTempLocation("gs://jason-hsbc-dataflow/tmp");options.setSubnetwork("regions/europe-west1/subnetworks/subnet-1");options.setNumWorkers(1);options.setStreaming(true);options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.INFO);options.setNumberOfWorkerHarnessThreads(2);//options.setGcpCredential(new File(...));options.setRunner(DataflowRunner.class);log.info(getCurrentAccountName());Pipeline pipeline = Pipeline.create(options);/*** * The effect of using the @UnknownKeyFor annotation is to tell the Apache Beam framework that the PCollection does not need to be grouped or associated based on a specific key.* As a result, Apache Beam can perform more efficient parallel computations and optimizations for operations that do not require key associations.** * When the @NonNull annotation is applied to a PCollection element type, it indicates that elements in the PCollection are not allowed to be null.* This means that when processing data streams, the Apache Beam framework checks elements for non-nullability and issues warnings or errors at compile time to avoid potential null pointer exceptions.the @Initialized annotation informs the compiler that the variable has been properly initialized by marking it at variable declaration time to help detect possible null pointer exceptionsand improve the reliability and readability of the code. However, it should be noted that the @Initialized annotation is only an auxiliary tool, and correct logic and design still need to be ensured in the actual programming process.*/PCollection<@UnknownKeyFor @NonNull @Initialized PubsubMessage> message = pipeline.apply("Read Pub/Sub Messages", PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription("projects/jason-hsbc/subscriptions/SubscriptionA1"));PCollection<KV<String, String>> combinedMsg = message.apply("Extract", ParDo.of(new ExtractMessageAttributeFn())).apply("appying windowing", Window.into(FixedWindows.of(Duration.standardMinutes(1)))).apply("Group by fileName", GroupByKey.create()).apply("Combine Message",ParDo.of(new CombinedMessagesFn()));combinedMsg.apply("Write to GCS", ParDo.of(new WriteToGCSFn(this.bucketName,this.projectId)));//yupipeline.run().waitUntilFinish();pipeline.run();log.info("processing3... end!");}

上面定义了1个dataflow pipeline, 它会从pubsub 里读取消息, 并把数据作为文件存储到GCS中

我们知道pubsub 是1个streaming 传输的工具, 如果这个job 执行一次就接受, 其实意义不大。

假如这个dataflow pipeling 定义的是1个batch的job而非streaming, 我们需要引入其他组件 才能持续监控pubsub的消息

例如
data sender -> pubsub topic -> pubusb trigger -> cloud functioin -> dataflow job

或者
datasender -> pubsub topic -> push 类型的subscription -> http 服务(cloud run/GKE) -> dataflow job

但是上面代码例子, 它有一行
options.setStreaming(true);
显式制定这个job 是streaming的

但是其实dataflow 本身也会根据一些规则去决定这个job是否为streaming(例如数据源是否为pubsub等)
但是用代码指定会更加安心.





waitUntilFinish() 的作用

代码的最后 有两个方法触发dataflow job方法
分别是 pipeline.run() 和 pipeline.run().waitUntilFinish()

前者会直接执行完成, 不会等待job的状态去执行下一行代码
而后者会等待job执行完成并返回状态结果。

但是, 如果是1个streaming的job的话, waitUntilFinish() 是无意义的, 因为streaming的job就是要长期运行(监控数据源)的啊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/97073.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[python 刷题] 3 Longest Substring Without Repeating Characters

[python 刷题] 3 Longest Substring Without Repeating Characters 题目&#xff1a; Given a string s, find the length of the longest substring without repeating characters. 这到提要求找的是最长的&#xff0c;没有重复符号的子字符串 解题思路是用双指针哈希表&…

mysql MVCC(多版本并发控制)理解

最近看MVCC相关资料&#xff0c;这边做一个记录总结&#xff0c;方便后续理解。 目录 一、MVCC相关概念 二、MVCC实现原理 1.隐藏字段 2.undo log 3.Read View 4.MVCC的整体处理流程 5. RC&#xff0c;RR级级别下的innoDB快照读有什么不同 6.总结 一、MVCC相关概念 1…

华为云云耀云服务器L实例评测|部署私有网盘 Nextcloud

华为云云耀云服务器L实例评测&#xff5c;部署私有网盘 Nextcloud 一、云耀云服务器L实例介绍1.1 云服务器介绍1.2 产品规格1.3 应用场景 二、云耀云服务器L实例配置2.1 重置密码2.2 服务器连接2.3 安全组配置 三、部署 Nextcloud3.1 Nextcloud 介绍3.2 Docker 环境搭建3.3 Nex…

LeetCode 面试题 08.06. 汉诺塔问题

文章目录 一、题目二、C# 题解 一、题目 在经典汉诺塔问题中&#xff0c;有 3 根柱子及 N 个不同大小的穿孔圆盘&#xff0c;盘子可以滑入任意一根柱子。一开始&#xff0c;所有盘子自上而下按升序依次套在第一根柱子上(即每一个盘子只能放在更大的盘子上面)。移动圆盘时受到以…

服务器的外网IP查阅方式

服务器的外网IP查阅方式 linux服务器查询外网ip地址&#xff0c;可以有以下三种方式查阅 查看服务器的外网IP 1、curl cip.cc IP : 183.1X.2XX.88 地址 : 中国 广东 深圳 运营商 : 电信 数据二 : 广东省深圳市 | 电信 数据三 : 中国广东省深圳市 | 电信 2、curl myip.ipip.n…

mysql面试题23:如果某个表有近千万数据,CRUD比较慢,如何优化?

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:如果某个表有近千万数据,CRUD比较慢,如何优化? 当某个表存在近千万数据且CRUD(增删改查)操作比较慢时,可以考虑以下优化策略: 使用索引:索…

数百个下载能够传播 Rootkit 的恶意 NPM 软件包

供应链安全公司 ReversingLabs 警告称&#xff0c;最近观察到的一次恶意活动依靠拼写错误来诱骗用户下载恶意 NPM 软件包&#xff0c;该软件包会通过 rootkit 感染他们的系统。 该恶意软件包名为“node-hide-console-windows”&#xff0c;旨在模仿 NPM 存储库上合法的“node-…

Java中使用正则表达式

正则表达式 正则表达式&#xff08;Regular Expression&#xff09;是一种用于匹配、查找和替换文本的强大工具。它由一系列字符和特殊字符组成&#xff0c;可以用来描述字符串的模式。在编程和文本处理中&#xff0c;正则表达式常被用于验证输入、提取信息、搜索和替换文本等…

使用 Apache Camel 和 Quarkus 的微服务(一)

【squids.cn】 全网zui低价RDS&#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 ​Apache Camel 绝非Java企业技术栈领域的新手。它由James Strachan在2007年创建&#xff0c;旨在实现著名的 "EIP 书"&#xff08;由Gregor Hohpe和Bobby W…

2.5 数字传输系统

笔记&#xff1a; 针对这一节的内容&#xff0c;我为您提供一个笔记的整理方法。将内容按重要性、逻辑关系进行组织&#xff0c;再进行简化。 ## 2.5 数字传输系统 ### 背景介绍&#xff1a; 1. **早期电话网**&#xff1a;市话局到用户采用双绞线电缆&#xff0c;长途干线采…

汇总开源大模型的本地API启动方式

文章目录 CodeGeex2ChatGLM2_6BBaichuan2_13Bsqlcoder开启后测试 CodeGeex2 from fastapi import FastAPI, Request from transformers import AutoTokenizer, AutoModel import uvicorn, json, datetime import torch import argparse try:import chatglm_cppenable_chatglm_…

git stash详解

stash:保存现场 1.建议&#xff08;规范&#xff09; &#xff1a;在功能未没有开发完毕前&#xff0c;不要commit 2.规定&#xff08;必须&#xff09; &#xff1a; 在没有commit之前&#xff0c;不能chekcout切换分支 &#xff08;不在同一个commit阶段&#xff09; 如果还没…

缓存与数据库双写一致性问题解决方案

其实如果使用缓存&#xff0c;就会出现缓存和数据库的不一致问题&#xff0c;关键在于我们可以接受不一致的时间是多少&#xff0c;根据不同的需求采取不同的实现方案。 第一种&#xff1a;先更新数据库后更新缓存 做法简单&#xff0c;但是并发写情况下&#xff0c;会出现数…

Mac安装GYM遇到的一些坑

以下是遇到的一些问题 安装GitHub上说的直接 pip install gym成功了&#xff0c;但是运行实例报错没安装gym[classic_control]&#xff0c;所以就全安装一下[all] 安装GitHub上说的直接 pip install gym成功了&#xff0c;但是运行实例报错没安装gym[classic_control]&#xff…

nodejs+vue快递管理服务系统elementui

电子商务改变了人们的传统经济活动中的交易方式和流通技术&#xff0c; 复杂的物流快递信息需要有效的进行处理&#xff0c;构建一个快递管理服务系统可以确保物流信息的一致性、员工登录&#xff1a;通过用户名和密码登录。这也间接带动了物流快递行业的高速发展。 &#xff0…

运行huggingface Kosmos2报错 nameerror: name ‘kosmos2tokenizer‘ is not defined

尝试运行huggingface上的Kosmos,https://huggingface.co/ydshieh/kosmos-2-patch14-224失败,报错: nameerror: name kosmos2tokenizer is not defined查看报错代码: vi /root/.cache/huggingface/modules/transformers_modules/ydshieh/kosmos-2-patch14-224/48e3edebaeb…

用rabbitMq 怎么处理“延迟消息队列”?

延迟消息队列是一种允许消息在发送后等待一段时间&#xff0c;然后再被消费的机制。这种机制通常用于需要延迟处理的应用场景&#xff0c;如定时任务、消息重试、消息调度等。在 RabbitMQ 中&#xff0c;实现延迟消息队列需要使用一些额外的组件和技术&#xff0c;因为 RabbitM…

一天一八股——TCP保活keepalive和HTTP的Keep-Alive

TCP属于传输层&#xff0c;关于TCP的设置在内核态完成 HTTP属于用户层的协议&#xff0c;主要用于web服务器和浏览器之间的 http的Keep-Alive都是为了减少多次建立tcp连接采用的保持长连接的机制&#xff0c;而tcp的keepalive是为了保证已经建立的tcp连接依旧可用(双端依旧可以…

KylinOSv10系统k8s集群启动mysql5.7占用内存高的问题

问题现象 麒麟系统搭建k8s集群 mysql的pod启动失败 describe查看ommkill&#xff0c;放大limit资源限制到30G依旧启动失败 系统 报错信息 原因 内存占用太高 open_files_limit初始化太高 解决&#xff1a; 1、更换镜像 链接: https://pan.baidu.com/s/1b9uJLcc5Os0uDqD1e…

3. 无重复字符的最长子串(枚举+滑动窗口)

目录 一、题目 二、代码 一、题目 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 二、代码 class Solution { public:int lengthOfLongestSubstring(string s) {int _MaxLength 0;int left 0, right 0;vector<int>hash(128, 0);//ASCII…