分布式的消息流平台之Pulsar

Pulsar 流处理详解

Apache Pulsar 是一个分布式的消息流平台,集成了**消息队列(MQ)流处理(Stream Processing)**能力。Pulsar 不仅提供低延迟、高吞吐的消息传输能力,还支持基于 Pulsar Functions、Flink、Spark Streaming 的流式处理能力。

本篇详细介绍 Pulsar 的流处理能力,涵盖 核心概念、流处理模式、编程模型、集成生态、应用场景 等方面。


1. Pulsar 流处理概述

(1)Pulsar 的流处理能力

Pulsar 主要通过以下方式实现流处理:

  • Pulsar Functions:轻量级流处理框架,适用于简单的 ETL、数据转换、事件处理等任务。
  • Flink & Spark Streaming 集成:Pulsar 提供 Flink 和 Spark Streaming 连接器,支持复杂流处理任务,如窗口计算、数据聚合、模式匹配等。
  • Pulsar IO:内置的 Source/Sink 连接器,支持数据流的输入输出,如 Kafka、Elasticsearch、JDBC、HDFS 等。

(2)Pulsar 流处理 VS 传统流处理

特性Pulsar FunctionsFlink on PulsarKafka Streams
复杂度低(适合轻量任务)高(适合复杂任务)中等(偏向事件流处理)
集成性内置在 Pulsar 中需集成 Flink/Spark依赖 Kafka
扩展性高(自动扩展)高(分布式计算)中等(依赖 Kafka 集群)
窗口计算支持基本窗口计算强大,支持滚动、滑动、会话窗口支持窗口操作

2. Pulsar 流处理核心概念

(1)Pulsar Functions

Pulsar Functions 是一种轻量级计算框架,专为 Pulsar 设计,允许开发者编写无状态(Stateless)或有状态(Stateful)的流处理逻辑,并直接运行在 Pulsar 集群中,而无需额外的计算框架(如 Flink 或 Spark)。

Pulsar Functions 关键特性
  • 轻量级:无需外部计算框架,适用于简单任务。
  • 原生集成:与 Pulsar 主题(Topic)无缝对接,延迟低。
  • 内置管理:支持负载均衡、故障恢复。
  • 支持多种语言:可用 Java、Python、Go 编写。
Pulsar Functions 编程模型

Pulsar Functions 的计算逻辑类似于 map-reduce,用户编写 Function(函数) 处理输入数据,并将结果写入另一个 Pulsar 主题。

示例:Java 版 Pulsar Function

public class MyFunction implements Function<String, String> {@Overridepublic String process(String input, Context context) {return input.toUpperCase(); // 处理逻辑:转换为大写}
}

注册 Pulsar Function:

pulsar-admin functions create \--tenant public --namespace default \--name my-function \--inputs persistent://public/default/input-topic \--output persistent://public/default/output-topic \--classname MyFunction \--jar my-function.jar

(2)Pulsar IO

Pulsar IO 提供了开箱即用的 Source(数据源)和 Sink(数据输出)连接器,允许 Pulsar 作为数据流的中心,连接各种外部存储和计算系统。

常见 Source/Sink 连接器
类型连接器示例
数据库MySQL、PostgreSQL、MongoDB
消息系统Kafka、RabbitMQ
存储系统HDFS、S3、Elasticsearch
计算引擎Flink、Spark

示例:启动一个 Kafka Source 连接器

pulsar-admin sources create \--name kafka-source \--tenant public --namespace default \--source-type kafka \--destination-topic-name persistent://public/default/kafka-topic \--source-config '{"bootstrapServers": "kafka-broker:9092","topic": "source-topic"}'

(3)Pulsar + Flink/Spark Streaming

Pulsar 也可作为 Flink / Spark Streaming 的流式数据源,支持复杂计算,如:

  • 窗口计算(Tumbling, Sliding, Session Window)
  • 聚合计算(sum, avg, count)
  • 状态管理(Stateful Processing)
  • 事件模式检测(CEP)

示例:Flink Pulsar 读取流数据

PulsarSource<String> source = PulsarSource.builder().setServiceUrl("pulsar://localhost:6650").setTopics("persistent://public/default/input-topic").setDeserializationSchema(SimpleStringSchema.class).build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar Source");stream.map(value -> value.toUpperCase()).print();env.execute();

3. Pulsar 流处理的运行模式

Pulsar Functions 支持三种运行模式:

运行模式说明
本地模式(LocalRun)在本地测试和运行 Functions
进程模式(Process)在 Pulsar Worker 进程中独立运行
Kubernetes 模式(K8s)在 Kubernetes 集群中运行 Pulsar Functions

示例:在 Kubernetes 上运行 Pulsar Function

pulsar-admin functions create \--name my-k8s-function \--runtime JAVA \--inputs persistent://public/default/input-topic \--output persistent://public/default/output-topic \--parallelism 3 \--jar my-function.jar \--kubernetes-namespace pulsar

4. Pulsar 流处理应用场景

(1)实时数据流处理

  • 实时 ETL:流式数据清洗、转换,存入数据湖或数据仓库(Iceberg、Doris)。
  • 用户行为分析:分析用户操作日志,计算热点数据。

(2)事件驱动架构(EDA)

  • 金融风控:实时监控交易流,检测欺诈行为。
  • IoT 监控:处理物联网传感器数据,异常报警。

(3)数据同步 & 数据管道

  • CDC 数据同步:从 MySQL/PostgreSQL 读取变更数据,实时写入 Pulsar 供下游消费。
  • 消息系统桥接:Kafka → Pulsar → Flink,实现高效流数据处理。

5. 总结

Pulsar 提供强大的流处理能力,主要包括:

  1. Pulsar Functions(轻量级流处理)
  2. Pulsar IO(数据连接器)
  3. Flink / Spark Streaming(复杂流计算)
  4. 多种运行模式(Local、Process、K8s)

Pulsar 适用于高吞吐、低延迟的流式数据处理场景,可用于数据管道、事件驱动架构、实时分析等领域。

如果你的应用场景需要 流处理 + 消息队列,Pulsar 是一个值得考虑的方案!🚀

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/73914.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【C++多线程】thread

C中的std::thread是C11引入的线程库的一部分&#xff0c;提供了创建和管理线程的能力。它封装了操作系统的线程接口&#xff0c;使得在C中更方便地进行多线程编程。 1. std::thread 的定义 std::thread 类位于<thread>头文件中&#xff0c;定义在std命名空间下&#xff…

【css酷炫效果】纯CSS实现故障文字特效

【css酷炫效果】纯CSS实现故障文字特效 缘创作背景html结构css样式完整代码基础版进阶版(3D效果) 效果图 想直接拿走的老板&#xff0c;链接放在这里&#xff1a;https://download.csdn.net/download/u011561335/90492053 缘 创作随缘&#xff0c;不定时更新。 创作背景 刚…

uniapp配置代理解决跨域问题

两种方式&#xff1a; 1、manifest.json中配置 "h5" : {"template" : "static/index.html","devServer" : {"port" : 9090,"https" : false,"proxy":{"/prod-api":{"target":&quo…

物联网为什么用MQTT不用 HTTP 或 UDP?

先来两个代码对比&#xff0c;上传温度数据给服务器。 MQTT代码示例 // MQTT 客户端连接到 MQTT 服务器 mqttClient.connect("mqtt://broker.server.com:8883", clientId) // 订阅特定主题 mqttClient.subscribe("sensor/data", qos1) // …

Flutter:页面滚动,导航栏背景颜色过渡动画

记录&#xff1a;导航默认透明&#xff0c;页面发生滚动后&#xff0c;导航背景色由0-1&#xff0c;过渡到白色背景。 view import package:ducafe_ui_core/ducafe_ui_core.dart; import package:flutter/material.dart; import package:get/get.dart; import package:redo…

STM32 —— MCU、MPU、ARM、FPGA、DSP

在嵌入式系统中&#xff0c;MCU、MPU、ARM、FPGA和DSP是核心组件&#xff0c;各自在架构、功能和应用场景上有显著差异。以下从专业角度详细解析这些概念&#xff1a; 一、 MCU&#xff08;Microcontroller Unit&#xff0c;微控制器单元&#xff09; 核心定义 集成系统芯片&a…

批量删除 PPT 空白幻灯片页面

如果我们需要删除 PPT 文档中的空白幻灯片页面&#xff0c;我们可以借助 Office 工具来完成&#xff0c;但是如果是大量的 PPT 文档需要批量删除空白幻灯片页面&#xff0c;那就需要使用专业的批量处理工具来完成&#xff0c;今天就给大家介绍一种批量删除 PPT 空白幻灯片页面的…

【canvas】一键自动布局:如何让流程图节点自动找到最佳位置

一键自动布局&#xff1a;如何让流程图节点自动找到最佳位置 引言 在流程图、拓扑图和系统架构图设计中&#xff0c;节点布局往往是最令人头疼的问题。如果手动调整每个节点位置&#xff0c;不仅耗时费力&#xff0c;还难以保证美观性和一致性。本文将深入解析如何实现自动布…

【平台优化】大数据集群一个客户端参数引起的任务性能差的问题

大数据集群一个客户端参数引起的任务性能差的问题 背景介绍排查过程任务慢的具体原因Executor中数据内存往磁盘溢写结果数据写入分区路径 分析解决方案 结语&思考 背景介绍 随着业务量不断扩大&#xff0c;平台逐步发展成HDFS多联邦的架构&#xff0c;这个过程中&#xff…

【微信小程序变通实现DeepSeek支持语音】

微信小程序实现录音转文字&#xff0c;并调用后端服务&#xff08;Node.js&#xff09;进行语音识别和&#xff0c;然后调用DeepSeek 处理的完整实现。 整体架构 前端&#xff08;微信小程序&#xff09;&#xff1a; 实现录音功能。将录音文件上传到后端。接收后端返回的语音…

uniapp常用组件

写在前面 今天将uniapp中的组件都过了一遍&#xff0c;上手难度不大&#xff0c;但是还是遇到了一些问题&#xff1a; HBuilder实在是太难用&#xff0c;不管是插件生态还是设计之类的&#xff0c;总之就是用的哪哪不顺手虽然打开内置浏览器是挺方便的&#xff0c;但是不知道…

【Linux】应用层自定义协议 + 序列化和反序列化

应用层自定义协议 序列化和反序列化 一.应用层1.再谈 "协议"2.序列化 和 反序列化 二. Jsoncpp1.序列化2.反序列化 三. Tcp全双工 面向字节流四.自定义协议 保证报文的完整性1.Makefile2.Mutex.hpp3.Cond.hpp4.Log.hpp5.Thread.hpp6.ThreadPool.hpp7.Common.hpp8.…

二.使用ffmpeg对原始音频数据重采样并进行AAC编码

重采样&#xff1a;将音频三元组【采样率 采样格式 通道数】之中的任何一个或者多个值改变。 一.为什么要进行重采样&#xff1f; 1.原始音频数据和编码器的数据格式不一致 2.播放器要求的和获取的数据不一致 3.方便运算 二.本次编码流程 1.了解自己本机麦克风参数&#x…

器材借用管理系统详细设计基于Spring Boot-SSM

‌ 目录 ‌摘要 一、系统概述‌ ‌二、系统架构设计‌ 2‌.1技术选型‌ ‌2.2系统架构‌ ‌三、需求分析 3.1用户需求分析 3.2功能模块设计‌ 3.3、性能需求分析 3.4、安全需求分析 ‌四、数据库设计‌ ‌五、安全性设计‌ ‌六、系统测试与维护‌ ‌七、总结‌…

麒麟V10 arm cpu aarch64 下编译 RocketMQ-Client-CPP 2.2.0

国产自主可控服务器需要访问RocketMQ消息队列&#xff0c;最新的CSDK是2020年发布的 rocketmq-client-cpp-2.2.0 这个版本支持TLS模式。 用默认的版本安装遇到一些问题&#xff0c;记录一下。 下载Releases apache/rocketmq-client-cpp GitHubhttps://github.com/apache/roc…

C语言每日一练——day_12(最后一天)

引言 针对初学者&#xff0c;每日练习几个题&#xff0c;快速上手C语言。第十二天。&#xff08;最后一天&#xff0c;完结散花啦&#xff09; 采用在线OJ的形式 什么是在线OJ&#xff1f; 在线判题系统&#xff08;英语&#xff1a;Online Judge&#xff0c;缩写OJ&#xff0…

网络安全应急入门到实战

奇安信&#xff1a;95015网络安全应急响应分析报告&#xff08;2022-2024年&#xff09;官网可以下载 https://github.com/Bypass007/Emergency-Response-Notes 应急响应实战笔记 网络安全应急响应技术实战指南 .pdf 常见场景 第4章 勒索病毒网络安全应急响应 第5章 挖矿木…

jvm中每个类的Class对象是唯一的吗

jvm中每个类的Class对象是唯一的吗 在 Java 中&#xff0c;同一个类的 Class 对象在由同一个类加载器加载时是唯一的。析&#xff1a; 1. 同一类加载器的唯一性 规则&#xff1a;若一个类被同一个类加载器加载&#xff0c;无论创建多少实例&#xff0c;其 Class 对象始终唯一…

Visual Studio里的调试(debugging)功能介绍

参考 1- Introduction to Debugging | Basic Visual Studio Debugging&#xff08;这是一位印度博主视频&#xff0c;我下面做到笔记也主要参考她的视频&#xff0c;但不得不说口音太重了&#xff0c;一股咖喱味&#xff09; 目录 个人对调试浅显的认识和对调试的介绍逐行调…

NLP高频面试题(六)——decoder-only、encoder-only和encoder-decoder的区别与联系

一、基本概念与代表模型 1. Encoder-only 架构 Encoder-only 架构最具代表性的模型是 BERT。BERT 使用 masked language modeling&#xff08;MLM&#xff09;进行预训练&#xff0c;即随机遮蔽部分输入词汇&#xff0c;让模型预测被遮蔽的词汇。由于这种架构能够同时看到输入…